Skip to content

Commit 0fb4739

Browse files
committed
Fix losing elements with concurrent cancellations (pushN)
1 parent 32d36c7 commit 0fb4739

File tree

1 file changed

+24
-6
lines changed

1 file changed

+24
-6
lines changed

std/shared/src/main/scala/cats/effect/std/Stack.scala

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,19 +145,26 @@ object Stack {
145145
this.copy(elements = element :: this.elements) -> F.pure(true)
146146
}
147147

148-
def pushN(elements: Seq[A])(implicit F: Concurrent[F]): ModifyResult[F[Unit]] =
148+
def pushN(elements: Seq[A])(implicit F: Concurrent[F]): ModifyResult[F[Seq[A]]] =
149149
if (this.waiters.isEmpty)
150150
// If there are no waiters we just push all the elements in reverse order.
151-
this.copy(elements = this.elements.prependedAll(elements.reverseIterator)) -> F.unit
151+
this.copy(
152+
elements = this.elements.prependedAll(elements.reverseIterator)
153+
) -> F.pure(Seq.empty)
152154
else {
153155
// Otherwise, if there is at least one waiter, we take all we can.
154156
val (remaining, waitersToNotify) =
155157
elements.reverse.align(this.waiters).partitionMap(_.unwrap)
156158

157-
// We notify all the waiters we could take.
158-
val notifyWaiters = waitersToNotify.traverse_ {
159+
// We try to notify all the waiters we could take.
160+
val notifyWaiters = waitersToNotify.traverseFilter {
159161
case (element, waiter) =>
160-
waiter.complete(element).void
162+
waiter.complete(element).map {
163+
// If the waiter was successfully awaken, we remove the element from the Stack.
164+
case true => None
165+
// Otherwise, we preserve the element for retrying the push.
166+
case false => Some(element)
167+
}
161168
}
162169

163170
// The remaining elements are either all elements, or all waiters.
@@ -226,7 +233,18 @@ object Stack {
226233
}
227234

228235
override def pushN(elements: A*): F[Unit] =
229-
F.uncancelable(_ => state.flatModify(_.pushN(elements)))
236+
F.uncancelable { _ =>
237+
// If elements is empty, do nothing.
238+
if (elements.isEmpty) F.unit
239+
// Optimize for the singleton case.
240+
else if (elements.sizeIs == 1) this.push(elements.head)
241+
else
242+
// Otherwise try to push all the elements at once.
243+
state.flatModify(_.pushN(elements)).flatMap { failedElements =>
244+
// For the elements we failed to push, we retry.
245+
this.pushN(failedElements: _*)
246+
}
247+
}
230248

231249
override final val pop: F[A] =
232250
F.uncancelable { poll =>

0 commit comments

Comments
 (0)