Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update messaging attributes from 1.23 to 1.28 #3765

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This component adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.h

### Changed

- Update messaging attributes from [v1.23.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.23.0/docs/messaging/messaging-spans.md) to [v1.28.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.28.0/docs/messaging/messaging-spans.md)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add information that it affects Kafka and RabbitMq6 instrumentation.


#### Dependency updates

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ internal static class KafkaInstrumentation
string? spanName = null;
if (!string.IsNullOrEmpty(consumeResult.Topic))
{
spanName = $"{consumeResult.Topic} {MessagingAttributes.Values.ReceiveOperationName}";
spanName = $"{MessagingAttributes.Values.Kafka.PollOperationName} {consumeResult.Topic}";
}

spanName ??= MessagingAttributes.Values.ReceiveOperationName;
spanName ??= MessagingAttributes.Values.Kafka.PollOperationName;

var activityLinks = propagatedContext.Value.ActivityContext.IsValid()
? new[] { new ActivityLink(propagatedContext.Value.ActivityContext) }
Expand All @@ -41,7 +41,8 @@ internal static class KafkaInstrumentation
{
SetCommonAttributes(
activity,
MessagingAttributes.Values.ReceiveOperationName,
MessagingAttributes.Values.ReceiveOperation,
MessagingAttributes.Values.Kafka.PollOperationName,
consumeResult.Topic,
consumeResult.Partition,
consumeResult.Message?.Key,
Expand All @@ -51,7 +52,7 @@ internal static class KafkaInstrumentation

if (ConsumerCache.TryGet(consumer, out var groupId))
{
activity.SetTag(MessagingAttributes.Keys.Kafka.ConsumerGroupId, groupId);
activity.SetTag(MessagingAttributes.Keys.MessagingConsumerGroupName, groupId);
}
}

Expand All @@ -69,16 +70,17 @@ internal static class KafkaInstrumentation
string? spanName = null;
if (!string.IsNullOrEmpty(partition.Topic))
{
spanName = $"{partition.Topic} {MessagingAttributes.Values.PublishOperationName}";
spanName = $"{MessagingAttributes.Values.SendOperation} {partition.Topic}";
}

spanName ??= MessagingAttributes.Values.PublishOperationName;
spanName ??= MessagingAttributes.Values.SendOperation;
var activity = Source.StartActivity(name: spanName, ActivityKind.Producer);
if (activity is not null && activity.IsAllDataRequested)
{
SetCommonAttributes(
activity,
MessagingAttributes.Values.PublishOperationName,
MessagingAttributes.Values.SendOperation,
MessagingAttributes.Values.SendOperation,
partition.Topic,
partition.Partition,
message.Key,
Expand All @@ -103,7 +105,7 @@ public static void InjectContext<TTopicPartition, TMessage>(TMessage message, Ac
public static void SetDeliveryResults(Activity activity, IDeliveryResult deliveryResult)
{
// Set the final partition message was delivered to.
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, deliveryResult.Partition.Value);
activity.SetTag(MessagingAttributes.Keys.Partition, deliveryResult.Partition.Value);

activity.SetTag(
MessagingAttributes.Keys.Kafka.PartitionOffset,
Expand All @@ -123,13 +125,15 @@ public static void SetDeliveryResults(Activity activity, IDeliveryResult deliver
private static void SetCommonAttributes(
Activity activity,
string operationName,
string operationType,
string? topic,
Partition? partition,
object? key,
INamedClient? client)
{
activity.SetTag(MessagingAttributes.Keys.MessagingOperation, operationName);
activity.SetTag(MessagingAttributes.Keys.MessagingSystem, MessagingAttributes.Values.KafkaMessagingSystemName);
activity.SetTag(MessagingAttributes.Keys.MessagingOperationName, operationName);
activity.SetTag(MessagingAttributes.Keys.MessagingOperationType, operationType);
activity.SetTag(MessagingAttributes.Keys.MessagingSystem, MessagingAttributes.Values.Kafka.MessagingSystemName);
if (!string.IsNullOrEmpty(topic))
{
activity.SetTag(MessagingAttributes.Keys.DestinationName, topic);
Expand All @@ -151,7 +155,7 @@ private static void SetCommonAttributes(

if (partition is not null)
{
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, partition.Value.Value);
activity.SetTag(MessagingAttributes.Keys.Partition, partition.Value.Value);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@

namespace OpenTelemetry.AutoInstrumentation.Instrumentations;

// https://github.com/open-telemetry/semantic-conventions/blob/v1.23.0/docs/messaging/messaging-spans.md#messaging-attributes
// https://github.com/open-telemetry/semantic-conventions/blob/v1.28.0/docs/messaging/messaging-spans.md#messaging-attributes
internal static class MessagingAttributes
{
internal static class Keys
{
public const string MessagingSystem = "messaging.system";
public const string MessagingOperation = "messaging.operation";
public const string MessagingOperationName = "messaging.operation.name";
public const string MessagingOperationType = "messaging.operation.type";
public const string MessagingConsumerGroupName = "messaging.consumer.group.name";
public const string DestinationName = "messaging.destination.name";
public const string ClientId = "messaging.client_id";
public const string MessageBodySize = "messaging.message.body.size";
public const string MessageId = "messaging.message.id";
public const string ConversationId = "messaging.message.conversation_id";
public const string Partition = "messaging.destination.partition.id";

// https://github.com/open-telemetry/semantic-conventions/blob/v1.23.0/docs/messaging/kafka.md#span-attributes
// https://github.com/open-telemetry/semantic-conventions/blob/v1.28.0/docs/messaging/kafka.md#span-attributes
internal static class Kafka
{
public const string ConsumerGroupId = "messaging.kafka.consumer.group";
public const string Partition = "messaging.kafka.destination.partition";
public const string MessageKey = "messaging.kafka.message.key";
public const string PartitionOffset = "messaging.kafka.message.offset";
public const string IsTombstone = "messaging.kafka.message.tombstone";
Expand All @@ -35,15 +36,28 @@ internal static class RabbitMq

internal static class Values
{
public const string KafkaMessagingSystemName = "kafka";
public const string PublishOperationName = "publish";
public const string ReceiveOperationName = "receive";
public const string DeliverOperationName = "deliver";
public const string SendOperation = "send";
public const string ReceiveOperation = "receive";
public const string ProcessOperation = "process";

internal static class Kafka
{
public const string MessagingSystemName = "kafka";

public const string CommitOperationName = "commit";
public const string PublishOperationName = "commit";
public const string PollOperationName = "poll";
public const string ConsumeOperationName = "consume";
}

internal static class RabbitMq
{
public const string MessagingSystemName = "rabbitmq";
public const string DefaultExchangeName = "amq.default";

public const string CommitOperationName = "ack";
public const string PollOperationName = "nack";
public const string ConsumeOperationName = "reject";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ internal static class RabbitMqInstrumentation
var connection = instance.Session?.Connection;
return StartConsume(
result.BasicProperties?.Headers,
MessagingAttributes.Values.ReceiveOperationName,
MessagingAttributes.Values.ReceiveOperation,
MessagingAttributes.Values.ReceiveOperation,
result.Exchange,
result.RoutingKey,
result.DeliveryTag,
Expand Down Expand Up @@ -54,7 +55,8 @@ internal static class RabbitMqInstrumentation

return StartConsume(
headers,
MessagingAttributes.Values.DeliverOperationName,
MessagingAttributes.Values.RabbitMq.ConsumeOperationName,
MessagingAttributes.Values.ProcessOperation,
exchange,
routingKey,
deliveryTag,
Expand All @@ -67,7 +69,7 @@ internal static class RabbitMqInstrumentation
where TBasicProperties : IBasicProperties
where TModel : IModelBase
{
var name = GetActivityName(routingKey, MessagingAttributes.Values.PublishOperationName);
var name = GetActivityName(MessagingAttributes.Values.SendOperation, routingKey);
var activity = Source.StartActivity(name, ActivityKind.Producer);
if (activity is not null && basicProperties.Instance is not null)
{
Expand All @@ -83,7 +85,8 @@ internal static class RabbitMqInstrumentation
exchange,
routingKey,
bodyLength,
MessagingAttributes.Values.PublishOperationName,
MessagingAttributes.Values.SendOperation,
MessagingAttributes.Values.SendOperation,
connection?.Endpoint?.HostName,
connection?.Endpoint?.Port,
connection?.RemoteEndPoint);
Expand All @@ -92,14 +95,15 @@ internal static class RabbitMqInstrumentation
return activity;
}

private static string GetActivityName(string? routingKey, string operationType)
private static string GetActivityName(string operationName, string? routingKey)
{
return string.IsNullOrEmpty(routingKey) ? operationType : $"{routingKey} {operationType}";
return string.IsNullOrEmpty(routingKey) ? operationName : $"{operationName} {routingKey}";
}

private static Activity? StartConsume(
IDictionary<string, object>? headers,
string consumeOperationName,
string consumeOperationType,
string? exchange,
string? routingKey,
ulong deliveryTag,
Expand All @@ -117,7 +121,7 @@ private static string GetActivityName(string? routingKey, string operationType)
? new[] { new ActivityLink(propagatedContext.ActivityContext) }
: Array.Empty<ActivityLink>();

var name = GetActivityName(routingKey, consumeOperationName);
var name = GetActivityName(consumeOperationName, routingKey);
var activity = Source.StartActivity(
name: name,
kind: ActivityKind.Consumer,
Expand All @@ -130,6 +134,7 @@ private static string GetActivityName(string? routingKey, string operationType)
exchange,
routingKey,
consumeOperationName,
consumeOperationType,
deliveryTag,
bodyLength,
messageId,
Expand All @@ -148,6 +153,7 @@ private static void SetCommonTags(
string? routingKey,
int bodyLength,
string operationName,
string operationType,
string? hostName,
int? port,
EndPoint? remoteEndpoint)
Expand All @@ -156,7 +162,8 @@ private static void SetCommonTags(

activity
.SetTag(MessagingAttributes.Keys.MessagingSystem, MessagingAttributes.Values.RabbitMq.MessagingSystemName)
.SetTag(MessagingAttributes.Keys.MessagingOperation, operationName)
.SetTag(MessagingAttributes.Keys.MessagingOperationName, operationName)
.SetTag(MessagingAttributes.Keys.MessagingOperationType, operationType)
.SetTag(MessagingAttributes.Keys.DestinationName, exchange)
.SetTag(MessagingAttributes.Keys.MessageBodySize, bodyLength);

Expand Down Expand Up @@ -204,6 +211,7 @@ private static void SetConsumeTags(
string? exchange,
string? routingKey,
string operationName,
string operationType,
ulong deliveryTag,
int bodyLength,
string? basicPropertiesMessageId,
Expand All @@ -212,7 +220,7 @@ private static void SetConsumeTags(
int? port,
EndPoint? remoteEndpoint)
{
SetCommonTags(activity, exchange, routingKey, bodyLength, operationName, hostName, port, remoteEndpoint);
SetCommonTags(activity, exchange, routingKey, bodyLength, operationName, operationType, hostName, port, remoteEndpoint);
if (!string.IsNullOrEmpty(basicPropertiesMessageId))
{
activity.SetTag(MessagingAttributes.Keys.MessageId, basicPropertiesMessageId);
Expand Down
32 changes: 18 additions & 14 deletions test/IntegrationTests/KafkaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ namespace IntegrationTests;
[Collection(KafkaCollection.Name)]
public class KafkaTests : TestHelper
{
private const string MessagingPublishOperationAttributeValue = "publish";
private const string MessagingPublishOperationAttributeValue = "send";
private const string MessagingReceiveOperationAttributeValue = "receive";
private const string MessagingPollOperationAttributeValue = "poll";
private const string MessagingSystemAttributeName = "messaging.system";
private const string MessagingOperationAttributeName = "messaging.operation";
private const string MessagingOperationNameAttributeName = "messaging.operation.name";
private const string MessagingOperationTypeAttributeName = "messaging.operation.type";
private const string MessagingDestinationAttributeName = "messaging.destination.name";
private const string MessagingClientIdAttributeName = "messaging.client_id";
private const string MessagingConsumerGroupNameAttributeName = "messaging.consumer.group.name";

private const string KafkaMessageSystemAttributeValue = "kafka";
private const string KafkaConsumerGroupAttributeName = "messaging.kafka.consumer.group";
private const string KafkaMessageKeyAttributeName = "messaging.kafka.message.key";
private const string KafkaMessageKeyAttributeValue = "testkey";
private const string KafkaDestinationPartitionAttributeName = "messaging.kafka.destination.partition";
Expand Down Expand Up @@ -124,15 +126,15 @@ private static bool ValidateConsumeExceptionSpan(Span span, string topicName)
private static bool ValidateConsumerSpan(Span span, string topicName, int messageOffset, string? expectedMessageKey = KafkaMessageKeyAttributeValue)
{
var kafkaMessageOffset = span.Attributes.SingleOrDefault(kv => kv.Key == KafkaMessageOffsetAttributeName)?.Value.IntValue;
var consumerGroupId = span.Attributes.Single(kv => kv.Key == KafkaConsumerGroupAttributeName).Value.StringValue;
return ValidateCommonAttributes(span.Attributes, topicName, KafkaConsumerClientIdAttributeValue, MessagingReceiveOperationAttributeValue, 0, expectedMessageKey) &&
var consumerGroupId = span.Attributes.Single(kv => kv.Key == MessagingConsumerGroupNameAttributeName).Value.StringValue;
return ValidateCommonAttributes(span.Attributes, topicName, KafkaConsumerClientIdAttributeValue, MessagingReceiveOperationAttributeValue, MessagingPollOperationAttributeValue, 0, expectedMessageKey) &&
kafkaMessageOffset == messageOffset &&
consumerGroupId == GetConsumerGroupIdAttributeValue(topicName);
}

private static bool ValidateBasicProduceExceptionSpan(Span span, string topicName)
{
return ValidateCommonAttributes(span.Attributes, topicName, KafkaProducerClientIdAttributeValue, MessagingPublishOperationAttributeValue, -1, KafkaMessageKeyAttributeValue) &&
return ValidateCommonAttributes(span.Attributes, topicName, KafkaProducerClientIdAttributeValue, MessagingPublishOperationAttributeValue, MessagingPublishOperationAttributeValue, -1, KafkaMessageKeyAttributeValue) &&
span.Status.Code == Status.Types.StatusCode.Error;
}

Expand All @@ -154,38 +156,40 @@ private static bool ValidateProducerSpan(Span span, string topicName, int partit
{
var isTombstone = span.Attributes.Single(kv => kv.Key == KafkaMessageTombstoneAttributeName).Value.BoolValue;

return ValidateCommonAttributes(span.Attributes, topicName, KafkaProducerClientIdAttributeValue, MessagingPublishOperationAttributeValue, partition, KafkaMessageKeyAttributeValue) &&
return ValidateCommonAttributes(span.Attributes, topicName, KafkaProducerClientIdAttributeValue, MessagingPublishOperationAttributeValue, MessagingPublishOperationAttributeValue, partition, KafkaMessageKeyAttributeValue) &&
isTombstone == tombstoneExpected &&
span.Status is null;
}

private static bool ValidateCommonAttributes(IReadOnlyCollection<KeyValue> attributes, string topicName, string clientId, string operationName, int partition, string? expectedMessageKey)
private static bool ValidateCommonAttributes(IReadOnlyCollection<KeyValue> attributes, string topicName, string clientId, string operationName, string operationType, int partition, string? expectedMessageKey)
{
var messagingDestinationName = attributes.SingleOrDefault(kv => kv.Key == MessagingDestinationAttributeName)?.Value.StringValue;
var kafkaMessageKey = attributes.SingleOrDefault(kv => kv.Key == KafkaMessageKeyAttributeName)?.Value.StringValue;
var kafkaPartition = attributes.SingleOrDefault(kv => kv.Key == KafkaDestinationPartitionAttributeName)?.Value.IntValue;

return ValidateBasicSpanAttributes(attributes, clientId, operationName) &&
return ValidateBasicSpanAttributes(attributes, clientId, operationName, operationType) &&
messagingDestinationName == topicName &&
kafkaMessageKey == expectedMessageKey &&
kafkaPartition == partition;
}

private static bool ValidateBasicSpanAttributes(IReadOnlyCollection<KeyValue> attributes, string clientId, string operationName)
private static bool ValidateBasicSpanAttributes(IReadOnlyCollection<KeyValue> attributes, string clientId, string operationName, string operationType)
{
var messagingSystem = attributes.Single(kv => kv.Key == MessagingSystemAttributeName).Value.StringValue;
var messagingOperation = attributes.Single(kv => kv.Key == MessagingOperationAttributeName).Value.StringValue;
var messagingOperationName = attributes.Single(kv => kv.Key == MessagingOperationNameAttributeName).Value.StringValue;
var messagingOperationType = attributes.Single(kv => kv.Key == MessagingOperationTypeAttributeName).Value.StringValue;
var messagingClientId = attributes.Single(kv => kv.Key == MessagingClientIdAttributeName).Value.StringValue;

return messagingSystem == KafkaMessageSystemAttributeValue &&
messagingOperation == operationName &&
messagingOperationName == operationName &&
messagingOperationType == operationType &&
messagingClientId == clientId;
}

private static bool ValidatePropagation(ICollection<MockSpansCollector.Collected> collectedSpans, string topicName)
{
var expectedReceiveOperationName = $"{topicName} {MessagingReceiveOperationAttributeValue}";
var expectedPublishOperationName = $"{topicName} {MessagingPublishOperationAttributeValue}";
var expectedReceiveOperationName = $"{MessagingPollOperationAttributeValue} {topicName}";
var expectedPublishOperationName = $"{MessagingPublishOperationAttributeValue} {topicName}";
var producerSpans = collectedSpans
.Where(span =>
span.Span.Name == expectedPublishOperationName &&
Expand Down
Loading