-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stats/opentelemetry: separate out interceptors for tracing and metrics #8063
base: master
Are you sure you want to change the base?
Changes from 22 commits
7ddbe46
0486355
53fa9cd
4435b8a
a413555
4e203c3
e9ad552
71804b4
4b3bd26
69df069
770f430
b97a3ca
c89f3c9
3f07e48
5e8a4a5
76e422a
8fa0b03
1f41a49
d74c61d
170eef6
7f5f539
50999a0
68b8966
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,9 @@ | |
"sync/atomic" | ||
"time" | ||
|
||
otelattribute "go.opentelemetry.io/otel/attribute" | ||
otelcodes "go.opentelemetry.io/otel/codes" | ||
otelmetric "go.opentelemetry.io/otel/metric" | ||
"go.opentelemetry.io/otel/trace" | ||
"google.golang.org/grpc" | ||
grpccodes "google.golang.org/grpc/codes" | ||
|
@@ -30,9 +32,6 @@ | |
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/stats" | ||
"google.golang.org/grpc/status" | ||
|
||
otelattribute "go.opentelemetry.io/otel/attribute" | ||
otelmetric "go.opentelemetry.io/otel/metric" | ||
) | ||
|
||
type clientStatsHandler struct { | ||
|
@@ -41,6 +40,13 @@ | |
clientMetrics clientMetrics | ||
} | ||
|
||
type clientMetricsStatsHandler struct { | ||
*clientStatsHandler | ||
} | ||
type clientTracingStatsHandler struct { | ||
*clientStatsHandler | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why are we extending clientStatsHandler? Let's just keep all stats handler embedding |
||
} | ||
|
||
func (h *clientStatsHandler) initializeMetrics() { | ||
// Will set no metrics to record, logically making this stats handler a | ||
// no-op. | ||
|
@@ -71,7 +77,7 @@ | |
rm.registerMetrics(metrics, meter) | ||
} | ||
|
||
func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { | ||
func (h *clientMetricsStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { | ||
ci := &callInfo{ | ||
target: cc.CanonicalTarget(), | ||
method: h.determineMethod(method, opts...), | ||
|
@@ -88,12 +94,8 @@ | |
} | ||
|
||
startTime := time.Now() | ||
var span trace.Span | ||
if h.options.isTracingEnabled() { | ||
ctx, span = h.createCallTraceSpan(ctx, method) | ||
} | ||
err := invoker(ctx, method, req, reply, cc, opts...) | ||
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span) | ||
h.perCallMetrics(ctx, err, startTime, ci) | ||
return err | ||
} | ||
|
||
|
@@ -109,7 +111,7 @@ | |
return "other" | ||
} | ||
|
||
func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { | ||
func (h *clientMetricsStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { | ||
ci := &callInfo{ | ||
target: cc.CanonicalTarget(), | ||
method: h.determineMethod(method, opts...), | ||
|
@@ -126,37 +128,82 @@ | |
} | ||
|
||
startTime := time.Now() | ||
var span trace.Span | ||
if h.options.isTracingEnabled() { | ||
ctx, span = h.createCallTraceSpan(ctx, method) | ||
} | ||
callback := func(err error) { | ||
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span) | ||
h.perCallMetrics(ctx, err, startTime, ci) | ||
} | ||
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) | ||
return streamer(ctx, desc, cc, method, opts...) | ||
} | ||
|
||
// perCallTracesAndMetrics records per call trace spans and metrics. | ||
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) { | ||
if h.options.isTracingEnabled() { | ||
s := status.Convert(err) | ||
if s.Code() == grpccodes.OK { | ||
ts.SetStatus(otelcodes.Ok, s.Message()) | ||
} else { | ||
ts.SetStatus(otelcodes.Error, s.Message()) | ||
// perCallMetrics records per call metrics. | ||
func (h *clientMetricsStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { | ||
callLatency := float64(time.Since(startTime)) / float64(time.Second) | ||
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( | ||
otelattribute.String("grpc.method", ci.method), | ||
otelattribute.String("grpc.target", ci.target), | ||
otelattribute.String("grpc.status", canonicalString(status.Code(err))), | ||
)) | ||
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) | ||
} | ||
|
||
func (h *clientTracingStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { | ||
ci := &callInfo{ | ||
target: cc.CanonicalTarget(), | ||
method: h.determineMethod(method, opts...), | ||
} | ||
ctx = setCallInfo(ctx, ci) | ||
if h.options.MetricsOptions.pluginOption != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This metadata part is not applicable to tracing. It should only be in the metrics interceptor. Please remove it. |
||
md := h.options.MetricsOptions.pluginOption.GetMetadata() | ||
for k, vs := range md { | ||
for _, v := range vs { | ||
ctx = metadata.AppendToOutgoingContext(ctx, k, v) | ||
} | ||
} | ||
ts.End() | ||
} | ||
if h.options.isMetricsEnabled() { | ||
callLatency := float64(time.Since(startTime)) / float64(time.Second) | ||
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( | ||
otelattribute.String("grpc.method", ci.method), | ||
otelattribute.String("grpc.target", ci.target), | ||
otelattribute.String("grpc.status", canonicalString(status.Code(err))), | ||
)) | ||
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) | ||
|
||
startTime := time.Now() | ||
var span trace.Span | ||
ctx, span = h.createCallTraceSpan(ctx, method) | ||
err := invoker(ctx, method, req, reply, cc, opts...) | ||
h.perCallTraces(ctx, err, startTime, ci, span) | ||
return err | ||
} | ||
|
||
func (h *clientTracingStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { | ||
ci := &callInfo{ | ||
target: cc.CanonicalTarget(), | ||
method: h.determineMethod(method, opts...), | ||
} | ||
ctx = setCallInfo(ctx, ci) | ||
|
||
if h.options.MetricsOptions.pluginOption != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same here: this metadata part is not applicable to tracing. It should only be in the metrics interceptor. Please remove it. |
||
md := h.options.MetricsOptions.pluginOption.GetMetadata() | ||
for k, vs := range md { | ||
for _, v := range vs { | ||
ctx = metadata.AppendToOutgoingContext(ctx, k, v) | ||
} | ||
} | ||
} | ||
|
||
startTime := time.Now() | ||
var span trace.Span | ||
ctx, span = h.createCallTraceSpan(ctx, method) | ||
callback := func(err error) { | ||
h.perCallTraces(ctx, err, startTime, ci, span) | ||
} | ||
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) | ||
return streamer(ctx, desc, cc, method, opts...) | ||
} | ||
|
||
// perCallTraces records per call trace spans. | ||
func (h *clientTracingStatsHandler) perCallTraces(_ context.Context, err error, _ time.Time, _ *callInfo, ts trace.Span) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since we don't need the time and callInfo parameters, we should just not have them here instead of replacing them with _. |
||
s := status.Convert(err) | ||
if s.Code() == grpccodes.OK { | ||
ts.SetStatus(otelcodes.Ok, s.Message()) | ||
} else { | ||
ts.SetStatus(otelcodes.Error, s.Message()) | ||
} | ||
ts.End() | ||
} | ||
|
||
// TagConn exists to satisfy stats.Handler. | ||
|
@@ -198,6 +245,7 @@ | |
}) | ||
} | ||
|
||
// HandleRPC implements per RPC tracing and stats implementation. | ||
func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||
ri := getRPCInfo(ctx) | ||
if ri == nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,7 +120,17 @@ type MetricsOptions struct { | |
func DialOption(o Options) grpc.DialOption { | ||
csh := &clientStatsHandler{options: o} | ||
csh.initializeMetrics() | ||
return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh)) | ||
var interceptors []grpc.DialOption | ||
if o.isMetricsEnabled() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think we need to check anything here. We should just always add all 4 interceptors. We should have initializeMetrics and initialTraces methods on the respective stats handlers that check and return early if they are not enabled. Currently, initializeMetrics exists in the statsHandler for server and client and does that. We should modify it to be part of metricsStatsHandler and create a similar function for traceStatsHandler. |
||
metricsHandler := &clientMetricsStatsHandler{clientStatsHandler: csh} | ||
interceptors = append(interceptors, grpc.WithChainUnaryInterceptor(metricsHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(metricsHandler.streamInterceptor)) | ||
} | ||
if o.isTracingEnabled() { | ||
tracingHandler := &clientTracingStatsHandler{clientStatsHandler: csh} | ||
interceptors = append(interceptors, grpc.WithChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(tracingHandler.streamInterceptor)) | ||
} | ||
interceptors = append(interceptors, grpc.WithStatsHandler(csh)) | ||
return joinDialOptions(interceptors...) | ||
} | ||
|
||
var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) | ||
|
@@ -140,7 +150,17 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g | |
func ServerOption(o Options) grpc.ServerOption { | ||
ssh := &serverStatsHandler{options: o} | ||
ssh.initializeMetrics() | ||
return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh)) | ||
var interceptors []grpc.ServerOption | ||
if o.isMetricsEnabled() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as dial option |
||
metricsHandler := &serverMetricsStatsHandler{serverStatsHandler: ssh} | ||
interceptors = append(interceptors, grpc.ChainUnaryInterceptor(metricsHandler.unaryInterceptor), grpc.ChainStreamInterceptor(metricsHandler.streamInterceptor)) | ||
} | ||
if o.isTracingEnabled() { | ||
tracingHandler := &serverTracingStatsHandler{serverStatsHandler: ssh} | ||
interceptors = append(interceptors, grpc.ChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.ChainStreamInterceptor(tracingHandler.streamInterceptor)) | ||
} | ||
interceptors = append(interceptors, grpc.StatsHandler(ssh)) | ||
return joinServerOptions(interceptors...) | ||
} | ||
|
||
// callInfo is information pertaining to the lifespan of the RPC client side. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,13 @@ | |
serverMetrics serverMetrics | ||
} | ||
|
||
type serverMetricsStatsHandler struct { | ||
*serverStatsHandler | ||
} | ||
type serverTracingStatsHandler struct { | ||
*serverStatsHandler | ||
} | ||
|
||
func (h *serverStatsHandler) initializeMetrics() { | ||
// Will set no metrics to record, logically making this stats handler a | ||
// no-op. | ||
|
@@ -211,24 +218,41 @@ | |
|
||
// HandleRPC implements per RPC tracing and stats implementation. | ||
func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||
if h.options.isMetricsEnabled() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should have separate HandleRPC methods for the metrics and trace statsHandlers instead of a common one that has to check what is enabled, etc. |
||
metricsHandler := &serverMetricsStatsHandler{serverStatsHandler: h} | ||
metricsHandler.HandleRPC(ctx, rs) | ||
} | ||
if h.options.isTracingEnabled() { | ||
tracingHandler := &serverTracingStatsHandler{serverStatsHandler: h} | ||
tracingHandler.HandleRPC(ctx, rs) | ||
} | ||
} | ||
|
||
// HandleRPC implements per RPC stats handling for metrics. | ||
func (h *serverMetricsStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||
ri := getRPCInfo(ctx) | ||
if ri == nil { | ||
logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present") | ||
logger.Error("ctx passed into server metrics stats handler metrics event handling has no server call data present") | ||
return | ||
} | ||
if h.options.isTracingEnabled() { | ||
populateSpan(rs, ri.ai) | ||
} | ||
if h.options.isMetricsEnabled() { | ||
h.processRPCData(ctx, rs, ri.ai) | ||
h.processRPCData(ctx, rs, ri.ai) | ||
} | ||
|
||
// HandleRPC implements per RPC tracing handling for tracing. | ||
func (h *serverTracingStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||
ri := getRPCInfo(ctx) | ||
if ri == nil { | ||
logger.Error("ctx passed into server tracing stats handler tracing event handling has no server call data present") | ||
return | ||
} | ||
populateSpan(rs, ri.ai) | ||
} | ||
|
||
func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { | ||
func (h *serverMetricsStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { | ||
switch st := s.(type) { | ||
case *stats.InHeader: | ||
if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil { | ||
labels := h.options.MetricsOptions.pluginOption.GetLabels(st.Header) | ||
if ai.pluginOptionLabels == nil && h.serverStatsHandler.options.MetricsOptions.pluginOption != nil { | ||
labels := h.serverStatsHandler.options.MetricsOptions.pluginOption.GetLabels(st.Header) | ||
if labels == nil { | ||
labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt. | ||
} | ||
|
@@ -237,7 +261,7 @@ | |
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( | ||
otelattribute.String("grpc.method", ai.method), | ||
)) | ||
h.serverMetrics.callStarted.Add(ctx, 1, attrs) | ||
h.serverStatsHandler.serverMetrics.callStarted.Add(ctx, 1, attrs) | ||
case *stats.OutPayload: | ||
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) | ||
case *stats.InPayload: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a new line in between the struct declarations.