Skip to content

Commit

Permalink
Flow work included
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Jan 17, 2024
1 parent 6061112 commit 21c312b
Show file tree
Hide file tree
Showing 17 changed files with 722 additions and 174 deletions.
2 changes: 1 addition & 1 deletion book/_coverpage.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# <img src="https://raw.githubusercontent.com/vertexclique/callysto/master/art/callysto_logo.png">
RUST STREAM PROCESSING FRAMEWORK <small>1.0</small>
RUST STREAM PROCESSING & SERVICE FRAMEWORK <small>1.0</small>

[![](assets/img/github.svg) GitHub](https://github.com/vertexclique/callysto)
[![](assets/img/sitemap-solid.svg) What is Callysto?](https://vertexclique.github.io/callysto/)
2 changes: 1 addition & 1 deletion book/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
window.$docsify = {
// GENERAL
// -----------------------------------------------------------------
name : 'Callysto - Rust Stream Processing Framework',
name : 'Callysto - Rust Stream Processing & Service Framework',
repo: 'vertexclique/callysto',
coverpage : 'coverpage.md',
homepage : 'introduction.md',
Expand Down
7 changes: 0 additions & 7 deletions book/qguide.md

This file was deleted.

126 changes: 126 additions & 0 deletions book/quickstart.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Quickstart Guide

To include callysto to your project, add this to your `Cargo.toml`.

```toml
callysto = "0.1"
```

## First steps

We initialize the app like below in Callysto:

```rust
use callysto::prelude::*;

fn main() {
let mut app = Callysto::new();
app.with_name("first-steps");
app.run();
}
```

In Callysto, everything is a `Service`, apart from one-off tasks, they belong to `Task` class.
`Service` trait needs to be implemented to implement code that work as a service definition.
Every runnable in Callysto has it's own lifetime. Lifetime of these runnables (tasks, services, agents, etc.)
embedded to their implementation. So you manage stop, start, restart, crash procedures by yourself.
Callysto is not opinionated on how to write your lifecycle procedures. It rather leaves it to user.


## Customizing Configuration

Configuration can be passed differently, even we use some sane defaults you can still change the configuration as in:
```rust
fn main() {
let mut config = Config::default();
config.kafka_config.auto_offset_reset = OffsetReset::Earliest;

let mut app = Callysto::with_state(SharedState::new());

app.with_config(config);
app.with_name("always-read-the-earliest-message");

app.run();
}

```


## Defining a Service

Services and agents are twofold in Callysto: Stateful and Stateless.
Stateful services are the ones that uses global state whereas stateless ones doesn't care and won't use the global application state.

### Stateful Services
Here we are going to show you how to define stateful services.

Here is a boilerplate for a simple service definition which says hi every 5 seconds and also counts how many times it says "hi":
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use callysto::prelude::*;
use futures_timer::Delay;

#[derive(Clone)]
struct SharedState {
pub value: Arc<AtomicU32>,
}

impl SharedState {
fn new() -> Self {
Self {
value: Arc::new(AtomicU32::default()),
}
}
}

async fn say_hi_every5_seconds(ctx: Context<SharedState>) -> Result<SharedState> {
let state = ctx.state().to_owned();
println!("{} - Hi from Callysto!", state.value.fetch_add(1, Ordering::SeqCst));
Delay::new(std::time::Duration::from_secs(5)).await;
Ok(state)
}

fn main() {
let mut app = Callysto::with_state(SharedState::new());
app.with_name("sayhi");
app.stateful_service("SayHiEvery5Seconds", say_hi_every5_seconds, vec![]);

app.run();
}
```

### Stateless/Custom Services

Here we will give an example of stateless services.
Stateless services doesn't share a state at application level, or they don't care about the global application state, rather they communicate with each other via messages.
Same example above with no state baked in:

```rust
use callysto::prelude::*;
use futures_timer::Delay;

async fn service_core(ctx: Context<()>) -> Result<()> {
println!("Hi from Callysto!");
Delay::new(std::time::Duration::from_secs(5)).await;
Ok(())
}

fn main() {
let mut app = Callysto::new();

app.with_name("sayhi");

let service = CService::new(
"SayHiEvery5Seconds", // Service name
service_core, // Service function
(), // State, as of now, no state.
vec![] // Dependency services which should start before this service starts.
);
app.service(service);

app.run();
}
```

## Running
2 changes: 1 addition & 1 deletion book/sidebar.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<!-- markdownlint-disable-next-line first-line-heading -->
- [Introduction](introduction)
- [Quickstart Guide](qguide)
- [Quickstart Guide](quickstart)
- **Sources**
- [Sources](sources)
- **Flows**
Expand Down
50 changes: 50 additions & 0 deletions callysto/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::types::task::Task;
use crate::prelude::*;
use futures::Stream;
use rdkafka::producer::FutureProducer;
use crate::types::flows::{CFlow, Flow};

// TODO: not sure static dispatch is better here. Check on using State: 'static.

Expand All @@ -64,6 +65,7 @@ where
services: LOTable<usize, Arc<dyn Service<State>>>,
agents: LOTable<usize, Arc<dyn Agent<State>>>,
tables: LOTable<String, Arc<CTable<State>>>,
flows: LOTable<usize, Arc<dyn Service<State>>>,
table_agents: LOTable<usize, Arc<dyn TableAgent<State>>>,
routes: LOTable<String, Arc<dyn Router<State>>>,
}
Expand Down Expand Up @@ -133,6 +135,7 @@ where
services: LOTable::default(),
agents: LOTable::default(),
tables: LOTable::default(),
flows: LOTable::default(),
table_agents: LOTable::default(),
routes: LOTable::default(),
}
Expand Down Expand Up @@ -187,6 +190,12 @@ where
self
}

///
/// Get state on demand for global wide access.
pub fn get_state(&mut self) -> State {
self.state.clone()
}

///
/// By default `callysto-app` is used internally as application name.
/// If you want to change this you can use this method.
Expand Down Expand Up @@ -314,12 +323,53 @@ where
self
}

/// Helper to define custom service that skips or uses global shared state.
pub fn service(&self, s: impl Service<State>) -> &Self {
let stub = self.stubs.fetch_add(1, Ordering::AcqRel);
self.services.insert(stub, Arc::new(s));
self
}

/// Helper to define flow that skips or uses global shared state.
pub fn flow<T: AsRef<str>, F, S, R, Fut>(&self, name: T, stream: S, clo: F) -> &Self
where
R: 'static + Send,
S: Stream + Send + Sync + 'static,
State: Clone + Send + Sync + 'static,
F: Send + Sync + 'static + Fn(&S, Context<State>) -> Fut,
Fut: Future<Output = CResult<R>> + Send + 'static,
{
let stub = self.stubs.fetch_add(1, Ordering::AcqRel);
let flow = CFlow::new(
stream,
clo,
self.app_name.clone(),
name.as_ref().to_string(),
self.state.clone(),
Vec::default(),
);
self.flows.insert(stub, Arc::new(flow));
self
}

/// Helper to define stateful service that uses global application level state.
pub fn stateful_service<T, F, Fut>(&self, name: T, clo: F, dependencies: Vec<Arc<dyn Service<State>>>) -> &Self
where
T: AsRef<str>,
F: Send + Sync + 'static + Fn(Context<State>) -> Fut,
Fut: Future<Output = CResult<State>> + Send + 'static,
{
let stub = self.stubs.fetch_add(1, Ordering::AcqRel);
let service = CService::new(
name,
clo,
self.state.clone(),
Vec::default(),
);
self.services.insert(stub, Arc::new(service));
self
}

pub fn crontab<C: AsRef<str>>(&self, cron_expr: C, t: impl Task<State>) -> &Self {
let stub = self.stubs.fetch_add(1, Ordering::AcqRel);
let cron_job = Arc::new(CronJob::new(cron_expr, t));
Expand Down
6 changes: 5 additions & 1 deletion callysto/src/types/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::future::Future;
use std::io::Read;
use std::marker::PhantomData as marker;
use std::sync::Arc;
use std::time::Duration;
use futures_timer::Delay;
use tracing::{error, info};

///////////////////////////////////////////////////
Expand Down Expand Up @@ -145,7 +147,9 @@ where
}

async fn wait_until_stopped(&self) {
todo!()
while !self.stopped().await {
Delay::new(Duration::from_millis(10)).await;
}
}

async fn state(&self) -> String {
Expand Down
Loading

0 comments on commit 21c312b

Please sign in to comment.