Skip to content

Commit

Permalink
Merge pull request #197 from zsxwing/rxjava-1.1.6
Browse files Browse the repository at this point in the history
Bump to RxJava 1.1.6
  • Loading branch information
zsxwing authored Jun 17, 2016
2 parents bee74d0 + 0b8e56d commit 0e16542
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.1.5",
"io.reactivex" % "rxjava" % "1.1.6",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
Expand Down
19 changes: 19 additions & 0 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1896,4 +1896,23 @@ class RxScalaDemo extends JUnitSuite {
println(s"onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}")
}
}

@Test def rebatchRequestsExample(): Unit = {
val o = (1 to 100).toObservable
o.doOnRequest(r => println(s"Requesting $r via rebatchRequests"))
.rebatchRequests(10)
.doOnRequest(r => println(s"Requesting $r via Subscriber"))
.subscribe(new Subscriber[Int]() {
override def onStart(): Unit = request(1)

override def onNext(value: Int): Unit = {
println(s"Receive $value")
request(1)
}

override def onError(error: Throwable): Unit = error.printStackTrace()

override def onCompleted(): Unit = println("Done")
})
}
}
20 changes: 20 additions & 0 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,26 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.unsubscribeOn(scheduler))
}

/**
* $experimental Returns an [[Observable]] that requests `n` initially from the upstream and then 75% of `n` subsequently after 75% of `n` values have
* been emitted to the downstream.
*
* This operator allows preventing the downstream to trigger unbounded mode via `request(Long.MaxValue)` or compensate for the per-item
* overhead of small and frequent requests.
*
* ===Backpressure:===
* The operator expects backpressure from upstream and honors backpressure from downstream.</dd>
*
* $noDefaultScheduler
*
* @param n the initial request amount, further request will happen after 75% of this value
* @return the [[Observable]] that rebatches request amounts from downstream
*/
@Experimental
def rebatchRequests(n: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.rebatchRequests(n))
}

/**
* Asynchronously notify [[rx.lang.scala.Observer]]s on the specified [[rx.lang.scala.Scheduler]].
*
Expand Down

0 comments on commit 0e16542

Please sign in to comment.