Skip to content

Commit 4e99c28

Browse files
committed
feat: add gorums.System and proper io.Closer support
This adds the System helper for registration, starting and stopping servers and closing connections. This also implements proper io.Closer support in the Manager's Close() method, and prevents duplicate calls to the channel's close().
1 parent 2306fac commit 4e99c28

File tree

10 files changed

+190
-34
lines changed

10 files changed

+190
-34
lines changed

channel.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type channel struct {
8181
// to the caller.
8282
responseRouters map[uint64]request
8383
responseMut sync.Mutex
84+
closeOnceFunc func() error
8485
}
8586

8687
// newChannel creates a new channel for the given node and starts the sender
@@ -102,19 +103,22 @@ func newChannel(parentCtx context.Context, conn *grpc.ClientConn, id uint32, sen
102103
responseRouters: make(map[uint64]request),
103104
streamReady: make(chan struct{}, 1),
104105
}
106+
c.closeOnceFunc = sync.OnceValue(func() error {
107+
// important to cancel first to stop goroutines
108+
c.connCancel()
109+
if c.conn != nil {
110+
return c.conn.Close()
111+
}
112+
return nil
113+
})
105114
go c.sender()
106115
go c.receiver()
107116
return c
108117
}
109118

110-
// close closes the channel and the underlying connection.
119+
// close closes the channel and the underlying connection exactly once.
111120
func (c *channel) close() error {
112-
// important to cancel first to stop goroutines
113-
c.connCancel()
114-
if c.conn != nil {
115-
return c.conn.Close()
116-
}
117-
return nil
121+
return c.closeOnceFunc()
118122
}
119123

120124
// newNodeStream creates a new stream for this channel.

channel_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func testNodeWithoutServer(t testing.TB, opts ...ManagerOption) *Node {
8282
t.Helper()
8383
mgrOpts := append([]ManagerOption{InsecureDialOptions(t)}, opts...)
8484
mgr := NewManager(mgrOpts...)
85-
t.Cleanup(mgr.Close)
85+
t.Cleanup(Closer(t, mgr))
8686
// Use a high port number that's unlikely to have anything listening.
8787
// We use a fixed ID for simplicity.
8888
node, err := mgr.newNode("127.0.0.1:59999", 1)

config_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
func TestNewConfigurationEmptyNodeList(t *testing.T) {
2727
wantErr := errors.New("config: missing required node addresses")
2828
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
29-
t.Cleanup(mgr.Close)
29+
t.Cleanup(gorums.Closer(t, mgr))
3030

3131
_, err := gorums.NewConfiguration(mgr, gorums.WithNodeList([]string{}))
3232
if err == nil {
@@ -39,7 +39,7 @@ func TestNewConfigurationEmptyNodeList(t *testing.T) {
3939

4040
func TestNewConfigurationNodeList(t *testing.T) {
4141
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
42-
t.Cleanup(mgr.Close)
42+
t.Cleanup(gorums.Closer(t, mgr))
4343

4444
cfg, err := gorums.NewConfiguration(mgr, gorums.WithNodeList(nodes))
4545
if err != nil {
@@ -77,7 +77,7 @@ func TestNewConfigurationNodeList(t *testing.T) {
7777

7878
func TestNewConfigurationNodeMap(t *testing.T) {
7979
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
80-
t.Cleanup(mgr.Close)
80+
t.Cleanup(gorums.Closer(t, mgr))
8181

8282
cfg, err := gorums.NewConfiguration(mgr, gorums.WithNodeMap(nodeMap))
8383
if err != nil {
@@ -103,7 +103,7 @@ func TestNewConfigurationNodeMap(t *testing.T) {
103103

104104
func TestNewConfigurationNodeIDs(t *testing.T) {
105105
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
106-
t.Cleanup(mgr.Close)
106+
t.Cleanup(gorums.Closer(t, mgr))
107107

108108
c1, err := gorums.NewConfiguration(mgr, gorums.WithNodeList(nodes))
109109
if err != nil {
@@ -141,7 +141,7 @@ func TestNewConfigurationNodeIDs(t *testing.T) {
141141

142142
func TestNewConfigurationAnd(t *testing.T) {
143143
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
144-
t.Cleanup(mgr.Close)
144+
t.Cleanup(gorums.Closer(t, mgr))
145145

146146
c1, err := gorums.NewConfiguration(mgr, gorums.WithNodeList(nodes))
147147
if err != nil {
@@ -195,7 +195,7 @@ func TestNewConfigurationAnd(t *testing.T) {
195195

196196
func TestNewConfigurationExcept(t *testing.T) {
197197
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
198-
t.Cleanup(mgr.Close)
198+
t.Cleanup(gorums.Closer(t, mgr))
199199

200200
c1, err := gorums.NewConfiguration(mgr, gorums.WithNodeList(nodes))
201201
if err != nil {
@@ -254,7 +254,7 @@ func TestConfigConcurrentAccess(t *testing.T) {
254254

255255
func TestConfigurationWithoutErrors(t *testing.T) {
256256
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
257-
t.Cleanup(mgr.Close)
257+
t.Cleanup(gorums.Closer(t, mgr))
258258

259259
cfg, err := gorums.NewConfiguration(mgr, gorums.WithNodeMap(nodeMap))
260260
if err != nil {

mgr.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package gorums
22

33
import (
4+
"errors"
45
"fmt"
56
"log"
67
"sync"
@@ -50,23 +51,15 @@ func NewManager(opts ...ManagerOption) *Manager {
5051
return m
5152
}
5253

53-
func (m *Manager) closeNodeConns() {
54-
for _, node := range m.nodes {
55-
err := node.close()
56-
if err != nil && m.logger != nil {
57-
m.logger.Printf("error closing: %v", err)
58-
}
59-
}
60-
}
61-
6254
// Close closes all node connections and any client streams.
63-
func (m *Manager) Close() {
55+
func (m *Manager) Close() error {
56+
var err error
6457
m.closeOnce.Do(func() {
65-
if m.logger != nil {
66-
m.logger.Printf("closing")
58+
for _, node := range m.nodes {
59+
err = errors.Join(err, node.close())
6760
}
68-
m.closeNodeConns()
6961
})
62+
return err
7063
}
7164

7265
// NodeIDs returns the identifier of each available node. IDs are returned in

mgr_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@ func TestManagerLogging(t *testing.T) {
2323
logger = log.New(&buf, "logger: ", log.Lshortfile)
2424
)
2525
mgr := NewManager(InsecureDialOptions(t), WithLogger(logger))
26-
t.Cleanup(mgr.Close)
26+
t.Cleanup(Closer(t, mgr))
2727

28-
want := "logger: mgr.go:48: ready"
28+
want := "logger: mgr.go:49: ready"
2929
if strings.TrimSpace(buf.String()) != want {
3030
t.Errorf("logger: got %q, want %q", buf.String(), want)
3131
}
3232
}
3333

3434
func TestManagerNewNode(t *testing.T) {
3535
mgr := NewManager(InsecureDialOptions(t))
36-
t.Cleanup(mgr.Close)
36+
t.Cleanup(Closer(t, mgr))
3737

3838
_, err := NewConfiguration(mgr, WithNodeMap(nodeMap))
3939
if err != nil {
@@ -62,7 +62,7 @@ func TestManagerNewNode(t *testing.T) {
6262
func TestManagerNewNodeWithConn(t *testing.T) {
6363
addrs := TestServers(t, 3, nil)
6464
mgr := NewManager(InsecureDialOptions(t))
65-
t.Cleanup(mgr.Close)
65+
t.Cleanup(Closer(t, mgr))
6666

6767
// Create configuration with only first 2 nodes
6868
_, err := NewConfiguration(mgr, WithNodeList(addrs[:2]))

server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func TestTCPReconnection(t *testing.T) {
121121
}()
122122

123123
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
124-
t.Cleanup(mgr.Close)
124+
t.Cleanup(gorums.Closer(t, mgr))
125125

126126
cfg, err := gorums.NewConfiguration(mgr, gorums.WithNodeList([]string{addr}))
127127
if err != nil {

system.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package gorums
2+
3+
import (
4+
"errors"
5+
"io"
6+
"net"
7+
)
8+
9+
// System encapsulates the state of a Gorums system, including the server,
10+
// listener, and any registered closers (e.g. managers).
11+
type System struct {
12+
closers []io.Closer
13+
srv *Server
14+
lis net.Listener
15+
}
16+
17+
// NewSystem creates a new Gorums System with the provided listener and server options.
18+
func NewSystem(lis net.Listener, opts ...ServerOption) *System {
19+
return &System{
20+
srv: NewServer(opts...),
21+
lis: lis,
22+
}
23+
}
24+
25+
// RegisterService registers the service with the server using the provided register function.
26+
// The closer is added to the list of closers to be closed when the system is stopped.
27+
//
28+
// Example usage:
29+
//
30+
// gs := NewSystem(lis)
31+
// mgr := NewManager(...)
32+
// impl := &srvImpl{}
33+
// gs.RegisterService(mgr, func(srv *Server) {
34+
// pb.RegisterMultiPaxosServer(srv, impl)
35+
// })
36+
func (s *System) RegisterService(closer io.Closer, registerFunc func(*Server)) {
37+
if closer != nil {
38+
s.closers = append(s.closers, closer)
39+
}
40+
registerFunc(s.srv)
41+
}
42+
43+
// Serve starts the server.
44+
func (s *System) Serve() error {
45+
return s.srv.Serve(s.lis)
46+
}
47+
48+
// Stop stops the Gorums server and closes all registered closers.
49+
// It immediately closes all open connections and listeners. It cancels
50+
// all active RPCs on the server side and the corresponding pending RPCs
51+
// on the client side will get notified by connection errors.
52+
func (s *System) Stop() (errs error) {
53+
// We cannot use graceful stop here since multicast methods does not
54+
// respond to the client, and thus would block indefinitely.
55+
// The server's listener is closed by s.srv.Stop().
56+
s.srv.Stop()
57+
for _, closer := range s.closers {
58+
errs = errors.Join(errs, closer.Close())
59+
}
60+
return errs
61+
}

system_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package gorums_test
2+
3+
import (
4+
"errors"
5+
"net"
6+
"testing"
7+
"time"
8+
9+
"github.com/relab/gorums"
10+
)
11+
12+
type mockCloser struct {
13+
closed bool
14+
err error
15+
}
16+
17+
func (m *mockCloser) Close() error {
18+
m.closed = true
19+
return m.err
20+
}
21+
22+
func TestSystemLifecycle(t *testing.T) {
23+
lis, err := net.Listen("tcp", "127.0.0.1:0")
24+
if err != nil {
25+
t.Fatalf("failed to listen: %v", err)
26+
}
27+
28+
sys := gorums.NewSystem(lis)
29+
if sys == nil {
30+
t.Fatal("NewSystem returned nil")
31+
}
32+
33+
closer1 := &mockCloser{}
34+
closer2 := &mockCloser{}
35+
36+
sys.RegisterService(closer1, func(srv *gorums.Server) {
37+
// In a real scenario, we would register a Gorums service here.
38+
})
39+
sys.RegisterService(closer2, func(srv *gorums.Server) {
40+
// Register another service or just use the callback.
41+
})
42+
43+
go func() {
44+
// Serve acts as a blocking call, so run in goroutine
45+
if err := sys.Serve(); err != nil {
46+
// Serve returns error on Stop usually (or net closed)
47+
t.Logf("Serve returned: %v", err)
48+
}
49+
}()
50+
51+
// Give it a moment to start
52+
time.Sleep(10 * time.Millisecond)
53+
54+
// Stop the system
55+
if err := sys.Stop(); err != nil {
56+
t.Errorf("Stop returned error: %v", err)
57+
}
58+
59+
if !closer1.closed {
60+
t.Error("closer1 was not closed")
61+
}
62+
if !closer2.closed {
63+
t.Error("closer2 was not closed")
64+
}
65+
}
66+
67+
func TestSystemStopError(t *testing.T) {
68+
lis, err := net.Listen("tcp", "127.0.0.1:0")
69+
if err != nil {
70+
t.Fatalf("failed to listen: %v", err)
71+
}
72+
73+
sys := gorums.NewSystem(lis)
74+
errCloser := &mockCloser{err: errors.New("closer error")}
75+
76+
sys.RegisterService(errCloser, func(srv *gorums.Server) {})
77+
78+
go func() {
79+
_ = sys.Serve()
80+
}()
81+
time.Sleep(10 * time.Millisecond)
82+
83+
err = sys.Stop()
84+
if err == nil {
85+
t.Error("expected error from Stop, got nil")
86+
}
87+
}

testing_gorums.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gorums
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"iter"
78
"maps"
89
"net"
@@ -148,6 +149,16 @@ type ServerIface interface {
148149
Stop()
149150
}
150151

152+
// Closer returns a cleanup function that closes the given io.Closer.
153+
func Closer(t testing.TB, c io.Closer) func() {
154+
t.Helper()
155+
return func() {
156+
if err := c.Close(); err != nil {
157+
t.Errorf("c.Close() = %q, expected no error", err.Error())
158+
}
159+
}
160+
}
161+
151162
type serverState struct {
152163
srv ServerIface
153164
lis net.Listener

testopts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (to *testOptions) getOrCreateManager(t testing.TB) *Manager {
5353
// Create manager and register its cleanup LAST so it runs FIRST (LIFO)
5454
mgrOpts := append([]ManagerOption{InsecureDialOptions(t)}, to.managerOpts...)
5555
mgr := NewManager(mgrOpts...)
56-
t.Cleanup(mgr.Close)
56+
t.Cleanup(Closer(t, mgr))
5757
return mgr
5858
}
5959

0 commit comments

Comments
 (0)