Skip to content

Commit 6956a22

Browse files
committed
smallnest#564 add TRACE flag to trace rpcx flow, which should be used on in test environment
1 parent d969a5f commit 6956a22

File tree

4 files changed

+103
-17
lines changed

4 files changed

+103
-17
lines changed

โ€Žclient/client.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type RPCClient interface {
8787
Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
8888
SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
8989
Close() error
90+
RemoteAddr() string
9091

9192
RegisterServerMessageChan(ch chan<- *protocol.Message)
9293
UnregisterServerMessageChan()
@@ -123,6 +124,11 @@ func NewClient(option Option) *Client {
123124
}
124125
}
125126

127+
// RemoteAddr returns the remote address.
128+
func (c *Client) RemoteAddr() string {
129+
return c.Conn.RemoteAddr().String()
130+
}
131+
126132
// Option contains all options for creating clients.
127133
type Option struct {
128134
// Group is used to select the services in the same group. Services set group info in their meta.
@@ -248,6 +254,10 @@ func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string,
248254
}
249255
}
250256
call.Done = done
257+
258+
if share.Trace {
259+
log.Debug("client.Go send request for %s.%s, args: %+v in case of client call", servicePath, serviceMethod, args)
260+
}
251261
client.send(ctx, call)
252262
return call
253263
}
@@ -313,6 +323,14 @@ func (client *Client) Call(ctx context.Context, servicePath, serviceMethod strin
313323
func (client *Client) call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error {
314324
seq := new(uint64)
315325
ctx = context.WithValue(ctx, seqKey{}, seq)
326+
327+
if share.Trace {
328+
log.Debug("client.call for %s.%s, args: %+v in case of client call", servicePath, serviceMethod, args)
329+
defer func() {
330+
log.Debug("client.call done for %s.%s, args: %+v in case of client call", servicePath, serviceMethod, args)
331+
}()
332+
}
333+
316334
Done := client.Go(ctx, servicePath, serviceMethod, args, reply, make(chan *Call, 1)).Done
317335

318336
var err error
@@ -558,9 +576,15 @@ func (client *Client) send(ctx context.Context, call *Call) {
558576
_ = client.Plugins.DoClientBeforeEncode(req)
559577
}
560578

579+
if share.Trace {
580+
log.Debug("client.send for %s.%s, args: %+v in case of client call", call.ServicePath, call.ServiceMethod, call.Args)
581+
}
561582
allData := req.EncodeSlicePointer()
562583
_, err = client.Conn.Write(*allData)
563584
protocol.PutData(allData)
585+
if share.Trace {
586+
log.Debug("client.sent for %s.%s, args: %+v in case of client call", call.ServicePath, call.ServiceMethod, call.Args)
587+
}
564588

565589
if err != nil {
566590
client.mutex.Lock()
@@ -620,6 +644,10 @@ func (client *Client) input() {
620644
client.mutex.Unlock()
621645
}
622646

647+
if share.Trace {
648+
log.Debug("client.input received %v", res)
649+
}
650+
623651
switch {
624652
case call == nil:
625653
if isServerMessage {

โ€Žclient/xclient.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/juju/ratelimit"
1818
ex "github.com/smallnest/rpcx/errors"
19+
"github.com/smallnest/rpcx/log"
1920
"github.com/smallnest/rpcx/protocol"
2021
"github.com/smallnest/rpcx/share"
2122
"golang.org/x/sync/singleflight"
@@ -411,10 +412,16 @@ func (c *xClient) Go(ctx context.Context, serviceMethod string, args interface{}
411412

412413
ctx = setServerTimeout(ctx)
413414

415+
if share.Trace {
416+
log.Debug("select a client for %s.%s, args: %+v in case of xclient Go", c.servicePath, serviceMethod, args)
417+
}
414418
_, client, err := c.selectClient(ctx, c.servicePath, serviceMethod, args)
415419
if err != nil {
416420
return nil, err
417421
}
422+
if share.Trace {
423+
log.Debug("selected a client %s for %s.%s, args: %+v in case of xclient Go", client.RemoteAddr(), c.servicePath, serviceMethod, args)
424+
}
418425
return client.Go(ctx, c.servicePath, serviceMethod, args, reply, done), nil
419426
}
420427

@@ -436,6 +443,10 @@ func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface
436443
}
437444
ctx = setServerTimeout(ctx)
438445

446+
if share.Trace {
447+
log.Debug("select a client for %s.%s, failMode: %v, args: %+v in case of xclient Call", c.servicePath, serviceMethod, c.failMode, args)
448+
}
449+
439450
var err error
440451
k, client, err := c.selectClient(ctx, c.servicePath, serviceMethod, args)
441452
if err != nil {
@@ -444,6 +455,10 @@ func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface
444455
}
445456
}
446457

458+
if share.Trace {
459+
log.Debug("selected a client %s for %s.%s, failMode: %v, args: %+v in case of xclient Call", client.RemoteAddr(), c.servicePath, serviceMethod, c.failMode, args)
460+
}
461+
447462
var e error
448463
switch c.failMode {
449464
case Failtry:
@@ -613,6 +628,10 @@ func (c *xClient) SendRaw(ctx context.Context, r *protocol.Message) (map[string]
613628

614629
ctx = setServerTimeout(ctx)
615630

631+
if share.Trace {
632+
log.Debug("select a client for %s.%s, failMode: %v, args: %+v in case of xclient SendRaw", r.ServicePath, r.ServiceMethod, c.failMode, r.Payload)
633+
}
634+
616635
var err error
617636
k, client, err := c.selectClient(ctx, r.ServicePath, r.ServiceMethod, r.Payload)
618637
if err != nil {
@@ -627,6 +646,10 @@ func (c *xClient) SendRaw(ctx context.Context, r *protocol.Message) (map[string]
627646
}
628647
}
629648

649+
if share.Trace {
650+
log.Debug("selected a client %s for %s.%s, failMode: %v, args: %+v in case of xclient Call", client.RemoteAddr(), r.ServicePath, r.ServiceMethod, c.failMode, r.Payload)
651+
}
652+
630653
var e error
631654
switch c.failMode {
632655
case Failtry:
@@ -702,11 +725,19 @@ func (c *xClient) wrapCall(ctx context.Context, client RPCClient, serviceMethod
702725
return ErrServerUnavailable
703726
}
704727

728+
if share.Trace {
729+
log.Debug("call a client for %s.%s, args: %+v in case of xclient wrapCall", c.servicePath, serviceMethod, args)
730+
}
731+
705732
ctx = share.NewContext(ctx)
706733
c.Plugins.DoPreCall(ctx, c.servicePath, serviceMethod, args)
707734
err := client.Call(ctx, c.servicePath, serviceMethod, args, reply)
708735
c.Plugins.DoPostCall(ctx, c.servicePath, serviceMethod, args, reply, err)
709736

737+
if share.Trace {
738+
log.Debug("called a client for %s.%s, args: %+v, err: %v in case of xclient wrapCall", c.servicePath, serviceMethod, args, err)
739+
}
740+
710741
return err
711742
}
712743

โ€Žserver/server.go

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,18 @@ import (
99
"io"
1010
"net"
1111
"net/http"
12+
"os"
13+
"os/exec"
14+
"os/signal"
1215
"reflect"
1316
"regexp"
1417
"runtime"
1518
"strconv"
1619
"strings"
1720
"sync"
1821
"sync/atomic"
19-
"time"
20-
21-
"os"
22-
"os/exec"
23-
"os/signal"
2422
"syscall"
23+
"time"
2524

2625
"github.com/smallnest/rpcx/log"
2726
"github.com/smallnest/rpcx/protocol"
@@ -245,7 +244,6 @@ func (s *Server) ServeListener(network string, ln net.Listener) (err error) {
245244
// creating a new service goroutine for each.
246245
// The service goroutines read requests and then call services to reply to them.
247246
func (s *Server) serveListener(ln net.Listener) error {
248-
249247
var tempDelay time.Duration
250248

251249
s.mu.Lock()
@@ -303,6 +301,10 @@ func (s *Server) serveListener(ln net.Listener) error {
303301
s.activeConn[conn] = struct{}{}
304302
s.mu.Unlock()
305303

304+
if share.Trace {
305+
log.Debug("server accepted an conn%c", conn.RemoteAddr().String())
306+
}
307+
306308
go s.serveConn(conn)
307309
}
308310
}
@@ -333,6 +335,10 @@ func (s *Server) serveConn(conn net.Conn) {
333335
buf = buf[:ss]
334336
log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, buf)
335337
}
338+
if share.Trace {
339+
log.Debug("server closed conn: %v", conn.RemoteAddr().String())
340+
}
341+
336342
s.mu.Lock()
337343
delete(s.activeConn, conn)
338344
s.mu.Unlock()
@@ -390,8 +396,12 @@ func (s *Server) serveConn(conn net.Conn) {
390396
conn.SetWriteDeadline(t0.Add(s.writeTimeout))
391397
}
392398

399+
if share.Trace {
400+
log.Debug("server received an request %s from conn: %v", req, conn.RemoteAddr().String())
401+
}
402+
393403
ctx = share.WithLocalValue(ctx, StartRequestContextKey, time.Now().UnixNano())
394-
var closeConn = false
404+
closeConn := false
395405
if !req.IsHeartbeat() {
396406
err = s.auth(ctx, req)
397407
closeConn = err != nil
@@ -446,15 +456,17 @@ func (s *Server) serveConn(conn net.Conn) {
446456

447457
s.Plugins.DoPreHandleRequest(ctx, req)
448458

459+
if share.Trace {
460+
log.Debug("server handle request %s from conn: %v", req, conn.RemoteAddr().String())
461+
}
449462
res, err := s.handleRequest(ctx, req)
450-
451463
if err != nil {
452464
log.Warnf("rpcx: failed to handle request: %v", err)
453465
}
454466

455467
s.Plugins.DoPreWriteResponse(ctx, req, res, err)
456468
if !req.IsOneway() {
457-
if len(resMetadata) > 0 { //copy meta in context to request
469+
if len(resMetadata) > 0 { // copy meta in context to request
458470
meta := res.Metadata
459471
if meta == nil {
460472
res.Metadata = resMetadata
@@ -476,6 +488,10 @@ func (s *Server) serveConn(conn net.Conn) {
476488
}
477489
s.Plugins.DoPostWriteResponse(ctx, req, res, err)
478490

491+
if share.Trace {
492+
log.Debug("server write response %v for an request %s from conn: %v", res, req, conn.RemoteAddr().String())
493+
}
494+
479495
protocol.FreeMsg(req)
480496
protocol.FreeMsg(res)
481497
}()
@@ -549,21 +565,26 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
549565
res.SetMessageType(protocol.Response)
550566
s.serviceMapMu.RLock()
551567
service := s.serviceMap[serviceName]
568+
569+
if share.Trace {
570+
log.Debug("server get service %s for an request %s", service, req)
571+
}
572+
552573
s.serviceMapMu.RUnlock()
553574
if service == nil {
554575
err = errors.New("rpcx: can't find service " + serviceName)
555576
return handleError(res, err)
556577
}
557578
mtype := service.method[methodName]
558579
if mtype == nil {
559-
if service.function[methodName] != nil { //check raw functions
580+
if service.function[methodName] != nil { // check raw functions
560581
return s.handleRequestForFunction(ctx, req)
561582
}
562583
err = errors.New("rpcx: can't find method " + methodName)
563584
return handleError(res, err)
564585
}
565586

566-
var argv = argsReplyPools.Get(mtype.ArgType)
587+
argv := argsReplyPools.Get(mtype.ArgType)
567588

568589
codec := share.Codecs[req.SerializeType()]
569590
if codec == nil {
@@ -601,7 +622,6 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
601622
argsReplyPools.Put(mtype.ReplyType, replyv)
602623
if err != nil {
603624
return handleError(res, err)
604-
605625
}
606626
res.Payload = data
607627
}
@@ -614,13 +634,16 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
614634
argsReplyPools.Put(mtype.ReplyType, replyv)
615635
if err != nil {
616636
return handleError(res, err)
617-
618637
}
619638
res.Payload = data
620639
} else if replyv != nil {
621640
argsReplyPools.Put(mtype.ReplyType, replyv)
622641
}
623642

643+
if share.Trace {
644+
log.Debug("server called service %s for an request %s", service, req)
645+
}
646+
624647
return res, nil
625648
}
626649

@@ -644,7 +667,7 @@ func (s *Server) handleRequestForFunction(ctx context.Context, req *protocol.Mes
644667
return handleError(res, err)
645668
}
646669

647-
var argv = argsReplyPools.Get(mtype.ArgType)
670+
argv := argsReplyPools.Get(mtype.ArgType)
648671

649672
codec := share.Codecs[req.SerializeType()]
650673
if codec == nil {
@@ -677,7 +700,6 @@ func (s *Server) handleRequestForFunction(ctx context.Context, req *protocol.Mes
677700
argsReplyPools.Put(mtype.ReplyType, replyv)
678701
if err != nil {
679702
return handleError(res, err)
680-
681703
}
682704
res.Payload = data
683705
} else if replyv != nil {
@@ -841,7 +863,7 @@ func (s *Server) startProcess() (int, error) {
841863
var env []string
842864
env = append(env, os.Environ()...)
843865

844-
var originalWD, _ = os.Getwd()
866+
originalWD, _ := os.Getwd()
845867
allFiles := []*os.File{os.Stdin, os.Stdout, os.Stderr}
846868
process, err := os.StartProcess(argv0, os.Args, &os.ProcAttr{
847869
Dir: originalWD,
@@ -876,7 +898,7 @@ var ip4Reg = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-
876898
func validIP4(ipAddress string) bool {
877899
ipAddress = strings.Trim(ipAddress, " ")
878900
i := strings.LastIndex(ipAddress, ":")
879-
ipAddress = ipAddress[:i] //remove port
901+
ipAddress = ipAddress[:i] // remove port
880902

881903
return ip4Reg.MatchString(ipAddress)
882904
}

โ€Žshare/share.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ const (
3737
StreamServiceName = "_streamservice"
3838
)
3939

40+
// Trace is a flag to write a trace log or not.
41+
// You should not enable this flag ofr product environment and enable it only for test.
42+
// It writes trace log with logger Debug level.
43+
var Trace bool
44+
4045
// Codecs are codecs supported by rpcx. You can add customized codecs in Codecs.
4146
var Codecs = map[protocol.SerializeType]codec.Codec{
4247
protocol.SerializeNone: &codec.ByteCodec{},

0 commit comments

Comments
ย (0)