From 5db6e71fa93d35c0a7889203a2b1c7ae5e663362 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 7 Aug 2023 11:29:29 +0200 Subject: [PATCH] Add support template types --- .github/workflows/ci.yml | 24 ++++++++++ composer.json | 5 ++- phpstan.neon.dist | 12 +++++ src/Observable.php | 33 +++++++------- src/Observable/AnonymousObservable.php | 8 ++++ src/Observable/ArrayObservable.php | 14 ++++++ src/Observable/ConnectableObservable.php | 13 ++++-- src/Observable/EmptyObservable.php | 7 +++ src/Observable/ErrorObservable.php | 11 +++++ src/Observable/ForkJoinObservable.php | 18 +++++++- src/Observable/GroupedObservable.php | 22 +++++++++ src/Observable/IntervalObservable.php | 7 +++ src/Observable/IteratorObservable.php | 12 ++++- src/Observable/MulticastObservable.php | 14 +++--- src/Observable/NeverObservable.php | 4 ++ src/Observable/RangeObservable.php | 13 ++++++ src/Observable/RefCountObservable.php | 10 ++++- src/Observable/ReturnObservable.php | 14 ++++++ src/Observable/TimerObservable.php | 10 +++++ src/ObservableInterface.php | 5 ++- src/Observer/CallbackObserver.php | 2 +- src/React/Promise.php | 23 +++++----- src/Subject/AsyncSubject.php | 8 ++-- src/Subject/BehaviorSubject.php | 15 +++++++ src/Subject/InnerSubscriptionDisposable.php | 17 +++++++ src/Subject/ReplaySubject.php | 19 +++++++- src/Subject/Subject.php | 45 ++++++++++++++++++- .../Rx/Functional/Promise/FromPromiseTest.php | 2 +- test/types/array-observable.php | 11 +++++ test/types/observable-from-array.php | 16 +++++++ test/types/observable-from-promise.php | 13 ++++++ test/types/observable-of.php | 14 ++++++ test/types/promise-to-observable.php | 13 ++++++ 33 files changed, 405 insertions(+), 49 deletions(-) create mode 100644 phpstan.neon.dist create mode 100644 test/types/array-observable.php create mode 100644 test/types/observable-from-array.php create mode 100644 test/types/observable-from-promise.php create mode 100644 test/types/observable-of.php create mode 100644 test/types/promise-to-observable.php diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b98344d0..c233b8c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,3 +29,27 @@ jobs: coverage: xdebug - run: composer install - run: vendor/bin/phpunit --coverage-text + PHPStan: + name: PHPStan (PHP ${{ matrix.php }} on ${{ matrix.os }}) + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: + - ubuntu-20.04 + - windows-2019 + php: + - 8.2 + - 8.1 + - 8.0 + - 7.4 + - 7.3 + - 7.2 + steps: + - uses: actions/checkout@v3 + - uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none + - run: composer install + - run: vendor/bin/phpstan diff --git a/composer.json b/composer.json index df3eb4bf..8a0919c0 100644 --- a/composer.json +++ b/composer.json @@ -21,12 +21,13 @@ ], "require": { "php": ">=7.0.0", - "react/promise": "~2.2" + "react/promise": "^3 || ~2.2" }, "require-dev": { "satooshi/php-coveralls": "~1.0", "phpunit/phpunit": "^8.5 || ^9", - "react/event-loop": "^1.0 || ^0.5 || ^0.4.2" + "react/event-loop": "^1.0 || ^0.5 || ^0.4.2", + "phpstan/phpstan": "^1.10" }, "suggest": { "react/event-loop": "Used for scheduling async operations" diff --git a/phpstan.neon.dist b/phpstan.neon.dist new file mode 100644 index 00000000..cc580c61 --- /dev/null +++ b/phpstan.neon.dist @@ -0,0 +1,12 @@ +parameters: + level: max + + paths: + - src/Observable.php + - src/ObservableInterface.php + - src/Observable/ + - src/Subject/ + - test/types/ + + fileExtensions: + - php diff --git a/src/Observable.php b/src/Observable.php index 79db73ba..62620c5c 100644 --- a/src/Observable.php +++ b/src/Observable.php @@ -78,10 +78,13 @@ use Rx\Subject\ReplaySubject; use Rx\Subject\Subject; +/** + * @template-covariant T + */ abstract class Observable implements ObservableInterface { /** - * @param callable|ObserverInterface|null $onNextOrObserver + * @param callable(T)|ObserverInterface|null $onNextOrObserver * @param callable|null $onError * @param callable|null $onCompleted * @return DisposableInterface @@ -175,15 +178,15 @@ public static function interval(int $interval, AsyncSchedulerInterface $schedule /** * Returns an observable sequence that contains a single element. * - * @param mixed $value Single element in the resulting observable sequence. - * @param SchedulerInterface $scheduler - * @return ReturnObservable An observable sequence with the single element. + * @param T $value Single element in the resulting observable sequence. + * @param ?SchedulerInterface $scheduler + * @return Observable * * @demo of/of.php * @operator * @reactivex just */ - public static function of($value, SchedulerInterface $scheduler = null): ReturnObservable + public static function of($value, SchedulerInterface $scheduler = null): Observable { return new ReturnObservable($value, $scheduler ?: Scheduler::getDefault()); } @@ -194,9 +197,9 @@ public static function of($value, SchedulerInterface $scheduler = null): ReturnO * * @param $value * @param SchedulerInterface|null $scheduler - * @return ReturnObservable + * @return Observable */ - public static function just($value, SchedulerInterface $scheduler = null): ReturnObservable + public static function just($value, SchedulerInterface $scheduler = null): Observable { return static::of($value, $scheduler); } @@ -296,15 +299,15 @@ public function mergeAll(): Observable /** * Converts an array to an observable sequence * - * @param array $array - * @param SchedulerInterface $scheduler - * @return ArrayObservable + * @param array $array + * @param ?SchedulerInterface $scheduler + * @return ObservableInterface * * @demo fromArray/fromArray.php * @operator * @reactivex from */ - public static function fromArray(array $array, SchedulerInterface $scheduler = null): ArrayObservable + public static function fromArray(array $array, SchedulerInterface $scheduler = null): ObservableInterface { return new ArrayObservable($array, $scheduler ?: Scheduler::getDefault()); } @@ -312,9 +315,9 @@ public static function fromArray(array $array, SchedulerInterface $scheduler = n /** * Converts an Iterator into an observable sequence * - * @param \Iterator $iterator + * @param \Iterator $iterator * @param SchedulerInterface $scheduler - * @return IteratorObservable + * @return ObservableInterface * * @demo iterator/iterator.php * @operator @@ -2049,8 +2052,8 @@ public function finally(callable $callback): Observable /** * Converts a promise into an observable * - * @param PromiseInterface $promise - * @return Observable + * @param PromiseInterface $promise + * @return Observable * @throws \InvalidArgumentException * * @demo promise/fromPromise.php diff --git a/src/Observable/AnonymousObservable.php b/src/Observable/AnonymousObservable.php index edcfd693..24363bd7 100644 --- a/src/Observable/AnonymousObservable.php +++ b/src/Observable/AnonymousObservable.php @@ -7,11 +7,19 @@ use Rx\Disposable\CallbackDisposable; use Rx\DisposableInterface; use Rx\Observable; +use Rx\ObservableInterface; use Rx\ObserverInterface; use Rx\Observer\AutoDetachObserver; +/** + * @template T + * @template-extends Observable + */ class AnonymousObservable extends Observable { + /** + * @var callable + */ private $subscribeAction; public function __construct(callable $subscribeAction) diff --git a/src/Observable/ArrayObservable.php b/src/Observable/ArrayObservable.php index 7b1be03e..32ecf2b7 100644 --- a/src/Observable/ArrayObservable.php +++ b/src/Observable/ArrayObservable.php @@ -6,15 +6,29 @@ use Rx\DisposableInterface; use Rx\Observable; +use Rx\ObservableInterface; use Rx\ObserverInterface; use Rx\SchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class ArrayObservable extends Observable { + /** + * @var array + */ private $data; + /** + * @var SchedulerInterface + */ private $scheduler; + /** + * @param array $data + */ public function __construct(array $data, SchedulerInterface $scheduler) { $this->data = $data; diff --git a/src/Observable/ConnectableObservable.php b/src/Observable/ConnectableObservable.php index 377fc32f..f7e53abd 100644 --- a/src/Observable/ConnectableObservable.php +++ b/src/Observable/ConnectableObservable.php @@ -12,18 +12,20 @@ use Rx\Subject\Subject; /** + * @template T + * @template-extends Observable * Class ConnectableObservable * @package Rx\Observable */ class ConnectableObservable extends Observable { - /** @var \Rx\Subject\Subject */ + /** @var \Rx\Subject\Subject */ protected $subject; /** @var BinaryDisposable */ protected $subscription; - /** @var Observable */ + /** @var Observable */ protected $sourceObservable; /** @var bool */ @@ -31,8 +33,8 @@ class ConnectableObservable extends Observable /** * ConnectableObservable constructor. - * @param Observable $source - * @param \Rx\Subject\Subject $subject + * @param Observable $source + * @param \Rx\Subject\Subject $subject */ public function __construct(Observable $source, Subject $subject = null) { @@ -63,6 +65,9 @@ public function connect(): DisposableInterface return $this->subscription; } + /** + * @return RefCountObservable + */ public function refCount(): RefCountObservable { return new RefCountObservable($this); diff --git a/src/Observable/EmptyObservable.php b/src/Observable/EmptyObservable.php index 57b4b3b6..b15cd772 100644 --- a/src/Observable/EmptyObservable.php +++ b/src/Observable/EmptyObservable.php @@ -9,8 +9,15 @@ use Rx\ObserverInterface; use Rx\SchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class EmptyObservable extends Observable { + /** + * @var SchedulerInterface + */ private $scheduler; public function __construct(SchedulerInterface $scheduler) diff --git a/src/Observable/ErrorObservable.php b/src/Observable/ErrorObservable.php index 53a95d58..b962dc46 100644 --- a/src/Observable/ErrorObservable.php +++ b/src/Observable/ErrorObservable.php @@ -9,9 +9,20 @@ use Rx\ObserverInterface; use Rx\SchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class ErrorObservable extends Observable { + /** + * @var \Throwable + */ private $error; + + /** + * @var SchedulerInterface + */ private $scheduler; public function __construct(\Throwable $error, SchedulerInterface $scheduler) diff --git a/src/Observable/ForkJoinObservable.php b/src/Observable/ForkJoinObservable.php index d7f35221..ef9fe83f 100644 --- a/src/Observable/ForkJoinObservable.php +++ b/src/Observable/ForkJoinObservable.php @@ -10,19 +10,35 @@ use Rx\ObserverInterface; use Rx\Disposable\CompositeDisposable; +/** + * @template T + * @template-extends Observable + */ class ForkJoinObservable extends Observable { /** - * @var Observable[] + * @var array> */ private $observables; + /** + * @var array + */ private $values = []; + /** + * @var int + */ private $completed = 0; + /** + * @var callable|null + */ private $resultSelector; + /** + * @param array> $observables + */ public function __construct(array $observables = [], callable $resultSelector = null) { $this->observables = $observables; diff --git a/src/Observable/GroupedObservable.php b/src/Observable/GroupedObservable.php index 36ba1d33..656e4eb9 100644 --- a/src/Observable/GroupedObservable.php +++ b/src/Observable/GroupedObservable.php @@ -11,11 +11,26 @@ use Rx\Disposable\RefCountDisposable; use Rx\DisposableInterface; +/** + * @template T + * @template-extends Observable + */ class GroupedObservable extends Observable { + /** + * @var mixed + */ private $key; + + /** + * @var ObservableInterface + */ private $underlyingObservable; + /** + * @param mixed $key + * @param ObservableInterface $underlyingObservable + */ public function __construct($key, ObservableInterface $underlyingObservable, RefCountDisposable $mergedDisposable = null) { $this->key = $key; @@ -25,6 +40,9 @@ public function __construct($key, ObservableInterface $underlyingObservable, Ref $this->newUnderlyingObservable($mergedDisposable, $underlyingObservable); } + /** + * @return mixed + */ public function getKey() { return $this->key; @@ -35,6 +53,10 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface return $this->underlyingObservable->subscribe($observer); } + /** + * @param ObservableInterface $underlyingObservable + * @return Observable + */ private function newUnderlyingObservable(RefCountDisposable $mergedDisposable, ObservableInterface $underlyingObservable): Observable { return new AnonymousObservable( diff --git a/src/Observable/IntervalObservable.php b/src/Observable/IntervalObservable.php index 90845c1e..92cbd3cb 100644 --- a/src/Observable/IntervalObservable.php +++ b/src/Observable/IntervalObservable.php @@ -9,8 +9,15 @@ use Rx\ObserverInterface; use Rx\AsyncSchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class IntervalObservable extends Observable { + /** + * @var int + */ private $interval; /** @var AsyncSchedulerInterface */ diff --git a/src/Observable/IteratorObservable.php b/src/Observable/IteratorObservable.php index 3bfb3958..ddcef1a1 100644 --- a/src/Observable/IteratorObservable.php +++ b/src/Observable/IteratorObservable.php @@ -9,13 +9,23 @@ use Rx\ObserverInterface; use Rx\SchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class IteratorObservable extends Observable { + /** + * @var \Iterator + */ private $items; + /** + * @var SchedulerInterface + */ private $scheduler; - public function __construct(\Iterator $items, SchedulerInterface $scheduler = null) + public function __construct(\Iterator $items, SchedulerInterface $scheduler) { $this->items = $items; $this->scheduler = $scheduler; diff --git a/src/Observable/MulticastObservable.php b/src/Observable/MulticastObservable.php index 027340ab..65db6217 100644 --- a/src/Observable/MulticastObservable.php +++ b/src/Observable/MulticastObservable.php @@ -10,25 +10,27 @@ use Rx\ObserverInterface; /** + * @template T + * @template-extends Observable * Class MulticastObservable * @package Rx\Observable */ class MulticastObservable extends Observable { - /** @var \Rx\Observable */ + /** @var \Rx\Observable */ private $source; - /** @var callable */ + /** @var callable */ private $fn1; - /** @var callable */ + /** @var callable */ private $fn2; /** * MulticastObservable constructor. - * @param $source - * @param $fn1 - * @param $fn2 + * @param Observable $source + * @param callable $fn1 + * @param callable $fn2 */ public function __construct(Observable $source, callable $fn1, callable $fn2) { diff --git a/src/Observable/NeverObservable.php b/src/Observable/NeverObservable.php index 34925faa..8588437f 100644 --- a/src/Observable/NeverObservable.php +++ b/src/Observable/NeverObservable.php @@ -9,6 +9,10 @@ use Rx\Observable; use Rx\ObserverInterface; +/** + * @template T + * @template-extends Observable + */ class NeverObservable extends Observable { protected function _subscribe(ObserverInterface $observer): DisposableInterface diff --git a/src/Observable/RangeObservable.php b/src/Observable/RangeObservable.php index 785033a0..73c7fffa 100644 --- a/src/Observable/RangeObservable.php +++ b/src/Observable/RangeObservable.php @@ -9,12 +9,25 @@ use Rx\ObserverInterface; use Rx\SchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class RangeObservable extends Observable { + /** + * @var int + */ private $start; + /** + * @var int + */ private $count; + /** + * @var SchedulerInterface + */ private $scheduler; public function __construct(int $start, int $count, SchedulerInterface $scheduler) diff --git a/src/Observable/RefCountObservable.php b/src/Observable/RefCountObservable.php index 970d055d..fb65fe4f 100644 --- a/src/Observable/RefCountObservable.php +++ b/src/Observable/RefCountObservable.php @@ -11,20 +11,26 @@ use Rx\ObserverInterface; /** + * @template T + * @template-extends Observable + * * Class RefCountObservable * @package Rx\Observable */ class RefCountObservable extends Observable { - /** @var \Rx\Observable\ConnectableObservable */ + /** @var \Rx\Observable\ConnectableObservable */ protected $source; /** @var int */ protected $count; - /** @var BinaryDisposable */ + /** @var DisposableInterface */ protected $connectableSubscription; + /** + * @param ConnectableObservable $source + */ public function __construct(ConnectableObservable $source) { $this->source = $source; diff --git a/src/Observable/ReturnObservable.php b/src/Observable/ReturnObservable.php index c777e148..73c7160e 100644 --- a/src/Observable/ReturnObservable.php +++ b/src/Observable/ReturnObservable.php @@ -10,11 +10,25 @@ use Rx\ObserverInterface; use Rx\SchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class ReturnObservable extends Observable { + /** + * @var T + */ private $value; + + /** + * @var SchedulerInterface + */ private $scheduler; + /** + * @param T $value + */ public function __construct($value, SchedulerInterface $scheduler) { $this->value = $value; diff --git a/src/Observable/TimerObservable.php b/src/Observable/TimerObservable.php index 1402b7a0..65bff764 100644 --- a/src/Observable/TimerObservable.php +++ b/src/Observable/TimerObservable.php @@ -9,10 +9,20 @@ use Rx\ObserverInterface; use Rx\AsyncSchedulerInterface; +/** + * @template T + * @template-extends Observable + */ class TimerObservable extends Observable { + /** + * @var int + */ private $dueTime; + /** + * @var AsyncSchedulerInterface + */ private $scheduler; public function __construct(int $dueTime, AsyncSchedulerInterface $scheduler) diff --git a/src/ObservableInterface.php b/src/ObservableInterface.php index 754118f6..376139ed 100644 --- a/src/ObservableInterface.php +++ b/src/ObservableInterface.php @@ -4,10 +4,13 @@ namespace Rx; +/** + * @template-covariant T + */ interface ObservableInterface { /** - * @param callable|ObserverInterface|null $onNextOrObserver + * @param callable(T)|ObserverInterface|null $onNextOrObserver * @param callable|null $onError * @param callable|null $onCompleted * @return DisposableInterface diff --git a/src/Observer/CallbackObserver.php b/src/Observer/CallbackObserver.php index 2234c028..90456ae4 100644 --- a/src/Observer/CallbackObserver.php +++ b/src/Observer/CallbackObserver.php @@ -6,7 +6,7 @@ class CallbackObserver extends AbstractObserver { - /** @var callable|null */ + /** @var callable(T)|null */ private $onNext; /** @var callable|null */ diff --git a/src/React/Promise.php b/src/React/Promise.php index c8efacfc..dcea17e6 100644 --- a/src/React/Promise.php +++ b/src/React/Promise.php @@ -2,7 +2,6 @@ namespace Rx\React; -use React\Promise\CancellablePromiseInterface; use React\Promise\Promise as ReactPromise; use React\Promise\PromiseInterface; use Rx\Disposable\CallbackDisposable; @@ -11,12 +10,16 @@ use Rx\Observable\AnonymousObservable; use Rx\Subject\AsyncSubject; use React\Promise\Deferred; +use Throwable; +/** + * @template T + */ final class Promise { /** - * @param mixed $value - * @return ReactPromise A promise resolved to $value + * @param T $value + * @return PromiseInterface A promise resolved to $value */ public static function resolved($value): ReactPromise { @@ -27,21 +30,21 @@ public static function resolved($value): ReactPromise /** * @param mixed $exception - * @return ReactPromise A promise rejected with $exception + * @return PromiseInterface A promise rejected with $exception */ public static function rejected($exception): ReactPromise { $d = new Deferred(); - $d->reject($exception); + $d->reject($exception instanceof Throwable ? $exception : new RejectedPromiseException($exception)); return $d->promise(); } /** * Converts an existing observable sequence to React Promise * - * @param ObservableInterface $observable + * @param ObservableInterface $observable * @param Deferred $deferred - * @return ReactPromise + * @return ReactPromise * @throws \InvalidArgumentException */ public static function fromObservable(ObservableInterface $observable, Deferred $deferred = null): ReactPromise @@ -71,8 +74,8 @@ function () use ($d, &$value) { /** * Converts a Promise to an Observable sequence * - * @param PromiseInterface $promise - * @return Observable + * @param PromiseInterface $promise + * @return Observable * @throws \InvalidArgumentException */ public static function toObservable(PromiseInterface $promise): Observable @@ -94,7 +97,7 @@ function ($error) use ($subject) { $disp = $subject->subscribe($observer); return new CallbackDisposable(function () use ($p, $disp) { $disp->dispose(); - if ($p instanceof CancellablePromiseInterface) { + if (\method_exists($p, 'cancel')) { $p->cancel(); } }); diff --git a/src/Subject/AsyncSubject.php b/src/Subject/AsyncSubject.php index ef3200cc..b2b1e55c 100644 --- a/src/Subject/AsyncSubject.php +++ b/src/Subject/AsyncSubject.php @@ -9,13 +9,15 @@ use Rx\ObserverInterface; /** + * @template T + * @template-extends Subject * Class AsyncSubject * @package Rx\Subject */ class AsyncSubject extends Subject { /** - * @var + * @var T */ private $value; @@ -26,7 +28,7 @@ class AsyncSubject extends Subject private $valueSet = false; /** - * @return mixed + * @return T */ public function getValue() { @@ -34,7 +36,7 @@ public function getValue() } /** - * @param $value + * @param T $value */ public function onNext($value) { diff --git a/src/Subject/BehaviorSubject.php b/src/Subject/BehaviorSubject.php index 4729b75f..d8c991bb 100644 --- a/src/Subject/BehaviorSubject.php +++ b/src/Subject/BehaviorSubject.php @@ -5,17 +5,32 @@ namespace Rx\Subject; use Rx\DisposableInterface; +use Rx\Observable; +use Rx\ObservableInterface; use Rx\ObserverInterface; +/** + * @template T + * @template-extends Subject + */ class BehaviorSubject extends Subject { + /** + * @var T|null + */ private $value; + /** + * @param T $initValue + */ public function __construct($initValue = null) { $this->value = $initValue; } + /** + * @return ?T + */ public function getValue() { return $this->value; diff --git a/src/Subject/InnerSubscriptionDisposable.php b/src/Subject/InnerSubscriptionDisposable.php index ccea81d7..66f70a0b 100644 --- a/src/Subject/InnerSubscriptionDisposable.php +++ b/src/Subject/InnerSubscriptionDisposable.php @@ -7,17 +7,34 @@ use Rx\DisposableInterface; use Rx\ObserverInterface; +/** + * @template T + */ class InnerSubscriptionDisposable implements DisposableInterface { + /** + * @var ?ObserverInterface + */ private $observer; + + /** + * @var Subject + */ private $subject; + /** + * @param Subject $subject + * @param ObserverInterface $observer + */ public function __construct(Subject $subject, ObserverInterface $observer) { $this->subject = $subject; $this->observer = $observer; } + /** + * @return void + */ public function dispose() { if ($this->subject->isDisposed()) { diff --git a/src/Subject/ReplaySubject.php b/src/Subject/ReplaySubject.php index e0ac85d8..5b986dd4 100644 --- a/src/Subject/ReplaySubject.php +++ b/src/Subject/ReplaySubject.php @@ -6,12 +6,17 @@ use Rx\Disposable\CallbackDisposable; use Rx\DisposableInterface; +use Rx\Observable; +use Rx\ObservableInterface; use Rx\Observer\ScheduledObserver; use Rx\ObserverInterface; use Rx\Scheduler; use Rx\SchedulerInterface; /** + * @template T + * @template-extends Subject + * * Represents an object that is both an observable sequence as well as an observer. * Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies. */ @@ -81,6 +86,10 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface return $subscription; } + /** + * @param T $value + * @return void + */ public function onNext($value) { $this->assertNotDisposed(); @@ -144,7 +153,12 @@ public function onError(\Throwable $exception) $this->observers = []; } - private function createRemovableDisposable($subject, $observer): DisposableInterface + /** + * @param Subject $subject + * @param ScheduledObserver $observer + * @return DisposableInterface + */ + private function createRemovableDisposable(Subject $subject, ScheduledObserver $observer): DisposableInterface { return new CallbackDisposable(function () use ($observer, $subject) { $observer->dispose(); @@ -154,6 +168,9 @@ private function createRemovableDisposable($subject, $observer): DisposableInter }); } + /** + * @return void + */ private function trim() { if (count($this->queue) > $this->bufferSize) { diff --git a/src/Subject/Subject.php b/src/Subject/Subject.php index f5e2cdcb..73ce6cdf 100644 --- a/src/Subject/Subject.php +++ b/src/Subject/Subject.php @@ -8,13 +8,34 @@ use Rx\Disposable\EmptyDisposable; use Rx\Observable; use Rx\DisposableInterface; +use Rx\ObservableInterface; use Rx\ObserverInterface; -class Subject extends Observable implements ObserverInterface, DisposableInterface +/** + * @template T + * @template-extends Observable + * @template-implements ObservableInterface + */ +class Subject extends Observable implements ObserverInterface, DisposableInterface, ObservableInterface { + /** + * @var ?\Throwable + */ protected $exception; + + /** + * @var bool + */ protected $isDisposed = false; + + /** + * @var bool + */ protected $isStopped = false; + + /** + * @var array + */ protected $observers = []; protected function _subscribe(ObserverInterface $observer): DisposableInterface @@ -38,16 +59,25 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface return new EmptyDisposable(); } + /** + * @return bool + */ public function isDisposed() { return $this->isDisposed; } + /** + * @return bool + */ public function hasObservers() { return count($this->observers) > 0; } + /** + * @return void + */ protected function assertNotDisposed() { if ($this->isDisposed) { @@ -55,6 +85,9 @@ protected function assertNotDisposed() } } + /** + * @return void + */ public function onCompleted() { $this->assertNotDisposed(); @@ -73,6 +106,9 @@ public function onCompleted() $this->observers = []; } + /** + * @return void + */ public function onError(\Throwable $exception) { $this->assertNotDisposed(); @@ -92,6 +128,10 @@ public function onError(\Throwable $exception) $this->observers = []; } + /** + * @param T $value + * @return void + */ public function onNext($value) { $this->assertNotDisposed(); @@ -106,6 +146,9 @@ public function onNext($value) } } + /** + * @return void + */ public function dispose() { $this->isDisposed = true; diff --git a/test/Rx/Functional/Promise/FromPromiseTest.php b/test/Rx/Functional/Promise/FromPromiseTest.php index a3be98da..16483c8e 100644 --- a/test/Rx/Functional/Promise/FromPromiseTest.php +++ b/test/Rx/Functional/Promise/FromPromiseTest.php @@ -40,7 +40,7 @@ function () { */ public function from_promise_failure() { - $p = \React\Promise\reject('error'); + $p = \React\Promise\reject(new RejectedPromiseException('error')); $source = Observable::fromPromise($p); diff --git a/test/types/array-observable.php b/test/types/array-observable.php new file mode 100644 index 00000000..bf9f33c6 --- /dev/null +++ b/test/types/array-observable.php @@ -0,0 +1,11 @@ +', new ArrayObservable([true, false], new ImmediateScheduler())); +//assertType('Rx\Observable', new ArrayObservable([true, false], new ImmediateScheduler())); +//assertType('Rx\ObservableInterface', new ArrayObservable([true, false], new ImmediateScheduler())); diff --git a/test/types/observable-from-array.php b/test/types/observable-from-array.php new file mode 100644 index 00000000..c8f3c999 --- /dev/null +++ b/test/types/observable-from-array.php @@ -0,0 +1,16 @@ +', Observable::fromArray([true, false])); +//assertType('Rx\Observable\ArrayObservable', Observable::fromArray([true, false])); +//assertType('Rx\Observable', Observable::fromArray([true, false])); +//// +////assertType('Rx\ObservableInterface', new ArrayObservable([true, false], new ImmediateScheduler())); +////assertType('Rx\Observable\ArrayObservable', new ArrayObservable([true, false], new ImmediateScheduler())); +////assertType('Rx\Observable', new ArrayObservable([true, false], new ImmediateScheduler())); diff --git a/test/types/observable-from-promise.php b/test/types/observable-from-promise.php new file mode 100644 index 00000000..2e9a7dcd --- /dev/null +++ b/test/types/observable-from-promise.php @@ -0,0 +1,13 @@ +', Observable::fromPromise(resolve(false))); +//assertType('Rx\Observable\ArrayObservable', Observable::fromPromise(resolve(false))); +//assertType('Rx\Observable', Observable::fromPromise(resolve(false))); diff --git a/test/types/observable-of.php b/test/types/observable-of.php new file mode 100644 index 00000000..09dc6472 --- /dev/null +++ b/test/types/observable-of.php @@ -0,0 +1,14 @@ +', new ReturnObservable(true, new ImmediateScheduler())); +assertType('Rx\Observable', Observable::of(true)); +assertType('Rx\Observable', Observable::of(false)); +assertType('Rx\Observable', Observable::of(time())); diff --git a/test/types/promise-to-observable.php b/test/types/promise-to-observable.php new file mode 100644 index 00000000..37fa9c54 --- /dev/null +++ b/test/types/promise-to-observable.php @@ -0,0 +1,13 @@ +', Promise::toObservable(resolve(false))); +assertType('Rx\Observable\ArrayObservable', Promise::toObservable(resolve(false))); +assertType('Rx\Observable', Promise::toObservable(resolve(false))); +assertType('Rx\Observable', Promise::toObservable(resolve(false)));