Skip to content

Commit cfb791e

Browse files
committed
Add a connect_on_init_count pool option
#64
1 parent 693388c commit cfb791e

File tree

4 files changed

+34
-18
lines changed

4 files changed

+34
-18
lines changed

readme.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,11 @@ Initializes a connection pool. Pool options are:
5959
* `size` - Number of connections to maintain. Defaults to `10`
6060
* `auth`: - See [Conn.auth](#authopts-opts-void)
6161
* `connect`: - See the [Conn.open](#openallocator-stdmemallocator-opts-opts-conn)
62-
* `timeout` - The amount of time, in milliseconds, to wait for a connection to be available when `acquire()` is called.
62+
* `timeout`: - The amount of time, in milliseconds, to wait for a connection to be available when `acquire()` is called.
63+
* `connect_on_init_count`: - The # of connections in the pool to eagerly connect during `init`. Defaults to `null` which will initiliaze all connections (`size`). The background reconnector is used to setup the remaining (`size - connect_on_init_count`) connections. This can be set to `0`, to prevent `init` from failing except in extreme cases (i.e. OOM), but that will hide any configuration/connection issue until the first query is executed.
6364

64-
### initUri(allocator: std.mem.Allocator, uri: std.Uri, size: u16, pool_timeout_ms: u32) !*Pool
65-
Initializes a connection pool using a std.Uri.
65+
### initUri(allocator: std.mem.Allocator, uri: std.Uri, opts: Opts) !*Pool
66+
Initializes a connection pool using a std.Uri. When using this function, the `auth` and `connect` fields of `opts` should **not** be set, as these will automatically set based on the provided `uri`.
6667

6768
```zig
6869
const uri = try std.Uri.parse("postgresql://username:password@localhost:5432/database_name");

src/conn.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub const Conn = struct {
108108
};
109109

110110
pub fn openAndAuthUri(allocator: Allocator, uri: std.Uri) !Conn {
111-
var po = try lib.parseOpts(uri, allocator, 0, 0);
111+
var po = try lib.parseOpts(uri, allocator);
112112
defer po.deinit();
113113
return try openAndAuth(allocator, po.opts.connect, po.opts.auth);
114114
}

src/lib.zig

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub const ParsedOpts = struct {
108108
}
109109
};
110110

111-
pub fn parseOpts(uri: std.Uri, allocator: std.mem.Allocator, size: u16, pool_timeout_ms: u32) !ParsedOpts {
111+
pub fn parseOpts(uri: std.Uri, allocator: std.mem.Allocator) !ParsedOpts {
112112
if (!std.mem.eql(u8, uri.scheme, "postgresql")) {
113113
return error.InvalidUriScheme;
114114
}
@@ -144,7 +144,8 @@ pub fn parseOpts(uri: std.Uri, allocator: std.mem.Allocator, size: u16, pool_tim
144144

145145
const path = std.mem.trimLeft(u8, try uri.path.toRawMaybeAlloc(aa), "/");
146146
return .{ .arena = arena, .opts = .{
147-
.size = size,
147+
.size = 0,
148+
.timeout = 0,
148149
.auth = .{
149150
.username = if (uri.user) |user| try user.toRawMaybeAlloc(aa) else "postgres",
150151
.password = if (uri.password) |password| try password.toRawMaybeAlloc(aa) else null,
@@ -156,7 +157,6 @@ pub fn parseOpts(uri: std.Uri, allocator: std.mem.Allocator, size: u16, pool_tim
156157
.port = uri.port orelse null,
157158
.host = if (uri.host) |host| try host.toRawMaybeAlloc(aa) else null,
158159
},
159-
.timeout = pool_timeout_ms,
160160
} };
161161
}
162162

@@ -242,22 +242,22 @@ const TestCase = struct {
242242
};
243243

244244
const valid_tcs: [2]TestCase = .{
245-
.{ .uri = "postgresql:///", .expected_opts = .{ .size = 10, .auth = .{ .username = "postgres" }, .connect = .{}, .timeout = 5000 } },
246-
.{ .uri = "postgresql://user:pass@somehost:1234/somedb?tcp_user_timeout=5678", .expected_opts = .{ .size = 10, .auth = .{
245+
.{ .uri = "postgresql:///", .expected_opts = .{ .size = 0, .auth = .{ .username = "postgres" }, .connect = .{}, .timeout = 0 } },
246+
.{ .uri = "postgresql://user:pass@somehost:1234/somedb?tcp_user_timeout=5678", .expected_opts = .{ .size = 0, .auth = .{
247247
.username = "user",
248248
.password = "pass",
249249
.database = "somedb",
250250
.timeout = 5678,
251251
}, .connect = .{
252252
.host = "somehost",
253253
.port = 1234,
254-
}, .timeout = 5000 } },
254+
}, .timeout = 0 } },
255255
};
256256

257257
test "URI: parse valid" {
258258
const a = std.testing.allocator;
259259
for (valid_tcs) |tc| {
260-
var po = parseOpts(try std.Uri.parse(tc.uri), a, 10, 5000) catch |e| {
260+
var po = parseOpts(try std.Uri.parse(tc.uri), a) catch |e| {
261261
std.log.err("failed to parse URI {s}", .{tc.uri});
262262
return e;
263263
};
@@ -267,9 +267,9 @@ test "URI: parse valid" {
267267
}
268268

269269
test "URI: invalid scheme" {
270-
try std.testing.expectError(error.InvalidUriScheme, parseOpts(try std.Uri.parse("foobar:///"), std.testing.allocator, 0, 0));
270+
try std.testing.expectError(error.InvalidUriScheme, parseOpts(try std.Uri.parse("foobar:///"), std.testing.allocator));
271271
}
272272

273273
test "URI: invalid params" {
274-
try std.testing.expectError(error.UnsupportedConnectionParam, parseOpts(try std.Uri.parse("postgresql:///?bar=baz"), std.testing.allocator, 0, 0));
274+
try std.testing.expectError(error.UnsupportedConnectionParam, parseOpts(try std.Uri.parse("postgresql:///?bar=baz"), std.testing.allocator));
275275
}

src/pool.zig

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ pub const Pool = struct {
2929
auth: auth.Opts = .{},
3030
connect: Conn.Opts = .{},
3131
timeout: u32 = 10 * std.time.ms_per_s,
32+
connect_on_init_count: ?u16 = null,
3233
};
3334

34-
pub fn initUri(allocator: Allocator, uri: std.Uri, size: u16, pool_timeout_ms: u32) !*Pool {
35-
var po = try lib.parseOpts(uri, allocator, size, pool_timeout_ms);
35+
36+
pub fn initUri(allocator: Allocator, uri: std.Uri, opts: Opts) !*Pool {
37+
var po = try lib.parseOpts(uri, allocator);
3638
defer po.deinit();
39+
po.size = opts.size;
40+
po.timeout = opts.timeout;
3741
return Pool.init(allocator, po.opts);
3842
}
3943

@@ -60,16 +64,17 @@ pub const Pool = struct {
6064
}
6165
}
6266
errdefer lib.freeSSLContext(ssl_ctx);
67+
const connect_on_init_count = opts.connect_on_init_count orelse size;
6368

6469
pool.* = .{
6570
._cond = .{},
6671
._mutex = .{},
6772
._conns = conns,
6873
._arena = arena,
6974
._opts = opts_copy,
70-
._available = size,
7175
._ssl_ctx = ssl_ctx,
7276
._allocator = allocator,
77+
._available = connect_on_init_count,
7378
._reconnector = Reconnector.init(pool),
7479
._timeout = @as(u64, @intCast(opts.timeout)) * std.time.ns_per_ms,
7580
};
@@ -81,11 +86,16 @@ pub const Pool = struct {
8186
}
8287
}
8388

84-
for (0..size) |i| {
89+
for (0..connect_on_init_count) |i| {
8590
conns[i] = try newConnection(pool, true);
8691
opened_connections += 1;
8792
}
8893

94+
const lazy_start_count = size - connect_on_init_count;
95+
for (0..lazy_start_count) |_| {
96+
try pool._reconnector.reconnect();
97+
}
98+
8999
return pool;
90100
}
91101

@@ -218,6 +228,7 @@ const Reconnector = struct {
218228
const retry_delay = 2 * std.time.ns_per_s;
219229

220230
self.mutex.lock();
231+
defer self.mutex.unlock();
221232
loop: while (self.count > 0) {
222233
const stopped = self.stopped;
223234
self.mutex.unlock();
@@ -235,6 +246,9 @@ const Reconnector = struct {
235246
self.mutex.lock();
236247
self.count -= 1;
237248
}
249+
250+
self.thread.?.detach();
251+
self.thread = null;
238252
}
239253

240254
fn stop(self: *Reconnector) void {
@@ -248,11 +262,11 @@ const Reconnector = struct {
248262

249263
fn reconnect(self: *Reconnector) !void {
250264
self.mutex.lock();
265+
defer self.mutex.unlock();
251266
self.count += 1;
252267
if (self.thread == null) {
253268
self.thread = try Thread.spawn(.{ .stack_size = 1024 * 1024 }, Reconnector.run, .{self});
254269
}
255-
self.mutex.unlock();
256270
}
257271
};
258272

@@ -291,6 +305,7 @@ test "Pool" {
291305
var pool = try Pool.init(t.allocator, .{
292306
.size = 2,
293307
.auth = t.authOpts(.{}),
308+
.connect_on_init_count = 1,
294309
});
295310
defer pool.deinit();
296311

0 commit comments

Comments
 (0)