[core] fix interrupts leak #1125

Merged
merged 5 commits into from
Apr 4, 2025

Collaborator

@fwbrasil fwbrasil commented Apr 3, 2025

Fixes #1122

Problem

When a parent Fiber is executing and encounters a new child Fiber to wait on, the parent registers a callback to propagate interrupts to the child in case an interrupt arrives while it is waiting. The issue is that, once the child fiber finishes and the parent resumes, the interrupt callback for the child is no longer needed, yet it stays in the pending list until the parent fully completes execution. This leads to a memory leak when a parent fiber handles many children during its execution.

Solution

When the parent fiber resumes execution, remove the interrupt callback for the completed child.
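
To make the leak and the fix concrete, here is a minimal sketch of the bookkeeping. It is a toy model, not Kyo's IOPromise: `ToyFiber`, `addInterrupt`, `pendingInterrupts`, and `LeakDemo` are hypothetical names introduced only for illustration, and only `removeInterrupt` mirrors a method name from this PR.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a fiber's interrupt bookkeeping.
// This is NOT Kyo's implementation; it only illustrates the leak.
final class ToyFiber(val name: String) {
  // Children that must be interrupted if this fiber is interrupted.
  private val interruptTargets = mutable.ListBuffer.empty[ToyFiber]

  // Registered when this fiber starts waiting on `child`.
  def addInterrupt(child: ToyFiber): Unit = { interruptTargets += child }

  // The fix: drop the entry once the child has completed and this fiber resumes.
  def removeInterrupt(child: ToyFiber): Unit = { interruptTargets -= child }

  def pendingInterrupts: Int = interruptTargets.size
}

object LeakDemo {
  // Waits on `n` children one after another and returns how many
  // interrupt entries the parent still holds afterwards.
  def run(n: Int, cleanup: Boolean): Int = {
    val parent = new ToyFiber("parent")
    for (i <- 1 to n) {
      val child = new ToyFiber(s"child-$i")
      parent.addInterrupt(child)                 // parent suspends on child
      // ... child runs to completion, parent resumes ...
      if (cleanup) parent.removeInterrupt(child) // behaviour after this PR
    }
    parent.pendingInterrupts
  }
}
```

Without the cleanup step, the parent in this model keeps one entry per child it has ever awaited; with it, the list stays empty between waits.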

Notes

I've also included a fast-path optimization: if the child fiber is already completed, the parent resumes execution immediately without suspending. This change carries a performance penalty because a few inlines had to be removed from IOPromise, but the additional optimization should easily offset it.
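
The fast path can be sketched as follows. This is an illustration under stated assumptions rather than the code in this PR: `ToyPromise`, `FastPath`, and `await` are hypothetical names, and the model is single-threaded, whereas a real promise needs atomic state transitions.

```scala
// Hypothetical single-threaded promise, used only to illustrate the fast path.
final class ToyPromise[A] {
  private var result: Option[A] = None
  private var callbacks: List[A => Unit] = Nil

  def poll: Option[A] = result

  def complete(a: A): Unit = {
    result = Some(a)
    val toRun = callbacks.reverse // run in registration order
    callbacks = Nil
    toRun.foreach(f => f(a))
  }

  def onComplete(f: A => Unit): Unit = result match {
    case Some(a) => f(a)
    case None    => callbacks = f :: callbacks
  }
}

object FastPath {
  // Returns true if the caller had to suspend (a callback was registered),
  // false if the child was already done and `cont` ran immediately.
  def await[A](child: ToyPromise[A])(cont: A => Unit): Boolean =
    child.poll match {
      case Some(a) =>
        cont(a) // fast path: no callback registration, no suspension
        false
      case None =>
        child.onComplete(cont) // slow path: resume when the child completes
        true
    }
}
```

In this model, the fast path also avoids registering anything on the child, so there is nothing to clean up afterwards.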

* @return
* The number of waiters on this Fiber
*/
def waiters(using Frame): Int < IO = IO(self.waiters())
Collaborator Author

I needed it for testing and thought it'd be useful to expose to users

new Pending[E, A]:
    def waiters: Int = self.waiters + 1
    def interrupt[E2 >: E](error: Error[E2]) =
        eval(discard(f(error.asInstanceOf[Error[E]])))
        self
    def removeInterrupt(other: IOPromise[?, ?]) =
        self.removeInterrupt(other).onComplete(f)
Collaborator Author

I had to remove inline since the inlining would recurse here

@@ -237,15 +273,18 @@ private[kyo] object IOPromise:

def waiters: Int
def interrupt[E2 >: E](v: Error[E2]): Pending[E, A]
def removeInterrupt(other: IOPromise[?, ?]): Pending[E, A]
Collaborator Author

@fwbrasil fwbrasil Apr 3, 2025

This new method is relatively expensive since it'll rebuild the pending list up to the point of the interrupt callback. It should be ok since, in the majority of cases, the list is quite small, typically with two elements: one for interrupt and another for completion. The exception is the collection handling methods in Fiber, where State objects link to all forked fibers, but they become available for GC right after the collection operation finishes. I think we don't need an early cleanup there.
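
The cost described above is the usual cost of removing an element from an immutable linked structure: every node in front of the removed one is copied, and everything behind it is shared. A minimal sketch, assuming a simplified node layout (`PendingNode`, `OnCompleteNode`, `InterruptNode`, and `PendingOps` are hypothetical names, not Kyo's actual `Pending` encoding):

```scala
// Hypothetical immutable pending list, for illustration only.
sealed trait PendingNode
case object EmptyNode extends PendingNode
final case class OnCompleteNode(id: String, tail: PendingNode) extends PendingNode
final case class InterruptNode(target: String, tail: PendingNode) extends PendingNode

object PendingOps {
  // Removes the first interrupt entry for `target`.
  // Nodes before the match are rebuilt; the tail after it is reused as-is.
  // Plain recursion is acceptable here because the list is assumed to be short.
  def removeInterrupt(p: PendingNode, target: String): PendingNode = p match {
    case EmptyNode                          => EmptyNode
    case InterruptNode(t, tail) if t == target => tail
    case InterruptNode(t, tail)             => InterruptNode(t, removeInterrupt(tail, target))
    case OnCompleteNode(id, tail)           => OnCompleteNode(id, removeInterrupt(tail, target))
  }
}
```

With the typical two-element list mentioned in the comment, at most one node is copied per removal in this model.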

@@ -178,12 +178,13 @@ abstract private class PublisherToSubscriberTest extends Test:
subStream3 <- subscriber3.stream
subscriber4 <- streamSubscriber
subStream4 <- subscriber4.stream
Collaborator Author

this test was flaky

* @return
* Maybe containing the Result if the Fiber is done, or Absent if still pending
*/
def poll(using Frame): Maybe[Result[E, A]] < IO = IO(self.poll())
Collaborator

🙏

Comment on lines +55 to +56
case null =>
cont(null)
Collaborator

When is this possible?

Collaborator Author

It's an edge case when the promise is completed with null. I added a couple of test scenarios for that some time ago after having a complex issue due to it not being handled well.

@hearnadam hearnadam merged commit 6aa02b6 into main Apr 4, 2025
3 checks passed
@hearnadam hearnadam deleted the interrupts-leak branch April 4, 2025 20:42

Successfully merging this pull request may close these issues.

[BUG]: Memory leak when creating a large number of Promise instances
2 participants