Skip to content

Commit f7948b5

Browse files
committed
fix: address pr review
1 parent 075b930 commit f7948b5

File tree

6 files changed

+60
-76
lines changed

6 files changed

+60
-76
lines changed

internal/pool/conn.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ type Conn struct {
2727
onClose func() error
2828

2929
// Push notification processor for handling push notifications on this connection
30-
PushNotificationProcessor interface {
31-
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
32-
}
30+
// Uses the same interface as defined in pool.go to avoid duplication
31+
PushNotificationProcessor PushNotificationProcessorInterface
3332
}
3433

3534
func NewConn(netConn net.Conn) *Conn {

internal/pool/pool.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ var (
2424
ErrPoolTimeout = errors.New("redis: connection pool timeout")
2525
)
2626

27+
28+
2729
var timers = sync.Pool{
2830
New: func() interface{} {
2931
t := time.NewTimer(time.Hour)
@@ -60,6 +62,12 @@ type Pooler interface {
6062
Close() error
6163
}
6264

65+
// PushNotificationProcessorInterface defines the interface for push notification processors.
66+
// This matches the main PushNotificationProcessorInterface to avoid duplication while preventing circular imports.
67+
type PushNotificationProcessorInterface interface {
68+
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
69+
}
70+
6371
type Options struct {
6472
Dialer func(context.Context) (net.Conn, error)
6573

@@ -74,9 +82,12 @@ type Options struct {
7482
ConnMaxLifetime time.Duration
7583

7684
// Push notification processor for connections
77-
PushNotificationProcessor interface {
78-
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
79-
}
85+
// This interface matches PushNotificationProcessorInterface to avoid duplication
86+
// while preventing circular imports
87+
PushNotificationProcessor PushNotificationProcessorInterface
88+
89+
// Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without)
90+
Protocol int
8091
}
8192

8293
type lastDialErrorWrap struct {
@@ -390,8 +401,8 @@ func (p *ConnPool) popIdle() (*Conn, error) {
390401
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
391402
if cn.rd.Buffered() > 0 {
392403
// Check if this might be push notification data
393-
if cn.PushNotificationProcessor != nil {
394-
// Try to process pending push notifications before discarding connection
404+
if cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 {
405+
// Only process for RESP3 clients (push notifications only available in RESP3)
395406
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd)
396407
if err != nil {
397408
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err)
@@ -553,9 +564,9 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
553564
// Check connection health, but be aware of push notifications
554565
if err := connCheck(cn.netConn); err != nil {
555566
// If there's unexpected data and we have push notification support,
556-
// it might be push notifications
557-
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil {
558-
// Try to process any pending push notifications
567+
// it might be push notifications (only for RESP3)
568+
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 {
569+
// Try to process any pending push notifications (only for RESP3)
559570
ctx := context.Background()
560571
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil {
561572
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr)

internal/pushnotif/processor.go

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -114,35 +114,12 @@ func (v *VoidProcessor) GetRegistryForTesting() *Registry {
114114
return nil
115115
}
116116

117-
// ProcessPendingNotifications reads and discards any pending push notifications.
117+
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
118+
// are only available in RESP3 and this processor is used when they're disabled.
119+
// This avoids unnecessary buffer scanning overhead.
118120
func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
119-
// Check for nil reader
120-
if rd == nil {
121-
return nil
122-
}
123-
124-
// Read and discard any pending push notifications to clean the buffer
125-
for {
126-
// Peek at the next reply type to see if it's a push notification
127-
replyType, err := rd.PeekReplyType()
128-
if err != nil {
129-
// No more data available or error reading
130-
break
131-
}
132-
133-
// Push notifications use RespPush type in RESP3
134-
if replyType != proto.RespPush {
135-
break
136-
}
137-
138-
// Read and discard the push notification
139-
_, err = rd.ReadReply()
140-
if err != nil {
141-
return fmt.Errorf("failed to read push notification for discarding: %w", err)
142-
}
143-
144-
// Notification discarded - continue to next one
145-
}
146-
121+
// VoidProcessor is used when push notifications are disabled (typically RESP2 or disabled RESP3).
122+
// Since push notifications only exist in RESP3, we can safely skip all processing
123+
// to avoid unnecessary buffer scanning overhead.
147124
return nil
148125
}

options.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,11 @@ type Options struct {
221221
// When enabled, the client will process RESP3 push notifications and
222222
// route them to registered handlers.
223223
//
224-
// For RESP3 connections (Protocol: 3), push notifications are automatically enabled.
225-
// To disable push notifications for RESP3, use Protocol: 2 instead.
224+
// For RESP3 connections (Protocol: 3), push notifications are always enabled
225+
// and cannot be disabled. To avoid push notifications, use Protocol: 2 (RESP2).
226226
// For RESP2 connections, push notifications are not available.
227227
//
228-
// default: automatically enabled for RESP3, disabled for RESP2
228+
// default: always enabled for RESP3, disabled for RESP2
229229
PushNotifications bool
230230

231231
// PushNotificationProcessor is the processor for handling push notifications.
@@ -609,5 +609,7 @@ func newConnPool(
609609
ConnMaxLifetime: opt.ConnMaxLifetime,
610610
// Pass push notification processor for connection initialization
611611
PushNotificationProcessor: opt.PushNotificationProcessor,
612+
// Pass protocol version for push notification optimization
613+
Protocol: opt.Protocol,
612614
})
613615
}

redis.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ func NewClient(opt *Options) *Client {
755755
}
756756
opt.init()
757757

758-
// Enable push notifications by default for RESP3
758+
// Push notifications are always enabled for RESP3 (cannot be disabled)
759759
// Only override if no custom processor is provided
760760
if opt.Protocol == 3 && opt.PushNotificationProcessor == nil {
761761
opt.PushNotifications = true
@@ -811,18 +811,27 @@ func (c *Client) Options() *Options {
811811
return c.opt
812812
}
813813

814-
// initializePushProcessor initializes the push notification processor.
815-
func (c *Client) initializePushProcessor() {
814+
// initializePushProcessor initializes the push notification processor for any client type.
815+
// This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient.
816+
func initializePushProcessor(opt *Options, useVoidByDefault bool) PushNotificationProcessorInterface {
816817
// Always use custom processor if provided
817-
if c.opt.PushNotificationProcessor != nil {
818-
c.pushProcessor = c.opt.PushNotificationProcessor
819-
} else if c.opt.PushNotifications {
818+
if opt.PushNotificationProcessor != nil {
819+
return opt.PushNotificationProcessor
820+
}
821+
822+
// For regular clients, respect the PushNotifications setting
823+
if !useVoidByDefault && opt.PushNotifications {
820824
// Create default processor when push notifications are enabled
821-
c.pushProcessor = NewPushNotificationProcessor()
822-
} else {
823-
// Create void processor when push notifications are disabled
824-
c.pushProcessor = NewVoidPushNotificationProcessor()
825+
return NewPushNotificationProcessor()
825826
}
827+
828+
// Create void processor when push notifications are disabled or for specialized clients
829+
return NewVoidPushNotificationProcessor()
830+
}
831+
832+
// initializePushProcessor initializes the push notification processor for this client.
833+
func (c *Client) initializePushProcessor() {
834+
c.pushProcessor = initializePushProcessor(c.opt, false)
826835
}
827836

828837
// RegisterPushNotificationHandler registers a handler for a specific push notification name.
@@ -976,13 +985,9 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn
976985
c.hooksMixin = parentHooks.clone()
977986
}
978987

979-
// Set push notification processor from options, ensure it's never nil
980-
if opt.PushNotificationProcessor != nil {
981-
c.pushProcessor = opt.PushNotificationProcessor
982-
} else {
983-
// Create a void processor if none provided to ensure we never have nil
984-
c.pushProcessor = NewVoidPushNotificationProcessor()
985-
}
988+
// Initialize push notification processor using shared helper
989+
// Use void processor by default for connections (typically don't need push notifications)
990+
c.pushProcessor = initializePushProcessor(opt, true)
986991

987992
c.cmdable = c.Process
988993
c.statefulCmdable = c.Process

sentinel.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -431,14 +431,9 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
431431
}
432432
rdb.init()
433433

434-
// Initialize push notification processor similar to regular client
435-
if opt.PushNotificationProcessor != nil {
436-
rdb.pushProcessor = opt.PushNotificationProcessor
437-
} else if opt.PushNotifications {
438-
rdb.pushProcessor = NewPushNotificationProcessor()
439-
} else {
440-
rdb.pushProcessor = NewVoidPushNotificationProcessor()
441-
}
434+
// Initialize push notification processor using shared helper
435+
// Use void processor by default for failover clients (typically don't need push notifications)
436+
rdb.pushProcessor = initializePushProcessor(opt, true)
442437

443438
connPool = newConnPool(opt, rdb.dialHook)
444439
rdb.connPool = connPool
@@ -506,14 +501,9 @@ func NewSentinelClient(opt *Options) *SentinelClient {
506501
},
507502
}
508503

509-
// Initialize push notification processor similar to regular client
510-
if opt.PushNotificationProcessor != nil {
511-
c.pushProcessor = opt.PushNotificationProcessor
512-
} else if opt.PushNotifications {
513-
c.pushProcessor = NewPushNotificationProcessor()
514-
} else {
515-
c.pushProcessor = NewVoidPushNotificationProcessor()
516-
}
504+
// Initialize push notification processor using shared helper
505+
// Use void processor by default for sentinel clients (typically don't need push notifications)
506+
c.pushProcessor = initializePushProcessor(opt, true)
517507

518508
c.initHooks(hooks{
519509
dial: c.baseClient.dial,

0 commit comments

Comments
 (0)