Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats/opentelemetry: separate out interceptors for tracing and metrics #8063

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7ddbe46
replace dial with newclient
janardhankrishna-sai Dec 23, 2024
0486355
Merge branch 'master' of https://github.com/janardhanvissa/grpc-go
janardhankrishna-sai Jan 1, 2025
53fa9cd
Merge branch 'grpc:master' into master
janardhanvissa Jan 8, 2025
4435b8a
Merge branch 'grpc:master' into master
janardhanvissa Jan 13, 2025
a413555
Merge branch 'grpc:master' into master
janardhanvissa Jan 20, 2025
4e203c3
Merge branch 'grpc:master' into master
janardhanvissa Jan 30, 2025
e9ad552
Merge branch 'grpc:master' into master
janardhanvissa Jan 30, 2025
71804b4
refactor tracing and metrics interceptors separately
janardhankrishna-sai Feb 3, 2025
4b3bd26
adding opentelemetry tracing for client-server
janardhankrishna-sai Jan 30, 2025
69df069
fixing vet issues
janardhankrishna-sai Feb 3, 2025
770f430
reverting newclient changes and fixing vet issues
janardhankrishna-sai Feb 3, 2025
b97a3ca
reverting otel trace for client/server changes
janardhankrishna-sai Feb 3, 2025
c89f3c9
fixing vet issue
janardhankrishna-sai Feb 3, 2025
3f07e48
renaming receiver name
janardhankrishna-sai Feb 5, 2025
5e8a4a5
unused param fix
janardhankrishna-sai Feb 5, 2025
76e422a
fixing vet issue
janardhankrishna-sai Feb 5, 2025
8fa0b03
refactor client interceptor separately for traces and metrics
janardhankrishna-sai Feb 12, 2025
1f41a49
moving tracing code to client_tracing.go file
janardhankrishna-sai Feb 12, 2025
d74c61d
revert previous commit
janardhankrishna-sai Feb 12, 2025
170eef6
adding separate interceptors for traces and metrics of server
janardhankrishna-sai Feb 17, 2025
7f5f539
separating HandleRPC interceptors of traces and metrics
janardhankrishna-sai Feb 17, 2025
50999a0
updating client and server metrics
janardhankrishna-sai Feb 18, 2025
68b8966
removing metrics code from tracingstatshandler and unused parameters
janardhankrishna-sai Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 80 additions & 32 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
"sync/atomic"
"time"

otelattribute "go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
Expand All @@ -30,9 +32,6 @@
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)

type clientStatsHandler struct {
Expand All @@ -41,6 +40,13 @@
clientMetrics clientMetrics
}

type clientMetricsStatsHandler struct {
*clientStatsHandler
}
type clientTracingStatsHandler struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new line in between struct declaration

*clientStatsHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we extending clientStatsHandler. Let's just keep all stats handler embedding estats.MetricsRecorder. Increasing levels in hierarchy doesn't help in anyway because metrics and traces stats handler doesn't share common functionality.

}

func (h *clientStatsHandler) initializeMetrics() {
// Will set no metrics to record, logically making this stats handler a
// no-op.
Expand Down Expand Up @@ -71,7 +77,7 @@
rm.registerMetrics(metrics, meter)
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
func (h *clientMetricsStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: cc.CanonicalTarget(),
method: h.determineMethod(method, opts...),
Expand All @@ -88,12 +94,8 @@
}

startTime := time.Now()
var span trace.Span
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
h.perCallMetrics(ctx, err, startTime, ci)
return err
}

Expand All @@ -109,7 +111,7 @@
return "other"
}

func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
func (h *clientMetricsStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ci := &callInfo{
target: cc.CanonicalTarget(),
method: h.determineMethod(method, opts...),
Expand All @@ -126,37 +128,82 @@
}

startTime := time.Now()
var span trace.Span
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
h.perCallMetrics(ctx, err, startTime, ci)
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

// perCallTracesAndMetrics records per call trace spans and metrics.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {
if h.options.isTracingEnabled() {
s := status.Convert(err)
if s.Code() == grpccodes.OK {
ts.SetStatus(otelcodes.Ok, s.Message())
} else {
ts.SetStatus(otelcodes.Error, s.Message())
// perCallMetrics records per call metrics.
func (h *clientMetricsStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
}

func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: cc.CanonicalTarget(),
method: h.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)
if h.options.MetricsOptions.pluginOption != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this metadata part is not applicable for tracing. It should only be in metrics interceptor. Please remove

md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}

Check warning on line 160 in stats/opentelemetry/client_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/client_metrics.go#L156-L160

Added lines #L156 - L160 were not covered by tests
}
ts.End()
}
if h.options.isMetricsEnabled() {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)

startTime := time.Now()
var span trace.Span
ctx, span = h.createCallTraceSpan(ctx, method)
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallTraces(ctx, err, startTime, ci, span)
return err
}

func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ci := &callInfo{
target: cc.CanonicalTarget(),
method: h.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)

if h.options.MetricsOptions.pluginOption != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same. this metadata part is not applicable for tracing. It should only be in metrics interceptor. Please remove

md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}

Check warning on line 184 in stats/opentelemetry/client_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/client_metrics.go#L180-L184

Added lines #L180 - L184 were not covered by tests
}
}

startTime := time.Now()
var span trace.Span
ctx, span = h.createCallTraceSpan(ctx, method)
callback := func(err error) {
h.perCallTraces(ctx, err, startTime, ci, span)
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

// perCallTraces records per call trace spans.
func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we don't need time and callInfo, we should not just have them here instead of replacing them with _

s := status.Convert(err)
if s.Code() == grpccodes.OK {
ts.SetStatus(otelcodes.Ok, s.Message())
} else {
ts.SetStatus(otelcodes.Error, s.Message())
}
ts.End()
}

// TagConn exists to satisfy stats.Handler.
Expand Down Expand Up @@ -198,6 +245,7 @@
})
}

// HandleRPC implements per RPC tracing and stats implementation.
func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
ri := getRPCInfo(ctx)
if ri == nil {
Expand Down
24 changes: 22 additions & 2 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,17 @@ type MetricsOptions struct {
func DialOption(o Options) grpc.DialOption {
csh := &clientStatsHandler{options: o}
csh.initializeMetrics()
return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
var interceptors []grpc.DialOption
if o.isMetricsEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think we need to check anything here. We should just always add all the 4 interceptors. We should have initializeMetrics and initialTraces methods of respective stats handler that check and return early if they are not enabled.

Currently, initializeMetrics exist in statsHandler for server and client which does that. We should modify it to be part of metricsStatsHandler and create the similar function for traceStatsHandler

metricsHandler := &clientMetricsStatsHandler{clientStatsHandler: csh}
interceptors = append(interceptors, grpc.WithChainUnaryInterceptor(metricsHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(metricsHandler.streamInterceptor))
}
if o.isTracingEnabled() {
tracingHandler := &clientTracingStatsHandler{clientStatsHandler: csh}
interceptors = append(interceptors, grpc.WithChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(tracingHandler.streamInterceptor))
}
interceptors = append(interceptors, grpc.WithStatsHandler(csh))
return joinDialOptions(interceptors...)
}

var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
Expand All @@ -140,7 +150,17 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g
func ServerOption(o Options) grpc.ServerOption {
ssh := &serverStatsHandler{options: o}
ssh.initializeMetrics()
return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh))
var interceptors []grpc.ServerOption
if o.isMetricsEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as dial option

metricsHandler := &serverMetricsStatsHandler{serverStatsHandler: ssh}
interceptors = append(interceptors, grpc.ChainUnaryInterceptor(metricsHandler.unaryInterceptor), grpc.ChainStreamInterceptor(metricsHandler.streamInterceptor))
}
if o.isTracingEnabled() {
tracingHandler := &serverTracingStatsHandler{serverStatsHandler: ssh}
interceptors = append(interceptors, grpc.ChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.ChainStreamInterceptor(tracingHandler.streamInterceptor))
}
interceptors = append(interceptors, grpc.StatsHandler(ssh))
return joinServerOptions(interceptors...)
}

// callInfo is information pertaining to the lifespan of the RPC client side.
Expand Down
44 changes: 34 additions & 10 deletions stats/opentelemetry/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
serverMetrics serverMetrics
}

type serverMetricsStatsHandler struct {
*serverStatsHandler
}
type serverTracingStatsHandler struct {
*serverStatsHandler
}

func (h *serverStatsHandler) initializeMetrics() {
// Will set no metrics to record, logically making this stats handler a
// no-op.
Expand Down Expand Up @@ -211,24 +218,41 @@

// HandleRPC implements per RPC tracing and stats implementation.
func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
if h.options.isMetricsEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should have separate HandleRPC for metrics and trace statsHandlers instead of having common which requires to check what is enabled etc.

metricsHandler := &serverMetricsStatsHandler{serverStatsHandler: h}
metricsHandler.HandleRPC(ctx, rs)
}
if h.options.isTracingEnabled() {
tracingHandler := &serverTracingStatsHandler{serverStatsHandler: h}
tracingHandler.HandleRPC(ctx, rs)
}
}

// HandleRPC implements per RPC stats handling for metrics.
func (h *serverMetricsStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
ri := getRPCInfo(ctx)
if ri == nil {
logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
logger.Error("ctx passed into server metrics stats handler metrics event handling has no server call data present")

Check warning on line 235 in stats/opentelemetry/server_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/server_metrics.go#L235

Added line #L235 was not covered by tests
return
}
if h.options.isTracingEnabled() {
populateSpan(rs, ri.ai)
}
if h.options.isMetricsEnabled() {
h.processRPCData(ctx, rs, ri.ai)
h.processRPCData(ctx, rs, ri.ai)
}

// HandleRPC implements per RPC tracing handling for tracing.
func (h *serverTracingStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
ri := getRPCInfo(ctx)
if ri == nil {
logger.Error("ctx passed into server tracing stats handler tracing event handling has no server call data present")
return

Check warning on line 246 in stats/opentelemetry/server_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/server_metrics.go#L245-L246

Added lines #L245 - L246 were not covered by tests
}
populateSpan(rs, ri.ai)
}

func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
func (h *serverMetricsStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) {
case *stats.InHeader:
if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil {
labels := h.options.MetricsOptions.pluginOption.GetLabels(st.Header)
if ai.pluginOptionLabels == nil && h.serverStatsHandler.options.MetricsOptions.pluginOption != nil {
labels := h.serverStatsHandler.options.MetricsOptions.pluginOption.GetLabels(st.Header)
if labels == nil {
labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt.
}
Expand All @@ -237,7 +261,7 @@
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ai.method),
))
h.serverMetrics.callStarted.Add(ctx, 1, attrs)
h.serverStatsHandler.serverMetrics.callStarted.Add(ctx, 1, attrs)
case *stats.OutPayload:
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
Expand Down
Loading