Skip to content

Commit 51011c6

Browse files
committed
modify byte pool
1 parent bba93a8 commit 51011c6

File tree

9 files changed

+75
-63
lines changed

9 files changed

+75
-63
lines changed

โ€Žcodec/codec.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (c PBCodec) Encode(i interface{}) ([]byte, error) {
6969
return pb.Marshal(m)
7070
}
7171

72-
return nil, fmt.Errorf("%T is not a proto.Marshaler", i)
72+
return nil, fmt.Errorf("%T is not a proto.Marshaler or pb.Message", i)
7373
}
7474

7575
// Decode decodes an object from slice of bytes.
@@ -82,7 +82,7 @@ func (c PBCodec) Decode(data []byte, i interface{}) error {
8282
return pb.Unmarshal(data, m)
8383
}
8484

85-
return fmt.Errorf("%T is not a proto.Unmarshaler", i)
85+
return fmt.Errorf("%T is not a proto.Unmarshaler or pb.Message", i)
8686
}
8787

8888
// MsgpackCodec uses messagepack marshaler and unmarshaler.

โ€Žgo.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ require (
3939
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect
4040
github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b // indirect
4141
github.com/tjfoc/gmsm v1.4.0 // indirect
42+
github.com/valyala/bytebufferpool v1.0.0 // indirect
4243
github.com/valyala/fastrand v0.0.0-20170531153657-19dd0f0bf014
4344
github.com/vmihailenco/msgpack/v5 v5.3.4
4445
github.com/xtaci/kcp-go v5.4.20+incompatible

โ€Žgo.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,8 @@ github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b/go.mod h1:5XA7W9S6mn
363363
github.com/tjfoc/gmsm v1.4.0 h1:8nbaiZG+iVdh+fXVw0DZoZZa7a4TGm3Qab+xdrdzj8s=
364364
github.com/tjfoc/gmsm v1.4.0/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE=
365365
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
366+
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
367+
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
366368
github.com/valyala/fastrand v0.0.0-20170531153657-19dd0f0bf014 h1:wMIk7zCJy828JUr6OlYwXO11ijcXtEPDxoCaktP5p8w=
367369
github.com/valyala/fastrand v0.0.0-20170531153657-19dd0f0bf014/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
368370
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=

โ€Žprotocol/message.go

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,22 @@
11
package protocol
22

33
import (
4-
"bytes"
54
"encoding/binary"
65
"errors"
76
"fmt"
87
"io"
98

109
"github.com/smallnest/rpcx/util"
10+
"github.com/valyala/bytebufferpool"
1111
)
1212

13-
var (
14-
bufferPool = util.NewLimitedPool(512, 4096)
15-
)
16-
var (
17-
// Compressors are compressors supported by rpcx. You can add customized compressor in Compressors.
18-
Compressors = map[CompressType]Compressor{
19-
None: &RawDataCompressor{},
20-
Gzip: &GzipCompressor{},
21-
}
22-
)
13+
var bufferPool = util.NewLimitedPool(512, 4096)
14+
15+
// Compressors are compressors supported by rpcx. You can add customized compressor in Compressors.
16+
var Compressors = map[CompressType]Compressor{
17+
None: &RawDataCompressor{},
18+
Gzip: &GzipCompressor{},
19+
}
2320

2421
// MaxMessageLength is the max length of a message.
2522
// Default is 0 that means does not limit length of messages.
@@ -233,7 +230,9 @@ func (m Message) Encode() []byte {
233230

234231
// EncodeSlicePointer encodes messages as a byte slice poiter we we can use pool to improve.
235232
func (m Message) EncodeSlicePointer() *[]byte {
236-
meta := encodeMetadata(m.Metadata)
233+
bb := bytebufferpool.Get()
234+
encodeMetadata(m.Metadata, bb)
235+
meta := bb.Bytes()
237236

238237
spL := len(m.ServicePath)
239238
smL := len(m.ServiceMethod)
@@ -264,7 +263,7 @@ func (m Message) EncodeSlicePointer() *[]byte {
264263
data := bufferPool.Get(l)
265264
copy(*data, m.Header[:])
266265

267-
//totalLen
266+
// totalLen
268267
binary.BigEndian.PutUint32((*data)[12:16], uint32(totalL))
269268

270269
binary.BigEndian.PutUint32((*data)[16:20], uint32(spL))
@@ -276,6 +275,8 @@ func (m Message) EncodeSlicePointer() *[]byte {
276275
binary.BigEndian.PutUint32((*data)[metaStart:metaStart+4], uint32(len(meta)))
277276
copy((*data)[metaStart+4:], meta)
278277

278+
bytebufferpool.Put(bb)
279+
279280
binary.BigEndian.PutUint32((*data)[payLoadStart:payLoadStart+4], uint32(len(payload)))
280281
copy((*data)[payLoadStart+4:], payload)
281282

@@ -295,7 +296,9 @@ func (m Message) WriteTo(w io.Writer) (int64, error) {
295296
return n, err
296297
}
297298

298-
meta := encodeMetadata(m.Metadata)
299+
bb := bytebufferpool.Get()
300+
encodeMetadata(m.Metadata, bb)
301+
meta := bb.Bytes()
299302

300303
spL := len(m.ServicePath)
301304
smL := len(m.ServiceMethod)
@@ -318,7 +321,7 @@ func (m Message) WriteTo(w io.Writer) (int64, error) {
318321
return n, err
319322
}
320323

321-
//write servicePath and serviceMethod
324+
// write servicePath and serviceMethod
322325
err = binary.Write(w, binary.BigEndian, uint32(len(m.ServicePath)))
323326
if err != nil {
324327
return n, err
@@ -346,7 +349,9 @@ func (m Message) WriteTo(w io.Writer) (int64, error) {
346349
return n, err
347350
}
348351

349-
//write payload
352+
bytebufferpool.Put(bb)
353+
354+
// write payload
350355
err = binary.Write(w, binary.BigEndian, uint32(len(payload)))
351356
if err != nil {
352357
return n, err
@@ -357,21 +362,19 @@ func (m Message) WriteTo(w io.Writer) (int64, error) {
357362
}
358363

359364
// len,string,len,string,......
360-
func encodeMetadata(m map[string]string) []byte {
365+
func encodeMetadata(m map[string]string, bb *bytebufferpool.ByteBuffer) {
361366
if len(m) == 0 {
362-
return []byte{}
367+
return
363368
}
364-
var buf bytes.Buffer
365-
var d = make([]byte, 4)
369+
d := poolUint32Data.Get().(*[]byte)
366370
for k, v := range m {
367-
binary.BigEndian.PutUint32(d, uint32(len(k)))
368-
buf.Write(d)
369-
buf.Write(util.StringToSliceByte(k))
370-
binary.BigEndian.PutUint32(d, uint32(len(v)))
371-
buf.Write(d)
372-
buf.Write(util.StringToSliceByte(v))
371+
binary.BigEndian.PutUint32(*d, uint32(len(k)))
372+
bb.Write(*d)
373+
bb.Write(util.StringToSliceByte(k))
374+
binary.BigEndian.PutUint32(*d, uint32(len(v)))
375+
bb.Write(*d)
376+
bb.Write(util.StringToSliceByte(v))
373377
}
374-
return buf.Bytes()
375378
}
376379

377380
func decodeMetadata(l uint32, data []byte) (map[string]string, error) {
@@ -430,7 +433,7 @@ func (m *Message) Decode(r io.Reader) error {
430433
return err
431434
}
432435

433-
//total
436+
// total
434437
lenData := poolUint32Data.Get().(*[]byte)
435438
_, err = io.ReadFull(r, *lenData)
436439
if err != nil {
@@ -445,7 +448,7 @@ func (m *Message) Decode(r io.Reader) error {
445448
}
446449

447450
totalL := int(l)
448-
if cap(m.data) >= totalL { //reuse data
451+
if cap(m.data) >= totalL { // reuse data
449452
m.data = m.data[:totalL]
450453
} else {
451454
m.data = make([]byte, totalL)
@@ -514,8 +517,10 @@ func (m *Message) Reset() {
514517
m.ServiceMethod = ""
515518
}
516519

517-
var zeroHeaderArray Header
518-
var zeroHeader = zeroHeaderArray[1:]
520+
var (
521+
zeroHeaderArray Header
522+
zeroHeader = zeroHeaderArray[1:]
523+
)
519524

520525
func resetHeader(h *Header) {
521526
copy(h[1:], zeroHeader)

โ€Žprotocol/message_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package protocol
22

33
import (
44
"bytes"
5-
"fmt"
65
"testing"
76
)
87

@@ -37,8 +36,6 @@ func TestMessage(t *testing.T) {
3736
t.Fatal(err)
3837
}
3938

40-
fmt.Println(buf.Bytes())
41-
4239
res, err := Read(&buf)
4340
if err != nil {
4441
t.Fatal(err)

โ€Žserver/pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type Reset interface {
1212
Reset()
1313
}
1414

15-
var argsReplyPools = &typePools{
15+
var reflectTypePools = &typePools{
1616
pools: make(map[reflect.Type]*sync.Pool),
1717
New: func(t reflect.Type) interface{} {
1818
var argv reflect.Value

โ€Žserver/pool_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/stretchr/testify/assert"
88
)
99

10-
1110
func TestPool(t *testing.T) {
1211
// not pool anything yet
1312
UsePool = false
@@ -16,17 +15,17 @@ func TestPool(t *testing.T) {
1615

1716
intType := reflect.TypeOf(magicNumber)
1817
// init int pool
19-
argsReplyPools.Init(intType)
18+
reflectTypePools.Init(intType)
2019
// insert a integer
21-
argsReplyPools.Put(intType, magicNumber)
20+
reflectTypePools.Put(intType, magicNumber)
2221
// if UsePool == false, argsReplyPools.Get() will call reflect.New() which
2322
// returns a Value representing a pointer to a new zero value
24-
assert.Equal(t, 0, *argsReplyPools.Get(intType).(*int))
23+
assert.Equal(t, 0, *reflectTypePools.Get(intType).(*int))
2524

2625
// start pooling
2726
UsePool = true
2827

29-
argsReplyPools.Put(intType, magicNumber)
28+
reflectTypePools.Put(intType, magicNumber)
3029
// Get() will remove element from pool
31-
assert.Equal(t, magicNumber, argsReplyPools.Get(intType).(int))
32-
}
30+
assert.Equal(t, magicNumber, reflectTypePools.Get(intType).(int))
31+
}

โ€Žserver/server.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@ func (s *Server) serveByWS(ln net.Listener, rpcPath string) {
346346
}
347347

348348
func (s *Server) serveConn(conn net.Conn) {
349-
350349
if s.isShutdown() {
351350
s.closeConn(conn)
352351
return
@@ -612,7 +611,8 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
612611
return handleError(res, err)
613612
}
614613

615-
argv := argsReplyPools.Get(mtype.ArgType)
614+
// get a argv object from object pool
615+
argv := reflectTypePools.Get(mtype.ArgType)
616616

617617
codec := share.Codecs[req.SerializeType()]
618618
if codec == nil {
@@ -625,11 +625,13 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
625625
return handleError(res, err)
626626
}
627627

628-
replyv := argsReplyPools.Get(mtype.ReplyType)
628+
// and get a reply object from object pool
629+
replyv := reflectTypePools.Get(mtype.ReplyType)
629630

630631
argv, err = s.Plugins.DoPreCall(ctx, serviceName, methodName, argv)
631632
if err != nil {
632-
argsReplyPools.Put(mtype.ReplyType, replyv)
633+
// return reply to object pool
634+
reflectTypePools.Put(mtype.ReplyType, replyv)
633635
return handleError(res, err)
634636
}
635637

@@ -643,29 +645,32 @@ func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res
643645
replyv, err = s.Plugins.DoPostCall(ctx, serviceName, methodName, argv, replyv)
644646
}
645647

646-
argsReplyPools.Put(mtype.ArgType, argv)
648+
// return argc to object pool
649+
reflectTypePools.Put(mtype.ArgType, argv)
650+
647651
if err != nil {
648652
if replyv != nil {
649653
data, err := codec.Encode(replyv)
650-
argsReplyPools.Put(mtype.ReplyType, replyv)
654+
// return reply to object pool
655+
reflectTypePools.Put(mtype.ReplyType, replyv)
651656
if err != nil {
652657
return handleError(res, err)
653658
}
654659
res.Payload = data
655660
}
656-
argsReplyPools.Put(mtype.ReplyType, replyv)
657661
return handleError(res, err)
658662
}
659663

660664
if !req.IsOneway() {
661665
data, err := codec.Encode(replyv)
662-
argsReplyPools.Put(mtype.ReplyType, replyv)
666+
// return reply to object pool
667+
reflectTypePools.Put(mtype.ReplyType, replyv)
663668
if err != nil {
664669
return handleError(res, err)
665670
}
666671
res.Payload = data
667672
} else if replyv != nil {
668-
argsReplyPools.Put(mtype.ReplyType, replyv)
673+
reflectTypePools.Put(mtype.ReplyType, replyv)
669674
}
670675

671676
if share.Trace {
@@ -695,7 +700,7 @@ func (s *Server) handleRequestForFunction(ctx context.Context, req *protocol.Mes
695700
return handleError(res, err)
696701
}
697702

698-
argv := argsReplyPools.Get(mtype.ArgType)
703+
argv := reflectTypePools.Get(mtype.ArgType)
699704

700705
codec := share.Codecs[req.SerializeType()]
701706
if codec == nil {
@@ -708,30 +713,30 @@ func (s *Server) handleRequestForFunction(ctx context.Context, req *protocol.Mes
708713
return handleError(res, err)
709714
}
710715

711-
replyv := argsReplyPools.Get(mtype.ReplyType)
716+
replyv := reflectTypePools.Get(mtype.ReplyType)
712717

713718
if mtype.ArgType.Kind() != reflect.Ptr {
714719
err = service.callForFunction(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
715720
} else {
716721
err = service.callForFunction(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
717722
}
718723

719-
argsReplyPools.Put(mtype.ArgType, argv)
724+
reflectTypePools.Put(mtype.ArgType, argv)
720725

721726
if err != nil {
722-
argsReplyPools.Put(mtype.ReplyType, replyv)
727+
reflectTypePools.Put(mtype.ReplyType, replyv)
723728
return handleError(res, err)
724729
}
725730

726731
if !req.IsOneway() {
727732
data, err := codec.Encode(replyv)
728-
argsReplyPools.Put(mtype.ReplyType, replyv)
733+
reflectTypePools.Put(mtype.ReplyType, replyv)
729734
if err != nil {
730735
return handleError(res, err)
731736
}
732737
res.Payload = data
733738
} else if replyv != nil {
734-
argsReplyPools.Put(mtype.ReplyType, replyv)
739+
reflectTypePools.Put(mtype.ReplyType, replyv)
735740
}
736741

737742
return res, nil

โ€Žserver/service.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,10 @@ func (s *Server) registerFunction(servicePath string, fn interface{}, name strin
229229
ss.function[fname] = &functionType{fn: f, ArgType: argType, ReplyType: replyType}
230230
s.serviceMap[servicePath] = ss
231231

232-
argsReplyPools.Init(argType)
233-
argsReplyPools.Init(replyType)
232+
// init pool for reflect.Type of args and reply
233+
reflectTypePools.Init(argType)
234+
reflectTypePools.Init(replyType)
235+
234236
return fname, nil
235237
}
236238

@@ -301,8 +303,9 @@ func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
301303
}
302304
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
303305

304-
argsReplyPools.Init(argType)
305-
argsReplyPools.Init(replyType)
306+
// init pool for reflect.Type of args and reply
307+
reflectTypePools.Init(argType)
308+
reflectTypePools.Init(replyType)
306309
}
307310
return methods
308311
}

0 commit comments

Comments
ย (0)