Skip to content

Commit

Permalink
dedicated behavior to reschedule timeouts, integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
mauroservienti committed Jun 3, 2020
1 parent c09d282 commit 528d86d
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 66 deletions.
26 changes: 5 additions & 21 deletions src/MySystem.AcceptanceTests/When_requesting_a_timeout.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,18 @@ public class When_requesting_a_timeout
[Test]
public async Task It_should_be_rescheduled_and_handled()
{
var theExpectedSagaId = Guid.NewGuid();
var context = await Scenario.Define<IntegrationScenarioContext>(ctx=>
var context = await Scenario.Define<IntegrationScenarioContext>(ctx =>
{
ctx.RegisterTimeoutRescheduleRule<ASaga.MyTimeout>(currentDelay =>
ctx.RegisterTimeoutRescheduleRule<ASaga.MyTimeout>(currentDelay =>
{
return new DoNotDeliverBefore(DateTime.Now.AddSeconds(5));
});
})
.WithEndpoint<MyServiceEndpoint>(g =>
{
g.When(session => session.Send("MyService", new StartASaga() { SomeId = theExpectedSagaId }));
})
.Done(c =>
{
return
(
c.SagaHandledMessage<ASaga, ASaga.MyTimeout>()
)
|| c.HasFailedMessages();
})
.WithEndpoint<MyServiceEndpoint>(g => g.When(session => session.Send("MyService", new StartASaga() { SomeId = Guid.NewGuid() })))
.Done(c => c.MessageWasProcessedBySaga<ASaga.MyTimeout, ASaga>() || c.HasFailedMessages())
.Run();

var invokedSagas = context.InvokedSagas.Where(s => s.SagaType == typeof(ASaga));
var newSaga = invokedSagas.SingleOrDefault(s => s.IsNew);
var completedSaga = invokedSagas.SingleOrDefault(s => s.IsCompleted);

Assert.IsNotNull(newSaga);
Assert.IsNotNull(completedSaga);
Assert.True(context.MessageWasProcessedBySaga<ASaga.MyTimeout, ASaga>());
Assert.False(context.HasFailedMessages());
Assert.False(context.HasHandlingErrors());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using MyMessages.Messages;
using NServiceBus.DelayedDelivery;
using NServiceBus.DeliveryConstraints;
using NServiceBus.Pipeline;
using NServiceBus.Testing;
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting.Tests
{
public class Reschedule_Timeout_Behavior
{
[Test]
public async Task Should_Reschedule_Timeouts()
{
var expectedDeliveryAt = new DateTime(2020, 1, 1);

var scenarioContext = new IntegrationScenarioContext();
scenarioContext.RegisterTimeoutRescheduleRule<AMessage>(currentDelay =>
{
return new DoNotDeliverBefore(expectedDeliveryAt);
});

var context = new TestableOutgoingSendContext
{
Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage())
};
context.Headers.Add(Headers.SagaId, "a-saga-id");
context.Headers.Add(Headers.SagaType, "a-saga-type");
context.Headers.Add(Headers.IsSagaTimeoutMessage, bool.TrueString);

var constraints = new List<DeliveryConstraint>
{
new DoNotDeliverBefore(new DateTime(2030, 1, 1))
};
context.Extensions.Set(constraints);

var sut = new RescheduleTimeoutsBehavior(scenarioContext); ;
await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false);

var rescheduledDoNotDeliverBefore = constraints.OfType<DoNotDeliverBefore>().Single();
Assert.AreEqual(expectedDeliveryAt, rescheduledDoNotDeliverBefore.At);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,5 @@ public async Task Should_Capture_TimeoutRequest_Operation()
Assert.AreEqual(expectedSagaId, requestTimeoutOperation.SagaId);
Assert.AreEqual(expectedSagaType, requestTimeoutOperation.SagaTypeAssemblyQualifiedName);
}

[Test]
public async Task Should_Reschedule_Timeouts()
{
var expectedDeliveryAt = new DateTime(2020, 1, 1);

var scenarioContext = new IntegrationScenarioContext();
scenarioContext.RegisterTimeoutRescheduleRule<AMessage>(currentDelay =>
{
return new DoNotDeliverBefore(expectedDeliveryAt);
});

var context = new TestableOutgoingSendContext
{
Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage())
};
context.Headers.Add(Headers.SagaId, "a-saga-id");
context.Headers.Add(Headers.SagaType, "a-saga-type");
context.Headers.Add(Headers.IsSagaTimeoutMessage, bool.TrueString);

var constraints = new List<DeliveryConstraint>
{
new DoNotDeliverBefore(new DateTime(2030, 1, 1))
};
context.Extensions.Set(constraints);

var sut = new InterceptSendOperations("fake-endpoint", scenarioContext); ;
await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false);

var rescheduledDoNotDeliverBefore = constraints.OfType<DoNotDeliverBefore>().Single();
Assert.AreEqual(expectedDeliveryAt, rescheduledDoNotDeliverBefore.At);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static void RegisterRequiredPipelineBehaviors(this EndpointConfiguration
builder.Pipeline.Register(new InterceptSendOperations(endpointName, integrationScenarioContext), "Intercept send operations");
builder.Pipeline.Register(new InterceptPublishOperations(endpointName, integrationScenarioContext), "Intercept publish operations");
builder.Pipeline.Register(new InterceptReplyOperations(endpointName, integrationScenarioContext), "Intercept reply operations");
builder.Pipeline.Register(new RescheduleTimeoutsBehavior(integrationScenarioContext), "Intercept serialized message dispatch");
}

public static void RegisterScenarioContext(this EndpointConfiguration builder, ScenarioContext scenarioContext)
Expand Down
22 changes: 19 additions & 3 deletions src/NServiceBus.IntegrationTesting/IntegrationScenarioContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class IntegrationScenarioContext : ScenarioContext
readonly ConcurrentBag<SagaInvocation> invokedSagas = new ConcurrentBag<SagaInvocation>();
readonly ConcurrentBag<OutgoingMessageOperation> outgoingMessageOperations = new ConcurrentBag<OutgoingMessageOperation>();
readonly Dictionary<Type, Func<DoNotDeliverBefore, DoNotDeliverBefore>> timeoutRescheduleRules = new Dictionary<Type, Func<DoNotDeliverBefore, DoNotDeliverBefore>>();

public IEnumerable<HandlerInvocation> InvokedHandlers { get { return invokedHandlers; } }
public IEnumerable<SagaInvocation> InvokedSagas { get { return invokedSagas; } }
public IEnumerable<OutgoingMessageOperation> OutgoingMessageOperations { get { return outgoingMessageOperations; } }
Expand All @@ -26,9 +26,9 @@ internal HandlerInvocation CaptureInvokedHandler(HandlerInvocation invocation)
return invocation;
}

public void RegisterTimeoutRescheduleRule<TTimeout>(Func<DoNotDeliverBefore, DoNotDeliverBefore> rule)
public void RegisterTimeoutRescheduleRule<TTimeout>(Func<DoNotDeliverBefore, DoNotDeliverBefore> rule)
{
if (timeoutRescheduleRules.ContainsKey(typeof(TTimeout)))
if (timeoutRescheduleRules.ContainsKey(typeof(TTimeout)))
{
throw new NotSupportedException("Only one rule per timeout message type is allowed.");
}
Expand Down Expand Up @@ -63,6 +63,22 @@ public bool SagaWasInvoked<TSaga>() where TSaga : Saga
return InvokedSagas.Any(invocation => invocation.SagaType == typeof(TSaga));
}

public bool MessageWasProcessed<TMessage>()
{
return invokedHandlers.Any(invocation => invocation.MessageType == typeof(TMessage))
|| invokedSagas.Any(invocation => invocation.MessageType == typeof(TMessage));
}

public bool MessageWasProcessedBySaga<TMessage, TSaga>()
{
return invokedSagas.Any(i => i.SagaType == typeof(TSaga) && i.MessageType == typeof(TMessage));
}

public bool MessageWasProcessedByHandler<TMessage, THandler>()
{
return invokedHandlers.Any(i => i.HandlerType == typeof(THandler) && i.MessageType == typeof(TMessage));
}

public bool HasFailedMessages()
{
return !FailedMessages.IsEmpty;
Expand Down
35 changes: 35 additions & 0 deletions src/NServiceBus.IntegrationTesting/InterceptPhysicalDispatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using NServiceBus.DelayedDelivery;
using NServiceBus.DeliveryConstraints;
using NServiceBus.Pipeline;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting
{
class RescheduleTimeoutsBehavior : Behavior<IOutgoingSendContext>
{
readonly IntegrationScenarioContext integrationContext;

public RescheduleTimeoutsBehavior(IntegrationScenarioContext integrationContext)
{
this.integrationContext = integrationContext;
}

public override Task Invoke(IOutgoingSendContext context, Func<Task> next)
{
if (integrationContext.TryGetTimeoutRescheduleRule(context.Message.MessageType, out Func<DoNotDeliverBefore, DoNotDeliverBefore> rule))
{
var constraints = context.Extensions.Get<List<DeliveryConstraint>>();
var doNotDeliverBefore = constraints.OfType<DoNotDeliverBefore>().SingleOrDefault();

var newDoNotDeliverBefore = rule(doNotDeliverBefore);
constraints.Remove(doNotDeliverBefore);
constraints.Add(newDoNotDeliverBefore);
}

return next();
}
}
}
10 changes: 0 additions & 10 deletions src/NServiceBus.IntegrationTesting/InterceptSendOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@ public override async Task Invoke(IOutgoingSendContext context, Func<Task> next)
OutgoingMessageOperation outgoingOperation;
if (context.Headers.ContainsKey(Headers.IsSagaTimeoutMessage) && context.Headers[Headers.IsSagaTimeoutMessage] == bool.TrueString)
{
if (integrationContext.TryGetTimeoutRescheduleRule(context.Message.MessageType, out Func<DoNotDeliverBefore, DoNotDeliverBefore> rule))
{
var constraints = context.Extensions.Get<List<DeliveryConstraint>>();
var doNotDeliverBefore = constraints.OfType<DoNotDeliverBefore>().SingleOrDefault();

var newDoNotDeliverBefore = rule(doNotDeliverBefore);
constraints.Remove(doNotDeliverBefore);
constraints.Add(newDoNotDeliverBefore);
}

outgoingOperation = new RequestTimeoutOperation()
{
SagaId = context.Headers[Headers.SagaId],
Expand Down

0 comments on commit 528d86d

Please sign in to comment.