
Commit 515d530

Author: sampan
Commit message: refactor changes
Signed-off-by: sampan <[email protected]>
1 parent 7a300e0 commit 515d530

File tree: 8 files changed, +92 -85 lines changed

python/ray/dashboard/modules/aggregator/aggregator_agent.py

Lines changed: 37 additions & 37 deletions
@@ -89,43 +89,43 @@
     namespace="ray",
 )
 events_published_to_http_svc = Counter(
-    f"{metrics_prefix}_http_events_published_total",
+    f"{metrics_prefix}_http_publisher_published_events_total",
     "Total number of events successfully published to the HTTP service.",
     tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS),
     namespace="ray",
 )
 events_filtered_out_before_http_svc_publish = Counter(
-    f"{metrics_prefix}_http_events_filtered_total",
+    f"{metrics_prefix}_http_publisher_filtered_events_total",
     "Total number of events filtered out before publishing to the HTTP service.",
     tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS),
     namespace="ray",
 )
 events_failed_to_publish_to_http_svc = Counter(
-    f"{metrics_prefix}_http_publish_failures_total",
+    f"{metrics_prefix}_http_publisher_failures_total",
     "Total number of events that failed to publish to the HTTP service after retries.",
     tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS),
     namespace="ray",
 )
 events_dropped_in_http_svc_publish_queue = Counter(
-    f"{metrics_prefix}_http_publish_queue_dropped_events_total",
+    f"{metrics_prefix}_http_publisher_queue_dropped_events_total",
     "Total number of events dropped because the HTTP publish queue was full.",
     tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS) + ("event_type",),
     namespace="ray",
 )
 http_publish_latency_seconds = Histogram(
-    f"{metrics_prefix}_http_publish_duration_seconds",
+    f"{metrics_prefix}_http_publisher_publish_duration_seconds",
     "Duration of HTTP publish calls in seconds.",
     tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS) + ("Outcome",),
     namespace="ray",
 )
 http_failed_attempts_since_last_success = Gauge(
-    f"{metrics_prefix}_http_publish_consecutive_failures",
+    f"{metrics_prefix}_http_publisher_consecutive_failures_since_last_success",
     "Number of consecutive failed publish attempts since the last success.",
     tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS),
     namespace="ray",
 )
 http_time_since_last_success_seconds = Gauge(
-    f"{metrics_prefix}_http_time_since_last_success_seconds",
+    f"{metrics_prefix}_http_publisher_time_since_last_success_seconds",
     "Seconds since the last successful publish to the HTTP service.",
     tuple(dashboard_consts.COMPONENT_METRICS_TAG_KEYS),
     namespace="ray",
@@ -174,7 +174,7 @@ def __init__(self, dashboard_agent) -> None:
                 f"Publishing events to external HTTP service is enabled. events_export_addr: {self._events_export_addr}"
             )
             self._event_processing_enabled = True
-            self._HttpEndpointPublisher = RayEventsPublisher(
+            self._http_endpoint_publisher = RayEventsPublisher(
                 name="http-endpoint-publisher",
                 publish_client=AsyncHttpPublisherClient(
                     endpoint=self._events_export_addr,
@@ -187,7 +187,7 @@ def __init__(self, dashboard_agent) -> None:
             logger.info(
                 f"Event HTTP target is not enabled or publishing events to external HTTP service is disabled. Skipping sending events to external HTTP service. events_export_addr: {self._events_export_addr}"
             )
-            self._HttpEndpointPublisher = NoopPublisher()
+            self._http_endpoint_publisher = NoopPublisher()

     async def AddEvents(self, request, context) -> None:
         """
@@ -245,11 +245,11 @@ async def _update_metrics(self) -> None:
         }

         http_endpoint_publisher_metrics = await (
-            self._HttpEndpointPublisher.get_and_reset_metrics()
+            self._http_endpoint_publisher.get_and_reset_metrics()
         )

+        # Aggregator agent metrics
         async with self._lock:
-            # Aggregator agent metrics
             _events_received = self._events_received_since_last_metrics_update
             _events_failed_to_add_to_aggregator = (
                 self._events_failed_to_add_to_aggregator_since_last_metrics_update
@@ -258,31 +258,31 @@ async def _update_metrics(self) -> None:
             self._events_received_since_last_metrics_update = 0
             self._events_failed_to_add_to_aggregator_since_last_metrics_update = 0

-            # HTTP service publisher metrics
-            _events_published_to_http_svc = http_endpoint_publisher_metrics.get(
-                "published", 0
-            )
-            _events_filtered_out_before_http_svc_publish = (
-                http_endpoint_publisher_metrics.get("filtered_out", 0)
-            )
-            _events_failed_to_publish_to_http_svc = http_endpoint_publisher_metrics.get(
-                "failed", 0
-            )
-            _events_dropped_in_http_publish_queue_by_type = (
-                http_endpoint_publisher_metrics.get("dropped_events", {})
-            )
-            _http_publish_latency_success_samples = http_endpoint_publisher_metrics.get(
-                "success_latency_seconds", []
-            )
-            _http_publish_latency_failure_samples = http_endpoint_publisher_metrics.get(
-                "failure_latency_seconds", []
-            )
-            _failed_attempts_since_last_success = http_endpoint_publisher_metrics.get(
-                "failed_attempts_since_last_success", 0
-            )
-            _time_since_last_success_seconds = http_endpoint_publisher_metrics.get(
-                "time_since_last_success_seconds", None
-            )
+        # HTTP service publisher metrics
+        _events_published_to_http_svc = http_endpoint_publisher_metrics.get(
+            "published", 0
+        )
+        _events_filtered_out_before_http_svc_publish = (
+            http_endpoint_publisher_metrics.get("filtered_out", 0)
+        )
+        _events_failed_to_publish_to_http_svc = http_endpoint_publisher_metrics.get(
+            "failed", 0
+        )
+        _events_dropped_in_http_publish_queue_by_type = (
+            http_endpoint_publisher_metrics.get("dropped_events", {})
+        )
+        _http_publish_latency_success_samples = http_endpoint_publisher_metrics.get(
+            "success_latency_seconds", []
+        )
+        _http_publish_latency_failure_samples = http_endpoint_publisher_metrics.get(
+            "failure_latency_seconds", []
+        )
+        _failed_attempts_since_last_success = http_endpoint_publisher_metrics.get(
+            "failed_attempts_since_last_success", 0
+        )
+        _time_since_last_success_seconds = http_endpoint_publisher_metrics.get(
+            "time_since_last_success_seconds", None
+        )

         events_received.labels(**common_labels).inc(_events_received)
         events_failed_to_add_to_aggregator.labels(**common_labels).inc(
@@ -328,7 +328,7 @@ async def run(self, server) -> None:
         )

         await asyncio.gather(
-            self._HttpEndpointPublisher.run_forever(),
+            self._http_endpoint_publisher.run_forever(),
             self._update_metrics(),
         )
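
For context on the pattern in these hunks, here is a minimal, self-contained sketch of defining and incrementing one of the renamed counters, assuming prometheus_client-style metric objects (the prefix and label keys below are illustrative, not the module's actual values):

from prometheus_client import Counter

metrics_prefix = "event_aggregator_agent"  # illustrative prefix
events_published_to_http_svc = Counter(
    f"{metrics_prefix}_http_publisher_published_events_total",
    "Total number of events successfully published to the HTTP service.",
    ("Component", "SessionName"),  # hypothetical label keys
    namespace="ray",
)

# Resolve the child for a concrete label set, then add the delta
# accumulated since the last metrics update.
common_labels = {"Component": "aggregator_agent", "SessionName": "session_abc"}
events_published_to_http_svc.labels(**common_labels).inc(42)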

python/ray/dashboard/modules/aggregator/multi_consumer_event_buffer.py

Lines changed: 18 additions & 20 deletions
@@ -13,19 +13,19 @@

 @dataclass
 class _ConsumerState:
-    # index of the next event to be consumed by this consumer
+    # Index of the next event to be consumed by this consumer
     cursor_index: int
-    # map of event type to the number of events evicted for this consumer since last metric update
+    # Map of event type to the number of events evicted for this consumer since last metric update
     evicted_events_count: Dict[str, int]
-    # event to signal that there are new events to consume
+    # Condition variable to signal that there are new events to consume
     has_new_events_to_consume: asyncio.Event


 class MultiConsumerEventBuffer:
     """A buffer which allows adding one event at a time and consuming events in batches.
     Supports multiple consumers, each with their own cursor index. Tracks the number of events evicted for each consumer.

-    Buffer is not thread-safe but is asyncio-friendly. All operations must be called from the same event loop.
+    Buffer is not thread-safe but is asyncio-friendly. All operations must be called from within the same event loop.
     """

     def __init__(self, max_size: int, max_batch_size: int):
@@ -48,20 +48,20 @@ async def add_event(self, event: events_base_event_pb2.RayEvent):
         self._buffer.append(event)

         for _, consumer_state in self._consumers.items():
-            # update consumer cursor index and evicted events count if the event was dropped
+            # Update consumer cursor index and evicted events count if the event was dropped
             if dropped_event is not None:
                 if consumer_state.cursor_index == 0:
-                    # the dropped event was the next event this consumer would have consumed
+                    # The dropped event was the next event this consumer would have consumed, update the evicted events count
                     event_type_name = RayEvent.EventType.Name(
                         dropped_event.event_type
                     )
                     if event_type_name not in consumer_state.evicted_events_count:
                         consumer_state.evicted_events_count[event_type_name] = 0
                     consumer_state.evicted_events_count[event_type_name] += 1
                 else:
-                    # the dropped event was before the consumer's current position, so adjust cursor
+                    # The dropped event was already consumed by the consumer, so we need to adjust the cursor
                     consumer_state.cursor_index -= 1
-            # signal that there are new events to consume
+            # Signal all consumers that there are new events to consume
             consumer_state.has_new_events_to_consume.set()

     async def wait_for_batch(
@@ -81,28 +81,26 @@ async def wait_for_batch(
             timeout_seconds: maximum time to wait for a batch
         """
         max_batch = self._max_batch_size
-        consumer_state = None
-        has_events_to_consume = None
         async with self._lock:
             consumer_state = self._consumers.get(consumer_id)
             if consumer_state is None:
                 raise KeyError(f"unknown consumer '{consumer_id}'")
             has_events_to_consume = consumer_state.has_new_events_to_consume

-        # phase 1: read the first event, we wait indefinitely until there is at least one event to consume
-        # we wait inside a loop to deal with spurious wakeups.
+        # Phase 1: read the first event, wait indefinitely until there is at least one event to consume
+        # Wait inside a loop to deal with spurious wakeups.
         while True:
-            # we wait outside the lock to avoid deadlocks
+            # Wait outside the lock to avoid deadlocks
             await has_events_to_consume.wait()
             async with self._lock:
                 if consumer_state.cursor_index < len(self._buffer):
-                    # add the first event to the batch
+                    # Add the first event to the batch
                     event = self._buffer[consumer_state.cursor_index]
                     consumer_state.cursor_index += 1
                     batch = [event]
                     break

-            # there is no new events to consume, clear the condition variable and wait for it to be set again
+            # There are no new events to consume, clear the condition variable and wait for it to be set again
             has_events_to_consume.clear()

         # Phase 2: add items to the batch up to timeout or until full
@@ -113,7 +111,7 @@ async def wait_for_batch(
                 break

             async with self._lock:
-                # drain whatever is available
+                # Drain whatever is available
                 while len(batch) < max_batch and consumer_state.cursor_index < len(
                     self._buffer
                 ):
@@ -123,12 +121,12 @@ async def wait_for_batch(
                 if len(batch) >= max_batch:
                     break

-            # there is still room in the batch, but no new events to consume, clear the condition variable and wait for it to be set again
+            # There is still room in the batch, but no new events to consume, clear the condition variable and wait for it to be set again
             has_events_to_consume.clear()
             try:
                 await asyncio.wait_for(has_events_to_consume.wait(), remaining)
             except asyncio.TimeoutError:
-                # timeout, we return the batch as is
+                # Timeout, return the current batch
                 break

         return batch
@@ -137,7 +135,7 @@ async def register_consumer(self) -> str:
         """Register a new consumer.

         Returns:
-            id of the consumer
+            Id of the consumer, used to identify the consumer in other methods.
         """
         async with self._lock:
             consumer_id = str(uuid.uuid4())
@@ -149,7 +147,7 @@ async def register_consumer(self) -> str:
         return consumer_id

     async def size(self) -> int:
-        """Get the number of events in the buffer."""
+        """Get total number of events in the buffer. Does not take consumer cursors into account."""
         async with self._lock:
             return len(self._buffer)
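
For reference, a minimal usage sketch of the buffer as refactored above, assuming the import paths below and an empty RayEvent proto purely for illustration (the exact wait_for_batch signature may differ):

import asyncio

from ray.core.generated import events_base_event_pb2  # assumed proto location
from ray.dashboard.modules.aggregator.multi_consumer_event_buffer import (
    MultiConsumerEventBuffer,
)

async def main() -> None:
    buffer = MultiConsumerEventBuffer(max_size=100, max_batch_size=10)
    consumer_id = await buffer.register_consumer()
    # An empty event proto, purely for illustration.
    await buffer.add_event(events_base_event_pb2.RayEvent())
    # Returns at least one event; batches more if they arrive before the timeout.
    batch = await buffer.wait_for_batch(consumer_id, timeout_seconds=0.1)
    assert len(batch) == 1

asyncio.run(main())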

python/ray/dashboard/modules/aggregator/publisher/async_publisher_client.py

Lines changed: 2 additions & 1 deletion
@@ -66,14 +66,15 @@ async def publish(
         self, events_batch: list[events_base_event_pb2.RayEvent]
     ) -> PublishStats:
         if not events_batch:
+            # Nothing to publish -> success but nothing published
             return PublishStats(True, 0, 0)
         filtered = [e for e in events_batch if self._events_filter_fn(e)]
         num_filtered_out = len(events_batch) - len(filtered)
         if not filtered:
             # All filtered out -> success but nothing published
             return PublishStats(True, 0, num_filtered_out)

-        # Convert protobuf objects to python dictionaries for HTTP POST
+        # Convert protobuf objects to python dictionaries for HTTP POST. Run in executor to avoid blocking the event loop.
         filtered_json = await get_or_create_event_loop().run_in_executor(
             self._executor,
             lambda: [
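
The updated comment describes a general pattern: protobuf-to-dict conversion is CPU-bound, so it runs on an executor rather than on the event loop. A standalone sketch of that pattern (function and variable names here are illustrative, not the module's API):

import asyncio
from concurrent.futures import ThreadPoolExecutor

from google.protobuf.json_format import MessageToDict

async def batch_to_dicts(batch, executor: ThreadPoolExecutor) -> list:
    # Serialization is CPU-bound; run it in a worker thread so the event
    # loop stays responsive while large batches are converted.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        lambda: [MessageToDict(message) for message in batch],
    )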

python/ray/dashboard/modules/aggregator/publisher/configs.py

Lines changed: 7 additions & 7 deletions
@@ -1,22 +1,22 @@
-# Environment variables for the aggregator agent
+# Environment variables for the aggregator agent publisher component.
 from ray._private import ray_constants

 env_var_prefix = "RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER"
-# timeout for the publisher to publish events to the destination
+# Timeout for the publisher to publish events to the destination
 PUBLISHER_TIMEOUT_SECONDS = ray_constants.env_integer(
-    f"{env_var_prefix}_TIMEOUT_SECONDS", 5
+    f"{env_var_prefix}_TIMEOUT_SECONDS", 3
 )
-# maximum number of retries for publishing events to the destination, if less than 0, will retry indefinitely
+# Maximum number of retries for publishing events to the destination, if less than 0, will retry indefinitely
 PUBLISHER_MAX_RETRIES = ray_constants.env_integer(f"{env_var_prefix}_MAX_RETRIES", -1)
-# initial backoff time for publishing events to the destination
+# Initial backoff time for publishing events to the destination
 PUBLISHER_INITIAL_BACKOFF_SECONDS = ray_constants.env_float(
     f"{env_var_prefix}_INITIAL_BACKOFF_SECONDS", 0.01
 )
-# maximum backoff time for publishing events to the destination
+# Maximum backoff time for publishing events to the destination
 PUBLISHER_MAX_BACKOFF_SECONDS = ray_constants.env_float(
     f"{env_var_prefix}_MAX_BACKOFF_SECONDS", 5.0
 )
-# jitter ratio for publishing events to the destination
+# Jitter ratio for publishing events to the destination
 PUBLISHER_JITTER_RATIO = ray_constants.env_float(f"{env_var_prefix}_JITTER_RATIO", 0.1)
 # Maximum sleep time between sending batches of events to the destination, should be greater than 0.0 to avoid busy looping
 PUBLISHER_MAX_BUFFER_SEND_INTERVAL_SECONDS = ray_constants.env_float(
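
These constants describe a standard retry policy. A minimal sketch of how exponential backoff with jitter is typically derived from them (illustrative only, not necessarily the publisher's exact formula):

import random

def backoff_delay(attempt: int) -> float:
    # Grow exponentially from the initial backoff, capped at the maximum.
    base = min(
        PUBLISHER_INITIAL_BACKOFF_SECONDS * (2 ** attempt),
        PUBLISHER_MAX_BACKOFF_SECONDS,
    )
    # Spread retries out by up to +/- PUBLISHER_JITTER_RATIO of the base.
    jitter = base * PUBLISHER_JITTER_RATIO
    return base + random.uniform(-jitter, jitter)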

python/ray/dashboard/modules/aggregator/publisher/ray_event_publisher.py

Lines changed: 13 additions & 5 deletions
@@ -41,10 +41,9 @@ async def wait_until_running(self, timeout: Optional[float] = None) -> bool:


 class RayEventsPublisher(RayEventsPublisherInterface):
-    """RayEvents publisher that publishes batches of events to a destination using a dedicated async worker.
+    """RayEvents publisher that publishes batches of events to a destination by running a worker loop.

-    The publisher is single-threaded and uses a queue to store batches of events.
-    The worker loop continuously pulls batches from the queue and publishes them.
+    The worker loop continuously pulls batches from the event buffer and publishes them to the destination.
     """

     def __init__(
@@ -78,7 +77,6 @@ def __init__(
         self._event_buffer_consumer_id = None

         # Internal metrics (since last get_and_reset_metrics call)
-        # using thread lock as non publisher threads can also call get_and_reset_metrics
         self._metrics_lock = asyncio.Lock()
         self._metric_events_published_since_last: int = 0
         self._metric_events_filtered_out_since_last: int = 0
@@ -110,17 +108,27 @@ async def run_forever(self) -> None:
                 await self._async_publish_with_retries(batch)
         except asyncio.CancelledError:
             logger.info(f"Publisher {self._name} cancelled, shutting down gracefully")
+            self._started_event.clear()
             await self._publish_client.close()
             raise
         except Exception as e:
             logger.error(f"Publisher {self._name} encountered error: {e}")
+            self._started_event.clear()
             await self._publish_client.close()
             raise

     async def get_and_reset_metrics(self) -> Dict[str, int]:
         """Return a snapshot of internal metrics since last call and reset them.

-        Returns a dict with keys: 'published', 'filtered_out', 'failed', 'queue_dropped'.
+        Returns a dict with the following keys:
+            published: Number of events successfully published since last call
+            filtered_out: Number of events filtered out before publishing since last call
+            failed: Number of events that failed to publish since last call
+            success_latency_seconds: List of publish latencies for successful attempts since last call
+            failure_latency_seconds: List of publish latencies for failed attempts since last call
+            failed_attempts_since_last_success: Number of consecutive failed publish attempts since last successful publish
+            time_since_last_success_seconds: Time elapsed since last successful publish, or None if never succeeded
+            dropped_events: Dict mapping event types to counts of events dropped from buffer
         """
         async with self._metrics_lock:
             if self._metric_last_publish_success_timestamp is None:
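
A short sketch of how a caller might consume the snapshot documented above (key names follow the docstring; the reporting logic is illustrative):

async def report_publisher_metrics(publisher) -> None:
    metrics = await publisher.get_and_reset_metrics()
    dropped_total = sum(metrics.get("dropped_events", {}).values())
    print(
        f"published={metrics.get('published', 0)} "
        f"failed={metrics.get('failed', 0)} "
        f"dropped={dropped_total}"
    )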

python/ray/dashboard/modules/aggregator/tests/test_aggregator_agent.py

Lines changed: 1 addition & 1 deletion
@@ -880,7 +880,7 @@ def test_aggregator_agent_receive_driver_job_execution_event(
     ],
     indirect=True,
 )
-def test_aggregator_agent_publish_disabled_does_not_send_http(
+def test_aggregator_agent_http_svc_publish_disabled(
     ray_start_cluster_head_with_env_vars, httpserver, fake_timestamp
 ):
     cluster = ray_start_cluster_head_with_env_vars

python/ray/dashboard/modules/aggregator/tests/test_multi_consumer_event_buffer.py

Lines changed: 2 additions & 2 deletions
@@ -83,8 +83,8 @@ async def test_add_event_buffer_overflow(self):
         assert evicted_events_count[first_event_type_name] == 1

     @pytest.mark.asyncio
-    async def test_wait_for_batch_multiple_events_immediate(self):
-        """Test waiting for batch when multiple events are immediately available."""
+    async def test_wait_for_batch_multiple_events(self):
+        """Test waiting for batch when multiple events are immediately available and when not all events are available."""
         buffer = MultiConsumerEventBuffer(max_size=10, max_batch_size=3)
         consumer_id = await buffer.register_consumer()