Skip to content

Commit

Permalink
fix: gonet transServer processes gRPC transHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh committed Jan 31, 2025
1 parent 047444c commit 49b8cdf
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pkg/remote/trans/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/remote"
Expand All @@ -29,6 +30,29 @@ import (

var readMoreTimeout = 5 * time.Millisecond

type (
CtxKeyOnRead struct{}
CtxValueOnRead struct {
onlyOnce int32
}
)

func (v *CtxValueOnRead) SetOnlyOnce(b bool) {
if b {
atomic.StoreInt32(&v.onlyOnce, 1)
} else {
atomic.StoreInt32(&v.onlyOnce, 0)
}
}

func (v *CtxValueOnRead) GetOnlyOnce() bool {
if atomic.LoadInt32(&v.onlyOnce) == 1 {
return true
} else {
return false
}
}

// Extension is the interface that trans extensions need to implement, it will make the extension of trans more easily.
// Normally if we want to extend transport layer we need to implement the trans interfaces which are defined in trans_handler.go.
// In fact most code logic is similar in same mode, so the Extension interface is the the differentiated part that need to
Expand Down
9 changes: 9 additions & 0 deletions pkg/remote/trans/gonet/trans_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,18 @@ func (ts *transServer) BootstrapServer(ln net.Listener) (err error) {
klog.CtxErrorf(ctx, "KITEX: OnActive error=%s", err)
return
}
ctxValueOnRead := &trans.CtxValueOnRead{}
ctx = context.WithValue(ctx, trans.CtxKeyOnRead{}, ctxValueOnRead)
onReadOnlyOnceCheck := false
for {
ts.refreshDeadline(rpcinfo.GetRPCInfo(ctx), bc)
err := ts.transHdlr.OnRead(ctx, bc)
if !onReadOnlyOnceCheck {
onReadOnlyOnceCheck = true
if ctxValueOnRead.GetOnlyOnce() {
break
}
}
if err != nil {
ts.onError(ctx, err, bc)
_ = bc.Close()
Expand Down
6 changes: 6 additions & 0 deletions pkg/remote/trans/nphttp2/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/remote/trans"

"github.com/cloudwego/netpoll"

"github.com/cloudwego/kitex/pkg/endpoint"
Expand Down Expand Up @@ -128,6 +130,10 @@ func (t *svrTransHandler) Read(ctx context.Context, conn net.Conn, msg remote.Me

// 只 return write err
func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error {
if v := ctx.Value(trans.CtxKeyOnRead{}); v != nil {
value := v.(*trans.CtxValueOnRead)
value.SetOnlyOnce(true)
}
svrTrans := ctx.Value(ctxKeySvrTransport).(*SvrTrans)
tr := svrTrans.tr
tr.HandleStreams(func(s *grpcTransport.Stream) {
Expand Down

0 comments on commit 49b8cdf

Please sign in to comment.