Skip to content

Commit

Permalink
ending agent work without desposing
Browse files Browse the repository at this point in the history
  • Loading branch information
MiloszKrajewski committed May 20, 2023
1 parent 1dd02d7 commit a9f309d
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 38 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
## 0.0.11 (2023/05/18)
## 0.0.12 (2023/05/18)
* ADDED: maximum concurrency setting
* ADDED: concurrency policies for AliveKeeper
* FIXED: safe sync policy was deadlocking in some cases
* ADDED: ability for agents to end their work without being disposed

## 0.0.8 (2023/05/06)
* CHANGED: build process
Expand Down
41 changes: 22 additions & 19 deletions src/K4os.Async.Toys/Agent.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using K4os.Async.Toys.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand All @@ -13,11 +11,11 @@ public interface IAgent: IDisposable
{
/// <summary>Starts agent. Does nothing if agent is already started.</summary>
void Start();

/// <summary>Awaitable indicator that agent finished working.</summary>
public Task Done { get; }
}

/// <summary>Agent interface with inbox queue.</summary>
/// <typeparam name="T">Type of inbox items.</typeparam>
public interface IAgent<in T>: IAgent
Expand All @@ -32,7 +30,7 @@ public interface IAgentContext
{
/// <summary>Log.</summary>
ILogger Log { get; }

/// <summary>Cancellation token.</summary>
CancellationToken Token { get; }
}
Expand All @@ -55,7 +53,7 @@ public abstract class AbstractAgent: IAgent, IAgentContext
SingleReader = true,
AllowSynchronousContinuations = false,
};

/// <summary>Agent's log.</summary>
protected ILogger Log { get; }

Expand All @@ -77,7 +75,7 @@ protected AbstractAgent(ILogger? logger)

/// <inheritdoc />
public void Start() => _ready.TrySetResult(null);

private Task Stop()
{
_cancel.Cancel();
Expand All @@ -98,7 +96,11 @@ private async Task Loop()
{
try
{
await Execute().ConfigureAwait(false);
var next = await Execute().ConfigureAwait(false);
if (next) continue;

Log.LogDebug("agent finished work successfully");
return;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Expand All @@ -112,8 +114,9 @@ private async Task Loop()
}

/// <summary>Actual implementation of single iteration.</summary>
/// <returns>Task indicating iteration is finished.</returns>
protected abstract Task Execute();
/// <returns>Task indicating iteration is finished, <c>true</c> if next iteration should be
/// scheduled, <c>false</c> if job is done and no more iterations are needed.</returns>
protected abstract Task<bool> Execute();

/// <summary>Stops agent.</summary>
/// <param name="disposing"><c>true</c> if agents is disposed by user.</param>
Expand Down Expand Up @@ -141,34 +144,34 @@ public void Dispose()
/// <summary>Agent base class.</summary>
public partial class Agent: AbstractAgent
{
private readonly Func<IAgentContext, Task> _action;
private readonly Func<IAgentContext, Task<bool>> _action;

/// <summary>Creates new agent.</summary>
/// <param name="logger">Logger.</param>
/// <param name="action">Action to be executed in the loop.</param>
public Agent(Func<IAgentContext, Task> action, ILogger? logger = null): base(logger) =>
public Agent(Func<IAgentContext, Task<bool>> action, ILogger? logger = null): base(logger) =>
_action = action.Required(nameof(action));

/// <inheritdoc />
protected override Task Execute() => _action(this);
protected override Task<bool> Execute() => _action(this);
}

/// <summary>Agent with a queue base class.</summary>
/// <typeparam name="T">Type of items.</typeparam>
public class Agent<T>: AbstractAgent, IAgent<T>, IAgentContext<T>
{
private readonly Channel<T> _queue = Channel.CreateUnbounded<T>(ChannelOptions);
private readonly Func<IAgentContext<T>, Task> _action;
private readonly Func<IAgentContext<T>, Task<bool>> _action;

/// <summary>Creates new agent and starts it.</summary>
/// <param name="logger">Log.</param>
/// <param name="action">Agent's action.</param>
public Agent(Func<IAgentContext<T>, Task> action, ILogger? logger = null): base(logger) =>
public Agent(Func<IAgentContext<T>, Task<bool>> action, ILogger? logger = null): base(logger) =>
_action = action.Required(nameof(action));

/// <inheritdoc />
protected override Task Execute() => _action(this);

protected override Task<bool> Execute() => _action(this);
/// <summary>Enqueues item to be processed by agent.</summary>
/// <param name="item">Item.</param>
/// <exception cref="InvalidOperationException">Thrown when queue is full.</exception>
Expand All @@ -182,6 +185,6 @@ public void Enqueue(T item)

private static InvalidOperationException QueueIsFull() =>
new("Internal queue is full");

Channel<T> IAgentContext<T>.Queue => _queue;
}
}
48 changes: 46 additions & 2 deletions src/K4os.Async.Toys/Agent.static.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace K4os.Async.Toys;
Expand All @@ -11,6 +10,13 @@ public partial class Agent
/// <param name="logger">Logger to be used (can be <c>null</c>)</param>
/// <returns>New agent.</returns>
public static Agent Create(Func<IAgentContext, Task> action, ILogger? logger = null) =>
new(Endless(action), logger);

/// <summary>Creates and starts new agent.</summary>
/// <param name="action">Action to be executed (continuously) by agent.</param>
/// <param name="logger">Logger to be used (can be <c>null</c>)</param>
/// <returns>New agent.</returns>
public static Agent Create(Func<IAgentContext, Task<bool>> action, ILogger? logger = null) =>
new(action, logger);

/// <summary>Creates and starts new agent with inbox queue.</summary>
Expand All @@ -19,8 +25,17 @@ public static Agent Create(Func<IAgentContext, Task> action, ILogger? logger = n
/// <returns>New agent.</returns>
public static Agent<T> Create<T>(
Func<IAgentContext<T>, Task> action, ILogger? logger = null) =>
new(Endless(action), logger);

/// <summary>Creates and starts new agent with inbox queue.</summary>
/// <param name="action">Action to be executed (continuously) by agent.</param>
/// <param name="logger">Logger to be used (can be <c>null</c>)</param>
/// <returns>New agent.</returns>
public static Agent<T> Create<T>(
Func<IAgentContext<T>, Task<bool>> action, ILogger? logger = null) =>
new(action, logger);


/// <summary>Creates and starts new agent.</summary>
/// <param name="action">Action to be executed (continuously) by agent.</param>
/// <param name="logger">Logger to be used (can be <c>null</c>)</param>
Expand All @@ -31,6 +46,17 @@ public static Agent Launch(Func<IAgentContext, Task> action, ILogger? logger = n
agent.Start();
return agent;
}

/// <summary>Creates and starts new agent.</summary>
/// <param name="action">Action to be executed (continuously) by agent.</param>
/// <param name="logger">Logger to be used (can be <c>null</c>)</param>
/// <returns>New agent.</returns>
public static Agent Launch(Func<IAgentContext, Task<bool>> action, ILogger? logger = null)
{
var agent = Create(action, logger);
agent.Start();
return agent;
}

/// <summary>Creates and starts new agent with inbox queue.</summary>
/// <param name="action">Action to be executed (continuously) by agent.</param>
Expand All @@ -43,4 +69,22 @@ public static Agent<T> Launch<T>(
agent.Start();
return agent;
}
}

/// <summary>Creates and starts new agent with inbox queue.</summary>
/// <param name="action">Action to be executed (continuously) by agent.</param>
/// <param name="logger">Logger to be used (can be <c>null</c>)</param>
/// <returns>New agent.</returns>
public static Agent<T> Launch<T>(
Func<IAgentContext<T>, Task<bool>> action, ILogger? logger = null)
{
var agent = Create(action, logger);
agent.Start();
return agent;
}

private static Func<T, Task<bool>> Endless<T>(Func<T, Task> action) =>
async ctx => {
await action(ctx).ConfigureAwait(false);
return true;
};
}
33 changes: 17 additions & 16 deletions src/K4os.Async.Toys/K4os.Async.Toys.csproj
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net462;netstandard2.0;netstandard2.1;net5.0;net6.0</TargetFrameworks>
<IsPackable>true</IsPackable>
</PropertyGroup>
<ItemGroup>
<InternalsVisibleTo Include="K4os.Async.Toys.Tests"/>
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)'=='net5.0' ">
<PackageReference Include="System.Threading.Channels" Version="6.0.0"/>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3"/>
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)'!='net5.0' ">
<PackageReference Include="System.Threading.Channels" Version="7.0.0"/>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0"/>
</ItemGroup>
<Import Project="$(PublicAssemblyProps)"/>
<PropertyGroup>
<TargetFrameworks>net462;netstandard2.0;netstandard2.1;net5.0;net6.0</TargetFrameworks>
<IsPackable>true</IsPackable>
</PropertyGroup>
<ItemGroup>
<InternalsVisibleTo Include="K4os.Async.Toys.Tests" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)'=='net5.0' ">
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)'!='net5.0' ">
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
</ItemGroup>
<Import Project="$(PublicAssemblyProps)" />
</Project>

0 comments on commit a9f309d

Please sign in to comment.