Skip to content

Commit cbed881

Browse files
author
Donal Byrne
committed
Fixed queue group subscribe
1 parent aff2c43 commit cbed881

File tree

2 files changed

+45
-11
lines changed

2 files changed

+45
-11
lines changed

src/NatsStreaming/Subscription.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public function __construct($subject, $qGroup, $inbox, $opts, $msgCb, $stanCon)
8484

8585
$subRequest = new SubscriptionRequest();
8686
$subRequest->setSubject($this->getSubject());
87+
$subRequest->setQGroup($this->qGroup);
8788
$subRequest->setClientID($this->stanCon->options->getClientID());
8889
$subRequest->setAckWaitInSecs($this->opts->getAckWaitSecs());
8990
$subRequest->setMaxInFlight($this->opts->getMaxInFlight());

tests/Unit/ConnectionTest.php

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -445,39 +445,72 @@ public function testMultipleDurableSubscription()
445445
public function testQueueGroupSubscribe()
446446
{
447447

448-
$this->c->reconnect();
449-
450448
$subject = 'test.subscribe.qgroup.' . uniqid();
449+
$group = 'testQueueGroup';
450+
$toSend = 100;
451+
452+
$c1 = $this->c;
453+
454+
$options = new ConnectionOptions();
455+
$options->setClientID("test-2");
456+
$options->setClusterID("test-cluster");
457+
$c2 = new Connection($options);
458+
459+
$c1->reconnect();
460+
$c2->connect();
451461

452462
$subOptions = new \NatsStreaming\SubscriptionOptions();
453463

454-
$toSend = 100;
455464

456-
$got = 0;
457-
$sub = $this->c->queueSubscribe($subject, 'testQueueGroup', function ($message) use (&$got) {
465+
$got1 = 0;
466+
$allSeqs = [];
467+
$sub1 = $c1->queueSubscribe($subject, $group, function ($message) use (&$got1, &$allSeqs) {
458468
/**
459469
* @var $message MsgProto
460470
*/
461-
$this->assertEquals($got + 1, $message->getSequence());
462-
$got ++;
471+
$allSeqs["client1"][] = $message->getSequence();
472+
$got1 ++;
473+
}, $subOptions);
474+
475+
$got2 = 0;
476+
$sub2 = $c2->queueSubscribe($subject, $group, function ($message) use (&$got2, &$allSeqs) {
477+
/**
478+
* @var $message MsgProto
479+
*/
480+
$allSeqs["client2"][] = $message->getSequence();
481+
$got2 ++;
463482
}, $subOptions);
464483

465484
$rs = [];
466485
for ($i = 0; $i < $toSend; $i++) {
467-
$rs[] = $this->c->publish($subject, 'foobar' . $i);
486+
$rs[] = $c1->publish($subject, 'foobar' . $i);
468487
}
469488

470489
foreach ($rs as $r) {
471490
$gotAck = $r->wait();
472491
$this->assertTrue($gotAck);
473492
}
474493

475-
$sub->wait($toSend);
494+
$c1->natsCon()->setStreamTimeout(1);
495+
$c2->natsCon()->setStreamTimeout(1);
496+
$c1->wait();
497+
$c2->wait();
476498

477499

478-
$this->assertEquals($toSend, $got);
500+
$client1Seqs = $allSeqs["client1"];
501+
$client2Seqs = $allSeqs["client2"];
479502

480-
$this->c->close();
503+
$totSeqs = array_merge($client1Seqs, $client2Seqs);
504+
505+
$totSeqsUnique = array_unique($totSeqs);
506+
507+
508+
$this->assertEquals($toSend, $got1 + $got2);
509+
510+
$this->assertEquals($toSend, count($totSeqsUnique));
511+
512+
$c1->close();
513+
$c2->close();
481514
}
482515

483516
/**

0 commit comments

Comments
 (0)