Skip to content

Commit

Permalink
Remove .tick() command
Browse files Browse the repository at this point in the history
  • Loading branch information
mvniekerk committed Jan 16, 2023
1 parent ad4643e commit 8752238
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 91 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-cron-scheduler"
version = "0.8.4"
version = "0.9.0"
authors = ["Michael van Niekerk <[email protected]>"]
edition = "2018"

Expand Down
69 changes: 12 additions & 57 deletions src/job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::error::JobSchedulerError;
use crate::job::to_code::{JobCode, NotificationCode};
use crate::job::{JobCreator, JobDeleter, JobLocked, JobRunner};
use crate::notification::{NotificationCreator, NotificationDeleter, NotificationRunner};
use crate::scheduler::{Scheduler, StartResult};
use crate::scheduler::Scheduler;
use crate::simple::{
SimpleJobCode, SimpleMetadataStore, SimpleNotificationCode, SimpleNotificationStore,
};
Expand Down Expand Up @@ -130,7 +130,7 @@ impl JobsSchedulerLocked {

{
let mut scheduler = scheduler.write().await;
scheduler.init(&context);
scheduler.init(&context).await;
}

Ok(())
Expand Down Expand Up @@ -207,7 +207,7 @@ impl JobsSchedulerLocked {
///
/// Create a new `JobsSchedulerLocked` using custom metadata and notification runners, job and notification
/// code providers
pub fn new_with_storage_and_code(
pub async fn new_with_storage_and_code(
metadata_storage: Box<dyn MetaDataStorage + Send + Sync>,
notification_storage: Box<dyn NotificationStore + Send + Sync>,
job_code: Box<dyn JobCode + Send + Sync>,
Expand All @@ -218,24 +218,13 @@ impl JobsSchedulerLocked {
let job_code = Arc::new(RwLock::new(job_code));
let notification_code = Arc::new(RwLock::new(notification_code));

let (storage_init_tx, storage_init_rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
let context = JobsSchedulerLocked::init_context(
metadata_storage,
notification_storage,
job_code,
notification_code,
)
.await;
if let Err(e) = storage_init_tx.send(context) {
error!("Error sending init success {:?}", e);
}
});

let context = storage_init_rx
.recv()
.map_err(|_| JobSchedulerError::CantInit)??;
let context = JobsSchedulerLocked::init_context(
metadata_storage,
notification_storage,
job_code,
notification_code,
)
.await?;

let val = JobsSchedulerLocked {
context,
Expand Down Expand Up @@ -300,34 +289,6 @@ impl JobsSchedulerLocked {
JobDeleter::remove(&context, to_be_removed)
}

/// The `tick` method increments time for the JobScheduler and executes
/// any pending jobs. It is recommended to sleep for at least 500
/// milliseconds between invocations of this method.
/// This is kept public if you're running this yourself. It is better to
/// call the `start` method if you want all of this automated for you.
///
/// ```rust,ignore
/// loop {
/// sched.tick().await;
/// std::thread::sleep(Duration::from_millis(500));
/// }
/// ```
pub async fn tick(&self) -> Result<(), JobSchedulerError> {
if !self.inited().await {
let mut s = self.clone();
s.init().await?;
}
let ret = self.scheduler.write().await;
let ret = ret.tick();
match ret {
Ok(ret) => Ok(ret),
Err(e) => {
error!("Error receiving tick result {:?}", e);
Err(JobSchedulerError::TickError)
}
}
}

/// The `start` spawns a Tokio task where it loops. Every 500ms it
/// runs the tick method to increment any
/// any pending jobs.
Expand All @@ -337,13 +298,13 @@ impl JobsSchedulerLocked {
/// eprintln!("Error on scheduler {:?}", e);
/// }
/// ```
pub async fn start(&self) -> StartResult {
pub async fn start(&self) -> Result<(), JobSchedulerError> {
if !self.inited().await {
let mut s = self.clone();
s.init().await?;
}
let mut scheduler = self.scheduler.write().await;
let ret = scheduler.start();
let ret = scheduler.start().await;

match ret {
Ok(ret) => Ok(ret),
Expand All @@ -357,12 +318,6 @@ impl JobsSchedulerLocked {
/// The `time_till_next_job` method returns the duration till the next job
/// is supposed to run. This can be used to sleep until then without waking
/// up at a fixed interval.AsMut
///
/// ```rust, ignore
/// loop {
/// sched.tick().await;
/// std::thread::sleep(sched.time_till_next_job());
/// }
/// ```
pub async fn time_till_next_job(
&mut self,
Expand Down
48 changes: 15 additions & 33 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Sender;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::error;
use uuid::Uuid;

pub type StartResult = Result<JoinHandle<()>, JobSchedulerError>;

pub struct Scheduler {
pub shutdown: Arc<RwLock<bool>>,
pub ticker_tx: Sender<bool>,
Expand All @@ -35,7 +32,7 @@ impl Default for Scheduler {
}

impl Scheduler {
pub fn init(&mut self, context: &Context) {
pub async fn init(&mut self, context: &Context) {
if self.inited {
return;
}
Expand All @@ -48,10 +45,15 @@ impl Scheduler {

self.inited = true;

let mut ticker_rx = self.ticker_tx.subscribe();
let ticker_tx = self.ticker_tx.clone();

tokio::spawn(async move {
'next_tick: while let Ok(true) = ticker_rx.recv().await {
let ticker_rx = ticker_tx.subscribe().recv().await;
if let Err(e) = ticker_rx {
error!(?e, "Could not subscribe to ticker starter");
return;
}
'next_tick: loop {
let shutdown = {
let r = shutdown.read().await;
*r
Expand Down Expand Up @@ -195,40 +197,20 @@ impl Scheduler {
pub async fn shutdown(&mut self) {
let mut w = self.shutdown.write().await;
*w = true;

if let Err(e) = self.ticker_tx.send(false) {
error!("Error sending tick {:?}", e);
}
}

pub fn tick(&self) -> Result<(), JobSchedulerError> {
if let Err(e) = self.ticker_tx.send(true) {
error!("Error sending tick {:?}", e);
Err(JobSchedulerError::TickError)
} else {
Ok(())
}
}

pub fn start(&mut self) -> StartResult {
pub async fn start(&mut self) -> Result<(), JobSchedulerError> {
if self.ticking {
Err(JobSchedulerError::TickError)
} else {
self.ticking = true;
let tx = self.ticker_tx.clone();
let shutdown = self.shutdown.clone();
Ok(tokio::spawn(async move {
loop {
if let Err(e) = tx.send(true) {
let shutdown = { *(shutdown.read().await) };
if shutdown {
return;
}
error!("Tick send error {:?}", e);
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}))
if let Err(e) = tx.send(true) {
error!(?e, "Tick send error");
Err(JobSchedulerError::TickError)
} else {
Ok(())
}
}
}
}

0 comments on commit 8752238

Please sign in to comment.