[core] Refactor aggregator agent to use async publisher event loop #55780
@@ -0,0 +1,166 @@
import asyncio
import time
import uuid
from collections import deque
from dataclasses import dataclass
from typing import Dict, List

from ray.core.generated import (
    events_base_event_pb2,
)
from ray.core.generated.events_base_event_pb2 import RayEvent

@dataclass
class _ConsumerState:
    # Index of the next event to be consumed by this consumer.
    cursor_index: int
    # Map of event type to the number of events evicted for this consumer
    # since the last metric update.
    evicted_events_count: Dict[str, int]
    # Event used to signal that there are new events to consume.
    has_new_events_to_consume: asyncio.Event

class MultiConsumerEventBuffer:
    """A buffer which allows adding one event at a time and consuming events in batches.

    Supports multiple consumers, each with their own cursor index, and tracks
    the number of events evicted for each consumer.

    The buffer is not thread-safe but is asyncio-friendly: all operations must
    be called from the same event loop.
    """

    def __init__(self, max_size: int, max_batch_size: int):
        self._buffer = deque(maxlen=max_size)
        self._max_size = max_size
        self._lock = asyncio.Lock()
        self._consumers: Dict[str, _ConsumerState] = {}

        self._max_batch_size = max_batch_size
    async def add_event(self, event: events_base_event_pb2.RayEvent):
        """Add an event to the buffer.

        If the buffer is full, the oldest event is dropped.
        """
        async with self._lock:
            dropped_event = None
            if len(self._buffer) >= self._max_size:
                dropped_event = self._buffer.popleft()
            self._buffer.append(event)

            for _, consumer_state in self._consumers.items():
                # Update the consumer cursor index and evicted-events count if
                # an event was dropped.
                if dropped_event is not None:
                    if consumer_state.cursor_index == 0:
                        # The dropped event was the next event this consumer
                        # would have consumed.
                        event_type_name = RayEvent.EventType.Name(
                            dropped_event.event_type
                        )
                        if event_type_name not in consumer_state.evicted_events_count:
                            consumer_state.evicted_events_count[event_type_name] = 0
                        consumer_state.evicted_events_count[event_type_name] += 1
                    else:
                        # The dropped event was before the consumer's current
                        # position, so adjust the cursor.
                        consumer_state.cursor_index -= 1
                # Signal that there are new events to consume.
                consumer_state.has_new_events_to_consume.set()
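
To see the eviction bookkeeping in isolation, here is a minimal self-contained sketch of the same rule, using plain integers in place of RayEvent protos (not part of the PR, purely illustrative):

    from collections import deque

    buffer = deque(maxlen=3)
    cursor_index = 0          # consumer has read nothing yet
    evicted = 0

    for i in range(5):        # add five events to a buffer that holds three
        if len(buffer) == buffer.maxlen:
            buffer.popleft()  # oldest event is dropped
            if cursor_index == 0:
                evicted += 1  # consumer lost an unread event
            else:
                cursor_index -= 1  # shift the cursor left to stay aligned
        buffer.append(i)

    assert list(buffer) == [2, 3, 4]
    assert evicted == 2       # events 0 and 1 were dropped before being read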

    async def wait_for_batch(
        self, consumer_id: str, timeout_seconds: float = 1.0
    ) -> List[events_base_event_pb2.RayEvent]:
        """Wait for a batch, respecting self._max_batch_size and timeout_seconds.

        Returns a batch of up to self._max_batch_size items. Waits for up to
        timeout_seconds after receiving the first event that will be in the
        next batch. After the timeout, returns as many items as are ready.

        Always returns a batch with at least one item - will block
        indefinitely until an item comes in.

        Arguments:
            consumer_id: id of the consumer consuming the batch
            timeout_seconds: maximum time to wait for a batch
        """
        max_batch = self._max_batch_size
        consumer_state = None
        has_events_to_consume = None
        async with self._lock:
            consumer_state = self._consumers.get(consumer_id)
            if consumer_state is None:
                raise KeyError(f"unknown consumer '{consumer_id}'")
            has_events_to_consume = consumer_state.has_new_events_to_consume

        # Phase 1: read the first event. We wait indefinitely until there is
        # at least one event to consume, looping to deal with spurious wakeups.
        while True:
            # We wait outside the lock to avoid deadlocks.
            await has_events_to_consume.wait()
            async with self._lock:
                if consumer_state.cursor_index < len(self._buffer):
                    # Add the first event to the batch.
                    event = self._buffer[consumer_state.cursor_index]
                    consumer_state.cursor_index += 1
                    batch = [event]
                    break
                # There are no new events to consume; clear the event and
                # wait for it to be set again.
                has_events_to_consume.clear()

        # Phase 2: add items to the batch up to the timeout or until full.
        deadline = time.monotonic() + max(0.0, float(timeout_seconds))
        while len(batch) < max_batch:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break

            async with self._lock:
                # Drain whatever is available.
                while len(batch) < max_batch and consumer_state.cursor_index < len(
                    self._buffer
                ):
                    batch.append(self._buffer[consumer_state.cursor_index])
                    consumer_state.cursor_index += 1

                if len(batch) >= max_batch:
                    break

                # There is still room in the batch but no new events to
                # consume; clear the event and wait for it to be set again.
                has_events_to_consume.clear()
            try:
                await asyncio.wait_for(has_events_to_consume.wait(), remaining)
            except asyncio.TimeoutError:
                # Timed out; return the batch as is.
                break

        return batch

Review thread (wait_for_batch timeout semantics):

Reviewer: This API is a bit unusual; I would expect the timeout to be global rather than a timeout that starts after the first event is received.

Author: I went with this design so that I can avoid using an explicit sleep in the publisher loop. The loop we have today:

    while True:
        batch = await self._event_buffer.wait_for_batch(
            self._event_buffer_consumer_id,
            PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS,
        )
        await self._async_publish_with_retries(batch)

Because wait_for_batch blocks until at least one event is available, this loop does not busy-wait when the buffer is empty.

Reviewer: How long is PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS? Unless that value is in milliseconds, it might already serve as the effective sleep interval in the while loop, meaning you don't need an extra asyncio.sleep(x) between iterations. In the Phase 2 implementation of wait_for_batch, the logic is nearly the same as Phase 1, except that it uses await asyncio.wait_for(has_events_to_consume.wait(), remaining) instead of await has_events_to_consume.wait(). The difference is essentially waiting forever (Phase 1) versus waiting with a timeout of a few seconds (Phase 2), at the cost of looping back into the while loop, which is negligible compared to the already seconds-long waits in Phase 2 and lets other async tasks run. Your implementation is certainly more efficient in theory, but I wonder if we can keep it simpler while achieving similar efficiency.

Author: As you said, when there are items in the buffer the approach you are proposing and my implementation are equivalent. The only advantage is when the buffer is empty: instead of busy waiting, the API I defined blocks until a new event shows up. If we want to simplify this we can expose another API instead. I am OK with either approach and can't really think of any strong reasons to pick one over the other, so let me know your preference.

Reviewer: Got it. Unrelated question: is 0.1s also how frequently we push to the GCS and the external HTTP server? That seems a very high QPS for the GCS or the external HTTP server to handle and process.

Reviewer: 0.1s is indeed very frequent pushing. What's the reason for tuning it so low?

Author: This was present even before my change (it's controlled by an env variable). I am not aware of the reason behind it; should I increase it to 0.5/1 second instead?

Reviewer: I think so; the current reporting interval to GCS is 1s (https://github.com/ray-project/ray/blob/master/src/ray/common/ray_config_def.h#L457). CC: @MengjinYan for context about this value.
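
For reference, the simpler global-timeout variant the reviewer describes could look roughly like the sketch below. wait_for_batch_global_timeout is a hypothetical method on the same class, not part of this PR; unlike wait_for_batch, it may return an empty batch:

    async def wait_for_batch_global_timeout(
        self, consumer_id: str, timeout_seconds: float = 1.0
    ) -> List[events_base_event_pb2.RayEvent]:
        """Hypothetical variant: the deadline starts at call time, so the call
        returns after at most timeout_seconds even if no event ever arrives."""
        async with self._lock:
            consumer_state = self._consumers[consumer_id]
            has_events = consumer_state.has_new_events_to_consume

        batch = []
        deadline = time.monotonic() + max(0.0, float(timeout_seconds))
        while len(batch) < self._max_batch_size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            async with self._lock:
                # Drain whatever is available right now.
                while len(batch) < self._max_batch_size and (
                    consumer_state.cursor_index < len(self._buffer)
                ):
                    batch.append(self._buffer[consumer_state.cursor_index])
                    consumer_state.cursor_index += 1
                if len(batch) >= self._max_batch_size:
                    break
                has_events.clear()
            try:
                await asyncio.wait_for(has_events.wait(), remaining)
            except asyncio.TimeoutError:
                break
        return batch  # may be empty, unlike wait_for_batch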

    async def register_consumer(self) -> str:
        """Register a new consumer.

        Returns:
            id of the consumer
        """
        async with self._lock:
            consumer_id = str(uuid.uuid4())
            self._consumers[consumer_id] = _ConsumerState(
                cursor_index=0,
                evicted_events_count={},
                has_new_events_to_consume=asyncio.Event(),
            )
            return consumer_id

    async def size(self) -> int:
        """Get the number of events in the buffer."""
        async with self._lock:
            return len(self._buffer)

    async def get_and_reset_evicted_events_count(
        self, consumer_id: str
    ) -> Dict[str, int]:
        """Get the number of events evicted for a consumer and reset the count."""
        async with self._lock:
            consumer_state = self._consumers.get(consumer_id)
            if consumer_state is None:
                raise KeyError(f"unknown consumer '{consumer_id}'")
            evicted_events_count = consumer_state.evicted_events_count
            consumer_state.evicted_events_count = {}
            return evicted_events_count
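
Putting the pieces together, typical producer/consumer usage looks roughly like the sketch below (constructing an empty RayEvent proto here is purely illustrative; real callers would pass populated events):

    import asyncio
    from ray.core.generated import events_base_event_pb2

    async def main():
        buffer = MultiConsumerEventBuffer(max_size=1000, max_batch_size=100)
        consumer_id = await buffer.register_consumer()

        # Producer side: events are added one at a time.
        for _ in range(3):
            await buffer.add_event(events_base_event_pb2.RayEvent())

        # Consumer side: returns within ~0.5s of the first available event.
        batch = await buffer.wait_for_batch(consumer_id, timeout_seconds=0.5)
        print(f"got {len(batch)} events, buffer size={await buffer.size()}")
        print("evicted:", await buffer.get_and_reset_evicted_events_count(consumer_id))

    asyncio.run(main())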
@@ -0,0 +1,121 @@
import json
import logging
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable

import aiohttp

from ray._common.utils import get_or_create_event_loop
from ray._private.protobuf_compat import message_to_json
from ray.core.generated import events_base_event_pb2
from ray.dashboard.modules.aggregator.publisher.configs import (
    PUBLISHER_TIMEOUT_SECONDS,
)

logger = logging.getLogger(__name__)

@dataclass
class PublishStats:
    """Data class that represents stats of publishing a batch of events."""

    is_publish_successful: bool
    num_events_published: int
    num_events_filtered_out: int

Review thread (PublishStats):

Reviewer: Same here - can we log metrics directly inside the publisher instead of bubbling them up?

Author: Replied in #55780 (comment).

class PublisherClientInterface(ABC):
    """Abstract interface for publishing Ray event batches to external destinations.

    Implementations should handle the actual publishing logic, filtering,
    and format conversion appropriate for their specific destination type.
    """

    @abstractmethod
    async def publish(self, batch) -> PublishStats:
        """Publish a batch of events to the destination."""
        pass

    @abstractmethod
    def count_num_events_in_batch(self, batch) -> int:
        """Count the number of events in a given batch."""
        pass

    @abstractmethod
    async def close(self) -> None:
        """Clean up any resources used by this client.

        Should be called when the publisher client is no longer required.
        """
        pass

Review thread (count_num_events_in_batch):

Reviewer: nit:

Author: In this PR the batch is just a list of Ray events, but in the next PR, where we add the GCS publisher, the batch will be a tuple of droppedTaskAttempts and a list of events. This is why I made the name a bit more explicit.
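
As a concrete illustration of the contract, a hypothetical implementation that publishes nowhere and only logs could look like this (LoggingPublisherClient is not part of this PR):

    class LoggingPublisherClient(PublisherClientInterface):
        """Hypothetical publisher that just logs batches; useful for local debugging."""

        async def publish(self, batch) -> PublishStats:
            # Log each event instead of sending it anywhere.
            for event in batch:
                logger.info("event: %s", event)
            return PublishStats(
                is_publish_successful=True,
                num_events_published=len(batch),
                num_events_filtered_out=0,
            )

        def count_num_events_in_batch(self, batch) -> int:
            return len(batch)

        async def close(self) -> None:
            pass  # nothing to clean up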

class AsyncHttpPublisherClient(PublisherClientInterface):
    """Client for publishing ray event batches to an external HTTP service."""

    def __init__(
        self,
        endpoint: str,
        executor: ThreadPoolExecutor,
        events_filter_fn: Callable[[object], bool],
        timeout: float = PUBLISHER_TIMEOUT_SECONDS,
    ) -> None:
        self._endpoint = endpoint
        self._executor = executor
        self._events_filter_fn = events_filter_fn
        self._timeout = aiohttp.ClientTimeout(total=timeout)
        self._session = None

    async def publish(
        self, events_batch: list[events_base_event_pb2.RayEvent]
    ) -> PublishStats:
        if not events_batch:
            return PublishStats(True, 0, 0)
        filtered = [e for e in events_batch if self._events_filter_fn(e)]
        num_filtered_out = len(events_batch) - len(filtered)
        if not filtered:
            # All events filtered out -> success, but nothing published.
            return PublishStats(True, 0, num_filtered_out)

        # Convert protobuf objects to python dictionaries for the HTTP POST.
        filtered_json = await get_or_create_event_loop().run_in_executor(
            self._executor,
            lambda: [
                json.loads(
                    message_to_json(e, always_print_fields_with_no_presence=True)
                )
                for e in filtered
            ],
        )

        try:
            # Create the session on first use (lazy initialization).
            if not self._session:
                self._session = aiohttp.ClientSession(timeout=self._timeout)
            return await self._send_http_request(filtered_json, num_filtered_out)
        except Exception as e:
            logger.error("Failed to send events to external service. Error: %s", e)
            return PublishStats(False, 0, 0)

Review thread (filtering in publish):

Reviewer: Maybe move the filtered inside the consumer info of
    async def _send_http_request(self, json_data, num_filtered_out):
        async with self._session.post(
            self._endpoint,
            json=json_data,
        ) as resp:
            resp.raise_for_status()
            return PublishStats(True, len(json_data), num_filtered_out)

    def count_num_events_in_batch(
        self, events_batch: list[events_base_event_pb2.RayEvent]
    ) -> int:
        return len(events_batch)
    async def close(self) -> None:
        """Close the http session if one was created.

        Should be called when the publisher client is no longer required.
        """
        if self._session:
            await self._session.close()
            self._session = None

    def set_session(self, session) -> None:
        """Inject an HTTP client session. Intended for testing.

        If a session is set explicitly, it will be used and managed by close().
        """
        self._session = session
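
A minimal usage sketch, assuming a hypothetical receiver URL; an empty batch short-circuits before any HTTP call, while a real batch would be the list of RayEvent protos returned by MultiConsumerEventBuffer.wait_for_batch:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    async def main():
        client = AsyncHttpPublisherClient(
            endpoint="http://localhost:8080/events",  # hypothetical receiver URL
            executor=ThreadPoolExecutor(max_workers=1),
            events_filter_fn=lambda e: True,  # keep every event
        )
        try:
            stats = await client.publish([])  # empty batch: returns immediately
            print(stats)  # PublishStats(is_publish_successful=True, ...)
        finally:
            await client.close()

    asyncio.run(main())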
@@ -0,0 +1,24 @@
# Environment variables for the aggregator agent
from ray._private import ray_constants

env_var_prefix = "RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER"
# Timeout for the publisher to publish events to the destination.
PUBLISHER_TIMEOUT_SECONDS = ray_constants.env_integer(
    f"{env_var_prefix}_TIMEOUT_SECONDS", 5
)
# Maximum number of retries for publishing events to the destination;
# if less than 0, retries indefinitely.
PUBLISHER_MAX_RETRIES = ray_constants.env_integer(f"{env_var_prefix}_MAX_RETRIES", -1)
# Initial backoff time for publishing events to the destination.
PUBLISHER_INITIAL_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_INITIAL_BACKOFF_SECONDS", 0.01
)
# Maximum backoff time for publishing events to the destination.
PUBLISHER_MAX_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_MAX_BACKOFF_SECONDS", 5.0
)
# Jitter ratio for publishing events to the destination.
PUBLISHER_JITTER_RATIO = ray_constants.env_float(f"{env_var_prefix}_JITTER_RATIO", 0.1)
# Maximum sleep time between sending batches of events to the destination;
# should be greater than 0.0 to avoid busy looping.
PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_MAX_BUFFER_SEND_INTERVAL_SECONDS", 0.1
)
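
These knobs imply exponential backoff with jitter. For intuition, a retry loop combining them might look like the sketch below; the publisher's actual retry logic (referred to as _async_publish_with_retries in the review thread above) is not part of this diff, so this is only an assumed shape:

    import asyncio
    import random

    async def publish_with_retries(publish_once, batch):
        """Sketch: retry publish_once(batch) with exponential backoff and jitter."""
        backoff = PUBLISHER_INITIAL_BACKOFF_SECONDS
        attempt = 0
        while True:
            stats = await publish_once(batch)
            if stats.is_publish_successful:
                return stats
            attempt += 1
            # A negative PUBLISHER_MAX_RETRIES means retry indefinitely.
            if 0 <= PUBLISHER_MAX_RETRIES < attempt:
                return stats
            # Sleep for the backoff plus +/- jitter, capped at the maximum.
            jitter = backoff * PUBLISHER_JITTER_RATIO * (2 * random.random() - 1)
            await asyncio.sleep(min(backoff + jitter, PUBLISHER_MAX_BACKOFF_SECONDS))
            backoff = min(backoff * 2, PUBLISHER_MAX_BACKOFF_SECONDS)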