diff --git a/pkg/remote/codec/default_codec.go b/pkg/remote/codec/default_codec.go index 3fa18aa390..ea41410dfd 100644 --- a/pkg/remote/codec/default_codec.go +++ b/pkg/remote/codec/default_codec.go @@ -222,6 +222,7 @@ func (c *defaultCodec) DecodeMeta(ctx context.Context, message remote.Message, i return perrors.NewProtocolErrorWithErrMsg(err, fmt.Sprintf("ttheader read payload first 8 byte failed: %s", err.Error())) } if c.payloadValidator != nil { + fillRPCInfo(message) if vErr := validate(ctx, message, in, c.payloadValidator); vErr != nil { return vErr } diff --git a/pkg/remote/codec/validate.go b/pkg/remote/codec/validate.go index cbbdbfefce..9e3c79c042 100644 --- a/pkg/remote/codec/validate.go +++ b/pkg/remote/codec/validate.go @@ -8,6 +8,7 @@ import ( "github.com/cloudwego/kitex/pkg/remote" "github.com/cloudwego/kitex/pkg/remote/codec/perrors" "github.com/cloudwego/kitex/pkg/remote/transmeta" + "github.com/cloudwego/kitex/pkg/rpcinfo" "hash/crc32" "sync" ) @@ -34,6 +35,22 @@ func validate(ctx context.Context, message remote.Message, in remote.ByteBuffer, return nil } +func fillRPCInfo(message remote.Message) { + if ri := message.RPCInfo(); ri != nil { + transInfo := message.TransInfo() + intInfo := transInfo.TransIntInfo() + ci := rpcinfo.AsMutableEndpointInfo(ri.From()) + if ci != nil { + if v := intInfo[transmeta.FromService]; v != "" { + ci.SetServiceName(v) + } + if v := intInfo[transmeta.FromMethod]; v != "" { + ci.SetMethod(v) + } + } + } +} + // PayloadValidator is the interface for validating the payload of RPC requests, which allows customized Checksum function. type PayloadValidator interface { // Key returns a key for your validator, which will be the key in ttheader