Skip to content

Commit 57998e8

Browse files
committed
Added publish handler for raw messages
1 parent 1ef1fa8 commit 57998e8

File tree

3 files changed

+45
-2
lines changed

3 files changed

+45
-2
lines changed

src/Remote/AMQP/Publish.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Onliner\CommandBus\Remote\AMQP;
6+
7+
final class Publish
8+
{
9+
public function __construct(
10+
public string $exchange,
11+
public string $queue,
12+
public string $payload,
13+
) {}
14+
15+
/**
16+
* @param array<string, mixed> $payload
17+
*/
18+
public static function create(string $exchange, string $queue, array $payload): self
19+
{
20+
return new self($exchange, $queue, json_encode($payload, JSON_THROW_ON_ERROR));
21+
}
22+
}

src/Remote/AMQP/Transport.php

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44

55
namespace Onliner\CommandBus\Remote\AMQP;
66

7+
use Onliner\CommandBus\Builder;
8+
use Onliner\CommandBus\Context;
9+
use Onliner\CommandBus\Extension;
710
use Onliner\CommandBus\Remote\Envelope;
811
use Onliner\CommandBus\Remote\Transport as TransportContract;
912
use Psr\Log\LoggerInterface;
1013

11-
final class Transport implements TransportContract
14+
final class Transport implements TransportContract, Extension
1215
{
1316
public function __construct(
1417
private Connector $connector,
@@ -28,9 +31,13 @@ public static function create(string $dsn, string $exchange = '', array $routes
2831
}
2932

3033
public function send(Envelope $envelope): void
34+
{
35+
$this->publish($envelope, $this->router->match($envelope));
36+
}
37+
38+
public function publish(Envelope $envelope, Route $route): void
3139
{
3240
$message = $this->packager->pack($envelope);
33-
$route = $this->router->match($envelope);
3441

3542
$channel = $this->connector->connect();
3643
$channel->basic_publish(
@@ -57,4 +64,14 @@ public function declare(Exchange|array $exchange): void
5764

5865
$exchange->declare($this->connector->connect());
5966
}
67+
68+
public function setup(Builder $builder): void
69+
{
70+
$builder->handle(Publish::class, function (Publish $message, Context $context) {
71+
$this->publish(
72+
new Envelope(Publish::class, $message->payload, $context->all()),
73+
new Route($message->exchange, $message->queue),
74+
);
75+
});
76+
}
6077
}

src/Remote/RemoteExtension.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ public function local(string ...$local): void
3232

3333
public function setup(Builder $builder): void
3434
{
35+
if ($this->transport instanceof Extension) {
36+
$this->transport->setup($builder);
37+
}
38+
3539
$gateway = new Gateway($this->transport, $this->serializer);
3640

3741
$builder->middleware(new RemoteMiddleware($gateway, $this->local));

0 commit comments

Comments
 (0)