Skip to content

Commit 223c47c

Browse files
committed
Fix accept after cancelling prior accept
1 parent f54bb40 commit 223c47c

File tree

2 files changed

+84
-4
lines changed

2 files changed

+84
-4
lines changed

src/Ipc/SocketIpcHub.php

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ final class SocketIpcHub implements IpcHub
3434
/** @var \Closure(): void */
3535
private readonly \Closure $accept;
3636

37+
private bool $queued = false;
38+
3739
/**
3840
* @param float $keyReceiveTimeout Timeout to receive the key on accepted connections.
3941
* @param positive-int $keyLength Length of the random key exchanged on the IPC channel when connecting.
@@ -49,10 +51,28 @@ public function __construct(
4951
SocketAddressType::Internet => 'tcp://' . $address->toString(),
5052
};
5153

54+
$queued = &$this->queued;
5255
$keys = &$this->keys;
5356
$pending = &$this->pending;
54-
$this->accept = static function () use (&$keys, &$pending, $server, $keyReceiveTimeout, $keyLength): void {
55-
while ($pending && $client = $server->accept()) {
57+
$this->accept = static function () use (
58+
&$queued,
59+
&$keys,
60+
&$pending,
61+
$server,
62+
$keyReceiveTimeout,
63+
$keyLength,
64+
): void {
65+
while ($pending) {
66+
$client = $server->accept();
67+
if (!$client) {
68+
$queued = false;
69+
$exception = new Socket\SocketException('IPC socket closed before the client connected');
70+
foreach ($pending as $deferred) {
71+
$deferred->error($exception);
72+
}
73+
return;
74+
}
75+
5676
try {
5777
$received = readKey($client, new TimeoutCancellation($keyReceiveTimeout), $keyLength);
5878
} catch (\Throwable) {
@@ -77,6 +97,8 @@ public function __construct(
7797

7898
$deferred->complete($client);
7999
}
100+
101+
$queued = false;
80102
};
81103
}
82104

@@ -124,6 +146,10 @@ public function generateKey(): string
124146
*/
125147
public function accept(string $key, ?Cancellation $cancellation = null): ResourceSocket
126148
{
149+
if ($this->server->isClosed()) {
150+
throw new Socket\SocketException('The IPC server has been closed');
151+
}
152+
127153
if (\strlen($key) !== $this->keyLength) {
128154
throw new \ValueError(\sprintf(
129155
"Key provided is of length %d, expected %d",
@@ -138,12 +164,13 @@ public function accept(string $key, ?Cancellation $cancellation = null): Resourc
138164

139165
$id = $this->nextId++;
140166

141-
if (!$this->pending) {
167+
if (!$this->queued) {
142168
EventLoop::queue($this->accept);
169+
$this->queued = true;
143170
}
144171

145172
$this->keys[$key] = $id;
146-
$this->pending[$id] = $deferred = new DeferredFuture;
173+
$this->pending[$id] = $deferred = new DeferredFuture();
147174

148175
try {
149176
$client = $deferred->getFuture()->await($cancellation);

test/Ipc/SocketIpcHubTest.php

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Amp\Parallel\Test\Ipc;
4+
5+
use Amp\CancelledException;
6+
use Amp\DeferredCancellation;
7+
use Amp\Parallel\Ipc\SocketIpcHub;
8+
use Amp\PHPUnit\AsyncTestCase;
9+
use Amp\Socket;
10+
use Amp\Socket\ServerSocket;
11+
use Amp\TimeoutCancellation;
12+
use Revolt\EventLoop;
13+
use function Amp\async;
14+
15+
class SocketIpcHubTest extends AsyncTestCase
16+
{
17+
private ServerSocket $server;
18+
private SocketIpcHub $ipcHub;
19+
20+
public function setUp(): void
21+
{
22+
parent::setUp();
23+
24+
$this->server = Socket\listen('127.0.0.1:0');
25+
$this->ipcHub = new SocketIpcHub($this->server);
26+
}
27+
28+
public function testAcceptAfterCancel(): void
29+
{
30+
$key = $this->ipcHub->generateKey();
31+
32+
$deferredCancellation = new DeferredCancellation();
33+
EventLoop::delay(0.1, static fn () => $deferredCancellation->cancel());
34+
35+
try {
36+
$this->ipcHub->accept($key, $deferredCancellation->getCancellation());
37+
self::fail('Expecting accept to have been cancelled');
38+
} catch (CancelledException) {
39+
// Expected accept to be cancelled.
40+
}
41+
42+
$key = $this->ipcHub->generateKey();
43+
44+
async(function () use ($key): void {
45+
$client = Socket\connect($this->server->getAddress());
46+
$client->write($key);
47+
});
48+
49+
$client = $this->ipcHub->accept($key, new TimeoutCancellation(1));
50+
51+
self::assertSame($this->server->getAddress()->toString(), $client->getLocalAddress()->toString());
52+
}
53+
}

0 commit comments

Comments
 (0)