Skip to content

Commit d32a0e1

Browse files
committed
feat: implement activity tracing
1 parent 1e6f8ab commit d32a0e1

13 files changed

+319
-73
lines changed

projects/Directory.Packages.props

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
<PackageVersion Include="EasyNetQ.Management.Client" Version="3.0.0" />
88
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
99
<PackageVersion Include="Nullable" Version="1.3.1" />
10-
<PackageVersion Include="OpenTelemetry.Api" Version="1.9.0" />
10+
<PackageVersion Include="OpenTelemetry.Api" Version="1.11.2" />
1111
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.9.0" />
1212
<PackageVersion Include="System.Collections.Immutable" Version="8.0.0" />
13+
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.4" />
1314
<!--
1415
Note: do NOT upgrade the System.IO.Pipelines dependency unless necessary
1516
See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
@@ -33,7 +34,6 @@
3334
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
3435
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594
3536
-->
36-
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
3737
<PackageVersion Include="System.Memory" Version="4.5.5" />
3838
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
3939
<PackageVersion Include="System.Net.Http.Json" Version="8.0.1" />
@@ -44,4 +44,4 @@
4444
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
4545
<GlobalPackageReference Include="MinVer" Version="6.0.0" />
4646
</ItemGroup>
47-
</Project>
47+
</Project>

projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
<ItemGroup>
5252
<PackageReference Include="OpenTelemetry.Api" />
53+
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
5354
</ItemGroup>
5455

5556
<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">

projects/RabbitMQ.Client/ConnectionFactory.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Diagnostics.CodeAnalysis;
3536
using System.Linq;
3637
using System.Net.Security;
@@ -544,24 +545,31 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
544545
CancellationToken cancellationToken = default)
545546
{
546547
ConnectionConfig config = CreateConfig(clientProvidedName);
548+
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(false);
547549
try
548550
{
549551
if (AutomaticRecoveryEnabled)
550552
{
551-
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, cancellationToken)
553+
connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", true);
554+
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, connectionActivity, cancellationToken)
552555
.ConfigureAwait(false);
553556
}
554557
else
555558
{
559+
560+
connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", false);
556561
IFrameHandler frameHandler = await endpointResolver.SelectOneAsync(CreateFrameHandlerAsync, cancellationToken)
557562
.ConfigureAwait(false);
563+
connectionActivity.SetNetworkTags(frameHandler);
558564
var c = new Connection(config, frameHandler);
559565
return await c.OpenAsync(cancellationToken)
560566
.ConfigureAwait(false);
561567
}
562568
}
563569
catch (OperationCanceledException ex)
564570
{
571+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
572+
connectionActivity?.AddException(ex);
565573
if (cancellationToken.IsCancellationRequested)
566574
{
567575
throw;
@@ -573,7 +581,10 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
573581
}
574582
catch (Exception ex)
575583
{
576-
throw new BrokerUnreachableException(ex);
584+
var brokerUnreachableException = new BrokerUnreachableException(ex);
585+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
586+
connectionActivity?.AddException(brokerUnreachableException);
587+
throw brokerUnreachableException;
577588
}
578589
}
579590

projects/RabbitMQ.Client/IEndpointResolverExtensions.cs

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,49 +41,39 @@ public static class EndpointResolverExtensions
4141
public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
4242
Func<AmqpTcpEndpoint, CancellationToken, Task<T>> selector, CancellationToken cancellationToken)
4343
{
44-
var t = default(T);
4544
var exceptions = new List<Exception>();
4645
foreach (AmqpTcpEndpoint ep in resolver.All())
4746
{
4847
cancellationToken.ThrowIfCancellationRequested();
48+
using var tcpConnection = RabbitMQActivitySource.OpenTcpConnection();
49+
tcpConnection?.SetServerTags(ep);
4950
try
5051
{
51-
t = await selector(ep, cancellationToken).ConfigureAwait(false);
52-
if (t!.Equals(default(T)) == false)
53-
{
54-
return t;
55-
}
52+
return await selector(ep, cancellationToken).ConfigureAwait(false);
5653
}
5754
catch (OperationCanceledException ex)
5855
{
56+
tcpConnection?.AddException(ex);
5957
if (cancellationToken.IsCancellationRequested)
6058
{
6159
throw;
6260
}
63-
else
64-
{
65-
exceptions.Add(ex);
66-
}
61+
62+
exceptions.Add(ex);
6763
}
6864
catch (Exception e)
6965
{
66+
tcpConnection?.AddException(e);
7067
exceptions.Add(e);
7168
}
7269
}
7370

74-
if (EqualityComparer<T>.Default.Equals(t!, default!))
71+
if (exceptions.Count > 0)
7572
{
76-
if (exceptions.Count > 0)
77-
{
78-
throw new AggregateException(exceptions);
79-
}
80-
else
81-
{
82-
throw new InvalidOperationException(InternalConstants.BugFound);
83-
}
73+
throw new AggregateException(exceptions);
8474
}
8575

86-
return t!;
76+
throw new InvalidOperationException(InternalConstants.BugFound);
8777
}
8878
}
8979
}

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Linq;
3536
using System.Threading;
3637
using System.Threading.Tasks;
@@ -243,13 +244,13 @@ await _innerConnection.AbortAsync(Constants.InternalError, "FailedAutoRecovery",
243244
private async ValueTask<bool> TryRecoverConnectionDelegateAsync(CancellationToken cancellationToken)
244245
{
245246
Connection? maybeNewInnerConnection = null;
247+
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(true);
246248
try
247249
{
248250
Connection defunctConnection = _innerConnection;
249-
250251
IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
251252
.ConfigureAwait(false);
252-
253+
connectionActivity?.SetNetworkTags(fh);
253254
maybeNewInnerConnection = new Connection(_config, fh);
254255

255256
await maybeNewInnerConnection.OpenAsync(cancellationToken)
@@ -267,6 +268,8 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
267268
}
268269
catch (Exception e)
269270
{
271+
connectionActivity?.AddException(e);
272+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
270273
ESLog.Error("Connection recovery exception.", e);
271274
// Trigger recovery error events
272275
if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty)

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Diagnostics.CodeAnalysis;
3536
using System.Runtime.CompilerServices;
3637
using System.Threading;
@@ -89,11 +90,12 @@ Task onExceptionAsync(Exception exception, string context, CancellationToken can
8990
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
9091
}
9192

92-
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,
93+
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints, Activity? connectionActivity,
9394
CancellationToken cancellationToken)
9495
{
9596
IFrameHandler fh = await endpoints.SelectOneAsync(config.FrameHandlerFactoryAsync, cancellationToken)
9697
.ConfigureAwait(false);
98+
connectionActivity.SetNetworkTags(fh);
9799
Connection innerConnection = new(config, fh);
98100
AutorecoveringConnection connection = new(config, endpoints, innerConnection);
99101
await innerConnection.OpenAsync(cancellationToken)

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6060

6161
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6262

63-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
64-
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
65-
: default;
63+
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length);
6664

6765
ulong publishSequenceNumber = 0;
6866
if (publisherConfirmationInfo is not null)
@@ -115,9 +113,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
115113

116114
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
117115

118-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
119-
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
120-
: default;
116+
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length);
121117

122118
ulong publishSequenceNumber = 0;
123119
if (publisherConfirmationInfo is not null)

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,9 @@ internal void TakeOver(Connection other)
228228
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
229229
{
230230
cancellationToken.ThrowIfCancellationRequested();
231-
232231
try
233232
{
234233
RabbitMqClientEventSource.Log.ConnectionOpened();
235-
236234
cancellationToken.ThrowIfCancellationRequested();
237235

238236
// Note: this must happen *after* the frame handler is started
@@ -250,7 +248,7 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
250248

251249
return this;
252250
}
253-
catch
251+
catch (Exception)
254252
{
255253
try
256254
{

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,20 @@ public static class RabbitMQActivitySource
4343
private static readonly ActivitySource s_subscriberSource =
4444
new ActivitySource(SubscriberSourceName, AssemblyVersion);
4545

46+
private static readonly ActivitySource s_connectionSource =
47+
new ActivitySource(ConnectionSourceName, AssemblyVersion);
48+
4649
public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
4750
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
51+
public const string ConnectionSourceName = "RabbitMQ.Client.Connection";
4852

49-
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = DefaultContextInjector;
53+
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } =
54+
DefaultContextInjector;
5055

5156
public static Func<IReadOnlyBasicProperties, ActivityContext> ContextExtractor { get; set; } =
5257
DefaultContextExtractor;
5358

5459
public static bool UseRoutingKeyAsOperationName { get; set; } = true;
55-
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
5660

5761
internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
5862
{
@@ -61,14 +65,24 @@ public static class RabbitMQActivitySource
6165
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6266
};
6367

68+
internal static Activity? OpenConnection(bool isReconnection)
69+
{
70+
Activity? connectionActivity =
71+
s_connectionSource.StartRabbitMQActivity("connection attempt", ActivityKind.Client);
72+
connectionActivity?.SetTag("messaging.rabbitmq.connection.is_reconnection", isReconnection);
73+
return connectionActivity;
74+
}
75+
76+
internal static Activity? OpenTcpConnection()
77+
{
78+
Activity? connectionActivity =
79+
s_connectionSource.StartRabbitMQActivity("tcp connection attempt", ActivityKind.Client);
80+
return connectionActivity;
81+
}
82+
6483
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
6584
ActivityContext linkedContext = default)
6685
{
67-
if (!s_publisherSource.HasListeners())
68-
{
69-
return null;
70-
}
71-
7286
Activity? activity = linkedContext == default
7387
? s_publisherSource.StartRabbitMQActivity(
7488
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
@@ -82,16 +96,10 @@ public static class RabbitMQActivitySource
8296
}
8397

8498
return activity;
85-
8699
}
87100

88101
internal static Activity? BasicGetEmpty(string queue)
89102
{
90-
if (!s_subscriberSource.HasListeners())
91-
{
92-
return null;
93-
}
94-
95103
Activity? activity = s_subscriberSource.StartRabbitMQActivity(
96104
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty,
97105
ActivityKind.Consumer);
@@ -109,11 +117,6 @@ public static class RabbitMQActivitySource
109117
internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag,
110118
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
111119
{
112-
if (!s_subscriberSource.HasListeners())
113-
{
114-
return null;
115-
}
116-
117120
// Extract the PropagationContext of the upstream parent from the message headers.
118121
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
119122
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
@@ -130,11 +133,6 @@ public static class RabbitMQActivitySource
130133
internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag,
131134
IReadOnlyBasicProperties basicProperties, int bodySize)
132135
{
133-
if (!s_subscriberSource.HasListeners())
134-
{
135-
return null;
136-
}
137-
138136
// Extract the PropagationContext of the upstream parent from the message headers.
139137
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
140138
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
@@ -197,15 +195,15 @@ private static void PopulateMessagingTags(string operationType, string operation
197195

198196
internal static void PopulateMessageEnvelopeSize(Activity? activity, int size)
199197
{
200-
if (activity != null && activity.IsAllDataRequested && PublisherHasListeners)
198+
if (activity?.IsAllDataRequested ?? false)
201199
{
202200
activity.SetTag(MessagingEnvelopeSize, size);
203201
}
204202
}
205203

206204
internal static void SetNetworkTags(this Activity? activity, IFrameHandler frameHandler)
207205
{
208-
if (PublisherHasListeners && activity != null && activity.IsAllDataRequested)
206+
if (activity?.IsAllDataRequested ?? false)
209207
{
210208
switch (frameHandler.RemoteEndPoint.AddressFamily)
211209
{
@@ -216,15 +214,7 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
216214
activity.SetTag("network.type", "ipv4");
217215
break;
218216
}
219-
220-
if (!string.IsNullOrEmpty(frameHandler.Endpoint.HostName))
221-
{
222-
activity
223-
.SetTag("server.address", frameHandler.Endpoint.HostName);
224-
}
225-
226-
activity
227-
.SetTag("server.port", frameHandler.Endpoint.Port);
217+
activity.SetServerTags(frameHandler.Endpoint);
228218

229219
if (frameHandler.RemoteEndPoint is IPEndPoint ipEndpoint)
230220
{
@@ -252,6 +242,18 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
252242
}
253243
}
254244

245+
internal static void SetServerTags(this Activity activity, AmqpTcpEndpoint endpoint)
246+
{
247+
if (!string.IsNullOrEmpty(endpoint.HostName))
248+
{
249+
activity
250+
.SetTag("server.address", endpoint.HostName);
251+
}
252+
253+
activity
254+
.SetTag("server.port", endpoint.Port);
255+
}
256+
255257
private static void DefaultContextInjector(Activity sendActivity, IDictionary<string, object?> props)
256258
{
257259
DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter);

0 commit comments

Comments
 (0)