Skip to content

Commit 051211a

Browse files
committed
Fix simultaneous accept cancel and server close
1 parent efd71b3 commit 051211a

File tree

2 files changed

+34
-14
lines changed

2 files changed

+34
-14
lines changed

src/Ipc/SocketIpcHub.php

+15-14
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Amp\Cache\LocalCache;
66
use Amp\Cancellation;
7-
use Amp\CancelledException;
7+
use Amp\DeferredFuture;
88
use Amp\ForbidCloning;
99
use Amp\ForbidSerialization;
1010
use Amp\NullCancellation;
@@ -25,14 +25,15 @@ final class SocketIpcHub implements IpcHub
2525
/** @var non-empty-string */
2626
private readonly string $uri;
2727

28-
/** @var array<string, EventLoop\Suspension> */
28+
/** @var array<string, DeferredFuture> */
2929
private array $waitingByKey = [];
3030

3131
/** @var \Closure(): void */
3232
private readonly \Closure $accept;
3333

3434
private bool $queued = false;
3535

36+
/** @var LocalCache<ResourceSocket> */
3637
private LocalCache $clientsByKey;
3738

3839
/**
@@ -68,8 +69,8 @@ public function __construct(
6869
if (!$client) {
6970
$queued = false;
7071
$exception = new Socket\SocketException('IPC socket closed before the client connected');
71-
foreach ($waitingByKey as $suspension) {
72-
$suspension->throw($exception);
72+
foreach ($waitingByKey as $deferred) {
73+
$deferred->error($exception);
7374
}
7475
return;
7576
}
@@ -82,7 +83,7 @@ public function __construct(
8283
}
8384

8485
if (isset($waitingByKey[$received])) {
85-
$waitingByKey[$received]->resume($client);
86+
$waitingByKey[$received]->complete($client);
8687
unset($waitingByKey[$received]);
8788
} else {
8889
$clientsByKey->set($received, $client);
@@ -112,8 +113,8 @@ public function close(): void
112113
}
113114

114115
$exception = new Socket\SocketException('IPC socket closed before the client connected');
115-
foreach ($this->waitingByKey as $suspension) {
116-
$suspension->throw($exception);
116+
foreach ($this->waitingByKey as $deferred) {
117+
$deferred->error($exception);
117118
}
118119
}
119120

@@ -165,19 +166,19 @@ public function accept(string $key, ?Cancellation $cancellation = null): Resourc
165166
$this->queued = true;
166167
}
167168

168-
$cancellation = $cancellation ?? new NullCancellation();
169-
$cancellation->throwIfRequested();
169+
$cancellation ??= new NullCancellation();
170170

171-
$this->waitingByKey[$key] = $suspension = EventLoop::getSuspension();
172-
$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($suspension) {
173-
$suspension->throw($exception);
171+
$this->waitingByKey[$key] = $deferred = new DeferredFuture();
172+
173+
$waitingByKey = &$this->waitingByKey;
174+
$cancellationId = $cancellation->subscribe(static function () use (&$waitingByKey, $key): void {
175+
unset($waitingByKey[$key]);
174176
});
175177

176178
try {
177-
$client = $suspension->suspend();
179+
$client = $deferred->getFuture()->await($cancellation);
178180
} finally {
179181
$cancellation->unsubscribe($cancellationId);
180-
unset($this->waitingByKey[$key]);
181182
}
182183

183184
return $client;

test/Ipc/SocketIpcHubTest.php

+19
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,23 @@ public function testAcceptAfterCancel(): void
5050

5151
self::assertSame($this->server->getAddress()->toString(), $client->getLocalAddress()->toString());
5252
}
53+
54+
public function testCancelledAcceptAndCloseServer(): void
55+
{
56+
$deferredCancellation = new DeferredCancellation();
57+
$future = async(fn () => $this->ipcHub->accept(
58+
$this->ipcHub->generateKey(),
59+
$deferredCancellation->getCancellation(),
60+
));
61+
62+
async(function () use ($deferredCancellation): void {
63+
$this->server->close();
64+
$deferredCancellation->cancel();
65+
});
66+
67+
$this->expectException(Socket\SocketException::class);
68+
$this->expectExceptionMessage('closed before');
69+
70+
$future->await();
71+
}
5372
}

0 commit comments

Comments
 (0)