Skip to content

Commit

Permalink
Merge pull request #986 from eclipse-vertx/pool-hook
Browse files Browse the repository at this point in the history
Connection pool improvements
  • Loading branch information
vietj committed Jun 28, 2021
2 parents c3bdc06 + 4e97b37 commit 9143c90
Show file tree
Hide file tree
Showing 58 changed files with 1,362 additions and 1,362 deletions.
4 changes: 4 additions & 0 deletions vertx-db2-client/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ You can also configure the client to use an HTTP/1.x CONNECT, SOCKS4a or SOCKS5

More information can be found in the http://vertx.io/docs/vertx-core/java/#_using_a_proxy_for_client_connections[Vert.x documentation].

== Advanced pool configuration

include::pool_config.adoc[]

ifeval::["$lang" == "java"]
include::override/rxjava3.adoc[]
endif::[]
15 changes: 15 additions & 0 deletions vertx-db2-client/src/main/java/examples/SqlClientExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.vertx.core.Vertx;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Pool;
import io.vertx.docgen.Source;
import io.vertx.sqlclient.Cursor;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.PreparedStatement;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
Expand Down Expand Up @@ -342,4 +344,17 @@ public void usingCursors03(SqlConnection connection) {
public void tracing01(DB2ConnectOptions options) {
options.setTracingPolicy(TracingPolicy.ALWAYS);
}

public void poolConfig01(DB2ConnectOptions server1, DB2ConnectOptions server2, DB2ConnectOptions server3, PoolOptions options) {
DB2Pool pool = DB2Pool.pool(Arrays.asList(server1, server2, server3), options);
}

public void poolConfig02(DB2Pool pool, String sql) {
pool.connectHandler(conn -> {
conn.query(sql).execute().onSuccess(res -> {
// Release the connection to the pool, ready to be used by the application
conn.close();
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@
@DataObject(generateConverter = true)
public class DB2ConnectOptions extends SqlConnectOptions {

/**
* @return the {@code options} as DB2 specific connect options
*/
public static DB2ConnectOptions wrap(SqlConnectOptions options) {
if (options instanceof DB2ConnectOptions) {
return (DB2ConnectOptions) options;
} else {
return new DB2ConnectOptions(options);
}
}

/**
* Provide a {@link DB2ConnectOptions} configured from a connection URI.
*
Expand Down
135 changes: 86 additions & 49 deletions vertx-db2-client/src/main/java/io/vertx/db2client/DB2Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@
*/
package io.vertx.db2client;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.VertxInternal;
import io.vertx.db2client.impl.DB2PoolImpl;
import io.vertx.db2client.spi.DB2Driver;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;

import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static io.vertx.db2client.DB2ConnectOptions.fromUri;

Expand All @@ -33,116 +40,146 @@
public interface DB2Pool extends Pool {

/**
* Like {@link #pool(String, PoolOptions)} with a default {@code poolOptions}.
* Like {@link #pool(String, PoolOptions)} with default options.
*/
static DB2Pool pool(String connectionUri) {
return pool(connectionUri, new PoolOptions());
}

/**
* Like {@link #pool(DB2ConnectOptions, PoolOptions)} with
* {@code connectOptions} build from {@code connectionUri}.
* {@code database} build from {@code connectionUri}.
*/
static DB2Pool pool(String connectionUri, PoolOptions poolOptions) {
return pool(fromUri(connectionUri), poolOptions);
static DB2Pool pool(String connectionUri, PoolOptions options) {
return pool(fromUri(connectionUri), options);
}

/**
* Like {@link #pool(Vertx, String,PoolOptions)} with a default
* {@code poolOptions}..
* Like {@link #pool(Vertx, String,PoolOptions)} with default options.
*/
static DB2Pool pool(Vertx vertx, String connectionUri) {
return pool(vertx, fromUri(connectionUri), new PoolOptions());
}

/**
* Like {@link #pool(Vertx, DB2ConnectOptions, PoolOptions)} with
* {@code connectOptions} build from {@code connectionUri}.
* {@code database} build from {@code connectionUri}.
*/
static DB2Pool pool(Vertx vertx, String connectionUri, PoolOptions poolOptions) {
return pool(vertx, fromUri(connectionUri), poolOptions);
static DB2Pool pool(Vertx vertx, String connectionUri, PoolOptions options) {
return pool(vertx, fromUri(connectionUri), options);
}

/**
* Create a connection pool to the DB2 server configured with the given
* {@code connectOptions} and {@code poolOptions}.
* Create a connection pool to the DB2 {@code database} configured with the given {@code options}.
*
* @param connectOptions the options for the connection
* @param poolOptions the options for creating the pool
* @param database the options for the connection
* @param options the options for creating the pool
* @return the connection pool
*/
static DB2Pool pool(DB2ConnectOptions connectOptions, PoolOptions poolOptions) {
if (Vertx.currentContext() != null) {
throw new IllegalStateException(
"Running in a Vertx context => use DB2Pool#pool(Vertx, DB2ConnectOptions, PoolOptions) instead");
}
VertxOptions vertxOptions = new VertxOptions();
Vertx vertx = Vertx.vertx(vertxOptions);
return DB2PoolImpl.create((VertxInternal) vertx, true, false, connectOptions, poolOptions);
static DB2Pool pool(DB2ConnectOptions database, PoolOptions options) {
return pool(null, database, options);
}

/**
* Like {@link #pool(DB2ConnectOptions, PoolOptions)} with a specific
* {@link Vertx} instance.
*/
static DB2Pool pool(Vertx vertx, DB2ConnectOptions connectOptions, PoolOptions poolOptions) {
return DB2PoolImpl.create((VertxInternal) vertx, false, false, connectOptions, poolOptions);
static DB2Pool pool(Vertx vertx, DB2ConnectOptions database, PoolOptions options) {
return pool(vertx, Collections.singletonList(database), options);
}

/**
* Create a connection pool to the DB2 {@code databases} with round-robin selection.
* Round-robin is applied when a new connection is created by the pool.
*
* @param databases the list of servers
* @param options the options for creating the pool
* @return the connection pool
*/
static DB2Pool pool(List<DB2ConnectOptions> databases, PoolOptions options) {
return pool(null, databases, options);
}

/**
* Like {@link #pool(List, PoolOptions)} with a specific
* {@link Vertx} instance.
*/
static DB2Pool pool(Vertx vertx, List<DB2ConnectOptions> databases, PoolOptions options) {
return new DB2Driver().createPool(vertx, databases, options);
}

/**
* Like {@link #client(String, PoolOptions)} with a default {@code poolOptions}.
* Like {@link #client(String, PoolOptions)} with default options.
*/
static SqlClient client(String connectionUri) {
return client(connectionUri, new PoolOptions());
}

/**
* Like {@link #client(DB2ConnectOptions, PoolOptions)} with
* {@code connectOptions} build from {@code connectionUri}.
* {@code database} build from {@code connectionUri}.
*/
static SqlClient client(String connectionUri, PoolOptions poolOptions) {
return client(fromUri(connectionUri), poolOptions);
static SqlClient client(String connectionUri, PoolOptions options) {
return client(fromUri(connectionUri), options);
}

/**
* Like {@link #client(Vertx, String,PoolOptions)} with a default
* {@code poolOptions}..
* Like {@link #client(Vertx, String, PoolOptions)} with default options.
*/
static SqlClient client(Vertx vertx, String connectionUri) {
return client(vertx, fromUri(connectionUri), new PoolOptions());
}

/**
* Like {@link #client(Vertx, DB2ConnectOptions, PoolOptions)} with
* {@code connectOptions} build from {@code connectionUri}.
* {@code database} build from {@code connectionUri}.
*/
static SqlClient client(Vertx vertx, String connectionUri, PoolOptions poolOptions) {
return client(vertx, fromUri(connectionUri), poolOptions);
static SqlClient client(Vertx vertx, String connectionUri, PoolOptions options) {
return client(vertx, fromUri(connectionUri), options);
}

/**
* Create a pooled client to the DB2 server configured with the given
* {@code connectOptions} and {@code poolOptions}.
* Create a pooled client to the DB2 {@code database} configured with the given {@code options}.
*
* @param connectOptions the options for the connection
* @param poolOptions the options for creating the pool
* @param database the options for the connection
* @param options the options for creating the pool
* @return the connection pool
*/
static SqlClient client(DB2ConnectOptions connectOptions, PoolOptions poolOptions) {
if (Vertx.currentContext() != null) {
throw new IllegalStateException(
"Running in a Vertx context => use DB2Pool#pool(Vertx, DB2ConnectOptions, PoolOptions) instead");
}
VertxOptions vertxOptions = new VertxOptions();
Vertx vertx = Vertx.vertx(vertxOptions);
return DB2PoolImpl.create((VertxInternal) vertx, true, true, connectOptions, poolOptions);
static SqlClient client(DB2ConnectOptions database, PoolOptions options) {
return client(null, database, options);
}

/**
* Like {@link #client(DB2ConnectOptions, PoolOptions)} with a specific
* {@link Vertx} instance.
*/
static SqlClient client(Vertx vertx, DB2ConnectOptions connectOptions, PoolOptions poolOptions) {
return DB2PoolImpl.create((VertxInternal) vertx, false, true, connectOptions, poolOptions);
static SqlClient client(Vertx vertx, DB2ConnectOptions database, PoolOptions options) {
return client(vertx, Collections.singletonList(database), options);
}

/**
* Create a client backed by a connection pool to the DB2 {@code databases} with round-robin selection.
* Round-robin is applied when a new connection is created by the pool.
*
* @param databases the list of servers
* @param options the options for creating the pool
* @return the pooled client
*/
static SqlClient client(List<DB2ConnectOptions> databases, PoolOptions options) {
return client(null, databases, options);
}

/**
* Like {@link #client(List, PoolOptions)} with a specific
* {@link Vertx} instance.
*/
static SqlClient client(Vertx vertx, List<DB2ConnectOptions> databases, PoolOptions options) {
return new DB2Driver().createClient(vertx, databases, options);
}

@Override
DB2Pool connectHandler(Handler<SqlConnection> handler);

@Fluent
DB2Pool connectionProvider(Function<Context, Future<SqlConnection>> provider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,23 @@
*/
package io.vertx.db2client.impl;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.ConnectionFactory;
import io.vertx.sqlclient.impl.SqlConnectionFactoryBase;
import io.vertx.sqlclient.impl.ConnectionFactoryBase;
import io.vertx.sqlclient.impl.tracing.QueryTracer;

public class DB2ConnectionFactory extends SqlConnectionFactoryBase implements ConnectionFactory {
public class DB2ConnectionFactory extends ConnectionFactoryBase {

private int pipeliningLimit;

Expand All @@ -50,20 +51,26 @@ protected void configureNetClientOptions(NetClientOptions netClientOptions) {
}

@Override
protected void doConnectInternal(Promise<Connection> promise) {
PromiseInternal<Connection> promiseInternal = (PromiseInternal<Connection>) promise;
EventLoopContext context = ConnectionFactory.asEventLoopContext(promiseInternal.context());
Future<NetSocket> fut = netClient.connect(socketAddress);
fut.onComplete(ar -> {
if (ar.succeeded()) {
NetSocket so = ar.result();
DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, cachePreparedStatements,
preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
conn.init();
conn.sendStartupMessage(username, password, database, properties, promise);
} else {
promise.fail(ar.cause());
}
protected Future<Connection> doConnectInternal(SocketAddress server, String username, String password, String database, EventLoopContext context) {
return netClient.connect(server).flatMap(so -> {
DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, cachePreparedStatements,
preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
conn.init();
return Future.future(p -> conn.sendStartupMessage(username, password, database, properties, p));
});
}

@Override
public Future<SqlConnection> connect(Context context) {
ContextInternal contextInternal = (ContextInternal) context;
QueryTracer tracer = contextInternal.tracer() == null ? null : new QueryTracer(contextInternal.tracer(), options);
Promise<SqlConnection> promise = contextInternal.promise();
connect(asEventLoopContext(contextInternal))
.map(conn -> {
DB2ConnectionImpl db2Connection = new DB2ConnectionImpl(contextInternal, this, conn, tracer, null);
conn.init(db2Connection);
return (SqlConnection)db2Connection;
}).onComplete(promise);
return promise.future();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Connection;
import io.vertx.db2client.impl.command.PingCommand;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.ConnectionFactory;
import io.vertx.sqlclient.impl.SqlConnectionImpl;
import io.vertx.sqlclient.impl.tracing.QueryTracer;
import io.vertx.sqlclient.spi.ConnectionFactory;

public class DB2ConnectionImpl extends SqlConnectionImpl<DB2ConnectionImpl> implements DB2Connection {

Expand All @@ -41,19 +40,11 @@ public static Future<DB2Connection> connect(Vertx vertx, DB2ConnectOptions optio
return ctx.failedFuture(e);
}
ctx.addCloseHook(client);
QueryTracer tracer = ctx.tracer() == null ? null : new QueryTracer(ctx.tracer(), options);
Promise<Connection> promise = ctx.promise();
client.connect(promise);
return promise.future()
.map(conn -> {
DB2ConnectionImpl db2Connection = new DB2ConnectionImpl(client, ctx, conn, tracer, null);
conn.init(db2Connection);
return db2Connection;
});
return (Future) client.connect(ctx);
}

public DB2ConnectionImpl(DB2ConnectionFactory factory, ContextInternal context, Connection conn, QueryTracer tracer, ClientMetrics metrics) {
super(context, conn, tracer, metrics);
public DB2ConnectionImpl(ContextInternal context, ConnectionFactory factory, Connection conn, QueryTracer tracer, ClientMetrics metrics) {
super(context, factory, conn, tracer, metrics);
}

@Override
Expand Down

0 comments on commit 9143c90

Please sign in to comment.