diff --git a/benchmarks_test.go b/benchmarks_test.go index 316751a..8ba98f4 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -22,7 +22,7 @@ func Benchmark_NoHandler(b *testing.B) { go s.Run("127.0.0.1:0") // nolint defer s.Stop() // nolint - <-s.accepting + <-s.acceptingC // client client, err := net.Dial("tcp", s.Listener.Addr().String()) @@ -46,7 +46,7 @@ func Benchmark_OneHandler(b *testing.B) { go s.Run("127.0.0.1:0") // nolint defer s.Stop() // nolint - <-s.accepting + <-s.acceptingC // client client, err := net.Dial("tcp", s.Listener.Addr().String()) diff --git a/internal/examples/tcp/custom_packet/server/main.go b/internal/examples/tcp/custom_packet/server/main.go index 6db2b15..3a426a1 100644 --- a/internal/examples/tcp/custom_packet/server/main.go +++ b/internal/examples/tcp/custom_packet/server/main.go @@ -19,7 +19,7 @@ func init() { } func main() { - easytcp._log = log + easytcp.SetLogger(log) s := easytcp.NewServer(&easytcp.ServerOption{ // specify codec and packer diff --git a/router_context_test.go b/router_context_test.go index 89bd6ed..db20731 100644 --- a/router_context_test.go +++ b/router_context_test.go @@ -149,7 +149,7 @@ func Test_routeContext_Send(t *testing.T) { ctx := newTestContext(sess, nil) ctx.SetResponseMessage(NewMessage(1, []byte("test"))) go ctx.Send() - ctx2 := <-sess.respQueue + ctx2 := <-sess.respStream assert.Equal(t, ctx, ctx2) }) } @@ -161,7 +161,7 @@ func Test_routeContext_SendTo(t *testing.T) { ctx := newTestContext(sess1, nil) ctx.SetResponseMessage(NewMessage(1, []byte("test"))) go ctx.SendTo(sess2) - ctx2 := <-sess2.respQueue + ctx2 := <-sess2.respStream assert.Equal(t, ctx, ctx2) }) } diff --git a/server.go b/server.go index dc5224f..b9f4bc5 100644 --- a/server.go +++ b/server.go @@ -33,8 +33,8 @@ type Server struct { respQueueSize int router *Router printRoutes bool - accepting chan struct{} - stopped chan struct{} + acceptingC chan struct{} + stoppedC chan struct{} asyncRouter bool } @@ -79,8 +79,8 @@ func NewServer(opt *ServerOption) *Server { Codec: opt.Codec, printRoutes: !opt.DoNotPrintRoutes, router: newRouter(), - accepting: make(chan struct{}), - stopped: make(chan struct{}), + acceptingC: make(chan struct{}), + stoppedC: make(chan struct{}), asyncRouter: opt.AsyncRouter, } } @@ -116,7 +116,7 @@ func (s *Server) RunTLS(addr string, config *tls.Config) error { // acceptLoop accepts TCP connections in a loop, and handle connections in goroutines. // Returns error when error occurred. func (s *Server) acceptLoop() error { - close(s.accepting) + close(s.acceptingC) for { if s.isStopped() { _log.Tracef("server accept loop stopped") @@ -171,25 +171,25 @@ func (s *Server) handleConn(conn net.Conn) { if s.OnSessionCreate != nil { s.OnSessionCreate(sess) } - close(sess.afterCreateHook) + close(sess.afterCreateHookC) go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection. go sess.writeOutbound(s.writeTimeout) // start writing message packet to connection. select { - case <-sess.closed: // wait for session finished. - case <-s.stopped: // or the server is stopped. + case <-sess.closedC: // wait for session finished. + case <-s.stoppedC: // or the server is stopped. } if s.OnSessionClose != nil { s.OnSessionClose(sess) } - close(sess.afterCloseHook) + close(sess.afterCloseHookC) } // Stop stops server. Closing Listener and all connections. func (s *Server) Stop() error { - close(s.stopped) + close(s.stoppedC) return s.Listener.Close() } @@ -210,7 +210,7 @@ func (s *Server) NotFoundHandler(handler HandlerFunc) { func (s *Server) isStopped() bool { select { - case <-s.stopped: + case <-s.stoppedC: return true default: return false diff --git a/server_test.go b/server_test.go index c27f55b..f93a1f0 100644 --- a/server_test.go +++ b/server_test.go @@ -17,12 +17,12 @@ func TestNewServer(t *testing.T) { Codec: &JsonCodec{}, RespQueueSize: -1, }) - assert.NotNil(t, s.accepting) + assert.NotNil(t, s.acceptingC) assert.IsType(t, s.Packer, NewDefaultPacker()) assert.Equal(t, s.Codec, &JsonCodec{}) assert.Equal(t, s.respQueueSize, DefaultRespQueueSize) - assert.NotNil(t, s.accepting) - assert.NotNil(t, s.stopped) + assert.NotNil(t, s.acceptingC) + assert.NotNil(t, s.stoppedC) } func TestServer_Serve(t *testing.T) { @@ -34,7 +34,7 @@ func TestServer_Serve(t *testing.T) { assert.ErrorIs(t, server.Serve(lis), ErrServerStopped) close(done) }() - <-server.accepting + <-server.acceptingC time.Sleep(time.Millisecond * 5) err = server.Stop() assert.NoError(t, err) @@ -48,7 +48,7 @@ func TestServer_Run(t *testing.T) { assert.ErrorIs(t, server.Run("localhost:0"), ErrServerStopped) close(done) }() - <-server.accepting + <-server.acceptingC time.Sleep(time.Millisecond * 5) err := server.Stop() assert.NoError(t, err) @@ -69,7 +69,7 @@ func TestServer_RunTLS(t *testing.T) { assert.ErrorIs(t, server.RunTLS("localhost:0", cfg), ErrServerStopped) close(done) }() - <-server.accepting + <-server.acceptingC time.Sleep(time.Millisecond * 5) err = server.Stop() assert.NoError(t, err) @@ -92,7 +92,7 @@ func TestServer_acceptLoop(t *testing.T) { assert.Error(t, err) }() - <-server.accepting + <-server.acceptingC // client cli, err := net.Dial("tcp", lis.Addr().String()) @@ -127,7 +127,7 @@ func TestServer_Stop(t *testing.T) { assert.Equal(t, err, ErrServerStopped) }() - <-server.accepting + <-server.acceptingC // client cli, err := net.Dial("tcp", server.Listener.Addr().String()) @@ -197,7 +197,7 @@ func TestServer_handleConn(t *testing.T) { }() defer func() { assert.NoError(t, server.Stop()) }() - <-server.accepting + <-server.acceptingC // client cli, err := net.Dial("tcp", server.Listener.Addr().String()) @@ -235,7 +235,7 @@ func TestServer_NotFoundHandler(t *testing.T) { assert.Equal(t, err, ErrServerStopped) }() - <-server.accepting + <-server.acceptingC // client cli, err := net.Dial("tcp", server.Listener.Addr().String()) @@ -276,7 +276,7 @@ func TestServer_SessionHooks(t *testing.T) { }() defer func() { assert.NoError(t, server.Stop()) }() - <-server.accepting + <-server.acceptingC // client cli, err := net.Dial("tcp", server.Listener.Addr().String()) diff --git a/session.go b/session.go index 420ce06..206dc39 100644 --- a/session.go +++ b/session.go @@ -17,7 +17,7 @@ type Session interface { // SetID sets current session's id. SetID(id interface{}) - // Send sends the ctx to the respQueue. + // Send sends the ctx to the respStream. Send(ctx Context) bool // Codec returns the codec, can be nil. @@ -40,17 +40,17 @@ type Session interface { } type session struct { - id interface{} // session's ID. - conn net.Conn // tcp connection - closed chan struct{} // to close() - afterCreateHook chan struct{} // to close after session's on-create hook triggered - afterCloseHook chan struct{} // to close after session's on-close hook triggered - closeOnce sync.Once // ensure one session only close once - respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound() - packer Packer // to pack and unpack message - codec Codec // encode/decode message data - ctxPool sync.Pool // router context pool - asyncRouter bool // calls router HandlerFunc in a goroutine if false + id interface{} // session's ID. + conn net.Conn // tcp connection + closedC chan struct{} // to close when read/write loop stopped + closeOnce sync.Once // ensure one session only close once + afterCreateHookC chan struct{} // to close after session's on-create hook triggered + afterCloseHookC chan struct{} // to close after session's on-close hook triggered + respStream chan Context // response queue channel, pushed in Send() and popped in writeOutbound() + packer Packer // to pack and unpack message + codec Codec // encode/decode message data + ctxPool sync.Pool // router context pool + asyncRouter bool // calls router HandlerFunc in a goroutine if false } // sessionOption is the extra options for session. @@ -67,16 +67,16 @@ type sessionOption struct { // Returns a session pointer. func newSession(conn net.Conn, opt *sessionOption) *session { return &session{ - id: uuid.NewString(), // use uuid as default - conn: conn, - closed: make(chan struct{}), - afterCreateHook: make(chan struct{}), - afterCloseHook: make(chan struct{}), - respQueue: make(chan Context, opt.respQueueSize), - packer: opt.Packer, - codec: opt.Codec, - ctxPool: sync.Pool{New: func() interface{} { return newContext() }}, - asyncRouter: opt.asyncRouter, + id: uuid.NewString(), // use uuid as default + conn: conn, + closedC: make(chan struct{}), + afterCreateHookC: make(chan struct{}), + afterCloseHookC: make(chan struct{}), + respStream: make(chan Context, opt.respQueueSize), + packer: opt.Packer, + codec: opt.Codec, + ctxPool: sync.Pool{New: func() interface{} { return newContext() }}, + asyncRouter: opt.asyncRouter, } } @@ -91,15 +91,15 @@ func (s *session) SetID(id interface{}) { s.id = id } -// Send pushes response message to respQueue. +// Send pushes response message to respStream. // Returns false if session is closed or ctx is done. func (s *session) Send(ctx Context) (ok bool) { select { case <-ctx.Done(): return false - case <-s.closed: + case <-s.closedC: return false - case s.respQueue <- ctx: + case s.respStream <- ctx: return true } } @@ -112,17 +112,17 @@ func (s *session) Codec() Codec { // Close closes the session, but doesn't close the connection. // The connection will be closed in the server once the session's closed. func (s *session) Close() { - s.closeOnce.Do(func() { close(s.closed) }) + s.closeOnce.Do(func() { close(s.closedC) }) } // AfterCreateHook blocks until session's on-create hook triggered. func (s *session) AfterCreateHook() <-chan struct{} { - return s.afterCreateHook + return s.afterCreateHookC } // AfterCloseHook blocks until session's on-close hook triggered. func (s *session) AfterCloseHook() <-chan struct{} { - return s.afterCloseHook + return s.afterCloseHookC } // AllocateContext gets a Context from pool and reset all but session. @@ -144,7 +144,7 @@ func (s *session) Conn() net.Conn { func (s *session) readInbound(router *Router, timeout time.Duration) { for { select { - case <-s.closed: + case <-s.closedC: return default: } @@ -184,16 +184,16 @@ func (s *session) handleReq(router *Router, reqMsg *Message) { s.Send(ctx) } -// writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop. +// writeOutbound fetches message from respStream channel and writes to TCP connection in a loop. // Parameter writeTimeout specified the connection writing timeout. // The loop breaks if errors occurred, or the session is closed. func (s *session) writeOutbound(writeTimeout time.Duration) { for { var ctx Context select { - case <-s.closed: + case <-s.closedC: return - case ctx = <-s.respQueue: + case ctx = <-s.respStream: } outboundBytes, err := s.packResponse(ctx) diff --git a/session_test.go b/session_test.go index dc5d1d2..33f6a36 100644 --- a/session_test.go +++ b/session_test.go @@ -19,8 +19,8 @@ func TestNewTCPSession(t *testing.T) { s = newSession(nil, &sessionOption{}) }) assert.NotNil(t, s) - assert.NotNil(t, s.closed) - assert.NotNil(t, s.respQueue) + assert.NotNil(t, s.closedC) + assert.NotNil(t, s.respStream) } func TestTCPSession_close(t *testing.T) { @@ -34,7 +34,7 @@ func TestTCPSession_close(t *testing.T) { }() } wg.Wait() - _, ok := <-sess.closed + _, ok := <-sess.closedC assert.False(t, ok) } @@ -50,7 +50,7 @@ func TestTCPSession_readInbound(t *testing.T) { _ = p1.Close() sess := newSession(p1, &sessionOption{}) go sess.readInbound(nil, time.Millisecond) - <-sess.closed + <-sess.closedC }) t.Run("when connection read timeout", func(t *testing.T) { p1, _ := net.Pipe() @@ -77,7 +77,7 @@ func TestTCPSession_readInbound(t *testing.T) { close(done) }() <-done - <-sess.closed + <-sess.closedC }) t.Run("when unpack message returns nil message", func(t *testing.T) { ctrl := gomock.NewController(t) @@ -175,8 +175,8 @@ func TestTCPSession_Send(t *testing.T) { }) t.Run("when send succeed", func(t *testing.T) { sess := newSession(nil, &sessionOption{}) - sess.respQueue = make(chan Context) // no buffer - go func() { <-sess.respQueue }() + sess.respStream = make(chan Context) // no buffer + go func() { <-sess.respStream }() assert.True(t, sess.AllocateContext().SetRequestMessage(NewMessage(1, []byte("test"))).Send()) sess.Close() }) @@ -208,7 +208,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { packer.EXPECT().Pack(gomock.Any()).AnyTimes().Return(nil, nil) sess := newSession(nil, &sessionOption{Packer: packer, respQueueSize: 1024}) - sess.respQueue <- sess.AllocateContext() + sess.respStream <- sess.AllocateContext() doneLoop := make(chan struct{}) go func() { sess.writeOutbound(0) // should stop looping and return @@ -228,7 +228,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { sess := newSession(nil, &sessionOption{Packer: packer}) done := make(chan struct{}) go func() { - sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) + sess.respStream <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) close(done) }() time.Sleep(time.Microsecond * 15) @@ -245,7 +245,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { packer.EXPECT().Pack(gomock.Any()).Return(nil, nil) sess := newSession(nil, &sessionOption{Packer: packer, respQueueSize: 100}) - sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) // push to queue + sess.respStream <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) // push to queue doneLoop := make(chan struct{}) go func() { sess.writeOutbound(0) @@ -264,9 +264,9 @@ func TestTCPSession_writeOutbound(t *testing.T) { p1, _ := net.Pipe() _ = p1.Close() sess := newSession(p1, &sessionOption{Packer: packer}) - go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() + go func() { sess.respStream <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() go sess.writeOutbound(time.Millisecond * 10) - _, ok := <-sess.closed + _, ok := <-sess.closedC assert.False(t, ok) }) t.Run("when conn write timeout", func(t *testing.T) { @@ -278,9 +278,9 @@ func TestTCPSession_writeOutbound(t *testing.T) { p1, _ := net.Pipe() sess := newSession(p1, &sessionOption{Packer: packer}) - go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() + go func() { sess.respStream <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() go sess.writeOutbound(time.Millisecond * 10) - _, ok := <-sess.closed + _, ok := <-sess.closedC assert.False(t, ok) _ = p1.Close() }) @@ -294,9 +294,9 @@ func TestTCPSession_writeOutbound(t *testing.T) { p1, _ := net.Pipe() assert.NoError(t, p1.Close()) sess := newSession(p1, &sessionOption{Packer: packer}) - go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() + go func() { sess.respStream <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() sess.writeOutbound(0) // should stop looping and return - _, ok := <-sess.closed + _, ok := <-sess.closedC assert.False(t, ok) }) t.Run("when write succeed", func(t *testing.T) {