diff --git a/client/v3/watch.go b/client/v3/watch.go index 14ddcb920e10..e977b5e23c93 100644 --- a/client/v3/watch.go +++ b/client/v3/watch.go @@ -455,12 +455,13 @@ func (w *watcher) closeStream(wgs *watchGRPCStream) { w.mu.Unlock() } -func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { +func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream, closing map[*watcherStream]struct{}) { // check watch ID for backward compatibility (<= v3.3) if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") { w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) // failed; no channel close(ws.recvc) + closing[ws] = struct{}{} return } ws.id = resp.WatchId @@ -591,7 +592,7 @@ func (w *watchGRPCStream) run() { // response to head of queue creation if len(w.resuming) != 0 { if ws := w.resuming[0]; ws != nil { - w.addSubstream(pbresp, ws) + w.addSubstream(pbresp, ws, closing) w.dispatchEvent(pbresp) w.resuming[0] = nil }