Skip to content

Commit

Permalink
Merge pull request #3 from MiloszKrajewski/multi-subscriber
Browse files Browse the repository at this point in the history
Multiple pollers
  • Loading branch information
MiloszKrajewski authored Nov 17, 2023
2 parents e637426 + 0ea1e3c commit 0ecf563
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 60 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 0.0.18 (2023/11/17)
* ADDED: Configurable PollerCount in BatchBuilder

## 0.0.17 (2023/11/10)
* FIXED: double dispose on BatchBuilder was throwing an exception

Expand Down
9 changes: 8 additions & 1 deletion src/K4os.Async.Toys/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public interface IAgentContext

/// <summary>Cancellation token.</summary>
CancellationToken Token { get; }

/// <summary>Agent instance.</summary>
IAgent Agent { get; }
}

/// <summary>Agent's context from inside handler.</summary>
Expand Down Expand Up @@ -77,7 +80,8 @@ protected AbstractAgent(ILogger? logger, CancellationToken token = default)
/// <inheritdoc />
public void Start() => _ready.TrySetResult(null);

private Task Stop()
/// <inheritdoc />
public Task Stop()
{
_cancel.Cancel();
_ready.TrySetCanceled(_cancel.Token);
Expand Down Expand Up @@ -140,6 +144,9 @@ public void Dispose()

/// <summary>Cancellation token.</summary>
CancellationToken IAgentContext.Token => _cancel.Token;

/// <inheritdoc />
public IAgent Agent => this;
}

/// <summary>Agent base class.</summary>
Expand Down
12 changes: 6 additions & 6 deletions src/K4os.Async.Toys/AliveKeeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,26 +247,26 @@ protected virtual string Display(T key) =>
private async Task TouchOneLoop(T item, InFlight? inFlight, CancellationToken token)
{
if (inFlight is null) return;

using var cts = CancellationTokenSource.CreateLinkedTokenSource(
token, inFlight.Token);

try
{
var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(
token, inFlight.Token).Token;

var interval = _settings.TouchInterval;
var retry = _settings.RetryInterval;

var failed = 0;

while (!combinedToken.IsCancellationRequested)
while (!cts.Token.IsCancellationRequested)
{
await Delay(failed > 0 ? retry : interval, combinedToken);
await Delay(failed > 0 ? retry : interval, cts.Token).ConfigureAwait(false);

if (!IsActive(item)) return;

try
{
await TouchOne(item);
await TouchOne(item).ConfigureAwait(false);
failed = 0;
}
catch (Exception e)
Expand Down
157 changes: 109 additions & 48 deletions src/K4os.Async.Toys/BatchSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using K4os.Async.Toys.Internal;
using System.Threading.Channels;
using K4os.Async.Toys.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

Expand All @@ -12,7 +13,7 @@ public abstract class BatchSubscriber
{
/// <summary>Minimum interval between touch operations.</summary>
protected static readonly TimeSpan MinimumTouchInterval = TimeSpan.FromMilliseconds(10);

/// <summary>Minimum interval between retry attempts. This is to prevent retry storms.</summary>
protected static readonly TimeSpan MinimumRetryInterval = TimeSpan.FromMilliseconds(10);
}
Expand All @@ -32,14 +33,15 @@ public class BatchSubscriber<TMessage, TReceipt>: BatchSubscriber, IDisposable

private readonly IBatchPoller<TMessage, TReceipt> _poller;
private readonly Func<TMessage, CancellationToken, Task> _handler;
private readonly SemaphoreSlim _semaphore;

private readonly CancellationTokenSource _cancel;

private readonly IAgent _agent;
private readonly IAliveKeeper<TReceipt> _keeper;
private readonly CancellationTokenSource _cancel;

private readonly bool _asyncDelete;
private readonly IAgent _supervisor;
private readonly Channel<Burrito> _burritos;
private readonly IBatchSubscriberSettings _settings;

private readonly SemaphoreSlim _runnerGate;
private readonly SemaphoreSlim _pollerGate;

private record struct Burrito(TMessage Message, TReceipt Receipt);

Expand All @@ -57,13 +59,20 @@ public BatchSubscriber(
ILogger? logger = null)
{
Log = logger ?? NullLogger.Instance;

settings = Validate(settings ?? new BatchSubscriberSettings());
_asyncDelete = settings.AsynchronousDeletes;

_settings = settings = Validate(settings ?? new BatchSubscriberSettings());
_poller = poller;
_handler = handler;
_cancel = CancellationTokenSource.CreateLinkedTokenSource(token);
_semaphore = new SemaphoreSlim(settings.HandlerCount);

_burritos = Channel.CreateBounded<Burrito>(
new BoundedChannelOptions(settings.InternalQueueSize) {
SingleReader = false,
SingleWriter = false,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});

_keeper = AliveKeeper.Create<TReceipt>(
TouchMany,
DeleteMany,
Expand All @@ -81,7 +90,11 @@ public BatchSubscriber(
: AliveKeeperSyncPolicy.Unrestricted,
},
logger);
_agent = Agent.Create(Loop, logger, _cancel.Token);

_pollerGate = new SemaphoreSlim(settings.PollerCount);
_runnerGate = new SemaphoreSlim(settings.HandlerCount);

_supervisor = Agent.Create(Supervisor, logger, _cancel.Token);
}

private static IBatchSubscriberSettings Validate(IBatchSubscriberSettings settings) =>
Expand All @@ -95,10 +108,13 @@ private static IBatchSubscriberSettings Validate(IBatchSubscriberSettings settin
TouchInterval = settings.TouchInterval.NotLessThan(MinimumTouchInterval),
TouchBatchDelay = settings.TouchBatchDelay.NotLessThan(TimeSpan.Zero),
AlternateBatches = settings.AlternateBatches,
AsynchronousDeletes = settings.AsynchronousDeletes,
InternalQueueSize = settings.InternalQueueSize.NotLessThan(1),
PollerCount = settings.PollerCount.NotLessThan(1),
};

/// <summary>Starts polling loop.</summary>
public void Start() { _agent.Start(); }
public void Start() { _supervisor.Start(); }

private string KeyOf(TReceipt message) =>
_poller.IdentityOf(message);
Expand All @@ -108,74 +124,119 @@ private Task<TReceipt[]> DeleteMany(TReceipt[] receipts) =>

private Task<TReceipt[]> TouchMany(TReceipt[] receipts) =>
_poller.Touch(receipts, _cancel.Token);

private static async Task GatedFork(
SemaphoreSlim gate, Func<CancellationToken, Task> func, CancellationToken token)
{
await gate.WaitAsync(token);
Task
.Run(() => func(token), token)
.ContinueWith(_ => gate.Release(), CancellationToken.None)
.Forget();
}

private static Task ForkOrWait<T>(
bool wait,
T state, Func<T, CancellationToken, Task> func, CancellationToken token)
private static Task ForkOrWait(
bool wait, Func<CancellationToken, Task> func, CancellationToken token)
{
if (wait) return func(state, token);
if (wait) return func(token);

Fork(state, func, token);
Task.Run(() => func(token), token).Forget();
return Task.CompletedTask;
}

private static void Fork<T>(
T state, Func<T, CancellationToken, Task?> func, CancellationToken token)

private async Task Supervisor(IAgentContext context)
{
Task.Run(() => func(state, token), token).Forget();
var token = context.Token;
var interval = TimeSpan.FromSeconds(1);

var poller = Agent.Create(Poller, Log, token);
var runner = Agent.Create(Runner, Log, token);

runner.Start();
poller.Start();

try
{
while (!token.IsCancellationRequested)
{
// we will need to periodically review number of pollers/runners later
// for now this is just idle loop
await Task.Delay(interval, token);
}
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// ignore
}
finally
{
await poller.Done;
_burritos.Writer.Complete();
await runner.Done;
}
}

private async Task Loop(IAgentContext context)
private async Task<bool> Runner(IAgentContext context)
{
var token = context.Token;

while (!token.IsCancellationRequested)
{
var messages = await _poller.Receive(token);
if (messages.Length == 0) continue;
var burrito = await _burritos.Reader.ReadAsync(token);
await GatedFork(_runnerGate, ct => HandleOne(burrito, ct), token);
}

var burritos = messages
.Select(m => new Burrito(m, _poller.ReceiptFor(m)))
.ToArray();
return false;
}

foreach (var b in burritos)
{
Register(b);
}
private async Task<bool> Poller(IAgentContext context)
{
var token = context.Token;

foreach (var b in burritos)
{
await _semaphore.WaitAsync(token);
Fork(b, Handle, token);
}
while (!token.IsCancellationRequested)
{
await GatedFork(_pollerGate, ReceiveMany, token);
}
}

private void Register(Burrito burrito) =>
_keeper.Register(burrito.Receipt, _cancel.Token);
return false;
}

private Burrito Register(TMessage message)
{
var receipt = _poller.ReceiptFor(message);
_keeper.Register(receipt, _cancel.Token);
return new Burrito(message, receipt);
}

private void Forget(Burrito burrito) =>
_keeper.Forget(burrito.Receipt);

private Task Complete(Burrito burrito, CancellationToken token) =>
_keeper.Delete(burrito.Receipt, token);

private async Task Handle(Burrito burrito, CancellationToken token)
private async Task ReceiveMany(CancellationToken token)
{
var messages = await _poller.Receive(token);
var burritos = messages.Select(Register).ToArray();

foreach (var burrito in burritos)
await _burritos.Writer.WriteAsync(burrito, token);
}

private async Task HandleOne(Burrito burrito, CancellationToken token)
{
var asyncDelete = _settings.AsynchronousDeletes;

try
{
await _handler(burrito.Message, token);
await ForkOrWait(!_asyncDelete, burrito, Complete, token);
await ForkOrWait(!asyncDelete, ct => Complete(burrito, ct), token);
}
catch (Exception ex)
{
Log.LogError(ex, "Failed to handle message {Receipt}", burrito.Receipt);
Forget(burrito);
}
finally
{
_semaphore.Release();
}
}

/// <summary>
Expand All @@ -186,7 +247,7 @@ protected virtual void Dispose(bool disposing)
{
if (!disposing) return;

_agent.Dispose();
_supervisor.Dispose();
_keeper.Dispose();
}

Expand Down
21 changes: 20 additions & 1 deletion src/K4os.Async.Toys/BatchSubscriberSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace K4os.Async.Toys;
/// </summary>
public interface IBatchSubscriberSettings
{
/// <summary>Number of handlers to be used to process messages.</summary>
/// <summary>Number of handlers to be used to process messages concurrently.</summary>
int HandlerCount { get; set; }

/// <summary>
Expand Down Expand Up @@ -53,6 +53,19 @@ public interface IBatchSubscriberSettings
/// messages will not ensure everything is deleted, but give a little bit higher throughput.
/// </summary>
bool AsynchronousDeletes { get; set; }

/// <summary>
/// Length of internal queue. Increasing it may help smoothing processing speed, but
/// using 1 is also fine.
/// </summary>
int InternalQueueSize { get; set; }

/// <summary>
/// Number of source date pollers. Affects how quickly new job as acquired, increase it only
/// if your runners are idle, but not overdo it as all messages which are polled but not handled
/// yet will need to be tracked and touched periodically. Default value is usually good enough.
/// </summary>
int PollerCount { get; set; }
}

/// <summary>
Expand Down Expand Up @@ -89,4 +102,10 @@ public class BatchSubscriberSettings: IBatchSubscriberSettings

/// <inheritdoc />
public bool AsynchronousDeletes { get; set; } = false;

/// <inheritdoc />
public int InternalQueueSize { get; set; } = 1;

/// <inheritdoc />
public int PollerCount { get; set; } = 1;
}
9 changes: 9 additions & 0 deletions src/K4os.Async.Toys/Internal/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ public static T NotLessThan<T>(this T value, T limit, IComparer<T>? comparer = n

public static T NotMoreThan<T>(this T value, T limit, IComparer<T>? comparer = null) =>
Compare(value, limit, comparer) > 0 ? limit : value;

public static IEnumerable<T> TapEach<T>(this IEnumerable<T> sequence, Action<T> action)
{
foreach (var item in sequence)
{
action(item);
yield return item;
}
}

public static void ForEach<T>(this IEnumerable<T> sequence, Action<T> action)
{
Expand Down
Loading

0 comments on commit 0ecf563

Please sign in to comment.