diff --git a/reactivex/observable/observable.py b/reactivex/observable/observable.py index b32e237e..8cf2086d 100644 --- a/reactivex/observable/observable.py +++ b/reactivex/observable/observable.py @@ -236,7 +236,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any: return pipe_(self, *operators) - def run(self) -> Any: + def run(self) -> _T_out: """Run source synchronously. Subscribes to the observable source. Then blocks and waits for the diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index c4847454..2cae6c17 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -63,9 +63,14 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool] .. marble:: :alt: all - --1--2--3--4--5-| - [ all(i: i<10) ] - ----------------true-| + --1--2--3--4--5--6----| + [ all(i: i<8) ] + ------------------true| + + + --1--2--3--4--5--6----| + [ all(i: i<4) ] + ------false| Example: >>> op = all(lambda value: value.length > 3) @@ -78,6 +83,13 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool] returns an observable sequence containing a single element determining whether all elements in the source sequence pass the test in the specified predicate. + + If a predicate returns false, the result sequence emits false + and completes immediately, regardless of the state of the + source sequence. + + If all items pass the predicate test, the emission of true + will only happen as the source completes. """ from ._all import all_ @@ -90,9 +102,8 @@ def amb(right_source: Observable[_T]) -> Callable[[Observable[_T]], Observable[_ .. marble:: :alt: amb - ---8--6--9-----------| + ---8--6--9---------| --1--2--3---5--------| - ----------10-20-30---| [ amb() ] --1--2--3---5--------| @@ -2611,7 +2622,7 @@ def scan( Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no - intermediate results, see `aggregate()` or `Observable()`. + intermediate results, see `reduce()` or `Observable()`. .. marble:: :alt: scan @@ -2705,12 +2716,29 @@ def single( the condition in the optional predicate, and reports an exception if there is not exactly one element in the observable sequence. + If the predicates does not match any item, the resulting sequence + errors once the source completes. + + If the predicate matches more than one item, the resulting sequence + errors immediately, without waiting for the source to complete. + + If the source never completes, the resulting sequence does not + emit anything, nor completes. + .. marble:: :alt: single ----1--2--3--4-----| - [ single(3) ] - ----------3--------| + [ single(x==3) ] + -----------------3-| + + ----1--3--3--4-----| + [ single(x==3) ] + ----------x + + ----1--1--1--1-----| + [ single(x==3) ] + -------------------x Example: >>> res = single() diff --git a/reactivex/operators/_amb.py b/reactivex/operators/_amb.py index 49bf94ad..3dc9398d 100644 --- a/reactivex/operators/_amb.py +++ b/reactivex/operators/_amb.py @@ -1,92 +1,91 @@ from asyncio import Future -from typing import Callable, List, Optional, TypeVar, Union +from typing import List, Optional, TypeVar, Union from reactivex import Observable, abc, from_future +from reactivex.curry import curry_flip from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable _T = TypeVar("_T") +@curry_flip(1) def amb_( - right_source: Union[Observable[_T], "Future[_T]"] -) -> Callable[[Observable[_T]], Observable[_T]]: + left_source: Observable[_T], right_source: Union[Observable[_T], "Future[_T]"] +) -> Observable[_T]: if isinstance(right_source, Future): obs: Observable[_T] = from_future(right_source) else: obs = right_source - def amb(left_source: Observable[_T]) -> Observable[_T]: - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - choice: List[Optional[str]] = [None] - left_choice = "L" - right_choice = "R" - left_subscription = SingleAssignmentDisposable() - right_subscription = SingleAssignmentDisposable() - - def choice_left(): - if not choice[0]: - choice[0] = left_choice - right_subscription.dispose() - - def choice_right(): - if not choice[0]: - choice[0] = right_choice - left_subscription.dispose() - - def on_next_left(value: _T) -> None: - with left_source.lock: - choice_left() - if choice[0] == left_choice: - observer.on_next(value) - - def on_error_left(err: Exception) -> None: - with left_source.lock: - choice_left() - if choice[0] == left_choice: - observer.on_error(err) - - def on_completed_left() -> None: - with left_source.lock: - choice_left() - if choice[0] == left_choice: - observer.on_completed() - - left_d = left_source.subscribe( - on_next_left, on_error_left, on_completed_left, scheduler=scheduler - ) - left_subscription.disposable = left_d - - def send_right(value: _T) -> None: - with left_source.lock: - choice_right() - if choice[0] == right_choice: - observer.on_next(value) - - def on_error_right(err: Exception) -> None: - with left_source.lock: - choice_right() - if choice[0] == right_choice: - observer.on_error(err) - - def on_completed_right() -> None: - with left_source.lock: - choice_right() - if choice[0] == right_choice: - observer.on_completed() - - right_d = obs.subscribe( - send_right, on_error_right, on_completed_right, scheduler=scheduler - ) - right_subscription.disposable = right_d - return CompositeDisposable(left_subscription, right_subscription) - - return Observable(subscribe) - - return amb + def subscribe( + observer: abc.ObserverBase[_T], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + choice: List[Optional[str]] = [None] + left_choice = "L" + right_choice = "R" + left_subscription = SingleAssignmentDisposable() + right_subscription = SingleAssignmentDisposable() + + def choice_left(): + if not choice[0]: + choice[0] = left_choice + right_subscription.dispose() + + def choice_right(): + if not choice[0]: + choice[0] = right_choice + left_subscription.dispose() + + def on_next_left(value: _T) -> None: + with left_source.lock: + choice_left() + if choice[0] == left_choice: + observer.on_next(value) + + def on_error_left(err: Exception) -> None: + with left_source.lock: + choice_left() + if choice[0] == left_choice: + observer.on_error(err) + + def on_completed_left() -> None: + with left_source.lock: + choice_left() + if choice[0] == left_choice: + observer.on_completed() + + left_d = left_source.subscribe( + on_next_left, on_error_left, on_completed_left, scheduler=scheduler + ) + left_subscription.disposable = left_d + + def send_right(value: _T) -> None: + with left_source.lock: + choice_right() + if choice[0] == right_choice: + observer.on_next(value) + + def on_error_right(err: Exception) -> None: + with left_source.lock: + choice_right() + if choice[0] == right_choice: + observer.on_error(err) + + def on_completed_right() -> None: + with left_source.lock: + choice_right() + if choice[0] == right_choice: + observer.on_completed() + + right_d = obs.subscribe( + send_right, on_error_right, on_completed_right, scheduler=scheduler + ) + right_subscription.disposable = right_d + return CompositeDisposable(left_subscription, right_subscription) + + return Observable(subscribe) __all__ = ["amb_"] diff --git a/reactivex/operators/_average.py b/reactivex/operators/_average.py index 99ab672e..291de905 100644 --- a/reactivex/operators/_average.py +++ b/reactivex/operators/_average.py @@ -1,7 +1,8 @@ from dataclasses import dataclass -from typing import Any, Callable, Optional, TypeVar, cast +from typing import Any, Optional, TypeVar, cast from reactivex import Observable, operators, typing +from reactivex.curry import curry_flip _T = TypeVar("_T") @@ -12,51 +13,47 @@ class AverageValue: count: int +@curry_flip(1) def average_( + source: Observable[Any], key_mapper: Optional[typing.Mapper[_T, float]] = None, -) -> Callable[[Observable[_T]], Observable[float]]: - def average(source: Observable[Any]) -> Observable[float]: - """Partially applied average operator. +) -> Observable[float]: + """Partially applied average operator. - Computes the average of an observable sequence of values that - are in the sequence or obtained by invoking a transform - function on each element of the input sequence if present. + Computes the average of an observable sequence of values that + are in the sequence or obtained by invoking a transform + function on each element of the input sequence if present. - Examples: - >>> res = average(source) + Examples: + >>> res = average(source) - Args: - source: Source observable to average. + Args: + source: Source observable to average. - Returns: - An observable sequence containing a single element with the - average of the sequence of values. - """ + Returns: + An observable sequence containing a single element with the + average of the sequence of values. + """ - key_mapper_: typing.Mapper[_T, float] = key_mapper or ( - lambda x: float(cast(Any, x)) - ) + key_mapper_: typing.Mapper[_T, float] = key_mapper or ( + lambda x: float(cast(Any, x)) + ) - def accumulator(prev: AverageValue, cur: float) -> AverageValue: - return AverageValue(sum=prev.sum + cur, count=prev.count + 1) + def accumulator(prev: AverageValue, cur: float) -> AverageValue: + return AverageValue(sum=prev.sum + cur, count=prev.count + 1) - def mapper(s: AverageValue) -> float: - if s.count == 0: - raise Exception("The input sequence was empty") + def mapper(s: AverageValue) -> float: + return s.sum / float(s.count) - return s.sum / float(s.count) + seed = AverageValue(sum=0, count=0) - seed = AverageValue(sum=0, count=0) - - ret = source.pipe( - operators.map(key_mapper_), - operators.scan(accumulator, seed), - operators.last(), - operators.map(mapper), - ) - return ret - - return average + ret = source.pipe( + operators.map(key_mapper_), + operators.scan(accumulator, seed), + operators.last(), + operators.map(mapper), + ) + return ret __all__ = ["average_"] diff --git a/reactivex/operators/_take.py b/reactivex/operators/_take.py index 744d7492..19086985 100644 --- a/reactivex/operators/_take.py +++ b/reactivex/operators/_take.py @@ -1,53 +1,52 @@ -from typing import Callable, Optional, TypeVar +from typing import Optional, TypeVar, cast from reactivex import Observable, abc, empty +from reactivex.curry import curry_flip from reactivex.internal import ArgumentOutOfRangeException _T = TypeVar("_T") -def take_(count: int) -> Callable[[Observable[_T]], Observable[_T]]: +@curry_flip(1) +def take_(source: Observable[_T], count: int) -> Observable[_T]: if count < 0: raise ArgumentOutOfRangeException() - def take(source: Observable[_T]) -> Observable[_T]: - """Returns a specified number of contiguous elements from the start of - an observable sequence. + """Returns a specified number of contiguous elements from the start of + an observable sequence. - >>> take(source) + >>> take(source) - Keyword arguments: - count -- The number of elements to return. + Keyword arguments: + count -- The number of elements to return. - Returns an observable sequence that contains the specified number of - elements from the start of the input sequence. - """ + Returns an observable sequence that contains the specified number of + elements from the start of the input sequence. + """ - if not count: - return empty() + if not count: + return cast(Observable[_T], empty()) - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ): - remaining = count + def subscribe( + observer: abc.ObserverBase[_T], + scheduler: Optional[abc.SchedulerBase] = None, + ): + remaining = count - def on_next(value: _T) -> None: - nonlocal remaining + def on_next(value: _T) -> None: + nonlocal remaining - if remaining > 0: - remaining -= 1 - observer.on_next(value) - if not remaining: - observer.on_completed() + if remaining > 0: + remaining -= 1 + observer.on_next(value) + if not remaining: + observer.on_completed() - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler - ) + return source.subscribe( + on_next, observer.on_error, observer.on_completed, scheduler=scheduler + ) - return Observable(subscribe) - - return take + return Observable(subscribe) __all__ = ["take_"] diff --git a/reactivex/operators/_takewhile.py b/reactivex/operators/_takewhile.py index 1b395d03..1cd92356 100644 --- a/reactivex/operators/_takewhile.py +++ b/reactivex/operators/_takewhile.py @@ -1,121 +1,96 @@ -from typing import Any, Callable, Optional, TypeVar +from typing import Optional, TypeVar -from reactivex import Observable, abc +from reactivex import Observable, abc, operators +from reactivex.curry import curry_flip from reactivex.typing import Predicate, PredicateIndexed _T = TypeVar("_T") +@curry_flip(1) def take_while_( - predicate: Predicate[_T], inclusive: bool = False -) -> Callable[[Observable[_T]], Observable[_T]]: - def take_while(source: Observable[_T]) -> Observable[_T]: - """Returns elements from an observable sequence as long as a - specified condition is true. - - Example: - >>> take_while(source) - - Args: - source: The source observable to take from. - - Returns: - An observable sequence that contains the elements from the - input sequence that occur before the element at which the - test no longer passes. - """ - - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - running = True - - def on_next(value: _T): - nonlocal running - - with source.lock: - if not running: - return - - try: - running = predicate(value) - except Exception as exn: - observer.on_error(exn) - return - - if running: + source: Observable[_T], predicate: Predicate[_T], inclusive: bool = False +) -> Observable[_T]: + """Returns elements from an observable sequence as long as a + specified condition is true. + + Example: + >>> take_while(source) + + Args: + source: The source observable to take from. + + Returns: + An observable sequence that contains the elements from the + input sequence that occur before the element at which the + test no longer passes. + """ + + def subscribe( + observer: abc.ObserverBase[_T], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + running = True + + def on_next(value: _T): + nonlocal running + + with source.lock: + if not running: + return + + try: + running = predicate(value) + except Exception as exn: + observer.on_error(exn) + return + + if running: + observer.on_next(value) + else: + if inclusive: observer.on_next(value) - else: - if inclusive: - observer.on_next(value) - observer.on_completed() + observer.on_completed() - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler - ) + return source.subscribe( + on_next, observer.on_error, observer.on_completed, scheduler=scheduler + ) - return Observable(subscribe) - - return take_while + return Observable(subscribe) +@curry_flip(1) def take_while_indexed_( - predicate: PredicateIndexed[_T], inclusive: bool = False -) -> Callable[[Observable[_T]], Observable[_T]]: - def take_while_indexed(source: Observable[_T]) -> Observable[_T]: - """Returns elements from an observable sequence as long as a - specified condition is true. The element's index is used in the - logic of the predicate function. - - Example: - >>> take_while(source) - - Args: - source: Source observable to take from. - - Returns: - An observable sequence that contains the elements from the - input sequence that occur before the element at which the - test no longer passes. - """ - - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - running = True - i = 0 - - def on_next(value: Any) -> None: - nonlocal running, i - - with source.lock: - if not running: - return - - try: - running = predicate(value, i) - except Exception as exn: - observer.on_error(exn) - return - else: - i += 1 - - if running: - observer.on_next(value) - else: - if inclusive: - observer.on_next(value) - observer.on_completed() - - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler - ) - - return Observable(subscribe) - - return take_while_indexed + source: Observable[_T], predicate: PredicateIndexed[_T], inclusive: bool = False +) -> Observable[_T]: + """Returns elements from an observable sequence as long as a + specified condition is true. The element's index is used in the + logic of the predicate function. + + Example: + >>> take_while(source) + + Args: + source: Source observable to take from. + + Returns: + An observable sequence that contains the elements from the + input sequence that occur before the element at which the + test no longer passes. + """ + i = 0 + + def increment(_: _T): + nonlocal i + i += 1 + + def predicate_with_index(x: _T): + return predicate(x, i) + + return source.pipe( + take_while_(predicate_with_index, inclusive=inclusive), + operators.do_action(on_next=increment), + ) __all__ = ["take_while_", "take_while_indexed_"] diff --git a/reactivex/operators/_throttlefirst.py b/reactivex/operators/_throttlefirst.py index 58fec06a..65b5ce13 100644 --- a/reactivex/operators/_throttlefirst.py +++ b/reactivex/operators/_throttlefirst.py @@ -1,57 +1,58 @@ from datetime import datetime -from typing import Callable, Optional, TypeVar +from typing import Optional, TypeVar from reactivex import Observable, abc, typing +from reactivex.curry import curry_flip from reactivex.scheduler import TimeoutScheduler _T = TypeVar("_T") +@curry_flip(1) def throttle_first_( - window_duration: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None -) -> Callable[[Observable[_T]], Observable[_T]]: - def throttle_first(source: Observable[_T]) -> Observable[_T]: - """Returns an observable that emits only the first item emitted - by the source Observable during sequential time windows of a - specified duration. - - Args: - source: Source observable to throttle. - - Returns: - An Observable that performs the throttle operation. - """ - - def subscribe( - observer: abc.ObserverBase[_T], - scheduler_: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() - - duration = _scheduler.to_timedelta(window_duration or 0.0) - if duration <= _scheduler.to_timedelta(0): - raise ValueError("window_duration cannot be less or equal zero.") - last_on_next: Optional[datetime] = None - - def on_next(x: _T) -> None: - nonlocal last_on_next - emit = False - now = _scheduler.now - - with source.lock: - if not last_on_next or now - last_on_next >= duration: - last_on_next = now - emit = True - if emit: - observer.on_next(x) - - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=_scheduler - ) - - return Observable(subscribe) - - return throttle_first + source: Observable[_T], + window_duration: typing.RelativeTime, + scheduler: Optional[abc.SchedulerBase] = None, +) -> Observable[_T]: + """Returns an observable that emits only the first item emitted + by the source Observable during sequential time windows of a + specified duration. + + Args: + source: Source observable to throttle. + + Returns: + An Observable that performs the throttle operation. + """ + + def subscribe( + observer: abc.ObserverBase[_T], + scheduler_: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() + + duration = _scheduler.to_timedelta(window_duration or 0.0) + if duration <= _scheduler.to_timedelta(0): + raise ValueError("window_duration cannot be less or equal zero.") + last_on_next: Optional[datetime] = None + + def on_next(x: _T) -> None: + nonlocal last_on_next + emit = False + now = _scheduler.now + + with source.lock: + if not last_on_next or now - last_on_next >= duration: + last_on_next = now + emit = True + if emit: + observer.on_next(x) + + return source.subscribe( + on_next, observer.on_error, observer.on_completed, scheduler=_scheduler + ) + + return Observable(subscribe) __all__ = ["throttle_first_"] diff --git a/reactivex/operators/_timestamp.py b/reactivex/operators/_timestamp.py index 2ae40607..cf3f6a54 100644 --- a/reactivex/operators/_timestamp.py +++ b/reactivex/operators/_timestamp.py @@ -1,8 +1,9 @@ from dataclasses import dataclass from datetime import datetime -from typing import Callable, Generic, Optional, TypeVar +from typing import Generic, Optional, TypeVar from reactivex import Observable, abc, defer, operators +from reactivex.curry import curry_flip from reactivex.scheduler import TimeoutScheduler _T = TypeVar("_T") @@ -14,36 +15,37 @@ class Timestamp(Generic[_T]): timestamp: datetime +@curry_flip(1) def timestamp_( + source: Observable[_T], scheduler: Optional[abc.SchedulerBase] = None, -) -> Callable[[Observable[_T]], Observable[Timestamp[_T]]]: - def timestamp(source: Observable[_T]) -> Observable[Timestamp[_T]]: - """Records the timestamp for each value in an observable sequence. +) -> Observable[Timestamp[_T]]: + """Records the timestamp for each value in an observable sequence. - Examples: - >>> timestamp(source) + Examples: + >>> timestamp(source) - Produces objects with attributes `value` and `timestamp`, where - value is the original value. + Produces objects with attributes `value` and `timestamp`, where + value is the original value. - Args: - source: Observable source to timestamp. + Args: + source: Observable source to timestamp. - Returns: - An observable sequence with timestamp information on values. - """ + Returns: + An observable sequence with timestamp information on values. + Each emitted item is a Timestamp object with `.value` and + `.timestamp` attributes + """ - def factory(scheduler_: Optional[abc.SchedulerBase] = None): - _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() + def factory(scheduler_: Optional[abc.SchedulerBase] = None): + _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() - def mapper(value: _T) -> Timestamp[_T]: - return Timestamp(value=value, timestamp=_scheduler.now) + def mapper(value: _T) -> Timestamp[_T]: + return Timestamp(value=value, timestamp=_scheduler.now) - return source.pipe(operators.map(mapper)) + return source.pipe(operators.map(mapper)) - return defer(factory) - - return timestamp + return defer(factory) __all__ = ["timestamp_"] diff --git a/reactivex/operators/_todict.py b/reactivex/operators/_todict.py index 74971978..a40f04e3 100644 --- a/reactivex/operators/_todict.py +++ b/reactivex/operators/_todict.py @@ -1,6 +1,7 @@ -from typing import Callable, Dict, Optional, TypeVar, cast +from typing import Dict, Optional, TypeVar, cast from reactivex import Observable, abc +from reactivex.curry import curry_flip from reactivex.typing import Mapper _T = TypeVar("_T") @@ -8,57 +9,57 @@ _TValue = TypeVar("_TValue") +@curry_flip(1) def to_dict_( - key_mapper: Mapper[_T, _TKey], element_mapper: Optional[Mapper[_T, _TValue]] = None -) -> Callable[[Observable[_T]], Observable[Dict[_TKey, _TValue]]]: - def to_dict(source: Observable[_T]) -> Observable[Dict[_TKey, _TValue]]: - """Converts the observable sequence to a Map if it exists. - - Args: - source: Source observable to convert. - - Returns: - An observable sequence with a single value of a dictionary - containing the values from the observable sequence. - """ - - def subscribe( - observer: abc.ObserverBase[Dict[_TKey, _TValue]], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - m: Dict[_TKey, _TValue] = dict() - - def on_next(x: _T) -> None: + source: Observable[_T], + key_mapper: Mapper[_T, _TKey], + element_mapper: Optional[Mapper[_T, _TValue]] = None, +) -> Observable[Dict[_TKey, _TValue]]: + """Converts the observable sequence to a Map if it exists. + + Args: + source: Source observable to convert. + + Returns: + An observable sequence with a single value of a dictionary + containing the values from the observable sequence. + """ + + def subscribe( + observer: abc.ObserverBase[Dict[_TKey, _TValue]], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + m: Dict[_TKey, _TValue] = dict() + + def on_next(x: _T) -> None: + try: + key = key_mapper(x) + except Exception as ex: # pylint: disable=broad-except + observer.on_error(ex) + return + + if element_mapper: try: - key = key_mapper(x) + element = element_mapper(x) except Exception as ex: # pylint: disable=broad-except observer.on_error(ex) return + else: + element = cast(_TValue, x) - if element_mapper: - try: - element = element_mapper(x) - except Exception as ex: # pylint: disable=broad-except - observer.on_error(ex) - return - else: - element = cast(_TValue, x) - - m[key] = cast(_TValue, element) - - def on_completed() -> None: - nonlocal m - observer.on_next(m) - m = dict() - observer.on_completed() + m[key] = cast(_TValue, element) - return source.subscribe( - on_next, observer.on_error, on_completed, scheduler=scheduler - ) + def on_completed() -> None: + nonlocal m + observer.on_next(m) + m = dict() + observer.on_completed() - return Observable(subscribe) + return source.subscribe( + on_next, observer.on_error, on_completed, scheduler=scheduler + ) - return to_dict + return Observable(subscribe) __all__ = ["to_dict_"] diff --git a/tests/test_observable/test_all.py b/tests/test_observable/test_all.py index 451b63a2..799fba85 100644 --- a/tests/test_observable/test_all.py +++ b/tests/test_observable/test_all.py @@ -2,6 +2,7 @@ from reactivex import operators as _ from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.subscription import Subscription on_next = ReactiveTest.on_next on_completed = ReactiveTest.on_completed @@ -24,6 +25,17 @@ def create(): res = scheduler.start(create=create).messages assert res == [on_next(250, True), on_completed(250)] + def test_all_no_emit(self): + """Should emit true if no item is emitted""" + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_completed(250)) + + def create(): + return xs.pipe(_.all(lambda x: x > 0)) + + res = scheduler.start(create=create).messages + assert res == [on_next(250, True), on_completed(250)] + def test_all_return(self): scheduler = TestScheduler() msgs = [on_next(150, 1), on_next(210, 2), on_completed(250)] @@ -79,8 +91,12 @@ def create(): res = scheduler.start(create=create).messages assert res == [on_next(210, False), on_completed(210)] + assert xs.subscriptions == [Subscription(200, 210)] def test_all_some_all_match(self): + """Should emit true and complete after the source completes if all + items pass the predicate test + """ scheduler = TestScheduler() msgs = [ on_next(150, 1), @@ -119,6 +135,7 @@ def create(): res = scheduler.start(create=create).messages assert res == [] + assert xs.subscriptions == [Subscription(200, 1000)] if __name__ == "__main__": diff --git a/tests/test_observable/test_average.py b/tests/test_observable/test_average.py index ebe75cbf..4bba22aa 100644 --- a/tests/test_observable/test_average.py +++ b/tests/test_observable/test_average.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as _ +from reactivex.internal.exceptions import SequenceContainsNoElementsError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -19,9 +20,7 @@ def test_average_int32_empty(self): xs = scheduler.create_hot_observable(msgs) res = scheduler.start(create=lambda: xs.pipe(_.average())).messages - assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception != None - assert res[0].time == 250 + assert res == [on_error(250, SequenceContainsNoElementsError())] def test_average_int32_return(self): scheduler = TestScheduler() diff --git a/tests/test_observable/test_fromfuture.py b/tests/test_observable/test_fromfuture.py index 7f85d80b..ecdc94cd 100644 --- a/tests/test_observable/test_fromfuture.py +++ b/tests/test_observable/test_fromfuture.py @@ -1,6 +1,7 @@ import asyncio import unittest from asyncio import Future +from typing import Any import reactivex @@ -11,15 +12,15 @@ def test_future_success(self): success = [False, True, False] async def go(): - future = Future() + future: Future[int] = Future() future.set_result(42) source = reactivex.from_future(future) - def on_next(x): + def on_next(x: int): success[0] = x == 42 - def on_error(err): + def on_error(_err: Exception): success[1] = False def on_completed(): @@ -37,15 +38,15 @@ def test_future_failure(self): async def go(): error = Exception("woops") - future = Future() + future: Future[Any] = Future() future.set_exception(error) source = reactivex.from_future(future) - def on_next(x): + def on_next(x: Any): success[0] = False - def on_error(err): + def on_error(err: Exception): success[1] = str(err) == str(error) def on_completed(): @@ -61,13 +62,13 @@ def test_future_cancel(self): success = [True, False, True] async def go(): - future = Future() + future: Future[Any] = Future() source = reactivex.from_future(future) - def on_next(x): + def on_next(x: Any): success[0] = False - def on_error(err): + def on_error(err: Any): success[1] = type(err) == asyncio.CancelledError def on_completed(): @@ -84,15 +85,15 @@ def test_future_dispose(self): success = [True, True, True] async def go(): - future = Future() + future: Future[int] = Future() future.set_result(42) source = reactivex.from_future(future) - def on_next(x): + def on_next(x: int): success[0] = False - def on_error(err): + def on_error(err: Exception): success[1] = False def on_completed(): diff --git a/tests/test_observable/test_single.py b/tests/test_observable/test_single.py index 7c39b197..9718453d 100644 --- a/tests/test_observable/test_single.py +++ b/tests/test_observable/test_single.py @@ -22,6 +22,18 @@ def _raise(ex): class TestSingle(unittest.TestCase): + def test_single_source_never_completes(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_next(150, 1), on_next(250, 2)) + + def create(): + return xs.pipe(ops.single()) + + res = scheduler.start(create=create) + + assert [] == res.messages + assert xs.subscriptions == [subscribe(200, 1000)] + def test_single_async_empty(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable(on_next(150, 1), on_completed(250)) @@ -52,9 +64,14 @@ def create(): assert xs.subscriptions == [subscribe(200, 250)] def test_single_async_many(self): + """Should error as soon as a second "valid" element is encountered.""" scheduler = TestScheduler() xs = scheduler.create_hot_observable( - on_next(150, 1), on_next(210, 2), on_next(220, 3), on_completed(250) + on_next(150, 1), + on_next(210, 2), + on_next(220, 3), + on_next(230, 3), + on_completed(250), ) def create(): diff --git a/tests/test_observable/test_takewhile.py b/tests/test_observable/test_takewhile.py index 268bd5b8..835fa9a4 100644 --- a/tests/test_observable/test_takewhile.py +++ b/tests/test_observable/test_takewhile.py @@ -33,7 +33,7 @@ def test_take_while_complete_Before(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -72,7 +72,7 @@ def test_take_while_complete_after(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -113,7 +113,7 @@ def test_take_while_error_before(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -146,7 +146,7 @@ def test_take_while_error_after(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -186,7 +186,7 @@ def test_take_while_dispose_before(self): invoked = 0 def create(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -218,7 +218,7 @@ def test_take_while_dispose_after(self): invoked = 0 def create(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -258,7 +258,7 @@ def test_take_while_zero(self): invoked = 0 def create(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -291,7 +291,7 @@ def test_take_while_on_error(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -374,6 +374,35 @@ def factory(): ] assert xs.subscriptions == [subscribe(200, 350)] + def test_take_while_index_use_index_and_value(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(90, -1), + on_next(110, -1), + on_next(205, 100), + on_next(210, 2), + on_next(260, 5), + on_next(290, 13), + on_next(320, 3), + on_next(350, 7), + on_next(390, 4), + on_completed(600), + ) + + def create(): + return xs.pipe(ops.take_while_indexed(lambda x, i: x > i)) + + results = scheduler.start(create) + + assert results.messages == [ + on_next(205, 100), + on_next(210, 2), + on_next(260, 5), + on_next(290, 13), + on_completed(320), + ] + assert xs.subscriptions == [subscribe(200, 320)] + def test_take_while_index_inclusive_true(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable( @@ -426,7 +455,7 @@ def test_take_while_complete_after_inclusive_true(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 diff --git a/tests/test_observable/test_throttlefirst.py b/tests/test_observable/test_throttlefirst.py index 25f65d6a..dd063594 100644 --- a/tests/test_observable/test_throttlefirst.py +++ b/tests/test_observable/test_throttlefirst.py @@ -16,11 +16,6 @@ class RxException(Exception): pass -# Helper function for raising exceptions within lambdas -def _raise(ex): - raise RxException(ex) - - class TestThrottleFirst(unittest.TestCase): def test_throttle_first_completed(self): scheduler = TestScheduler() diff --git a/tests/test_observable/test_timestamp.py b/tests/test_observable/test_timestamp.py index 87fff722..67ea0895 100644 --- a/tests/test_observable/test_timestamp.py +++ b/tests/test_observable/test_timestamp.py @@ -1,8 +1,10 @@ import unittest from datetime import datetime +from typing import Any, Union import reactivex from reactivex import operators as ops +from reactivex.operators._timestamp import Timestamp as OriginalTimestamp from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -15,11 +17,11 @@ class Timestamp(object): - def __init__(self, value, timestamp): + def __init__(self, value: Any, timestamp: Union[datetime, int]): if isinstance(timestamp, datetime): - timestamp = timestamp - datetime.utcfromtimestamp(0) + time_delta = timestamp - datetime.utcfromtimestamp(0) timestamp = int( - timestamp.seconds + time_delta.seconds ) # FIXME: Must fix when tests run at fraction of seconds. self.value = value @@ -28,7 +30,7 @@ def __init__(self, value, timestamp): def __str__(self): return "%s@%s" % (self.value, self.timestamp) - def equals(self, other): + def equals(self, other: "Timestamp"): return other.timestamp == self.timestamp and other.value == self.value @@ -46,7 +48,7 @@ def test_timestamp_regular(self): ) def create(): - def mapper(x): + def mapper(x: OriginalTimestamp[int]): return Timestamp(x.value, x.timestamp) return xs.pipe( @@ -64,6 +66,8 @@ def mapper(x): on_completed(400), ] + assert xs.subscriptions == [subscribe(200, 400)] + def test_timestamp_empty(self): scheduler = TestScheduler() @@ -74,6 +78,7 @@ def create(): assert results.messages == [on_completed(200)] def test_timestamp_error(self): + """Should not timestamp errors""" ex = "ex" scheduler = TestScheduler() diff --git a/tests/test_observable/test_todict.py b/tests/test_observable/test_todict.py index 13e79abc..6b715ef5 100644 --- a/tests/test_observable/test_todict.py +++ b/tests/test_observable/test_todict.py @@ -74,7 +74,7 @@ def test_to_dict_keymapperthrows(self): ) def create(): - def key_mapper(x): + def key_mapper(x: int): if x < 4: return x * 2 else: @@ -102,7 +102,7 @@ def test_to_dict_elementmapperthrows(self): on_completed(600), ) - def value_mapper(x): + def value_mapper(x: int): if x < 4: return x * 4 else: @@ -136,3 +136,29 @@ def create(): assert res.messages == [] assert xs.subscriptions == [subscribe(200, 1000)] + + def test_to_dict_no_element_mapper(self): + scheduler = TestScheduler() + + xs = scheduler.create_hot_observable( + on_next(110, 1), + on_next(220, 2), + on_next(330, 3), + on_next(440, 4), + on_next(550, 5), + on_completed(660), + ) + + def key_mapper(x: int): + return x * 2 + + def create(): + return xs.pipe(ops.to_dict(key_mapper)) + + res = scheduler.start(create) + assert res.messages == [ + on_next(660, {4: 2, 6: 3, 8: 4, 10: 5}), + on_completed(660), + ] + + assert xs.subscriptions == [subscribe(200, 660)]