Skip to content

Commit d39fbc9

Browse files
committed
vert-x3#108: Implemented connection retries in the AsyncConnectionPool
Signed-off-by: Ernesto J. Perez <[email protected]>
1 parent 22869fd commit d39fbc9

File tree

6 files changed

+61
-42
lines changed

6 files changed

+61
-42
lines changed

src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,11 @@ public abstract class BaseSQLClient {
4949
protected final Logger log = LoggerFactory.getLogger(this.getClass());
5050
protected final Vertx vertx;
5151

52-
protected int maxPoolSize;
53-
protected int transactionTimeout;
54-
protected String registerAddress;
52+
protected final JsonObject globalConfig;
5553

56-
public BaseSQLClient(Vertx vertx, JsonObject config) {
54+
public BaseSQLClient(Vertx vertx, JsonObject globalConfig) {
5755
this.vertx = vertx;
58-
this.maxPoolSize = config.getInteger("maxPoolSize", 10);
59-
this.transactionTimeout = config.getInteger("transactionTimeout", 500);
60-
this.registerAddress = config.getString("address");
56+
this.globalConfig = globalConfig;
6157
}
6258

6359
protected abstract AsyncConnectionPool pool();
@@ -96,7 +92,7 @@ public void close() {
9692
close(null);
9793
}
9894

99-
protected Configuration getConfiguration(
95+
protected Configuration getConnectionConfiguration(
10096
String defaultHost,
10197
int defaultPort,
10298
String defaultDatabase,

src/main/java/io/vertx/ext/asyncsql/impl/MYSQLClientImpl.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,9 @@ public class MYSQLClientImpl extends BaseSQLClient {
3434

3535
private final MysqlAsyncConnectionPool pool;
3636

37-
public MYSQLClientImpl(Vertx vertx,
38-
JsonObject config) {
39-
super(vertx, config);
40-
pool = new MysqlAsyncConnectionPool(vertx, maxPoolSize, getConfiguration(
37+
public MYSQLClientImpl(Vertx vertx, JsonObject globalConfig) {
38+
super(vertx, globalConfig);
39+
pool = new MysqlAsyncConnectionPool(vertx, globalConfig, getConnectionConfiguration(
4140
MySQLClient.DEFAULT_HOST,
4241
MySQLClient.DEFAULT_PORT,
4342
MySQLClient.DEFAULT_DATABASE,
@@ -46,7 +45,7 @@ public MYSQLClientImpl(Vertx vertx,
4645
MySQLClient.DEFAULT_CHARSET,
4746
MySQLClient.DEFAULT_CONNECT_TIMEOUT,
4847
MySQLClient.DEFAULT_TEST_TIMEOUT,
49-
config));
48+
globalConfig));
5049
}
5150

5251
@Override

src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
package io.vertx.ext.asyncsql.impl;
1818

1919
import com.github.mauricio.async.db.Connection;
20-
import io.vertx.core.AsyncResult;
21-
import io.vertx.core.Future;
22-
import io.vertx.core.Handler;
2320
import io.vertx.core.Vertx;
2421
import io.vertx.core.json.JsonObject;
2522
import io.vertx.ext.asyncsql.PostgreSQLClient;
@@ -37,9 +34,9 @@ public class PostgreSQLClientImpl extends BaseSQLClient {
3734

3835
private final PostgresqlAsyncConnectionPool pool;
3936

40-
public PostgreSQLClientImpl(Vertx vertx, JsonObject config) {
41-
super(vertx, config);
42-
pool = new PostgresqlAsyncConnectionPool(vertx, maxPoolSize, getConfiguration(
37+
public PostgreSQLClientImpl(Vertx vertx, JsonObject globalConfig) {
38+
super(vertx, globalConfig);
39+
pool = new PostgresqlAsyncConnectionPool(vertx, globalConfig, getConnectionConfiguration(
4340
PostgreSQLClient.DEFAULT_HOST,
4441
PostgreSQLClient.DEFAULT_PORT,
4542
PostgreSQLClient.DEFAULT_DATABASE,
@@ -48,7 +45,7 @@ public PostgreSQLClientImpl(Vertx vertx, JsonObject config) {
4845
PostgreSQLClient.DEFAULT_CHARSET,
4946
PostgreSQLClient.DEFAULT_CONNECT_TIMEOUT,
5047
PostgreSQLClient.DEFAULT_TEST_TIMEOUT,
51-
config));
48+
globalConfig));
5249
}
5350

5451
@Override

src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.vertx.core.Future;
2323
import io.vertx.core.Handler;
2424
import io.vertx.core.Vertx;
25+
import io.vertx.core.json.JsonObject;
2526
import io.vertx.core.logging.Logger;
2627
import io.vertx.core.logging.LoggerFactory;
2728
import io.vertx.ext.asyncsql.impl.ScalaUtils;
@@ -37,40 +38,64 @@
3738
*/
3839
public abstract class AsyncConnectionPool {
3940

40-
private final int maxPoolSize;
41+
public static final int DEFAULT_MAX_POOL_SIZE = 10;
42+
public static final int DEFAULT_MAX_CONNECTION_RETRIES = 0; // No connection retries by default
43+
public static final int DEFAULT_CONNECTION_RETRY_DELAY = 5_000; // 5 seconds between retries by default
4144

4245
private static final Logger logger = LoggerFactory.getLogger(AsyncConnectionPool.class);
43-
protected final Configuration configuration;
46+
47+
private final int maxPoolSize;
48+
private final int maxConnectionRetries;
49+
private final int connectionRetryDelay;
50+
51+
protected final Configuration connectionConfig;
4452
protected final Vertx vertx;
4553

4654
private int poolSize = 0;
4755
private final Deque<Connection> availableConnections = new ArrayDeque<>();
4856
private final Deque<Handler<AsyncResult<Connection>>> waiters = new ArrayDeque<>();
4957

50-
public AsyncConnectionPool(Vertx vertx, int maxPoolSize, Configuration configuration) {
58+
public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, Configuration connectionConfig) {
5159
this.vertx = vertx;
52-
this.maxPoolSize = maxPoolSize;
53-
this.configuration = configuration;
60+
this.maxPoolSize = globalConfig.getInteger("maxPoolSize", DEFAULT_MAX_POOL_SIZE);
61+
this.maxConnectionRetries = globalConfig.getInteger("maxConnectionRetries", DEFAULT_MAX_CONNECTION_RETRIES);
62+
this.connectionRetryDelay = globalConfig.getInteger("connectionRetryDelay", DEFAULT_CONNECTION_RETRY_DELAY);
63+
this.connectionConfig = connectionConfig;
5464
}
5565

5666
protected abstract Connection create();
5767

5868
private synchronized void createConnection(Handler<AsyncResult<Connection>> handler) {
5969
poolSize += 1;
70+
createAndConnect(new Handler<AsyncResult<Connection>>() {
71+
int retries = 0;
72+
@Override
73+
public void handle(AsyncResult<Connection> connectionResult) {
74+
if (connectionResult.succeeded()) {
75+
handler.handle(connectionResult);
76+
} else if (maxConnectionRetries<0 || retries<maxConnectionRetries) {
77+
retries++;
78+
logger.debug("Error creating connection. Waiting " + connectionRetryDelay + " ms for the retry " +
79+
retries + (maxConnectionRetries >= 0 ? " of " + maxConnectionRetries : ""));
80+
vertx.setTimer(connectionRetryDelay, timerId ->
81+
createAndConnect(this) // Try to connect again using this handler
82+
);
83+
} else {
84+
poolSize -= 1;
85+
notifyWaitersAboutAvailableConnection();
86+
handler.handle(connectionResult);
87+
}
88+
}
89+
});
90+
}
91+
92+
private synchronized void createAndConnect(Handler<AsyncResult<Connection>> handler) {
6093
try {
61-
Connection connection = create();
62-
connection
63-
.connect()
64-
.onComplete(ScalaUtils.toFunction1(asyncResult -> {
65-
if (asyncResult.failed()) {
66-
poolSize -= 1;
67-
notifyWaitersAboutAvailableConnection();
68-
}
69-
handler.handle(asyncResult);
70-
}), VertxEventLoopExecutionContext.create(vertx));
94+
create()
95+
.connect()
96+
.onComplete(ScalaUtils.toFunction1(handler), VertxEventLoopExecutionContext.create(vertx));
7197
} catch (Throwable e) {
7298
logger.info("creating a connection went wrong", e);
73-
poolSize -= 1;
7499
handler.handle(Future.failedFuture(e));
75100
}
76101
}

src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.github.mauricio.async.db.mysql.MySQLConnection;
2222
import com.github.mauricio.async.db.mysql.util.CharsetMapper;
2323
import io.vertx.core.Vertx;
24+
import io.vertx.core.json.JsonObject;
2425
import io.vertx.ext.asyncsql.impl.VertxEventLoopExecutionContext;
2526

2627
/**
@@ -30,13 +31,13 @@
3031
*/
3132
public class MysqlAsyncConnectionPool extends AsyncConnectionPool {
3233

33-
public MysqlAsyncConnectionPool(Vertx vertx, int maxPoolSize, Configuration configuration) {
34-
super(vertx, maxPoolSize, configuration);
34+
public MysqlAsyncConnectionPool(Vertx vertx, JsonObject globalConfig, Configuration connectionConfig) {
35+
super(vertx, globalConfig, connectionConfig);
3536
}
3637

3738
@Override
3839
protected Connection create() {
39-
return new MySQLConnection(configuration, CharsetMapper.Instance(),
40+
return new MySQLConnection(connectionConfig, CharsetMapper.Instance(),
4041
vertx.nettyEventLoopGroup().next(),
4142
VertxEventLoopExecutionContext.create(vertx)
4243
);

src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.github.mauricio.async.db.postgresql.column.PostgreSQLColumnDecoderRegistry;
2323
import com.github.mauricio.async.db.postgresql.column.PostgreSQLColumnEncoderRegistry;
2424
import io.vertx.core.Vertx;
25+
import io.vertx.core.json.JsonObject;
2526
import io.vertx.ext.asyncsql.impl.VertxEventLoopExecutionContext;
2627

2728
/**
@@ -31,14 +32,14 @@
3132
*/
3233
public class PostgresqlAsyncConnectionPool extends AsyncConnectionPool {
3334

34-
public PostgresqlAsyncConnectionPool(Vertx vertx, int maxPoolSize, Configuration configuration) {
35-
super(vertx, maxPoolSize, configuration);
35+
public PostgresqlAsyncConnectionPool(Vertx vertx, JsonObject globalConfig, Configuration connectionConfig) {
36+
super(vertx, globalConfig, connectionConfig);
3637
}
3738

3839
@Override
3940
protected Connection create() {
4041
return new PostgreSQLConnection(
41-
configuration,
42+
connectionConfig,
4243
PostgreSQLColumnEncoderRegistry.Instance(),
4344
PostgreSQLColumnDecoderRegistry.Instance(),
4445
vertx.nettyEventLoopGroup().next(),

0 commit comments

Comments
 (0)