Skip to content

Commit

Permalink
refactor tracing and metrics interceptors separately
Browse files Browse the repository at this point in the history
  • Loading branch information
janardhankrishna-sai committed Feb 3, 2025
1 parent e9ad552 commit 71804b4
Showing 1 changed file with 38 additions and 18 deletions.
56 changes: 38 additions & 18 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package opentelemetry

import (
"context"
"log"
"sync/atomic"
"time"

Expand All @@ -37,7 +38,11 @@ import (

type clientStatsHandler struct {
estats.MetricsRecorder
options Options
options Options
metrics metricsHandler
}

type metricsHandler struct {
clientMetrics clientMetrics
}

Expand All @@ -58,11 +63,11 @@ func (h *clientStatsHandler) initializeMetrics() {
metrics = DefaultMetrics()
}

h.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.metrics.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.metrics.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.metrics.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))

rm := &registryMetrics{
optionalLabels: h.options.MetricsOptions.OptionalLabels,
Expand Down Expand Up @@ -93,7 +98,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
if h.options.isMetricsEnabled() {
h.metrics.perCallMetrics(ctx, err, startTime, ci)
}
if h.options.isTracingEnabled() {
h.perCallTraces(ctx, err, startTime, ci, span)
}
return err
}

Expand Down Expand Up @@ -131,15 +141,21 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
if h.options.isMetricsEnabled() {
h.metrics.perCallMetrics(ctx, err, startTime, ci)
}
if h.options.isTracingEnabled() {
h.perCallTraces(ctx, err, startTime, ci, span)
}
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

// perCallTracesAndMetrics records per call trace spans and metrics.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {
// perCallTraces records per call trace spans.
func (h *clientStatsHandler) perCallTraces(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {

Check failure on line 156 in stats/opentelemetry/client_metrics.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

parameter 'startTime' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter

Check failure on line 156 in stats/opentelemetry/client_metrics.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

parameter 'ci' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter

Check failure on line 156 in stats/opentelemetry/client_metrics.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

parameter 'startTime' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter

Check failure on line 156 in stats/opentelemetry/client_metrics.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

parameter 'ci' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter
if h.options.isTracingEnabled() {
log.Printf("Tracing call with context: %v", ctx)
s := status.Convert(err)
if s.Code() == grpccodes.OK {
ts.SetStatus(otelcodes.Ok, s.Message())
Expand All @@ -148,14 +164,18 @@ func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err er
}
ts.End()
}
if h.options.isMetricsEnabled() {
}

// perCallMetrics records per call metrics.
func (m *metricsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
if m.clientMetrics.callDuration != nil {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
attrs := otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
)
m.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributeSet(attrs))
}
}

Expand Down Expand Up @@ -225,7 +245,7 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
))
h.clientMetrics.attemptStarted.Add(ctx, 1, attrs)
h.metrics.clientMetrics.attemptStarted.Add(ctx, 1, attrs)
case *stats.OutPayload:
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
Expand Down Expand Up @@ -283,9 +303,9 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,

// Allocate vararg slice once.
opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))}
h.clientMetrics.attemptDuration.Record(ctx, latency, opts...)
h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...)
h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...)
h.metrics.clientMetrics.attemptDuration.Record(ctx, latency, opts...)
h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...)
h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...)
}

const (
Expand Down

0 comments on commit 71804b4

Please sign in to comment.