feat: Serving as a builtin source with Monovertex #2382
Signed-off-by: Sreekanth <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2382      +/-   ##
==========================================
+ Coverage   70.98%   70.99%   +0.01%
==========================================
  Files         367      368       +1
  Lines       53042    53321     +279
==========================================
+ Hits        37651    37856     +205
- Misses      14311    14387      +76
+ Partials     1080     1078       -2
…t in in-memory store Signed-off-by: Sreekanth <[email protected]>
…-serving Signed-off-by: Sreekanth <[email protected]>
Partial review
// Generate a tid with good enough randomness and not too long
// Example of a UUID v7: 01951b72-d0f4-711e-baba-4efe03d9cb76
// We use the characters representing timestamp in milliseconds (without '-'), and last 5 characters for randomness.
let uuid = Uuid::now_v7().to_string();
We only use uuid if the user has not passed id right?
This uuid is not used to represent the data stored in Store. Added details in the PR description.
Since request id is generated by the Store (or validated if user provides it), we use different tid for tracing (logs).
what is the point of this TID? just to log?
Yes, all log lines corresponding to a request will have this tid. Ideally the same should be used by the callback requests for that request too, but currently it's not propagated.
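For reference, the tid derivation described in the code comment above can be sketched as follows. This is only an illustration: `short_tid` is a hypothetical helper, not a function from the PR, and it assumes the timestamp characters and the last 5 characters are simply concatenated. It takes the UUID string as input so that it needs no external crate.

```rust
/// Hypothetical helper: derive a short trace id from a UUID v7 string.
/// Assumes the canonical 36-character hyphenated form, and assumes the two
/// parts are concatenated without a separator (the PR may join them differently).
fn short_tid(uuid_v7: &str) -> Option<String> {
    // Reject anything that is not the canonical ASCII textual form.
    if uuid_v7.len() != 36 || !uuid_v7.is_ascii() {
        return None;
    }
    // In UUID v7 the first 48 bits (12 hex digits) hold the Unix timestamp in milliseconds.
    let timestamp: String = uuid_v7.chars().filter(|c| *c != '-').take(12).collect();
    // The trailing characters come from the random portion of the UUID.
    let random_tail = &uuid_v7[uuid_v7.len() - 5..];
    Some(format!("{timestamp}{random_tail}"))
}
```

With the example UUID from the comment, this yields the 12 timestamp characters followed by `9cb76`.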
@@ -17,13 +19,51 @@ pub(crate) enum PayloadToSave {
    },
}

#[derive(Debug, PartialEq)]
pub(crate) enum PipelineResult {
Consider a generic name that works for mvtx as well?
Add docs..
-    async fn retrieve_datum(&mut self, id: &str) -> Result<Vec<Vec<u8>>, Error> {
-        let id = format!("{}_{}", id, SAVED);
+    async fn retrieve_datum(&mut self, id: &str) -> StoreResult<PipelineResult> {
+        let id = format!("{}_{}", id, "saved");
Keep constant ?
        }
    }
}
async fn done(&mut self, id: String) -> StoreResult<()> {
Add docs
…ement Signed-off-by: Sreekanth <[email protected]>
Just returning a status is not enough; we will have to return a body. E.g., a 409 status code alone is not definite enough for clients.
Signed-off-by: Vigith Maurice <[email protected]>
let's do this in follow up PR
-    pub(crate) fn register(&mut self, id: String) -> oneshot::Receiver<Result<String, Error>> {
+    pub(crate) async fn register(
+        &mut self,
+        id: Option<String>,
let's make sure that the ID is no longer optional.
/// Register a request id in the store. If user provides a request id, the same should be returned
/// if it doesn't already exist in the store. An error should be returned if the user-specified request id
/// already exists in the store. If the `id` is `None`, the store should generate a new unique request id.
let's make sure ID is created at the top (when the request is received)
        }
        Ok(id)
    }
    None => {
if ID is generated from above, we do not have to do this. This will be an over complication once we have other stores (IPS, DDB, etc.)
Fixes #2350
APIs:
POST /v1/process/sync
POST /v1/process/async
GET /v1/process/fetch?id=req_id
GET /v1/process/status?id=req_id
The responsibility of generating a unique request id is moved to the Store implementation if the user doesn't provide one with the `X-Numaflow-Id` header. If one is provided and it already exists in the Store, we return a 409 Conflict status.
Since the request id is generated by the Store (or validated if the user provides it), we use a different tid for tracing (logs).
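The registration contract described above can be sketched with a toy in-memory store. This is a minimal illustration under stated assumptions, not the PR's implementation: `InMemoryStore`, `RegisterError`, and the `req-N` counter-based ids are hypothetical stand-ins, and the real `register` is async and works against the actual Store backends.

```rust
use std::collections::HashSet;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum RegisterError {
    /// The caller-supplied id is already registered; a handler would map this to HTTP 409.
    DuplicateRequest(String),
}

/// Hypothetical in-memory store that tracks registered request ids.
struct InMemoryStore {
    ids: HashSet<String>,
    next_id: u64,
}

impl InMemoryStore {
    fn new() -> Self {
        Self { ids: HashSet::new(), next_id: 0 }
    }

    /// Register a request id. A user-provided id is returned unchanged if it is new,
    /// and rejected if it already exists. With `None`, the store generates a unique id.
    fn register(&mut self, id: Option<String>) -> Result<String, RegisterError> {
        match id {
            Some(id) => {
                // `insert` returns false when the id was already present.
                if !self.ids.insert(id.clone()) {
                    return Err(RegisterError::DuplicateRequest(id));
                }
                Ok(id)
            }
            None => loop {
                // A counter stands in for whatever id scheme the real store uses.
                self.next_id += 1;
                let candidate = format!("req-{}", self.next_id);
                if self.ids.insert(candidate.clone()) {
                    return Ok(candidate);
                }
            },
        }
    }
}
```

Registering the same user-provided id twice returns `DuplicateRequest` on the second call, which is the case that surfaces to clients as 409.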
A request (`/sync` and `/async`) is registered in the store as the first step. With the Redis store implementation, this adds an entry:
When the processing is completed and all callbacks are received, the removal of the request id from the tracker implementation causes the Redis status key to change its value to 'completed'.
The callbacks will be stored in Redis in a list with the key `request:${request}:callbacks`, and the result will be saved using the key `request:${request}:results`.
Example: the entries in Redis follow the structure below:
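As a minimal sketch of the key naming only, the two keys can be built as shown here. The helper names `callbacks_key` and `results_key` are hypothetical and are not taken from the PR; the status key is left out because its name is not given in this description.

```rust
/// Hypothetical helper: key of the Redis list holding the callbacks of a request.
fn callbacks_key(request_id: &str) -> String {
    format!("request:{request_id}:callbacks")
}

/// Hypothetical helper: key under which the results of a request are saved.
fn results_key(request_id: &str) -> String {
    format!("request:{request_id}:results")
}
```

Keeping both keys under the same `request:<id>:` prefix means everything belonging to one request can be located from its id alone.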
Monovertex still needs to make a callback (localhost) when Sink processing is successful. I attempted to remove this by storing a handle to the callback state within the callback handler implementation. However, it resulted in generics everywhere (`start_source_forwarder`, `create_components`, etc.) since the callback state is generic over the store.
Currently the callbacks are stored when the serving pod receives a callback from each vertex for each message. Doing this only when the pod is shutting down or when all callbacks are received can reduce the Redis API calls. This can be implemented in the future. Currently I'm prioritizing things that will bring the main branch to a working/usable state.