From 786ef8e62d8789b6dd9775f7258c2dd040923b60 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 9 Jun 2022 21:44:43 +0000 Subject: [PATCH 1/6] Replicate Channel.synchronous filo bug --- .../scala/fs2/concurrent/ChannelSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index f571d043b5..229a95f229 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -24,6 +24,7 @@ package concurrent import cats.syntax.all._ import cats.effect.IO +import cats.effect.testkit.TestControl import scala.concurrent.duration._ import org.scalacheck.effect.PropF.forAllF @@ -132,4 +133,22 @@ class ChannelSuite extends Fs2Suite { p.assertEquals(true) } + test("Channel.synchronous respects fifo") { + val l = for { + chan <- Channel.synchronous[IO, Int] + _ <- chan.send(0).start + _ <- (0 until 5).toList.traverse_ { i => + val f = for { + _ <- IO.sleep(i.second) + _ <- chan.send(i) + _ <- if (i == 4) chan.close.void else IO.unit + } yield () + f.start + } + result <- IO.sleep(5.seconds) *> chan.stream.compile.toList.flatTap(IO.println) + } yield result + + TestControl.executeEmbed(l).assertEquals((0 until 5).toList) + } + } From c5de7226af18c50ef84964b09ce0a5c756a0ce6e Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 9 Jun 2022 22:13:39 +0000 Subject: [PATCH 2/6] Fix replication --- core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 229a95f229..c08ddd9a8b 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -136,7 +136,6 @@ class ChannelSuite extends Fs2Suite { test("Channel.synchronous respects fifo") { val l = for { chan <- Channel.synchronous[IO, Int] - _ <- chan.send(0).start _ <- (0 until 5).toList.traverse_ { i => val f = for { _ <- IO.sleep(i.second) From 857961d13147b1be122d5390239c08863a26fb72 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 9 Jun 2022 22:15:04 +0000 Subject: [PATCH 3/6] Remove println --- core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index c08ddd9a8b..450c402126 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -144,7 +144,7 @@ class ChannelSuite extends Fs2Suite { } yield () f.start } - result <- IO.sleep(5.seconds) *> chan.stream.compile.toList.flatTap(IO.println) + result <- IO.sleep(5.seconds) *> chan.stream.compile.toList } yield result TestControl.executeEmbed(l).assertEquals((0 until 5).toList) From 496bf02a221424a5e5223a2487a6b12ba4c28bb1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 9 Jun 2022 22:42:40 +0000 Subject: [PATCH 4/6] Respect fifo in Channel.synchronous impl --- .../src/main/scala/fs2/concurrent/Channel.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 0bfe03ae87..5c0b9cf47d 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -221,16 +221,16 @@ object Channel { case s @ State(values, stateSize, ignorePreviousWaiting @ _, producers, closed) => if (shouldEmit(s)) { var size = stateSize - var allValues = values + val moreValues = List.newBuilder[A] var unblock = F.unit producers.foreach { case (value, producer) => size += 1 - allValues = value :: allValues + moreValues += value unblock = unblock <* producer.complete(()) } - val toEmit = makeChunk(allValues, size) + val toEmit = makeChunk(values, moreValues.result(), size) unblock.as(Pull.output(toEmit) >> consumeLoop) } else { @@ -258,11 +258,12 @@ object Channel { @inline private def shouldEmit(s: State) = s.values.nonEmpty || s.producers.nonEmpty - private def makeChunk(allValues: List[A], size: Int): Chunk[A] = { + private def makeChunk(init: List[A], tail: List[A], size: Int): Chunk[A] = { val arr = new Array[Any](size) var i = size - 1 - var values = allValues + var values = tail while (i >= 0) { + if (values.isEmpty) values = init arr(i) = values.head values = values.tail i -= 1 From 32c2b33426d5ae90d7853a7f9e4925996e474ad0 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 9 Jun 2022 22:47:06 +0000 Subject: [PATCH 5/6] Clarify variable names --- core/shared/src/main/scala/fs2/concurrent/Channel.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 5c0b9cf47d..979c23eafb 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -218,19 +218,19 @@ object Channel { else (state.copy(waiting = waiting.some), state) } .flatMap { - case s @ State(values, stateSize, ignorePreviousWaiting @ _, producers, closed) => + case s @ State(initValues, stateSize, ignorePreviousWaiting @ _, producers, closed) => if (shouldEmit(s)) { var size = stateSize - val moreValues = List.newBuilder[A] + val tailValues = List.newBuilder[A] var unblock = F.unit producers.foreach { case (value, producer) => size += 1 - moreValues += value + tailValues += value unblock = unblock <* producer.complete(()) } - val toEmit = makeChunk(values, moreValues.result(), size) + val toEmit = makeChunk(initValues, tailValues.result(), size) unblock.as(Pull.output(toEmit) >> consumeLoop) } else { From d7a0d85cca30cb31c38113cfd2eeba62179bcb88 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 9 Jun 2022 22:47:19 +0000 Subject: [PATCH 6/6] Formatting --- core/shared/src/main/scala/fs2/concurrent/Channel.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 979c23eafb..86bda25773 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -218,7 +218,13 @@ object Channel { else (state.copy(waiting = waiting.some), state) } .flatMap { - case s @ State(initValues, stateSize, ignorePreviousWaiting @ _, producers, closed) => + case s @ State( + initValues, + stateSize, + ignorePreviousWaiting @ _, + producers, + closed + ) => if (shouldEmit(s)) { var size = stateSize val tailValues = List.newBuilder[A]