Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Backpressure.md #6685

Merged
merged 1 commit into from
Nov 1, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions docs/Backpressure.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Cold Observables are ideal for the reactive pull model of backpressure described

Your first line of defense against the problems of over-producing Observables is to use some of the ordinary set of Observable operators to reduce the number of emitted items to a more manageable number. The examples in this section will show how you might use such operators to handle a bursty Observable like the one illustrated in the following marble diagram:

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.bursty.png" width="640" height="35" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.bursty.png" width="640" height="35" />​

By fine-tuning the parameters to these operators you can ensure that a slow-consuming observer is not overwhelmed by a fast-producing Observable.

Expand All @@ -33,23 +33,23 @@ The following diagrams show how you could use each of these operators on the bur
### sample (or throttleLast)
The `sample` operator periodically "dips" into the sequence and emits only the most recently emitted item during each dip:

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.sample.png" width="640" height="260" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.sample.png" width="640" height="260" />​
````groovy
Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);
````

### throttleFirst
The `throttleFirst` operator is similar, but emits not the most recently emitted item, but the first item that was emitted after the previous "dip":

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.throttleFirst.png" width="640" height="260" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.throttleFirst.png" width="640" height="260" />​
````groovy
Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);
````

### debounce (or throttleWithTimeout)
The `debounce` operator emits only those items from the source Observable that are not followed by another item within a specified duration:

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.debounce.png" width="640" height="240" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.debounce.png" width="640" height="240" />​
````groovy
Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);
````
Expand All @@ -64,14 +64,14 @@ The following diagrams show how you could use each of these operators on the bur

You could, for example, close and emit a buffer of items from the bursty Observable periodically, at a regular interval of time:

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.buffer2.png" width="640" height="270" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.buffer2.png" width="640" height="270" />​
````groovy
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
````

Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the `debounce` operator to emit a buffer closing indicator to the `buffer` operator:

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.buffer1.png" width="640" height="500" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.buffer1.png" width="640" height="500" />​
````groovy
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Expand All @@ -86,14 +86,14 @@ Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounce

`window` is similar to `buffer`. One variant of `window` allows you to periodically emit Observable windows of items at a regular interval of time:

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.window1.png" width="640" height="325" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.window1.png" width="640" height="325" />​
````groovy
Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);
````

You could also choose to emit a new window each time you have collected a particular number of items from the source Observable:

<img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.window2.png" width="640" height="325" />​
<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.window2.png" width="640" height="325" />​
````groovy
Observable<Observable<Integer>> burstyWindowed = bursty.window(5);
````
Expand Down Expand Up @@ -158,11 +158,11 @@ For this to work, though, Observables _A_ and _B_ must respond correctly to the

<dl>
<dt><tt>onBackpressureBuffer</tt></dt>
<dd>maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers according to the <tt>request</tt>s they generate<br /><img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.obp.buffer.png" width="640" height="300" /><br />an experimental version of this operator (not available in RxJava 1.0) allows you to set the capacity of the buffer; applying this operator will cause the resulting Observable to terminate with an error if this buffer is overrun​</dd>
<dd>maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers according to the <tt>request</tt>s they generate<br /><img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.obp.buffer.png" width="640" height="300" /><br />an experimental version of this operator (not available in RxJava 1.0) allows you to set the capacity of the buffer; applying this operator will cause the resulting Observable to terminate with an error if this buffer is overrun​</dd>
<dt><tt>onBackpressureDrop</tt></dt>
<dd>drops emissions from the source Observable unless there is a pending <tt>request</tt> from a downstream Subscriber, in which case it will emit enough items to fulfill the request<br /><img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.obp.drop.png" width="640" height="245" />​</dd>
<dd>drops emissions from the source Observable unless there is a pending <tt>request</tt> from a downstream Subscriber, in which case it will emit enough items to fulfill the request<br /><img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.obp.drop.png" width="640" height="245" />​</dd>
<dt><tt>onBackpressureBlock</tt> <em style="color: #f00;">(experimental, not in RxJava 1.0)</em></dt>
<dd>blocks the thread on which the source Observable is operating until such time as a Subscriber issues a <tt>request</tt> for items, and then unblocks the thread only so long as there are pending requests<br /><img src="/ReactiveX/RxJava/wiki/images/rx-operators/bp.obp.block.png" width="640" height="245" /></dd>
<dd>blocks the thread on which the source Observable is operating until such time as a Subscriber issues a <tt>request</tt> for items, and then unblocks the thread only so long as there are pending requests<br /><img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/bp.obp.block.png" width="640" height="245" /></dd>
</dl>

If you do not apply any of these operators to an Observable that does not support backpressure, _and_ if either you as the Subscriber or some operator between you and the Observable attempts to apply reactive pull backpressure, you will encounter a `MissingBackpressureException` which you will be notified of via your `onError()` callback.
Expand All @@ -172,4 +172,4 @@ If you do not apply any of these operators to an Observable that does not suppor
If the standard operators are providing the expected behavior, [one can write custom operators in RxJava](https://github.com/ReactiveX/RxJava/wiki/Implementing-custom-operators-(draft)).

# See also
* [RxJava 0.20.0-RC1 release notes](https://github.com/ReactiveX/RxJava/releases/tag/0.20.0-RC1)
* [RxJava 0.20.0-RC1 release notes](https://github.com/ReactiveX/RxJava/releases/tag/0.20.0-RC1)