From 42e2142bc59e3b86cce486cc1b2a36a731702183 Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Mon, 9 Oct 2023 17:05:10 +0200 Subject: [PATCH 1/4] Rewrite transaction logic for stable operation --- client.go | 78 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/client.go b/client.go index 44d8fe8..08310f8 100644 --- a/client.go +++ b/client.go @@ -60,7 +60,7 @@ type Client struct { Scanner *protocol.RespScanner TransactionTimeout time.Duration queued []protocol.Command - reservedRedisConn *connection.Connection + reservedRedisConn chan *connection.Connection transactionMode transactionMode transactionDoneChannel chan interface{} } @@ -171,7 +171,13 @@ func (this *Client) FlushRedisAndRespond() (err error) { var redisConn *connection.Connection if this.reservedRedisConn != nil { - redisConn = this.reservedRedisConn + // Wait for the reserved connection to be available + redisConn = <-this.reservedRedisConn + if redisConn == nil { + // Check if a transaction timeout handler closed the channel and report to the client + this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT} + return ERR_TRANSACTION_TIMEOUT + } } else { redisConn, err = connectionPool.GetConnection() if err != nil { @@ -183,39 +189,26 @@ func (this *Client) FlushRedisAndRespond() (err error) { defer func() { if err != nil { - // Force upstream disconnect on any error + // In case of an error the upstream and the downstream connection need to be disconnected redisConn.Disconnect() + this.ReadChannel <- readItem{nil, err} } - if this.transactionMode == transactionModeNone { - if this.transactionDoneChannel != nil { - // If there was a transaction going on, we need to stop it, which will recycle the connection - close(this.transactionDoneChannel) - } else { - // We are not in a transaction, so we can simply recycle it - connectionPool.RecycleRemoteConnection(redisConn) - } + if this.reservedRedisConn == nil { + // In case there is no transaction involved, we can simply recycle the connection + connectionPool.RecycleRemoteConnection(redisConn) } else { - // We are currently in a transaction - if err != nil { - // Reset client and server connection as we can not recover from any error states - this.ReadChannel <- readItem{nil, err} + // Handling for a transaction + if err != nil || this.transactionMode == transactionModeNone { + // Clean up after a transaction ends close(this.transactionDoneChannel) - } else if this.reservedRedisConn == nil { - this.reservedRedisConn = redisConn - this.transactionDoneChannel = make(chan interface{}, 1) - go func() { - select { - case <-this.transactionDoneChannel: - case <-time.After(this.TransactionTimeout): - log.Error("Transaction timed out. Disconnecting the connection.") - this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT} - redisConn.Disconnect() - } - this.transactionDoneChannel = nil - this.reservedRedisConn = nil - connectionPool.RecycleRemoteConnection(redisConn) - }() + close(this.reservedRedisConn) + this.reservedRedisConn = nil + this.transactionDoneChannel = nil + connectionPool.RecycleRemoteConnection(redisConn) + } else { + // Reserve the connection for this client + this.reservedRedisConn <- redisConn } } }() @@ -257,6 +250,31 @@ func (this *Client) FlushRedisAndRespond() (err error) { this.Writer.Flush() + if this.transactionMode != transactionModeNone && this.reservedRedisConn == nil { + // A new transaction was started + reservedRedisConn := make(chan *connection.Connection, 1) + transactionDoneChannel := make(chan interface{}, 1) + this.reservedRedisConn = reservedRedisConn + this.transactionDoneChannel = transactionDoneChannel + + go func() { + select { + case <-transactionDoneChannel: + // Exit the routine if the transaction has already finished + case <-time.After(this.TransactionTimeout): + // Handle a transaction timeout, needs to reserve the redis connection to prevent race conditions + redisConn = <-reservedRedisConn + if redisConn != nil { + // Only enforce the timeout if we actually got the connection back + close(reservedRedisConn) + log.Error("Transaction timed out. Disconnecting the connection.") + redisConn.Disconnect() + connectionPool.RecycleRemoteConnection(redisConn) + } + } + }() + } + return nil } From 294c2e1ff04f561c70fae12da11ba0d21dd1c26b Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Mon, 9 Oct 2023 17:06:00 +0200 Subject: [PATCH 2/4] Improve logging --- client.go | 1 + connection/connection.go | 34 +++++++++++++--------------------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index 08310f8..da24a5d 100644 --- a/client.go +++ b/client.go @@ -215,6 +215,7 @@ func (this *Client) FlushRedisAndRespond() (err error) { if redisConn.DatabaseId != this.DatabaseId { if err = redisConn.SelectDatabase(this.DatabaseId); err != nil { + log.Error("Select database failed: %s", err) return } } diff --git a/connection/connection.go b/connection/connection.go index 16e469c..f43b364 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -98,7 +98,6 @@ func (c *Connection) ReconnectIfNecessary() (err error) { c.connection, err = net.DialTimeout(c.protocol, c.endpoint, c.connectTimeout) if err != nil { - log.Error("NewConnection: Error received from dial: %s", err) c.connection = nil return err } @@ -109,7 +108,7 @@ func (c *Connection) ReconnectIfNecessary() (err error) { c.Reader = bufio.NewReader(netReadWriter) if err = c.authenticate(); err != nil { - return err + return fmt.Errorf("authentication failed: %w", err) } c.nextReconnect = time.Now().Add(c.reconnectInterval) @@ -121,16 +120,14 @@ func (c *Connection) ReconnectIfNecessary() (err error) { // Selects the given database, for the connection // If an error is returned, or if an invalid response is returned from the select, then this will return an error // If not, the connections internal database will be updated accordingly -func (this *Connection) SelectDatabase(DatabaseId int) (err error) { +func (this *Connection) SelectDatabase(DatabaseId int) error { if this.connection == nil { - log.Error("SelectDatabase: Selecting on invalid connection") - return errors.New("Selecting database on an invalid connection") + return errors.New("selecting database on an invalid connection") } - err = protocol.WriteLine([]byte(fmt.Sprintf("select %d", DatabaseId)), this.Writer, true) + err := protocol.WriteLine([]byte(fmt.Sprintf("select %d", DatabaseId)), this.Writer, true) if err != nil { - log.Error("SelectDatabase: Error received from protocol.FlushLine: %s", err) - return err + return fmt.Errorf("flush line failed: %w", err) } if line, isPrefix, err := this.Reader.ReadLine(); err != nil || isPrefix || !bytes.Equal(line, protocol.OK_RESPONSE) { @@ -138,25 +135,23 @@ func (this *Connection) SelectDatabase(DatabaseId int) (err error) { err = errors.New("unknown ReadLine error") } - log.Error("SelectDatabase: Error while attempting to select database. Err:%q Response:%q isPrefix:%t", err, line, isPrefix) this.Disconnect() - return errors.New("Invalid select response") + return fmt.Errorf("invalid select database response: err:%q Response:%q isPrefix:%t", err, line, isPrefix) } this.DatabaseId = DatabaseId - return + return nil } // Tries to authenticate the connection // If an error is returned, or if an invalid response is returned from the AUTH command, then this will return an error -func (this *Connection) authenticate() (err error) { +func (this *Connection) authenticate() error { if this.connection == nil { - log.Error("authenticate: Using an invalid connection") return errors.New("authenticating against an invalid connection") } if this.authPassword == "" { - return + return nil } authCommand := fmt.Sprintf("AUTH %s", this.authPassword) @@ -165,10 +160,9 @@ func (this *Connection) authenticate() (err error) { authCommand = fmt.Sprintf("AUTH %s %s", this.authUser, this.authPassword) } - err = protocol.WriteLine([]byte(authCommand), this.Writer, true) + err := protocol.WriteLine([]byte(authCommand), this.Writer, true) if err != nil { - log.Error("authenticate: Error received from protocol.FlushLine: %s", err) - return + return fmt.Errorf("flush line failed: %w", err) } line, isPrefix, err := this.Reader.ReadLine() @@ -177,12 +171,10 @@ func (this *Connection) authenticate() (err error) { err = errors.New("unknown ReadLine error") } - log.Error("authenticate: Error while attempting to authenticate. Err:%q Response:%q isPrefix:%t", - err, line, isPrefix) this.Disconnect() - return errors.New("invalid authentication response") + return fmt.Errorf("invalid authentication response: err:%q Response:%q isPrefix:%t", err, line, isPrefix) } - return + return nil } // Checks if the current connection is up or not From 73de69b8c794778299367fee3d2256b49e49307a Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Mon, 9 Oct 2023 17:07:02 +0200 Subject: [PATCH 3/4] Deny new clients if the diagnostic connection is down to prevent log spamming --- server.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server.go b/server.go index 55bc2b9..2576fe0 100644 --- a/server.go +++ b/server.go @@ -247,6 +247,11 @@ func (this *RedisMultiplexer) initializeClient(localConnection net.Conn, transac myClient.Connection.Close() }() + if this.activeConnectionCount < 1 { + protocol.WriteError([]byte("No Redis server available"), myClient.Writer, true) + return + } + this.HandleClientRequests(myClient) } From 795f524df0c25dc1be2737aa51881444875c547f Mon Sep 17 00:00:00 2001 From: Patrick Taibel Date: Mon, 9 Oct 2023 17:07:26 +0200 Subject: [PATCH 4/4] Fix non existing clients getting stacked up in overload scenarios --- connection/connection_pool.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/connection/connection_pool.go b/connection/connection_pool.go index b151489..faab4ee 100644 --- a/connection/connection_pool.go +++ b/connection/connection_pool.go @@ -26,6 +26,7 @@ package connection import ( + "errors" "rmux/graphite" "rmux/log" "strings" @@ -117,7 +118,8 @@ func (cp *ConnectionPool) GetConnection() (connection *Connection, err error) { } return connection, nil - // TODO: Maybe a while/timeout/graphiteping loop? + case <-time.After(1 * time.Second): + return nil, errors.New("timeout while waiting for a new connection") } }