Skip to content

Commit c41575c

Browse files
committed
disallow reentrancy
Reentrancy causes event reordering and stack explosions, in addition to leading to hard-to-debug scenarios. Since none of chronos is actually tested under reentrant conditions (ie that all features such as exceptions, cancellations, buffer operations, timers etc work reliably when the loop is reentered), this is hard to support over time and prevents useful optimizations - this PR simply detects and disallows the behaviour to remove the uncertainty, simplifying reasoning about the event loop in general.
1 parent 1306170 commit c41575c

File tree

3 files changed

+36
-75
lines changed

3 files changed

+36
-75
lines changed

chronos/internal/asyncengine.nim

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,23 @@ type
6464
ticks*: Deque[AsyncCallback]
6565
trackers*: Table[string, TrackerBase]
6666
counters*: Table[string, TrackerCounter]
67-
68-
proc sentinelCallbackImpl(arg: pointer) {.gcsafe, noreturn.} =
69-
raiseAssert "Sentinel callback MUST not be scheduled"
70-
71-
const
72-
SentinelCallback = AsyncCallback(function: sentinelCallbackImpl,
73-
udata: nil)
74-
75-
proc isSentinel(acb: AsyncCallback): bool =
76-
acb == SentinelCallback
67+
polling*: bool
68+
## The event loop is currently running
7769

7870
proc `<`(a, b: TimerCallback): bool =
7971
result = a.finishAt < b.finishAt
8072

73+
template preparePoll(loop: PDispatcherBase) =
74+
# If you hit this assert, you've called `poll`, `runForever` or `waitFor`
75+
# from within an async function which is not supported due to the difficulty
76+
# to control stack depth and event ordering
77+
# If you're using `waitFor`, switch to `await` and / or propagate the
78+
# up the call stack.
79+
doAssert not loop.polling, "The event loop and chronos functions in general are not reentrant"
80+
81+
loop.polling = true
82+
defer: loop.polling = false
83+
8184
func getAsyncTimestamp*(a: Duration): auto {.inline.} =
8285
## Return rounded up value of duration with milliseconds resolution.
8386
##
@@ -142,10 +145,10 @@ template processTicks(loop: untyped) =
142145
loop.callbacks.addLast(loop.ticks.popFirst())
143146

144147
template processCallbacks(loop: untyped) =
145-
while true:
146-
let callable = loop.callbacks.popFirst() # len must be > 0 due to sentinel
147-
if isSentinel(callable):
148-
break
148+
# Process existing callbacks but not those that follow, to allow the network
149+
# to regain control regularly
150+
for _ in 0..<loop.callbacks.len():
151+
let callable = loop.callbacks.popFirst()
149152
if not(isNil(callable.function)):
150153
callable.function(callable.udata)
151154

@@ -337,7 +340,6 @@ elif defined(windows):
337340
trackers: initTable[string, TrackerBase](),
338341
counters: initTable[string, TrackerCounter]()
339342
)
340-
res.callbacks.addLast(SentinelCallback)
341343
initAPI(res)
342344
res
343345
@@ -585,16 +587,13 @@ elif defined(windows):
585587
586588
proc poll*() =
587589
let loop = getThreadDispatcher()
590+
loop.preparePoll()
591+
588592
var
589593
curTime = Moment.now()
590594
curTimeout = DWORD(0)
591595
events: array[MaxEventsCount, osdefs.OVERLAPPED_ENTRY]
592596
593-
# On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`,
594-
# complete pending work of the outer `processCallbacks` call.
595-
# On non-reentrant `poll` calls, this only removes sentinel element.
596-
processCallbacks(loop)
597-
598597
# Moving expired timers to `loop.callbacks` and calculate timeout
599598
loop.processTimersGetTimeout(curTimeout)
600599
@@ -660,14 +659,10 @@ elif defined(windows):
660659
# We move tick callbacks to `loop.callbacks` always.
661660
processTicks(loop)
662661

663-
# All callbacks which will be added during `processCallbacks` will be
664-
# scheduled after the sentinel and are processed on next `poll()` call.
665-
loop.callbacks.addLast(SentinelCallback)
662+
# Process the callbacks currently scheduled - new callbacks scheduled during
663+
# callback execution will run in the next poll iteration
666664
processCallbacks(loop)
667665

668-
# All callbacks done, skip `processCallbacks` at start.
669-
loop.callbacks.addFirst(SentinelCallback)
670-
671666
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
672667
## Closes a socket and ensures that it is unregistered.
673668
let loop = getThreadDispatcher()
@@ -758,7 +753,6 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
758753
trackers: initTable[string, TrackerBase](),
759754
counters: initTable[string, TrackerCounter]()
760755
)
761-
res.callbacks.addLast(SentinelCallback)
762756
initAPI(res)
763757
res
764758
@@ -1014,14 +1008,11 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
10141008
proc poll*() {.gcsafe.} =
10151009
## Perform single asynchronous step.
10161010
let loop = getThreadDispatcher()
1011+
loop.preparePoll()
1012+
10171013
var curTime = Moment.now()
10181014
var curTimeout = 0
10191015

1020-
# On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`,
1021-
# complete pending work of the outer `processCallbacks` call.
1022-
# On non-reentrant `poll` calls, this only removes sentinel element.
1023-
processCallbacks(loop)
1024-
10251016
# Moving expired timers to `loop.callbacks` and calculate timeout.
10261017
loop.processTimersGetTimeout(curTimeout)
10271018

@@ -1068,14 +1059,10 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
10681059
# We move tick callbacks to `loop.callbacks` always.
10691060
processTicks(loop)
10701061

1071-
# All callbacks which will be added during `processCallbacks` will be
1072-
# scheduled after the sentinel and are processed on next `poll()` call.
1073-
loop.callbacks.addLast(SentinelCallback)
1062+
# Process the callbacks currently scheduled - new callbacks scheduled during
1063+
# callback execution will run in the next poll iteration
10741064
processCallbacks(loop)
10751065

1076-
# All callbacks done, skip `processCallbacks` at start.
1077-
loop.callbacks.addFirst(SentinelCallback)
1078-
10791066
else:
10801067
proc initAPI() = discard
10811068
proc globalInit() = discard

tests/testbugs.nim

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -101,40 +101,6 @@ suite "Asynchronous issues test suite":
101101

102102
result = r1 and r2
103103

104-
proc createBigMessage(size: int): seq[byte] =
105-
var message = "MESSAGE"
106-
var res = newSeq[byte](size)
107-
for i in 0 ..< len(result):
108-
res[i] = byte(message[i mod len(message)])
109-
res
110-
111-
proc testIndexError(): Future[bool] {.async.} =
112-
var server = createStreamServer(initTAddress("127.0.0.1:0"),
113-
flags = {ReuseAddr})
114-
let messageSize = DefaultStreamBufferSize * 4
115-
var buffer = newSeq[byte](messageSize)
116-
let msg = createBigMessage(messageSize)
117-
let address = server.localAddress()
118-
let afut = server.accept()
119-
let outTransp = await connect(address)
120-
let inpTransp = await afut
121-
let bytesSent = await outTransp.write(msg)
122-
check bytesSent == messageSize
123-
var rfut {.used.} = inpTransp.readExactly(addr buffer[0], messageSize)
124-
125-
proc waiterProc(udata: pointer) {.raises: [], gcsafe.} =
126-
try:
127-
waitFor(sleepAsync(0.milliseconds))
128-
except CatchableError:
129-
raiseAssert "Unexpected exception happened"
130-
let timer {.used.} = setTimer(Moment.fromNow(0.seconds), waiterProc, nil)
131-
await sleepAsync(100.milliseconds)
132-
133-
await inpTransp.closeWait()
134-
await outTransp.closeWait()
135-
await server.closeWait()
136-
return true
137-
138104
test "Issue #6":
139105
check waitFor(issue6()) == true
140106

@@ -149,6 +115,3 @@ suite "Asynchronous issues test suite":
149115

150116
test "Defer for asynchronous procedures test [Nim's issue #13899]":
151117
check waitFor(testDefer()) == true
152-
153-
test "IndexError crash test":
154-
check waitFor(testIndexError()) == true

tests/testmacro.nim

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,3 +555,14 @@ suite "Exceptions tracking":
555555
await raiseException()
556556

557557
waitFor(callCatchAll())
558+
559+
test "No poll re-entrancy allowed":
560+
proc testReentrancy() {.async.} =
561+
await sleepAsync(1.milliseconds)
562+
poll()
563+
564+
let reenter = testReentrancy()
565+
expect(Defect):
566+
waitFor reenter
567+
568+
await reenter.cancelAndWait() # avoid pending future leaks

0 commit comments

Comments
 (0)