Skip to content

Commit e048c8e

Browse files
authored
Add shutdownAsync (#109)
* Add shutdownAsync and tests * Better async tests * Actually throw error on double shutdown * Fix the tests
1 parent 15b3fb7 commit e048c8e

File tree

4 files changed

+286
-4
lines changed

4 files changed

+286
-4
lines changed

Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
166166
///
167167
/// - Warning: This method is soft-deprecated. Use `syncShutdownGracefully()` or
168168
/// `shutdownGracefully()` instead.
169+
@available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()")
169170
public func shutdown() {
170171
// synchronize access to closing
171172
guard self.lock.withLock({
@@ -190,6 +191,44 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
190191
}
191192
}
192193

194+
/// Closes the connection pool.
195+
///
196+
/// All available connections will be closed immediately. Any connections still in use will be
197+
/// closed as soon as they are returned to the pool. Once closed, the pool can not be used to
198+
/// create new connections.
199+
///
200+
/// Connection pools must be closed before they deinitialize.
201+
///
202+
/// This method shuts down asynchronously, waiting for all connection closures to complete before
203+
/// returning.
204+
///
205+
/// - Warning: The pool is always fully shut down once this method returns, even if an error is
206+
/// thrown. All errors are purely advisory.
207+
public func shutdownAsync() async throws {
208+
// synchronize access to closing
209+
guard self.lock.withLock({
210+
// check to make sure we aren't double closing
211+
guard !self.didShutdown else {
212+
return false
213+
}
214+
self.didShutdown = true
215+
self.logger.debug("Connection pool shutting down, closing each event loop's storage")
216+
return true
217+
}) else {
218+
self.logger.debug("Cannot shutdown the connection pool more than once")
219+
throw ConnectionPoolError.shutdown
220+
}
221+
222+
// shutdown all pools
223+
for pool in self.storage.values {
224+
do {
225+
try await pool.close().get()
226+
} catch {
227+
self.logger.error("Failed shutting down event loop pool: \(error)")
228+
}
229+
}
230+
}
231+
193232
/// Closes the connection pool.
194233
///
195234
/// All available connections will be closed immediately. Any connections still in use will be
@@ -203,6 +242,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
203242
///
204243
/// - Warning: The pool is always fully shut down once this method returns, even if an error is
205244
/// thrown. All errors are purely advisory.
245+
@available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()")
206246
public func syncShutdownGracefully() throws {
207247
// - TODO: Does this need to assert "not on any EventLoop", as `EventLoopGroup.syncShutdownGracefully()` does?
208248
var possibleError: Error? = nil
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
import Atomics
2+
import AsyncKit
3+
import XCTest
4+
import NIOConcurrencyHelpers
5+
import Logging
6+
import NIOCore
7+
import NIOEmbedded
8+
9+
final class AsyncConnectionPoolTests: AsyncKitAsyncTestCase {
10+
func testPooling() async throws {
11+
let foo = FooDatabase()
12+
let pool = EventLoopConnectionPool(
13+
source: foo,
14+
maxConnections: 2,
15+
on: self.group.any()
16+
)
17+
18+
// make two connections
19+
let connA = try await pool.requestConnection().get()
20+
XCTAssertEqual(connA.isClosed, false)
21+
let connB = try await pool.requestConnection().get()
22+
XCTAssertEqual(connB.isClosed, false)
23+
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)
24+
25+
// try to make a third, but pool only supports 2
26+
let futureC = pool.requestConnection()
27+
let connC = ManagedAtomic<FooConnection?>(nil)
28+
futureC.whenSuccess { connC.store($0, ordering: .relaxed) }
29+
XCTAssertNil(connC.load(ordering: .relaxed))
30+
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)
31+
32+
// release one of the connections, allowing the third to be made
33+
pool.releaseConnection(connB)
34+
let connCRet = try await futureC.get()
35+
XCTAssertNotNil(connC.load(ordering: .relaxed))
36+
XCTAssert(connC.load(ordering: .relaxed) === connB)
37+
XCTAssert(connCRet === connC.load(ordering: .relaxed))
38+
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)
39+
40+
// try to make a third again, with two active
41+
let futureD = pool.requestConnection()
42+
let connD = ManagedAtomic<FooConnection?>(nil)
43+
futureD.whenSuccess { connD.store($0, ordering: .relaxed) }
44+
XCTAssertNil(connD.load(ordering: .relaxed))
45+
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)
46+
47+
// this time, close the connection before releasing it
48+
try await connCRet.close().get()
49+
pool.releaseConnection(connC.load(ordering: .relaxed)!)
50+
let connDRet = try await futureD.get()
51+
XCTAssert(connD.load(ordering: .relaxed) !== connB)
52+
XCTAssert(connDRet === connD.load(ordering: .relaxed))
53+
XCTAssertEqual(connD.load(ordering: .relaxed)?.isClosed, false)
54+
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 3)
55+
56+
try! await pool.close().get()
57+
}
58+
59+
func testFIFOWaiters() async throws {
60+
let foo = FooDatabase()
61+
let pool = EventLoopConnectionPool(
62+
source: foo,
63+
maxConnections: 1,
64+
on: self.group.any()
65+
)
66+
67+
// * User A makes a request for a connection, gets connection number 1.
68+
let a_1 = pool.requestConnection()
69+
let a = try await a_1.get()
70+
71+
// * User B makes a request for a connection, they are exhausted so he gets a promise.
72+
let b_1 = pool.requestConnection()
73+
74+
// * User A makes another request for a connection, they are still exhausted so he gets a promise.
75+
let a_2 = pool.requestConnection()
76+
77+
// * User A returns connection number 1. His previous request is fulfilled with connection number 1.
78+
pool.releaseConnection(a)
79+
80+
// * User B gets his connection
81+
let b = try await b_1.get()
82+
XCTAssert(a === b)
83+
84+
// * User B releases his connection
85+
pool.releaseConnection(b)
86+
87+
// * User A's second connection request is fulfilled
88+
let c = try await a_2.get()
89+
XCTAssert(a === c)
90+
91+
try! await pool.close().get()
92+
}
93+
94+
func testConnectError() async throws {
95+
let db = ErrorDatabase()
96+
let pool = EventLoopConnectionPool(
97+
source: db,
98+
maxConnections: 1,
99+
on: self.group.any()
100+
)
101+
102+
do {
103+
_ = try await pool.requestConnection().get()
104+
XCTFail("should not have created connection")
105+
} catch _ as ErrorDatabase.Error {
106+
// pass
107+
}
108+
109+
// test that we can still make another request even after a failed request
110+
do {
111+
_ = try await pool.requestConnection().get()
112+
XCTFail("should not have created connection")
113+
} catch _ as ErrorDatabase.Error {
114+
// pass
115+
}
116+
117+
try! await pool.close().get()
118+
}
119+
120+
func testPoolClose() async throws {
121+
let foo = FooDatabase()
122+
let pool = EventLoopConnectionPool(
123+
source: foo,
124+
maxConnections: 1,
125+
on: self.group.any()
126+
)
127+
let _ = try await pool.requestConnection().get()
128+
let b = pool.requestConnection()
129+
try await pool.close().get()
130+
131+
let c = pool.requestConnection()
132+
133+
// check that waiters are failed
134+
do {
135+
_ = try await b.get()
136+
XCTFail("should not have created connection")
137+
} catch ConnectionPoolError.shutdown {
138+
// pass
139+
}
140+
141+
// check that new requests fail
142+
do {
143+
_ = try await c.get()
144+
XCTFail("should not have created connection")
145+
} catch ConnectionPoolError.shutdown {
146+
// pass
147+
}
148+
}
149+
150+
func testGracefulShutdownAsync() async throws {
151+
let foo = FooDatabase()
152+
let pool = EventLoopGroupConnectionPool(
153+
source: foo,
154+
maxConnectionsPerEventLoop: 2,
155+
on: self.group
156+
)
157+
158+
try await pool.shutdownAsync()
159+
var errorCaught = false
160+
161+
do {
162+
try await pool.shutdownAsync()
163+
} catch {
164+
errorCaught = true
165+
XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown)
166+
}
167+
XCTAssertTrue(errorCaught)
168+
}
169+
170+
func testShutdownWithHeldConnection() async throws {
171+
let foo = FooDatabase()
172+
let pool = EventLoopGroupConnectionPool(
173+
source: foo,
174+
maxConnectionsPerEventLoop: 2,
175+
on: self.group
176+
)
177+
178+
let connection = try await pool.requestConnection().get()
179+
180+
try await pool.shutdownAsync()
181+
var errorCaught = false
182+
183+
do {
184+
try await pool.shutdownAsync()
185+
} catch {
186+
errorCaught = true
187+
XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown)
188+
}
189+
XCTAssertTrue(errorCaught)
190+
191+
let result1 = try await connection.eventLoop.submit { connection.isClosed }.get()
192+
XCTAssertFalse(result1)
193+
pool.releaseConnection(connection)
194+
let result2 = try await connection.eventLoop.submit { connection.isClosed }.get()
195+
XCTAssertTrue(result2)
196+
}
197+
198+
func testEventLoopDelegation() async throws {
199+
let foo = FooDatabase()
200+
let pool = EventLoopGroupConnectionPool(
201+
source: foo,
202+
maxConnectionsPerEventLoop: 1,
203+
on: self.group
204+
)
205+
206+
for _ in 0..<500 {
207+
let eventLoop = self.group.any()
208+
let a = pool.requestConnection(
209+
on: eventLoop
210+
).map { conn in
211+
XCTAssertTrue(eventLoop.inEventLoop)
212+
pool.releaseConnection(conn)
213+
}
214+
let b = pool.requestConnection(
215+
on: eventLoop
216+
).map { conn in
217+
XCTAssertTrue(eventLoop.inEventLoop)
218+
pool.releaseConnection(conn)
219+
}
220+
_ = try await a.and(b).get()
221+
}
222+
223+
try await pool.shutdownAsync()
224+
}
225+
}

Tests/AsyncKitTests/AsyncKitTestsCommon.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,20 @@ class AsyncKitTestCase: XCTestCase {
4343
XCTAssertTrue(isLoggingConfigured)
4444
}
4545
}
46+
47+
class AsyncKitAsyncTestCase: XCTestCase {
48+
var group: (any EventLoopGroup)!
49+
var eventLoop: any EventLoop { self.group.any() }
50+
51+
override func setUp() async throws {
52+
try await super.setUp()
53+
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
54+
XCTAssertTrue(isLoggingConfigured)
55+
}
56+
57+
override func tearDown() async throws {
58+
try await self.group.shutdownGracefully()
59+
self.group = nil
60+
try await super.tearDown()
61+
}
62+
}

Tests/AsyncKitTests/ConnectionPoolTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ final class ConnectionPoolTests: AsyncKitTestCase {
264264
XCTAssertEqual($0 as? ConnectionPoolError, ConnectionPoolError.shutdown)
265265
}
266266
}
267-
267+
268268
func testGracefulShutdownWithHeldConnection() throws {
269269
let foo = FooDatabase()
270270
let pool = EventLoopGroupConnectionPool(
@@ -312,7 +312,7 @@ final class ConnectionPoolTests: AsyncKitTestCase {
312312
}
313313
}
314314

315-
private struct ErrorDatabase: ConnectionPoolSource {
315+
struct ErrorDatabase: ConnectionPoolSource {
316316
enum Error: Swift.Error {
317317
case test
318318
}
@@ -322,7 +322,7 @@ private struct ErrorDatabase: ConnectionPoolSource {
322322
}
323323
}
324324

325-
private final class FooDatabase: ConnectionPoolSource {
325+
final class FooDatabase: ConnectionPoolSource {
326326
var connectionsCreated: ManagedAtomic<Int>
327327

328328
init() {
@@ -336,7 +336,7 @@ private final class FooDatabase: ConnectionPoolSource {
336336
}
337337
}
338338

339-
private final class FooConnection: ConnectionPoolItem, AtomicReference {
339+
final class FooConnection: ConnectionPoolItem, AtomicReference {
340340
var isClosed: Bool
341341
let eventLoop: EventLoop
342342

0 commit comments

Comments
 (0)