Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats/opentelemetry: separate out interceptors for tracing and metrics #8063

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7ddbe46
replace dial with newclient
janardhankrishna-sai Dec 23, 2024
0486355
Merge branch 'master' of https://github.com/janardhanvissa/grpc-go
janardhankrishna-sai Jan 1, 2025
53fa9cd
Merge branch 'grpc:master' into master
janardhanvissa Jan 8, 2025
4435b8a
Merge branch 'grpc:master' into master
janardhanvissa Jan 13, 2025
a413555
Merge branch 'grpc:master' into master
janardhanvissa Jan 20, 2025
4e203c3
Merge branch 'grpc:master' into master
janardhanvissa Jan 30, 2025
e9ad552
Merge branch 'grpc:master' into master
janardhanvissa Jan 30, 2025
71804b4
refactor tracing and metrics interceptors separately
janardhankrishna-sai Feb 3, 2025
4b3bd26
adding opentelemetry tracing for client-server
janardhankrishna-sai Jan 30, 2025
69df069
fixing vet issues
janardhankrishna-sai Feb 3, 2025
770f430
reverting newclient changes and fixing vet issues
janardhankrishna-sai Feb 3, 2025
b97a3ca
reverting otel trace for client/server changes
janardhankrishna-sai Feb 3, 2025
c89f3c9
fixing vet issue
janardhankrishna-sai Feb 3, 2025
3f07e48
renaming receiver name
janardhankrishna-sai Feb 5, 2025
5e8a4a5
unused param fix
janardhankrishna-sai Feb 5, 2025
76e422a
fixing vet issue
janardhankrishna-sai Feb 5, 2025
8fa0b03
refactor client interceptor separately for traces and metrics
janardhankrishna-sai Feb 12, 2025
1f41a49
moving tracing code to client_tracing.go file
janardhankrishna-sai Feb 12, 2025
d74c61d
revert previous commit
janardhankrishna-sai Feb 12, 2025
170eef6
adding separate interceptors for traces and metrics of server
janardhankrishna-sai Feb 17, 2025
7f5f539
separating HandleRPC interceptors of traces and metrics
janardhankrishna-sai Feb 17, 2025
50999a0
updating client and server metrics
janardhankrishna-sai Feb 18, 2025
68b8966
removing metrics code from tracingstatshandler and unused parameters
janardhankrishna-sai Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ import (

type clientStatsHandler struct {
estats.MetricsRecorder
options Options
options Options
metrics metricsHandler
}

// metricsHandler groups the client-side metric instruments and is the
// receiver for perCallMetrics, keeping per-call metrics recording separate
// from the tracing logic on clientStatsHandler.
type metricsHandler struct {
// clientMetrics holds the instruments (attempt started/duration, message
// sizes, call duration) assigned in clientStatsHandler.initializeMetrics.
clientMetrics clientMetrics
}

Expand All @@ -58,11 +62,11 @@ func (h *clientStatsHandler) initializeMetrics() {
metrics = DefaultMetrics()
}

h.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.metrics.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.metrics.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.metrics.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))

rm := &registryMetrics{
optionalLabels: h.options.MetricsOptions.OptionalLabels,
Expand All @@ -78,6 +82,9 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
}
ctx = setCallInfo(ctx, ci)

metricsEnabled := h.options.isMetricsEnabled()
tracingEnabled := h.options.isTracingEnabled()

if h.options.MetricsOptions.pluginOption != nil {
md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
Expand All @@ -89,11 +96,17 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string

startTime := time.Now()
var span trace.Span
if h.options.isTracingEnabled() {
if tracingEnabled {
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)

if metricsEnabled {
h.metrics.perCallMetrics(ctx, err, startTime, ci)
}
if tracingEnabled {
h.perCallTraces(ctx, err, startTime, ci, span)
}
return err
}

Expand All @@ -115,7 +128,8 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
method: h.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)

metricsEnabled := h.options.isMetricsEnabled()
tracingEnabled := h.options.isTracingEnabled()
if h.options.MetricsOptions.pluginOption != nil {
md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
Expand All @@ -127,18 +141,23 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S

startTime := time.Now()
var span trace.Span
if h.options.isTracingEnabled() {
if tracingEnabled {
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
if metricsEnabled {
h.metrics.perCallMetrics(ctx, err, startTime, ci)
}
if tracingEnabled {
h.perCallTraces(ctx, err, startTime, ci, span)
}
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

// perCallTracesAndMetrics records per call trace spans and metrics.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {
// perCallTraces records per call trace spans.
func (h *clientStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) {
if h.options.isTracingEnabled() {
s := status.Convert(err)
if s.Code() == grpccodes.OK {
Expand All @@ -148,14 +167,18 @@ func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err er
}
ts.End()
}
if h.options.isMetricsEnabled() {
}

// perCallMetrics records per call metrics.
func (h *metricsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
if h.clientMetrics.callDuration != nil {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
attrs := otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
)
h.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributeSet(attrs))
}
}

Expand Down Expand Up @@ -225,7 +248,7 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
))
h.clientMetrics.attemptStarted.Add(ctx, 1, attrs)
h.metrics.clientMetrics.attemptStarted.Add(ctx, 1, attrs)
case *stats.OutPayload:
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
Expand Down Expand Up @@ -283,9 +306,9 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,

// Allocate vararg slice once.
opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))}
h.clientMetrics.attemptDuration.Record(ctx, latency, opts...)
h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...)
h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...)
h.metrics.clientMetrics.attemptDuration.Record(ctx, latency, opts...)
h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...)
h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...)
}

const (
Expand Down
Loading