diff --git a/src/Observable.php b/src/Observable.php index 2d97b7b5..a10d2e0d 100644 --- a/src/Observable.php +++ b/src/Observable.php @@ -675,10 +675,10 @@ public function takeUntil(ObservableInterface $other): Observable * @operator * @reactivex takeWhile */ - public function takeWhile(callable $predicate): Observable + public function takeWhile(callable $predicate, bool $inclusive = false): Observable { - return $this->lift(function () use ($predicate) { - return new TakeWhileOperator($predicate); + return $this->lift(function () use ($predicate, $inclusive) { + return new TakeWhileOperator($predicate, $inclusive); }); } diff --git a/src/Operator/TakeWhileOperator.php b/src/Operator/TakeWhileOperator.php index 02e21e6f..8d0fb35b 100644 --- a/src/Operator/TakeWhileOperator.php +++ b/src/Operator/TakeWhileOperator.php @@ -12,10 +12,12 @@ final class TakeWhileOperator implements OperatorInterface { private $predicate; + private $inclusive; - public function __construct(callable $predicate) + public function __construct(callable $predicate, bool $inclusive = false) { $this->predicate = $predicate; + $this->inclusive = $inclusive; } public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface @@ -25,6 +27,9 @@ public function __invoke(ObservableInterface $observable, ObserverInterface $obs if (($this->predicate)($value)) { $observer->onNext($value); } else { + if ($this->inclusive) { + $observer->onNext($value); + } $observer->onCompleted(); } } catch (\Throwable $e) { diff --git a/test/Rx/Functional/Operator/TakeWhileTest.php b/test/Rx/Functional/Operator/TakeWhileTest.php index 041756b8..5fd24541 100644 --- a/test/Rx/Functional/Operator/TakeWhileTest.php +++ b/test/Rx/Functional/Operator/TakeWhileTest.php @@ -440,4 +440,49 @@ private function isPrime($num) return true; } + + /** + * @test + */ + public function takeWhile_inclusive() + { + $error = new \Exception(); + + $xs = $this->createHotObservable([ + onNext(90, -1), + onNext(110, -1), + onNext(210, 2), + onNext(260, 5), + onError(270, $error), + onCompleted(280), + onNext(290, 13), + onNext(320, 3), + onNext(350, 7), + onNext(390, 4), + onNext(410, 17), + onNext(450, 8), + onNext(500, 23) + ]); + + $invoked = 0; + + $results = $this->scheduler->startWithCreate(function () use ($xs, &$invoked) { + return $xs->takeWhile(function ($x) use (&$invoked) { + $invoked++; + return $x != 5; + }, true); + }); + + $this->assertMessages([ + onNext(210, 2), + onNext(260, 5), + onCompleted(260) + ], $results->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 260) + ], $xs->getSubscriptions()); + + $this->assertEquals(2, $invoked); + } }