437 changes: 165 additions & 272 deletions python/ray/dashboard/modules/aggregator/aggregator_agent.py

Large diffs are not rendered by default.

@@ -0,0 +1,166 @@

```python
from collections import deque
import asyncio
import time
from typing import Dict, List
from dataclasses import dataclass
import uuid

from ray.core.generated import (
    events_base_event_pb2,
)
from ray.core.generated.events_base_event_pb2 import RayEvent


@dataclass
class _ConsumerState:
    # index of the next event to be consumed by this consumer
    cursor_index: int
    # map of event type to the number of events evicted for this consumer since the last metric update
    evicted_events_count: Dict[str, int]
    # event to signal that there are new events to consume
    has_new_events_to_consume: asyncio.Event


class MultiConsumerEventBuffer:
    """A buffer that supports adding one event at a time and consuming events in batches.

    Supports multiple consumers, each with its own cursor index, and tracks the
    number of events evicted for each consumer.

    The buffer is not thread-safe but is asyncio-friendly. All operations must be
    called from the same event loop.
    """

    def __init__(self, max_size: int, max_batch_size: int):
        self._buffer = deque(maxlen=max_size)
        self._max_size = max_size
        self._lock = asyncio.Lock()
        self._consumers: Dict[str, _ConsumerState] = {}

        self._max_batch_size = max_batch_size

    async def add_event(self, event: events_base_event_pb2.RayEvent):
        """Add an event to the buffer.

        If the buffer is full, the oldest event is dropped.
        """
        async with self._lock:
            dropped_event = None
            if len(self._buffer) >= self._max_size:
                dropped_event = self._buffer.popleft()
            self._buffer.append(event)

            for consumer_state in self._consumers.values():
                # update the consumer's cursor index and evicted-events count if an event was dropped
                if dropped_event is not None:
                    if consumer_state.cursor_index == 0:
                        # the dropped event was the next event this consumer would have consumed
                        event_type_name = RayEvent.EventType.Name(
                            dropped_event.event_type
                        )
                        if event_type_name not in consumer_state.evicted_events_count:
                            consumer_state.evicted_events_count[event_type_name] = 0
                        consumer_state.evicted_events_count[event_type_name] += 1
                    else:
                        # the dropped event was before the consumer's current position, so shift the cursor back
                        consumer_state.cursor_index -= 1
                # signal that there are new events to consume
                consumer_state.has_new_events_to_consume.set()
```

**Contributor** (on the `wait_for_batch` signature):

This API is a bit unusual; I would expect the timeout to be global rather than a timeout that starts after the first event is received.

**Contributor Author** (@sampan-s-nayak, Sep 3, 2025):

I went with this design so that I can avoid using an explicit sleep in the publisher loop.

The loop we have today:

```python
while True:
    batch = await self._event_buffer.wait_for_batch(
        self._event_buffer_consumer_id,
        PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS,
    )
    await self._async_publish_with_retries(batch)
```

Because of the way `wait_for_batch` is defined, if there are no events to form batches, the loop simply waits and other async tasks can run during this time. If we used a global timeout, we would either have to check whether the batch is empty and retry `wait_for_batch`, or call `asyncio.sleep(x)` every time we receive an empty batch. Both feel inefficient to me.

**Contributor:**

How long is `PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS`? Unless that value is in milliseconds, it might already serve as the effective sleep interval in the while loop, meaning you don't need an extra `asyncio.sleep(x)` between iterations?

In the Phase 2 implementation of `wait_for_batch`, the logic is nearly the same as Phase 1, except that it uses `await asyncio.wait_for(has_events_to_consume.wait(), remaining)` instead of `await has_events_to_consume.wait()`. The difference is essentially waiting forever (Phase 1) versus waiting with a timeout of a few seconds (Phase 2), at the cost of looping back into the while loop, which is negligible compared to the already seconds-long waits in Phase 2 and still lets other async tasks run.

Your implementation is certainly more efficient in theory, but I wonder if we can keep it simpler while achieving similar efficiency.

**Contributor Author:**

`PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS`: the default is 0.1 seconds.

As you said, when there are items in the buffer, the approach you are proposing and my implementation are equivalent. The only advantage is when the buffer is empty: instead of busy-waiting, the API I defined blocks until a new event shows up.

If we want to simplify this, we can expose another API called `wait_for_events_to_consume(consumer_id)` and call it before calling `wait_for_batch()` in the worker loop. But in the future, if we have more than one publisher worker, we might get notified that there are events to consume and then find none left when we check (a spurious wakeup). This may still be OK, but I feel the worker loop logic would end up becoming a little complex.

I think I am OK with either approach; I can't really think of any strong reason to pick one over the other. Let me know your preference.

**Contributor:**

Got you; if `PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS` is that frequent, I think your design makes sense.

Unrelated question: is 0.1s also how frequently we push to the GCS and the external HTTP server? That seems like a very high QPS for the GCS or the external HTTP server to handle and process.

**Collaborator:**

0.1s is indeed very frequent pushing. What's the reason for tuning it so low?

**Contributor Author:**

This was present even before my change (it's controlled by an env variable). I am not aware of the reason behind it. Should I increase it to 0.5/1 second instead?

**Contributor:**

I think so; the current reporting interval to GCS is 1s (https://github.com/ray-project/ray/blob/master/src/ray/common/ray_config_def.h#L457). Let's keep the same value here; 1s should also be sufficient for the external HTTP server. Personally, I think 1s is still a bit too frequent, but let's stick with what we've had so far.

CC: @MengjinYan for context about this value.
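
For comparison, here is a minimal sketch (not in this PR) of the simpler alternative discussed above, assuming a hypothetical `wait_for_batch` variant that uses a global timeout and may return an empty batch:

```python
# Hypothetical alternative loop: the timeout doubles as the sleep interval,
# because an empty batch means the wait expired with nothing to send and the
# loop immediately blocks again for up to the same interval.
while True:
    batch = await self._event_buffer.wait_for_batch_global_timeout(
        self._event_buffer_consumer_id,
        PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS,
    )
    if not batch:
        continue
    await self._async_publish_with_retries(batch)
```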

```python
    async def wait_for_batch(
        self, consumer_id: str, timeout_seconds: float = 1.0
    ) -> List[events_base_event_pb2.RayEvent]:
        """Wait for a batch, respecting self._max_batch_size and timeout_seconds.

        Returns a batch of up to self._max_batch_size items. Waits for up to
        timeout_seconds after receiving the first event that will be in the
        next batch; after the timeout, returns as many items as are ready.

        Always returns a batch with at least one item: blocks indefinitely
        until the first item comes in.

        Arguments:
            consumer_id: id of the consumer consuming the batch
            timeout_seconds: maximum time to wait after the first event is available
        """
        max_batch = self._max_batch_size
        consumer_state = None
        has_events_to_consume = None
        async with self._lock:
            consumer_state = self._consumers.get(consumer_id)
            if consumer_state is None:
                raise KeyError(f"unknown consumer '{consumer_id}'")
            has_events_to_consume = consumer_state.has_new_events_to_consume

        # Phase 1: read the first event, waiting indefinitely until there is at
        # least one event to consume. We wait inside a loop to deal with
        # spurious wakeups.
        while True:
            # we wait outside the lock to avoid deadlocks
            await has_events_to_consume.wait()
            async with self._lock:
                if consumer_state.cursor_index < len(self._buffer):
                    # add the first event to the batch
                    event = self._buffer[consumer_state.cursor_index]
                    consumer_state.cursor_index += 1
                    batch = [event]
                    break

                # there are no new events to consume; clear the event and wait
                # for it to be set again
                has_events_to_consume.clear()

        # Phase 2: add items to the batch until the timeout expires or the batch is full
        deadline = time.monotonic() + max(0.0, float(timeout_seconds))
        while len(batch) < max_batch:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break

            async with self._lock:
                # drain whatever is available
                while len(batch) < max_batch and consumer_state.cursor_index < len(
                    self._buffer
                ):
                    batch.append(self._buffer[consumer_state.cursor_index])
                    consumer_state.cursor_index += 1

                if len(batch) >= max_batch:
                    break

                # there is still room in the batch but no new events to consume;
                # clear the event and wait for it to be set again
                has_events_to_consume.clear()
            try:
                await asyncio.wait_for(has_events_to_consume.wait(), remaining)
            except asyncio.TimeoutError:
                # timeout; return the batch as is
                break

        return batch

    async def register_consumer(self) -> str:
        """Register a new consumer.

        Returns:
            id of the consumer
        """
        async with self._lock:
            consumer_id = str(uuid.uuid4())
            self._consumers[consumer_id] = _ConsumerState(
                cursor_index=0,
                evicted_events_count={},
                has_new_events_to_consume=asyncio.Event(),
            )
            return consumer_id

    async def size(self) -> int:
        """Get the number of events in the buffer."""
        async with self._lock:
            return len(self._buffer)

    async def get_and_reset_evicted_events_count(
        self, consumer_id: str
    ) -> Dict[str, int]:
        """Get the number of events evicted for a consumer and reset the count."""
        async with self._lock:
            consumer_state = self._consumers.get(consumer_id)
            if consumer_state is None:
                raise KeyError(f"unknown consumer '{consumer_id}'")
            evicted_events_count = consumer_state.evicted_events_count
            consumer_state.evicted_events_count = {}
            return evicted_events_count
```
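
A minimal usage sketch (not part of this PR) showing how a single consumer might drive the buffer, with `MultiConsumerEventBuffer` from the file above in scope; the default-constructed `RayEvent()` instances are illustrative only:

```python
import asyncio

from ray.core.generated.events_base_event_pb2 import RayEvent


async def main():
    # Hold at most 3 events; hand out batches of at most 2.
    buffer = MultiConsumerEventBuffer(max_size=3, max_batch_size=2)
    consumer_id = await buffer.register_consumer()

    # Adding a 4th event evicts the oldest one. Since this consumer has not
    # consumed anything yet, the eviction is counted against it.
    for _ in range(4):
        await buffer.add_event(RayEvent())

    batch = await buffer.wait_for_batch(consumer_id, timeout_seconds=0.1)
    assert len(batch) == 2  # capped by max_batch_size

    evicted = await buffer.get_and_reset_evicted_events_count(consumer_id)
    assert sum(evicted.values()) == 1  # the one event dropped above


asyncio.run(main())
```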
Empty file.
@@ -0,0 +1,121 @@

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import json
import logging
from typing import Callable

import aiohttp

from ray._common.utils import get_or_create_event_loop
from ray._private.protobuf_compat import message_to_json
from ray.core.generated import events_base_event_pb2
from ray.dashboard.modules.aggregator.publisher.configs import PUBLISHER_TIMEOUT_SECONDS

logger = logging.getLogger(__name__)


@dataclass
class PublishStats:
    """Data class that represents the stats of publishing a batch of events."""

    is_publish_successful: bool
    num_events_published: int
    num_events_filtered_out: int
```

**Contributor** (on `PublishStats`):

Same; can we log metrics directly inside the publisher vs. bubbling them up?

```python
class PublisherClientInterface(ABC):
    """Abstract interface for publishing Ray event batches to external destinations.

    Implementations should handle the actual publishing logic, filtering,
    and format conversion appropriate for their specific destination type.
    """

    @abstractmethod
    async def publish(self, batch) -> PublishStats:
        """Publish a batch of events to the destination."""
        pass

    @abstractmethod
    def count_num_events_in_batch(self, batch) -> int:
        """Count the number of events in a given batch."""
        pass

    @abstractmethod
    async def close(self) -> None:
        """Clean up any resources used by this client.

        Should be called when the publisher client is no longer required.
        """
        pass
```

**Contributor** (on `count_num_events_in_batch`):

nit: `count_num_events_in_batch` -> `batch_size`

**Contributor Author:**

In this PR the batch is just a list of Ray events, but in the next PR, where we add the GCS publisher, the batch will be a tuple of dropped task attempts and a list of events. That is why I made the name a bit more explicit.

```python


class AsyncHttpPublisherClient(PublisherClientInterface):
    """Client for publishing Ray event batches to an external HTTP service."""

    def __init__(
        self,
        endpoint: str,
        executor: ThreadPoolExecutor,
        events_filter_fn: Callable[[object], bool],
        timeout: float = PUBLISHER_TIMEOUT_SECONDS,
    ) -> None:
        self._endpoint = endpoint
        self._executor = executor
        self._events_filter_fn = events_filter_fn
        self._timeout = aiohttp.ClientTimeout(total=timeout)
        self._session = None

    async def publish(
        self, events_batch: list[events_base_event_pb2.RayEvent]
    ) -> PublishStats:
        if not events_batch:
            return PublishStats(True, 0, 0)
        filtered = [e for e in events_batch if self._events_filter_fn(e)]
        num_filtered_out = len(events_batch) - len(filtered)
        if not filtered:
            # All filtered out -> success but nothing published
            return PublishStats(True, 0, num_filtered_out)

        # Convert protobuf objects to python dictionaries for HTTP POST
        filtered_json = await get_or_create_event_loop().run_in_executor(
            self._executor,
            lambda: [
                json.loads(
                    message_to_json(e, always_print_fields_with_no_presence=True)
                )
                for e in filtered
            ],
        )

        try:
            # Create session on first use (lazy initialization)
            if not self._session:
                self._session = aiohttp.ClientSession(timeout=self._timeout)

            return await self._send_http_request(filtered_json, num_filtered_out)
        except Exception as e:
            logger.error("Failed to send events to external service. Error: %s", e)
            return PublishStats(False, 0, 0)
```

**Contributor** (on the filtering in `publish`):

Maybe move the filtering inside the consumer info of `multi_consumer_event_buffer` so we can fetch a more meaningful batch here; maybe a TODO, not in this PR.

```python

    async def _send_http_request(self, json_data, num_filtered_out):
        async with self._session.post(
            self._endpoint,
            json=json_data,
        ) as resp:
            resp.raise_for_status()
            return PublishStats(True, len(json_data), num_filtered_out)

    def count_num_events_in_batch(
        self, events_batch: list[events_base_event_pb2.RayEvent]
    ) -> int:
        return len(events_batch)

    async def close(self) -> None:
        """Close the HTTP session if one was created.

        Should be called when the publisher client is no longer required.
        """
        if self._session:
            await self._session.close()
            self._session = None

    def set_session(self, session) -> None:
        """Inject an HTTP client session. Intended for testing.

        If a session is set explicitly, it will be used and managed by close().
        """
        self._session = session
```
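
A minimal sketch (not part of this PR) of driving the client directly, with `AsyncHttpPublisherClient` from the file above in scope; the endpoint URL is a hypothetical receiver:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from ray.core.generated.events_base_event_pb2 import RayEvent


async def main():
    client = AsyncHttpPublisherClient(
        endpoint="http://localhost:8080/events",  # hypothetical receiver
        executor=ThreadPoolExecutor(max_workers=1),
        events_filter_fn=lambda event: True,  # keep every event
    )
    try:
        stats = await client.publish([RayEvent()])
        print(stats.is_publish_successful, stats.num_events_published)
    finally:
        # Always release the lazily created aiohttp session.
        await client.close()


asyncio.run(main())
```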
24 changes: 24 additions & 0 deletions python/ray/dashboard/modules/aggregator/publisher/configs.py
@@ -0,0 +1,24 @@

```python
# Environment variables for the aggregator agent
from ray._private import ray_constants

env_var_prefix = "RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER"
# timeout for the publisher to publish events to the destination
PUBLISHER_TIMEOUT_SECONDS = ray_constants.env_integer(
    f"{env_var_prefix}_TIMEOUT_SECONDS", 5
)
# maximum number of retries for publishing events to the destination;
# if less than 0, retries indefinitely
PUBLISHER_MAX_RETRIES = ray_constants.env_integer(f"{env_var_prefix}_MAX_RETRIES", -1)
# initial backoff time for publishing events to the destination
PUBLISHER_INITIAL_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_INITIAL_BACKOFF_SECONDS", 0.01
)
# maximum backoff time for publishing events to the destination
PUBLISHER_MAX_BACKOFF_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_MAX_BACKOFF_SECONDS", 5.0
)
# jitter ratio for publishing events to the destination
PUBLISHER_JITTER_RATIO = ray_constants.env_float(f"{env_var_prefix}_JITTER_RATIO", 0.1)
# maximum sleep time between sending batches of events to the destination;
# should be greater than 0.0 to avoid busy looping
PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS = ray_constants.env_float(
    f"{env_var_prefix}_MAX_BUFFER_SEND_INTERVAL_SECONDS", 0.1
)
```
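
These knobs suggest a capped exponential backoff with jitter between publish retries. The retry helper itself is not in this file, so the following is only a sketch of how the constants might combine:

```python
import random

from ray.dashboard.modules.aggregator.publisher.configs import (
    PUBLISHER_INITIAL_BACKOFF_SECONDS,
    PUBLISHER_JITTER_RATIO,
    PUBLISHER_MAX_BACKOFF_SECONDS,
)


def backoff_seconds(attempt: int) -> float:
    """Hypothetical helper: delay before retry number `attempt` (0-based)."""
    base = min(
        PUBLISHER_INITIAL_BACKOFF_SECONDS * (2**attempt),
        PUBLISHER_MAX_BACKOFF_SECONDS,
    )
    # Spread retries out by +/- PUBLISHER_JITTER_RATIO of the base delay.
    jitter = base * PUBLISHER_JITTER_RATIO
    return base + random.uniform(-jitter, jitter)
```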