Skip to content

Commit

Permalink
Merge pull request #64 from mauroservienti/timeouts-reschedule-support
Browse files Browse the repository at this point in the history
Timeouts reschedule support
  • Loading branch information
mauroservienti authored Jun 3, 2020
2 parents 245c770 + 528d86d commit f2ec070
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 3 deletions.
12 changes: 10 additions & 2 deletions src/MyService/ASaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ namespace MyService
{
public class ASaga : Saga<ASagaData>,
IAmStartedByMessages<StartASaga>,
IHandleMessages<CompleteASaga>
IHandleMessages<CompleteASaga>,
IHandleTimeouts<ASaga.MyTimeout>
{
public Task Handle(StartASaga message, IMessageHandlerContext context)
{
return Task.CompletedTask;
return RequestTimeout<MyTimeout>(context, DateTime.Now.AddDays(10));
}

public Task Handle(CompleteASaga message, IMessageHandlerContext context)
Expand All @@ -20,11 +21,18 @@ public Task Handle(CompleteASaga message, IMessageHandlerContext context)
return Task.CompletedTask;
}

public Task Timeout(MyTimeout state, IMessageHandlerContext context)
{
return Task.CompletedTask;
}

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ASagaData> mapper)
{
mapper.ConfigureMapping<StartASaga>(m => m.SomeId).ToSaga(s => s.SomeId);
mapper.ConfigureMapping<CompleteASaga>(m => m.SomeId).ToSaga(s => s.SomeId);
}

public class MyTimeout { }
}

public class ASagaData : ContainSagaData
Expand Down
43 changes: 43 additions & 0 deletions src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using MyMessages.Messages;
using MyService;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.DelayedDelivery;
using NServiceBus.IntegrationTesting;
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading.Tasks;

namespace MySystem.AcceptanceTests
{
public class When_requesting_a_timeout
{
[Test]
public async Task It_should_be_rescheduled_and_handled()
{
var context = await Scenario.Define<IntegrationScenarioContext>(ctx =>
{
ctx.RegisterTimeoutRescheduleRule<ASaga.MyTimeout>(currentDelay =>
{
return new DoNotDeliverBefore(DateTime.Now.AddSeconds(5));
});
})
.WithEndpoint<MyServiceEndpoint>(g => g.When(session => session.Send("MyService", new StartASaga() { SomeId = Guid.NewGuid() })))
.Done(c => c.MessageWasProcessedBySaga<ASaga.MyTimeout, ASaga>() || c.HasFailedMessages())
.Run();

Assert.True(context.MessageWasProcessedBySaga<ASaga.MyTimeout, ASaga>());
Assert.False(context.HasFailedMessages());
Assert.False(context.HasHandlingErrors());
}

class MyServiceEndpoint : EndpointConfigurationBuilder
{
public MyServiceEndpoint()
{
EndpointSetup<EndpointTemplate<MyServiceConfiguration>>();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using MyMessages.Messages;
using NServiceBus.DelayedDelivery;
using NServiceBus.DeliveryConstraints;
using NServiceBus.Pipeline;
using NServiceBus.Testing;
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting.Tests
{
public class Reschedule_Timeout_Behavior
{
[Test]
public async Task Should_Reschedule_Timeouts()
{
var expectedDeliveryAt = new DateTime(2020, 1, 1);

var scenarioContext = new IntegrationScenarioContext();
scenarioContext.RegisterTimeoutRescheduleRule<AMessage>(currentDelay =>
{
return new DoNotDeliverBefore(expectedDeliveryAt);
});

var context = new TestableOutgoingSendContext
{
Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage())
};
context.Headers.Add(Headers.SagaId, "a-saga-id");
context.Headers.Add(Headers.SagaType, "a-saga-type");
context.Headers.Add(Headers.IsSagaTimeoutMessage, bool.TrueString);

var constraints = new List<DeliveryConstraint>
{
new DoNotDeliverBefore(new DateTime(2030, 1, 1))
};
context.Extensions.Set(constraints);

var sut = new RescheduleTimeoutsBehavior(scenarioContext); ;
await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false);

var rescheduledDoNotDeliverBefore = constraints.OfType<DoNotDeliverBefore>().Single();
Assert.AreEqual(expectedDeliveryAt, rescheduledDoNotDeliverBefore.At);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using MyMessages.Messages;
using NServiceBus.DelayedDelivery;
using NServiceBus.DeliveryConstraints;
using NServiceBus.Pipeline;
using NServiceBus.Testing;
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static void RegisterRequiredPipelineBehaviors(this EndpointConfiguration
builder.Pipeline.Register(new InterceptSendOperations(endpointName, integrationScenarioContext), "Intercept send operations");
builder.Pipeline.Register(new InterceptPublishOperations(endpointName, integrationScenarioContext), "Intercept publish operations");
builder.Pipeline.Register(new InterceptReplyOperations(endpointName, integrationScenarioContext), "Intercept reply operations");
builder.Pipeline.Register(new RescheduleTimeoutsBehavior(integrationScenarioContext), "Intercept serialized message dispatch");
}

public static void RegisterScenarioContext(this EndpointConfiguration builder, ScenarioContext scenarioContext)
Expand Down
34 changes: 34 additions & 0 deletions src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using NServiceBus.AcceptanceTesting;
using NServiceBus.DelayedDelivery;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -11,6 +13,7 @@ public class IntegrationScenarioContext : ScenarioContext
readonly ConcurrentBag<HandlerInvocation> invokedHandlers = new ConcurrentBag<HandlerInvocation>();
readonly ConcurrentBag<SagaInvocation> invokedSagas = new ConcurrentBag<SagaInvocation>();
readonly ConcurrentBag<OutgoingMessageOperation> outgoingMessageOperations = new ConcurrentBag<OutgoingMessageOperation>();
readonly Dictionary<Type, Func<DoNotDeliverBefore, DoNotDeliverBefore>> timeoutRescheduleRules = new Dictionary<Type, Func<DoNotDeliverBefore, DoNotDeliverBefore>>();

public IEnumerable<HandlerInvocation> InvokedHandlers { get { return invokedHandlers; } }
public IEnumerable<SagaInvocation> InvokedSagas { get { return invokedSagas; } }
Expand All @@ -23,6 +26,21 @@ internal HandlerInvocation CaptureInvokedHandler(HandlerInvocation invocation)
return invocation;
}

public void RegisterTimeoutRescheduleRule<TTimeout>(Func<DoNotDeliverBefore, DoNotDeliverBefore> rule)
{
if (timeoutRescheduleRules.ContainsKey(typeof(TTimeout)))
{
throw new NotSupportedException("Only one rule per timeout message type is allowed.");
}

timeoutRescheduleRules.Add(typeof(TTimeout), rule);
}

internal bool TryGetTimeoutRescheduleRule(Type timeoutMessageType, out Func<DoNotDeliverBefore, DoNotDeliverBefore> rule)
{
return timeoutRescheduleRules.TryGetValue(timeoutMessageType, out rule);
}

internal void AddOutogingOperation(OutgoingMessageOperation outgoingMessageOperation)
{
outgoingMessageOperations.Add(outgoingMessageOperation);
Expand All @@ -45,6 +63,22 @@ public bool SagaWasInvoked<TSaga>() where TSaga : Saga
return InvokedSagas.Any(invocation => invocation.SagaType == typeof(TSaga));
}

public bool MessageWasProcessed<TMessage>()
{
return invokedHandlers.Any(invocation => invocation.MessageType == typeof(TMessage))
|| invokedSagas.Any(invocation => invocation.MessageType == typeof(TMessage));
}

public bool MessageWasProcessedBySaga<TMessage, TSaga>()
{
return invokedSagas.Any(i => i.SagaType == typeof(TSaga) && i.MessageType == typeof(TMessage));
}

public bool MessageWasProcessedByHandler<TMessage, THandler>()
{
return invokedHandlers.Any(i => i.HandlerType == typeof(THandler) && i.MessageType == typeof(TMessage));
}

public bool HasFailedMessages()
{
return !FailedMessages.IsEmpty;
Expand Down
35 changes: 35 additions & 0 deletions src/NServiceBus.IntegrationTesting/InterceptPhysicalDispatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using NServiceBus.DelayedDelivery;
using NServiceBus.DeliveryConstraints;
using NServiceBus.Pipeline;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting
{
class RescheduleTimeoutsBehavior : Behavior<IOutgoingSendContext>
{
readonly IntegrationScenarioContext integrationContext;

public RescheduleTimeoutsBehavior(IntegrationScenarioContext integrationContext)
{
this.integrationContext = integrationContext;
}

public override Task Invoke(IOutgoingSendContext context, Func<Task> next)
{
if (integrationContext.TryGetTimeoutRescheduleRule(context.Message.MessageType, out Func<DoNotDeliverBefore, DoNotDeliverBefore> rule))
{
var constraints = context.Extensions.Get<List<DeliveryConstraint>>();
var doNotDeliverBefore = constraints.OfType<DoNotDeliverBefore>().SingleOrDefault();

var newDoNotDeliverBefore = rule(doNotDeliverBefore);
constraints.Remove(doNotDeliverBefore);
constraints.Add(newDoNotDeliverBefore);
}

return next();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
using NServiceBus.Pipeline;
using NServiceBus.DelayedDelivery;
using NServiceBus.DeliveryConstraints;
using NServiceBus.Pipeline;
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting
Expand Down

0 comments on commit f2ec070

Please sign in to comment.