Skip to content

Commit

Permalink
Merge pull request #2922 from armanbilge/fix/synchronous-channel-fifo
Browse files Browse the repository at this point in the history
Respect FIFO for bounded Channel
  • Loading branch information
mpilquist authored Jun 10, 2022
2 parents aa30a5e + d7a0d85 commit 12dfdbf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
19 changes: 13 additions & 6 deletions core/shared/src/main/scala/fs2/concurrent/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,25 @@ object Channel {
else (state.copy(waiting = waiting.some), state)
}
.flatMap {
case s @ State(values, stateSize, ignorePreviousWaiting @ _, producers, closed) =>
case s @ State(
initValues,
stateSize,
ignorePreviousWaiting @ _,
producers,
closed
) =>
if (shouldEmit(s)) {
var size = stateSize
var allValues = values
val tailValues = List.newBuilder[A]
var unblock = F.unit

producers.foreach { case (value, producer) =>
size += 1
allValues = value :: allValues
tailValues += value
unblock = unblock <* producer.complete(())
}

val toEmit = makeChunk(allValues, size)
val toEmit = makeChunk(initValues, tailValues.result(), size)

unblock.as(Pull.output(toEmit) >> consumeLoop)
} else {
Expand Down Expand Up @@ -258,11 +264,12 @@ object Channel {

@inline private def shouldEmit(s: State) = s.values.nonEmpty || s.producers.nonEmpty

private def makeChunk(allValues: List[A], size: Int): Chunk[A] = {
private def makeChunk(init: List[A], tail: List[A], size: Int): Chunk[A] = {
val arr = new Array[Any](size)
var i = size - 1
var values = allValues
var values = tail
while (i >= 0) {
if (values.isEmpty) values = init
arr(i) = values.head
values = values.tail
i -= 1
Expand Down
18 changes: 18 additions & 0 deletions core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package concurrent

import cats.syntax.all._
import cats.effect.IO
import cats.effect.testkit.TestControl
import scala.concurrent.duration._

import org.scalacheck.effect.PropF.forAllF
Expand Down Expand Up @@ -132,4 +133,21 @@ class ChannelSuite extends Fs2Suite {
p.assertEquals(true)
}

test("Channel.synchronous respects fifo") {
val l = for {
chan <- Channel.synchronous[IO, Int]
_ <- (0 until 5).toList.traverse_ { i =>
val f = for {
_ <- IO.sleep(i.second)
_ <- chan.send(i)
_ <- if (i == 4) chan.close.void else IO.unit
} yield ()
f.start
}
result <- IO.sleep(5.seconds) *> chan.stream.compile.toList
} yield result

TestControl.executeEmbed(l).assertEquals((0 until 5).toList)
}

}

0 comments on commit 12dfdbf

Please sign in to comment.