Skip to content

Commit

Permalink
Merge pull request #2 from MiloszKrajewski/batch-subscriber
Browse files Browse the repository at this point in the history
Batch subscriber
  • Loading branch information
MiloszKrajewski authored Nov 6, 2023
2 parents ffee1ca + 49b179d commit a22c60e
Show file tree
Hide file tree
Showing 21 changed files with 579 additions and 219 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.0.15 (2023/11/06)
* FIXED: even more bugs in AliveKeeper
* ADDED: BatchSubscriber allows to subscribe to any poll-able source

## 0.0.14 (2023/11/02)
* FIXED: multiple bugs in AliveKeeper

Expand Down
62 changes: 0 additions & 62 deletions src/K4os.Async.Toys.App/ColorConsoleProvider.cs

This file was deleted.

12 changes: 0 additions & 12 deletions src/K4os.Async.Toys.App/K4os.Async.Toys.App.csproj

This file was deleted.

91 changes: 0 additions & 91 deletions src/K4os.Async.Toys.App/Program.cs

This file was deleted.

7 changes: 0 additions & 7 deletions src/K4os.Async.Toys.App/paket.references

This file was deleted.

6 changes: 0 additions & 6 deletions src/K4os.Async.Toys.sln
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "K4os.Async.Toys", "K4os.Asy
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "K4os.Async.Toys.Tests", "K4os.Async.Toys.Tests\K4os.Async.Toys.Tests.csproj", "{1FB5731C-2571-43BD-A19A-45110360FC44}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "K4os.Async.Toys.App", "K4os.Async.Toys.App\K4os.Async.Toys.App.csproj", "{3E541988-CA33-477D-907E-DA43944AF400}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks", "Benchmarks\Benchmarks.csproj", "{BA1AD6AB-F72E-4A0C-ACA7-AA328C70CD59}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Playground", "Playground\Playground.csproj", "{5A8468E0-AB21-4749-AD96-6179566BF31F}"
Expand All @@ -51,10 +49,6 @@ Global
{1FB5731C-2571-43BD-A19A-45110360FC44}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1FB5731C-2571-43BD-A19A-45110360FC44}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1FB5731C-2571-43BD-A19A-45110360FC44}.Release|Any CPU.Build.0 = Release|Any CPU
{3E541988-CA33-477D-907E-DA43944AF400}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3E541988-CA33-477D-907E-DA43944AF400}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3E541988-CA33-477D-907E-DA43944AF400}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3E541988-CA33-477D-907E-DA43944AF400}.Release|Any CPU.Build.0 = Release|Any CPU
{17C2C146-01F7-4C31-89FF-F20958A8C147}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{17C2C146-01F7-4C31-89FF-F20958A8C147}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA1AD6AB-F72E-4A0C-ACA7-AA328C70CD59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
Expand Down
30 changes: 13 additions & 17 deletions src/K4os.Async.Toys/AliveKeeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private class InFlight
private readonly IAliveKeeperSyncPolicy _syncPolicy;

private readonly CancellationTokenSource _cancel = new();

/// <summary>
/// Creates new instance of <see cref="AliveKeeper{T}"/>.
/// </summary>
Expand Down Expand Up @@ -213,10 +213,8 @@ private async Task<T[]> DeleteMany(T[] items)
}
}

private async Task TouchOne(string display, T item)
private async Task TouchOne(T item)
{
Log.LogDebug("Touching [{Key}]...", display);

try
{
await _touchBatch.Request(item);
Expand All @@ -227,10 +225,8 @@ private async Task TouchOne(string display, T item)
}
}

private async Task DeleteOne(string display, T item)
private async Task DeleteOne(T item)
{
Log.LogDebug("Deleting [{Key}]...", display);

try
{
await _deleteBatch.Request(item);
Expand All @@ -248,9 +244,8 @@ protected virtual string Display(T key) =>
// ReSharper disable once NullCoalescingConditionIsAlwaysNotNullAccordingToAPIContract
_keyToString?.Invoke(key) ?? key.ToString() ?? "<null>";

private async Task TouchOneLoop(T item, CancellationToken token)
private async Task TouchOneLoop(T item, InFlight? inFlight, CancellationToken token)
{
var inFlight = TryActivate(item);
if (inFlight is null) return;

try
Expand All @@ -261,7 +256,6 @@ private async Task TouchOneLoop(T item, CancellationToken token)
var interval = _settings.TouchInterval;
var retry = _settings.RetryInterval;

var display = Display(item);
var failed = 0;

while (!combinedToken.IsCancellationRequested)
Expand All @@ -272,12 +266,12 @@ private async Task TouchOneLoop(T item, CancellationToken token)

try
{
await TouchOne(display, item);
await TouchOne(item);
failed = 0;
}
catch (Exception e)
{
if (!OnOperationFailed(e, "Touch", display, ++failed))
if (!ShouldRetry(e, "Touch", Display(item), ++failed))
return;
}
}
Expand All @@ -300,7 +294,6 @@ protected async Task DeleteOneLoop(T item, CancellationToken token)
{
var retry = _settings.RetryInterval;

var display = Display(item);
var failed = 0;
while (!token.IsCancellationRequested)
{
Expand All @@ -317,23 +310,25 @@ protected async Task DeleteOneLoop(T item, CancellationToken token)

try
{
await DeleteOne(display, item);
await DeleteOne(item);
return;
}
catch (Exception e)
{
if (!OnOperationFailed(e, "Delete", display, ++failed))
if (!ShouldRetry(e, "Delete", Display(item), ++failed))
return;
}
}
}
finally
{
// this is questionable, maybe we could deactivate it as soon as this method is called?
// this would require some changes as we still need to monitor Forget() calls
Deactivate(item);
}
}

private bool OnOperationFailed(
private bool ShouldRetry(
Exception exception, string operation, string display, int failed)
{
var retryLimit = _settings.RetryLimit;
Expand All @@ -360,7 +355,8 @@ public void Register(T item, CancellationToken token = default)
if (IsDisposing)
return;

Task.Run(() => TouchOneLoop(item, token), token).Forget();
var inFlight = TryActivate(item);
Task.Run(() => TouchOneLoop(item, inFlight, token), token).Forget();
}

/// <summary>
Expand Down
1 change: 0 additions & 1 deletion src/K4os.Async.Toys/AliveKeeper.static.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace K4os.Async.Toys;
Expand Down
4 changes: 2 additions & 2 deletions src/K4os.Async.Toys/AliveKeeperSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class AliveKeeperSettings: IAliveKeeperSettings
public int TouchBatchSize { get; set; } = 1;

/// <inheritdoc />
public TimeSpan TouchBatchDelay { get; set; }
public TimeSpan TouchBatchDelay { get; set; } = TimeSpan.Zero;

/// <inheritdoc />
public int DeleteBatchSize { get; set; } = 1;
Expand All @@ -80,7 +80,7 @@ public class AliveKeeperSettings: IAliveKeeperSettings
public TimeSpan RetryInterval { get; set; } = TimeSpan.FromMilliseconds(100);

/// <inheritdoc />
public int RetryLimit { get; set; }
public int RetryLimit { get; set; } = 0;

/// <inheritdoc />
public int Concurrency { get; set; } = 1;
Expand Down
Loading

0 comments on commit a22c60e

Please sign in to comment.