Adds two new HTTP endpoints behind the ENABLE_STREAM_API flag:
- GET /stats: returns aggregated camera_fps, inference_fps, and
stream_count across all active pipelines. Reuses existing
list_pipelines/get_status IPC -- no new commands needed.
- GET /inference_pipelines/{pipeline_id}/latest_frame: returns the most
recent frame as a base64-encoded JPEG with metadata. Adds a new
LATEST_FRAME IPC command that peeks at the buffer non-destructively.
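To make the /stats aggregation concrete, here is a minimal sketch of the summing step, assuming per-pipeline reports are plain dicts with the field names mentioned in this PR (`camera_fps`, `inference_throughput`); the helper name `aggregate_stats` is hypothetical:

```python
def aggregate_stats(reports: list[dict]) -> dict:
    """Fold per-pipeline status reports into one aggregate payload.

    Missing fields default to 0.0 so a pipeline that has not reported
    throughput yet does not break the sum.
    """
    return {
        "stream_count": len(reports),
        "camera_fps": sum(r.get("camera_fps", 0.0) for r in reports),
        "inference_fps": sum(r.get("inference_throughput", 0.0) for r in reports),
    }
```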
82e1800 to 5c2050f
```python
@app.get(
    "/stats",
    summary="Aggregated pipeline statistics",
```
Please add a `description` and a `response_model` here ;)
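For reference, a hedged sketch of what the reviewer is asking for: a Pydantic model passed as `response_model` so the /stats schema shows up in the OpenAPI docs. The model name `PipelineStats` is a hypothetical choice; the field names follow this PR's summary:

```python
from pydantic import BaseModel


class PipelineStats(BaseModel):
    """Aggregate statistics across all active pipelines."""

    camera_fps: float
    inference_fps: float
    stream_count: int


# The decorator would then gain the two missing arguments, e.g.:
# @app.get(
#     "/stats",
#     summary="Aggregated pipeline statistics",
#     description="Aggregates camera_fps, inference_fps and stream_count "
#                 "across all active pipelines.",
#     response_model=PipelineStats,
# )
```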
```python
pipeline_ids = pipelines_resp.pipelines
stream_count = len(pipeline_ids)
for pid in pipeline_ids:
    status_resp = await self.stream_manager_client.get_status(
```
How about fanning these calls out instead of awaiting them one by one:

```python
tasks = [
    self.stream_manager_client.get_status(pid)
    for pid in pipeline_ids
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
```

Not sure how much it would matter, though.
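A self-contained illustration of the reviewer's suggestion, with the real IPC call replaced by a stubbed `get_status` (all names here are stand-ins, not the project's actual API):

```python
import asyncio


async def get_status(pid: str) -> dict:
    """Stand-in for stream_manager_client.get_status (real one does IPC)."""
    await asyncio.sleep(0)  # simulate the IPC round trip
    if pid == "bad":
        raise RuntimeError("pipeline unreachable")
    return {"pipeline_id": pid, "inference_throughput": 25.0}


async def fetch_all(pipeline_ids: list[str]) -> list[dict]:
    tasks = [get_status(pid) for pid in pipeline_ids]
    # return_exceptions=True keeps one failing pipeline from taking
    # down the whole /stats response; failures are dropped below.
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in responses if not isinstance(r, BaseException)]


results = asyncio.run(fetch_all(["a", "bad", "b"]))
```

The status calls now run concurrently, so the latency of /stats is bounded by the slowest pipeline rather than the sum of all of them.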
```python
        pid
    )
    report = status_resp.report
    throughput = report.get("inference_throughput", 0.0)
```
Couldn't we split this into something like:

```python
async def get_stats():
    reports = await self.fetch_pipeline_reports()
    return compute_stats(reports)
```

I think we are pushing too much into the endpoint function bodies; this is not a place for business logic.

Additionally, writing it this way would let us optimize the work easily. I know the stats calculation is trivial, but if it were more complicated, it would be a blocking operation. Having it as a separate compute_stats would let us fix that quickly, e.g. by running it in a separate thread or something like that.
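A runnable sketch of the proposed split, with the IPC layer stubbed out; `fetch_pipeline_reports`, `compute_stats`, and the report shape are assumptions for illustration, not the project's actual API:

```python
import asyncio


def compute_stats(reports: list[dict]) -> dict:
    """Pure, synchronous aggregation -- easy to test and to offload."""
    return {
        "stream_count": len(reports),
        "inference_fps": sum(r.get("inference_throughput", 0.0) for r in reports),
    }


async def fetch_pipeline_reports() -> list[dict]:
    """Stand-in for the list_pipelines/get_status IPC round trips."""
    await asyncio.sleep(0)
    return [{"inference_throughput": 20.0}, {"inference_throughput": 10.0}]


async def get_stats() -> dict:
    reports = await fetch_pipeline_reports()
    # Trivial today; if compute_stats ever becomes expensive, swap this
    # line for: return await asyncio.to_thread(compute_stats, reports)
    return compute_stats(reports)


stats = asyncio.run(get_stats())
```

Because `compute_stats` takes plain data and touches no I/O, moving it off the event loop later is a one-line change, exactly as the reviewer notes.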
```python
self._responses_queue.put((request_id, response_payload))
return None
_, jpeg_bytes = cv.imencode(
    ".jpg", frame.image, [cv.IMWRITE_JPEG_QUALITY, 70]
```
Wouldn't we like to allow parametrizing this through the request? I'm not sure about its usefulness at this moment because I don't know the full context, but I just wanted to point it out. In that case I would provide an enum with some reasonable values -- low, medium, high -- where medium is 70, for example. Otherwise people would probably skew toward typing 100 all the time.
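One way to shape the reviewer's low/medium/high idea as a query parameter; the enum name and the concrete values for low/high are assumptions, with "medium" matching the 70 hardcoded in this PR:

```python
from enum import Enum


class JpegQuality(str, Enum):
    """Request-level quality knob; str base makes it a valid FastAPI query enum."""

    low = "low"
    medium = "medium"
    high = "high"


# Mapping to cv.IMWRITE_JPEG_QUALITY values; low/high are illustrative guesses.
JPEG_QUALITY_VALUES = {
    JpegQuality.low: 40,
    JpegQuality.medium: 70,  # the value currently hardcoded in the PR
    JpegQuality.high: 90,
}
```

The endpoint would then accept `quality: JpegQuality = JpegQuality.medium` and look up the numeric value, keeping raw 0-100 numbers out of the public API.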
Summary
- `GET /stats` endpoint that returns aggregated `camera_fps`, `inference_fps`, and `stream_count` across all active pipelines. Reuses existing `list_pipelines`/`get_status` IPC, no new commands needed.
- `GET /inference_pipelines/{pipeline_id}/latest_frame` endpoint that returns the most recent frame as a base64-encoded JPEG with metadata (`frame_id`, `frame_timestamp`, `source_id`). Uses a new `LATEST_FRAME` IPC command that peeks at the buffer non-destructively.

Both endpoints are gated behind `ENABLE_STREAM_API`.

Test plan
- `pytest tests/inference/unit_tests/core/interfaces/stream_manager/test_stats_and_latest_frame.py -v`
- With `ENABLE_STREAM_API=true`, call `GET /stats` and verify JSON shape
- Call `GET /inference_pipelines/{id}/latest_frame` and verify the base64 JPEG decodes
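The last manual check can be sketched as a tiny helper: decode the base64 payload and confirm it starts with the JPEG SOI marker (`FF D8`). The payload here is a fabricated stand-in, not a real endpoint response:

```python
import base64


def looks_like_jpeg(b64_payload: str) -> bool:
    """Return True if the base64 payload decodes to bytes with a JPEG SOI marker."""
    raw = base64.b64decode(b64_payload)
    return raw[:2] == b"\xff\xd8"


# Fabricated stand-in for the endpoint's base64 field: SOI marker plus padding.
fake_payload = base64.b64encode(b"\xff\xd8\xff\xe0" + b"\x00" * 8).decode()
```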