Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation of timer operations #110

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ TESTS = \
tests/parameters.scm \
tests/preemption.scm \
tests/speedup.scm \
tests/timer-wheel.scm
tests/timer-wheel.scm \
tests/cancel-timer.scm

# The following tests require SOCK_NONBLOCK and SOCK_CLOEXEC. For now we just
# run them on a platform that supports epoll (probably Linux).
Expand Down
2 changes: 1 addition & 1 deletion fibers.texi
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ outside of a fiber.
There is also a low-level constructor for other modules that implement
primitive operation types:

@defun make-base-operation wrap-fn try-fn block-fn
@defun make-base-operation wrap-fn try-fn block-fn [cancel-fn]
Make a fresh base operation.
@end defun

Expand Down
27 changes: 24 additions & 3 deletions fibers/operations.scm
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,20 @@
(define-inlinable (make-op-state) (make-atomic-box 'W))

(define-record-type <base-op>
(make-base-operation wrap-fn try-fn block-fn)
(%make-base-operation wrap-fn try-fn block-fn cancel-fn)
base-op?
;; ((arg ...) -> (result ...)) | #f
(wrap-fn base-op-wrap-fn)
;; () -> (thunk | #f)
(try-fn base-op-try-fn)
;; (op-state sched resume-k) -> ()
(block-fn base-op-block-fn))
(block-fn base-op-block-fn)
;; (sched) -> ()
(cancel-fn base-op-cancel-fn))

(define* (make-base-operation wrap-fn try-fn block-fn
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ABI reasons, there needs to be a way to tell to Guile 'don't inline this' (not blocking this PR).

#:optional (cancel-fn (const #f)))
(%make-base-operation wrap-fn try-fn block-fn cancel-fn))

(define-record-type <choice-op>
(make-choice-operation base-ops)
Expand Down Expand Up @@ -121,6 +127,18 @@ succeeds, will succeed with one and only one of the sub-operations
((base-op) base-op)
(base-ops (make-choice-operation (list->vector base-ops)))))

(define (cancel-other-operations op index)
"Assuming @var{op} is a choice operation, cancel every operation but the
one at @var{index}."
(match op
(($ <choice-op> base-ops)
(let loop ((i 0))
(when (< i (vector-length base-ops))
(unless (= i index)
(match (vector-ref base-ops i)
(($ <base-op> wrap-fn try-fn block-fn cancel-fn)
(cancel-fn (current-scheduler))))))))))

(define (perform-operation op)
"Perform the operation @var{op} and return the resulting values. If
the operation cannot complete directly, block until it can complete."
Expand All @@ -141,7 +159,10 @@ the operation cannot complete directly, block until it can complete."
(when (< i (vector-length base-ops))
(match (vector-ref base-ops i)
(($ <base-op> wrap-fn try-fn block-fn)
(block-fn flag sched (wrap-resume resume wrap-fn))))
(let ((resume (lambda (thunk)
(cancel-other-operations op i)
(resume thunk))))
(block-fn flag sched (wrap-resume resume wrap-fn)))))
(lp (1+ i))))))))

(define (suspend)
Expand Down
1 change: 1 addition & 0 deletions fibers/scheduler.scm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
(scheduler-kernel-thread/public . scheduler-kernel-thread)
scheduler-remote-peers
scheduler-work-pending?
scheduler-timers
choose-parallel-scheduler
run-scheduler
destroy-scheduler
Expand Down
13 changes: 13 additions & 0 deletions fibers/timer-wheel.scm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#:use-module (ice-9 format)
#:export (make-timer-wheel
timer-wheel-add!
timer-wheel-remove!
timer-wheel-next-entry-time
timer-wheel-next-tick-start
timer-wheel-next-tick-end
Expand Down Expand Up @@ -141,6 +142,18 @@
(else
(timer-wheel-add! (or outer (add-outer-wheel! wheel)) t obj)))))))

(define (timer-wheel-remove! wheel entry)
"Remove @var{entry}, a timer entry as returned by @code{timer-wheel-add!},
from @var{wheel}."
(match entry
(($ <timer-entry> prev next)
(when prev
(set-timer-entry-next! prev next)
(set-timer-entry-prev! entry #f))
(when next
(set-timer-entry-prev! next prev)
(set-timer-entry-next! entry #f)))))

(define (timer-wheel-next-entry-time wheel)
(define (slot-min-time head)
(let lp ((entry (timer-entry-next head)) (min #f))
Expand Down
27 changes: 21 additions & 6 deletions fibers/timers.scm
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;; Fibers: cooperative, event-driven user-space threads.

;;;; Copyright (C) 2016 Free Software Foundation, Inc.
;;;; Copyright (C) 2016, 2024 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
Expand All @@ -19,6 +19,7 @@
(define-module (fibers timers)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:autoload (fibers timer-wheel) (timer-wheel-remove!)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
Expand All @@ -45,23 +46,37 @@
"Make an operation that will succeed when the current time is
greater than or equal to @var{expiry}, expressed in internal time
units. The operation will succeed with no values."
(make-base-operation #f
(lambda ()
(define wheel-entry
;; If true, this is the currently active timer entry for this operation.
#f)

(make-base-operation #f ;wrap
(lambda () ;try
(and (< expiry (get-internal-real-time))
values))
(lambda (flag sched resume)
(lambda (flag sched resume) ;block
(define (timer)
(match (atomic-box-compare-and-swap! flag 'W 'S)
('W (resume values))
('C (timer))
('S #f)))
(if sched
(schedule-task-at-time sched expiry timer)
(set! wheel-entry
(schedule-task-at-time sched expiry timer))
(schedule-task
(timer-sched)
(lambda ()
(perform-operation (timer-operation expiry))
(timer)))))))
(timer)))))
(lambda (sched) ;cancel
;; This operation is being canceled.
(when (and sched wheel-entry)
;; Remove WHEEL-ENTRY from the timer wheel right
;; away to avoid accumulating entries in the
;; wheel. See
;; <https://github.com/wingo/fibers/issues/109>.
(timer-wheel-remove! (scheduler-timers sched)
wheel-entry)))))

(define (sleep-operation seconds)
"Make an operation that will succeed with no values when
Expand Down
68 changes: 68 additions & 0 deletions tests/cancel-timer.scm
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
;; Fibers: cooperative, event-driven user-space threads.

;;;; Copyright (C) 2024 Ludovic Courtès <[email protected]>
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public License
;;;; along with this program. If not, see <http://www.gnu.org/licenses/>.
;;;;

(define-module (tests cancel-timer)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (fibers timers)
#:use-module (ice-9 format))

(define (heap-size)
(assoc-ref (gc-stats) 'heap-size))

(define iterations 200000)

;;; Check the heap growth caused by repeated choice operations where one of
;;; the base operations is a timer that always "loses" the choice.
;;;
;;; This situation used to cause timer continuations to accumulate, thereby
;;; leading to unbounded heap growth. The cancel function of
;;; 'timer-operation' fixes that by immediately canceling timers that lost in
;;; a choice operation. See <https://github.com/wingo/fibers/issues/109>.

(run-fibers
(lambda ()
(define channel
(make-channel))

(spawn-fiber
(lambda ()
(let loop ((i 0))
(when (< i iterations)
(put-message channel 'hello)
(loop (+ i 1))))))

(let ((initial-heap-size (heap-size)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're checking heap sizes, best do a gc beforehand, otherwise some heavy activity could lead to false negatives (as in 'no bug detected even though it exists').

Would it be possible (and sufficiently informative & meaningful) to instead check the length of the timer wheel? Seems less finicky to me (e.g. what if in the future tests are run in parallel in a single process).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if something finicky like heap sizes is avoided, I imagine the number of iterations could be reduced a lot (good for test performance).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that when #109 is present, the heap would grow way beyond the 2x limit that's tested here; it would not go unnoticed. (Maybe we could make the test faster but it was already reasonably fast in my experience.)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How fast is 'reasonably fast'?

(let loop ((i 0))
(when (< i iterations)
(perform-operation
(choice-operation (sleep-operation 500)
(get-operation channel)))
(loop (+ 1 i))))

(let ((final-heap-size (heap-size))
(MiB (lambda (size)
(/ size (expt 2 20.)))))
(if (<= final-heap-size (* 2 initial-heap-size))
(format #t "final heap size: ~,2f MiB; initial heap size: ~,2f MiB~%"
(MiB final-heap-size) (MiB initial-heap-size))
(begin
(format #t "heap grew too much: ~,2f MiB vs. ~,2f MiB~%"
(MiB final-heap-size) (MiB initial-heap-size))
(primitive-exit 1)))))))
Loading