Skip to content

Commit f3cb604

Browse files
committed
Message bus and timer.
1 parent 0f0901d commit f3cb604

File tree

3 files changed

+216
-1
lines changed

3 files changed

+216
-1
lines changed

mys/lib/packages/fiber/src/lib.mys

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,3 +315,168 @@ trait Message:
315315
"""All messages should implement this trait.
316316

317317
"""
318+
319+
class _TimerHandler(TimerHandler):
320+
queue: MessageQueue
321+
message: Message
322+
323+
func on_timeout(self):
324+
self.queue.put(self.message)
325+
326+
class MessageTimer:
327+
_timer: Timer
328+
329+
func __init__(self,
330+
queue: MessageQueue,
331+
message: Message,
332+
initial: f64,
333+
interval: f64? = None):
334+
self._timer = Timer(_TimerHandler(queue, message), initial, interval)
335+
336+
func start(self):
337+
self._timer.start()
338+
339+
func stop(self):
340+
self._timer.stop()
341+
342+
# ToDo: Remove once generic queue works for messages.
343+
class MessageQueue:
344+
"""Message passing from one fiber to another.
345+
346+
"""
347+
348+
_values: [Message]
349+
_reader: Fiber?
350+
_is_closed: bool
351+
352+
func __init__(self):
353+
self._values = []
354+
self._reader = None
355+
self._is_closed = False
356+
357+
func open(self):
358+
"""Allow putting and getting message. A queue is open by default.
359+
360+
"""
361+
362+
self._is_closed = False
363+
364+
func close(self):
365+
"""Do not allow putting more messages on the queue. Allow getting
366+
all already queued messages.
367+
368+
"""
369+
370+
self._is_closed = True
371+
372+
if self._reader is not None:
373+
resume(self._reader)
374+
self._reader = None
375+
376+
func length(self) -> i64:
377+
"""Get number of queued messages.
378+
379+
"""
380+
381+
return self._values.length()
382+
383+
func put(self, value: Message):
384+
"""Put given value at the end of the queue. Never blocks.
385+
386+
"""
387+
388+
if self._is_closed:
389+
raise QueueError("Cannot put message on closed queue.")
390+
391+
self._values.append(value)
392+
393+
if self._reader is not None:
394+
resume(self._reader)
395+
self._reader = None
396+
397+
func get(self) -> Message:
398+
"""Get the first value from the queue. Suspends current fiber if the
399+
queue is empty.
400+
401+
"""
402+
403+
if self._values.length() == 0:
404+
if self._reader is not None:
405+
raise QueueError("only one fiber can get for a queue")
406+
407+
if self._is_closed:
408+
raise QueueError("Cannot get message from closed queue.")
409+
410+
self._reader = current()
411+
412+
try:
413+
suspend()
414+
except CancelledError:
415+
self._reader = None
416+
raise
417+
418+
if self._values.length() == 0:
419+
raise QueueError("Cannot get message from closed queue.")
420+
421+
return self._values.pop(0)
422+
423+
class Bus:
424+
"""A message bus.
425+
426+
"""
427+
428+
_subscribers: {i64: [MessageQueue]}
429+
_lock: Lock
430+
431+
func __init__(self):
432+
self._subscribers = {}
433+
self._lock = Lock()
434+
435+
func broadcast(self, message_id: i64, message: Message):
436+
self._lock.acquire()
437+
438+
try:
439+
queues = self._subscribers.get(message_id, None)
440+
441+
if queues is not None:
442+
for queue in queues:
443+
queue.put(message)
444+
finally:
445+
self._lock.release()
446+
447+
func subscribe(self, message_id: i64, queue: MessageQueue):
448+
self._lock.acquire()
449+
450+
try:
451+
if message_id not in self._subscribers:
452+
self._subscribers[message_id] = []
453+
454+
self._subscribers[message_id].append(queue)
455+
finally:
456+
self._lock.release()
457+
458+
func unsubscribe(self, message_id: i64, queue: MessageQueue):
459+
raise NotImplementedError("Unsubscribe not implemented.")
460+
461+
_BUS: Bus = Bus()
462+
463+
func broadcast(message_id: i64, message: Message):
464+
"""Broadcast given message on the default bus.
465+
466+
"""
467+
468+
_BUS.broadcast(message_id, message)
469+
470+
func subscribe(message_id: i64, queue: MessageQueue):
471+
"""Subscribe to given message on the default bus.
472+
473+
"""
474+
475+
_BUS.subscribe(message_id, queue)
476+
477+
func unsubscribe(message_id: i64, queue: MessageQueue):
478+
"""Unsubscribe from given message on the default bus.
479+
480+
"""
481+
482+
_BUS.unsubscribe(message_id, queue)

mys/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.451.0'
1+
__version__ = '0.452.0'

tests/files/fibers.mys

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ from fiber import CancelledError
88
from fiber import QueueError
99
from fiber import Timer
1010
from fiber import TimerHandler
11+
from fiber import broadcast
12+
from fiber import subscribe
13+
from fiber import Message
14+
from fiber import MessageQueue
1115

1216
test sleep():
1317
sleep(0.2)
@@ -279,3 +283,49 @@ test periodic_timer():
279283

280284
while _TIMEOUT_COUNT < 3:
281285
sleep(0.05)
286+
287+
_M1_ID: i64 = __unique_id__
288+
_M2_ID: i64 = __unique_id__
289+
290+
class _M1(Message):
291+
pass
292+
293+
class _M2(Message):
294+
pass
295+
296+
test bus():
297+
q1 = MessageQueue()
298+
q2 = MessageQueue()
299+
300+
broadcast(_M1_ID, _M1())
301+
assert q1.length() == 0
302+
assert q2.length() == 0
303+
304+
subscribe(_M1_ID, q1)
305+
subscribe(_M2_ID, q1)
306+
subscribe(_M2_ID, q2)
307+
308+
broadcast(_M1_ID, _M1())
309+
broadcast(_M2_ID, _M2())
310+
311+
assert q1.length() == 2
312+
313+
match q1.get():
314+
case _M1():
315+
pass
316+
case _:
317+
assert False
318+
319+
match q1.get():
320+
case _M2():
321+
pass
322+
case _:
323+
assert False
324+
325+
assert q2.length() == 1
326+
327+
match q2.get():
328+
case _M2():
329+
pass
330+
case _:
331+
assert False

0 commit comments

Comments
 (0)