Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add updates_only mode to object store watch #666

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions nats/js/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,16 @@ async def watch(
ignore_deletes=False,
include_history=False,
meta_only=False,
updates_only=False,
) -> ObjectWatcher:
"""
watch for changes in the underlying store and receive meta information updates.

:param ignore_deletes: Whether to ignore deleted objects in the updates
:param include_history: Whether to include historical values
:param meta_only: Whether to only receive metadata
:param updates_only: Whether to only receive updates after the current state
:return: An ObjectWatcher instance
"""
all_meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=self._name, )
watcher = ObjectStore.ObjectWatcher(self)
Expand All @@ -484,18 +491,22 @@ async def watch_updates(msg):

# When there are no more updates send an empty marker
# to signal that it is done, this will unblock iterators
if (not watcher._init_done) and meta.num_pending == 0:
# Only send None marker when not in updates_only mode
if (not watcher._init_done) and meta.num_pending == 0 and not updates_only:
watcher._init_done = True
await watcher._updates.put(None)

try:
await self._js.get_last_msg(self._stream, all_meta)
except NotFoundError:
watcher._init_done = True
await watcher._updates.put(None)
if not updates_only:
await watcher._updates.put(None)

deliver_policy = None
if not include_history:
if updates_only:
deliver_policy = api.DeliverPolicy.NEW
elif not include_history:
deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT

watcher._sub = await self._js.subscribe(
Expand Down
61 changes: 61 additions & 0 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -4494,3 +4494,64 @@ async def test_add_stream_invalid_names(self):
),
):
await js.add_stream(name=name)

@async_test
async def test_object_watch_updates_only(self):
errors = []

async def error_handler(e):
print("Error:", e, type(e))
errors.append(e)

nc = await nats.connect(error_cb=error_handler)
js = nc.jetstream()

obs = await js.create_object_store(
"TEST_FILES",
config=nats.js.api.ObjectStoreConfig(description="updates_only_test", ),
)

# Put some initial objects
await obs.put("A", b"A")
await obs.put("B", b"B")
await obs.put("C", b"C")

# Start watching with updates_only=True
watcher = await obs.watch(updates_only=True)

# Since updates_only=True, we should not receive any initial state
# and no None marker since there are existing objects
with pytest.raises(asyncio.TimeoutError):
await watcher.updates(timeout=1)

# New updates should be received
await obs.put("D", b"D")
e = await watcher.updates()
assert e.name == "D"
assert e.bucket == "TEST_FILES"
assert e.size == 1
assert e.chunks == 1

# Updates to existing objects should be received
await obs.put("A", b"AA")
e = await watcher.updates()
assert e.name == "A"
assert e.bucket == "TEST_FILES"
assert e.size == 2

# Deletes should be received
await obs.delete("B")
e = await watcher.updates()
assert e.name == "B"
assert e.deleted == True

# Meta updates should be received
res = await obs.get("C")
to_update_meta = res.info.meta
to_update_meta.description = "changed"
await obs.update_meta("C", to_update_meta)
e = await watcher.updates()
assert e.name == "C"
assert e.description == "changed"

await nc.close()
Loading