From 7ddbe469d36ac42a4c8884de9f50aa674755ea07 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 23 Dec 2024 07:09:57 +0000 Subject: [PATCH 01/17] replace dial with newclient --- test/goaway_test.go | 5 +++-- test/healthcheck_test.go | 5 +++-- test/resolver_update_test.go | 12 ++++++------ test/xds/xds_client_certificate_providers_test.go | 5 +++-- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/test/goaway_test.go b/test/goaway_test.go index 65d2cc02d05a..cdffecb35245 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -672,11 +672,12 @@ func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) { ctCh.Send(ct) }() - cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("error dialing: %v", err) + t.Fatalf("NewClient() failed: %v", err) } defer cc.Close() + cc.Connect() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index fac565240ab7..42fdb4518e42 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -224,10 +224,11 @@ func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resol opts = append(opts, c.extraDialOption...) } - cc, err := grpc.Dial(r.Scheme()+":///test.server", opts...) + cc, err := grpc.NewClient(r.Scheme()+":///test.server", opts...) if err != nil { - t.Fatalf("grpc.Dial() failed: %v", err) + t.Fatalf("grpc.NewClient() failed: %v", err) } + cc.Connect() t.Cleanup(func() { cc.Close() }) return cc, r } diff --git a/test/resolver_update_test.go b/test/resolver_update_test.go index 619979b9b045..d6a7eefb2413 100644 --- a/test/resolver_update_test.go +++ b/test/resolver_update_test.go @@ -112,12 +112,12 @@ func (s) TestResolverUpdateDuringBuild_ServiceConfigInvalidTypeError(t *testing. func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + cc, err := grpc.NewClient(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) if err != nil { - t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) + t.Fatalf("NewClient(_, _) = _, %v; want _, nil", err) } defer cc.Close() - + cc.Connect() scpr := r.CC.ParseServiceConfig("bad json service config") r.UpdateState(resolver.State{ServiceConfig: scpr}) @@ -195,12 +195,12 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + cc, err := grpc.NewClient(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) if err != nil { - t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) + t.Fatalf("NewClient(_, _) = _, %v; want _, nil", err) } defer cc.Close() - + cc.Connect() // Push a resolver update and verify that our balancer receives the update. addrs := []resolver.Address{{Addr: backend.Address}} const lbCfg = "wrapping balancer LB policy config" diff --git a/test/xds/xds_client_certificate_providers_test.go b/test/xds/xds_client_certificate_providers_test.go index 03d9f7f19519..af580cb47085 100644 --- a/test/xds/xds_client_certificate_providers_test.go +++ b/test/xds/xds_client_certificate_providers_test.go @@ -167,11 +167,12 @@ func (s) TestClientSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test } // Create a ClientConn and ensure that it moves to TRANSIENT_FAILURE. - cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolverBuilder)) + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolverBuilder)) if err != nil { - t.Fatalf("failed to dial local test server: %v", err) + t.Fatalf("NewClient() failed: %v", err) } defer cc.Close() + cc.Connect() testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) // Make an RPC and ensure that expected error is returned. From 71804b46dfa5622ddd5be884587abd3c0ac4d6e3 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 3 Feb 2025 10:48:12 +0000 Subject: [PATCH 02/17] refactor tracing and metrics interceptors separately --- stats/opentelemetry/client_metrics.go | 56 ++++++++++++++++++--------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 4fffba60fb33..075574634aed 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -18,6 +18,7 @@ package opentelemetry import ( "context" + "log" "sync/atomic" "time" @@ -37,7 +38,11 @@ import ( type clientStatsHandler struct { estats.MetricsRecorder - options Options + options Options + metrics metricsHandler +} + +type metricsHandler struct { clientMetrics clientMetrics } @@ -58,11 +63,11 @@ func (h *clientStatsHandler) initializeMetrics() { metrics = DefaultMetrics() } - h.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) - h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) - h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + h.metrics.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) + h.metrics.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.metrics.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) rm := ®istryMetrics{ optionalLabels: h.options.MetricsOptions.OptionalLabels, @@ -93,7 +98,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string ctx, span = h.createCallTraceSpan(ctx, method) } err := invoker(ctx, method, req, reply, cc, opts...) - h.perCallTracesAndMetrics(ctx, err, startTime, ci, span) + if h.options.isMetricsEnabled() { + h.metrics.perCallMetrics(ctx, err, startTime, ci) + } + if h.options.isTracingEnabled() { + h.perCallTraces(ctx, err, startTime, ci, span) + } return err } @@ -131,15 +141,21 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S ctx, span = h.createCallTraceSpan(ctx, method) } callback := func(err error) { - h.perCallTracesAndMetrics(ctx, err, startTime, ci, span) + if h.options.isMetricsEnabled() { + h.metrics.perCallMetrics(ctx, err, startTime, ci) + } + if h.options.isTracingEnabled() { + h.perCallTraces(ctx, err, startTime, ci, span) + } } opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) return streamer(ctx, desc, cc, method, opts...) } -// perCallTracesAndMetrics records per call trace spans and metrics. -func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) { +// perCallTraces records per call trace spans. +func (h *clientStatsHandler) perCallTraces(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) { if h.options.isTracingEnabled() { + log.Printf("Tracing call with context: %v", ctx) s := status.Convert(err) if s.Code() == grpccodes.OK { ts.SetStatus(otelcodes.Ok, s.Message()) @@ -148,14 +164,18 @@ func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err er } ts.End() } - if h.options.isMetricsEnabled() { +} + +// perCallMetrics records per call metrics. +func (m *metricsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { + if m.clientMetrics.callDuration != nil { callLatency := float64(time.Since(startTime)) / float64(time.Second) - attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( + attrs := otelattribute.NewSet( otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(status.Code(err))), - )) - h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) + ) + m.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributeSet(attrs)) } } @@ -225,7 +245,7 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), )) - h.clientMetrics.attemptStarted.Add(ctx, 1, attrs) + h.metrics.clientMetrics.attemptStarted.Add(ctx, 1, attrs) case *stats.OutPayload: atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) case *stats.InPayload: @@ -283,9 +303,9 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, // Allocate vararg slice once. opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))} - h.clientMetrics.attemptDuration.Record(ctx, latency, opts...) - h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...) - h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...) + h.metrics.clientMetrics.attemptDuration.Record(ctx, latency, opts...) + h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...) + h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...) } const ( From 4b3bd269d80776e4b53566b34ea4299c9c1fe6da Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Thu, 30 Jan 2025 16:46:01 +0000 Subject: [PATCH 03/17] adding opentelemetry tracing for client-server --- .../features/opentelemetry/client/main.go | 63 +++++++++++++++++-- .../features/opentelemetry/server/main.go | 52 ++++++++++++++- 2 files changed, 109 insertions(+), 6 deletions(-) diff --git a/examples/features/opentelemetry/client/main.go b/examples/features/opentelemetry/client/main.go index 6b0ee92eb0da..46aca9897abd 100644 --- a/examples/features/opentelemetry/client/main.go +++ b/examples/features/opentelemetry/client/main.go @@ -33,8 +33,15 @@ import ( "google.golang.org/grpc/stats/opentelemetry" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" ) var ( @@ -42,28 +49,76 @@ var ( prometheusEndpoint = flag.String("prometheus_endpoint", ":9465", "the Prometheus exporter endpoint") ) +// initTracer initializes OpenTelemetry tracing +func initTracer() (*trace.TracerProvider, error) { + exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint()) + if err != nil { + return nil, fmt.Errorf("failed to create stdouttrace exporter: %w", err) + } + + res, err := resource.New(context.Background(), + resource.WithAttributes(attribute.String("service.name", "grpc-client")), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + tp := trace.NewTracerProvider( + trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(exporter)), + trace.WithResource(res), + ) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + return tp, nil +} + func main() { + flag.Parse() + exporter, err := prometheus.New() if err != nil { log.Fatalf("Failed to start prometheus exporter: %v", err) } provider := metric.NewMeterProvider(metric.WithReader(exporter)) - go http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()) + go func() { + log.Printf("Starting Prometheus metrics server at %s\n", *prometheusEndpoint) + if err := http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()); err != nil { + log.Fatalf("Failed to start Prometheus server: %v", err) + } + }() ctx := context.Background() + // Initialize tracing + tracerProvider, err := initTracer() + if err != nil { + log.Fatalf("Error setting up tracing: %v", err) + } + defer func() { _ = tracerProvider.Shutdown(context.Background()) }() + do := opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}}) - cc, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), do) + cc, err := grpc.NewClient(*addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + do, + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + ) if err != nil { - log.Fatalf("Failed to start NewClient: %v", err) + log.Fatalf("Failed to create a client: %v", err) } defer cc.Close() c := echo.NewEchoClient(cc) + tracer := otel.Tracer("grpc-client") // Make an RPC every second. This should trigger telemetry to be emitted from // the client and the server. for { - r, err := c.UnaryEcho(ctx, &echo.EchoRequest{Message: "this is examples/opentelemetry"}) + reqCtx, cancel := context.WithTimeout(ctx, time.Second) + _, span := tracer.Start(reqCtx, "UnaryEchoClientCall") + + r, err := c.UnaryEcho(reqCtx, &echo.EchoRequest{Message: "this is examples/opentelemetry"}) + span.End() + cancel() + if err != nil { log.Fatalf("UnaryEcho failed: %v", err) } diff --git a/examples/features/opentelemetry/server/main.go b/examples/features/opentelemetry/server/main.go index 2bda00809e47..2e25ae5b58c3 100644 --- a/examples/features/opentelemetry/server/main.go +++ b/examples/features/opentelemetry/server/main.go @@ -32,8 +32,14 @@ import ( "google.golang.org/grpc/stats/opentelemetry" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" ) var ( @@ -46,17 +52,59 @@ type echoServer struct { addr string } -func (s *echoServer) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { +func (s *echoServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { + tracer := otel.Tracer("grpc-server") + _, span := tracer.Start(ctx, "UnaryEcho") + span.SetAttributes(attribute.String("request.message", req.GetMessage())) + defer span.End() + + log.Printf("Received request: %v", req.GetMessage()) return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil } +func initTracer() (*trace.TracerProvider, error) { + exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint()) + if err != nil { + return nil, fmt.Errorf("failed to create stdouttrace exporter: %w", err) + } + + res, err := resource.New(context.Background(), + resource.WithAttributes(attribute.String("service.name", "grpc-server")), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + tp := trace.NewTracerProvider( + trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(exporter)), + trace.WithResource(res), + ) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + return tp, nil +} + func main() { + flag.Parse() + exporter, err := prometheus.New() if err != nil { log.Fatalf("Failed to start prometheus exporter: %v", err) } provider := metric.NewMeterProvider(metric.WithReader(exporter)) - go http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()) + go func() { + log.Printf("Starting Prometheus metrics server at %s\n", *prometheusEndpoint) + if err := http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()); err != nil { + log.Fatalf("Failed to start Prometheus server: %v", err) + } + }() + + // Initialize tracing + tracerProvider, err := initTracer() + if err != nil { + log.Fatalf("Error setting up tracing: %v", err) + } + defer func() { _ = tracerProvider.Shutdown(context.Background()) }() so := opentelemetry.ServerOption(opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}}) From 69df069d764c961a1095371753e555a080365f46 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 3 Feb 2025 11:25:00 +0000 Subject: [PATCH 04/17] fixing vet issues --- examples/go.mod | 7 ++++--- examples/go.sum | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/go.mod b/examples/go.mod index 15b8cec6b2a2..6e16ec011de6 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -5,7 +5,11 @@ go 1.22.0 require ( github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 github.com/prometheus/client_golang v1.20.5 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 + go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/exporters/prometheus v0.56.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 + go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 golang.org/x/oauth2 v0.25.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f @@ -63,11 +67,8 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.34.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 53ba900e0ef9..7890779b85d4 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -1138,6 +1138,8 @@ go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/exporters/prometheus v0.56.0 h1:GnCIi0QyG0yy2MrJLzVrIM7laaJstj//flf1zEJCG+E= go.opentelemetry.io/otel/exporters/prometheus v0.56.0/go.mod h1:JQcVZtbIIPM+7SWBB+T6FK+xunlyidwLp++fN0sUaOk= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU= go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= From 770f4304e667359be4f44d1b5c43cbbb0cfa2976 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 3 Feb 2025 12:23:32 +0000 Subject: [PATCH 05/17] reverting newclient changes and fixing vet issues --- stats/opentelemetry/client_metrics.go | 2 +- test/goaway_test.go | 5 ++--- test/healthcheck_test.go | 5 ++--- test/resolver_update_test.go | 12 ++++++------ test/xds/xds_client_certificate_providers_test.go | 6 +++--- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 075574634aed..83782dfd760e 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -153,7 +153,7 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S } // perCallTraces records per call trace spans. -func (h *clientStatsHandler) perCallTraces(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) { +func (h *clientStatsHandler) perCallTraces(ctx context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { if h.options.isTracingEnabled() { log.Printf("Tracing call with context: %v", ctx) s := status.Convert(err) diff --git a/test/goaway_test.go b/test/goaway_test.go index a9285bb514e4..84efde7de278 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -674,12 +674,11 @@ func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) { ctCh.Send(ct) }() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("NewClient() failed: %v", err) + t.Fatalf("error dialing: %v", err) } defer cc.Close() - cc.Connect() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 42fdb4518e42..fac565240ab7 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -224,11 +224,10 @@ func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resol opts = append(opts, c.extraDialOption...) } - cc, err := grpc.NewClient(r.Scheme()+":///test.server", opts...) + cc, err := grpc.Dial(r.Scheme()+":///test.server", opts...) if err != nil { - t.Fatalf("grpc.NewClient() failed: %v", err) + t.Fatalf("grpc.Dial() failed: %v", err) } - cc.Connect() t.Cleanup(func() { cc.Close() }) return cc, r } diff --git a/test/resolver_update_test.go b/test/resolver_update_test.go index d6a7eefb2413..619979b9b045 100644 --- a/test/resolver_update_test.go +++ b/test/resolver_update_test.go @@ -112,12 +112,12 @@ func (s) TestResolverUpdateDuringBuild_ServiceConfigInvalidTypeError(t *testing. func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := grpc.NewClient(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) if err != nil { - t.Fatalf("NewClient(_, _) = _, %v; want _, nil", err) + t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) } defer cc.Close() - cc.Connect() + scpr := r.CC.ParseServiceConfig("bad json service config") r.UpdateState(resolver.State{ServiceConfig: scpr}) @@ -195,12 +195,12 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := grpc.NewClient(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) if err != nil { - t.Fatalf("NewClient(_, _) = _, %v; want _, nil", err) + t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) } defer cc.Close() - cc.Connect() + // Push a resolver update and verify that our balancer receives the update. addrs := []resolver.Address{{Addr: backend.Address}} const lbCfg = "wrapping balancer LB policy config" diff --git a/test/xds/xds_client_certificate_providers_test.go b/test/xds/xds_client_certificate_providers_test.go index 1236718955ce..47eda4059757 100644 --- a/test/xds/xds_client_certificate_providers_test.go +++ b/test/xds/xds_client_certificate_providers_test.go @@ -167,12 +167,12 @@ func (s) TestClientSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test } // Create a ClientConn and ensure that it moves to TRANSIENT_FAILURE. - cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolverBuilder)) + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolverBuilder)) if err != nil { - t.Fatalf("NewClient() failed: %v", err) + t.Fatalf("failed to dial local test server: %v", err) } defer cc.Close() - cc.Connect() + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) // Make an RPC and ensure that expected error is returned. From b97a3ca060130e9080bdf055b689a199fc1e3eca Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 3 Feb 2025 12:27:46 +0000 Subject: [PATCH 06/17] reverting otel trace for client/server changes --- .../features/opentelemetry/client/main.go | 65 ++----------------- .../features/opentelemetry/server/main.go | 54 +-------------- examples/go.mod | 7 +- examples/go.sum | 2 - .../xds_client_certificate_providers_test.go | 1 - 5 files changed, 11 insertions(+), 118 deletions(-) diff --git a/examples/features/opentelemetry/client/main.go b/examples/features/opentelemetry/client/main.go index 46aca9897abd..9f98bbc31beb 100644 --- a/examples/features/opentelemetry/client/main.go +++ b/examples/features/opentelemetry/client/main.go @@ -33,15 +33,8 @@ import ( "google.golang.org/grpc/stats/opentelemetry" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" - "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" ) var ( @@ -49,80 +42,32 @@ var ( prometheusEndpoint = flag.String("prometheus_endpoint", ":9465", "the Prometheus exporter endpoint") ) -// initTracer initializes OpenTelemetry tracing -func initTracer() (*trace.TracerProvider, error) { - exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint()) - if err != nil { - return nil, fmt.Errorf("failed to create stdouttrace exporter: %w", err) - } - - res, err := resource.New(context.Background(), - resource.WithAttributes(attribute.String("service.name", "grpc-client")), - ) - if err != nil { - return nil, fmt.Errorf("failed to create resource: %w", err) - } - - tp := trace.NewTracerProvider( - trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(exporter)), - trace.WithResource(res), - ) - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.TraceContext{}) - return tp, nil -} - func main() { - flag.Parse() - exporter, err := prometheus.New() if err != nil { log.Fatalf("Failed to start prometheus exporter: %v", err) } provider := metric.NewMeterProvider(metric.WithReader(exporter)) - go func() { - log.Printf("Starting Prometheus metrics server at %s\n", *prometheusEndpoint) - if err := http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()); err != nil { - log.Fatalf("Failed to start Prometheus server: %v", err) - } - }() + go http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()) ctx := context.Background() - // Initialize tracing - tracerProvider, err := initTracer() - if err != nil { - log.Fatalf("Error setting up tracing: %v", err) - } - defer func() { _ = tracerProvider.Shutdown(context.Background()) }() - do := opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}}) - cc, err := grpc.NewClient(*addr, - grpc.WithTransportCredentials(insecure.NewCredentials()), - do, - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), - ) + cc, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), do) if err != nil { - log.Fatalf("Failed to create a client: %v", err) + log.Fatalf("Failed to start NewClient: %v", err) } defer cc.Close() c := echo.NewEchoClient(cc) - tracer := otel.Tracer("grpc-client") // Make an RPC every second. This should trigger telemetry to be emitted from // the client and the server. for { - reqCtx, cancel := context.WithTimeout(ctx, time.Second) - _, span := tracer.Start(reqCtx, "UnaryEchoClientCall") - - r, err := c.UnaryEcho(reqCtx, &echo.EchoRequest{Message: "this is examples/opentelemetry"}) - span.End() - cancel() - + r, err := c.UnaryEcho(ctx, &echo.EchoRequest{Message: "this is examples/opentelemetry"}) if err != nil { log.Fatalf("UnaryEcho failed: %v", err) } fmt.Println(r) time.Sleep(time.Second) } -} +} \ No newline at end of file diff --git a/examples/features/opentelemetry/server/main.go b/examples/features/opentelemetry/server/main.go index 2e25ae5b58c3..cee36ba98e01 100644 --- a/examples/features/opentelemetry/server/main.go +++ b/examples/features/opentelemetry/server/main.go @@ -32,14 +32,8 @@ import ( "google.golang.org/grpc/stats/opentelemetry" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" - "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" ) var ( @@ -52,59 +46,17 @@ type echoServer struct { addr string } -func (s *echoServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { - tracer := otel.Tracer("grpc-server") - _, span := tracer.Start(ctx, "UnaryEcho") - span.SetAttributes(attribute.String("request.message", req.GetMessage())) - defer span.End() - - log.Printf("Received request: %v", req.GetMessage()) +func (s *echoServer) UnaryEcho(_ context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil } -func initTracer() (*trace.TracerProvider, error) { - exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint()) - if err != nil { - return nil, fmt.Errorf("failed to create stdouttrace exporter: %w", err) - } - - res, err := resource.New(context.Background(), - resource.WithAttributes(attribute.String("service.name", "grpc-server")), - ) - if err != nil { - return nil, fmt.Errorf("failed to create resource: %w", err) - } - - tp := trace.NewTracerProvider( - trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(exporter)), - trace.WithResource(res), - ) - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.TraceContext{}) - return tp, nil -} - func main() { - flag.Parse() - exporter, err := prometheus.New() if err != nil { log.Fatalf("Failed to start prometheus exporter: %v", err) } provider := metric.NewMeterProvider(metric.WithReader(exporter)) - go func() { - log.Printf("Starting Prometheus metrics server at %s\n", *prometheusEndpoint) - if err := http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()); err != nil { - log.Fatalf("Failed to start Prometheus server: %v", err) - } - }() - - // Initialize tracing - tracerProvider, err := initTracer() - if err != nil { - log.Fatalf("Error setting up tracing: %v", err) - } - defer func() { _ = tracerProvider.Shutdown(context.Background()) }() + go http.ListenAndServe(*prometheusEndpoint, promhttp.Handler()) so := opentelemetry.ServerOption(opentelemetry.Options{MetricsOptions: opentelemetry.MetricsOptions{MeterProvider: provider}}) @@ -120,4 +72,4 @@ func main() { if err := s.Serve(lis); err != nil { log.Fatalf("Failed to serve: %v", err) } -} +} \ No newline at end of file diff --git a/examples/go.mod b/examples/go.mod index 6e16ec011de6..15b8cec6b2a2 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -5,11 +5,7 @@ go 1.22.0 require ( github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 github.com/prometheus/client_golang v1.20.5 - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 - go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/exporters/prometheus v0.56.0 - go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 - go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 golang.org/x/oauth2 v0.25.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f @@ -67,8 +63,11 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.34.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect + go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 7890779b85d4..53ba900e0ef9 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -1138,8 +1138,6 @@ go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/exporters/prometheus v0.56.0 h1:GnCIi0QyG0yy2MrJLzVrIM7laaJstj//flf1zEJCG+E= go.opentelemetry.io/otel/exporters/prometheus v0.56.0/go.mod h1:JQcVZtbIIPM+7SWBB+T6FK+xunlyidwLp++fN0sUaOk= -go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80= -go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU= go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= diff --git a/test/xds/xds_client_certificate_providers_test.go b/test/xds/xds_client_certificate_providers_test.go index 47eda4059757..283f81a4bb7c 100644 --- a/test/xds/xds_client_certificate_providers_test.go +++ b/test/xds/xds_client_certificate_providers_test.go @@ -172,7 +172,6 @@ func (s) TestClientSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test t.Fatalf("failed to dial local test server: %v", err) } defer cc.Close() - testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) // Make an RPC and ensure that expected error is returned. From c89f3c90fea653bf6e4886963ac6bf31e9af763f Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 3 Feb 2025 12:32:16 +0000 Subject: [PATCH 07/17] fixing vet issue --- examples/features/opentelemetry/client/main.go | 2 +- examples/features/opentelemetry/server/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/features/opentelemetry/client/main.go b/examples/features/opentelemetry/client/main.go index 9f98bbc31beb..6b0ee92eb0da 100644 --- a/examples/features/opentelemetry/client/main.go +++ b/examples/features/opentelemetry/client/main.go @@ -70,4 +70,4 @@ func main() { fmt.Println(r) time.Sleep(time.Second) } -} \ No newline at end of file +} diff --git a/examples/features/opentelemetry/server/main.go b/examples/features/opentelemetry/server/main.go index cee36ba98e01..2bda00809e47 100644 --- a/examples/features/opentelemetry/server/main.go +++ b/examples/features/opentelemetry/server/main.go @@ -72,4 +72,4 @@ func main() { if err := s.Serve(lis); err != nil { log.Fatalf("Failed to serve: %v", err) } -} \ No newline at end of file +} From 3f07e48747fd9df32c98ffcc6e52c001ef0ce889 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 5 Feb 2025 09:56:32 +0000 Subject: [PATCH 08/17] renaming receiver name --- stats/opentelemetry/client_metrics.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 83782dfd760e..d270b18a1e5e 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -167,15 +167,15 @@ func (h *clientStatsHandler) perCallTraces(ctx context.Context, err error, _ tim } // perCallMetrics records per call metrics. -func (m *metricsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { - if m.clientMetrics.callDuration != nil { +func (h *metricsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { + if h.clientMetrics.callDuration != nil { callLatency := float64(time.Since(startTime)) / float64(time.Second) attrs := otelattribute.NewSet( otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(status.Code(err))), ) - m.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributeSet(attrs)) + h.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributeSet(attrs)) } } From 5e8a4a569ae06adbe76b5aa915380ea704d03c41 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 5 Feb 2025 10:18:37 +0000 Subject: [PATCH 09/17] unused param fix --- stats/opentelemetry/client_metrics.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index d270b18a1e5e..630f8bbf445a 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -18,7 +18,6 @@ package opentelemetry import ( "context" - "log" "sync/atomic" "time" @@ -83,6 +82,9 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string } ctx = setCallInfo(ctx, ci) + metricsEnabled := h.options.isMetricsEnabled() + tracingEnabled := h.options.isTracingEnabled() + if h.options.MetricsOptions.pluginOption != nil { md := h.options.MetricsOptions.pluginOption.GetMetadata() for k, vs := range md { @@ -94,14 +96,16 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string startTime := time.Now() var span trace.Span - if h.options.isTracingEnabled() { + if tracingEnabled { ctx, span = h.createCallTraceSpan(ctx, method) } + err := invoker(ctx, method, req, reply, cc, opts...) - if h.options.isMetricsEnabled() { + + if metricsEnabled { h.metrics.perCallMetrics(ctx, err, startTime, ci) } - if h.options.isTracingEnabled() { + if tracingEnabled { h.perCallTraces(ctx, err, startTime, ci, span) } return err @@ -153,9 +157,8 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S } // perCallTraces records per call trace spans. -func (h *clientStatsHandler) perCallTraces(ctx context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { +func (h *clientStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, ci *callInfo, ts trace.Span) { if h.options.isTracingEnabled() { - log.Printf("Tracing call with context: %v", ctx) s := status.Convert(err) if s.Code() == grpccodes.OK { ts.SetStatus(otelcodes.Ok, s.Message()) From 76e422af7d71989a01fdf1a693f0087a3cf53f40 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 5 Feb 2025 10:29:31 +0000 Subject: [PATCH 10/17] fixing vet issue --- stats/opentelemetry/client_metrics.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 630f8bbf445a..a90c5dfa3b12 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -99,7 +99,6 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string if tracingEnabled { ctx, span = h.createCallTraceSpan(ctx, method) } - err := invoker(ctx, method, req, reply, cc, opts...) if metricsEnabled { @@ -129,7 +128,8 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) - + metricsEnabled := h.options.isMetricsEnabled() + tracingEnabled := h.options.isTracingEnabled() if h.options.MetricsOptions.pluginOption != nil { md := h.options.MetricsOptions.pluginOption.GetMetadata() for k, vs := range md { @@ -141,14 +141,14 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S startTime := time.Now() var span trace.Span - if h.options.isTracingEnabled() { + if tracingEnabled { ctx, span = h.createCallTraceSpan(ctx, method) } callback := func(err error) { - if h.options.isMetricsEnabled() { + if metricsEnabled { h.metrics.perCallMetrics(ctx, err, startTime, ci) } - if h.options.isTracingEnabled() { + if tracingEnabled { h.perCallTraces(ctx, err, startTime, ci, span) } } @@ -157,7 +157,7 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S } // perCallTraces records per call trace spans. -func (h *clientStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, ci *callInfo, ts trace.Span) { +func (h *clientStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { if h.options.isTracingEnabled() { s := status.Convert(err) if s.Code() == grpccodes.OK { From 8fa0b03a372247b88e0acbc41a65ff380399a6f0 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 12 Feb 2025 08:08:42 +0000 Subject: [PATCH 11/17] refactor client interceptor separately for traces and metrics --- stats/opentelemetry/client_metrics.go | 150 +++++++++++++++----------- stats/opentelemetry/opentelemetry.go | 13 ++- 2 files changed, 99 insertions(+), 64 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index a90c5dfa3b12..5de2053e9361 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -21,7 +21,9 @@ import ( "sync/atomic" "time" + otelattribute "go.opentelemetry.io/otel/attribute" otelcodes "go.opentelemetry.io/otel/codes" + otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" grpccodes "google.golang.org/grpc/codes" @@ -30,19 +32,19 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" - - otelattribute "go.opentelemetry.io/otel/attribute" - otelmetric "go.opentelemetry.io/otel/metric" ) type clientStatsHandler struct { estats.MetricsRecorder - options Options - metrics metricsHandler + options Options + clientMetrics clientMetrics } -type metricsHandler struct { - clientMetrics clientMetrics +type clientMetricsStatsHandler struct { + *clientStatsHandler +} +type clientTracingStatsHandler struct { + *clientStatsHandler } func (h *clientStatsHandler) initializeMetrics() { @@ -62,11 +64,11 @@ func (h *clientStatsHandler) initializeMetrics() { metrics = DefaultMetrics() } - h.metrics.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) - h.metrics.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) - h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.metrics.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + h.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) + h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) rm := ®istryMetrics{ optionalLabels: h.options.MetricsOptions.OptionalLabels, @@ -75,16 +77,13 @@ func (h *clientStatsHandler) initializeMetrics() { rm.registerMetrics(metrics, meter) } -func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { +func (h *clientMetricsStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ci := &callInfo{ target: cc.CanonicalTarget(), method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) - metricsEnabled := h.options.isMetricsEnabled() - tracingEnabled := h.options.isTracingEnabled() - if h.options.MetricsOptions.pluginOption != nil { md := h.options.MetricsOptions.pluginOption.GetMetadata() for k, vs := range md { @@ -95,18 +94,8 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string } startTime := time.Now() - var span trace.Span - if tracingEnabled { - ctx, span = h.createCallTraceSpan(ctx, method) - } err := invoker(ctx, method, req, reply, cc, opts...) - - if metricsEnabled { - h.metrics.perCallMetrics(ctx, err, startTime, ci) - } - if tracingEnabled { - h.perCallTraces(ctx, err, startTime, ci, span) - } + h.perCallMetrics(ctx, err, startTime, ci) return err } @@ -122,14 +111,13 @@ func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOpt return "other" } -func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { +func (h *clientMetricsStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { ci := &callInfo{ target: cc.CanonicalTarget(), method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) - metricsEnabled := h.options.isMetricsEnabled() - tracingEnabled := h.options.isTracingEnabled() + if h.options.MetricsOptions.pluginOption != nil { md := h.options.MetricsOptions.pluginOption.GetMetadata() for k, vs := range md { @@ -140,46 +128,82 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S } startTime := time.Now() - var span trace.Span - if tracingEnabled { - ctx, span = h.createCallTraceSpan(ctx, method) - } callback := func(err error) { - if metricsEnabled { - h.metrics.perCallMetrics(ctx, err, startTime, ci) - } - if tracingEnabled { - h.perCallTraces(ctx, err, startTime, ci, span) - } + h.perCallMetrics(ctx, err, startTime, ci) } opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) return streamer(ctx, desc, cc, method, opts...) } -// perCallTraces records per call trace spans. -func (h *clientStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { - if h.options.isTracingEnabled() { - s := status.Convert(err) - if s.Code() == grpccodes.OK { - ts.SetStatus(otelcodes.Ok, s.Message()) - } else { - ts.SetStatus(otelcodes.Error, s.Message()) +// perCallMetrics records per call metrics. +func (h *clientMetricsStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { + callLatency := float64(time.Since(startTime)) / float64(time.Second) + attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( + otelattribute.String("grpc.method", ci.method), + otelattribute.String("grpc.target", ci.target), + otelattribute.String("grpc.status", canonicalString(status.Code(err))), + )) + h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) +} + +func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ci := &callInfo{ + target: cc.CanonicalTarget(), + method: h.determineMethod(method, opts...), + } + ctx = setCallInfo(ctx, ci) + if h.options.MetricsOptions.pluginOption != nil { + md := h.options.MetricsOptions.pluginOption.GetMetadata() + for k, vs := range md { + for _, v := range vs { + ctx = metadata.AppendToOutgoingContext(ctx, k, v) + } } - ts.End() } + + startTime := time.Now() + var span trace.Span + ctx, span = h.createCallTraceSpan(ctx, method) + err := invoker(ctx, method, req, reply, cc, opts...) + h.perCallTraces(ctx, err, startTime, ci, span) + return err } -// perCallMetrics records per call metrics. -func (h *metricsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { - if h.clientMetrics.callDuration != nil { - callLatency := float64(time.Since(startTime)) / float64(time.Second) - attrs := otelattribute.NewSet( - otelattribute.String("grpc.method", ci.method), - otelattribute.String("grpc.target", ci.target), - otelattribute.String("grpc.status", canonicalString(status.Code(err))), - ) - h.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributeSet(attrs)) +func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ci := &callInfo{ + target: cc.CanonicalTarget(), + method: h.determineMethod(method, opts...), + } + ctx = setCallInfo(ctx, ci) + + if h.options.MetricsOptions.pluginOption != nil { // Use MetricsOptions Plugin option here, as plugin option is related to metadata and labels. + md := h.options.MetricsOptions.pluginOption.GetMetadata() + for k, vs := range md { + for _, v := range vs { + ctx = metadata.AppendToOutgoingContext(ctx, k, v) // Metadata still needs to be appended to context for tracing to propagate it. + } + } + } + + startTime := time.Now() + var span trace.Span + ctx, span = h.createCallTraceSpan(ctx, method) + callback := func(err error) { + h.perCallTraces(ctx, err, startTime, ci, span) + } + opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) + return streamer(ctx, desc, cc, method, opts...) +} + +// perCallTraces records per call trace spans and metrics. +func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { + s := status.Convert(err) + if s.Code() == grpccodes.OK { + ts.SetStatus(otelcodes.Ok, s.Message()) + } else { + ts.SetStatus(otelcodes.Error, s.Message()) } + ts.End() } // TagConn exists to satisfy stats.Handler. @@ -248,7 +272,7 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), )) - h.metrics.clientMetrics.attemptStarted.Add(ctx, 1, attrs) + h.clientMetrics.attemptStarted.Add(ctx, 1, attrs) case *stats.OutPayload: atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) case *stats.InPayload: @@ -306,9 +330,9 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, // Allocate vararg slice once. opts := []otelmetric.RecordOption{otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...))} - h.metrics.clientMetrics.attemptDuration.Record(ctx, latency, opts...) - h.metrics.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...) - h.metrics.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...) + h.clientMetrics.attemptDuration.Record(ctx, latency, opts...) + h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), opts...) + h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), opts...) } const ( diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index d99169e2da67..91c9771a0aee 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -120,7 +120,18 @@ type MetricsOptions struct { func DialOption(o Options) grpc.DialOption { csh := &clientStatsHandler{options: o} csh.initializeMetrics() - return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh)) + var interceptors []grpc.DialOption + if o.isMetricsEnabled() { + metricsHandler := &clientMetricsStatsHandler{clientStatsHandler: csh} + interceptors = append(interceptors, grpc.WithChainUnaryInterceptor(metricsHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(metricsHandler.streamInterceptor)) + } + if o.isTracingEnabled() { + tracingHandler := &clientTracingStatsHandler{clientStatsHandler: csh} + interceptors = append(interceptors, grpc.WithChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(tracingHandler.streamInterceptor)) + } + interceptors = append(interceptors, grpc.WithStatsHandler(csh)) + + return joinDialOptions(interceptors...) } var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) From 1f41a490e412724b88cc3aea4923f93ab0834343 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 12 Feb 2025 09:29:28 +0000 Subject: [PATCH 12/17] moving tracing code to client_tracing.go file --- stats/opentelemetry/client_metrics.go | 66 ------------------------- stats/opentelemetry/client_tracing.go | 70 +++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 66 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 5de2053e9361..0234ecdc5b45 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -22,11 +22,8 @@ import ( "time" otelattribute "go.opentelemetry.io/otel/attribute" - otelcodes "go.opentelemetry.io/otel/codes" otelmetric "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" - grpccodes "google.golang.org/grpc/codes" estats "google.golang.org/grpc/experimental/stats" istats "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/metadata" @@ -43,9 +40,6 @@ type clientStatsHandler struct { type clientMetricsStatsHandler struct { *clientStatsHandler } -type clientTracingStatsHandler struct { - *clientStatsHandler -} func (h *clientStatsHandler) initializeMetrics() { // Will set no metrics to record, logically making this stats handler a @@ -146,66 +140,6 @@ func (h *clientMetricsStatsHandler) perCallMetrics(ctx context.Context, err erro h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) } -func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ci := &callInfo{ - target: cc.CanonicalTarget(), - method: h.determineMethod(method, opts...), - } - ctx = setCallInfo(ctx, ci) - if h.options.MetricsOptions.pluginOption != nil { - md := h.options.MetricsOptions.pluginOption.GetMetadata() - for k, vs := range md { - for _, v := range vs { - ctx = metadata.AppendToOutgoingContext(ctx, k, v) - } - } - } - - startTime := time.Now() - var span trace.Span - ctx, span = h.createCallTraceSpan(ctx, method) - err := invoker(ctx, method, req, reply, cc, opts...) - h.perCallTraces(ctx, err, startTime, ci, span) - return err -} - -func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - ci := &callInfo{ - target: cc.CanonicalTarget(), - method: h.determineMethod(method, opts...), - } - ctx = setCallInfo(ctx, ci) - - if h.options.MetricsOptions.pluginOption != nil { // Use MetricsOptions Plugin option here, as plugin option is related to metadata and labels. - md := h.options.MetricsOptions.pluginOption.GetMetadata() - for k, vs := range md { - for _, v := range vs { - ctx = metadata.AppendToOutgoingContext(ctx, k, v) // Metadata still needs to be appended to context for tracing to propagate it. - } - } - } - - startTime := time.Now() - var span trace.Span - ctx, span = h.createCallTraceSpan(ctx, method) - callback := func(err error) { - h.perCallTraces(ctx, err, startTime, ci, span) - } - opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) - return streamer(ctx, desc, cc, method, opts...) -} - -// perCallTraces records per call trace spans and metrics. -func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { - s := status.Convert(err) - if s.Code() == grpccodes.OK { - ts.SetStatus(otelcodes.Ok, s.Message()) - } else { - ts.SetStatus(otelcodes.Error, s.Message()) - } - ts.End() -} - // TagConn exists to satisfy stats.Handler. func (h *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go index 075f401588a1..db681c0782f1 100644 --- a/stats/opentelemetry/client_tracing.go +++ b/stats/opentelemetry/client_tracing.go @@ -19,12 +19,22 @@ package opentelemetry import ( "context" "strings" + "time" "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + grpccodes "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" + "google.golang.org/grpc/status" ) +type clientTracingStatsHandler struct { + *clientStatsHandler +} + // traceTagRPC populates provided context with a new span using the // TextMapPropagator supplied in trace options and internal itracing.carrier. // It creates a new outgoing carrier which serializes information about this @@ -52,3 +62,63 @@ func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method str ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient)) return ctx, span } + +func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ci := &callInfo{ + target: cc.CanonicalTarget(), + method: h.determineMethod(method, opts...), + } + ctx = setCallInfo(ctx, ci) + if h.options.MetricsOptions.pluginOption != nil { + md := h.options.MetricsOptions.pluginOption.GetMetadata() + for k, vs := range md { + for _, v := range vs { + ctx = metadata.AppendToOutgoingContext(ctx, k, v) + } + } + } + + startTime := time.Now() + var span trace.Span + ctx, span = h.createCallTraceSpan(ctx, method) + err := invoker(ctx, method, req, reply, cc, opts...) + h.perCallTraces(ctx, err, startTime, ci, span) + return err +} + +func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ci := &callInfo{ + target: cc.CanonicalTarget(), + method: h.determineMethod(method, opts...), + } + ctx = setCallInfo(ctx, ci) + + if h.options.MetricsOptions.pluginOption != nil { + md := h.options.MetricsOptions.pluginOption.GetMetadata() + for k, vs := range md { + for _, v := range vs { + ctx = metadata.AppendToOutgoingContext(ctx, k, v) + } + } + } + + startTime := time.Now() + var span trace.Span + ctx, span = h.createCallTraceSpan(ctx, method) + callback := func(err error) { + h.perCallTraces(ctx, err, startTime, ci, span) + } + opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) + return streamer(ctx, desc, cc, method, opts...) +} + +// perCallTraces records per call trace spans and metrics. +func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { + s := status.Convert(err) + if s.Code() == grpccodes.OK { + ts.SetStatus(otelcodes.Ok, s.Message()) + } else { + ts.SetStatus(otelcodes.Error, s.Message()) + } + ts.End() +} From d74c61de32ee18a67eb168295a9738fb15655827 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 12 Feb 2025 09:39:49 +0000 Subject: [PATCH 13/17] revert previous commit --- stats/opentelemetry/client_metrics.go | 66 +++++++++++++++++++++++++ stats/opentelemetry/client_tracing.go | 70 --------------------------- 2 files changed, 66 insertions(+), 70 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 0234ecdc5b45..52647ca87e21 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -22,8 +22,11 @@ import ( "time" otelattribute "go.opentelemetry.io/otel/attribute" + otelcodes "go.opentelemetry.io/otel/codes" otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + grpccodes "google.golang.org/grpc/codes" estats "google.golang.org/grpc/experimental/stats" istats "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/metadata" @@ -40,6 +43,9 @@ type clientStatsHandler struct { type clientMetricsStatsHandler struct { *clientStatsHandler } +type clientTracingStatsHandler struct { + *clientStatsHandler +} func (h *clientStatsHandler) initializeMetrics() { // Will set no metrics to record, logically making this stats handler a @@ -140,6 +146,66 @@ func (h *clientMetricsStatsHandler) perCallMetrics(ctx context.Context, err erro h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) } +func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ci := &callInfo{ + target: cc.CanonicalTarget(), + method: h.determineMethod(method, opts...), + } + ctx = setCallInfo(ctx, ci) + if h.options.MetricsOptions.pluginOption != nil { + md := h.options.MetricsOptions.pluginOption.GetMetadata() + for k, vs := range md { + for _, v := range vs { + ctx = metadata.AppendToOutgoingContext(ctx, k, v) + } + } + } + + startTime := time.Now() + var span trace.Span + ctx, span = h.createCallTraceSpan(ctx, method) + err := invoker(ctx, method, req, reply, cc, opts...) + h.perCallTraces(ctx, err, startTime, ci, span) + return err +} + +func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ci := &callInfo{ + target: cc.CanonicalTarget(), + method: h.determineMethod(method, opts...), + } + ctx = setCallInfo(ctx, ci) + + if h.options.MetricsOptions.pluginOption != nil { + md := h.options.MetricsOptions.pluginOption.GetMetadata() + for k, vs := range md { + for _, v := range vs { + ctx = metadata.AppendToOutgoingContext(ctx, k, v) + } + } + } + + startTime := time.Now() + var span trace.Span + ctx, span = h.createCallTraceSpan(ctx, method) + callback := func(err error) { + h.perCallTraces(ctx, err, startTime, ci, span) + } + opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) + return streamer(ctx, desc, cc, method, opts...) +} + +// perCallTraces records per call trace spans and metrics. +func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { + s := status.Convert(err) + if s.Code() == grpccodes.OK { + ts.SetStatus(otelcodes.Ok, s.Message()) + } else { + ts.SetStatus(otelcodes.Error, s.Message()) + } + ts.End() +} + // TagConn exists to satisfy stats.Handler. func (h *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go index db681c0782f1..075f401588a1 100644 --- a/stats/opentelemetry/client_tracing.go +++ b/stats/opentelemetry/client_tracing.go @@ -19,22 +19,12 @@ package opentelemetry import ( "context" "strings" - "time" "go.opentelemetry.io/otel" - otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc" - grpccodes "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" - "google.golang.org/grpc/status" ) -type clientTracingStatsHandler struct { - *clientStatsHandler -} - // traceTagRPC populates provided context with a new span using the // TextMapPropagator supplied in trace options and internal itracing.carrier. // It creates a new outgoing carrier which serializes information about this @@ -62,63 +52,3 @@ func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method str ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient)) return ctx, span } - -func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ci := &callInfo{ - target: cc.CanonicalTarget(), - method: h.determineMethod(method, opts...), - } - ctx = setCallInfo(ctx, ci) - if h.options.MetricsOptions.pluginOption != nil { - md := h.options.MetricsOptions.pluginOption.GetMetadata() - for k, vs := range md { - for _, v := range vs { - ctx = metadata.AppendToOutgoingContext(ctx, k, v) - } - } - } - - startTime := time.Now() - var span trace.Span - ctx, span = h.createCallTraceSpan(ctx, method) - err := invoker(ctx, method, req, reply, cc, opts...) - h.perCallTraces(ctx, err, startTime, ci, span) - return err -} - -func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - ci := &callInfo{ - target: cc.CanonicalTarget(), - method: h.determineMethod(method, opts...), - } - ctx = setCallInfo(ctx, ci) - - if h.options.MetricsOptions.pluginOption != nil { - md := h.options.MetricsOptions.pluginOption.GetMetadata() - for k, vs := range md { - for _, v := range vs { - ctx = metadata.AppendToOutgoingContext(ctx, k, v) - } - } - } - - startTime := time.Now() - var span trace.Span - ctx, span = h.createCallTraceSpan(ctx, method) - callback := func(err error) { - h.perCallTraces(ctx, err, startTime, ci, span) - } - opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) - return streamer(ctx, desc, cc, method, opts...) -} - -// perCallTraces records per call trace spans and metrics. -func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { - s := status.Convert(err) - if s.Code() == grpccodes.OK { - ts.SetStatus(otelcodes.Ok, s.Message()) - } else { - ts.SetStatus(otelcodes.Error, s.Message()) - } - ts.End() -} From 170eef610f50a6d8ff1bb9c90501559cc32b3a45 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 17 Feb 2025 13:36:57 +0000 Subject: [PATCH 14/17] adding separate interceptors for traces and metrics of server --- stats/opentelemetry/opentelemetry.go | 13 +++++-- stats/opentelemetry/server_metrics.go | 51 ++++++++++++++++++++------- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index 91c9771a0aee..2ad1451981c4 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -130,7 +130,6 @@ func DialOption(o Options) grpc.DialOption { interceptors = append(interceptors, grpc.WithChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(tracingHandler.streamInterceptor)) } interceptors = append(interceptors, grpc.WithStatsHandler(csh)) - return joinDialOptions(interceptors...) } @@ -151,7 +150,17 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g func ServerOption(o Options) grpc.ServerOption { ssh := &serverStatsHandler{options: o} ssh.initializeMetrics() - return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh)) + var interceptors []grpc.ServerOption + if o.isMetricsEnabled() { + metricsHandler := &serverMetricsStatsHandler{serverStatsHandler: ssh} + interceptors = append(interceptors, grpc.ChainUnaryInterceptor(metricsHandler.unaryInterceptor), grpc.ChainStreamInterceptor(metricsHandler.streamInterceptor)) + } + if o.isTracingEnabled() { + tracingHandler := &serverTracingStatsHandler{serverStatsHandler: ssh} + interceptors = append(interceptors, grpc.ChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.ChainStreamInterceptor(tracingHandler.streamInterceptor)) + } + interceptors = append(interceptors, grpc.StatsHandler(ssh)) + return joinServerOptions(interceptors...) } // callInfo is information pertaining to the lifespan of the RPC client side. diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index da3f60a9ebe5..da5fc0e16f87 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -38,6 +38,13 @@ type serverStatsHandler struct { serverMetrics serverMetrics } +type serverMetricsStatsHandler struct { + *serverStatsHandler +} +type serverTracingStatsHandler struct { + *serverStatsHandler +} + func (h *serverStatsHandler) initializeMetrics() { // Will set no metrics to record, logically making this stats handler a // no-op. @@ -211,24 +218,41 @@ func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) // HandleRPC implements per RPC tracing and stats implementation. func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + if h.options.isMetricsEnabled() { + metricsHandler := &serverMetricsStatsHandler{serverStatsHandler: h} + metricsHandler.HandleRPC(ctx, rs) + } + if h.options.isTracingEnabled() { + tracingHandler := &serverTracingStatsHandler{serverStatsHandler: h} + tracingHandler.HandleRPC(ctx, rs) + } +} + +// HandleRPC implements per RPC stats handling for metrics. +func (h *serverMetricsStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { ri := getRPCInfo(ctx) if ri == nil { - logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present") + logger.Error("ctx passed into server metrics stats handler metrics event handling has no server call data present") return } - if h.options.isTracingEnabled() { - populateSpan(rs, ri.ai) - } - if h.options.isMetricsEnabled() { - h.processRPCData(ctx, rs, ri.ai) + h.processRPCData(ctx, rs, ri.ai) +} + +// HandleRPC implements per RPC tracing handling for tracing. +func (h *serverTracingStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + ri := getRPCInfo(ctx) + if ri == nil { + logger.Error("ctx passed into server tracing stats handler tracing event handling has no server call data present") + return } + populateSpan(rs, ri.ai) } -func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { +func (h *serverMetricsStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { switch st := s.(type) { case *stats.InHeader: - if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil { - labels := h.options.MetricsOptions.pluginOption.GetLabels(st.Header) + if ai.pluginOptionLabels == nil && h.serverStatsHandler.options.MetricsOptions.pluginOption != nil { // Use serverStatsHandler to access options + labels := h.serverStatsHandler.options.MetricsOptions.pluginOption.GetLabels(st.Header) // Use serverStatsHandler to access options if labels == nil { labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt. } @@ -237,18 +261,21 @@ func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStat attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( otelattribute.String("grpc.method", ai.method), )) - h.serverMetrics.callStarted.Add(ctx, 1, attrs) + h.serverStatsHandler.serverMetrics.callStarted.Add(ctx, 1, attrs) // Use serverStatsHandler to access serverMetrics case *stats.OutPayload: atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) case *stats.InPayload: atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength)) case *stats.End: - h.processRPCEnd(ctx, ai, st) + h.processMetricsRPCEnd(ctx, ai, st) default: } } +func (h *serverMetricsStatsHandler) processMetricsRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) { + h.serverStatsHandler.processRPCEndInternal(ctx, ai, e) +} -func (h *serverStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) { +func (h *serverStatsHandler) processRPCEndInternal(ctx context.Context, ai *attemptInfo, e *stats.End) { latency := float64(time.Since(ai.startTime)) / float64(time.Second) st := "OK" if e.Error != nil { From 7f5f5394b965aabda3a8a30e8c79b1180e04f620 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Mon, 17 Feb 2025 19:28:51 +0000 Subject: [PATCH 15/17] separating HandleRPC interceptors of traces and metrics --- stats/opentelemetry/client_metrics.go | 30 +++++++++++++++++++++------ stats/opentelemetry/server_metrics.go | 6 +++--- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 52647ca87e21..b0f15a1f92cb 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -195,7 +195,7 @@ func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc return streamer(ctx, desc, cc, method, opts...) } -// perCallTraces records per call trace spans and metrics. +// perCallTraces records per call trace spans. func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { s := status.Convert(err) if s.Code() == grpccodes.OK { @@ -245,18 +245,36 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) }) } +// HandleRPC implements per RPC tracing and stats implementation. func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + if h.options.isMetricsEnabled() { + metricsHandler := &clientMetricsStatsHandler{clientStatsHandler: h} + metricsHandler.HandleRPC(ctx, rs) + } + if h.options.isTracingEnabled() { + tracingHandler := &clientTracingStatsHandler{clientStatsHandler: h} + tracingHandler.HandleRPC(ctx, rs) + } +} + +// HandleRPC implements per RPC stats handling for metrics. +func (h *clientMetricsStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { ri := getRPCInfo(ctx) if ri == nil { logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present") return } - if h.options.isMetricsEnabled() { - h.processRPCEvent(ctx, rs, ri.ai) - } - if h.options.isTracingEnabled() { - populateSpan(rs, ri.ai) + h.processRPCEvent(ctx, rs, ri.ai) +} + +// HandleRPC implements per RPC tracing handling for tracing. +func (h *clientTracingStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + ri := getRPCInfo(ctx) + if ri == nil { + logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present") + return } + populateSpan(rs, ri.ai) } func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index da5fc0e16f87..ce03d8dec53e 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -251,8 +251,8 @@ func (h *serverTracingStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCS func (h *serverMetricsStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { switch st := s.(type) { case *stats.InHeader: - if ai.pluginOptionLabels == nil && h.serverStatsHandler.options.MetricsOptions.pluginOption != nil { // Use serverStatsHandler to access options - labels := h.serverStatsHandler.options.MetricsOptions.pluginOption.GetLabels(st.Header) // Use serverStatsHandler to access options + if ai.pluginOptionLabels == nil && h.serverStatsHandler.options.MetricsOptions.pluginOption != nil { + labels := h.serverStatsHandler.options.MetricsOptions.pluginOption.GetLabels(st.Header) if labels == nil { labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt. } @@ -261,7 +261,7 @@ func (h *serverMetricsStatsHandler) processRPCData(ctx context.Context, s stats. attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( otelattribute.String("grpc.method", ai.method), )) - h.serverStatsHandler.serverMetrics.callStarted.Add(ctx, 1, attrs) // Use serverStatsHandler to access serverMetrics + h.serverStatsHandler.serverMetrics.callStarted.Add(ctx, 1, attrs) case *stats.OutPayload: atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) case *stats.InPayload: From 50999a08fbb53a3511566e2f57aea5e2b0ce836d Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Tue, 18 Feb 2025 05:44:51 +0000 Subject: [PATCH 16/17] updating client and server metrics --- stats/opentelemetry/client_metrics.go | 27 +++++---------------------- stats/opentelemetry/server_metrics.go | 7 ++----- 2 files changed, 7 insertions(+), 27 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index b0f15a1f92cb..98fdf7a93d25 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -247,34 +247,17 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) // HandleRPC implements per RPC tracing and stats implementation. func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - if h.options.isMetricsEnabled() { - metricsHandler := &clientMetricsStatsHandler{clientStatsHandler: h} - metricsHandler.HandleRPC(ctx, rs) - } - if h.options.isTracingEnabled() { - tracingHandler := &clientTracingStatsHandler{clientStatsHandler: h} - tracingHandler.HandleRPC(ctx, rs) - } -} - -// HandleRPC implements per RPC stats handling for metrics. -func (h *clientMetricsStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { ri := getRPCInfo(ctx) if ri == nil { logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present") return } - h.processRPCEvent(ctx, rs, ri.ai) -} - -// HandleRPC implements per RPC tracing handling for tracing. -func (h *clientTracingStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - ri := getRPCInfo(ctx) - if ri == nil { - logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present") - return + if h.options.isMetricsEnabled() { + h.processRPCEvent(ctx, rs, ri.ai) + } + if h.options.isTracingEnabled() { + populateSpan(rs, ri.ai) } - populateSpan(rs, ri.ai) } func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index ce03d8dec53e..34f095edd522 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -267,15 +267,12 @@ func (h *serverMetricsStatsHandler) processRPCData(ctx context.Context, s stats. case *stats.InPayload: atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength)) case *stats.End: - h.processMetricsRPCEnd(ctx, ai, st) + h.processRPCEnd(ctx, ai, st) default: } } -func (h *serverMetricsStatsHandler) processMetricsRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) { - h.serverStatsHandler.processRPCEndInternal(ctx, ai, e) -} -func (h *serverStatsHandler) processRPCEndInternal(ctx context.Context, ai *attemptInfo, e *stats.End) { +func (h *serverStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) { latency := float64(time.Since(ai.startTime)) / float64(time.Second) st := "OK" if e.Error != nil { From 68b896657c39805645744518c4dc539408d69cd9 Mon Sep 17 00:00:00 2001 From: Vissa Janardhan Krishna Sai Date: Wed, 19 Feb 2025 17:42:14 +0000 Subject: [PATCH 17/17] removing metrics code from tracingstatshandler and unused parameters --- stats/opentelemetry/client_metrics.go | 26 ++++---------------------- stats/opentelemetry/server_metrics.go | 1 + 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 98fdf7a93d25..c3fe81b5e3e0 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -43,6 +43,7 @@ type clientStatsHandler struct { type clientMetricsStatsHandler struct { *clientStatsHandler } + type clientTracingStatsHandler struct { *clientStatsHandler } @@ -152,20 +153,11 @@ func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) - if h.options.MetricsOptions.pluginOption != nil { - md := h.options.MetricsOptions.pluginOption.GetMetadata() - for k, vs := range md { - for _, v := range vs { - ctx = metadata.AppendToOutgoingContext(ctx, k, v) - } - } - } - startTime := time.Now() var span trace.Span ctx, span = h.createCallTraceSpan(ctx, method) err := invoker(ctx, method, req, reply, cc, opts...) - h.perCallTraces(ctx, err, startTime, ci, span) + h.perCallTraces(err, span) return err } @@ -176,27 +168,17 @@ func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc } ctx = setCallInfo(ctx, ci) - if h.options.MetricsOptions.pluginOption != nil { - md := h.options.MetricsOptions.pluginOption.GetMetadata() - for k, vs := range md { - for _, v := range vs { - ctx = metadata.AppendToOutgoingContext(ctx, k, v) - } - } - } - - startTime := time.Now() var span trace.Span ctx, span = h.createCallTraceSpan(ctx, method) callback := func(err error) { - h.perCallTraces(ctx, err, startTime, ci, span) + h.perCallTraces(err, span) } opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) return streamer(ctx, desc, cc, method, opts...) } // perCallTraces records per call trace spans. -func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { +func (h *clientTracingStatsHandler) perCallTraces(err error, ts trace.Span) { s := status.Convert(err) if s.Code() == grpccodes.OK { ts.SetStatus(otelcodes.Ok, s.Message()) diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index 34f095edd522..cd50f5243ec3 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -41,6 +41,7 @@ type serverStatsHandler struct { type serverMetricsStatsHandler struct { *serverStatsHandler } + type serverTracingStatsHandler struct { *serverStatsHandler }