Skip to content

Commit

Permalink
Add next tick for job in job scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
mvniekerk committed Sep 6, 2022
1 parent f043783 commit 382578e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-cron-scheduler"
version = "0.8.0"
version = "0.8.1"
authors = ["Michael van Niekerk <[email protected]>"]
edition = "2018"

Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ async fn main() {
println!("I run every 10 seconds");
}).await.unwrap());

sched.add(Job::new_async("1/7 * * * * *", |uuid, l| Box::pin( async {
sched.add(Job::new_async("1/7 * * * * *", |uuid, mut l| Box::pin( async {
println!("I run async every 7 seconds");
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for 7s is {:?}", ts),
_ => warn!("Could not get next tick for 7s job"),
}
})).await.unwrap());

sched.add(Job::new("1/30 * * * * *", |uuid, l| {
Expand Down
10 changes: 8 additions & 2 deletions examples/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use std::time::Duration;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{error, info};
use tracing::{error, info, warn};

pub async fn run_example(mut sched: JobScheduler) -> Result<()> {
#[cfg(feature = "signal")]
Expand Down Expand Up @@ -40,9 +40,15 @@ pub async fn run_example(mut sched: JobScheduler) -> Result<()> {
let five_s_job_guid = five_s_job.guid();
sched.add(five_s_job).await?;

let mut four_s_job_async = Job::new_async("1/4 * * * * *", |uuid, _l| {
let mut four_s_job_async = Job::new_async("1/4 * * * * *", |uuid, l| {
let mut l = l.clone();
Box::pin(async move {
info!("I run async every 4 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for 4s is {:?}", ts),
_ => warn!("Could not get next tick for 4s job"),
}
})
})
.unwrap();
Expand Down
23 changes: 23 additions & 0 deletions src/job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::simple::{
SimpleJobCode, SimpleMetadataStore, SimpleNotificationCode, SimpleNotificationStore,
};
use crate::store::{MetaDataStorage, NotificationStore};
use chrono::{DateTime, NaiveDateTime, Utc};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -382,6 +383,28 @@ impl JobsSchedulerLocked {
}
}

/// `next_tick_for_job` returns the date/time for when the next tick will
/// be for a job
pub async fn next_tick_for_job(
&mut self,
job_id: Uuid,
) -> Result<Option<DateTime<Utc>>, JobSchedulerError> {
if !self.inited().await {
let mut s = self.clone();
s.init().await?;
}
let next_tick = {
let mut r = self.context.metadata_storage.write().await;
r.get(job_id).await.map(|v| {
v.map(|vv| vv.next_tick)
.filter(|t| *t != 0)
.map(|ts| NaiveDateTime::from_timestamp(ts as i64, 0))
.map(|ts| DateTime::from_utc(ts, Utc))
})
};
next_tick
}

///
/// Shut the scheduler down
pub async fn shutdown(&mut self) -> Result<(), JobSchedulerError> {
Expand Down

0 comments on commit 382578e

Please sign in to comment.