Skip to content

Commit 9403846

Browse files
committed
Fix race condition on concurrent IPC connects
We write the key to the child [1] before registering it to be received, which is inside `accept` [2]. If the child connects fast enough, this will result in a race condition. The race condition will only happen if there are multiple concurrent accepts, because we start to accept only after registering the key [3]. With concurrent pending connects, we might already be in the accept loop. Fixes #199. [1] https://github.com/amphp/parallel/blob/ffda869c33c30627b6eb5c25f096882d885681dc/src/Context/ProcessContext.php#L159 [2] https://github.com/amphp/parallel/blob/ffda869c33c30627b6eb5c25f096882d885681dc/src/Ipc/SocketIpcHub.php#L172 [3] https://github.com/amphp/parallel/blob/ffda869c33c30627b6eb5c25f096882d885681dc/src/Ipc/SocketIpcHub.php#L168
1 parent ffda869 commit 9403846

File tree

1 file changed

+40
-39
lines changed

1 file changed

+40
-39
lines changed

src/Ipc/SocketIpcHub.php

+40-39
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
namespace Amp\Parallel\Ipc;
44

5+
use Amp\Cache\LocalCache;
56
use Amp\Cancellation;
6-
use Amp\DeferredFuture;
7+
use Amp\CancelledException;
78
use Amp\ForbidCloning;
89
use Amp\ForbidSerialization;
10+
use Amp\NullCancellation;
911
use Amp\Socket;
1012
use Amp\Socket\ResourceSocket;
1113
use Amp\Socket\SocketAddressType;
@@ -20,22 +22,19 @@ final class SocketIpcHub implements IpcHub
2022
public const DEFAULT_KEY_RECEIVE_TIMEOUT = 5;
2123
public const DEFAULT_KEY_LENGTH = 64;
2224

23-
private int $nextId = 0;
24-
2525
/** @var non-empty-string */
2626
private readonly string $uri;
2727

28-
/** @var array<string, int> */
29-
private array $keys = [];
30-
31-
/** @var array<int, DeferredFuture> */
32-
private array $pending = [];
28+
/** @var array<string, EventLoop\Suspension> */
29+
private array $waitingByKey = [];
3330

3431
/** @var \Closure(): void */
3532
private readonly \Closure $accept;
3633

3734
private bool $queued = false;
3835

36+
private LocalCache $clientsByKey;
37+
3938
/**
4039
* @param float $keyReceiveTimeout Timeout to receive the key on accepted connections.
4140
* @param positive-int $keyLength Length of the random key exchanged on the IPC channel when connecting.
@@ -51,24 +50,26 @@ public function __construct(
5150
SocketAddressType::Internet => 'tcp://' . $address->toString(),
5251
};
5352

53+
$this->clientsByKey = new LocalCache(1024, $keyReceiveTimeout);
54+
5455
$queued = &$this->queued;
55-
$keys = &$this->keys;
56-
$pending = &$this->pending;
56+
$waitingByKey = &$this->waitingByKey;
57+
$clientsByKey = &$this->clientsByKey;
5758
$this->accept = static function () use (
5859
&$queued,
59-
&$keys,
60-
&$pending,
60+
&$waitingByKey,
61+
&$clientsByKey,
6162
$server,
6263
$keyReceiveTimeout,
6364
$keyLength,
6465
): void {
65-
while ($pending) {
66+
while ($waitingByKey) {
6667
$client = $server->accept();
6768
if (!$client) {
6869
$queued = false;
6970
$exception = new Socket\SocketException('IPC socket closed before the client connected');
70-
foreach ($pending as $deferred) {
71-
$deferred->error($exception);
71+
foreach ($waitingByKey as $suspension) {
72+
$suspension->throw($exception);
7273
}
7374
return;
7475
}
@@ -80,22 +81,12 @@ public function __construct(
8081
continue; // Ignore possible foreign connection attempt.
8182
}
8283

83-
$id = $keys[$received] ?? null;
84-
85-
if ($id === null) {
86-
$client->close();
87-
continue; // Ignore possible foreign connection attempt.
84+
if (isset($waitingByKey[$received])) {
85+
$waitingByKey[$received]->resume($client);
86+
unset($waitingByKey[$received]);
87+
} else {
88+
$clientsByKey->set($received, $client);
8889
}
89-
90-
$deferred = $pending[$id] ?? null;
91-
unset($pending[$id], $keys[$received]);
92-
93-
if ($deferred === null) {
94-
$client->close();
95-
continue; // Client accept cancelled.
96-
}
97-
98-
$deferred->complete($client);
9990
}
10091

10192
$queued = false;
@@ -116,13 +107,13 @@ public function close(): void
116107
{
117108
$this->server->close();
118109

119-
if (!$this->pending) {
110+
if (!$this->waitingByKey) {
120111
return;
121112
}
122113

123114
$exception = new Socket\SocketException('IPC socket closed before the client connected');
124-
foreach ($this->pending as $deferred) {
125-
$deferred->error($exception);
115+
foreach ($this->waitingByKey as $suspension) {
116+
$suspension->throw($exception);
126117
}
127118
}
128119

@@ -158,24 +149,34 @@ public function accept(string $key, ?Cancellation $cancellation = null): Resourc
158149
));
159150
}
160151

161-
if (isset($this->keys[$key])) {
152+
if (isset($this->waitingByKey[$key])) {
162153
throw new \Error("An accept is already pending for the given key");
163154
}
164155

165-
$id = $this->nextId++;
156+
$client = $this->clientsByKey->get($key);
157+
if ($client) {
158+
$this->clientsByKey->delete($key);
159+
160+
return $client;
161+
}
166162

167163
if (!$this->queued) {
168164
EventLoop::queue($this->accept);
169165
$this->queued = true;
170166
}
171167

172-
$this->keys[$key] = $id;
173-
$this->pending[$id] = $deferred = new DeferredFuture();
168+
$this->waitingByKey[$key] = $suspension = EventLoop::getSuspension();
169+
170+
$cancellation = $cancellation ?? new NullCancellation();
171+
$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($suspension) {
172+
$suspension->throw($exception);
173+
});
174174

175175
try {
176-
$client = $deferred->getFuture()->await($cancellation);
176+
$client = $suspension->suspend();
177177
} finally {
178-
unset($this->pending[$id], $this->keys[$key]);
178+
$cancellation->unsubscribe($cancellationId);
179+
unset($this->waitingByKey[$key]);
179180
}
180181

181182
return $client;

0 commit comments

Comments
 (0)