Skip to content

Commit

Permalink
Flow changes
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Jan 23, 2024
1 parent 63b4db6 commit 8915522
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 276 deletions.
321 changes: 119 additions & 202 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions book/sources.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Sources

Sources are the stream sources described in reactive streams.
24 changes: 12 additions & 12 deletions callysto-avro/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use callysto::nuclei;
use callysto::nuclei::Task;
use callysto::prelude::message::OwnedMessage;
use callysto::prelude::producer::FutureRecord;
use callysto::prelude::{CKStream, ClientConfig};
use callysto::prelude::{CStream, ClientConfig};
use callysto::rdkafka::Message;
use crossbeam_channel::Sender;
use cuneiform_fields::prelude::ArchPadding;
Expand All @@ -27,19 +27,19 @@ use tracing::trace;
pin_project! {
pub struct AvroValueDeserStream {
#[pin]
stream: CKStream,
stream: CStream,
schema: Schema
}
}

impl AvroValueDeserStream {
pub fn new(stream: CKStream, schema: Schema) -> Self {
pub fn new(stream: CStream, schema: Schema) -> Self {
Self { stream, schema }
}

///
/// Give raw [CKStream] that this value based deserializer stream is using.
pub fn raw_stream(mut self) -> CKStream {
/// Give raw [CStream] that this value based deserializer stream is using.
pub fn raw_stream(mut self) -> CStream {
self.stream
}
}
Expand Down Expand Up @@ -75,7 +75,7 @@ pin_project! {
pub struct AvroDeserStream<T>
{
#[pin]
stream: CKStream,
stream: CStream,
schema: Schema,
_marker: marker<T>
}
Expand All @@ -85,7 +85,7 @@ impl<T> AvroDeserStream<T>
where
T: for<'ud> Deserialize<'ud>,
{
pub fn new(stream: CKStream, schema: Schema) -> Self {
pub fn new(stream: CStream, schema: Schema) -> Self {
Self {
stream,
schema,
Expand All @@ -94,8 +94,8 @@ where
}

///
/// Give raw [CKStream] that this value based deserializer stream is using.
pub fn raw_stream(mut self) -> CKStream {
/// Give raw [CStream] that this value based deserializer stream is using.
pub fn raw_stream(mut self) -> CStream {
self.stream
}
}
Expand Down Expand Up @@ -144,14 +144,14 @@ where
}

///
/// Avro deserializer that takes [Schema] and [CKStream].
/// Avro deserializer that takes [Schema] and [CStream].
pub struct AvroDeserializer {
stream: CKStream,
stream: CStream,
schema: Schema,
}

impl AvroDeserializer {
pub fn create(mut stream: CKStream, schema: &str) -> Result<Self> {
pub fn create(mut stream: CStream, schema: &str) -> Result<Self> {
let schema =
Schema::parse_str(schema).map_err(|e| CallystoError::GeneralError(e.to_string()))?;
Ok(AvroDeserializer { stream, schema })
Expand Down
32 changes: 14 additions & 18 deletions callysto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,52 +44,48 @@ exclude = [
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["epoll", "asyncexec"]
default = ["iouring", "asyncexec"]
docs = ["store_rocksdb", "sink_elastic", "sink_postgres"]
# IO systems
iouring = ["nuclei/iouring"]
epoll = ["nuclei/epoll"]
# Executor
asyncexec = ["nuclei/async-exec"]
tokio = ["nuclei/tokio"]
tokio03 = ["nuclei/tokio03"]
tokio02 = ["nuclei/tokio02"]
# Storage systems
store_rocksdb = ["rocksdb"]
# Sinks
sink_elastic = ["tokio", "elasticsearch"]
sink_postgres = ["tokio", "deadpool-postgres", "deadpool"]

[dependencies]
#nuclei = { version = "0.2", default-features = false, features = ["epoll", "async-exec"] }
nuclei = "0.2.1"
lightproc = "0.3.5"
lever = "0.1.3"
thiserror = "1.0.37"
async-trait = "0.1.58"
nuclei = "0.4"
lever = "0.1.4"
thiserror = "1.0"
async-trait = "0.1"
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
futures-timer = "3.0.2"
crossbeam-channel = "0.5.6"
crossbeam-channel = "0.5"
rdkafka = { version = "0.28.0", default-features = false, features = ["libz"] }
tracing = "0.1.37"
url = "2.3.1"
libc = "0.2.135"
tracing = "0.1"
url = "2.5"
libc = "0.2"
cuneiform-fields = "0.1.1"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.87"
bincode = "1.3.3"
http-types = "2.12.0"
async-h1 = "2.3.3"
pin-project-lite = "0.2.13"
futures-lite = "1.12.0"
async-h1 = "2.3"
pin-project-lite = "0.2"
futures-lite = "2.2"

# Optionals
rocksdb = { version = "0.19.0", optional = true }
elasticsearch = { version = "7.14.0-alpha.1", optional = true }
deadpool-postgres = { version = "0.10.2", features = [
deadpool-postgres = { version = "0.12", features = [
"serde",
], optional = true }
deadpool = { version = "0.9.5", optional = true }
deadpool = { version = "0.10", optional = true }
async-global-executor = "2.3.0"

[dev-dependencies]
Expand Down
3 changes: 1 addition & 2 deletions callysto/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use http_types::headers::ToHeaderValues;
use http_types::Request;
use lever::prelude::{HOPTable, LOTable};
use lever::sync::atomics::AtomicBox;
use lightproc::prelude::RecoverableHandle;
use nuclei::Task as AsyncTask;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, MessageStream, StreamConsumer};
use rdkafka::error::KafkaResult;
Expand Down Expand Up @@ -282,7 +281,7 @@ where
/// ```
pub fn agent<T: AsRef<str>, F, Fut>(&self, name: T, topic: CTopic, clo: F) -> &Self
where
F: Send + Sync + 'static + Fn(CKStream, Context<State>) -> Fut,
F: Send + Sync + 'static + Fn(CStream, Context<State>) -> Fut,
Fut: Future<Output = CResult<()>> + Send + 'static,
{
let stub = self.stubs.fetch_add(1, Ordering::AcqRel);
Expand Down
12 changes: 6 additions & 6 deletions callysto/src/kafka/cconsumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ pub struct CConsumer {
pin_project! {
#[derive(Clone)]
#[must_use = "you need to poll streams otherwise it won't work"]
pub struct CKStream {
pub struct CStream {
pub context: Arc<CConsumerContext>,
#[pin]
rx: ArchPadding<Receiver<Option<OwnedMessage>>>
}
}

impl CKStream {
impl CStream {
///
/// Get consumer context.
pub fn context(&self) -> Arc<CConsumerContext> {
Expand All @@ -67,7 +67,7 @@ impl CKStream {
}
}

impl Stream for CKStream {
impl Stream for CStream {
type Item = Option<OwnedMessage>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -87,7 +87,7 @@ impl CConsumer {
self.consumer.clone()
}

pub fn cstream(&self) -> CKStream {
pub fn cstream(&self) -> CStream {
let (tx, rx) = (self.tx.clone(), self.rx.clone());
let consumer = self.consumer_instance();
Self::gen_stream(tx, rx, consumer)
Expand All @@ -99,7 +99,7 @@ impl CConsumer {
tx: ArchPadding<Sender<Option<OwnedMessage>>>,
rx: ArchPadding<Receiver<Option<OwnedMessage>>>,
consumer: Arc<BaseConsumer<CConsumerContext>>,
) -> CKStream {
) -> CStream {
let context = consumer.context().clone();

let handle = thread::Builder::new()
Expand All @@ -118,6 +118,6 @@ impl CConsumer {
}
});

CKStream { context, rx }
CStream { context, rx }
}
}
1 change: 0 additions & 1 deletion callysto/src/stores/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use futures_timer::Delay;
use lever::prelude::{LOTable, TTas};
use lever::sync::atomics::AtomicBox;
use lever::sync::ifaces::LockIface;
use lightproc::prelude::State;
use rdkafka::message::OwnedMessage;
use rdkafka::Message;
use rocksdb::{
Expand Down
14 changes: 7 additions & 7 deletions callysto/src/types/agent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::context::Context;
use crate::errors::Result as CResult;
use crate::errors::*;
use crate::kafka::cconsumer::CKStream;
use crate::kafka::cconsumer::CStream;
use crate::kafka::ctopic::*;
use crate::types::service::{Service, ServiceState};
use crate::types::table::CTable;
Expand All @@ -27,7 +27,7 @@ use tracing::{error, info};
pub struct CAgent<State, F, Fut>
where
State: Clone + Send + Sync + 'static,
F: Send + Sync + 'static + Fn(CKStream, Context<State>) -> Fut,
F: Send + Sync + 'static + Fn(CStream, Context<State>) -> Fut,
Fut: Future<Output = CResult<()>> + Send + 'static,
{
clo: F,
Expand All @@ -41,7 +41,7 @@ where
impl<State, F, Fut> CAgent<State, F, Fut>
where
State: Clone + Send + Sync + 'static,
F: Send + Sync + 'static + Fn(CKStream, Context<State>) -> Fut,
F: Send + Sync + 'static + Fn(CStream, Context<State>) -> Fut,
Fut: Future<Output = CResult<()>> + Send + 'static,
{
pub fn new(
Expand Down Expand Up @@ -73,17 +73,17 @@ where
State: Clone + Send + Sync + 'static,
{
/// Do work on given message with state passed in
async fn call(&self, stream: CKStream, st: Context<State>) -> CResult<()>;
async fn call(&self, stream: CStream, st: Context<State>) -> CResult<()>;
}

#[async_trait]
impl<State, F, Fut> Agent<State> for CAgent<State, F, Fut>
where
State: Clone + Send + Sync + 'static,
F: Send + Sync + 'static + Fn(CKStream, Context<State>) -> Fut,
F: Send + Sync + 'static + Fn(CStream, Context<State>) -> Fut,
Fut: Future<Output = CResult<()>> + Send + 'static,
{
async fn call(&self, stream: CKStream, req: Context<State>) -> CResult<()> {
async fn call(&self, stream: CStream, req: Context<State>) -> CResult<()> {
let fut = (self.clo)(stream, req);
let res = fut.await?;
Ok(res)
Expand All @@ -94,7 +94,7 @@ where
impl<State, F, Fut> Service<State> for CAgent<State, F, Fut>
where
State: Clone + Send + Sync + 'static,
F: Send + Sync + 'static + Fn(CKStream, Context<State>) -> Fut,
F: Send + Sync + 'static + Fn(CStream, Context<State>) -> Fut,
Fut: Future<Output = CResult<()>> + Send + 'static,
{
async fn call(&self, st: Context<State>) -> Result<State> {
Expand Down
30 changes: 3 additions & 27 deletions callysto/src/types/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,12 @@ use futures_lite::{Stream, StreamExt};
use futures_timer::Delay;
use pin_project_lite::pin_project;
use tracing::{error, info};
use crate::prelude::{Agent, CAgent, Context as CContext, CKStream, CTopic, Service, ServiceState};
use crate::prelude::{Agent, CAgent, Context as CContext, CStream, CTopic, Service, ServiceState};
use crate::errors::Result as CResult;

// pin_project! {
// pub struct CFlow<State, S, R, F, Fut>
// where
// S: Stream,
// State: Clone,
// State: Send,
// State: Sync,
// State: 'static,
// F: Send,
// F: Sync,
// F: 'static,
// F: Fn(S, Context<State>) -> Fut,
// Fut: Future<Output = CResult<R>>,
// Fut: Send,
// Fut: 'static,
// {
// #[pin]
// stream: S,
// clo: F,
// app_name: String,
// flow_name: String,
// state: State,
// dependencies: Vec<Arc<dyn Service<State>>>,
// }
// }

pin_project! {
///
/// Flow that wraps the Source stream.
#[derive(Clone, Debug)]
#[must_use = "you need to poll streams otherwise it won't work"]
pub struct CSource<S>
Expand Down
1 change: 0 additions & 1 deletion callysto/src/types/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use futures::future::BoxFuture;
use futures::{FutureExt, SinkExt};
use futures_timer::Delay;
use lever::sync::atomics::AtomicBox;
use lightproc::prelude::State;
use rdkafka::message::OwnedMessage;
use rdkafka::{ClientConfig, ClientContext, Message};
use serde::de::DeserializeOwned;
Expand Down
4 changes: 4 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ daemonize = "0.5"
async-trait = "0.1"
futures-timer = "3.0"
futures = "0.3"
anyhow = "1.0"
async-h1 = "2.3"
async-dup = "1.2"
http-types = "2.12"
callysto = { path = "../callysto" }
callysto-avro = { path = "../callysto-avro" }

Expand Down Expand Up @@ -44,6 +48,10 @@ path = "src/double-agent.rs"
name = "elastic-sink"
path = "src/elastic-sink.rs"

[[example]]
name = "httpserv"
path = "src/httpserv.rs"

[[example]]
name = "flow"
path = "src/flow.rs"
Expand Down
Loading

0 comments on commit 8915522

Please sign in to comment.