
feat: Serving as a builtin source with Monovertex #2382

Merged
merged 25 commits into from
Feb 20, 2025

Conversation

Contributor

@BulkBeing BulkBeing commented Feb 6, 2025

Fixes #2350

APIs:

  • POST /v1/process/sync
  • POST /v1/process/async
  • GET /v1/process/fetch?id=req_id
  • GET /v1/process/status?id=req_id

The responsibility for generating a unique request ID moves to the Store implementation when the user doesn't provide one in the X-Numaflow-Id header. If an ID is provided and it already exists in the Store, we return a 409 Conflict status.
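
A minimal sketch of the registration rule described above, using an in-memory map in place of the real Store. `MemStore`, `StoreError`, `status_code`, and the counter-based ID generation are hypothetical names and choices made for illustration; they are not taken from this PR's code.

```rust
use std::collections::HashMap;

/// Errors a hypothetical store can return on registration.
#[derive(Debug, PartialEq)]
enum StoreError {
    /// The caller-supplied id is already present in the store.
    DuplicateRequest(String),
}

/// Hypothetical in-memory store standing in for the real implementation.
#[derive(Default)]
struct MemStore {
    /// Maps request id to its status string.
    requests: HashMap<String, String>,
    /// Counter used only to keep this sketch dependency-free.
    next_id: u64,
}

impl MemStore {
    /// Registers a request. A supplied id is checked for uniqueness;
    /// when `id` is `None` the store generates an unused one itself.
    fn register(&mut self, id: Option<String>) -> Result<String, StoreError> {
        let id = match id {
            Some(id) => {
                if self.requests.contains_key(&id) {
                    return Err(StoreError::DuplicateRequest(id));
                }
                id
            }
            None => loop {
                self.next_id += 1;
                let candidate = format!("generated-{}", self.next_id);
                if !self.requests.contains_key(&candidate) {
                    break candidate;
                }
            },
        };
        self.requests.insert(id.clone(), "pending".to_string());
        Ok(id)
    }
}

/// Maps a store error to the HTTP status the API layer would return.
fn status_code(err: &StoreError) -> u16 {
    match err {
        StoreError::DuplicateRequest(_) => 409,
    }
}
```

Registering the same user-supplied ID twice yields `DuplicateRequest` on the second call, which the HTTP layer would translate to 409.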

Since the request ID is generated by the Store (or validated if the user provides it), we use a different tid for tracing (logs).

For both /sync and /async, registering the request in the store is the first step. With the Redis store implementation, this adds an entry:

request:${request_id}:status  = 'pending'

Once processing completes and all callbacks have been received, the request ID is removed from the tracker, which changes the value of the Redis status key to 'completed'.

The callbacks are stored in a Redis list under the key request:${request_id}:callbacks, and the result is saved under the key request:${request_id}:results.

Example: The entries in Redis follow the structure below:

request:01951c1a-418f-74f3-a2b2-ee480d877f80:status  = "completed"
request:01951c1a-418f-74f3-a2b2-ee480d877f80:results = {"hello":"world"}

request:01951c1a-418f-74f3-a2b2-ee480d877f80:callbacks = [
    {"id":"01951c1a-418f-74f3-a2b2-ee480d877f80","vertex":"serve-sink","cb_time":1739933238792,"from_vertex":"serving-in","responses":[{"tags":null}]}

    {"id":"01951c1a-418f-74f3-a2b2-ee480d877f80","vertex":"serving-in","cb_time":1739933237779,"from_vertex":"serving-in","responses":[{"tags":null}]}
]
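
The key layout and the pending-to-completed lifecycle above can be sketched roughly as follows. This is an illustration under assumptions: `FakeRedis` is a hypothetical stand-in built on `HashMap` rather than a Redis client, and the method names are invented for the example.

```rust
use std::collections::HashMap;

/// Key builders mirroring the layout described above.
fn status_key(request_id: &str) -> String {
    format!("request:{}:status", request_id)
}

fn callbacks_key(request_id: &str) -> String {
    format!("request:{}:callbacks", request_id)
}

fn results_key(request_id: &str) -> String {
    format!("request:{}:results", request_id)
}

/// Hypothetical stand-in for Redis: plain string values and lists held in maps.
#[derive(Default)]
struct FakeRedis {
    strings: HashMap<String, String>,
    lists: HashMap<String, Vec<String>>,
}

impl FakeRedis {
    /// First step for a request: create the status entry as "pending".
    fn register(&mut self, request_id: &str) {
        self.strings
            .insert(status_key(request_id), "pending".to_string());
    }

    /// Appends one callback payload to the request's callback list.
    fn push_callback(&mut self, request_id: &str, callback_json: &str) {
        self.lists
            .entry(callbacks_key(request_id))
            .or_default()
            .push(callback_json.to_string());
    }

    /// Stores the result payload for the request.
    fn save_result(&mut self, request_id: &str, result: &str) {
        self.strings
            .insert(results_key(request_id), result.to_string());
    }

    /// Flips the status entry once all callbacks have been received.
    fn mark_completed(&mut self, request_id: &str) {
        self.strings
            .insert(status_key(request_id), "completed".to_string());
    }
}
```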

Monovertex still needs to make a callback (to localhost) when Sink processing succeeds. I attempted to remove this by storing a handle to the callback state within the callback handler implementation. However, this resulted in generics everywhere (start_source_forwarder, create_components, etc.), since the callback state is generic over the store.
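
To illustrate why a store-generic callback state spreads type parameters, here is a simplified sketch. The function names `start_source_forwarder` and `create_components` come from the description above, but their signatures here are hypothetical, as are the `Store` trait shape, `MemStore`, and `CallbackState`.

```rust
/// Hypothetical store abstraction; the real trait has a different surface.
trait Store {
    fn mark_sink_done(&mut self, id: &str);
}

/// Hypothetical in-memory store used only for this sketch.
#[derive(Default)]
struct MemStore {
    done: Vec<String>,
}

impl Store for MemStore {
    fn mark_sink_done(&mut self, id: &str) {
        self.done.push(id.to_string());
    }
}

/// Callback state that owns a store, and is therefore generic over it.
struct CallbackState<S: Store> {
    store: S,
}

impl<S: Store> CallbackState<S> {
    fn on_sink_success(&mut self, id: &str) {
        self.store.mark_sink_done(id);
    }
}

/// Any function that builds the state must carry the type parameter.
fn create_components<S: Store>(store: S) -> CallbackState<S> {
    CallbackState { store }
}

/// Any function that receives the state must carry the type parameter too.
fn start_source_forwarder<S: Store>(state: &mut CallbackState<S>, id: &str) {
    state.on_sink_success(id);
}
```

Every caller between the place the store is created and the place the callback fires ends up with an `S: Store` bound, which is the spread the description refers to.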

Currently, a callback is stored as soon as the serving pod receives it, once per vertex per message. Storing callbacks only when the pod is shutting down, or once all callbacks have been received, would reduce the number of Redis API calls. This can be implemented in the future. For now I'm prioritizing changes that get the main branch into a working/usable state.
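
The deferred-write idea could look roughly like the sketch below. It is not implemented in this PR. `CallbackBuffer` is a hypothetical type, and it assumes the expected number of callbacks per request is known up front.

```rust
/// Hypothetical buffer that keeps callbacks in memory and hands them over
/// as one batch, so the store can be written once instead of per callback.
struct CallbackBuffer {
    /// Number of callbacks expected for the request (assumed known).
    expected: usize,
    pending: Vec<String>,
}

impl CallbackBuffer {
    fn new(expected: usize) -> Self {
        CallbackBuffer {
            expected,
            pending: Vec::new(),
        }
    }

    /// Records a callback. Returns the full batch when the last expected
    /// callback arrives, otherwise `None`.
    fn record(&mut self, callback: String) -> Option<Vec<String>> {
        self.pending.push(callback);
        if self.pending.len() >= self.expected {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    /// Drains whatever is buffered, for use when the pod is shutting down.
    fn drain_on_shutdown(&mut self) -> Vec<String> {
        std::mem::take(&mut self.pending)
    }
}
```

The caller would write the returned batch to the store in a single call; the trade-off is that buffered callbacks are lost if the pod dies before flushing.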


codecov bot commented Feb 6, 2025

Codecov Report

Attention: Patch coverage is 82.24852% with 90 lines in your changes missing coverage. Please review.

Project coverage is 70.99%. Comparing base (d5d4408) to head (c14531b).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| rust/serving/src/app/jetstream_proxy.rs | 72.04% | 45 Missing ⚠️ |
| rust/serving/src/app/callback/store/redisstore.rs | 79.13% | 24 Missing ⚠️ |
| rust/serving/src/config.rs | 83.33% | 17 Missing ⚠️ |
| rust/serving/src/app/callback/store/memstore.rs | 93.10% | 2 Missing ⚠️ |
| rust/serving/src/app/response.rs | 0.00% | 1 Missing ⚠️ |
| rust/serving/src/app/tracker.rs | 97.05% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2382      +/-   ##
==========================================
+ Coverage   70.98%   70.99%   +0.01%     
==========================================
  Files         367      368       +1     
  Lines       53042    53321     +279     
==========================================
+ Hits        37651    37856     +205     
- Misses      14311    14387      +76     
+ Partials     1080     1078       -2     


Signed-off-by: Sreekanth <[email protected]>
@BulkBeing BulkBeing marked this pull request as ready for review February 19, 2025 06:36
Contributor

@yhl25 yhl25 left a comment

Partial review

// Generate a tid with good enough randomness and not too long
// Example of a UUID v7: 01951b72-d0f4-711e-baba-4efe03d9cb76
// We use the characters representing timestamp in milliseconds (without '-'), and last 5 characters for randomness.
let uuid = Uuid::now_v7().to_string();
Contributor

We only use the uuid if the user has not passed an id, right?

Contributor Author

This uuid is not used to represent the data stored in Store. Added details in the PR description.

Since the request ID is generated by the Store (or validated if the user provides it), we use a different tid for tracing (logs).

Member

what is the point of this TID? just to log?

Contributor Author

Yes, all log lines corresponding to a request will have this tid. Ideally the same should be used by the callback requests for that request too, but currently it's not propagated.
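
For reference, the tid derivation described in the code comment under review (timestamp characters without '-', plus the last 5 characters) might be sketched as below. It takes the UUID as a string so the example needs no external crate. `tid_from_uuid_v7` is a hypothetical helper name, and the exact slicing in the PR may differ.

```rust
/// Builds a short trace id from the canonical string form of a UUID v7.
/// Returns `None` when the input is not a 36-character ASCII string.
fn tid_from_uuid_v7(uuid: &str) -> Option<String> {
    // Canonical form is 36 ASCII characters laid out as 8-4-4-4-12 hex digits.
    if uuid.len() != 36 || !uuid.is_ascii() {
        return None;
    }
    // The first 48 bits of a UUID v7 hold the Unix timestamp in milliseconds.
    // That is the first 12 hex digits: characters 0..13 with one '-' at index 8.
    let timestamp: String = uuid[..13].chars().filter(|c| *c != '-').collect();
    // The last 5 characters come from the random portion of the UUID.
    let random_tail = &uuid[31..];
    Some(format!("{}{}", timestamp, random_tail))
}
```

With the example UUID from the comment, `01951b72-d0f4-711e-baba-4efe03d9cb76`, this yields `01951b72d0f49cb76`.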

@@ -17,13 +19,51 @@ pub(crate) enum PayloadToSave {
},
}

#[derive(Debug, PartialEq)]
pub(crate) enum PipelineResult {
Contributor

Consider a generic name that works for mvtx as well?

Contributor

Add docs..

-    async fn retrieve_datum(&mut self, id: &str) -> Result<Vec<Vec<u8>>, Error> {
-        let id = format!("{}_{}", id, SAVED);
+    async fn retrieve_datum(&mut self, id: &str) -> StoreResult<PipelineResult> {
+        let id = format!("{}_{}", id, "saved");
Contributor

Keep constant ?

}
}
}
async fn done(&mut self, id: String) -> StoreResult<()> {
Contributor

Add docs

Member

vigith commented Feb 20, 2025

Just returning a status is not enough; we will have to return a body. For example, a 409 status code alone is not definite enough for clients.
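
One possible shape for such a response is sketched below. The `ErrorResponse` type, the JSON field names, and the message text are all hypothetical; the PR does not define the body format.

```rust
/// Hypothetical error response carrying both a status code and a JSON body.
struct ErrorResponse {
    status: u16,
    body: String,
}

/// Escapes the characters that must not appear raw inside a JSON string.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Builds a 409 response whose body tells the client which id collided.
fn conflict_response(request_id: &str) -> ErrorResponse {
    ErrorResponse {
        status: 409,
        body: format!(
            r#"{{"error":"request id already exists","id":"{}"}}"#,
            escape_json(request_id)
        ),
    }
}
```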

Signed-off-by: Vigith Maurice <[email protected]>
Member

@vigith vigith left a comment

let's do this in follow up PR

-    pub(crate) fn register(&mut self, id: String) -> oneshot::Receiver<Result<String, Error>> {
+    pub(crate) async fn register(
+        &mut self,
+        id: Option<String>,
Member

let's make sure that the ID is no longer optional.

Comment on lines +59 to +61
/// Register a request id in the store. If user provides a request id, the same should be returned
/// if it doesn't already exist in the store. An error should be returned if the user-specified request id
/// already exists in the store. If the `id` is `None`, the store should generate a new unique request id.
Member

let's make sure ID is created at the top (when the request is received)

}
Ok(id)
}
None => {
Member

If the ID is generated above, we do not have to do this. This will be an overcomplication once we have other stores (IPS, DDB, etc.)
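
A rough sketch of the alternative suggested in this thread: resolve the ID when the request is received, so the store's register takes a plain `String`. `resolve_request_id`, the `Store` trait shape, and `SetStore` are hypothetical names used only for illustration.

```rust
use std::collections::HashSet;

/// Picks the request id at the top of the handler: the header value when
/// present, otherwise a freshly generated one.
fn resolve_request_id(header_value: Option<&str>, generate: impl FnOnce() -> String) -> String {
    match header_value {
        Some(id) => id.to_string(),
        None => generate(),
    }
}

/// With the id resolved up front, a store only has to reject duplicates.
trait Store {
    /// Returns `Err` carrying the id when it is already registered.
    fn register(&mut self, id: String) -> Result<(), String>;
}

/// Hypothetical store backed by a set of known ids.
#[derive(Default)]
struct SetStore {
    ids: HashSet<String>,
}

impl Store for SetStore {
    fn register(&mut self, id: String) -> Result<(), String> {
        // `insert` returns false when the id was already present.
        if self.ids.insert(id.clone()) {
            Ok(())
        } else {
            Err(id)
        }
    }
}
```

This keeps ID generation in one place, so each additional store implementation only needs the duplicate check.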

@vigith vigith merged commit 01cc33d into main Feb 20, 2025
28 checks passed
@vigith vigith deleted the mvtx-serving branch February 20, 2025 04:08
Successfully merging this pull request may close these issues.

Monovertex changes to support serving as a builtin source
3 participants