Skip to content

Commit

Permalink
Merge pull request #8 from anexia/ptaibel/SIANXSVC-1060_rmux_stabilit…
Browse files Browse the repository at this point in the history
…y_v2

SIANXSVC-1060: stability improvments v2
  • Loading branch information
beachmachine committed Nov 3, 2023
2 parents 0828cbb + 795f524 commit 053d73b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 52 deletions.
79 changes: 49 additions & 30 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Client struct {
Scanner *protocol.RespScanner
TransactionTimeout time.Duration
queued []protocol.Command
reservedRedisConn *connection.Connection
reservedRedisConn chan *connection.Connection
transactionMode transactionMode
transactionDoneChannel chan interface{}
}
Expand Down Expand Up @@ -171,7 +171,13 @@ func (this *Client) FlushRedisAndRespond() (err error) {
var redisConn *connection.Connection

if this.reservedRedisConn != nil {
redisConn = this.reservedRedisConn
// Wait for the reserved connection to be available
redisConn = <-this.reservedRedisConn
if redisConn == nil {
// Check if a transaction timeout handler closed the channel and report to the client
this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT}
return ERR_TRANSACTION_TIMEOUT
}
} else {
redisConn, err = connectionPool.GetConnection()
if err != nil {
Expand All @@ -183,45 +189,33 @@ func (this *Client) FlushRedisAndRespond() (err error) {

defer func() {
if err != nil {
// Force upstream disconnect on any error
// In case of an error the upstream and the downstream connection need to be disconnected
redisConn.Disconnect()
this.ReadChannel <- readItem{nil, err}
}

if this.transactionMode == transactionModeNone {
if this.transactionDoneChannel != nil {
// If there was a transaction going on, we need to stop it, which will recycle the connection
close(this.transactionDoneChannel)
} else {
// We are not in a transaction, so we can simply recycle it
connectionPool.RecycleRemoteConnection(redisConn)
}
if this.reservedRedisConn == nil {
// In case there is no transaction involved, we can simply recycle the connection
connectionPool.RecycleRemoteConnection(redisConn)
} else {
// We are currently in a transaction
if err != nil {
// Reset client and server connection as we can not recover from any error states
this.ReadChannel <- readItem{nil, err}
// Handling for a transaction
if err != nil || this.transactionMode == transactionModeNone {
// Clean up after a transaction ends
close(this.transactionDoneChannel)
} else if this.reservedRedisConn == nil {
this.reservedRedisConn = redisConn
this.transactionDoneChannel = make(chan interface{}, 1)
go func() {
select {
case <-this.transactionDoneChannel:
case <-time.After(this.TransactionTimeout):
log.Error("Transaction timed out. Disconnecting the connection.")
this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT}
redisConn.Disconnect()
}
this.transactionDoneChannel = nil
this.reservedRedisConn = nil
connectionPool.RecycleRemoteConnection(redisConn)
}()
close(this.reservedRedisConn)
this.reservedRedisConn = nil
this.transactionDoneChannel = nil
connectionPool.RecycleRemoteConnection(redisConn)
} else {
// Reserve the connection for this client
this.reservedRedisConn <- redisConn
}
}
}()

if redisConn.DatabaseId != this.DatabaseId {
if err = redisConn.SelectDatabase(this.DatabaseId); err != nil {
log.Error("Select database failed: %s", err)
return
}
}
Expand Down Expand Up @@ -257,6 +251,31 @@ func (this *Client) FlushRedisAndRespond() (err error) {

this.Writer.Flush()

if this.transactionMode != transactionModeNone && this.reservedRedisConn == nil {
// A new transaction was started
reservedRedisConn := make(chan *connection.Connection, 1)
transactionDoneChannel := make(chan interface{}, 1)
this.reservedRedisConn = reservedRedisConn
this.transactionDoneChannel = transactionDoneChannel

go func() {
select {
case <-transactionDoneChannel:
// Exit the routine if the transaction has already finished
case <-time.After(this.TransactionTimeout):
// Handle a transaction timeout, needs to reserve the redis connection to prevent race conditions
redisConn = <-reservedRedisConn
if redisConn != nil {
// Only enforce the timeout if we actually got the connection back
close(reservedRedisConn)
log.Error("Transaction timed out. Disconnecting the connection.")
redisConn.Disconnect()
connectionPool.RecycleRemoteConnection(redisConn)
}
}
}()
}

return nil
}

Expand Down
34 changes: 13 additions & 21 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (c *Connection) ReconnectIfNecessary() (err error) {

c.connection, err = net.DialTimeout(c.protocol, c.endpoint, c.connectTimeout)
if err != nil {
log.Error("NewConnection: Error received from dial: %s", err)
c.connection = nil
return err
}
Expand All @@ -109,7 +108,7 @@ func (c *Connection) ReconnectIfNecessary() (err error) {
c.Reader = bufio.NewReader(netReadWriter)

if err = c.authenticate(); err != nil {
return err
return fmt.Errorf("authentication failed: %w", err)
}

c.nextReconnect = time.Now().Add(c.reconnectInterval)
Expand All @@ -121,42 +120,38 @@ func (c *Connection) ReconnectIfNecessary() (err error) {
// Selects the given database, for the connection
// If an error is returned, or if an invalid response is returned from the select, then this will return an error
// If not, the connections internal database will be updated accordingly
func (this *Connection) SelectDatabase(DatabaseId int) (err error) {
func (this *Connection) SelectDatabase(DatabaseId int) error {
if this.connection == nil {
log.Error("SelectDatabase: Selecting on invalid connection")
return errors.New("Selecting database on an invalid connection")
return errors.New("selecting database on an invalid connection")
}

err = protocol.WriteLine([]byte(fmt.Sprintf("select %d", DatabaseId)), this.Writer, true)
err := protocol.WriteLine([]byte(fmt.Sprintf("select %d", DatabaseId)), this.Writer, true)
if err != nil {
log.Error("SelectDatabase: Error received from protocol.FlushLine: %s", err)
return err
return fmt.Errorf("flush line failed: %w", err)
}

if line, isPrefix, err := this.Reader.ReadLine(); err != nil || isPrefix || !bytes.Equal(line, protocol.OK_RESPONSE) {
if err == nil {
err = errors.New("unknown ReadLine error")
}

log.Error("SelectDatabase: Error while attempting to select database. Err:%q Response:%q isPrefix:%t", err, line, isPrefix)
this.Disconnect()
return errors.New("Invalid select response")
return fmt.Errorf("invalid select database response: err:%q Response:%q isPrefix:%t", err, line, isPrefix)
}

this.DatabaseId = DatabaseId
return
return nil
}

// Tries to authenticate the connection
// If an error is returned, or if an invalid response is returned from the AUTH command, then this will return an error
func (this *Connection) authenticate() (err error) {
func (this *Connection) authenticate() error {
if this.connection == nil {
log.Error("authenticate: Using an invalid connection")
return errors.New("authenticating against an invalid connection")
}

if this.authPassword == "" {
return
return nil
}

authCommand := fmt.Sprintf("AUTH %s", this.authPassword)
Expand All @@ -165,10 +160,9 @@ func (this *Connection) authenticate() (err error) {
authCommand = fmt.Sprintf("AUTH %s %s", this.authUser, this.authPassword)
}

err = protocol.WriteLine([]byte(authCommand), this.Writer, true)
err := protocol.WriteLine([]byte(authCommand), this.Writer, true)
if err != nil {
log.Error("authenticate: Error received from protocol.FlushLine: %s", err)
return
return fmt.Errorf("flush line failed: %w", err)
}

line, isPrefix, err := this.Reader.ReadLine()
Expand All @@ -177,12 +171,10 @@ func (this *Connection) authenticate() (err error) {
err = errors.New("unknown ReadLine error")
}

log.Error("authenticate: Error while attempting to authenticate. Err:%q Response:%q isPrefix:%t",
err, line, isPrefix)
this.Disconnect()
return errors.New("invalid authentication response")
return fmt.Errorf("invalid authentication response: err:%q Response:%q isPrefix:%t", err, line, isPrefix)
}
return
return nil
}

// Checks if the current connection is up or not
Expand Down
4 changes: 3 additions & 1 deletion connection/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
package connection

import (
"errors"
"rmux/graphite"
"rmux/log"
"strings"
Expand Down Expand Up @@ -117,7 +118,8 @@ func (cp *ConnectionPool) GetConnection() (connection *Connection, err error) {
}

return connection, nil
// TODO: Maybe a while/timeout/graphiteping loop?
case <-time.After(1 * time.Second):
return nil, errors.New("timeout while waiting for a new connection")
}
}

Expand Down
5 changes: 5 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ func (this *RedisMultiplexer) initializeClient(localConnection net.Conn, transac
myClient.Connection.Close()
}()

if this.activeConnectionCount < 1 {
protocol.WriteError([]byte("No Redis server available"), myClient.Writer, true)
return
}

this.HandleClientRequests(myClient)
}

Expand Down

0 comments on commit 053d73b

Please sign in to comment.