Skip to content

Commit 465a581

Browse files
committed
feat(server): add startup and warmup time metrics to stats and Prometheus
This commit adds the server startup time and pipeline warmup duration to the `/stats` API response and Prometheus metrics. It also refactors the code to avoid using async operations on class properties for improved clarity.
1 parent 0c564ba commit 465a581

File tree

7 files changed

+465
-95
lines changed

7 files changed

+465
-95
lines changed

server/app.py

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
import sys
7+
import time
78

89
import torch
910

@@ -24,9 +25,8 @@
2425
from aiortc.rtcrtpsender import RTCRtpSender
2526
from pipeline import Pipeline
2627
from twilio.rest import Client
27-
from utils import patch_loop_datagram, add_prefix_to_app_routes, FPSMeter
28-
from metrics import MetricsManager, StreamStatsManager
29-
import time
28+
from utils import patch_loop_datagram, add_prefix_to_app_routes
29+
from metrics import MetricsManager, StreamStatsManager, StreamStats
3030

3131
logger = logging.getLogger(__name__)
3232
logging.getLogger("aiortc.rtcrtpsender").setLevel(logging.WARNING)
@@ -38,7 +38,7 @@
3838

3939

4040
class VideoStreamTrack(MediaStreamTrack):
41-
"""video stream track that processes video frames using a pipeline.
41+
"""Video stream track that processes video frames using a pipeline.
4242
4343
Attributes:
4444
kind (str): The kind of media, which is "video" for this class.
@@ -54,17 +54,21 @@ def __init__(self, track: MediaStreamTrack, pipeline: Pipeline):
5454
Args:
5555
track: The underlying media stream track.
5656
pipeline: The processing pipeline to apply to each video frame.
57+
stats: The stream statistics.
5758
"""
59+
self._start_time = time.monotonic()
5860
super().__init__()
5961
self.track = track
6062
self.pipeline = pipeline
61-
self.fps_meter = FPSMeter(
62-
metrics_manager=app["metrics_manager"], track_id=track.id
63+
self.stats = StreamStats(
64+
track=track,
65+
metrics_manager=app.get("metrics_manager", None),
6366
)
64-
self.running = True
65-
self.collect_task = asyncio.create_task(self.collect_frames())
66-
67-
# Add cleanup when track ends
67+
self._running = True
68+
69+
asyncio.create_task(self.collect_frames())
70+
71+
# Add cleanup when track ends.
6872
@track.on("ended")
6973
async def on_ended():
7074
logger.info("Source video track ended, stopping collection")
@@ -75,7 +79,7 @@ async def collect_frames(self):
7579
the processing pipeline. Stops when track ends or connection closes.
7680
"""
7781
try:
78-
while self.running:
82+
while self._running:
7983
try:
8084
frame = await self.track.recv()
8185
await self.pipeline.put_video_frame(frame)
@@ -87,9 +91,9 @@ async def collect_frames(self):
8791
logger.info("Media stream ended")
8892
else:
8993
logger.error(f"Error collecting video frames: {str(e)}")
90-
self.running = False
94+
self._running = False
9195
break
92-
96+
9397
# Perform cleanup outside the exception handler
9498
logger.info("Video frame collection stopped")
9599
except asyncio.CancelledError:
@@ -100,28 +104,57 @@ async def collect_frames(self):
100104
await self.pipeline.cleanup()
101105

102106
async def recv(self):
103-
"""Receive a processed video frame from the pipeline, increment the frame
104-
count for FPS calculation and return the processed frame to the client.
107+
"""Receive a processed video frame from the pipeline and return it to the
108+
client, while collecting statistics about the stream.
105109
"""
110+
if self.stats.startup_time is None:
111+
self.stats.start_timestamp = time.monotonic()
112+
self.stats.startup_time = self.stats.start_timestamp - self._start_time
113+
self.stats.pipeline.video_warmup_time = (
114+
self.pipeline.stats.video_warmup_time
115+
)
116+
106117
processed_frame = await self.pipeline.get_processed_video_frame()
107118

108119
# Increment the frame count to calculate FPS.
109-
await self.fps_meter.increment_frame_count()
120+
await self.stats.fps_meter.increment_frame_count()
110121

111122
return processed_frame
112123

113124

114125
class AudioStreamTrack(MediaStreamTrack):
126+
"""Audio stream track that processes audio frames using a pipeline.
127+
128+
Attributes:
129+
kind (str): The kind of media, which is "audio" for this class.
130+
track (MediaStreamTrack): The underlying media stream track.
131+
pipeline (Pipeline): The processing pipeline to apply to each audio frame.
132+
"""
133+
115134
kind = "audio"
116135

117136
def __init__(self, track: MediaStreamTrack, pipeline):
137+
"""Initialize the AudioStreamTrack.
138+
139+
Args:
140+
track: The underlying media stream track.
141+
pipeline: The processing pipeline to apply to each audio frame.
142+
stats: The stream statistics.
143+
"""
144+
self._start_time = time.monotonic()
118145
super().__init__()
119146
self.track = track
120147
self.pipeline = pipeline
121-
self.running = True
122-
self.collect_task = asyncio.create_task(self.collect_frames())
123-
124-
# Add cleanup when track ends
148+
self.stats = StreamStats(
149+
track_id=track.id,
150+
track_type="audio",
151+
metrics_manager=app.get("metrics_manager", None),
152+
)
153+
self._running = True
154+
155+
asyncio.create_task(self.collect_frames())
156+
157+
# Add cleanup when track ends.
125158
@track.on("ended")
126159
async def on_ended():
127160
logger.info("Source audio track ended, stopping collection")
@@ -132,7 +165,7 @@ async def collect_frames(self):
132165
the processing pipeline. Stops when track ends or connection closes.
133166
"""
134167
try:
135-
while self.running:
168+
while self._running:
136169
try:
137170
frame = await self.track.recv()
138171
await self.pipeline.put_audio_frame(frame)
@@ -144,9 +177,9 @@ async def collect_frames(self):
144177
logger.info("Media stream ended")
145178
else:
146179
logger.error(f"Error collecting audio frames: {str(e)}")
147-
self.running = False
180+
self._running = False
148181
break
149-
182+
150183
# Perform cleanup outside the exception handler
151184
logger.info("Audio frame collection stopped")
152185
except asyncio.CancelledError:
@@ -157,7 +190,22 @@ async def collect_frames(self):
157190
await self.pipeline.cleanup()
158191

159192
async def recv(self):
160-
return await self.pipeline.get_processed_audio_frame()
193+
"""Receive a processed audio frame from the pipeline and return it to the
194+
client, while collecting statistics about the stream.
195+
"""
196+
if self.stats.startup_time is None:
197+
self.stats.start_timestamp = time.monotonic()
198+
self.stats.startup_time = self.stats.start_timestamp - self._start_time
199+
self.stats.pipeline.audio_warmup_time = (
200+
self.pipeline.stats.audio_warmup_time
201+
)
202+
203+
processed_frame = await self.pipeline.get_processed_audio_frame()
204+
205+
# Increment the frame count to calculate FPS.
206+
await self.stats.fps_meter.increment_frame_count()
207+
208+
return processed_frame
161209

162210

163211
def force_codec(pc, sender, forced_codec):
@@ -286,10 +334,15 @@ def on_track(track):
286334
tracks["audio"] = audioTrack
287335
pc.addTrack(audioTrack)
288336

337+
# Store audio track in app for stats.
338+
stream_id = track.id
339+
request.app["audio_tracks"][stream_id] = audioTrack
340+
289341
@track.on("ended")
290342
async def on_ended():
291343
logger.info(f"{track.kind} track ended")
292344
request.app["video_tracks"].pop(track.id, None)
345+
request.app["audio_tracks"].pop(track.id, None)
293346

294347
@pc.on("connectionstatechange")
295348
async def on_connectionstatechange():
@@ -318,6 +371,7 @@ async def on_connectionstatechange():
318371
),
319372
)
320373

374+
321375
async def cancel_collect_frames(track):
322376
track.running = False
323377
if hasattr(track, 'collect_task') is not None and not track.collect_task.done():
@@ -327,6 +381,7 @@ async def cancel_collect_frames(track):
327381
except (asyncio.CancelledError):
328382
pass
329383

384+
330385
async def set_prompt(request):
331386
pipeline = request.app["pipeline"]
332387

@@ -349,6 +404,7 @@ async def on_startup(app: web.Application):
349404
)
350405
app["pcs"] = set()
351406
app["video_tracks"] = {}
407+
app["audio_tracks"] = {}
352408

353409

354410
async def on_shutdown(app: web.Application):
@@ -410,11 +466,9 @@ async def on_shutdown(app: web.Application):
410466

411467
# Add routes for getting stream statistics.
412468
stream_stats_manager = StreamStatsManager(app)
469+
app.router.add_get("/streams/stats", stream_stats_manager.collect_all_stream_stats)
413470
app.router.add_get(
414-
"/streams/stats", stream_stats_manager.collect_all_stream_metrics
415-
)
416-
app.router.add_get(
417-
"/stream/{stream_id}/stats", stream_stats_manager.collect_stream_metrics_by_id
471+
"/stream/{stream_id}/stats", stream_stats_manager.collect_stream_stats_by_id
418472
)
419473

420474
# Add Prometheus metrics endpoint.

server/metrics/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
from .pipeline_stats import PipelineStats
12
from .prometheus_metrics import MetricsManager
2-
from .stream_stats import StreamStatsManager
3+
from .stream_stats import StreamStatsManager, StreamStats

server/metrics/pipeline_stats.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""Contains a class for real-time pipeline statistics."""
2+
3+
from typing import Optional, Dict, Any
4+
from .prometheus_metrics import MetricsManager
5+
6+
7+
class PipelineStats:
8+
"""Tracks real-time statistics of the pipeline.
9+
10+
Attributes:
11+
metrics_manager: The Prometheus metrics manager instance.
12+
track_id: The ID of the associated media pipeline (usually a stream track ID).
13+
"""
14+
15+
def __init__(
16+
self,
17+
metrics_manager: Optional[MetricsManager] = None,
18+
track_id: Optional[str] = None,
19+
):
20+
"""Initializes the PipelineStats class.
21+
22+
Args:
23+
metrics_manager: The Prometheus metrics manager instance.
24+
track_id: The ID of the stream track.
25+
"""
26+
self.metrics_manager = metrics_manager
27+
self.track_id = track_id
28+
29+
self._video_warmup_time = None
30+
self._audio_warmup_time = None
31+
32+
@property
33+
def video_warmup_time(self) -> float:
34+
"""Time taken to warm up the video pipeline."""
35+
return self._video_warmup_time
36+
37+
@video_warmup_time.setter
38+
def video_warmup_time(self, value: float):
39+
"""Sets the time taken to warm up the video pipeline."""
40+
self._video_warmup_time = value
41+
if self.metrics_manager:
42+
self.metrics_manager.update_video_warmup_time(value, self.track_id)
43+
44+
@property
45+
def audio_warmup_time(self) -> float:
46+
"""Time taken to warm up the audio pipeline."""
47+
return self._audio_warmup_time
48+
49+
@audio_warmup_time.setter
50+
def audio_warmup_time(self, value: float):
51+
"""Sets the time taken to warm up the audio pipeline."""
52+
self._audio_warmup_time = value
53+
if self.metrics_manager:
54+
self.metrics_manager.update_audio_warmup_time(value, self.track_id)
55+
56+
def to_dict(self) -> Dict[str, Any]:
57+
"""Convert stats to a dictionary for easy JSON serialization."""
58+
return {
59+
"video_warmup_time": self._video_warmup_time,
60+
"audio_warmup_time": self._audio_warmup_time,
61+
}

0 commit comments

Comments
 (0)