Skip to content

Commit

Permalink
Merge pull request #7 from anexia/ptaibel/SIANXSVC-1058_stability_imp…
Browse files Browse the repository at this point in the history
…rovements

SIANXSVC-1058 stability improvements
  • Loading branch information
beachmachine authored Sep 28, 2023
2 parents 5ed8952 + 5e4fd39 commit 0828cbb
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 82 deletions.
29 changes: 13 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,46 +182,46 @@ func (this *Client) FlushRedisAndRespond() (err error) {
}

defer func() {
if this.transactionMode == transactionModeNone {
// We are not in a transaction, so we can simply recycle it
connectionPool.RecycleRemoteConnection(redisConn)

if this.reservedRedisConn != nil {
this.reservedRedisConn = nil
if err != nil {
// Force upstream disconnect on any error
redisConn.Disconnect()
}

}
if this.transactionMode == transactionModeNone {
if this.transactionDoneChannel != nil {
// If there was a transaction going on, we need to stop it, which will recycle the connection
close(this.transactionDoneChannel)
this.transactionDoneChannel = nil
} else {
// We are not in a transaction, so we can simply recycle it
connectionPool.RecycleRemoteConnection(redisConn)
}
} else {
// We are currently in a transaction
if err != nil {
// Reset client and server connection as we can not recover from any error states
this.ReadChannel <- readItem{nil, err}
redisConn.Disconnect()
close(this.transactionDoneChannel)
connectionPool.RecycleRemoteConnection(redisConn)
} else if this.reservedRedisConn == nil {
this.reservedRedisConn = redisConn
this.transactionDoneChannel = make(chan interface{}, 1)
go func() {
select {
case <-this.transactionDoneChannel:
case <-time.After(this.TransactionTimeout):
log.Error("Transaction timed out. Disconnecting the connection.")
this.ReadChannel <- readItem{nil, ERR_TRANSACTION_TIMEOUT}
redisConn.Disconnect()
connectionPool.RecycleRemoteConnection(redisConn)
}
this.transactionDoneChannel = nil
this.reservedRedisConn = nil
connectionPool.RecycleRemoteConnection(redisConn)
}()
}
}
}()

if redisConn.DatabaseId != this.DatabaseId {
if err = redisConn.SelectDatabase(this.DatabaseId); err != nil {
// Disconnect the current connection if selecting failed, will auto-reconnect this connection holder when queried later
redisConn.Disconnect()
return
}
}
Expand All @@ -235,7 +235,6 @@ func (this *Client) FlushRedisAndRespond() (err error) {
_, err = redisConn.Writer.Write(command.GetBuffer())
if err != nil {
log.Error("Error when writing to server: %s. Disconnecting the connection.", err)
redisConn.Disconnect()
return
}
}
Expand All @@ -244,7 +243,6 @@ func (this *Client) FlushRedisAndRespond() (err error) {
err = redisConn.Writer.Flush()
if err != nil {
log.Error("Error when flushing to server: %s. Disconnecting the connection.", err)
redisConn.Disconnect()
return
}
}
Expand All @@ -253,7 +251,6 @@ func (this *Client) FlushRedisAndRespond() (err error) {

if err = protocol.CopyServerResponses(redisConn.Reader, this.Writer, numCommands); err != nil {
log.Error("Error when copying redis responses to client: %s. Disconnecting the connection.", err)
redisConn.Disconnect()
this.ReadChannel <- readItem{nil, err}
return
}
Expand Down
39 changes: 25 additions & 14 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,21 @@ type Connection struct {
// The writer to the redis server
Writer *writer.FlexibleWriter

protocol string
endpoint string
authUser string
authPassword string
connectTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
protocol string
endpoint string
authUser string
authPassword string
connectTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
reconnectInterval time.Duration
nextReconnect time.Time
}

// Initializes a new connection, of the given protocol and endpoint, with the given connection timeout
// ex: "unix", "/tmp/myAwesomeSocket", 50*time.Millisecond
func NewConnection(Protocol, Endpoint string, ConnectTimeout, ReadTimeout, WriteTimeout time.Duration,
authUser string, authPassword string) *Connection {
reconnectInterval time.Duration, authUser string, authPassword string) *Connection {
c := &Connection{}
c.protocol = Protocol
c.endpoint = Endpoint
Expand All @@ -70,13 +72,14 @@ func NewConnection(Protocol, Endpoint string, ConnectTimeout, ReadTimeout, Write
c.connectTimeout = ConnectTimeout
c.readTimeout = ReadTimeout
c.writeTimeout = WriteTimeout
c.reconnectInterval = reconnectInterval
return c
}

func (c *Connection) Disconnect() {
if c.connection != nil {
c.connection.Close()
log.Info("Disconnected a connection")
log.Debug("Disconnected a connection")
graphite.Increment("disconnect")
}
c.connection = nil
Expand All @@ -86,7 +89,7 @@ func (c *Connection) Disconnect() {
}

func (c *Connection) ReconnectIfNecessary() (err error) {
if c.IsConnected() {
if c.IsConnected() && time.Now().Before(c.nextReconnect) {
return nil
}

Expand All @@ -109,6 +112,9 @@ func (c *Connection) ReconnectIfNecessary() (err error) {
return err
}

c.nextReconnect = time.Now().Add(c.reconnectInterval)
log.Debug("Connected a connection")

return nil
}

Expand Down Expand Up @@ -181,6 +187,7 @@ func (this *Connection) authenticate() (err error) {

// Checks if the current connection is up or not
// If we do not get a response, or if we do not get a PONG reply, or if there is any error, returns false
// This is only used for diagnostic connections!
func (myConnection *Connection) CheckConnection() bool {
if myConnection.connection == nil {
return false
Expand All @@ -194,7 +201,8 @@ func (myConnection *Connection) CheckConnection() bool {
startWrite := time.Now()
err := protocol.WriteLine(protocol.SHORT_PING_COMMAND, myConnection.Writer, true)
if err != nil {
log.Error("CheckConnection: Could not write PING Err:%s Timing:%s", err, time.Now().Sub(startWrite))
log.Error("CheckConnection: Could not write PING on diagnostics connection. Err:%s Timing:%s",
err, time.Now().Sub(startWrite))
myConnection.Disconnect()
return false
}
Expand All @@ -206,11 +214,12 @@ func (myConnection *Connection) CheckConnection() bool {
return true
} else {
if err != nil {
log.Error("CheckConnection: Could not read PING. Error: %s Timing:%s", err, time.Now().Sub(startRead))
log.Error("CheckConnection: Could not read PING on diagnostics connection. Error: %s Timing:%s",
err, time.Now().Sub(startRead))
} else if isPrefix {
log.Error("CheckConnection: ReadLine returned prefix: %q", line)
log.Error("CheckConnection: ReadLine returned prefix on diagnostics connection: %q", line)
} else {
log.Error("CheckConnection: Expected PONG response. Got: %q", line)
log.Error("CheckConnection: Expected PONG response on diagnostics connection. Got: %q", line)
}
myConnection.Disconnect()
return false
Expand Down Expand Up @@ -239,7 +248,9 @@ func (c *Connection) IsConnected() bool {
}

if n != 0 {
// If we get stuff back here, the connection is most likely unusable at this point
log.Warn("Got %d bytes back when we expected 0.", n)
return false
}

return true
Expand Down
17 changes: 9 additions & 8 deletions connection/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
EXTERN_READ_TIMEOUT = time.Millisecond * 500
//Default write timeout, for connection pools. Can be adjusted on individual pools after initialization
EXTERN_WRITE_TIMEOUT = time.Millisecond * 500
// Default reconnect interval, for connection pools. Can be adjusted on individual pools after initialization
EXTERN_RECONNECT_INTERVAL = time.Hour * 24
)

// A pool of connections to a single outbound redis server
Expand All @@ -59,6 +61,8 @@ type ConnectionPool struct {
ReadTimeout time.Duration
//An overridable write timeout. Defaults to EXTERN_WRITE_TIMEOUT
WriteTimeout time.Duration
//An overridable reconnection interval. Defaults to EXTERN_RECONNECT_INTERVAL
ReconnectInterval time.Duration
//channel of recycled connections, for re-use
connectionPool chan *Connection
// The connection used for diagnostics (like checking that the pool is up)
Expand All @@ -74,7 +78,7 @@ type ConnectionPool struct {
// Initialize a new connection pool, for the given protocol/endpoint, with a given pool capacity
// ex: "unix", "/tmp/myAwesomeSocket", 5
func NewConnectionPool(Protocol, Endpoint string, poolCapacity int, connectTimeout time.Duration,
readTimeout time.Duration, writeTimeout time.Duration, authUser string,
readTimeout time.Duration, writeTimeout time.Duration, reconnectInterval time.Duration, authUser string,
authPassword string) (newConnectionPool *ConnectionPool) {
newConnectionPool = &ConnectionPool{}
newConnectionPool.Protocol = Protocol
Expand All @@ -85,6 +89,7 @@ func NewConnectionPool(Protocol, Endpoint string, poolCapacity int, connectTimeo
newConnectionPool.ConnectTimeout = connectTimeout
newConnectionPool.ReadTimeout = readTimeout
newConnectionPool.WriteTimeout = writeTimeout
newConnectionPool.ReconnectInterval = reconnectInterval
newConnectionPool.Count = 0

// Fill the pool with as many handlers as it asks for
Expand Down Expand Up @@ -124,6 +129,7 @@ func (cp *ConnectionPool) CreateConnection() *Connection {
cp.ConnectTimeout,
cp.ReadTimeout,
cp.WriteTimeout,
cp.ReconnectInterval,
cp.AuthUser,
cp.AuthPassword,
)
Expand All @@ -133,7 +139,7 @@ func (cp *ConnectionPool) getDiagnosticConnection() (connection *Connection, err
cp.diagnosticConnectionLock.Lock()

if err := cp.diagnosticConnection.ReconnectIfNecessary(); err != nil {
log.Error("The diangnostic connection is down for %s:%s : %s", cp.Protocol, cp.Endpoint, err)
log.Error("The diagnostic connection is down for %s:%s : %s", cp.Protocol, cp.Endpoint, err)
cp.diagnosticConnectionLock.Unlock()
return nil, err
}
Expand Down Expand Up @@ -166,6 +172,7 @@ func (cp *ConnectionPool) IsConnected() bool {

// Checks the state of connections in this connection pool
// If a remote server has severe lag, mysteriously goes away, or stops responding all-together, returns false
// This is only used for diagnostic connections!
func (cp *ConnectionPool) CheckConnectionState() (isUp bool) {
isUp = true
defer func() {
Expand All @@ -179,12 +186,6 @@ func (cp *ConnectionPool) CheckConnectionState() (isUp bool) {
}
defer cp.releaseDiagnosticConnection()

//If we failed to bind, or if our PING fails, the pool is down
if connection == nil || connection.connection == nil {
isUp = false
return
}

if !connection.CheckConnection() {
connection.Disconnect()
isUp = false
Expand Down
4 changes: 2 additions & 2 deletions connection/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestRecycleConnection(test *testing.T) {

//Setting the channel at size 2 makes this more interesting
timeout := 500 * time.Millisecond
connectionPool := NewConnectionPool("unix", testSocket, 2, timeout, timeout, timeout, "", "")
connectionPool := NewConnectionPool("unix", testSocket, 2, timeout, timeout, timeout, time.Hour, "", "")

connection, err := connectionPool.GetConnection()
if err != nil {
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestCheckConnectionState(test *testing.T) {

// Create the pool, have a size of zero so that no connections are made except for diagnostics
timeout := 10 * time.Millisecond
connectionPool := NewConnectionPool("unix", testSocket, 0, timeout, timeout, timeout, "", "")
connectionPool := NewConnectionPool("unix", testSocket, 0, timeout, timeout, timeout, time.Hour, "", "")

// get and release which will actually create the connection
connectionPool.getDiagnosticConnection()
Expand Down
16 changes: 8 additions & 8 deletions connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func verifySelectDatabaseSuccess(test *testing.T, database int) {
test.Fatal("Failed to listen on test socket ", testSocket)
}
defer listenSock.Close()
testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "")
testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "")
testConnection.ReconnectIfNecessary()

//read buffer does't matter
Expand Down Expand Up @@ -77,7 +77,7 @@ func verifySelectDatabaseError(test *testing.T, database int) {
defer func() {
listenSock.Close()
}()
testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "")
testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "")
testConnection.ReconnectIfNecessary()
//read buffer does't matter
readBuf := bufio.NewReader(bytes.NewBufferString("+NOPE\r\n"))
Expand Down Expand Up @@ -106,7 +106,7 @@ func verifySelectDatabaseTimeout(test *testing.T, database int) {
}
defer listenSock.Close()

testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "")
testConnection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "")
if err := testConnection.ReconnectIfNecessary(); err != nil {
test.Fatalf("Could not connect to testSocket %s: %s", testSocket, err)
}
Expand Down Expand Up @@ -149,13 +149,13 @@ func TestNewUnixConnection(test *testing.T) {
}
defer listenSock.Close()

connection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "")
connection := NewConnection("unix", testSocket, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "")
connection.ReconnectIfNecessary()
if connection == nil || connection.connection == nil {
test.Fatal("Connection initialization returned nil, binding to unix endpoint failed")
}

connection = NewConnection("unix", "/tmp/thisdoesnotexist", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "")
connection = NewConnection("unix", "/tmp/thisdoesnotexist", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "")
connection.ReconnectIfNecessary()
if connection != nil && connection.connection != nil {
test.Fatal("Connection initialization success, binding to fake unix endpoint succeeded????")
Expand All @@ -170,14 +170,14 @@ func TestNewTcpConnection(test *testing.T) {
}
defer listenSock.Close()

connection := NewConnection("tcp", testEndpoint, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "")
connection := NewConnection("tcp", testEndpoint, 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "")
connection.ReconnectIfNecessary()
if connection == nil || connection.connection == nil {
test.Fatal("Connection initialization returned nil, binding to tcp endpoint failed")
}

//reserved sock should have nothing on it
connection = NewConnection("tcp", "localhost:49151", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, "", "")
connection = NewConnection("tcp", "localhost:49151", 10*time.Millisecond, 10*time.Millisecond, 10*time.Millisecond, time.Hour, "", "")
connection.ReconnectIfNecessary()
if connection != nil && connection.connection != nil {
test.Fatal("Connection initialization success, binding to fake tcp endpoint succeeded????")
Expand All @@ -194,7 +194,7 @@ func TestCheckConnection(test *testing.T) {
listenSock.Close()
}()

connection := NewConnection("unix", testSocket, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, "", "")
connection := NewConnection("unix", testSocket, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, time.Hour, "", "")
connection.ReconnectIfNecessary()
if connection == nil {
test.Fatal("Connection initialization returned nil, binding to unix endpoint failed")
Expand Down
6 changes: 5 additions & 1 deletion doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Configuration can be handled either via command-line arguments or via config fil
-remoteReadTimeout=0: Timeout to set for remote redises (read)
-remoteTimeout=0: Timeout to set for remote redises (connect+read+write)
-remoteWriteTimeout=0: Timeout to set for remote redises (write)
-remoteReconnectInterval=0: Interval in which connected redis connections will be forced to reconnect in minutes
-remoteDiagnosticCheckInterval=0: Interval to check the diagnostic connection in seconds
-socket="": The socket to listen for incoming connections on. If this is provided, host and port are ignored
-tcpConnections="localhost:6380 localhost:6381": TCP connections (destination redis servers) to multiplex over
-unixConnections="": Unix connections (destination redis servers) to multiplex over
Expand Down Expand Up @@ -46,7 +48,9 @@ for the configuration json is as follows:
"remoteTimeout": int,
"remoteReadTimeout": int,
"remoteWriteTimeout": int,
"remoteConnectTimeout": int
"remoteConnectTimeout": int,
"remoteReconnectInterval": int,
"remoteDiagnosticCheckInterval": int
},
...
]
Expand Down
Loading

0 comments on commit 0828cbb

Please sign in to comment.