Skip to content

Commit

Permalink
explosing a DisposableBag
Browse files Browse the repository at this point in the history
  • Loading branch information
MiloszKrajewski committed Nov 7, 2023
1 parent a22c60e commit 9a35172
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 75 deletions.
113 changes: 113 additions & 0 deletions src/K4os.Async.Toys/Internal/AsyncExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
using System.Threading.Channels;

namespace K4os.Async.Toys.Internal;

/// <summary>
/// Async extensions.
/// </summary>
public static class AsyncExtensions
{
/// <summary>
/// Reads multiple items from channel reader.
/// </summary>
/// <param name="reader">Reader.</param>
/// <param name="length">Maximum length.</param>
/// <param name="token">Cancellation token.</param>
/// <typeparam name="T">Type of item.</typeparam>
/// <returns>List of read items.</returns>
public static async Task<List<T>?> ReadManyAsync<T>(
this ChannelReader<T> reader, int length = int.MaxValue,
CancellationToken token = default)
{
var ready = await reader.WaitToReadAsync(token);
if (!ready) return null;

var list = default(List<T>);
Drain(reader, ref list, ref length);

return list;
}

/// <summary>
/// Reads multiple items from channel reader, waiting for more items after initial item.
/// </summary>
/// <param name="reader">Reader.</param>
/// <param name="delay">Delay.</param>
/// <param name="length">Maximum length.</param>
/// <param name="delayer">Delay method, <see cref="Task.Delay(TimeSpan,CancellationToken)"/>
/// is used if not specified.</param>
/// <param name="token">Cancellation token.</param>
/// <typeparam name="T">Type of item.</typeparam>
/// <returns>List of read items.</returns>
public static async Task<List<T>?> ReadManyAsync<T>(
this ChannelReader<T> reader, TimeSpan delay,
int length = int.MaxValue,
Func<TimeSpan, CancellationToken, Task>? delayer = null,
CancellationToken token = default)
{
var list = await reader.ReadManyAsync(length, token);
if (list is null || list.Count >= length || delay <= TimeSpan.Zero)
return list;

using var cancel = CancellationTokenSource.CreateLinkedTokenSource(token);
using var window = (delayer ?? Task.Delay)(delay, cancel.Token);
await reader.ReadManyMoreAsync(list, length, window);
cancel.Cancel();
return list;
}

/// <summary>
/// Reads multiple items from channel reader, waiting for more items after initial item.
/// </summary>
/// <param name="reader">Reader.</param>
/// <param name="delay">Delay.</param>
/// <param name="length">Maximum length.</param>
/// <param name="timeSource">Time source, system clock used in not specified.</param>
/// <param name="token">Cancellation token.</param>
/// <typeparam name="T">Type of item.</typeparam>
/// <returns>List of read items.</returns>
public static Task<List<T>?> ReadManyAsync<T>(
this ChannelReader<T> reader, TimeSpan delay,
int length = int.MaxValue,
ITimeSource? timeSource = null,
CancellationToken token = default) =>
reader.ReadManyAsync(delay, length, (timeSource ?? TimeSource.Default).Delay, token);

private static async Task ReadManyMoreAsync<T>(
this ChannelReader<T> reader, List<T> list, int length, Task window)
{
var completed = reader.Completion;
length -= list.Count; // length left

while (true)
{
Drain(reader, ref list!, ref length);
if (length <= 0) break;

var ready = reader.WaitToReadAsync().AsTask();
var evt = await Task.WhenAny(window, completed, ready);
if (evt != ready) break;
}
}

private static void Drain<T>(
ChannelReader<T> reader, ref List<T>? list, ref int length)
{
while (length > 0 && reader.TryRead(out var item))
{
(list ??= new List<T>()).Add(item);
length--;
}
}

/// <summary>
/// Explicitly forgets about task, silencing potential error. Used for fire-and-forget.
/// </summary>
/// <param name="task">Task to be forgotten.</param>
public static void Forget(this Task task)
{
task.ContinueWith(
t => t.Exception, // clear exception so TPL stops complaining
TaskContinuationOptions.NotOnRanToCompletion);
}
}
2 changes: 1 addition & 1 deletion src/K4os.Async.Toys/Internal/Disposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace K4os.Async.Toys.Internal;
/// <summary>
/// Disposable that executes given action when disposed.
/// </summary>
internal class Disposable: IDisposable
public class Disposable: IDisposable
{
/// <summary>Creates disposable that executes given action when disposed.</summary>
/// <param name="action">Action to be executed. Note, <c>null</c> is a valid value for no-action.</param>
Expand Down
2 changes: 1 addition & 1 deletion src/K4os.Async.Toys/Internal/DisposableBag.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace K4os.Async.Toys.Internal;

/// <summary>A disposable collection of disposables.</summary>
/// <seealso cref="IDisposable" />
internal class DisposableBag: IDisposable
public class DisposableBag: IDisposable
{
private readonly object _mutex = new();
private readonly List<IDisposable> _bag = new();
Expand Down
74 changes: 1 addition & 73 deletions src/K4os.Async.Toys/Internal/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

namespace K4os.Async.Toys.Internal;

Expand All @@ -23,77 +22,6 @@ public static void ForEach<T>(this IEnumerable<T> sequence, Action<T> action)
action(item);
}

public static async Task<List<T>?> ReadManyAsync<T>(
this ChannelReader<T> reader, int length = int.MaxValue,
CancellationToken token = default)
{
var ready = await reader.WaitToReadAsync(token);
if (!ready) return null;

var list = default(List<T>);
Drain(reader, ref list, ref length);

return list;
}

public static async Task<List<T>?> ReadManyAsync<T>(
this ChannelReader<T> reader, TimeSpan delay,
int length = int.MaxValue,
Func<TimeSpan, CancellationToken, Task>? delayer = null,
CancellationToken token = default)
{
var list = await reader.ReadManyAsync(length, token);
if (list is null || list.Count >= length || delay <= TimeSpan.Zero)
return list;

using var cancel = CancellationTokenSource.CreateLinkedTokenSource(token);
using var window = (delayer ?? Task.Delay)(delay, cancel.Token);
await reader.ReadManyMoreAsync(list, length, window);
cancel.Cancel();
return list;
}

public static Task<List<T>?> ReadManyAsync<T>(
this ChannelReader<T> reader, TimeSpan delay,
int length = int.MaxValue,
ITimeSource? timeSource = null,
CancellationToken token = default) =>
reader.ReadManyAsync(delay, length, (timeSource ?? TimeSource.Default).Delay, token);

private static async Task ReadManyMoreAsync<T>(
this ChannelReader<T> reader, List<T> list, int length, Task window)
{
var completed = reader.Completion;
length -= list.Count; // length left

while (true)
{
Drain(reader, ref list!, ref length);
if (length <= 0) break;

var ready = reader.WaitToReadAsync().AsTask();
var evt = await Task.WhenAny(window, completed, ready);
if (evt != ready) break;
}
}

private static void Drain<T>(
ChannelReader<T> reader, ref List<T>? list, ref int length)
{
while (length > 0 && reader.TryRead(out var item))
{
(list ??= new List<T>()).Add(item);
length--;
}
}

public static void Forget(this Task task)
{
task.ContinueWith(
t => t.Exception, // clear exception so TPL stops complaining
TaskContinuationOptions.NotOnRanToCompletion);
}

public static T Required<T>(
this T argument,
[CallerArgumentExpression("argument")] string? argumentName = null) where T: class =>
Expand All @@ -106,4 +34,4 @@ public static T[] EmptyIfNull<T>(this T[]? argument) =>
public static TValue? TryGetOrDefault<TKey, TValue>(
this IDictionary<TKey, TValue> dictionary, TKey key, TValue? fallback = default) =>
dictionary.TryGetValue(key, out var result) ? result : fallback;
}
}

0 comments on commit 9a35172

Please sign in to comment.