Conversation

Contributor

@sampan-s-nayak sampan-s-nayak commented Aug 20, 2025

Why are these changes needed?

This is the first of two PRs for supporting sending events from the aggregator to GCS. In this PR the existing code is refactored to:

  • run on the main dashboard agent event loop.
  • use a custom lightweight MultiConsumerEventBuffer instead of Python's queue.Queue for buffering and batching RayEvents.
  • move publisher logic to a separate module, with support for adding more publishers in the future.

Related issue number

NA

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@sampan-s-nayak sampan-s-nayak requested a review from a team as a code owner August 20, 2025 11:19
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request significantly refactors the aggregator agent to use a dedicated asyncio event loop for publishing events, which is a great architectural improvement. The introduction of BoundedQueue and a publisher base class with ExternalSvcPublisher and NoopPublisher implementations makes the system more robust, testable, and extensible. My review includes a few suggestions to fix potential runtime errors, improve correctness, and clean up the code.

1
)
dropped_oldest = self._event_buffer.put(event)
if dropped_oldest:
Contributor Author

@sampan-s-nayak sampan-s-nayak Aug 20, 2025

Previously we were calling get_nowait() followed by put_nowait(). This sequence of operations is not thread-safe and could lead to unnecessary loss of newer events.

Also, queue.Queue is better suited to pub-sub use cases, but since we don't really need the blocking APIs I have switched to a simple custom queue built on top of Python's deque.
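A minimal sketch of that idea (names and structure are illustrative, not the PR's actual code), assuming a single threading.Lock guards a deque with maxlen:

```python
import threading
from collections import deque
from typing import Any, Optional


class BoundedEventBuffer:
    """put() appends and, when full, atomically evicts and returns the oldest item."""

    def __init__(self, max_size: int) -> None:
        if max_size <= 0:
            raise ValueError("max_size must be a positive integer")
        self._queue = deque(maxlen=max_size)
        self._lock = threading.Lock()

    def put(self, item: Any) -> Optional[Any]:
        with self._lock:
            dropped = None
            if len(self._queue) == self._queue.maxlen:
                dropped = self._queue[0]  # will be evicted by the append below
            self._queue.append(item)
            return dropped
```

Because the drop-and-append happens under one lock, callers always learn exactly which event was evicted, unlike the earlier get_nowait()/put_nowait() pair.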

Signed-off-by: sampan <[email protected]>
sampan added 2 commits August 20, 2025 11:36
Signed-off-by: sampan <[email protected]>
Signed-off-by: sampan <[email protected]>
@ray-gardener ray-gardener bot added core Issues that should be addressed in Ray Core observability Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling labels Aug 20, 2025
Comment on lines 234 to 235
class ExternalSvcPublisher(RayEventsPublisherBase):
"""Publishes event batches to an external HTTP endpoint after filtering.
Collaborator

Suggested change
class ExternalSvcPublisher(RayEventsPublisherBase):
"""Publishes event batches to an external HTTP endpoint after filtering.
class HTTPPublisher(RayEventsPublisherBase):
"""Publishes event batches to an HTTP endpoint after filtering.

Collaborator

@edoakes edoakes left a comment

Left some detailed comments, let me know if you're unsure about how to structure asyncio code.

Higher-level structural comment that I think will help with cleaning up the tests: I don't think we need the inheritance structure here at all. Rather, you can have just a concrete EventPublisher class that takes an EventClient interface.

The EventClient interface will just have the two methods you currently require subclassing. Then, you can define an HTTPEventClient, gRPCEventClient (for GCS in the future), and (if needed) FakeEventClient.

You can use the FakeEventClient to test the logic of the EventPublisher without requiring any mocking or use of private methods. The HTTPEventClient and gRPCEventClient will be dead simple and can have their own unit tests.

I think with this structure you could also eliminate the "count failed events" part of the interface, because the EventPublisher can track it internally.
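A rough sketch of that proposed shape (all class and method names here are assumptions for illustration, not the PR's code):

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List


class EventClient(ABC):
    """Destination-specific transport; implementations stay small."""

    @abstractmethod
    async def publish(self, batch: Any) -> bool:
        """Send one batch; return True on success."""

    @abstractmethod
    def count_events(self, batch: Any) -> int:
        """Number of events in the batch, used for failure metrics."""


class FakeEventClient(EventClient):
    """Test double: records batches so tests only assert on side effects."""

    def __init__(self) -> None:
        self.published: List[Any] = []

    async def publish(self, batch: Any) -> bool:
        self.published.append(batch)
        return True

    def count_events(self, batch: Any) -> int:
        return len(batch)


class EventPublisher:
    """Concrete publisher: batching, retries, and metrics live here."""

    def __init__(self, client: EventClient, max_retries: int = 3) -> None:
        self._client = client
        self._max_retries = max_retries
        self._num_failed_events = 0  # tracked internally, not via the client

    async def publish_with_retries(self, batch: Any) -> bool:
        for _ in range(self._max_retries + 1):
            if await self._client.publish(batch):
                return True
            await asyncio.sleep(0)  # backoff elided in this sketch
        self._num_failed_events += self._client.count_events(batch)
        return False
```

An HTTPEventClient or gRPCEventClient would then only wrap the transport call, and the EventPublisher tests would run against FakeEventClient.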

Comment on lines 215 to 223
# Subclasses must implement these
@abstractmethod
async def _async_publish(self, item) -> PublishStats:
    """Publishes an item to the destination.

    Returns:
        PublishStats: (success, published_count, filtered_count)
    """
    pass
Collaborator

shouldn't be underscore prefixed if it's part of the class interface

Contributor Author

this is not to be accessed publicly, it is called from within the async worker task

return PublishStats(True, 1, 0)

pub = MockPublisher(side_effect=side_effect, **base_kwargs)
await pub._async_publish_with_retries("test_item")
Collaborator

tests shouldn't test against internal methods. refactor them to use the public interface only

Collaborator

I would recommend testing the actual HTTPPublisher implementation instead: inject a fake HTTP client and make assertions on the side effects called on that HTTP client

Contributor Author

I have separate unit tests for the http publisher as well where I don't call internal methods. We can remove this test.

"""Base class for event publishers with common functions for creating an async worker to publish events with retries and backoff.

Subclasses must implement _async_publish(item) -> PublishStats
and _estimate_item_size(item) -> int.
Collaborator

this doesn't match the actual method name

Contributor Author

Ah nice catch, missed this when renaming the method

Comment on lines 228 to 229
Estimates the size of an item to track number of items that failed to publish.
Used to increment failure metrics when publishing fails or queue is full.
Collaborator

the name is confusing if this is the semantic... num_failed_events?

Collaborator

also, what is the "item" (add a type hint)

Contributor Author

The item can be a list of events (for the HTTP publisher) or a tuple of events and taskEventsMetadata (for the GCS publisher). This is why I don't have type hints in the base class, but I have added them in the child classes.

Contributor Author

I used a generic name because at the end of the day the method just returns the number of events in the batch (item). It would have no idea whether publishing failed or succeeded

Contributor Author

let me rename it to _count_num_events_in_batch and rename item to batch everywhere
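For illustration only, the two batch shapes described in this thread could be annotated along these lines (type names are taken from the discussion above, not from the code):

```python
from typing import List, Tuple

# HTTP publisher batch: just the events.
HttpBatch = List["RayEvent"]

# GCS publisher batch: events plus dropped task attempt metadata.
GcsBatch = Tuple[List["RayEvent"], "TaskEventsMetadata"]
```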

Comment on lines 22 to 44
PUBLISHER_TIMEOUT_SECONDS = ray_constants.env_integer(
    f"{env_var_prefix}_PUBLISHER_TIMEOUT_SECONDS", 5
)
# maximum number of events that can be queued for publishing to the destination
PUBLISHER_QUEUE_MAX_SIZE = ray_constants.env_integer(
    f"{env_var_prefix}_PUBLISH_DEST_QUEUE_MAX_SIZE", 50
)
# maximum number of retries for publishing events to the destination
PUBLISHER_MAX_RETRIES = ray_constants.env_integer(
    f"{env_var_prefix}_PUBLISH_MAX_RETRIES", 5
)
# initial backoff time for publishing events to the destination
PUBLISHER_INITIAL_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_PUBLISH_INITIAL_BACKOFF_SECONDS", 0.01
)
# maximum backoff time for publishing events to the destination
PUBLISHER_MAX_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_PUBLISH_MAX_BACKOFF_SECONDS", 5.0
)
# jitter ratio for publishing events to the destination
PUBLISHER_JITTER_RATIO = ray_constants.env_float(
    f"{env_var_prefix}_PUBLISH_JITTER_RATIO", 0.1
)
Collaborator

nit: I'd make these class variables on the publisher since they're only used by it

Contributor Author

I am using these as default values for the constructor params, so keeping them outside seemed cleaner. Let me know if you still think it is better to define them inside the class.

raise ValueError("max_size must be a positive integer")

self._queue = deque(maxlen=max_size)
self._lock = threading.Lock()
Collaborator

we cannot use threading.Lock on asyncio event loop (it will block the loop)

this class should use asyncio primitives (like event and queue) instead

Contributor Author

The bounded_queue is a shared data structure used by several threads (11 by default), so unfortunately I had to use a threading.Lock.

This data structure is a replacement for the queue.Queue we were using previously (which also internally uses a lock).

)
dropped_oldest = self._event_buffer.put(event)
if dropped_oldest:
with self._lock:
Collaborator

again, cannot use threading.Lock on asyncio event loop

Contributor Author

There are two event loops within the aggregator.

One is for the gRPC server receiving RayEvents from worker event buffers. This server uses a thread pool with 10 threads by default, so when using the bounded_queue (the internal buffer) and when updating metrics, a lock is used to protect the shared resources.

The other event loop is the one I am introducing for the publisher logic. This loop is responsible for reading from the bounded_queue, forming batches, and adding those to the destination-specific asyncio queues (there are async tasks which then consume from these queues and publish to the respective destinations).
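A condensed sketch of the publisher-loop side of that description (buffer and queue names are assumptions; the real code differs):

```python
import asyncio


async def publisher_loop(event_buffer, destination_queues, batch_size):
    """Drain the shared buffer, form batches, and fan them out per destination.

    The gRPC handler threads only write into `event_buffer`; this coroutine runs
    on the dedicated publisher event loop and never blocks it.
    """
    while True:
        batch = await event_buffer.wait_for_batch(batch_size)  # assumed helper
        for queue in destination_queues:  # one asyncio.Queue per destination
            try:
                queue.put_nowait(batch)
            except asyncio.QueueFull:
                queue.get_nowait()   # drop the oldest batch to make room
                queue.put_nowait(batch)
```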

Comment on lines 49 to 56
return {
    "name": "test",
    "queue_max_size": 10,
    "max_retries": 2,
    "initial_backoff": 0.01,
    "max_backoff": 0.1,
    "jitter_ratio": 0.1,
}
Collaborator

we shouldn't rely on any timing conditions in tests; instead, inject fake clocks and make the tests deterministic (and fast!)

Contributor Author

Let me set the backoff to 0 so that we can test retries without the wait

Comment on lines 85 to 89
@pytest.mark.asyncio
async def test_start_and_stop_publishers(self, mock_publisher):
    """Test that start and async shutdown work."""
    mock_publisher.start()
    await mock_publisher.shutdown()  # should not block or error
Collaborator

why do we care about testing these methods on the 'mock' publisher?

Contributor Author

Yeah, not really needed. I had mainly added it for coverage, but I think we can completely remove the mock publisher tests and just rely on the HTTP publisher tests and the GCS publisher tests (in the next PR).

Comment on lines 245 to 250
timeout: float = PUBLISHER_TIMEOUT_SECONDS,
queue_max_size: int = PUBLISHER_QUEUE_MAX_SIZE,
max_retries: int = PUBLISHER_MAX_RETRIES,
initial_backoff: float = PUBLISHER_INITIAL_BACKOFF_SECONDS,
max_backoff: float = PUBLISHER_MAX_BACKOFF_SECONDS,
jitter_ratio: float = PUBLISHER_JITTER_RATIO,
Collaborator

seems like the defaulting for these should live in the base class not the implementation

Contributor Author

I kept them in the implementation because I thought this would make it easier in the future to have separate default values for each implementation.

But I guess for the time being it makes more sense to keep them in the base class itself. I'll make the change.

@can-anyscale can-anyscale self-requested a review August 20, 2025 17:28
@sampan-s-nayak
Contributor Author

Left some detailed comments, let me know if you're unsure about how to structure asyncio code.

Higher-level structural comment that I think will help with cleaning up the tests: I don't think we need the inheritance structure here at all. Rather, you can have just a concrete EventPublisher class that takes an EventClient interface.

The EventClient interface will just have the two methods you currently require subclassing. Then, you can define an HTTPEventClient, gRPCEventClient (for GCS in the future), and (if needed) FakeEventClient.

You can use the FakeEventClient to test the logic of the EventPublisher without requiring any mocking or use of private methods. The HTTPEventClient and gRPCEventClient will be dead simple and can have their own unit tests.

I think with this structure you could also eliminate the "count failed events" part of the interface, because the EventPublisher can track it internally.

This is a good suggestion, I'll refactor my PR accordingly.

I think with this structure you could also eliminate the "count failed events" part of the interface, because the EventPublisher can track it internally.

We may not be able to eliminate this, as the input to the HTTP publisher is just a list of events whereas the input to the GCS publisher is a list of events plus taskEventsMetadata.

sampan and others added 2 commits August 21, 2025 06:44
Signed-off-by: sampan <[email protected]>
@sampan-s-nayak sampan-s-nayak marked this pull request as draft August 26, 2025 07:38
Contributor

@can-anyscale can-anyscale left a comment

Saw you moved this to draft, so just pushing my pending comments.

@@ -0,0 +1,342 @@
from abc import ABC, abstractmethod
Contributor

nit: ray_event (singular) for consistency; ray_event is the branding name of the system and is not intended to refer to events of Ray

from requests import Session
from requests.adapters import HTTPAdapter
from ray._private.protobuf_compat import message_to_json
from ray.dashboard.modules.aggregator.ray_events_publisher import (
Contributor

nit: ray_event_publisher for naming consistency (event without the s)

@@ -84,6 +78,10 @@
EXPOSABLE_EVENT_TYPES = os.environ.get(
f"{env_var_prefix}_EXPOSABLE_EVENT_TYPES", DEFAULT_EXPOSABLE_EVENT_TYPES
)
# flag to enable publishing events to the external HTTP service
PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SVC = ray_constants.env_bool(
Contributor

is this a new flag; how is it controlled today?

Contributor Author

@sampan-s-nayak sampan-s-nayak Aug 28, 2025

Today we always publish to the HTTP service (unless the endpoint is not specified). I am adding an explicit flag to switch the HTTP service publisher on and off.

Contributor Author

I am introducing the new flag to support the following configurations:

  1. only send to the HTTP svc
  2. only send to GCS (GCS support is added in the next PR)
  3. send to both the HTTP svc and GCS

@@ -110,20 +108,32 @@
tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS),
namespace="ray",
)
events_published = Counter(
f"{metrics_prefix}_events_published",
events_published_to_external_svc = Counter(
Contributor

what does svc stand for? (the comment says external server)

Contributor Author

svc is short for service. So here I am tracking the number of events published to the external HTTP service (I shortened it to keep the variable name smaller).

)
self._http_session.mount("http://", HTTPAdapter(max_retries=retries))
self._http_session.mount("https://", HTTPAdapter(max_retries=retries))
# Dedicated publisher event loop thread
Contributor

nit: add to the comment some of the tasks this event loop is expected to do; also notes on keeping CPU-intensive tasks off this loop, etc.

except asyncio.QueueFull:
    # Drop oldest then try once more
    oldest = self._queue.get_nowait()
    drop_count = self._publish_client.count_num_events_in_batch(oldest)
Contributor

nit: count_num_events_in_batch -> get_event_batch_size

Contributor Author

In the case of the GCS publisher, the batch contains both a list of events and a batch of dropped task attempt metadata. This is why I went with a slightly weirder but more generic function name.

"""
if self._publish_worker_task and self._queue is not None:
# Send sentinel (None) value to stop worker
self.enqueue_batch(None)
Contributor

this is pretty weird, maybe add another field in this class or something to indicate shutdown

Contributor Author

From what I read, using sentinel values is a common way to deal with shutdown scenarios in Python, as the queue does not expose a wakeup/notify-all API. But since we are going to move away from using a duplicate queue, this piece of code will be completely updated.
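A minimal, self-contained example of the sentinel pattern under discussion (not the PR's code):

```python
import asyncio

_SHUTDOWN = object()  # sentinel; None also works if None is never a real batch


async def worker(queue: asyncio.Queue) -> None:
    """Consume batches until the sentinel arrives."""
    while True:
        item = await queue.get()
        if item is _SHUTDOWN:
            break
        print("publishing", item)


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    await queue.put("batch-1")
    await queue.put(_SHUTDOWN)  # request shutdown
    await task                  # worker exits once it reaches the sentinel


asyncio.run(main())
```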

await asyncio.sleep(delay)


class NoopPublisher:
Contributor

need to define a shared interface between this and RayEventsPublisher so the type system can protect them from drifting in the future
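One way to get that protection is a typing.Protocol (or a shared ABC); the method names below are assumptions based on this review thread:

```python
from typing import Any, Protocol


class PublisherInterface(Protocol):
    """Shared surface for RayEventsPublisher and NoopPublisher."""

    def start(self) -> None: ...
    async def shutdown(self) -> None: ...
    def enqueue_batch(self, batch: Any) -> None: ...


def run_publisher(publisher: PublisherInterface) -> None:
    # mypy/pyright flag either implementation if its signature drifts.
    publisher.start()
```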

publisher.enqueue_batch("new_batch")

metrics = publisher.get_and_reset_metrics()
print(metrics)
Contributor

remove

# Retry sending with other events in the next iteration.
self._send_events_to_external_service(event_batch)
frozen_batch = tuple(event_batch)
# Enqueue batch to publishers
Contributor

don't really need this comment

events_published.labels(**labels).inc(_events_published)
events_filtered_out.labels(**labels).inc(_events_filtered_out)

def _check_main_thread_liveness(self) -> None:
Contributor Author

I have removed the cleanup and liveness checks as these are not really needed and also seem to be implemented incorrectly.

The base class already has a sigterm_handler:

def sigterm_handler():
    logger.warning("Exiting with SIGTERM immediately...")
    # Exit code 0 will be considered as an expected shutdown
    os._exit(signal.SIGTERM)

if sys.platform != "win32":
    # TODO(rickyyx): we currently do not have any logic for actual
    # graceful termination in the agent. Most of the underlying
    # async tasks run by the agent head doesn't handle CancelledError.
    # So a truly graceful shutdown is not trivial w/o much refactoring.
    # Re-open the issue: https://github.com/ray-project/ray/issues/25518
    # if a truly graceful shutdown is required.
    loop.add_signal_handler(signal.SIGTERM, sigterm_handler)

also

# TODO(rickyyx): we currently do not have any logic for actual
# graceful termination in the agent. Most of the underlying
# async tasks run by the agent head doesn't handle CancelledError.
# So a truly graceful shutdown is not trivial w/o much refactoring.
# Re-open the issue: https://github.com/ray-project/ray/issues/25518
# if a truly graceful shutdown is required.
The comment talks about how graceful shutdown is not really supported currently; if needed, we should revisit this holistically instead of adding custom implementations.

Signed-off-by: sampan <[email protected]>
@sampan-s-nayak sampan-s-nayak marked this pull request as ready for review September 1, 2025 04:04
@sampan-s-nayak sampan-s-nayak added the go add ONLY when ready to merge, run all tests label Sep 1, 2025
Signed-off-by: sampan <[email protected]>
Contributor

@can-anyscale can-anyscale left a comment

mostly nit and thoughts on metric collection; @MengjinYan to review as well

# Wait inside a loop to deal with spurious wakeups.
while True:
    # Wait outside the lock to avoid deadlocks
    await has_events_to_consume.wait()
Contributor

is there a race between the wait and the _lock, where has_events_to_consume could have been set to false in between?

Contributor Author

@sampan-s-nayak sampan-s-nayak Sep 3, 2025

deadlock scenario: if we wait after acquiring the lock then we can enter a deadlock, as add_event() also tries to acquire the lock.

spurious wakeup: once has_events_to_consume is set, with the current code we are guaranteed to always have events in the buffer (so spurious wakeups are not possible), but I added this check to prevent any issues in the future (if we decide to have many consumer workers instead of the single one we have today). This may not really be needed, but it's a small addition to protect our code, so I feel it might be OK to keep it.

Contributor

got it
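A stripped-down sketch of the pattern this thread describes: wait on the event outside the lock, then re-check state under the lock (all names are illustrative):

```python
import asyncio
from collections import deque


class Buffer:
    def __init__(self) -> None:
        self._items: deque = deque()
        self._lock = asyncio.Lock()
        self._has_items = asyncio.Event()

    async def add(self, item) -> None:
        async with self._lock:
            self._items.append(item)
            self._has_items.set()

    async def take_all(self) -> list:
        while True:
            # Waiting while holding the lock would deadlock add(); wait first.
            await self._has_items.wait()
            async with self._lock:
                if not self._items:  # spurious wakeup or another consumer won
                    continue
                batch = list(self._items)
                self._items.clear()
                self._has_items.clear()
                return batch
```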

with pytest.raises(
    RuntimeError, match="The condition wasn't met before the timeout expired."
):
    wait_for_condition(lambda: len(httpserver.log) > 0, 1)
Contributor

add comments to explain why waiting for 1 second is sufficient?

Contributor Author

I just didn't want the test to wait for too long. I'll increase it to 3 to ensure that we are giving ample time to validate that events are not being sent to the HTTP service.

return event


class TestMultiConsumerEventBuffer:
Contributor

do we need tests for concurrently running add_event and wait_for_batch?

Contributor Author

We have test_wait_for_batch_blocks_until_event_available where both are called. Let me add another test where events are continuously published and batches are continuously read.

Signed-off-by: sampan <[email protected]>
async with self._lock:
    return len(self._buffer)

async def get_and_reset_evicted_events_count(
Contributor

we aggregate metrics for some time before flushing it periodically

ray.util.metric abstracts the aggregation and flushing logic away from you, so you don't really have to worry about that. Unless you're doing Counter(...).inc(1) every ms, that function is almost the same as incrementing an in-memory counter, so there is no IO overhead.

every component (ray_event_producer, multi_consumer_event_buffer, aggregator_agent) would define their own update metrics task which we would need to await within AggregatorAgent.run()

Each individual component being responsible for logging its own metrics sounds good to me; it reduces the amount of dependencies between components. What do you mean by an update-metrics task, though?

consumer_state.has_new_events_to_consume.set()

async def wait_for_batch(
    self, consumer_id: str, timeout_seconds: float = 1.0
Contributor

How long is PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS? Unless that value is in milliseconds, it might already serve as the effective sleep interval in the while loop, meaning you don't need an extra asyncio.sleep(x) between iterations.

In the Phase 2 implementation of wait_for_batch, the logic is nearly the same as Phase 1, except that it uses await asyncio.wait_for(has_events_to_consume.wait(), remaining) instead of await has_events_to_consume.wait(). The difference is essentially waiting forever (Phase 1) versus waiting with a timeout of a few seconds (Phase 2), at the cost of looping back into the while loop, which is negligible (compared to the already seconds-long waits in Phase 2) and lets other async tasks run.

Your implementation is certainly more efficient in theory, but I wonder if we can keep it simpler while achieving similar efficiency.
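For comparison, a minimal sketch of the timeout-based variant described above, where the timeout doubles as the flush interval (names are assumptions):

```python
import asyncio


async def wait_for_batch(has_events: asyncio.Event, drain, interval_seconds: float):
    """Return whatever is buffered after at most `interval_seconds`.

    `drain` is assumed to be a coroutine that atomically empties the buffer.
    """
    try:
        # Wake up early when events arrive, otherwise flush on the interval.
        await asyncio.wait_for(has_events.wait(), timeout=interval_seconds)
    except asyncio.TimeoutError:
        pass  # timed out: fall through and return whatever is buffered
    return await drain()
```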


async with self._lock:
    consumer_state = self._consumers.get(consumer_id)
    if consumer_state is None:
        raise KeyError(f"unknown consumer '{consumer_id}'")
Contributor

Oh, currently this API requires you to register first before calling wait_for_batch; otherwise you'll hit this error. I was wondering if we can eliminate async def register_consumer(self) -> str: and create the consumers during MultiConsumerEventBuffer construction instead.

return PublishStats(True, 0, 0)
filtered = [e for e in events_batch if self._events_filter_fn(e)]
num_filtered_out = len(events_batch) - len(filtered)
if not filtered:
Contributor

maybe move the filtering inside the consumer info of multi_consumer_event_buffer so we can fetch a more meaningful batch here; maybe a TODO, not in this PR

await self._session.close()
self._session = None

def set_session(self, session) -> None:
Contributor

got it

try:
    logger.info(f"Starting publisher {self._name}")
    while True:
        batch = await self._event_buffer.wait_for_batch(
Contributor

As mentioned in the other comments, it feels like wait_for_batch already yields to other tasks for seconds (PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS), so it eliminates the busy waiting of the while loop.

time_since_last_success_seconds: Time elapsed since last successful publish, or None if never succeeded
dropped_events: Dict mapping event types to counts of events dropped from buffer
"""
async with self._metrics_lock:
Contributor

as mentioned, Counter is just an in-memory thing; it performs io flushing every few seconds in the background
