From ef4c99e27941a6d5cad164299fa1bc259e705249 Mon Sep 17 00:00:00 2001 From: "qiheng.zhou" Date: Wed, 28 Aug 2024 12:45:24 +0800 Subject: [PATCH] chore: do not modify the header length encode logic --- pkg/remote/codec/default_codec.go | 16 ++++++--- pkg/remote/codec/default_codec_test.go | 47 -------------------------- pkg/remote/codec/header_codec.go | 8 ----- 3 files changed, 12 insertions(+), 59 deletions(-) diff --git a/pkg/remote/codec/default_codec.go b/pkg/remote/codec/default_codec.go index a8f2b2116a..baa9163738 100644 --- a/pkg/remote/codec/default_codec.go +++ b/pkg/remote/codec/default_codec.go @@ -138,8 +138,7 @@ func (c *defaultCodec) EncodePayload(ctx context.Context, message remote.Message return perrors.NewProtocolErrorWithMsg("no buffer allocated for the framed length field") } payloadLen = out.MallocLen() - headerLen - // Be careful here. The `frameLenField` was malloced before encoding payload - // If the `out` buffer using copy to grow when the capacity is not enough, setting the pre-allocated `frameLenField` will be invalid. + // FIXME: if the `out` buffer using copy to grow when the capacity is not enough, setting the pre-allocated `framedLenField` may not take effect. binary.BigEndian.PutUint32(framedLenField, uint32(payloadLen)) } else if message.ProtocolInfo().CodecType == serviceinfo.Protobuf { return perrors.NewProtocolErrorWithMsg("protobuf just support 'framed' trans proto") @@ -176,6 +175,7 @@ func (c *defaultCodec) EncodeMetaAndPayload(ctx context.Context, message remote. if totalLenField == nil { return perrors.NewProtocolErrorWithMsg("no buffer allocated for the header length field") } + // FIXME: if the `out` buffer using copy to grow when the capacity is not enough, setting the pre-allocated `totalLenField` may not take effect. payloadLen := out.MallocLen() - Size32 binary.BigEndian.PutUint32(totalLenField, uint32(payloadLen)) } @@ -292,8 +292,8 @@ func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Cont message.SetPayloadLen(len(payload)) // 2. encode header and return totalLenField if needed - // In this case, set total length during TTHeader encode - if _, err = ttHeaderCodec.encode(ctx, message, out); err != nil { + totalLenField, err := ttHeaderCodec.encode(ctx, message, out) + if err != nil { return err } @@ -303,6 +303,14 @@ func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Cont } else { _, err = out.WriteBinary(payload) } + + // 4. fill totalLen field for header if needed + // FIXME: if the `out` buffer using copy to grow when the capacity is not enough, setting the pre-allocated `totalLenField` may not take effect. + if totalLenField == nil { + return perrors.NewProtocolErrorWithMsg("no buffer allocated for the header length field") + } + payloadLen := out.MallocLen() - Size32 + binary.BigEndian.PutUint32(totalLenField, uint32(payloadLen)) return err } diff --git a/pkg/remote/codec/default_codec_test.go b/pkg/remote/codec/default_codec_test.go index 581298c17a..572c28daf2 100644 --- a/pkg/remote/codec/default_codec_test.go +++ b/pkg/remote/codec/default_codec_test.go @@ -367,53 +367,6 @@ func TestDefaultCodecWithCustomizedValidator(t *testing.T) { test.Assert(t, err != nil, err) } -func TestDefaultCodecWithCRC32EncodeDecodeWithNonNetpollBuffer(t *testing.T) { - remote.PutPayloadCode(serviceinfo.Thrift, mpc) - - dc := NewDefaultCodecWithConfig(CodecConfig{CRC32Check: true}) - ctx := context.Background() - intKVInfo := prepareIntKVInfo() - strKVInfo := prepareStrKVInfo() - sendMsg := initClientSendMsg(transport.TTHeaderFramed, 32*1024) - sendMsg.TransInfo().PutTransIntInfo(intKVInfo) - sendMsg.TransInfo().PutTransStrInfo(strKVInfo) - - // test encode err - badOut := remote.NewReaderBuffer(nil) - err := dc.Encode(ctx, sendMsg, badOut) - test.Assert(t, err != nil) - - // encode with defaultByteBuffer - byteBuffer := remote.NewWriterBuffer(0) - err = dc.Encode(ctx, sendMsg, byteBuffer) - test.Assert(t, err == nil, err) - - // decode, succeed - recvMsg := initServerRecvMsg() - buf, err := byteBuffer.Bytes() - test.Assert(t, err == nil, err) - in := remote.NewReaderBuffer(buf) - err = dc.Decode(ctx, recvMsg, in) - test.Assert(t, err == nil, err) - intKVInfoRecv := recvMsg.TransInfo().TransIntInfo() - strKVInfoRecv := recvMsg.TransInfo().TransStrInfo() - test.DeepEqual(t, intKVInfoRecv, intKVInfo) - test.DeepEqual(t, strKVInfoRecv, strKVInfo) - test.Assert(t, sendMsg.RPCInfo().Invocation().SeqID() == recvMsg.RPCInfo().Invocation().SeqID()) - - // decode, crc32c check failed - test.Assert(t, err == nil, err) - bufLen := len(buf) - modifiedBuf := make([]byte, bufLen) - copy(modifiedBuf, buf) - for i := bufLen - 1; i > bufLen-10; i-- { - modifiedBuf[i] = 123 - } - in = remote.NewReaderBuffer(modifiedBuf) - err = dc.Decode(ctx, recvMsg, in) - test.Assert(t, err != nil, err) -} - func TestCodecTypeNotMatchWithServiceInfoPayloadCodec(t *testing.T) { for _, tb := range transportBuffers { t.Run(tb.Name, func(t *testing.T) { diff --git a/pkg/remote/codec/header_codec.go b/pkg/remote/codec/header_codec.go index 38903b5230..524dd6a903 100644 --- a/pkg/remote/codec/header_codec.go +++ b/pkg/remote/codec/header_codec.go @@ -117,8 +117,6 @@ const ( type ttHeader struct{} func (t ttHeader) encode(ctx context.Context, message remote.Message, out remote.ByteBuffer) (totalLenField []byte, err error) { - mallocLenBefore := out.MallocLen() - // 1. header meta var headerMeta []byte headerMeta, err = out.Malloc(TTHeaderMetaSize) @@ -155,12 +153,6 @@ func (t ttHeader) encode(ctx context.Context, message remote.Message, out remote return nil, perrors.NewProtocolErrorWithMsg(fmt.Sprintf("invalid header length[%d]", headerInfoSize)) } binary.BigEndian.PutUint16(headerInfoSizeField, uint16(headerInfoSize/4)) - if message.PayloadLen() != 0 { - // payload encoded before. set total length here. - headerLen := out.MallocLen() - mallocLenBefore - totalLen := message.PayloadLen() + headerLen - Size32 - binary.BigEndian.PutUint32(totalLenField, uint32(totalLen)) - } return totalLenField, err }