[core] Refactor aggregator agent to use async publisher event loop #55780
Conversation
Code Review
This pull request significantly refactors the aggregator agent to use a dedicated asyncio event loop for publishing events, which is a great architectural improvement. The introduction of BoundedQueue and a publisher base class with ExternalSvcPublisher and NoopPublisher implementations makes the system more robust, testable, and extensible. My review includes a few suggestions to fix potential runtime errors, improve correctness, and clean up the code.
)
dropped_oldest = self._event_buffer.put(event)
if dropped_oldest:
previously we were calling get_nowait() followed by put_nowait(). this sequence of operations is not thread safe and could lead to unnecessary loss of newer events.
also, queue.Queue is more suited towards pub-sub use cases, but as we don't really need the blocking APIs I have switched to using a simple custom queue built on top of a Python deque.
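For illustration, a minimal sketch of such a drop-oldest bounded queue built on a deque with a single lock (the class and method names here are assumptions based on this discussion, not necessarily the PR's exact code):

import threading
from collections import deque
from typing import Any, Optional


class BoundedQueue:
    """Thread-safe FIFO buffer that evicts the oldest item when full."""

    def __init__(self, max_size: int) -> None:
        if max_size <= 0:
            raise ValueError("max_size must be a positive integer")
        self._queue = deque(maxlen=max_size)
        self._lock = threading.Lock()

    def put(self, item: Any) -> Optional[Any]:
        """Append an item; return the evicted oldest item if the queue was full."""
        with self._lock:
            dropped = self._queue[0] if len(self._queue) == self._queue.maxlen else None
            self._queue.append(item)  # deque with maxlen evicts the oldest automatically
            return dropped

    def get(self) -> Optional[Any]:
        """Pop and return the oldest item, or None if the queue is empty."""
        with self._lock:
            return self._queue.popleft() if self._queue else None

Because the eviction and the append happen under one lock, a newer event can no longer be lost to the get_nowait()/put_nowait() race described above.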
class ExternalSvcPublisher(RayEventsPublisherBase):
    """Publishes event batches to an external HTTP endpoint after filtering.
Suggested change:
-class ExternalSvcPublisher(RayEventsPublisherBase):
-    """Publishes event batches to an external HTTP endpoint after filtering.
+class HTTPPublisher(RayEventsPublisherBase):
+    """Publishes event batches to an HTTP endpoint after filtering.
Left some detailed comments, let me know if you're unsure about how to structure asyncio code.

Higher-level structural comment that I think will help with cleaning up the tests: I don't think we need the inheritance structure here at all. Rather, you can have just a concrete EventPublisher class that takes an EventClient interface.

The EventClient interface will just have the two methods you currently require subclassing. Then, you can define an HTTPEventClient, gRPCEventClient (for GCS in the future), and (if needed) FakeEventClient.

You can use the FakeEventClient to test the logic of the EventPublisher without requiring any mocking or use of private methods. The HTTPEventClient and gRPCEventClient will be dead simple and can have their own unit tests.

I think with this structure you could also eliminate the "count failed events" part of the interface, because the EventPublisher can track it internally.
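To make the proposal concrete, a rough sketch of the composition-based structure described above (PublishStats fields follow the docstring quoted below; the exact method names on EventClient are assumptions):

from abc import ABC, abstractmethod
from typing import Any, List, NamedTuple


class PublishStats(NamedTuple):
    success: bool
    published_count: int
    filtered_count: int


class EventClient(ABC):
    """Destination-specific client injected into a concrete EventPublisher."""

    @abstractmethod
    async def publish(self, batch: Any) -> PublishStats:
        """Send one batch to the destination and report what happened."""

    @abstractmethod
    def count_num_events_in_batch(self, batch: Any) -> int:
        """Return the number of events contained in a batch."""


class FakeEventClient(EventClient):
    """In-memory client for testing EventPublisher logic without mocks."""

    def __init__(self) -> None:
        self.published_batches: List[Any] = []

    async def publish(self, batch: Any) -> PublishStats:
        self.published_batches.append(batch)
        return PublishStats(True, self.count_num_events_in_batch(batch), 0)

    def count_num_events_in_batch(self, batch: Any) -> int:
        return len(batch)

An HTTPEventClient or gRPCEventClient would implement the same two methods against a real session or stub, and EventPublisher would only ever see the EventClient interface.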
# Subclasses must implement these
@abstractmethod
async def _async_publish(self, item) -> PublishStats:
    """Publishes an item to the destination.

    Returns:
        PublishStats: (success, published_count, filtered_count)
    """
    pass
shouldn't be underscore prefixed if it's part of the class interface
this is not to be accessed publicly, it is called from within the async worker task
    return PublishStats(True, 1, 0)

pub = MockPublisher(side_effect=side_effect, **base_kwargs)
await pub._async_publish_with_retries("test_item")
tests shouldn't test against internal methods. refactor them to use the public interface only
I would recommend testing the actual HTTPPublisher implementation instead: inject a fake HTTP client and make assertions on the side effects called on that HTTP client.
I have separate unit tests for the http publisher as well where I don't call internal methods. We can remove this test.
"""Base class for event publishers with common functions for creating an async worker to publish events with retries and backoff. | ||
|
||
Subclasses must implement _async_publish(item) -> PublishStats | ||
and _estimate_item_size(item) -> int. |
this doesn't match the actual method name
Ah nice catch, missed this when renaming the method
Estimates the size of an item to track number of items that failed to publish.
Used to increment failure metrics when publishing fails or queue is full.
the name is confusing if this is the semantic... num_failed_events?
also, what is the "item" (add a type hint)
Item can be a list of events (for the http publisher) or a tuple of events and taskEventsMetadata (for the gcs publisher). This is why I don't have type hints in the base class, but I have added them in the child classes.
I used a generic name because at the end of the day the method just returns the number of events in the batch (item). It would have no idea whether publishing failed or succeeded
let me rename it to _count_num_events_in_batch and rename item to batch everywhere
PUBLISHER_TIMEOUT_SECONDS = ray_constants.env_integer(
    f"{env_var_prefix}_PUBLISHER_TIMEOUT_SECONDS", 5
)
# maximum number of events that can be queued for publishing to the destination
PUBLISHER_QUEUE_MAX_SIZE = ray_constants.env_integer(
    f"{env_var_prefix}_PUBLISH_DEST_QUEUE_MAX_SIZE", 50
)
# maximum number of retries for publishing events to the destination
PUBLISHER_MAX_RETRIES = ray_constants.env_integer(
    f"{env_var_prefix}_PUBLISH_MAX_RETRIES", 5
)
# initial backoff time for publishing events to the destination
PUBLISHER_INITIAL_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_PUBLISH_INITIAL_BACKOFF_SECONDS", 0.01
)
# maximum backoff time for publishing events to the destination
PUBLISHER_MAX_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_PUBLISH_MAX_BACKOFF_SECONDS", 5.0
)
# jitter ratio for publishing events to the destination
PUBLISHER_JITTER_RATIO = ray_constants.env_float(
    f"{env_var_prefix}_PUBLISH_JITTER_RATIO", 0.1
)
nit: I'd make these class variables on the publisher since they're only used by it
I am using these as default values to the constructor params, so keeping them outside seemed cleaner. let me know if you still think it is better to define them inside the class
        raise ValueError("max_size must be a positive integer")

    self._queue = deque(maxlen=max_size)
    self._lock = threading.Lock()
we cannot use threading.Lock on an asyncio event loop (it will block the loop); this class should use asyncio primitives (like event and queue) instead
The bounded_queue is a shared data structure used by several threads (11 by default), so unfortunately I had to use a threading.Lock.
This data structure is a replacement for the queue.Queue we were using previously (which also internally uses a lock).
)
dropped_oldest = self._event_buffer.put(event)
if dropped_oldest:
    with self._lock:
again, cannot use threading.Lock on an asyncio event loop
There are two event loops within the aggregator.

One is for the gcs server receiving RayEvents from worker event buffers. This server uses a thread pool with 10 threads by default, so when using the bounded_queue (the internal buffer) and when updating metrics, a lock is used to protect shared resources.

The other event loop is the one I am introducing for the publisher logic. This loop is responsible for reading from the bounded_queue, forming batches, and adding those to the destination-specific asyncio queues (there are async tasks which then consume from these queues and publish to the respective destinations).
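For context, a simplified sketch of that fan-out on the publisher loop, assuming the bounded buffer sketched earlier and per-destination asyncio queues (names and batch handling here are illustrative, not the PR's exact code):

import asyncio
from typing import List


async def batch_and_fan_out(
    event_buffer,                        # thread-safe bounded buffer filled by the server's handler threads
    destination_queues: List[asyncio.Queue],
    batch_size: int = 100,
    poll_interval_s: float = 0.1,
) -> None:
    """Drain the shared buffer, form batches, and push them to every destination queue."""
    while True:
        batch = []
        while len(batch) < batch_size:
            event = event_buffer.get()  # non-blocking; returns None when empty
            if event is None:
                break
            batch.append(event)
        if not batch:
            await asyncio.sleep(poll_interval_s)  # nothing buffered; yield to other tasks
            continue
        frozen = tuple(batch)
        for queue in destination_queues:
            try:
                queue.put_nowait(frozen)
            except asyncio.QueueFull:
                queue.get_nowait()       # drop the oldest batch for that destination
                queue.put_nowait(frozen)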
return {
    "name": "test",
    "queue_max_size": 10,
    "max_retries": 2,
    "initial_backoff": 0.01,
    "max_backoff": 0.1,
    "jitter_ratio": 0.1,
}
we shouldn't rely on any timing conditions in tests; instead, inject fake clocks and make the tests deterministic (and fast!)
Let me set the backoff to 0 so that we can test retries without the wait
@pytest.mark.asyncio
async def test_start_and_stop_publishers(self, mock_publisher):
    """Test that start and async shutdown work."""
    mock_publisher.start()
    await mock_publisher.shutdown()  # should not block or error
why do we care about testing these methods on the 'mock' publisher?
Yeah, not really needed. I had mainly added it for coverage, but I think we can completely remove the mock publisher tests and just rely on the http publisher and gcs publisher (in the next pr) tests.
timeout: float = PUBLISHER_TIMEOUT_SECONDS,
queue_max_size: int = PUBLISHER_QUEUE_MAX_SIZE,
max_retries: int = PUBLISHER_MAX_RETRIES,
initial_backoff: float = PUBLISHER_INITIAL_BACKOFF_SECONDS,
max_backoff: float = PUBLISHER_MAX_BACKOFF_SECONDS,
jitter_ratio: float = PUBLISHER_JITTER_RATIO,
seems like the defaulting for these should live in the base class not the implementation
I kept them in the implementation because I thought this would make it easier in the future to have separate default values for each implementation, but I guess for the time being it makes more sense to keep them in the base class itself. I'll make the change.

This is a good suggestion, I'll refactor my pr accordingly. We may not be able to eliminate this entirely, as the input to the http publisher is just a list of events whereas the input to the gcs publisher is a list of events and taskEventsMetadata.
saw you moved to draft, so just pushing my pending comments
from abc import ABC, abstractmethod
nit: ray_event (singular) for consistency; ray_event is the branding name of the system and is not intended to refer to events of Ray
from requests import Session
from requests.adapters import HTTPAdapter
from ray._private.protobuf_compat import message_to_json
from ray.dashboard.modules.aggregator.ray_events_publisher import (
nit: ray_event_publisher for naming consistency (event without the s)
EXPOSABLE_EVENT_TYPES = os.environ.get(
    f"{env_var_prefix}_EXPOSABLE_EVENT_TYPES", DEFAULT_EXPOSABLE_EVENT_TYPES
)
# flag to enable publishing events to the external HTTP service
PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SVC = ray_constants.env_bool(
is this a new flag; how is it controlled today?
today we always publish to the http service (unless the endpoint is not specified). I am adding an explicit flag to switch the http service publisher on and off.
I am introducing the new flag to support the following configurations:
- only send to the http svc
- only send to GCS (gcs support is added in the next pr)
- send to both the http svc and GCS
    tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS),
    namespace="ray",
)
-events_published = Counter(
-    f"{metrics_prefix}_events_published",
+events_published_to_external_svc = Counter(
what does svc stand for? (the comment says external server)
svc is short for service. so here I am tracking the number of events published to the external http service (shortened it to keep the variable name smaller)
)
self._http_session.mount("http://", HTTPAdapter(max_retries=retries))
self._http_session.mount("https://", HTTPAdapter(max_retries=retries))
# Dedicated publisher event loop thread
nit: add to the comments what some of the tasks this event loop is expected to do; also notes on the separation of cpu-intensive tasks from this loop etc.
except asyncio.QueueFull:
    # Drop oldest then try once more
    oldest = self._queue.get_nowait()
    drop_count = self._publish_client.count_num_events_in_batch(oldest)
nit: count_num_events_in_batch -> get_event_batch_size
in the case of the GCS publisher, the batch contains both a list of events and a batch of dropped task attempt metadata. this is why I went with a slightly weirder but more generic function name
""" | ||
if self._publish_worker_task and self._queue is not None: | ||
# Send sentinel (None) value to stop worker | ||
self.enqueue_batch(None) |
this is pretty weird, maybe add another field in this class or something to indicate shutdown
From what I read, using sentinel values is a common way to deal with shutdown scenarios in Python, as the queue does not expose a wakeup/notify-all API. But as we are going to move away from using a duplicate queue, this piece of code will be completely updated.
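For reference, a minimal sketch of the sentinel pattern being discussed; publish_batch and the surrounding names are hypothetical:

import asyncio

_SHUTDOWN = object()  # dedicated sentinel; unambiguous even if None were a valid batch


async def publish_worker(queue: asyncio.Queue) -> None:
    """Consume batches until the shutdown sentinel arrives."""
    while True:
        batch = await queue.get()
        if batch is _SHUTDOWN:
            break
        await publish_batch(batch)  # hypothetical destination-specific publish call


async def shutdown(queue: asyncio.Queue, worker_task: asyncio.Task) -> None:
    await queue.put(_SHUTDOWN)  # wakes the worker and tells it to exit
    await worker_task

Using a dedicated sentinel object rather than None keeps shutdown explicit without adding a separate flag field.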
        await asyncio.sleep(delay)


class NoopPublisher:
need to define a shared interface between this and RayEventsPublisher so the type system can protect them from drifting in the future
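One way to express that shared contract is a typing.Protocol; the method set below is inferred from the calls that appear elsewhere in this PR (start, enqueue_batch, get_and_reset_metrics, shutdown) and is a sketch rather than the actual interface:

from typing import Any, Dict, Protocol


class RayEventPublisherInterface(Protocol):
    """Structural interface shared by the real publisher and NoopPublisher."""

    def start(self) -> None: ...

    def enqueue_batch(self, batch: Any) -> None: ...

    def get_and_reset_metrics(self) -> Dict[str, Any]: ...

    async def shutdown(self) -> None: ...


class NoopPublisher:
    """Accepts batches and discards them; used when a destination is disabled."""

    def start(self) -> None:
        pass

    def enqueue_batch(self, batch: Any) -> None:
        pass

    def get_and_reset_metrics(self) -> Dict[str, Any]:
        return {}

    async def shutdown(self) -> None:
        pass

With this in place, a type checker can flag the two classes drifting apart, e.g. by annotating variables as RayEventPublisherInterface.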
publisher.enqueue_batch("new_batch")

metrics = publisher.get_and_reset_metrics()
print(metrics)
remove
# Retry sending with other events in the next iteration.
self._send_events_to_external_service(event_batch)
frozen_batch = tuple(event_batch)
# Enqueue batch to publishers
don't really need this comment
events_published.labels(**labels).inc(_events_published)
events_filtered_out.labels(**labels).inc(_events_filtered_out)


def _check_main_thread_liveness(self) -> None:
I have removed the cleanup and liveness checks as these are not really needed and also seem to be implemented incorrectly.

The base class already has a sigterm handler (ray/python/ray/dashboard/agent.py, lines 442 to 454 in 4efc09a):
def sigterm_handler():
    logger.warning("Exiting with SIGTERM immediately...")
    # Exit code 0 will be considered as an expected shutdown
    os._exit(signal.SIGTERM)

if sys.platform != "win32":
    # TODO(rickyyx): we currently do not have any logic for actual
    # graceful termination in the agent. Most of the underlying
    # async tasks run by the agent head doesn't handle CancelledError.
    # So a truly graceful shutdown is not trivial w/o much refactoring.
    # Re-open the issue: https://github.com/ray-project/ray/issues/25518
    # if a truly graceful shutdown is required.
    loop.add_signal_handler(signal.SIGTERM, sigterm_handler)
Also see ray/python/ray/dashboard/agent.py, lines 448 to 453 in 4efc09a (the TODO quoted above about the lack of a truly graceful shutdown path in the agent).
mostly nit and thoughts on metric collection; @MengjinYan to review as well
# Wait inside a loop to deal with spurious wakeups.
while True:
    # Wait outside the lock to avoid deadlocks
    await has_events_to_consume.wait()
is there a race between the wait and the _lock, where has_events_to_consume has been set to false in between?
Deadlock scenario: if we wait after acquiring the lock then we can enter a deadlock, as add_event() also tries to acquire the lock.

Spurious wakeup: once has_events_to_consume is set, with the current code we are guaranteed to always have events in the buffer (so spurious wakeups are not possible), but I added this check to prevent any issues in the future (if we decide to have many consumer workers instead of the single one we have today). This may not really be needed, but it's a small addition to protect our code, so I feel it is ok to keep it.
got it
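A condensed sketch of the pattern discussed in this thread, assuming an asyncio.Lock guarding the buffer and an asyncio.Event per consumer (illustrative only, not the PR's exact code):

import asyncio
from collections import deque
from typing import Any, List


async def wait_for_batch_sketch(
    buffer: deque,
    lock: asyncio.Lock,
    has_events_to_consume: asyncio.Event,
    max_batch_size: int,
) -> List[Any]:
    # Loop to tolerate spurious wakeups (e.g. if multiple consumers are added later).
    while True:
        # Wait outside the lock: add_event() must acquire the lock to set the
        # event, so waiting while holding it would deadlock.
        await has_events_to_consume.wait()
        async with lock:
            if buffer:
                batch = [buffer.popleft() for _ in range(min(max_batch_size, len(buffer)))]
                if not buffer:
                    has_events_to_consume.clear()  # nothing left to consume
                return batch
            # Another consumer drained the buffer between wait() and acquiring
            # the lock; clear the event and wait again.
            has_events_to_consume.clear()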
with pytest.raises(
    RuntimeError, match="The condition wasn't met before the timeout expired."
):
    wait_for_condition(lambda: len(httpserver.log) > 0, 1)
add comments to explain why waiting for 1 second is sufficient?
I just didn't want the test to wait for too long. I'll increase it to 3 to ensure that we are giving ample time to validate that events are not being sent to the http service.
    return event


class TestMultiConsumerEventBuffer:
do we need tests for concurrently running add_event and wait_for_batch?
we have test_wait_for_batch_blocks_until_event_available where both are called. let me add another test where events are being continuously published and batches are being continuously read
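Such a test could look roughly like the sketch below; the MultiConsumerEventBuffer constructor arguments and the make_event helper are assumptions, while register_consumer/add_event/wait_for_batch follow the signatures discussed in this PR:

import asyncio

import pytest


@pytest.mark.asyncio
async def test_concurrent_add_and_consume():
    buffer = MultiConsumerEventBuffer(max_size=100, max_batch_size=10)  # args assumed
    consumer_id = await buffer.register_consumer()
    total_events = 50

    async def producer():
        for i in range(total_events):
            await buffer.add_event(make_event(i))  # make_event is a hypothetical factory
            await asyncio.sleep(0)  # yield so the consumer can interleave

    async def consumer():
        received = []
        while len(received) < total_events:
            batch = await buffer.wait_for_batch(consumer_id, timeout_seconds=1.0)
            received.extend(batch)
        return received

    _, received = await asyncio.gather(producer(), consumer())
    assert len(received) == total_events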
async with self._lock:
    return len(self._buffer)


async def get_and_reset_evicted_events_count(
we aggregate metrics for some time before flushing them periodically

ray.util.metrics abstracts the aggregation and flushing logic away from you, so you don't really have to worry about that unless you're doing Counter(...).inc(1) every ms; otherwise that call is almost the same as incrementing an in-memory counter, so there is no IO overhead.

every component (ray_event_producer, multi_consumer_event_buffer, aggregator_agent) would define its own update-metrics task, which we would need to await within AggregatorAgent.run()

Each individual component being responsible for logging its own metrics sounds good to me; it reduces the amount of dependencies between components. What do you mean by update metrics task though?
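For context, a minimal example of the ray.util.metrics Counter being discussed; the metric name and tag keys below are illustrative:

from ray.util.metrics import Counter

# .inc() only bumps an in-process value; the Ray metrics agent exports the
# aggregated counter periodically in the background, so there is no per-call IO.
events_published = Counter(
    "aggregator_events_published",
    description="Number of events published to the external service.",
    tag_keys=("destination",),
)

events_published.inc(42, tags={"destination": "http_svc"})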
consumer_state.has_new_events_to_consume.set()


async def wait_for_batch(
    self, consumer_id: str, timeout_seconds: float = 1.0
How long is PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS? Unless that value is in milliseconds, it might already serve as the effective sleep interval in the while loop, meaning you don’t need an extra asyncio.sleep(x) between iterations?
In the Phase 2 implementation of wait_for_batch, the logic is nearly the same as Phase 1, except that it uses await asyncio.wait_for(has_events_to_consume.wait(), remaining) instead of await has_events_to_consume.wait(). The difference is essentially waiting forever (Phase 1) versus waiting with a timeout (every few seconds) (Phase 2), at the cost of looping back into the while loop—which is negligible (compared to the already seconds-long waits in Phase 2) that let other async tasks run?
Your implementation is certainly more efficient in theory, but I wonder if we can keep it simpler while achieving similar efficiency.
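For reference, the timeout-based variant described above essentially reduces to the following (a sketch, not the PR's code):

import asyncio


async def wait_with_timeout(has_events_to_consume: asyncio.Event, remaining: float) -> bool:
    """Wait up to `remaining` seconds instead of forever.

    Returns True if the event was set, False if the timeout elapsed so the
    caller can flush whatever partial batch it has accumulated and loop again.
    """
    try:
        await asyncio.wait_for(has_events_to_consume.wait(), remaining)
        return True
    except asyncio.TimeoutError:
        return False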
async with self._lock:
    consumer_state = self._consumers.get(consumer_id)
    if consumer_state is None:
        raise KeyError(f"unknown consumer '{consumer_id}'")
oh, currently this API requires you to register first before calling wait_for_batch; otherwise you'll hit this fatal. I was wondering if we can eliminate async def register_consumer(self) -> str: and create the consumers during MultiConsumerEventBuffer object construction instead.
    return PublishStats(True, 0, 0)
filtered = [e for e in events_batch if self._events_filter_fn(e)]
num_filtered_out = len(events_batch) - len(filtered)
if not filtered:
maybe move the filtering inside the consumer info of multi_consumer_event_buffer so we can fetch a more meaningful batch here; maybe a TODO, not in this PR
await self._session.close()
self._session = None


def set_session(self, session) -> None:
got it
try:
    logger.info(f"Starting publisher {self._name}")
    while True:
        batch = await self._event_buffer.wait_for_batch(
as mentioned in the other comments, it feels like wait_for_batch already yields to other tasks for seconds (PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS), so it eliminates the busy waiting of the while loop
        time_since_last_success_seconds: Time elapsed since last successful publish, or None if never succeeded
        dropped_events: Dict mapping event types to counts of events dropped from buffer
    """
    async with self._metrics_lock:
as mentioned, Counter is just an in-memory thing; it performs IO flushing every few seconds in the background
Why are these changes needed?
This is the first of two PRs for supporting sending events from the aggregator to GCS. In this PR the existing code is refactored to:
- use a MultiConsumerEventBuffer instead of Python's queue.Queue for buffering and batching RayEvents
Related issue number
N/A