Skip to content

Commit

Permalink
Implement inclusive takeWhile
Browse files Browse the repository at this point in the history
  • Loading branch information
mbonneau committed Sep 22, 2024
1 parent e910b35 commit 7d326cd
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 4 deletions.
6 changes: 3 additions & 3 deletions src/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,10 @@ public function takeUntil(ObservableInterface $other): Observable
* @operator
* @reactivex takeWhile
*/
public function takeWhile(callable $predicate): Observable
public function takeWhile(callable $predicate, bool $inclusive = false): Observable
{
return $this->lift(function () use ($predicate) {
return new TakeWhileOperator($predicate);
return $this->lift(function () use ($predicate, $inclusive) {
return new TakeWhileOperator($predicate, $inclusive);
});
}

Expand Down
7 changes: 6 additions & 1 deletion src/Operator/TakeWhileOperator.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
final class TakeWhileOperator implements OperatorInterface
{
private $predicate;
private $inclusive;

public function __construct(callable $predicate)
public function __construct(callable $predicate, bool $inclusive = false)
{
$this->predicate = $predicate;
$this->inclusive = $inclusive;
}

public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
Expand All @@ -25,6 +27,9 @@ public function __invoke(ObservableInterface $observable, ObserverInterface $obs
if (($this->predicate)($value)) {
$observer->onNext($value);
} else {
if ($this->inclusive) {
$observer->onNext($value);
}
$observer->onCompleted();
}
} catch (\Throwable $e) {
Expand Down
45 changes: 45 additions & 0 deletions test/Rx/Functional/Operator/TakeWhileTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,49 @@ private function isPrime($num)

return true;
}

/**
* @test
*/
public function takeWhile_inclusive()
{
$error = new \Exception();

$xs = $this->createHotObservable([
onNext(90, -1),
onNext(110, -1),
onNext(210, 2),
onNext(260, 5),
onError(270, $error),
onCompleted(280),
onNext(290, 13),
onNext(320, 3),
onNext(350, 7),
onNext(390, 4),
onNext(410, 17),
onNext(450, 8),
onNext(500, 23)
]);

$invoked = 0;

$results = $this->scheduler->startWithCreate(function () use ($xs, &$invoked) {
return $xs->takeWhile(function ($x) use (&$invoked) {
$invoked++;
return $x != 5;
}, true);
});

$this->assertMessages([
onNext(210, 2),
onNext(260, 5),
onCompleted(260)
], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 260)
], $xs->getSubscriptions());

$this->assertEquals(2, $invoked);
}
}

0 comments on commit 7d326cd

Please sign in to comment.