-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] fix interrupts leak #1125
Conversation
* @return | ||
* The number of waiters on this Fiber | ||
*/ | ||
def waiters(using Frame): Int < IO = IO(self.waiters()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I needed it for testing and thought it'd be useful to expose to users
new Pending[E, A]: | ||
def waiters: Int = self.waiters + 1 | ||
def interrupt[E2 >: E](error: Error[E2]) = | ||
eval(discard(f(error.asInstanceOf[Error[E]]))) | ||
self | ||
def removeInterrupt(other: IOPromise[?, ?]) = | ||
self.removeInterrupt(other).onComplete(f) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to remove inline
since it'd recurse the inlining here
@@ -237,15 +273,18 @@ private[kyo] object IOPromise: | |||
|
|||
def waiters: Int | |||
def interrupt[E2 >: E](v: Error[E2]): Pending[E, A] | |||
def removeInterrupt(other: IOPromise[?, ?]): Pending[E, A] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new method is relatively expensive since it'll rebuild the pending list up to the point of the interrupts
callback. It should be ok since, in the majority of cases, the list is quite small with typically two elements: one for interrupt and another for completion. The exception is the collection handling methods in Fiber
, where State
objects link to all forked fibers but it becomes available for GC right after the collection operation finishes. I think we don't need an early cleanup there.
@@ -178,12 +178,13 @@ abstract private class PublisherToSubscriberTest extends Test: | |||
subStream3 <- subscriber3.stream | |||
subscriber4 <- streamSubscriber | |||
subStream4 <- subscriber4.stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test was flaky
* @return | ||
* Maybe containing the Result if the Fiber is done, or Absent if still pending | ||
*/ | ||
def poll(using Frame): Maybe[Result[E, A]] < IO = IO(self.poll()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙏
case null => | ||
cont(null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When is this possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an edge case when the promise is completed with null
. I added a couple of test scenarios for that a some time ago after having a complex issue due to it not being handled well.
Fixes #1122
Problem
When a parent
Fiber
is executing and encounters a new childFiber
to be waited on, the parent needs to register a callback to propagate interrupts to the child in case there's an interrupt during the waiting. The issue is that, once the child fiber finishes and the parent resumes, the interrupt callback for the child isn't necessary anymore but it stays in the pending list until the parent fully completes execution. This leads to a memory leak when a parent fiber handles many children during its execution.Solution
When the parent fiber resumes execution, remove the interrupt callback for the completed child.
Notes
I've also included a fast-path optimization: if the child fiber is already completed, the parent resumes execution immeditely without suspending. There's a performance penalty with this change due to the removal of a few
inline
s inIOPromise
that should be easily offset with this additional optimization.