Skip to content

Commit

Permalink
fix: ensure that the event snapshot is available when one observer de…
Browse files Browse the repository at this point in the history
…fers and the other does not (#1562)

If an event has multiple observers, then if *any* of the
(event+observer) pairs do not defer, then we need to save the snapshot
so that it's available to process that (event+observer).

#1372 introduced a bug where if *any* of the (event+observer) pairs was
skipped because an exact match was already in the deferral queue, we
would skip saving the snapshot. This is ok if there's only a single
observer, or if an (event+observer) that did not defer was processed
before any that did defer, but not otherwise.

Fixes #1561
  • Loading branch information
tonyandrewmeyer authored Feb 5, 2025
1 parent d63208d commit d0f9f50
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 30 deletions.
6 changes: 2 additions & 4 deletions ops/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,10 +892,8 @@ def _emit(self, event: EventBase):
observer_path,
method_name,
)
# We don't need to save a new notice and snapshot, but we do
# want the event to run, because it has been saved previously
# and not completed.
saved = True
# Note that there may be other (event+observer) pairs that are
# not in the queue, and so may need to save the snapshot.
continue
if not saved:
# Save the event for all known observers before the first notification
Expand Down
109 changes: 83 additions & 26 deletions test/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ def fake_script(request: pytest.FixtureRequest) -> FakeScript:
return FakeScript(request)


class SimpleEventWithData(ops.EventBase):
def __init__(self, handle: ops.Handle, data: str):
super().__init__(handle)
self.data: str = data

def restore(self, snapshot: typing.Dict[str, typing.Any]):
self.data = typing.cast(str, snapshot['data'])

def snapshot(self) -> typing.Dict[str, typing.Any]:
return {'data': self.data}


class TestFramework:
def test_deprecated_init(self, caplog: pytest.LogCaptureFixture):
# For 0.7, this still works, but it is deprecated.
Expand Down Expand Up @@ -372,23 +384,12 @@ def on_commit(self, event: ops.CommitEvent):
def test_defer_and_reemit(self, request: pytest.FixtureRequest):
framework = create_framework(request)

class MyEvent(ops.EventBase):
def __init__(self, handle: ops.Handle, data: str):
super().__init__(handle)
self.data: str = data

def restore(self, snapshot: typing.Dict[str, typing.Any]):
self.data = typing.cast(str, snapshot['data'])

def snapshot(self) -> typing.Dict[str, typing.Any]:
return {'data': self.data}

class MyNotifier1(ops.Object):
a = ops.EventSource(MyEvent)
b = ops.EventSource(MyEvent)
a = ops.EventSource(SimpleEventWithData)
b = ops.EventSource(SimpleEventWithData)

class MyNotifier2(ops.Object):
c = ops.EventSource(MyEvent)
c = ops.EventSource(SimpleEventWithData)

class MyObserver(ops.Object):
def __init__(self, parent: ops.Object, key: str):
Expand Down Expand Up @@ -453,23 +454,12 @@ def test_repeated_defer(self, request: pytest.FixtureRequest):
class MyEvent(ops.EventBase):
data: typing.Optional[str] = None

class MyDataEvent(MyEvent):
def __init__(self, handle: ops.Handle, data: str):
super().__init__(handle)
self.data: typing.Optional[str] = data

def restore(self, snapshot: typing.Dict[str, typing.Any]):
self.data = typing.cast(typing.Optional[str], snapshot['data'])

def snapshot(self) -> typing.Dict[str, typing.Any]:
return {'data': self.data}

class ReleaseEvent(ops.EventBase):
pass

class MyNotifier(ops.Object):
n = ops.EventSource(MyEvent)
d = ops.EventSource(MyDataEvent)
d = ops.EventSource(SimpleEventWithData)
r = ops.EventSource(ReleaseEvent)

class MyObserver(ops.Object):
Expand Down Expand Up @@ -554,6 +544,73 @@ def notices_for_observer(n: int):
assert len(notices_for_observer(1)) == 0
assert len(notices_for_observer(2)) == 4

def test_two_observers_one_deferring(self, request: pytest.FixtureRequest):
framework = create_framework(request)

class MyNotifier(ops.Object):
my_event = ops.EventSource(SimpleEventWithData)

class RecordingObserver(ops.Object):
def __init__(self, parent: ops.Object, key: str):
super().__init__(parent, key)
self.events: typing.List[str] = []

def on_event(self, _: SimpleEventWithData):
self.events.append(self.__class__.__name__)

class DeferringObserver(RecordingObserver):
defer = True

def on_event(self, event: SimpleEventWithData):
super().on_event(event)
if self.defer:
event.defer()

pub = MyNotifier(framework, 'my_event')
obs1 = DeferringObserver(framework, '1')
obs2 = RecordingObserver(framework, '2')

framework.observe(pub.my_event, obs1.on_event)
framework.observe(pub.my_event, obs2.on_event)

# We always reemit() to handle the deferred events and then do the
# actual emit(). Create a helper function to do this.
def emit():
framework.reemit()
pub.my_event.emit('foo')

# Emit an event, which will be deferred by one observer, and not by the
# other. We should have a single notice with a corresponding snapshot,
# which is the deferred event, and each observer will have seen the
# event once.
emit()
notices = tuple(framework._storage.notices())
assert len(notices) == 1
assert framework._storage.load_snapshot(notices[0][0]) == {'data': 'foo'}
assert len(obs1.events) == len(obs2.events) == 1

# If we emit the event another time, we'll still have one notice and
# snapshot, and each observer will have seen the event twice.
emit()
notices = tuple(framework._storage.notices())
assert len(notices) == 1
assert framework._storage.load_snapshot(notices[0][0]) == {'data': 'foo'}
assert len(obs1.events) == len(obs2.events) == 2

# If we emit the event with neither observer deferring, we'll have no
# remaining notice or snapshot. The observer that didn't defer will have
# seen the event a straightforward 3 times. The observer that did defer
# will have seen it once more - in this final case, it runs the deferred
# event, which clears the queue, and then it runs the actual event
# (not skipping because the queue is empty at that point).
obs1.defer = False
emit()
assert len(obs1.events) == 4
assert len(obs2.events) == 3
notices = tuple(framework._storage.notices())
assert len(notices) == 0
assert len(tuple(framework._storage.list_snapshots())) == 0

def test_custom_event_data(self, request: pytest.FixtureRequest):
framework = create_framework(request)

Expand Down

0 comments on commit d0f9f50

Please sign in to comment.