Skip to content

Commit

Permalink
rewrite + handle sending message on custom queue
Browse files Browse the repository at this point in the history
  • Loading branch information
achretien committed Sep 3, 2024
1 parent 44a82d2 commit b6500a9
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 303 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ return [
'serviceBusNamespace' => 'your service bus namespace',
'sharedAccessKey' => 'your shared access key to access the service bus queue',
'sharedAccessKeyName' => 'your shared access key name',
'queue' => 'the name of your Aruez Service Bus queue',
'queue' => 'the name of your Azure Service Bus default queue',
],
]
];
Expand Down
10 changes: 10 additions & 0 deletions src/AzureJobInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace sagacorp\queue\azure;

interface AzureJobInterface
{
// region Getters/Setters
public function getQueue(): ?string;
// endregion Getters/Setters
}
12 changes: 6 additions & 6 deletions src/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@
*/
class Command extends CliCommand
{
//region Controllers Actions
// region Controllers Actions
/**
*/
public function actionListen(): void
public function actionListen(?string $queue = null): void
{
$this->queue->run(true);
$this->queue->run(true, queue: $queue);
}
//endregion Controllers Actions
// endregion Controllers Actions

//region Protected Methods
// region Protected Methods
/**
* @inheritdoc
*/
protected function isWorkerAction($actionID): bool
{
return $actionID === 'listen';
}
//endregion Protected Methods
// endregion Protected Methods
}
95 changes: 54 additions & 41 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
use sagacorp\queue\azure\service\BrokerProperties;
use sagacorp\queue\azure\service\Message;
use sagacorp\queue\azure\service\ServiceBus;
use yii\base\InvalidConfigException;
use yii\base\NotSupportedException;
use yii\di\Instance;
use yii\httpclient\Exception;
use yii\queue\PushEvent;

/**
* Azure bus Queue.
Expand All @@ -19,16 +22,16 @@ class Queue extends \yii\queue\cli\Queue
* You can use this property when you need to run multiple environments with the same queue at the same time, multiple locals envionnements for example.
*
* @see BrokerProperties::$to
*
* @var string|null
*/
public ?string $id = null;
/**
* @var ServiceBus
*/
public $serviceBus = 'serviceBus';
public ServiceBus|string $serviceBus = 'serviceBus';

// endregion Public Properties

// region Private Properties
private ?string $customQueue = null;
// endregion Private Properties

// region Initialization
/**
* @throws \yii\base\InvalidConfigException
Expand All @@ -40,6 +43,9 @@ public function init(): void
$this->commandClass = Command::class;

$this->serviceBus = Instance::ensure($this->serviceBus, ServiceBus::class);

$this->on(self::EVENT_BEFORE_PUSH, fn (PushEvent $event) => $this->handlePushEvent($event));
$this->on(self::EVENT_AFTER_PUSH, fn (PushEvent $event) => $this->handlePushEvent($event, true));
}
// endregion Initialization

Expand All @@ -54,26 +60,9 @@ public function init(): void
* @internal for worker command only.
* @since 2.0.2
*/
public function run(bool $repeat, int $timeout = 30): ?int
public function run(bool $repeat, int $timeout = 30, ?string $queue = null): ?int
{
return $this->runWorker(
function (callable $canContinue) use ($repeat, $timeout) {
while ($canContinue()) {
$message = $this->serviceBus->receiveMessage(ServiceBus::PEEK_LOCK, $timeout);

if ($message !== null && $message->brokerProperties !== null) {
if ($message->brokerProperties->to && !$message->brokerProperties->isTo($this->id)) {
continue;
}
if ($this->handleMessage($message->brokerProperties->messageId, $message->body, $message->brokerProperties->timeToLive, $message->brokerProperties->deliveryCount)) {
$this->serviceBus->deleteMessage($message);
}
} elseif (!$repeat) {
break;
}
}
}
);
return $this->runWorker(fn (callable $canContinue) => $this->processWorker($canContinue, $repeat, $timeout, $queue));
}

/**
Expand All @@ -88,7 +77,39 @@ public function status($id): void
}
// endregion Public Methods

//region Protected Methods
// region Protected Methods
protected function handlePushEvent(PushEvent $event, bool $resetCustomQueue = false): void
{
if (!$event->job instanceof AzureJobInterface) {
return;
}

$this->customQueue = $resetCustomQueue ? null : $event->job->getQueue();
}

/**
* @throws Exception
* @throws InvalidConfigException
* @throws \JsonException
*/
protected function processWorker(callable $canContinue, bool $repeat, int $timeout = 30, ?string $queue = null): void
{
while ($canContinue()) {
$message = $this->serviceBus->receiveMessage(ServiceBus::PEEK_LOCK, $timeout, $queue);

if ($message !== null && $message->brokerProperties !== null) {
if ($message->brokerProperties->to && !$message->brokerProperties->isTo($this->id)) {
continue;
}
if ($this->handleMessage($message->brokerProperties->messageId, $message->body, $message->brokerProperties->timeToLive, $message->brokerProperties->deliveryCount)) {
$this->serviceBus->deleteMessage($message);
}
} elseif (!$repeat) {
break;
}
}
}

/**
* @param $message
* @param int $ttr time to reserve in seconds
Expand All @@ -101,21 +122,13 @@ public function status($id): void
*/
protected function pushMessage($message, $ttr, $delay, $priority): string
{
$azureMessage = new Message(
[
'body' => $message,
'contentType' => 'application/vnd.microsoft.servicebus.yml',
'brokerProperties' => new BrokerProperties(
[
'timeToLive' => $ttr,
'delay' => $delay,
'to' => $this->id,
]
),
]
);

$this->serviceBus->sendMessage($azureMessage);
$brokerProperties = new BrokerProperties(timeToLive: $ttr, to: $this->id);

$brokerProperties->setDelay($delay);

$azureMessage = new Message($message, brokerProperties: $brokerProperties);

$this->serviceBus->sendMessage($azureMessage, $this->customQueue);

return $azureMessage->brokerProperties->messageId;
}
Expand Down
Loading

0 comments on commit b6500a9

Please sign in to comment.