Skip to content

Commit

Permalink
Merge pull request #54 from mauroservienti/capture-outgoing-messages
Browse files Browse the repository at this point in the history
Capture outgoing operations
  • Loading branch information
mauroservienti authored Jun 2, 2020
2 parents 2c9d552 + 6973b06 commit e9bfbbd
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/MySystem.AcceptanceTests/When_sending_AMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class When_sending_AMessage
public async Task AReplyMessage_is_received_and_ASaga_is_started()
{
var theExpectedSagaId = Guid.NewGuid();
var context = await Scenario.Define<IntegrationContext>()
var context = await Scenario.Define<IntegrationScenarioContext>()
.WithEndpoint<MyServiceEndpoint>(g => g.When(b => b.Send(new AMessage() { ThisWillBeTheSagaId = theExpectedSagaId })))
.WithEndpoint<MyOtherServiceEndpoint>()
.Done(c =>
Expand Down
2 changes: 1 addition & 1 deletion src/MySystem.AcceptanceTests/When_sending_CompleteASaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class When_sending_CompleteASaga
public async Task ASaga_is_completed()
{
var theExpectedSagaId = Guid.NewGuid();
var context = await Scenario.Define<IntegrationContext>()
var context = await Scenario.Define<IntegrationScenarioContext>()
.WithEndpoint<MyServiceEndpoint>(g =>
{
g.When(session => session.Send("MyService", new StartASaga() { SomeId = theExpectedSagaId }));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
<PackageReference Include="NETStandard.Library" Version="2.0.3" />
<PackageReference Include="NServiceBus.Testing" Version="7.2.0" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\MyMessages\MyMessages.csproj" />
<ProjectReference Include="..\NServiceBus.IntegrationTesting\NServiceBus.IntegrationTesting.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using MyMessages.Messages;
using NServiceBus.Pipeline;
using NServiceBus.Testing;
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting.Tests
{
public class Publish_Operation_Interceptor
{
[Test]
public async Task Should_Capture_Publish_Message_Operation()
{
var scenarioContext = new IntegrationScenarioContext();
var context = new TestableOutgoingPublishContext
{
Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage())
};

var sut = new InterceptPublishOperations("fake-endpoint", scenarioContext);
await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false);

var operation = scenarioContext.OutgoingMessageOperations.SingleOrDefault() as PublishOperation;

Assert.AreEqual(1, scenarioContext.OutgoingMessageOperations.Count());
Assert.IsNotNull(operation);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using MyMessages.Messages;
using NServiceBus.Pipeline;
using NServiceBus.Testing;
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting.Tests
{
public class Reply_Operation_Interceptor
{
[Test]
public async Task Should_Capture_Reply_Message_Operation()
{
var scenarioContext = new IntegrationScenarioContext();
var context = new TestableOutgoingReplyContext
{
Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage())
};

var sut = new InterceptReplyOperations("fake-endpoint", scenarioContext);
await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false);

var operation = scenarioContext.OutgoingMessageOperations.SingleOrDefault() as ReplyOperation;

Assert.AreEqual(1, scenarioContext.OutgoingMessageOperations.Count());
Assert.IsNotNull(operation);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using MyMessages.Messages;
using NServiceBus.Pipeline;
using NServiceBus.Testing;
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting.Tests
{
public class Send_Operation_Interceptor
{
[Test]
public async Task Should_Capture_Sent_Message_Operation()
{
var scenarioContext = new IntegrationScenarioContext();
var context = new TestableOutgoingSendContext
{
Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage())
};

var sut = new InterceptSendOperations("fake-endpoint", scenarioContext);
await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false);

var sendOperation = scenarioContext.OutgoingMessageOperations.SingleOrDefault() as SendOperation;

Assert.AreEqual(1, scenarioContext.OutgoingMessageOperations.Count());
Assert.IsNotNull(sendOperation);
}

[Test]
public async Task Should_Capture_TimeoutRequest_Operation()
{
var expectedSagaId = "a-saga-id";
var expectedSagaType = "a-saga-type";

var scenarioContext = new IntegrationScenarioContext();
var context = new TestableOutgoingSendContext
{
Message = new OutgoingLogicalMessage(typeof(AMessage), new AMessage())
};
context.Headers.Add(Headers.SagaId, expectedSagaId);
context.Headers.Add(Headers.SagaType, expectedSagaType);
context.Headers.Add(Headers.IsSagaTimeoutMessage, bool.TrueString);

var sut = new InterceptSendOperations("fake-endpoint", scenarioContext); ;
await sut.Invoke(context, () => Task.CompletedTask).ConfigureAwait(false);

var requestTimeoutOperation = scenarioContext.OutgoingMessageOperations.SingleOrDefault() as RequestTimeoutOperation;

Assert.AreEqual(1, scenarioContext.OutgoingMessageOperations.Count());
Assert.IsNotNull(requestTimeoutOperation);
Assert.AreEqual(expectedSagaId, requestTimeoutOperation.SagaId);
Assert.AreEqual(expectedSagaType, requestTimeoutOperation.SagaTypeAssemblyQualifiedName);
}
}
}
8 changes: 7 additions & 1 deletion src/NServiceBus.IntegrationTesting.sln
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MySystem.AcceptanceTests",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.IntegrationTesting", "NServiceBus.IntegrationTesting\NServiceBus.IntegrationTesting.csproj", "{A1DF8428-D478-4FAA-9237-AA36D4C83732}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NServiceBus.AssemblyScanner.Extensions", "NServiceBus.AssemblyScanner.Extensions\NServiceBus.AssemblyScanner.Extensions.csproj", "{DA921CE7-6B53-4196-8567-B0197E773043}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.AssemblyScanner.Extensions", "NServiceBus.AssemblyScanner.Extensions\NServiceBus.AssemblyScanner.Extensions.csproj", "{DA921CE7-6B53-4196-8567-B0197E773043}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NServiceBus.IntegrationTesting.Tests", "NServiceBus.IntegrationTesting.Tests\NServiceBus.IntegrationTesting.Tests.csproj", "{18A4490B-641E-471C-9A53-02C4AAB69551}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -45,6 +47,10 @@ Global
{DA921CE7-6B53-4196-8567-B0197E773043}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DA921CE7-6B53-4196-8567-B0197E773043}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DA921CE7-6B53-4196-8567-B0197E773043}.Release|Any CPU.Build.0 = Release|Any CPU
{18A4490B-641E-471C-9A53-02C4AAB69551}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{18A4490B-641E-471C-9A53-02C4AAB69551}.Debug|Any CPU.Build.0 = Debug|Any CPU
{18A4490B-641E-471C-9A53-02C4AAB69551}.Release|Any CPU.ActiveCfg = Release|Any CPU
{18A4490B-641E-471C-9A53-02C4AAB69551}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
5 changes: 4 additions & 1 deletion src/NServiceBus.IntegrationTesting/EndpointTemplate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ public async Task<EndpointConfiguration> GetConfiguration(RunDescriptor runDescr

configurationBuilderCustomization(configuration);

configuration.Pipeline.Register(new InterceptInvokedHandlers(endpointCustomizationConfiguration.EndpointName), "Intercept invoked Message Handlers");
configuration.Pipeline.Register(new InterceptInvokedHandlers(endpointCustomizationConfiguration.EndpointName, (IntegrationScenarioContext)runDescriptor.ScenarioContext), "Intercept invoked Message Handlers and Sagas");
configuration.Pipeline.Register(new InterceptSendOperations(endpointCustomizationConfiguration.EndpointName, (IntegrationScenarioContext)runDescriptor.ScenarioContext), "Intercept send operations");
configuration.Pipeline.Register(new InterceptPublishOperations(endpointCustomizationConfiguration.EndpointName, (IntegrationScenarioContext)runDescriptor.ScenarioContext), "Intercept publish operations");
configuration.Pipeline.Register(new InterceptReplyOperations(endpointCustomizationConfiguration.EndpointName, (IntegrationScenarioContext)runDescriptor.ScenarioContext), "Intercept reply operations");

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@

namespace NServiceBus.IntegrationTesting
{
public class IntegrationContext : ScenarioContext
public class IntegrationScenarioContext : ScenarioContext
{
readonly ConcurrentBag<HandlerInvocation> invokedHandlers = new ConcurrentBag<HandlerInvocation>();
readonly ConcurrentBag<SagaInvocation> invokedSagas = new ConcurrentBag<SagaInvocation>();
readonly ConcurrentBag<OutgoingMessageOperation> outgoingMessageOperations = new ConcurrentBag<OutgoingMessageOperation>();

public IEnumerable<HandlerInvocation> InvokedHandlers { get { return invokedHandlers; } }
public IEnumerable<SagaInvocation> InvokedSagas { get { return invokedSagas; } }
public IEnumerable<OutgoingMessageOperation> OutgoingMessageOperations { get { return outgoingMessageOperations; } }

internal HandlerInvocation CaptureInvokedHandler(HandlerInvocation invocation)
{
Expand All @@ -21,27 +23,16 @@ internal HandlerInvocation CaptureInvokedHandler(HandlerInvocation invocation)
return invocation;
}

internal SagaInvocation CaptureInvokedSaga(SagaInvocation invocation)
{
invokedSagas.Add(invocation);

return invocation;
}

static PropertyInfo GetScenarioContextCurrentProperty()
internal void AddOutogingOperation(OutgoingMessageOperation outgoingMessageOperation)
{
return typeof(ScenarioContext).GetProperty("Current", BindingFlags.Static | BindingFlags.NonPublic);
outgoingMessageOperations.Add(outgoingMessageOperation);
}

public static IntegrationContext CurrentContext
internal SagaInvocation CaptureInvokedSaga(SagaInvocation invocation)
{
get
{
var pi = GetScenarioContextCurrentProperty();
var current = (IntegrationContext)pi.GetMethod.Invoke(null, null);
invokedSagas.Add(invocation);

return current;
}
return invocation;
}

public bool HandlerWasInvoked<THandler>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ namespace NServiceBus.IntegrationTesting
class InterceptInvokedHandlers : Behavior<IInvokeHandlerContext>
{
readonly string endpointName;
readonly IntegrationScenarioContext integrationContext;

public InterceptInvokedHandlers(string endpointName)
public InterceptInvokedHandlers(string endpointName, IntegrationScenarioContext integrationContext)
{
this.endpointName = endpointName;
this.integrationContext = integrationContext;
}

public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
Expand All @@ -23,7 +25,7 @@ public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next

if (context.Extensions.TryGet(out ActiveSagaInstance saga))
{
invocation = IntegrationContext.CurrentContext.CaptureInvokedSaga(new SagaInvocation()
invocation = integrationContext.CaptureInvokedSaga(new SagaInvocation()
{
NotFound = saga.NotFound,
SagaType = saga.NotFound ? null : saga.Instance.GetType(),
Expand All @@ -34,7 +36,7 @@ public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next
}
else
{
invocation = IntegrationContext.CurrentContext.CaptureInvokedHandler(new HandlerInvocation()
invocation = integrationContext.CaptureInvokedHandler(new HandlerInvocation()
{
HandlerType = context.MessageHandler.HandlerType
});
Expand Down
41 changes: 41 additions & 0 deletions src/NServiceBus.IntegrationTesting/InterceptPublishOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using NServiceBus.Pipeline;
using System;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting
{
class InterceptPublishOperations : Behavior<IOutgoingPublishContext>
{
readonly string endpointName;
readonly IntegrationScenarioContext integrationContext;

public InterceptPublishOperations(string endpointName, IntegrationScenarioContext integrationContext)
{
this.endpointName = endpointName;
this.integrationContext = integrationContext;
}

public override async Task Invoke(IOutgoingPublishContext context, Func<Task> next)
{
var sendOperation = new PublishOperation
{
SenderEndpoint = endpointName,
MessageId = context.MessageId,
MessageType = context.Message.MessageType,
MessageInstance = context.Message.Instance,
MessageHeaders = context.Headers
};

integrationContext.AddOutogingOperation(sendOperation);
try
{
await next();
}
catch(Exception sendError)
{
sendOperation.OperationError = sendError;
throw;
}
}
}
}
41 changes: 41 additions & 0 deletions src/NServiceBus.IntegrationTesting/InterceptReplyOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using NServiceBus.Pipeline;
using System;
using System.Threading.Tasks;

namespace NServiceBus.IntegrationTesting
{
class InterceptReplyOperations : Behavior<IOutgoingReplyContext>
{
readonly string endpointName;
readonly IntegrationScenarioContext integrationContext;

public InterceptReplyOperations(string endpointName, IntegrationScenarioContext integrationContext)
{
this.endpointName = endpointName;
this.integrationContext = integrationContext;
}

public override async Task Invoke(IOutgoingReplyContext context, Func<Task> next)
{
var sendOperation = new ReplyOperation
{
SenderEndpoint = endpointName,
MessageId = context.MessageId,
MessageType = context.Message.MessageType,
MessageInstance = context.Message.Instance,
MessageHeaders = context.Headers
};

integrationContext.AddOutogingOperation(sendOperation);
try
{
await next();
}
catch(Exception sendError)
{
sendOperation.OperationError = sendError;
throw;
}
}
}
}
Loading

0 comments on commit e9bfbbd

Please sign in to comment.