From 1285871ff6f602d9955c1faaade640d151ae1679 Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Tue, 2 Jun 2020 21:23:57 +0200 Subject: [PATCH 1/5] Allow to register timeout reschedule rules --- .../IntegrationScenarioContext.cs | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs b/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs index 4fb58d40..46c46db3 100644 --- a/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs +++ b/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs @@ -1,4 +1,6 @@ using NServiceBus.AcceptanceTesting; +using NServiceBus.DelayedDelivery; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -11,7 +13,8 @@ public class IntegrationScenarioContext : ScenarioContext readonly ConcurrentBag invokedHandlers = new ConcurrentBag(); readonly ConcurrentBag invokedSagas = new ConcurrentBag(); readonly ConcurrentBag outgoingMessageOperations = new ConcurrentBag(); - + readonly Dictionary> timeoutRescheduleRules = new Dictionary>(); + public IEnumerable InvokedHandlers { get { return invokedHandlers; } } public IEnumerable InvokedSagas { get { return invokedSagas; } } public IEnumerable OutgoingMessageOperations { get { return outgoingMessageOperations; } } @@ -23,6 +26,21 @@ internal HandlerInvocation CaptureInvokedHandler(HandlerInvocation invocation) return invocation; } + public void RegisterTimeoutRescheduleRule(Func rule) + { + if (timeoutRescheduleRules.ContainsKey(typeof(TTimeout))) + { + throw new NotSupportedException("Only one rule per timeout message type is allowed."); + } + + timeoutRescheduleRules.Add(typeof(TTimeout), rule); + } + + internal bool TryGetTimeoutRescheduleRule(Type timeoutMessageType, out Func rule) + { + return timeoutRescheduleRules.TryGetValue(timeoutMessageType, out rule); + } + internal void AddOutogingOperation(OutgoingMessageOperation outgoingMessageOperation) { outgoingMessageOperations.Add(outgoingMessageOperation); From f78ba53f6f1ad2920809245b32f654d22e5f8e24 Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Tue, 2 Jun 2020 21:24:25 +0200 Subject: [PATCH 2/5] Invokes timeouts reschedule rules --- .../InterceptSendOperations.cs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs b/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs index b047074e..ade1e4a5 100644 --- a/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs +++ b/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs @@ -1,5 +1,10 @@ -using NServiceBus.Pipeline; +using NServiceBus.DelayedDelivery; +using NServiceBus.DeliveryConstraints; +using NServiceBus.Pipeline; +using NUnit.Framework; using System; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; namespace NServiceBus.IntegrationTesting @@ -20,6 +25,16 @@ public override async Task Invoke(IOutgoingSendContext context, Func next) OutgoingMessageOperation outgoingOperation; if (context.Headers.ContainsKey(Headers.IsSagaTimeoutMessage) && context.Headers[Headers.IsSagaTimeoutMessage] == bool.TrueString) { + if (integrationContext.TryGetTimeoutRescheduleRule(context.Message.MessageType, out Func rule)) + { + var constraints = context.Extensions.Get>(); + var doNotDeliverBefore = constraints.OfType().SingleOrDefault(); + + var newDoNotDeliverBefore = rule(doNotDeliverBefore); + constraints.Remove(doNotDeliverBefore); + constraints.Add(newDoNotDeliverBefore); + } + outgoingOperation = new RequestTimeoutOperation() { SagaId = context.Headers[Headers.SagaId], From ac114912fe70ba410962af012837a33bb2caf9aa Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Tue, 2 Jun 2020 21:32:10 +0200 Subject: [PATCH 3/5] test behavior --- .../Send_Operation_Interceptor.cs | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs b/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs index 487959f5..24be1111 100644 --- a/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs +++ b/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs @@ -1,8 +1,11 @@ using MyMessages.Messages; +using NServiceBus.DelayedDelivery; +using NServiceBus.DeliveryConstraints; using NServiceBus.Pipeline; using NServiceBus.Testing; using NUnit.Framework; using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -53,5 +56,37 @@ public async Task Should_Capture_TimeoutRequest_Operation() Assert.AreEqual(expectedSagaId, requestTimeoutOperation.SagaId); Assert.AreEqual(expectedSagaType, requestTimeoutOperation.SagaTypeAssemblyQualifiedName); } + + [Test] + public async Task Should_Reschedule_Timeouts() + { + var expectedDeliveryAt = new DateTime(2020, 1, 1); + + var scenarioContext = new IntegrationScenarioContext(); + scenarioContext.RegisterTimeoutRescheduleRule(currentDelay => + { + return new DoNotDeliverBefore(expectedDeliveryAt); + }); + + var context = new TestableOutgoingSendContext + { + Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage()) + }; + context.Headers.Add(Headers.SagaId, "a-saga-id"); + context.Headers.Add(Headers.SagaType, "a-saga-type"); + context.Headers.Add(Headers.IsSagaTimeoutMessage, bool.TrueString); + + var constraints = new List + { + new DoNotDeliverBefore(new DateTime(2030, 1, 1)) + }; + context.Extensions.Set(constraints); + + var sut = new InterceptSendOperations("fake-endpoint", scenarioContext); ; + await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false); + + var rescheduledDoNotDeliverBefore = constraints.OfType().Single(); + Assert.AreEqual(expectedDeliveryAt, rescheduledDoNotDeliverBefore.At); + } } } From c09d282ada3f035fa42d566eedfbe43e1ad435ae Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Tue, 2 Jun 2020 21:43:13 +0200 Subject: [PATCH 4/5] new done condition --- src/MyService/ASaga.cs | 12 +++- .../When_requesting_a_timeout.cs | 59 +++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs diff --git a/src/MyService/ASaga.cs b/src/MyService/ASaga.cs index 4285f6f3..9ff550bc 100644 --- a/src/MyService/ASaga.cs +++ b/src/MyService/ASaga.cs @@ -7,11 +7,12 @@ namespace MyService { public class ASaga : Saga, IAmStartedByMessages, - IHandleMessages + IHandleMessages, + IHandleTimeouts { public Task Handle(StartASaga message, IMessageHandlerContext context) { - return Task.CompletedTask; + return RequestTimeout(context, DateTime.Now.AddDays(10)); } public Task Handle(CompleteASaga message, IMessageHandlerContext context) @@ -20,11 +21,18 @@ public Task Handle(CompleteASaga message, IMessageHandlerContext context) return Task.CompletedTask; } + public Task Timeout(MyTimeout state, IMessageHandlerContext context) + { + return Task.CompletedTask; + } + protected override void ConfigureHowToFindSaga(SagaPropertyMapper mapper) { mapper.ConfigureMapping(m => m.SomeId).ToSaga(s => s.SomeId); mapper.ConfigureMapping(m => m.SomeId).ToSaga(s => s.SomeId); } + + public class MyTimeout { } } public class ASagaData : ContainSagaData diff --git a/src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs b/src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs new file mode 100644 index 00000000..0e1c3a47 --- /dev/null +++ b/src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs @@ -0,0 +1,59 @@ +using MyMessages.Messages; +using MyService; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.DelayedDelivery; +using NServiceBus.IntegrationTesting; +using NUnit.Framework; +using System; +using System.Linq; +using System.Threading.Tasks; + +namespace MySystem.AcceptanceTests +{ + public class When_requesting_a_timeout + { + [Test] + public async Task It_should_be_rescheduled_and_handled() + { + var theExpectedSagaId = Guid.NewGuid(); + var context = await Scenario.Define(ctx=> + { + ctx.RegisterTimeoutRescheduleRule(currentDelay => + { + return new DoNotDeliverBefore(DateTime.Now.AddSeconds(5)); + }); + }) + .WithEndpoint(g => + { + g.When(session => session.Send("MyService", new StartASaga() { SomeId = theExpectedSagaId })); + }) + .Done(c => + { + return + ( + c.SagaHandledMessage() + ) + || c.HasFailedMessages(); + }) + .Run(); + + var invokedSagas = context.InvokedSagas.Where(s => s.SagaType == typeof(ASaga)); + var newSaga = invokedSagas.SingleOrDefault(s => s.IsNew); + var completedSaga = invokedSagas.SingleOrDefault(s => s.IsCompleted); + + Assert.IsNotNull(newSaga); + Assert.IsNotNull(completedSaga); + Assert.False(context.HasFailedMessages()); + Assert.False(context.HasHandlingErrors()); + } + + class MyServiceEndpoint : EndpointConfigurationBuilder + { + public MyServiceEndpoint() + { + EndpointSetup>(); + } + } + } +} From 528d86de2e5a089f40faa2ecbd965a9a462fc9b7 Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Wed, 3 Jun 2020 06:25:50 +0200 Subject: [PATCH 5/5] dedicated behavior to reschedule timeouts, integration test --- .../When_requesting_a_timeout.cs | 26 ++-------- .../Reschedule_Timeout_Behavior.cs | 48 +++++++++++++++++++ .../Send_Operation_Interceptor.cs | 32 ------------- .../EndpointConfigurationExtensions.cs | 1 + .../IntegrationScenarioContext.cs | 22 +++++++-- .../InterceptPhysicalDispatch.cs | 35 ++++++++++++++ .../InterceptSendOperations.cs | 10 ---- 7 files changed, 108 insertions(+), 66 deletions(-) create mode 100644 src/NServiceBus.IntegrationTesting.Tests/Reschedule_Timeout_Behavior.cs create mode 100644 src/NServiceBus.IntegrationTesting/InterceptPhysicalDispatch.cs diff --git a/src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs b/src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs index 0e1c3a47..4aa1f96a 100644 --- a/src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs +++ b/src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs @@ -16,34 +16,18 @@ public class When_requesting_a_timeout [Test] public async Task It_should_be_rescheduled_and_handled() { - var theExpectedSagaId = Guid.NewGuid(); - var context = await Scenario.Define(ctx=> + var context = await Scenario.Define(ctx => { - ctx.RegisterTimeoutRescheduleRule(currentDelay => + ctx.RegisterTimeoutRescheduleRule(currentDelay => { return new DoNotDeliverBefore(DateTime.Now.AddSeconds(5)); }); }) - .WithEndpoint(g => - { - g.When(session => session.Send("MyService", new StartASaga() { SomeId = theExpectedSagaId })); - }) - .Done(c => - { - return - ( - c.SagaHandledMessage() - ) - || c.HasFailedMessages(); - }) + .WithEndpoint(g => g.When(session => session.Send("MyService", new StartASaga() { SomeId = Guid.NewGuid() }))) + .Done(c => c.MessageWasProcessedBySaga() || c.HasFailedMessages()) .Run(); - var invokedSagas = context.InvokedSagas.Where(s => s.SagaType == typeof(ASaga)); - var newSaga = invokedSagas.SingleOrDefault(s => s.IsNew); - var completedSaga = invokedSagas.SingleOrDefault(s => s.IsCompleted); - - Assert.IsNotNull(newSaga); - Assert.IsNotNull(completedSaga); + Assert.True(context.MessageWasProcessedBySaga()); Assert.False(context.HasFailedMessages()); Assert.False(context.HasHandlingErrors()); } diff --git a/src/NServiceBus.IntegrationTesting.Tests/Reschedule_Timeout_Behavior.cs b/src/NServiceBus.IntegrationTesting.Tests/Reschedule_Timeout_Behavior.cs new file mode 100644 index 00000000..fcac744d --- /dev/null +++ b/src/NServiceBus.IntegrationTesting.Tests/Reschedule_Timeout_Behavior.cs @@ -0,0 +1,48 @@ +using MyMessages.Messages; +using NServiceBus.DelayedDelivery; +using NServiceBus.DeliveryConstraints; +using NServiceBus.Pipeline; +using NServiceBus.Testing; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace NServiceBus.IntegrationTesting.Tests +{ + public class Reschedule_Timeout_Behavior + { + [Test] + public async Task Should_Reschedule_Timeouts() + { + var expectedDeliveryAt = new DateTime(2020, 1, 1); + + var scenarioContext = new IntegrationScenarioContext(); + scenarioContext.RegisterTimeoutRescheduleRule(currentDelay => + { + return new DoNotDeliverBefore(expectedDeliveryAt); + }); + + var context = new TestableOutgoingSendContext + { + Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage()) + }; + context.Headers.Add(Headers.SagaId, "a-saga-id"); + context.Headers.Add(Headers.SagaType, "a-saga-type"); + context.Headers.Add(Headers.IsSagaTimeoutMessage, bool.TrueString); + + var constraints = new List + { + new DoNotDeliverBefore(new DateTime(2030, 1, 1)) + }; + context.Extensions.Set(constraints); + + var sut = new RescheduleTimeoutsBehavior(scenarioContext); ; + await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false); + + var rescheduledDoNotDeliverBefore = constraints.OfType().Single(); + Assert.AreEqual(expectedDeliveryAt, rescheduledDoNotDeliverBefore.At); + } + } +} diff --git a/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs b/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs index 24be1111..e4b328b6 100644 --- a/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs +++ b/src/NServiceBus.IntegrationTesting.Tests/Send_Operation_Interceptor.cs @@ -56,37 +56,5 @@ public async Task Should_Capture_TimeoutRequest_Operation() Assert.AreEqual(expectedSagaId, requestTimeoutOperation.SagaId); Assert.AreEqual(expectedSagaType, requestTimeoutOperation.SagaTypeAssemblyQualifiedName); } - - [Test] - public async Task Should_Reschedule_Timeouts() - { - var expectedDeliveryAt = new DateTime(2020, 1, 1); - - var scenarioContext = new IntegrationScenarioContext(); - scenarioContext.RegisterTimeoutRescheduleRule(currentDelay => - { - return new DoNotDeliverBefore(expectedDeliveryAt); - }); - - var context = new TestableOutgoingSendContext - { - Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage()) - }; - context.Headers.Add(Headers.SagaId, "a-saga-id"); - context.Headers.Add(Headers.SagaType, "a-saga-type"); - context.Headers.Add(Headers.IsSagaTimeoutMessage, bool.TrueString); - - var constraints = new List - { - new DoNotDeliverBefore(new DateTime(2030, 1, 1)) - }; - context.Extensions.Set(constraints); - - var sut = new InterceptSendOperations("fake-endpoint", scenarioContext); ; - await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false); - - var rescheduledDoNotDeliverBefore = constraints.OfType().Single(); - Assert.AreEqual(expectedDeliveryAt, rescheduledDoNotDeliverBefore.At); - } } } diff --git a/src/NServiceBus.IntegrationTesting/EndpointConfigurationExtensions.cs b/src/NServiceBus.IntegrationTesting/EndpointConfigurationExtensions.cs index 2720a0f9..618bd008 100644 --- a/src/NServiceBus.IntegrationTesting/EndpointConfigurationExtensions.cs +++ b/src/NServiceBus.IntegrationTesting/EndpointConfigurationExtensions.cs @@ -11,6 +11,7 @@ public static void RegisterRequiredPipelineBehaviors(this EndpointConfiguration builder.Pipeline.Register(new InterceptSendOperations(endpointName, integrationScenarioContext), "Intercept send operations"); builder.Pipeline.Register(new InterceptPublishOperations(endpointName, integrationScenarioContext), "Intercept publish operations"); builder.Pipeline.Register(new InterceptReplyOperations(endpointName, integrationScenarioContext), "Intercept reply operations"); + builder.Pipeline.Register(new RescheduleTimeoutsBehavior(integrationScenarioContext), "Intercept serialized message dispatch"); } public static void RegisterScenarioContext(this EndpointConfiguration builder, ScenarioContext scenarioContext) diff --git a/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs b/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs index 46c46db3..4a9f6766 100644 --- a/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs +++ b/src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs @@ -14,7 +14,7 @@ public class IntegrationScenarioContext : ScenarioContext readonly ConcurrentBag invokedSagas = new ConcurrentBag(); readonly ConcurrentBag outgoingMessageOperations = new ConcurrentBag(); readonly Dictionary> timeoutRescheduleRules = new Dictionary>(); - + public IEnumerable InvokedHandlers { get { return invokedHandlers; } } public IEnumerable InvokedSagas { get { return invokedSagas; } } public IEnumerable OutgoingMessageOperations { get { return outgoingMessageOperations; } } @@ -26,9 +26,9 @@ internal HandlerInvocation CaptureInvokedHandler(HandlerInvocation invocation) return invocation; } - public void RegisterTimeoutRescheduleRule(Func rule) + public void RegisterTimeoutRescheduleRule(Func rule) { - if (timeoutRescheduleRules.ContainsKey(typeof(TTimeout))) + if (timeoutRescheduleRules.ContainsKey(typeof(TTimeout))) { throw new NotSupportedException("Only one rule per timeout message type is allowed."); } @@ -63,6 +63,22 @@ public bool SagaWasInvoked() where TSaga : Saga return InvokedSagas.Any(invocation => invocation.SagaType == typeof(TSaga)); } + public bool MessageWasProcessed() + { + return invokedHandlers.Any(invocation => invocation.MessageType == typeof(TMessage)) + || invokedSagas.Any(invocation => invocation.MessageType == typeof(TMessage)); + } + + public bool MessageWasProcessedBySaga() + { + return invokedSagas.Any(i => i.SagaType == typeof(TSaga) && i.MessageType == typeof(TMessage)); + } + + public bool MessageWasProcessedByHandler() + { + return invokedHandlers.Any(i => i.HandlerType == typeof(THandler) && i.MessageType == typeof(TMessage)); + } + public bool HasFailedMessages() { return !FailedMessages.IsEmpty; diff --git a/src/NServiceBus.IntegrationTesting/InterceptPhysicalDispatch.cs b/src/NServiceBus.IntegrationTesting/InterceptPhysicalDispatch.cs new file mode 100644 index 00000000..06cba21e --- /dev/null +++ b/src/NServiceBus.IntegrationTesting/InterceptPhysicalDispatch.cs @@ -0,0 +1,35 @@ +using NServiceBus.DelayedDelivery; +using NServiceBus.DeliveryConstraints; +using NServiceBus.Pipeline; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace NServiceBus.IntegrationTesting +{ + class RescheduleTimeoutsBehavior : Behavior + { + readonly IntegrationScenarioContext integrationContext; + + public RescheduleTimeoutsBehavior(IntegrationScenarioContext integrationContext) + { + this.integrationContext = integrationContext; + } + + public override Task Invoke(IOutgoingSendContext context, Func next) + { + if (integrationContext.TryGetTimeoutRescheduleRule(context.Message.MessageType, out Func rule)) + { + var constraints = context.Extensions.Get>(); + var doNotDeliverBefore = constraints.OfType().SingleOrDefault(); + + var newDoNotDeliverBefore = rule(doNotDeliverBefore); + constraints.Remove(doNotDeliverBefore); + constraints.Add(newDoNotDeliverBefore); + } + + return next(); + } + } +} diff --git a/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs b/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs index ade1e4a5..a3019f2e 100644 --- a/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs +++ b/src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs @@ -25,16 +25,6 @@ public override async Task Invoke(IOutgoingSendContext context, Func next) OutgoingMessageOperation outgoingOperation; if (context.Headers.ContainsKey(Headers.IsSagaTimeoutMessage) && context.Headers[Headers.IsSagaTimeoutMessage] == bool.TrueString) { - if (integrationContext.TryGetTimeoutRescheduleRule(context.Message.MessageType, out Func rule)) - { - var constraints = context.Extensions.Get>(); - var doNotDeliverBefore = constraints.OfType().SingleOrDefault(); - - var newDoNotDeliverBefore = rule(doNotDeliverBefore); - constraints.Remove(doNotDeliverBefore); - constraints.Add(newDoNotDeliverBefore); - } - outgoingOperation = new RequestTimeoutOperation() { SagaId = context.Headers[Headers.SagaId],