Skip to content

Commit a928670

Browse files
authored
Cherry-pick #8874 to v1.79.x (#8904)
Cherry picks [PR](#8874) into 1.79.x RELEASE NOTES: * stats: only process RPC stats/tracing in health and ORCA producers if a handler is configured, preventing unnecessary error logging
1 parent 06df363 commit a928670

File tree

7 files changed

+111
-59
lines changed

7 files changed

+111
-59
lines changed

internal/transport/client_stream.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"golang.org/x/net/http2"
2525
"google.golang.org/grpc/mem"
2626
"google.golang.org/grpc/metadata"
27+
"google.golang.org/grpc/stats"
2728
"google.golang.org/grpc/status"
2829
)
2930

@@ -46,10 +47,11 @@ type ClientStream struct {
4647
// meaningful after headerChan is closed (always call waitOnHeader() before
4748
// reading its value).
4849
headerValid bool
49-
noHeaders bool // set if the client never received headers (set only after the stream is done).
50-
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
51-
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
52-
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
50+
noHeaders bool // set if the client never received headers (set only after the stream is done).
51+
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
52+
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
53+
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
54+
statsHandler stats.Handler // nil for internal streams (e.g., health check, ORCA) where telemetry is not supported.
5355
}
5456

5557
// Read reads an n byte message from the input stream.

internal/transport/http2_client.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -478,18 +478,19 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
478478
return t, nil
479479
}
480480

481-
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
481+
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) *ClientStream {
482482
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
483483
s := &ClientStream{
484484
Stream: Stream{
485485
method: callHdr.Method,
486486
sendCompress: callHdr.SendCompress,
487487
contentSubtype: callHdr.ContentSubtype,
488488
},
489-
ct: t,
490-
done: make(chan struct{}),
491-
headerChan: make(chan struct{}),
492-
doneFunc: callHdr.DoneFunc,
489+
ct: t,
490+
done: make(chan struct{}),
491+
headerChan: make(chan struct{}),
492+
doneFunc: callHdr.DoneFunc,
493+
statsHandler: handler,
493494
}
494495
s.Stream.buf.init()
495496
s.Stream.wq.init(defaultWriteQuota, s.done)
@@ -744,7 +745,7 @@ func (e NewStreamError) Error() string {
744745

745746
// NewStream creates a stream and registers it into the transport as "active"
746747
// streams. All non-nil errors returned will be *NewStreamError.
747-
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {
748+
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) (*ClientStream, error) {
748749
ctx = peer.NewContext(ctx, t.Peer())
749750

750751
// ServerName field of the resolver returned address takes precedence over
@@ -781,7 +782,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
781782
if err != nil {
782783
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
783784
}
784-
s := t.newStream(ctx, callHdr)
785+
s := t.newStream(ctx, callHdr, handler)
785786
cleanup := func(err error) {
786787
if s.swapState(streamDone) == streamDone {
787788
// If it was already done, return.
@@ -902,7 +903,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
902903
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
903904
}
904905
}
905-
if t.statsHandler != nil {
906+
if s.statsHandler != nil {
906907
header, ok := metadata.FromOutgoingContext(ctx)
907908
if ok {
908909
header.Set("user-agent", t.userAgent)
@@ -911,7 +912,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
911912
}
912913
// Note: The header fields are compressed with hpack after this call returns.
913914
// No WireLength field is set here.
914-
t.statsHandler.HandleRPC(s.ctx, &stats.OutHeader{
915+
s.statsHandler.HandleRPC(s.ctx, &stats.OutHeader{
915916
Client: true,
916917
FullMethod: callHdr.Method,
917918
RemoteAddr: t.remoteAddr,
@@ -1587,16 +1588,16 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
15871588
}
15881589
}
15891590

1590-
if t.statsHandler != nil {
1591+
if s.statsHandler != nil {
15911592
if !endStream {
1592-
t.statsHandler.HandleRPC(s.ctx, &stats.InHeader{
1593+
s.statsHandler.HandleRPC(s.ctx, &stats.InHeader{
15931594
Client: true,
15941595
WireLength: int(frame.Header().Length),
15951596
Header: metadata.MD(mdata).Copy(),
15961597
Compression: s.recvCompress,
15971598
})
15981599
} else {
1599-
t.statsHandler.HandleRPC(s.ctx, &stats.InTrailer{
1600+
s.statsHandler.HandleRPC(s.ctx, &stats.InTrailer{
16001601
Client: true,
16011602
WireLength: int(frame.Header().Length),
16021603
Trailer: metadata.MD(mdata).Copy(),

internal/transport/keepalive_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) {
6969

7070
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
7171
defer cancel()
72-
stream, err := client.NewStream(ctx, &CallHdr{})
72+
stream, err := client.NewStream(ctx, &CallHdr{}, nil)
7373
if err != nil {
7474
t.Fatalf("client.NewStream() failed: %v", err)
7575
}
@@ -111,7 +111,7 @@ func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
111111

112112
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
113113
defer cancel()
114-
_, err := client.NewStream(ctx, &CallHdr{})
114+
_, err := client.NewStream(ctx, &CallHdr{}, nil)
115115
if err != nil {
116116
t.Fatalf("client.NewStream() failed: %v", err)
117117
}
@@ -150,7 +150,7 @@ func (s) TestMaxConnectionAge(t *testing.T) {
150150

151151
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
152152
defer cancel()
153-
if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
153+
if _, err := client.NewStream(ctx, &CallHdr{}, nil); err != nil {
154154
t.Fatalf("client.NewStream() failed: %v", err)
155155
}
156156

@@ -372,7 +372,7 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
372372
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
373373
defer cancel()
374374
// Create a stream, but send no data on it.
375-
if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
375+
if _, err := client.NewStream(ctx, &CallHdr{}, nil); err != nil {
376376
t.Fatalf("Stream creation failed: %v", err)
377377
}
378378

@@ -514,7 +514,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
514514

515515
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
516516
defer cancel()
517-
if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
517+
if _, err := client.NewStream(ctx, &CallHdr{}, nil); err != nil {
518518
t.Fatalf("Stream creation failed: %v", err)
519519
}
520520

@@ -743,7 +743,7 @@ func (s) TestTCPUserTimeout(t *testing.T) {
743743

744744
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
745745
defer cancel()
746-
stream, err := client.NewStream(ctx, &CallHdr{})
746+
stream, err := client.NewStream(ctx, &CallHdr{}, nil)
747747
if err != nil {
748748
t.Fatalf("client.NewStream() failed: %v", err)
749749
}
@@ -810,7 +810,7 @@ func makeTLSCreds(t *testing.T, certPath, keyPath, rootsPath string) credentials
810810
func checkForHealthyStream(client *http2Client) error {
811811
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
812812
defer cancel()
813-
stream, err := client.NewStream(ctx, &CallHdr{})
813+
stream, err := client.NewStream(ctx, &CallHdr{}, nil)
814814
stream.Close(err)
815815
return err
816816
}
@@ -819,7 +819,7 @@ func pollForStreamCreationError(client *http2Client) error {
819819
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
820820
defer cancel()
821821
for {
822-
if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
822+
if _, err := client.NewStream(ctx, &CallHdr{}, nil); err != nil {
823823
break
824824
}
825825
time.Sleep(50 * time.Millisecond)
@@ -845,7 +845,7 @@ func waitForGoAwayTooManyPings(client *http2Client) error {
845845
return fmt.Errorf("test timed out before getting GoAway with reason:GoAwayTooManyPings from server")
846846
}
847847

848-
if _, err := client.NewStream(ctx, &CallHdr{}); err == nil {
848+
if _, err := client.NewStream(ctx, &CallHdr{}, nil); err == nil {
849849
return fmt.Errorf("stream creation succeeded after receiving a GoAway from the server")
850850
}
851851
return nil

internal/transport/transport.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ type ClientTransport interface {
617617
GracefulClose()
618618

619619
// NewStream creates a Stream for an RPC.
620-
NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)
620+
NewStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) (*ClientStream, error)
621621

622622
// Error returns a channel that is closed when some I/O error
623623
// happens. Typically the caller should have a goroutine to monitor

0 commit comments

Comments
 (0)