Skip to content

Commit 5bf144d

Browse files
authored
fix: eliminate send-on-closed-channel race in subscriptions (#13)
- fix: replace server channel closing with per-subscription done channel - fix: unsubscribe order — remove event handler before signaling shutdown - fix: guard handler sends with done to avoid late sends - fix: stop worker on done; never close serverCh - fix: apply same pattern to unicast and fanout subscriptions - test: add deterministic tests using fakeEventBus to simulate in-flight publish after unsubscribe Signed-off-by: Andres Morey <[email protected]>
1 parent 7db06b7 commit 5bf144d

File tree

2 files changed

+105
-17
lines changed

2 files changed

+105
-17
lines changed

dispatcher.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,15 @@ type Subscription struct {
6767
serverCh chan server
6868
cleanup func()
6969
unsubscribeOnce sync.Once
70+
done chan struct{}
7071
}
7172

7273
// Ends subscription
7374
func (sub *Subscription) Unsubscribe() {
7475
sub.unsubscribeOnce.Do(func() {
75-
close(sub.serverCh)
76+
// First, stop receiving new events, then signal worker shutdown.
7677
sub.cleanup()
78+
close(sub.done)
7779
})
7880
}
7981

@@ -130,12 +132,17 @@ func (d *Dispatcher) Unicast(ctx context.Context, nodeName string, fn DispatchHa
130132
// they become available until Unsubscribe() is called
131133
func (d *Dispatcher) UnicastSubscribe(ctx context.Context, nodeName string, fn DispatchHandler) (*Subscription, error) {
132134
serverCh := make(chan server)
135+
done := make(chan struct{})
133136

134137
// server handler
135138
handleNewServers := func(newServers []server) {
136139
for _, server := range newServers {
137140
if server.nodeName == nodeName {
138-
serverCh <- server
141+
select {
142+
case <-done:
143+
return
144+
case serverCh <- server:
145+
}
139146
}
140147
}
141148
}
@@ -144,14 +151,9 @@ func (d *Dispatcher) UnicastSubscribe(ctx context.Context, nodeName string, fn D
144151
go func() {
145152
for {
146153
select {
147-
case <-ctx.Done():
154+
case <-done:
148155
return
149-
case server, ok := <-serverCh:
150-
if !ok {
151-
// unsubscribe was called
152-
return
153-
}
154-
156+
case server := <-serverCh:
155157
// execute dispatch handler in goroutine
156158
connCtx := context.WithValue(ctx, dispatcherAddrCtxKey, fmt.Sprintf("%s:%s", server.ip, d.connectArgs.Port))
157159
go fn(connCtx, d.conn)
@@ -176,6 +178,7 @@ func (d *Dispatcher) UnicastSubscribe(ctx context.Context, nodeName string, fn D
176178
cleanup: func() {
177179
d.eventbus.Unsubscribe("add:servers", handleNewServers)
178180
},
181+
done: done,
179182
}, nil
180183
}
181184

@@ -213,26 +216,26 @@ func (d *Dispatcher) Fanout(ctx context.Context, fn DispatchHandler) {
213216
// they become available until Unsubscribe() is called
214217
func (d *Dispatcher) FanoutSubscribe(ctx context.Context, fn DispatchHandler) (*Subscription, error) {
215218
serverCh := make(chan server)
219+
done := make(chan struct{})
216220

217221
// server handler
218222
handleNewServers := func(newServers []server) {
219223
for _, server := range newServers {
220-
serverCh <- server
224+
select {
225+
case <-done:
226+
return
227+
case serverCh <- server:
228+
}
221229
}
222230
}
223231

224232
// worker
225233
go func() {
226234
for {
227235
select {
228-
case <-ctx.Done():
236+
case <-done:
229237
return
230-
case server, ok := <-serverCh:
231-
if !ok {
232-
// unsubscribe was called
233-
return
234-
}
235-
238+
case server := <-serverCh:
236239
// execute dispatch handler in goroutine
237240
connCtx := context.WithValue(ctx, dispatcherAddrCtxKey, fmt.Sprintf("%s:%s", server.ip, d.connectArgs.Port))
238241
go fn(connCtx, d.conn)
@@ -257,6 +260,7 @@ func (d *Dispatcher) FanoutSubscribe(ctx context.Context, fn DispatchHandler) (*
257260
cleanup: func() {
258261
d.eventbus.Unsubscribe("add:servers", handleNewServers)
259262
},
263+
done: done,
260264
}, nil
261265
}
262266

dispatcher_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,87 @@ func TestDispatcherFanoutSubscribe(t *testing.T) {
266266
// check result
267267
require.Equal(t, wantIps, mapset.NewSet(ips...))
268268
}
269+
270+
// Exposes the race: Unsubscribe closes the internal channel, so any in-flight
271+
// event handler that tries to send will panic with: "send on closed channel".
272+
// This test models the event bus behavior by sending to the channel after
273+
// Unsubscribe; it should panic with the current implementation.
274+
// fakeEventBus lets us trigger the subscription callback even after Unsubscribe
275+
// to simulate an in-flight publish racing with unsubscribe.
276+
type fakeEventBus struct {
277+
mu sync.Mutex
278+
handler func([]server)
279+
}
280+
281+
func (b *fakeEventBus) Subscribe(topic string, fn interface{}) error {
282+
b.mu.Lock()
283+
defer b.mu.Unlock()
284+
b.handler = fn.(func([]server))
285+
return nil
286+
}
287+
func (b *fakeEventBus) SubscribeOnce(topic string, fn interface{}) error {
288+
return b.Subscribe(topic, fn)
289+
}
290+
func (b *fakeEventBus) SubscribeAsync(topic string, fn interface{}, once bool) error {
291+
b.mu.Lock()
292+
defer b.mu.Unlock()
293+
// Our code subscribes with: func([]server)
294+
b.handler = fn.(func([]server))
295+
return nil
296+
}
297+
func (b *fakeEventBus) SubscribeOnceAsync(topic string, fn interface{}) error {
298+
return b.Subscribe(topic, fn)
299+
}
300+
func (b *fakeEventBus) Unsubscribe(topic string, fn interface{}) error { return nil }
301+
func (b *fakeEventBus) HasCallback(topic string) bool {
302+
b.mu.Lock()
303+
defer b.mu.Unlock()
304+
return b.handler != nil
305+
}
306+
func (b *fakeEventBus) Publish(topic string, args ...interface{}) {
307+
b.mu.Lock()
308+
h := b.handler
309+
b.mu.Unlock()
310+
if h != nil {
311+
h(args[0].([]server))
312+
}
313+
}
314+
func (b *fakeEventBus) WaitAsync() {}
315+
316+
func TestUnicastSubscribe_UnsubscribeThenPublishDoesNotPanic(t *testing.T) {
317+
// initialize dispatcher with fake bus to control callback timing
318+
d := newTestDispatcher()
319+
fb := &fakeEventBus{}
320+
d.eventbus = fb
321+
322+
// start a subscription
323+
sub, err := d.UnicastSubscribe(context.Background(), "n1", func(ctx context.Context, clientconn *grpc.ClientConn) {})
324+
require.Nil(t, err)
325+
326+
// Unsubscribe; publishing should not panic even if handler runs
327+
sub.Unsubscribe()
328+
329+
// Simulate an in-flight publish that still calls the handler
330+
require.NotPanics(t, func() {
331+
fb.Publish("add:servers", []server{{ip: "1.2.3.4", nodeName: "n1"}})
332+
})
333+
}
334+
335+
func TestFanoutSubscribe_UnsubscribeThenPublishDoesNotPanic(t *testing.T) {
336+
// initialize dispatcher with fake bus to control callback timing
337+
d := newTestDispatcher()
338+
fb := &fakeEventBus{}
339+
d.eventbus = fb
340+
341+
// start a subscription
342+
sub, err := d.FanoutSubscribe(context.Background(), func(ctx context.Context, clientconn *grpc.ClientConn) {})
343+
require.Nil(t, err)
344+
345+
// Unsubscribe; publishing should not panic even if handler runs
346+
sub.Unsubscribe()
347+
348+
// Simulate an in-flight publish that still calls the handler
349+
require.NotPanics(t, func() {
350+
fb.Publish("add:servers", []server{{ip: "1.2.3.4", nodeName: "n1"}})
351+
})
352+
}

0 commit comments

Comments
 (0)