Skip to content

Commit

Permalink
feat: Serving as a builtin source with Monovertex (#2382)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
BulkBeing and vigith authored Feb 20, 2025
1 parent d5d4408 commit 01cc33d
Show file tree
Hide file tree
Showing 20 changed files with 684 additions and 289 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl MonovertexConfig {
.metadata
.as_ref()
.and_then(|metadata| metadata.name.clone())
.ok_or_else(|| Error::Config("Mono vertex name not found".to_string()))?;
.ok_or_else(|| Error::Config("MonoVertex name not found".to_string()))?;

let transformer_config = mono_vertex_obj
.spec
Expand Down
3 changes: 2 additions & 1 deletion rust/serving/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ axum-macros = "0.4.1"
hyper-util = { version = "0.1.6", features = ["client-legacy"] }
serde_json = "1.0.120"
tower-http = { version = "0.5.2", features = ["trace", "timeout"] }
uuid = { version = "1.10.0", features = ["v4"] }
uuid = { version = "1.10.0", features = ["std","v7"] }
redis = { version = "0.26.0", features = [
"tokio-comp",
"aio",
Expand All @@ -39,6 +39,7 @@ parking_lot = "0.12.3"
prometheus-client = "0.22.3"
thiserror = "1.0.63"
reqwest = { workspace = true, features = ["rustls-tls", "json"] }
http = "1.2.0"

[dev-dependencies]
reqwest = { workspace = true, features = ["json"] }
Expand Down
13 changes: 6 additions & 7 deletions rust/serving/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ pub(crate) async fn router_with_auth<T>(app: AppState<T>) -> crate::Result<Route
where
T: Clone + Send + Sync + Store + 'static,
{
let tid_header = app.settings.tid_header.clone();
let layers = ServiceBuilder::new()
// Add tracing to all requests
.layer(
Expand All @@ -87,12 +86,12 @@ where
// We don't need request ID for these endpoints
return info_span!("request", method=?req.method(), path=req_path);
}
let tid = req
.headers()
.get(&tid_header)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string())
.unwrap_or_else(|| Uuid::new_v4().to_string());

// Generate a tid with good enough randomness and not too long.
// Example of a UUID v7: 01951b72-d0f4-711e-baba-4efe03d9cb76
// The first 12 hex digits (`01951b72` + `d0f4`) hold the timestamp in milliseconds.
// We keep all 12 of them, skipping only the '-' at index 8, and append the last
// 5 characters for randomness. The second timestamp group is `uuid[9..13]`; starting
// at index 10 would silently drop one timestamp digit.
let uuid = Uuid::now_v7().to_string();
let tid = format!("{}{}{}", &uuid[..8], &uuid[9..13], &uuid[uuid.len() - 5..]);

let matched_path = req
.extensions()
Expand Down
2 changes: 1 addition & 1 deletion rust/serving/src/app/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ mod tests {
let msg_graph = MessageGraph::from_pipeline(&pipeline_spec).unwrap();
let mut state = CallbackState::new(msg_graph, store).await.unwrap();

let x = state.register("test_id".to_string());
let (_, x) = state.register(Some("test_id".to_string())).await.unwrap();
// spawn a task which will be awaited later
let handle = tokio::spawn(async move {
let _ = x.await.unwrap();
Expand Down
78 changes: 47 additions & 31 deletions rust/serving/src/app/callback/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{

use tokio::sync::oneshot;

use super::store::Store;
use super::store::{ProcessingStatus, Store};
use crate::app::callback::store::Error as StoreError;
use crate::app::callback::store::Result as StoreResult;
use crate::app::callback::{store::PayloadToSave, Callback};
use crate::app::tracker::MessageGraph;
use crate::Error;
Expand Down Expand Up @@ -42,25 +44,36 @@ where
}

/// register a new connection
/// The oneshot receiver will be notified when all callbacks for this connection is received from the numaflow pipeline
pub(crate) fn register(&mut self, id: String) -> oneshot::Receiver<Result<String, Error>> {
/// The oneshot receiver will be notified when all callbacks for this connection are received
/// from the numaflow pipeline.
pub(crate) async fn register(
&mut self,
id: Option<String>,
) -> StoreResult<(String, oneshot::Receiver<Result<String, Error>>)> {
// TODO: add an entry in Redis to note that the entry has been registered

let id = self.store.register(id).await?;

let (tx, rx) = oneshot::channel();
let mut guard = self.callbacks.lock().expect("Getting lock on State");
guard.insert(
id.clone(),
RequestState {
tx,
vtx_visited: Vec::new(),
},
);
rx
{
let mut guard = self.callbacks.lock().expect("Getting lock on State");
guard.insert(
id.clone(),
RequestState {
tx,
vtx_visited: Vec::new(),
},
);
}
Ok((id, rx))
}

/// Retrieves the output of the numaflow pipeline
pub(crate) async fn retrieve_saved(&mut self, id: &str) -> Result<Vec<Vec<u8>>, Error> {
self.store.retrieve_datum(id).await
pub(crate) async fn retrieve_saved(
&mut self,
id: &str,
) -> Result<ProcessingStatus, StoreError> {
self.store.retrieve_datum(id).await.map_err(Into::into)
}

pub(crate) async fn save_response(
Expand All @@ -76,6 +89,7 @@ where
value: body,
}])
.await
.map_err(Into::into)
}

/// insert_callback_requests is used to insert the callback requests.
Expand Down Expand Up @@ -104,13 +118,10 @@ where
let id = cbr.id.clone();
{
let mut guard = self.callbacks.lock().expect("Getting lock on State");
guard
.get_mut(&cbr.id)
.ok_or(Error::IDNotFound(
"Connection for the received callback is not present in the in-memory store",
))?
.vtx_visited
.push(cbr);
let req_state = guard.get_mut(&id).ok_or_else(|| {
Error::SubGraphInvalidInput(format!("request id {id} doesn't exist in-memory"))
})?;
req_state.vtx_visited.push(cbr);
}

// check if the sub graph can be generated
Expand All @@ -125,7 +136,8 @@ where
// if the sub graph is not generated, then we can continue
continue;
}
_ => {
err => {
tracing::error!(?err, "Failed to generate subgraph");
// if there is an error, deregister with the error
self.deregister(&id).await?
}
Expand Down Expand Up @@ -190,6 +202,8 @@ where
));
};

self.store.done(id.to_string()).await?;

state
.tx
.send(Ok(id.to_string()))
Expand All @@ -209,13 +223,12 @@ where
id: &str,
) -> Result<Vec<Arc<Callback>>, Error> {
// If the id is not found in the in-memory store, fetch from Redis
let callbacks: Vec<Arc<Callback>> = match self.store.retrieve_callbacks(id).await {
Ok(response) => response.into_iter().collect(),
Err(e) => {
return Err(e);
}
};
Ok(callbacks)
Ok(self
.store
.retrieve_callbacks(id)
.await?
.into_iter()
.collect())
}

// Check if the store is ready
Expand Down Expand Up @@ -244,7 +257,7 @@ mod tests {

// Test register
let id = "test_id".to_string();
let rx = state.register(id.clone());
let (id, rx) = state.register(Some(id.clone())).await.unwrap();

let xid = id.clone();

Expand All @@ -262,7 +275,10 @@ mod tests {

// Test retrieve_saved
let saved = state.retrieve_saved(&id).await.unwrap();
assert_eq!(saved, vec!["Test Message".as_bytes()]);
assert_eq!(
saved,
ProcessingStatus::Completed(vec!["Test Message".as_bytes().to_vec()])
);

// Test insert_callback_requests
let cbs = vec![
Expand Down
47 changes: 44 additions & 3 deletions rust/serving/src/app/callback/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

use thiserror::Error;

use crate::app::callback::Callback;

// in-memory store
Expand All @@ -17,13 +19,52 @@ pub(crate) enum PayloadToSave {
},
}

#[derive(Debug, PartialEq)]
/// Represents the current processing status of a request id in the `Store`.
pub(crate) enum ProcessingStatus {
/// The request id is known but its results are not available yet.
// NOTE(review): no producer of this variant is visible in this diff — confirm which
// store implementations return it.
InProgress,
/// Processing has finished; carries the saved payloads, one `Vec<u8>` per saved entry.
Completed(Vec<Vec<u8>>),
}

#[derive(Error, Debug, Clone)]
/// Errors reported by `Store` operations. Each variant carries a `String` that is
/// interpolated into the variant's display message.
pub(crate) enum Error {
/// Failure while connecting to the store; the payload is the detail text.
#[error("Connecting to the store: {0}")]
Connection(String),

/// The request id is not present in the store. The payload is rendered as the request
/// id inside the message, so pass the bare id rather than a full sentence.
#[error("Request id {0} doesn't exist in store")]
InvalidRequestId(String),

/// The request id is already present in the store. The payload is rendered as the
/// request id inside the message.
#[error("Request id {0} already exists in the store")]
DuplicateRequest(String),

/// Failure while reading from the store; the payload is the detail text.
#[error("Reading from the store: {0}")]
StoreRead(String),

/// Failure while writing a payload to the store; the payload is the detail text.
#[error("Writing payload to the store: {0}")]
StoreWrite(String),
}

impl From<Error> for crate::Error {
fn from(value: Error) -> Self {
crate::Error::Store(value.to_string())
}
}

/// Result alias whose error type is the store-level [`Error`] defined in this module.
pub(crate) type Result<T> = std::result::Result<T, Error>;

/// Store trait to store the callback information.
#[trait_variant::make(Store: Send)]
#[allow(dead_code)]
pub(crate) trait LocalStore {
async fn save(&mut self, messages: Vec<PayloadToSave>) -> crate::Result<()>;
/// Register a request id in the store. If user provides a request id, the same should be returned
/// if it doesn't already exist in the store. An error should be returned if the user-specified request id
/// already exists in the store. If the `id` is `None`, the store should generate a new unique request id.
async fn register(&mut self, id: Option<String>) -> Result<String>;
/// This method will be called when processing is completed for a request id.
async fn done(&mut self, id: String) -> Result<()>;
async fn save(&mut self, messages: Vec<PayloadToSave>) -> Result<()>;
/// retrieve the callback payloads
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>, crate::Error>;
async fn retrieve_datum(&mut self, id: &str) -> Result<Vec<Vec<u8>>, crate::Error>;
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>>;
async fn retrieve_datum(&mut self, id: &str) -> Result<ProcessingStatus>;
async fn ready(&mut self) -> bool;
}
51 changes: 35 additions & 16 deletions rust/serving/src/app/callback/store/memstore.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::HashMap;
use std::sync::Arc;

use super::PayloadToSave;
use uuid::Uuid;

use super::{Error as StoreError, Result as StoreResult};
use super::{PayloadToSave, ProcessingStatus};
use crate::app::callback::Callback;
use crate::consts::SAVED;
use crate::Error;

const STORE_KEY_SUFFIX: &str = "saved";

/// `InMemoryStore` is an in-memory implementation of the `Store` trait.
/// It uses a `HashMap` to store data in memory.
Expand All @@ -26,25 +29,32 @@ impl InMemoryStore {
}

impl super::Store for InMemoryStore {
/// Registers a request id with the in-memory store.
///
/// A caller-supplied id is returned unchanged; when `id` is `None`, a fresh UUID v7
/// string is generated and returned instead.
// NOTE(review): no duplicate check is performed here, although the `Store::register`
// documentation asks for an error when a user-specified id already exists — confirm
// whether that is intentional for the in-memory implementation.
async fn register(&mut self, id: Option<String>) -> StoreResult<String> {
    let request_id = match id {
        Some(user_supplied) => user_supplied,
        None => Uuid::now_v7().to_string(),
    };
    Ok(request_id)
}
/// Marks processing as completed for a request id.
/// The in-memory store keeps no completion state, so the id is ignored and the call
/// always succeeds.
async fn done(&mut self, _id: String) -> StoreResult<()> {
Ok(())
}
/// Saves a vector of `PayloadToSave` into the `HashMap`.
/// Each `PayloadToSave` is serialized into bytes and stored in the `HashMap` under its key.
async fn save(&mut self, messages: Vec<PayloadToSave>) -> crate::Result<()> {
async fn save(&mut self, messages: Vec<PayloadToSave>) -> StoreResult<()> {
let mut data = self.data.lock().unwrap();
for msg in messages {
match msg {
PayloadToSave::Callback { key, value } => {
if key.is_empty() {
return Err(Error::StoreWrite("Key cannot be empty".to_string()));
return Err(StoreError::StoreWrite("Key cannot be empty".to_string()));
}
let bytes = serde_json::to_vec(&*value)
.map_err(|e| Error::StoreWrite(format!("Serializing to bytes - {}", e)))?;
let bytes = serde_json::to_vec(&*value).map_err(|e| {
StoreError::StoreWrite(format!("Serializing to bytes - {}", e))
})?;
data.entry(key).or_default().push(bytes);
}
PayloadToSave::DatumFromPipeline { key, value } => {
if key.is_empty() {
return Err(Error::StoreWrite("Key cannot be empty".to_string()));
return Err(StoreError::StoreWrite("Key cannot be empty".to_string()));
}
data.entry(format!("{}_{}", key, SAVED))
data.entry(format!("{key}_{STORE_KEY_SUFFIX}"))
.or_default()
.push(value.into());
}
Expand All @@ -55,15 +65,15 @@ impl super::Store for InMemoryStore {

/// Retrieves callbacks for a given id from the `HashMap`.
/// Each callback is deserialized from bytes into a `Callback`.
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>, Error> {
async fn retrieve_callbacks(&mut self, id: &str) -> StoreResult<Vec<Arc<Callback>>> {
let data = self.data.lock().unwrap();
match data.get(id) {
Some(result) => {
let messages: Result<Vec<_>, _> = result
.iter()
.map(|msg| {
let cbr: Callback = serde_json::from_slice(msg).map_err(|_| {
Error::StoreRead(
StoreError::StoreRead(
"Failed to parse CallbackRequest from bytes".to_string(),
)
})?;
Expand All @@ -72,18 +82,24 @@ impl super::Store for InMemoryStore {
.collect();
messages
}
None => Err(Error::StoreRead(format!("No entry found for id: {}", id))),
None => Err(StoreError::StoreRead(format!(
"No entry found for id: {}",
id
))),
}
}

/// Retrieves data for a given id from the `HashMap`.
/// The stored byte payloads are returned unchanged; no deserialization is performed.
async fn retrieve_datum(&mut self, id: &str) -> Result<Vec<Vec<u8>>, Error> {
let id = format!("{}_{}", id, SAVED);
async fn retrieve_datum(&mut self, id: &str) -> StoreResult<ProcessingStatus> {
let id = format!("{id}_{STORE_KEY_SUFFIX}");
let data = self.data.lock().unwrap();
match data.get(&id) {
Some(result) => Ok(result.to_vec()),
None => Err(Error::StoreRead(format!("No entry found for id: {}", id))),
Some(result) => Ok(ProcessingStatus::Completed(result.to_vec())),
None => Err(StoreError::InvalidRequestId(format!(
"No entry found for id: {}",
id
))),
}
}

Expand Down Expand Up @@ -146,6 +162,9 @@ mod tests {

// Retrieve the datum
let retrieved = store.retrieve_datum(&key).await.unwrap();
let ProcessingStatus::Completed(retrieved) = retrieved else {
panic!("Expected pipeline processing to be completed");
};

// Check that the retrieved datum is the same as the one we saved
assert_eq!(retrieved.len(), 1);
Expand Down
Loading

0 comments on commit 01cc33d

Please sign in to comment.