-
Notifications
You must be signed in to change notification settings - Fork 536
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Stack
#4112
Open
BalmungSan
wants to merge
8
commits into
typelevel:series/3.x
Choose a base branch
from
BalmungSan:add-stack
base: series/3.x
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Add Stack
#4112
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
619dcc0
Add Stack interface
BalmungSan 712e8e4
Add Stack docs
BalmungSan e7b3a83
Add Stack tests
BalmungSan 022511a
Add Stack implementation
BalmungSan 91ade26
Fix losing elements with concurrent cancellations (push & pop)
BalmungSan 32d36c7
Add 'Stack should not lost elements when concurrently canceling multi…
BalmungSan 0fb4739
Fix losing elements with concurrent cancellations (pushN)
BalmungSan aad9656
Fix pushN retry elements order.
BalmungSan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
--- | ||
id: stack | ||
title: Stack | ||
--- | ||
|
||
A `Stack` is a concurrent data structure which allows insertion and retrieval of | ||
elements of in a last-in-first-out (LIFO) manner. | ||
|
||
```scala | ||
trait Stack[F[_], A] { | ||
def push(a: A): F[Unit] | ||
|
||
def pushN(as: A*): F[Unit] | ||
|
||
def pop: F[A] | ||
|
||
def tryPop: F[Option[A]] | ||
|
||
def peek: F[Option[A]] | ||
} | ||
``` | ||
|
||
* `push`: Pushes an element to the top of the `Stack`, never blocks and will always succeed. | ||
* `pushN`: Pushes many element sto the top of the `Stack`, the last element will be the final top, never blocks and will always succeed. | ||
* `pop`: Retrieves the top element from the `Stack`, semantically blocks when the `Stack` is empty. | ||
* `tryPop`: Similar to `pop` but rather than blocking, when empty will return `None`. | ||
* `peek` Similar to `tryPop` but would not remove the element from the `Stack`. There is no guarantee that a consequent `pop`, `tryPop`, or `peek` would return the same element due to concurrency. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,291 @@ | ||
/* | ||
* Copyright 2020-2024 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats | ||
package effect | ||
package std | ||
|
||
import cats.effect.kernel._ | ||
import cats.syntax.all._ | ||
|
||
/** | ||
* A purely functional, concurrent data structure which allows insertion and retrieval of | ||
* elements of type `A` in a last-in-first-out (LIFO) manner. | ||
* | ||
* The [[Stack#push]] operation never blocks and will always succeed. | ||
* | ||
* The [[Stack#pop]] operation semantically blocks when the `Stack` is empty, [[Stack#tryPop]] | ||
* allow for use cases which want to avoid blocking a fiber. | ||
* | ||
* The [[Stack#peek]] operation never blocks and will always succeed, it would however not | ||
* remove the element from the `Stack`, and there is no guarantee that a consequent `pop` would | ||
* return the same element. | ||
*/ | ||
abstract class Stack[F[_], A] { self => | ||
|
||
/** | ||
* Pushes the given element to the top of the `Stack`. | ||
* | ||
* @param element | ||
* the element to push at the top of the `Stack`. | ||
*/ | ||
def push(element: A): F[Unit] | ||
|
||
/** | ||
* Pushes the given elements to the top of the `Stack`, the last element will be the final | ||
* top. | ||
* | ||
* @param elements | ||
* the elements to push at the top of the `Stack`. | ||
*/ | ||
def pushN(elements: A*): F[Unit] | ||
|
||
/** | ||
* Takes the top element of `Stack`, if there is none it will semantically block until one is | ||
* made available. If multiple fibers are waiting for an element, they will be served in order | ||
* of arrival. | ||
*/ | ||
def pop: F[A] | ||
|
||
/** | ||
* Tries ta take the top element of `Stack`, if there is none it will return `None`. | ||
*/ | ||
def tryPop: F[Option[A]] | ||
|
||
/** | ||
* Returns the top element of the `Stack`, if there is any, without removing it. | ||
* | ||
* @note | ||
* In a concurrent scenario, there is no guarantee that a `peek` followed by a `pop` or | ||
* `tryPop` would return the same element. | ||
*/ | ||
def peek: F[Option[A]] | ||
|
||
/** | ||
* Returns the number of elements currently present in the `Stack`. | ||
* | ||
* @note | ||
* In a concurrent scenario, this value must be considered stale immediately after returning | ||
* it. There is no guarantee that doing a `pop` after seeing a value bigger than `0` will | ||
* not block. | ||
*/ | ||
def size: F[Int] | ||
|
||
/** | ||
* Modifies the context in which this `Stack` is executed using the natural transformation | ||
* `f`. | ||
* | ||
* @return | ||
* a `Stack` in the new context obtained by mapping the current one using `f`. | ||
*/ | ||
final def mapK[G[_]](f: F ~> G): Stack[G, A] = | ||
new Stack[G, A] { | ||
override def push(element: A): G[Unit] = | ||
f(self.push(element)) | ||
|
||
override def pushN(elements: A*): G[Unit] = | ||
f(self.pushN(elements: _*)) | ||
|
||
override def pop: G[A] = | ||
f(self.pop) | ||
|
||
override def tryPop: G[Option[A]] = | ||
f(self.tryPop) | ||
|
||
override def peek: G[Option[A]] = | ||
f(self.peek) | ||
|
||
override def size: G[Int] = | ||
f(self.size) | ||
} | ||
} | ||
|
||
object Stack { | ||
|
||
/** | ||
* Creates a new `Stack`. | ||
*/ | ||
def apply[F[_], A](implicit F: Concurrent[F]): F[Stack[F, A]] = | ||
// Initialize the state with an empty stack. | ||
Ref.of[F, StackState[F, A]](StackState.empty).map(state => new ConcurrentImpl(state)) | ||
|
||
/** | ||
* Creates a new `Stack`. Like `apply` but initializes state using another effect constructor. | ||
*/ | ||
def in[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[Stack[G, A]] = | ||
// Initialize the state with an empty stack. | ||
Ref.in[F, G, StackState[G, A]](StackState.empty).map(state => new ConcurrentImpl(state)) | ||
|
||
private final case class StackState[F[_], A]( | ||
elements: List[A], | ||
waiters: collection.immutable.Queue[Deferred[F, A]] | ||
) { | ||
type CopyResult = StackState[F, A] | ||
type ModifyResult[R] = (CopyResult, R) | ||
|
||
def push(element: A)(implicit F: Concurrent[F]): ModifyResult[F[Boolean]] = | ||
waiters.dequeueOption match { | ||
case Some((waiter, remainingWaiters)) => | ||
this.copy(waiters = remainingWaiters) -> waiter.complete(element) | ||
|
||
case None => | ||
this.copy(elements = element :: this.elements) -> F.pure(true) | ||
} | ||
|
||
def pushN(elements: Seq[A])(implicit F: Concurrent[F]): ModifyResult[F[Seq[A]]] = | ||
if (this.waiters.isEmpty) | ||
// If there are no waiters we just push all the elements in reverse order. | ||
this.copy( | ||
elements = this.elements.prependedAll(elements.reverseIterator) | ||
) -> F.pure(Seq.empty) | ||
else { | ||
// Otherwise, if there is at least one waiter, we take all we can. | ||
val (remaining, waitersToNotify) = | ||
elements.reverse.align(this.waiters).partitionMap(_.unwrap) | ||
|
||
// We try to notify all the waiters we could take. | ||
val notifyWaiters = waitersToNotify.traverseFilter { | ||
case (element, waiter) => | ||
waiter.complete(element).map { | ||
// If the waiter was successfully awaken, we remove the element from the Stack. | ||
case true => None | ||
// Otherwise, we preserve the element for retrying the push. | ||
case false => Some(element) | ||
} | ||
} | ||
|
||
// The remaining elements are either all elements, or all waiters. | ||
val newState = remaining.parTraverse(_.toEitherNec) match { | ||
case Left(remainingElements) => | ||
// If only elements remained, then we preserve all the pending waiters, | ||
// and set the Stack elements as the remaining ones. | ||
// This is safe because the remaining elements are already in the correct order, | ||
// and since there was at least one waiter then we can assume there were not pending elements. | ||
this.copy(elements = remainingElements.toList) | ||
|
||
case Right(remainingWaiters) => | ||
// If only waiters remained, then we create a new Queue from them. | ||
this.copy(waiters = collection.immutable.Queue.from(remainingWaiters)) | ||
} | ||
|
||
newState -> notifyWaiters | ||
} | ||
|
||
def pop(waiter: Deferred[F, A]): ModifyResult[Option[A]] = | ||
elements match { | ||
case head :: tail => | ||
this.copy(elements = tail) -> Some(head) | ||
|
||
case Nil => | ||
this.copy(waiters = waiters.enqueue(waiter)) -> None | ||
} | ||
|
||
def removeWaiter(waiter: Deferred[F, A]): CopyResult = | ||
this.copy(waiters = this.waiters.filterNot(_ eq waiter)) | ||
|
||
def tryPop: ModifyResult[Option[A]] = | ||
elements match { | ||
case head :: tail => | ||
this.copy(elements = tail) -> Some(head) | ||
|
||
case Nil => | ||
this -> None | ||
} | ||
} | ||
|
||
private object StackState { | ||
def empty[F[_], A]: StackState[F, A] = StackState( | ||
elements = List.empty, | ||
waiters = collection.immutable.Queue.empty | ||
) | ||
} | ||
|
||
private final class ConcurrentImpl[F[_], A]( | ||
state: Ref[F, StackState[F, A]] | ||
)( | ||
implicit F: Concurrent[F] | ||
) extends Stack[F, A] { | ||
override def push(element: A): F[Unit] = | ||
F.uncancelable { _ => | ||
// Try to push an element to the Stack. | ||
state.flatModify(_.push(element)).flatMap { | ||
case true => | ||
// If it worked we finish the process. | ||
F.unit | ||
|
||
case false => | ||
// If it failed, we retry. | ||
this.push(element) | ||
} | ||
} | ||
|
||
override def pushN(elements: A*): F[Unit] = | ||
F.uncancelable { _ => | ||
// If elements is empty, do nothing. | ||
if (elements.isEmpty) F.unit | ||
// Optimize for the singleton case. | ||
else if (elements.sizeIs == 1) this.push(elements.head) | ||
else | ||
// Otherwise try to push all the elements at once. | ||
state.flatModify(_.pushN(elements)).flatMap { failedElements => | ||
// For the elements we failed to push, we retry. | ||
this.pushN(failedElements.reverse: _*) | ||
} | ||
} | ||
|
||
override final val pop: F[A] = | ||
F.uncancelable { poll => | ||
Deferred[F, A].flatMap { waiter => | ||
// Try to pop the head of the Stack. | ||
state.modify(_.pop(waiter)).flatMap { | ||
case Some(head) => | ||
// If there is one, we simply return it. | ||
F.pure(head) | ||
|
||
case None => | ||
// If there wasn't one, | ||
// we already added our waiter at the end of the waiters queue. | ||
// We then need to wait for it to be completed. | ||
// However, we may be cancelled while waiting for that. | ||
// If we are cancelled, then we will try to invalidate our waiter: | ||
val waitCancelledFinalizer = waiter.complete(null.asInstanceOf[A]).flatMap { | ||
case true => | ||
// If we managed to invalidate our waiter, | ||
// we try to remove it from the waiters queue. | ||
state.update(_.removeWaiter(waiter)).void | ||
|
||
case false => | ||
// But, if we didn't managed to invalidate it. | ||
// Then, that means we managed to receive a pushed element. | ||
// Thus, we have to push it again to avoid it getting lost. | ||
waiter.get.flatMap(element => this.push(element)) | ||
} | ||
|
||
F.onCancel(poll(waiter.get), waitCancelledFinalizer) | ||
} | ||
} | ||
} | ||
|
||
override final val tryPop: F[Option[A]] = | ||
F.uncancelable(_ => state.modify(_.tryPop)) | ||
|
||
override final val peek: F[Option[A]] = | ||
state.get.map(_.elements.headOption) | ||
|
||
override final val size: F[Int] = | ||
state.get.map(_.elements.size) | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this let elements slip under the top the stack if a push happens after the pop then before the push back? ie. stack of A,B,C:
Now you'd have A,D,B,C, which should be impossible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, concurrent
pop
cancellation andpush
can change the order of elements in theStack
, I don't think there is much we can do here tho, we would need to store more information and the implementation would be very complex.Also, note that for it to happen you actually need to wait, that can't happen on a non-empty
Stack
, only in an empty one.So the real situation is something more akin to:
Stack
is empty.One fiber is waiting on
pop
Another fiber pushes
A, B, C
, concurrently withpop
is cancelled, and to anotherpush(D)
The end result may be something like:
C, D, B, A
Which, yes, feels wrong, but in a highly concurrent scenario, I'm not sure what else to do, at least the element didn't got lost and the overall priority is mostly preserved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is your scenario possible with pushN(A,B,C), or just a series of push()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is possible with
pushN
sadly, for series forpushs
I don't mind since if they are all concurrent then the right order is undefined anyway AFAIK.But for
pushN
I am indeed sad that it can lead to that error, the thing is thatpushN
is atomic in deciding which waiters to await, but it can't wait for them to be awakened before modifying theState
.