Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support direct get batch API #680

Draft
wants to merge 10 commits into
base: server-2.11-features
Choose a base branch
from
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/NatsHeaderParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ private bool TryParseHeaderLine(ReadOnlySpan<byte> headerLine, NatsHeaders heade
headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes;
headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageEobCode))
{
headers.Message = NatsHeaders.Messages.EobCode;
headers.MessageText = NatsHeaders.MessageEobCodeStr;
}
else
{
headers.Message = NatsHeaders.Messages.Text;
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/NatsHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public enum Messages
NoMessages,
RequestTimeout,
MessageSizeExceedsMaxBytes,
EobCode,
}

// Uses C# compiler's optimization for static byte[] data
Expand Down Expand Up @@ -56,6 +57,10 @@ public enum Messages
internal static ReadOnlySpan<byte> MessageMessageSizeExceedsMaxBytes => new byte[] { 77, 101, 115, 115, 97, 103, 101, 32, 83, 105, 122, 101, 32, 69, 120, 99, 101, 101, 100, 115, 32, 77, 97, 120, 66, 121, 116, 101, 115 };
internal static readonly string MessageMessageSizeExceedsMaxBytesStr = "Message Size Exceeds MaxBytes";

// EOB
internal static ReadOnlySpan<byte> MessageEobCode => new byte[] { 69, 79, 66 };
internal static readonly string MessageEobCodeStr = "EOB";

private static readonly string[] EmptyKeys = Array.Empty<string>();
private static readonly StringValues[] EmptyValues = Array.Empty<StringValues>();

Expand Down
10 changes: 10 additions & 0 deletions src/NATS.Client.JetStream/INatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,15 @@ ValueTask UpdateAsync(

ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Request a direct batch message
/// </summary>
/// <param name="request">Batch message request.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default);

ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default);
}
78 changes: 78 additions & 0 deletions src/NATS.Client.JetStream/Models/StreamMsgBatchGetRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System.ComponentModel.DataAnnotations;
using System.Text.Json.Serialization;

namespace NATS.Client.JetStream.Models;

/// <summary>
/// A request to the JetStream $JS.API.STREAM.MSG.GET API
/// </summary>
public record StreamMsgBatchGetRequest
{
/// <summary>
/// The maximum amount of messages to be returned for this request
/// </summary>
[JsonPropertyName("batch")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(-1, int.MaxValue)]
public int Batch { get; set; }

/// <summary>
/// The maximum amount of returned bytes for this request.
/// </summary>
[JsonPropertyName("max_bytes")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(-1, int.MaxValue)]
public int MaxBytes { get; set; }

/// <summary>
/// The minimum sequence for returned message
/// </summary>
[JsonPropertyName("seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public ulong MinSequence { get; set; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these names might've been update. Also inline with StreamMsgGetRequest we are calling this property Seq

https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-31.md#request

Copy link
Contributor Author

@Ivandemidov00 Ivandemidov00 Feb 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamMsgBatchGetRequest updated


/// <summary>
/// The minimum start time for returned message
/// </summary>
[JsonPropertyName("start_time")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTimeOffset StartTime { get; set; }

/// <summary>
/// The subject used filter messages that should be returned
/// </summary>
[JsonPropertyName("next_by_subj")]
[JsonIgnore(Condition = JsonIgnoreCondition.Never)]
[Required]
#if NET6_0
public string Subject { get; set; } = default!;
#else
#pragma warning disable SA1206
public required string Subject { get; set; }

#pragma warning restore SA1206
#endif

/// <summary>
/// Return last messages mathing the subjects
/// </summary>
[JsonPropertyName("multi_last")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string[] LastBySubjects { get; set; } = [];

/// <summary>
/// Return message after sequence
/// </summary>
[JsonPropertyName("up_to_seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
[Range(ulong.MinValue, ulong.MaxValue)]
public ulong UpToSequence { get; set; }

/// <summary>
/// Return message after time
/// </summary>
[JsonPropertyName("up_to_time")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTimeOffset UpToTime { get; set; }
}
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/NatsJSJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static class NatsJSJsonSerializer<T>
[JsonSerializable(typeof(StreamMsgDeleteResponse))]
[JsonSerializable(typeof(StreamMsgGetRequest))]
[JsonSerializable(typeof(StreamMsgGetResponse))]
[JsonSerializable(typeof(StreamMsgBatchGetRequest))]
[JsonSerializable(typeof(StreamNamesRequest))]
[JsonSerializable(typeof(StreamNamesResponse))]
[JsonSerializable(typeof(StreamPurgeRequest))]
Expand Down
29 changes: 29 additions & 0 deletions src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,27 @@ public ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INat
cancellationToken: cancellationToken);
}

/// <summary>
/// Request a direct batch message
/// </summary>
/// <param name="request">Batch message request.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably shouldn't expose includeEob since it's kind of internal. Do you see that being used somehow?

Copy link
Contributor Author

@Ivandemidov00 Ivandemidov00 Feb 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I understanding correctly that we don't want to send eodCode related messages to the client?
Similar functionality is available in nats.java

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not if there is a use case for it but as long as it defaults to false I'm ok with it.

{
ValidateStream();

return _context.Connection.RequestManyAsync<StreamMsgBatchGetRequest, T>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should inline the RequestMany method here and handle error conditions as well.

subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}",
data: request,
requestSerializer: NatsJSJsonSerializer<StreamMsgBatchGetRequest>.Default,
replySerializer: serializer,
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = !includeEob, ThrowIfNoResponders = true },
cancellationToken: cancellationToken);
}

public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) =>
_context.JSRequestResponseAsync<StreamMsgGetRequest, StreamMsgGetResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}",
Expand All @@ -192,4 +213,12 @@ private void ThrowIfDeleted()
if (_deleted)
throw new NatsJSException($"Stream '{_name}' is deleted");
}

private void ValidateStream()
{
if (!Info.Config.AllowDirect)
{
throw new InvalidOperationException("StreamMsgBatchGetRequest is not permitted when AllowDirect on stream disable");
}
}
}
1 change: 1 addition & 0 deletions tests/NATS.Client.CoreUnit.Tests/NatsHeaderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void ParserTests()
[InlineData("Request Timeout", NatsHeaders.Messages.RequestTimeout)]
[InlineData("Message Size Exceeds MaxBytes", NatsHeaders.Messages.MessageSizeExceedsMaxBytes)]
[InlineData("test message", NatsHeaders.Messages.Text)]
[InlineData("EOB", NatsHeaders.Messages.EobCode)]
public void ParserMessageEnumTests(string message, NatsHeaders.Messages result)
{
var parser = new NatsHeaderParser(Encoding.UTF8);
Expand Down
116 changes: 116 additions & 0 deletions tests/NATS.Client.JetStream.Tests/DirectGetTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using NATS.Client.Core.Tests;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Tests;

public class DirectGetTest
{
[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_when_stream_disable()
{
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("stream_disable", new[] { "stream_disable.x" });

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

async Task GetBatchAction()
{
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_disable.x" };
await foreach (var unused in stream.GetBatchDirectAsync<string>(streamMsgBatchGetRequest, cancellationToken: cancellationToken))
{
}
}

await Assert.ThrowsAsync<InvalidOperationException>(GetBatchAction);
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_when_stream_enable()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("stream_enable", new[] { "stream_enable.x" }) { AllowDirect = true };

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

for (var i = 0; i < 1; i++)
{
await js.PublishAsync("stream_enable.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
}

var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_enable.x", Batch = 3 };
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken))
{
testDataList.Add(msg.Data);
}

Assert.Single(testDataList);
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_with_eobCode()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("eobCode", new[] { "eobCode.x" }) { AllowDirect = true };

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

for (var i = 0; i < 1; i++)
{
await js.PublishAsync("eobCode.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
}

var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "eobCode.x", Batch = 3 };
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, includeEob: true, cancellationToken: cancellationToken))
{
testDataList.Add(msg.Data);
}

Assert.Equal(2, testDataList.Count);
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Direct_get_min_sequence()
{
var testDataList = new List<TestData?>();
await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100));
var cancellationToken = cts.Token;
var streamConfig = new StreamConfig("min_sequence", new[] { "min_sequence.x" }) { AllowDirect = true };

var stream = await js.CreateStreamAsync(streamConfig, cancellationToken);

for (var i = 0; i < 3; i++)
{
await js.PublishAsync("min_sequence.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken);
}

var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "min_sequence.x", Batch = 1, MinSequence = 3 };
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken))
{
testDataList.Add(msg.Data);
}

Assert.Single(testDataList);
Assert.Equal(2, testDataList[0]?.Test);
}
}