-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added support direct get batch API #680
base: server-2.11-features
Are you sure you want to change the base?
Changes from 6 commits
ae90e37
075f749
95a495f
c633869
4110d1d
8e31dda
26d42c4
84e64f9
edb6b4f
4b49424
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
using System.ComponentModel.DataAnnotations; | ||
using System.Text.Json.Serialization; | ||
|
||
namespace NATS.Client.JetStream.Models; | ||
|
||
/// <summary> | ||
/// A request to the JetStream $JS.API.STREAM.MSG.GET API | ||
/// </summary> | ||
public record StreamMsgBatchGetRequest | ||
{ | ||
/// <summary> | ||
/// The maximum amount of messages to be returned for this request | ||
/// </summary> | ||
[JsonPropertyName("batch")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] | ||
[Range(-1, int.MaxValue)] | ||
public int Batch { get; set; } | ||
|
||
/// <summary> | ||
/// The maximum amount of returned bytes for this request. | ||
/// </summary> | ||
[JsonPropertyName("max_bytes")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] | ||
[Range(-1, int.MaxValue)] | ||
public int MaxBytes { get; set; } | ||
|
||
/// <summary> | ||
/// The minimum sequence for returned message | ||
/// </summary> | ||
[JsonPropertyName("seq")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] | ||
[Range(ulong.MinValue, ulong.MaxValue)] | ||
public ulong MinSequence { get; set; } | ||
|
||
/// <summary> | ||
/// The minimum start time for returned message | ||
/// </summary> | ||
[JsonPropertyName("start_time")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] | ||
public DateTimeOffset StartTime { get; set; } | ||
|
||
/// <summary> | ||
/// The subject used filter messages that should be returned | ||
/// </summary> | ||
[JsonPropertyName("next_by_subj")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.Never)] | ||
[Required] | ||
#if NET6_0 | ||
public string Subject { get; set; } = default!; | ||
#else | ||
#pragma warning disable SA1206 | ||
public required string Subject { get; set; } | ||
|
||
#pragma warning restore SA1206 | ||
#endif | ||
|
||
/// <summary> | ||
/// Return last messages mathing the subjects | ||
/// </summary> | ||
[JsonPropertyName("multi_last")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] | ||
public string[] LastBySubjects { get; set; } = []; | ||
|
||
/// <summary> | ||
/// Return message after sequence | ||
/// </summary> | ||
[JsonPropertyName("up_to_seq")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] | ||
[Range(ulong.MinValue, ulong.MaxValue)] | ||
public ulong UpToSequence { get; set; } | ||
|
||
/// <summary> | ||
/// Return message after time | ||
/// </summary> | ||
[JsonPropertyName("up_to_time")] | ||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] | ||
public DateTimeOffset UpToTime { get; set; } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -181,6 +181,27 @@ public ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INat | |
cancellationToken: cancellationToken); | ||
} | ||
|
||
/// <summary> | ||
/// Request a direct batch message | ||
/// </summary> | ||
/// <param name="request">Batch message request.</param> | ||
/// <param name="serializer">Serializer to use for the message type.</param> | ||
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param> | ||
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param> | ||
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception> | ||
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we probably shouldn't expose There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Am I understanding correctly that we don't want to send eodCode related messages to the client? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not if there is a use case for it but as long as it defaults to false I'm ok with it. |
||
{ | ||
ValidateStream(); | ||
|
||
return _context.Connection.RequestManyAsync<StreamMsgBatchGetRequest, T>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should inline the RequestMany method here and handle error conditions as well. |
||
subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}", | ||
data: request, | ||
requestSerializer: NatsJSJsonSerializer<StreamMsgBatchGetRequest>.Default, | ||
replySerializer: serializer, | ||
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = !includeEob, ThrowIfNoResponders = true }, | ||
cancellationToken: cancellationToken); | ||
} | ||
|
||
public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) => | ||
_context.JSRequestResponseAsync<StreamMsgGetRequest, StreamMsgGetResponse>( | ||
subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}", | ||
|
@@ -192,4 +213,12 @@ private void ThrowIfDeleted() | |
if (_deleted) | ||
throw new NatsJSException($"Stream '{_name}' is deleted"); | ||
} | ||
|
||
private void ValidateStream() | ||
{ | ||
if (!Info.Config.AllowDirect) | ||
{ | ||
throw new InvalidOperationException("StreamMsgBatchGetRequest is not permitted when AllowDirect on stream disable"); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
using NATS.Client.Core.Tests; | ||
using NATS.Client.JetStream.Models; | ||
|
||
namespace NATS.Client.JetStream.Tests; | ||
|
||
public class DirectGetTest | ||
{ | ||
[SkipIfNatsServer(versionEarlierThan: "2.11")] | ||
public async Task Direct_get_when_stream_disable() | ||
{ | ||
await using var server = NatsServer.StartJS(); | ||
var nats = server.CreateClientConnection(); | ||
var js = new NatsJSContext(nats); | ||
|
||
var cts = new CancellationTokenSource(); | ||
var cancellationToken = cts.Token; | ||
var streamConfig = new StreamConfig("stream_disable", new[] { "stream_disable.x" }); | ||
|
||
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); | ||
|
||
async Task GetBatchAction() | ||
{ | ||
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_disable.x" }; | ||
await foreach (var unused in stream.GetBatchDirectAsync<string>(streamMsgBatchGetRequest, cancellationToken: cancellationToken)) | ||
{ | ||
} | ||
} | ||
|
||
await Assert.ThrowsAsync<InvalidOperationException>(GetBatchAction); | ||
} | ||
|
||
[SkipIfNatsServer(versionEarlierThan: "2.11")] | ||
public async Task Direct_get_when_stream_enable() | ||
{ | ||
var testDataList = new List<TestData?>(); | ||
await using var server = NatsServer.StartJS(); | ||
var nats = server.CreateClientConnection(); | ||
var js = new NatsJSContext(nats); | ||
|
||
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); | ||
var cancellationToken = cts.Token; | ||
var streamConfig = new StreamConfig("stream_enable", new[] { "stream_enable.x" }) { AllowDirect = true }; | ||
|
||
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); | ||
|
||
for (var i = 0; i < 1; i++) | ||
{ | ||
await js.PublishAsync("stream_enable.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken); | ||
} | ||
|
||
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "stream_enable.x", Batch = 3 }; | ||
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken)) | ||
{ | ||
testDataList.Add(msg.Data); | ||
} | ||
|
||
Assert.Single(testDataList); | ||
} | ||
|
||
[SkipIfNatsServer(versionEarlierThan: "2.11")] | ||
public async Task Direct_get_with_eobCode() | ||
{ | ||
var testDataList = new List<TestData?>(); | ||
await using var server = NatsServer.StartJS(); | ||
var nats = server.CreateClientConnection(); | ||
var js = new NatsJSContext(nats); | ||
|
||
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); | ||
var cancellationToken = cts.Token; | ||
var streamConfig = new StreamConfig("eobCode", new[] { "eobCode.x" }) { AllowDirect = true }; | ||
|
||
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); | ||
|
||
for (var i = 0; i < 1; i++) | ||
{ | ||
await js.PublishAsync("eobCode.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken); | ||
} | ||
|
||
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "eobCode.x", Batch = 3 }; | ||
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, includeEob: true, cancellationToken: cancellationToken)) | ||
{ | ||
testDataList.Add(msg.Data); | ||
} | ||
|
||
Assert.Equal(2, testDataList.Count); | ||
} | ||
|
||
[SkipIfNatsServer(versionEarlierThan: "2.11")] | ||
public async Task Direct_get_min_sequence() | ||
{ | ||
var testDataList = new List<TestData?>(); | ||
await using var server = NatsServer.StartJS(); | ||
var nats = server.CreateClientConnection(); | ||
var js = new NatsJSContext(nats); | ||
|
||
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(100)); | ||
var cancellationToken = cts.Token; | ||
var streamConfig = new StreamConfig("min_sequence", new[] { "min_sequence.x" }) { AllowDirect = true }; | ||
|
||
var stream = await js.CreateStreamAsync(streamConfig, cancellationToken); | ||
|
||
for (var i = 0; i < 3; i++) | ||
{ | ||
await js.PublishAsync("min_sequence.x", new TestData { Test = i }, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken); | ||
} | ||
|
||
var streamMsgBatchGetRequest = new StreamMsgBatchGetRequest { Subject = "min_sequence.x", Batch = 1, MinSequence = 3 }; | ||
await foreach (var msg in stream.GetBatchDirectAsync(streamMsgBatchGetRequest, TestDataJsonSerializer<TestData>.Default, cancellationToken: cancellationToken)) | ||
{ | ||
testDataList.Add(msg.Data); | ||
} | ||
|
||
Assert.Single(testDataList); | ||
Assert.Equal(2, testDataList[0]?.Test); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these names might've been update. Also inline with
StreamMsgGetRequest
we are calling this propertySeq
https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-31.md#request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamMsgBatchGetRequest updated