Skip to content

Commit 30d709d

Browse files
committed
feat: swoole support
1 parent a82bc9d commit 30d709d

File tree

4 files changed

+412
-6
lines changed

4 files changed

+412
-6
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?php
2+
namespace PhpAmqpLib\Connection;
3+
4+
use PhpAmqpLib\Wire\IO\SwooleIO;
5+
6+
class AMQPSwooleConnection extends AbstractConnection
7+
{
8+
/**
9+
* @param string $host
10+
* @param int $port
11+
* @param string $user
12+
* @param string $password
13+
* @param string $vhost
14+
* @param bool $insist
15+
* @param string $login_method
16+
* @param string $locale
17+
* @param float $connection_timeout
18+
* @param float $read_write_timeout
19+
* @param resource|array|null $context
20+
* @param bool $keepalive
21+
* @param int $heartbeat
22+
* @param float $channel_rpc_timeout
23+
* @param AMQPConnectionConfig|null $config
24+
* @throws \Exception
25+
*/
26+
public function __construct(
27+
$host,
28+
$port,
29+
$user,
30+
$password,
31+
$vhost = '/',
32+
$insist = false,
33+
$login_method = 'AMQPLAIN',
34+
$locale = 'en_US',
35+
$connection_timeout = 3.0,
36+
$read_write_timeout = 3.0,
37+
$context = null,
38+
$keepalive = false,
39+
$heartbeat = 0,
40+
$channel_rpc_timeout = 0.0,
41+
?AMQPConnectionConfig $config = null
42+
) {
43+
if ($channel_rpc_timeout > $read_write_timeout) {
44+
throw new \InvalidArgumentException('channel RPC timeout must not be greater than I/O read-write timeout');
45+
}
46+
47+
$io = new SwooleIO(
48+
$host,
49+
$port,
50+
$connection_timeout,
51+
$read_write_timeout,
52+
$context,
53+
$keepalive,
54+
$heartbeat
55+
);
56+
57+
parent::__construct(
58+
$user,
59+
$password,
60+
$vhost,
61+
$insist,
62+
$login_method,
63+
null, // login_response - deprecated but still needed for parent constructor
64+
$locale,
65+
$io,
66+
$heartbeat,
67+
$connection_timeout,
68+
$channel_rpc_timeout,
69+
$config
70+
);
71+
72+
// save the params for the use of __clone, this will overwrite the parent
73+
$this->construct_params = func_get_args();
74+
}
75+
}

PhpAmqpLib/Wire/IO/SwooleIO.php

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
<?php
2+
3+
namespace PhpAmqpLib\Wire\IO;
4+
5+
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
6+
use PhpAmqpLib\Exception\AMQPIOException;
7+
use PhpAmqpLib\Exception\AMQPRuntimeException;
8+
use PhpAmqpLib\Exception\AMQPTimeoutException;
9+
use Swoole\Coroutine\Client;
10+
11+
class SwooleIO extends AbstractIO
12+
{
13+
/** @var Client|null */
14+
private $sock;
15+
16+
/** @var string */
17+
private $buffer = '';
18+
19+
/**
20+
* @param string $host
21+
* @param int $port
22+
* @param float $connection_timeout
23+
* @param float $read_write_timeout
24+
* @param null $context
25+
* @param bool $keepalive
26+
* @param int $heartbeat
27+
*/
28+
public function __construct(
29+
$host,
30+
$port,
31+
$connection_timeout,
32+
$read_write_timeout,
33+
$context = null,
34+
$keepalive = false,
35+
$heartbeat = 0
36+
) {
37+
if ($heartbeat !== 0 && ($read_write_timeout < ($heartbeat * 2))) {
38+
throw new \InvalidArgumentException('read_write_timeout must be at least 2x the heartbeat');
39+
}
40+
$this->host = $host;
41+
$this->port = $port;
42+
$this->connection_timeout = $connection_timeout;
43+
$this->read_timeout = $read_write_timeout;
44+
$this->write_timeout = $read_write_timeout;
45+
$this->keepalive = $keepalive;
46+
$this->heartbeat = $heartbeat;
47+
$this->initial_heartbeat = $heartbeat;
48+
$this->canDispatchPcntlSignal = false; // Swoole handles signals differently
49+
}
50+
51+
/**
52+
* Sets up the stream connection
53+
*
54+
* @throws AMQPRuntimeException
55+
* @throws AMQPIOException
56+
*/
57+
public function connect()
58+
{
59+
$this->sock = new Client(SWOOLE_SOCK_TCP);
60+
61+
// Set socket options before connecting
62+
$this->sock->set([
63+
'timeout' => $this->connection_timeout,
64+
'connect_timeout' => $this->connection_timeout,
65+
'write_timeout' => $this->write_timeout,
66+
'read_timeout' => $this->read_timeout,
67+
'open_tcp_nodelay' => true,
68+
'tcp_keepalive' => $this->keepalive,
69+
'package_max_length' => 2 * 1024 * 1024, // 2MB max package
70+
]);
71+
72+
if (!$this->sock->connect($this->host, $this->port)) {
73+
throw new AMQPIOException(
74+
sprintf(
75+
'Error Connecting to server(%s): %s ',
76+
$this->sock->errCode,
77+
swoole_strerror($this->sock->errCode)
78+
),
79+
$this->sock->errCode
80+
);
81+
}
82+
}
83+
84+
/**
85+
* @param int $len
86+
* @return string
87+
* @throws AMQPIOException
88+
* @throws AMQPRuntimeException
89+
* @throws AMQPTimeoutException
90+
* @throws AMQPConnectionClosedException
91+
*/
92+
public function read($len)
93+
{
94+
if ($this->sock === null) {
95+
throw new AMQPConnectionClosedException('Socket connection is closed');
96+
}
97+
98+
$this->check_heartbeat();
99+
100+
$data = '';
101+
$remaining = $len;
102+
103+
// First, consume from buffer
104+
if ($this->buffer !== '') {
105+
$chunk_size = min($remaining, strlen($this->buffer));
106+
$data = substr($this->buffer, 0, $chunk_size);
107+
$this->buffer = substr($this->buffer, $chunk_size);
108+
$remaining -= $chunk_size;
109+
}
110+
111+
// Read remaining bytes from socket
112+
while ($remaining > 0) {
113+
if (!$this->sock->connected) {
114+
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
115+
}
116+
117+
// Swoole recv() returns false on error, empty string on EOF
118+
$chunk = $this->sock->recv($remaining, $this->read_timeout);
119+
120+
if ($chunk === false) {
121+
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
122+
throw new AMQPTimeoutException('Read timeout');
123+
}
124+
throw new AMQPIOException(
125+
sprintf('Error receiving data: %s', swoole_strerror($this->sock->errCode)),
126+
$this->sock->errCode
127+
);
128+
}
129+
130+
if ($chunk === '') {
131+
throw new AMQPConnectionClosedException('Connection closed by peer');
132+
}
133+
134+
$data .= $chunk;
135+
$remaining -= strlen($chunk);
136+
}
137+
138+
$this->last_read = microtime(true);
139+
return $data;
140+
}
141+
142+
/**
143+
* @param string $data
144+
* @throws AMQPRuntimeException
145+
* @throws AMQPTimeoutException
146+
* @throws AMQPConnectionClosedException
147+
*/
148+
public function write($data)
149+
{
150+
if ($this->sock === null || !$this->sock->connected) {
151+
throw new AMQPConnectionClosedException('Socket connection is closed');
152+
}
153+
154+
$this->checkBrokerHeartbeat();
155+
156+
// Swoole send() handles partial writes internally
157+
$result = $this->sock->send($data);
158+
159+
if ($result === false) {
160+
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
161+
throw new AMQPTimeoutException('Write timeout');
162+
}
163+
throw new AMQPIOException(
164+
sprintf('Error sending data: %s', swoole_strerror($this->sock->errCode)),
165+
$this->sock->errCode
166+
);
167+
}
168+
169+
if ($result !== strlen($data)) {
170+
throw new AMQPIOException('Could not write entire buffer');
171+
}
172+
173+
$this->last_write = microtime(true);
174+
}
175+
176+
/**
177+
* @return void
178+
*/
179+
public function close()
180+
{
181+
if ($this->sock !== null && $this->sock->connected) {
182+
$this->sock->close();
183+
}
184+
$this->sock = null;
185+
$this->last_read = null;
186+
$this->last_write = null;
187+
$this->buffer = '';
188+
}
189+
190+
/**
191+
* @param int|null $sec
192+
* @param int $usec
193+
* @return int|bool
194+
* @throws AMQPConnectionClosedException
195+
*/
196+
protected function do_select(?int $sec, int $usec)
197+
{
198+
if ($this->sock === null || !$this->sock->connected) {
199+
throw new AMQPConnectionClosedException('Socket connection is closed');
200+
}
201+
202+
// If we have buffered data, return immediately
203+
if (strlen($this->buffer) > 0) {
204+
return 1;
205+
}
206+
207+
// Convert timeout to seconds for Swoole (supports fractional seconds)
208+
$timeout = $sec === null ? -1 : ($sec + $usec / 1000000);
209+
210+
// Swoole doesn't have a true select() equivalent for coroutines.
211+
// We must use recv() to wait for data, then buffer it.
212+
// This blocks the coroutine (not the process) until data arrives or timeout.
213+
$data = $this->sock->recv($timeout);
214+
215+
if ($data === false) {
216+
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
217+
return 0; // Timeout - no data available
218+
}
219+
// Connection error
220+
if ($this->sock->errCode == SOCKET_ECONNRESET || !$this->sock->connected) {
221+
throw new AMQPConnectionClosedException('Connection reset by peer');
222+
}
223+
return false; // Other error
224+
}
225+
226+
if ($data === '') {
227+
throw new AMQPConnectionClosedException('Connection closed by peer');
228+
}
229+
230+
// Buffer the received data for subsequent read() calls
231+
$this->buffer .= $data;
232+
return 1; // Data is now available
233+
}
234+
235+
/**
236+
* @return Client|null
237+
*/
238+
public function getSocket()
239+
{
240+
return $this->sock;
241+
}
242+
}

composer.json

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
{
2-
"name": "php-amqplib/php-amqplib",
2+
"name": "appwrite-labs/php-amqplib",
33
"replace": {
4-
"videlalvaro/php-amqplib": "self.version"
4+
"videlalvaro/php-amqplib": "self.version",
5+
"php-amqplib/php-amqplib": "self.version"
56
},
67
"type": "library",
7-
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
8-
"keywords": ["rabbitmq", "message", "queue"],
9-
"homepage": "https://github.com/php-amqplib/php-amqplib/",
8+
"description": "Fork of php-amqplib with Swoole coroutine support. A pure PHP implementation of the AMQP protocol tested against RabbitMQ.",
9+
"keywords": ["rabbitmq", "message", "queue", "swoole", "coroutine", "async"],
10+
"homepage": "https://github.com/appwrite-labs/php-amqplib/",
1011
"authors": [
12+
{
13+
"name": "Appwrite Labs",
14+
"email": "[email protected]",
15+
"role": "Fork Maintainer"
16+
},
1117
{
1218
"name": "Alvaro Videla",
1319
"role": "Original Maintainer"
@@ -38,7 +44,11 @@
3844
"ext-curl": "*",
3945
"phpunit/phpunit": "^7.5|^9.5",
4046
"squizlabs/php_codesniffer": "^3.6",
41-
"nategood/httpful": "^0.2.20"
47+
"nategood/httpful": "^0.2.20",
48+
"swoole/ide-helper": "^5.0"
49+
},
50+
"suggest": {
51+
"ext-swoole": "For Swoole coroutine support"
4252
},
4353
"conflict": {
4454
"php": "7.4.0 - 7.4.1"

0 commit comments

Comments
 (0)