Skip to content

Commit

Permalink
feat: add grpc client connection dump to monitor the conn and stream …
Browse files Browse the repository at this point in the history
…status
  • Loading branch information
ppzqh committed Feb 14, 2025
1 parent 885b05a commit ec38d8b
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 0 deletions.
30 changes: 30 additions & 0 deletions pkg/remote/trans/nphttp2/conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/tls"
"net"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -233,6 +234,35 @@ func (p *connPool) Close() error {
return nil
}

// Dump dumps the connection pool with the details of the underlying transport.
func (p *connPool) Dump() interface{} {
defer func() {
if panicErr := recover(); panicErr != nil {
klog.Errorf("KITEX: dump gRPC client connection pool panic, err=%v, stack=%s", panicErr, string(debug.Stack()))
}
}()

// toAddr -> []clientTransportDump
// If mesh enabled, toAddr will be the same, so you should check the remoteAddress in each stream.
res := make(map[string]interface{}, p.size)
p.conns.Range(func(k, v interface{}) bool {
addr := k.(string)
ts := v.(*transports)
transportsDump := make([]interface{}, 0, len(ts.cliTransports))
for _, t := range ts.cliTransports {
if t == nil {
continue
}
if dumper, ok := t.(interface{ Dump() interface{} }); ok {
transportsDump = append(transportsDump, dumper.Dump())
}
}
res[addr] = transportsDump
return true
})
return res
}

// newTLSConn constructs a client-side TLS connection and performs handshake.
func newTLSConn(conn net.Conn, tlsCfg *tls.Config) (net.Conn, error) {
tlsConn := tls.Client(conn, tlsCfg)
Expand Down
11 changes: 11 additions & 0 deletions pkg/remote/trans/nphttp2/conn_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/bytedance/sonic"

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/remote"
)
Expand Down Expand Up @@ -50,6 +52,15 @@ func TestConnPool(t *testing.T) {
_, err = connPool.Get(ctx, "tcp", mockAddr0, opt)
test.Assert(t, err != nil, err)

// test Dump()
dump := connPool.Dump()
test.Assert(t, dump != nil)
m := dump.(map[string]interface{})
test.Assert(t, m[mockAddr0] != nil)
test.Assert(t, m[mockAddr1] != nil)
_, err = sonic.Marshal(dump)
test.Assert(t, err == nil, err)

// test Clean()
connPool.Clean("tcp", mockAddr0)
_, ok := connPool.conns.Load(mockAddr0)
Expand Down
48 changes: 48 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/remote/transmeta"

"github.com/cloudwego/netpoll"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
Expand Down Expand Up @@ -314,6 +316,22 @@ func (task closeStreamTask) Tick() {
task.toCloseStreams = task.toCloseStreams[:0]
}

type clientTransportDump struct {
State transportState `json:"transport_state"`
ActiveStreams []streamDump `json:"active_streams"`
LastReadTime time.Time `json:"last_read_time"`
KeepAliveEnabled bool `json:"keepalive_enabled"`
}

type streamDump struct {
ID uint32 `json:"id"`
RemoteAddress string `json:"remote_address"`
Method string `json:"method"`
State streamState `json:"stream_state"`
WriteQuota int32 `json:"write_quota"`
HeaderReceived bool `json:"header_received"`
}

type preAllocatedStreamFields struct {
recvBuffer *recvBuffer
writeQuota *writeQuota
Expand Down Expand Up @@ -1290,3 +1308,33 @@ func (t *http2Client) IsActive() bool {
defer t.mu.Unlock()
return t.state == reachable
}

func (t *http2Client) Dump() interface{} {
t.mu.Lock()
defer t.mu.Unlock()

as := make([]streamDump, 0, len(t.activeStreams))
for _, v := range t.activeStreams {
sd := streamDump{
ID: v.id,
Method: v.method,
State: v.state,
WriteQuota: atomic.LoadInt32(&v.wq.quota),
HeaderReceived: atomic.LoadUint32(&v.headerChanClosed) == 0,
}
// get info from header
if md, ok := v.tryGetHeader(); ok {
if rip := md.Get(transmeta.HTTPRemoteIP); len(rip) > 0 && len(rip[0]) > 0 {
sd.RemoteAddress = rip[0]
}
}
as = append(as, sd)
}

return clientTransportDump{
State: t.state,
ActiveStreams: as,
KeepAliveEnabled: t.keepaliveEnabled,
LastReadTime: time.Unix(0, atomic.LoadInt64(&t.lastRead)),
}
}
15 changes: 15 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,21 @@ func (s *Stream) Header() (metadata.MD, error) {
return s.header.Copy(), nil
}

// tryGetHeader attempts to get the header in a non-blocking way.
// Returns (nil, false) if the header is not available.
// Notice: only use on client side.
func (s *Stream) tryGetHeader() (metadata.MD, bool) {
if s.headerChan == nil {
return nil, false
}
select {
case <-s.headerChan:
return s.header.Copy(), true
default:
return nil, false
}
}

// TrailersOnly blocks until a header or trailers-only frame is received and
// then returns true if the stream was trailers-only. If the stream ends
// before headers are received, returns true, nil. Client-side only.
Expand Down
1 change: 1 addition & 0 deletions pkg/remote/transmeta/http_metakey.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ const (
HTTPSourceMethod = "source-method"

HTTPStreamLogID = "stream-log-id"
HTTPRemoteIP = "rip"
)

0 comments on commit ec38d8b

Please sign in to comment.