Skip to content

Commit

Permalink
Rewrite transaction logic for stable operation
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickTaibel committed Oct 24, 2023
1 parent 0828cbb commit 42e2142
Showing 1 changed file with 48 additions and 30 deletions.
78 changes: 48 additions & 30 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Client struct {
Scanner *protocol.RespScanner
TransactionTimeout time.Duration
queued []protocol.Command
reservedRedisConn *connection.Connection
reservedRedisConn chan *connection.Connection
transactionMode transactionMode
transactionDoneChannel chan interface{}
}
Expand Down Expand Up @@ -171,7 +171,13 @@ func (this *Client) FlushRedisAndRespond() (err error) {
var redisConn *connection.Connection

if this.reservedRedisConn != nil {
redisConn = this.reservedRedisConn
// Wait for the reserved connection to be available
redisConn = <-this.reservedRedisConn
if redisConn == nil {
// Check if a transaction timeout handler closed the channel and report to the client
this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT}
return ERR_TRANSACTION_TIMEOUT
}
} else {
redisConn, err = connectionPool.GetConnection()
if err != nil {
Expand All @@ -183,39 +189,26 @@ func (this *Client) FlushRedisAndRespond() (err error) {

defer func() {
if err != nil {
// Force upstream disconnect on any error
// In case of an error the upstream and the downstream connection need to be disconnected
redisConn.Disconnect()
this.ReadChannel <- readItem{nil, err}
}

if this.transactionMode == transactionModeNone {
if this.transactionDoneChannel != nil {
// If there was a transaction going on, we need to stop it, which will recycle the connection
close(this.transactionDoneChannel)
} else {
// We are not in a transaction, so we can simply recycle it
connectionPool.RecycleRemoteConnection(redisConn)
}
if this.reservedRedisConn == nil {
// In case there is no transaction involved, we can simply recycle the connection
connectionPool.RecycleRemoteConnection(redisConn)
} else {
// We are currently in a transaction
if err != nil {
// Reset client and server connection as we can not recover from any error states
this.ReadChannel <- readItem{nil, err}
// Handling for a transaction
if err != nil || this.transactionMode == transactionModeNone {
// Clean up after a transaction ends
close(this.transactionDoneChannel)
} else if this.reservedRedisConn == nil {
this.reservedRedisConn = redisConn
this.transactionDoneChannel = make(chan interface{}, 1)
go func() {
select {
case <-this.transactionDoneChannel:
case <-time.After(this.TransactionTimeout):
log.Error("Transaction timed out. Disconnecting the connection.")
this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT}
redisConn.Disconnect()
}
this.transactionDoneChannel = nil
this.reservedRedisConn = nil
connectionPool.RecycleRemoteConnection(redisConn)
}()
close(this.reservedRedisConn)
this.reservedRedisConn = nil
this.transactionDoneChannel = nil
connectionPool.RecycleRemoteConnection(redisConn)
} else {
// Reserve the connection for this client
this.reservedRedisConn <- redisConn
}
}
}()
Expand Down Expand Up @@ -257,6 +250,31 @@ func (this *Client) FlushRedisAndRespond() (err error) {

this.Writer.Flush()

if this.transactionMode != transactionModeNone && this.reservedRedisConn == nil {
// A new transaction was started
reservedRedisConn := make(chan *connection.Connection, 1)
transactionDoneChannel := make(chan interface{}, 1)
this.reservedRedisConn = reservedRedisConn
this.transactionDoneChannel = transactionDoneChannel

go func() {
select {
case <-transactionDoneChannel:
// Exit the routine if the transaction has already finished
case <-time.After(this.TransactionTimeout):
// Handle a transaction timeout, needs to reserve the redis connection to prevent race conditions
redisConn = <-reservedRedisConn
if redisConn != nil {
// Only enforce the timeout if we actually got the connection back
close(reservedRedisConn)
log.Error("Transaction timed out. Disconnecting the connection.")
redisConn.Disconnect()
connectionPool.RecycleRemoteConnection(redisConn)
}
}
}()
}

return nil
}

Expand Down

0 comments on commit 42e2142

Please sign in to comment.