Skip to content

Commit

Permalink
Async all the things
Browse files Browse the repository at this point in the history
  • Loading branch information
mvniekerk committed Aug 10, 2022
1 parent 943ca69 commit f043783
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 188 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-cron-scheduler"
version = "0.7.6"
version = "0.8.0"
authors = ["Michael van Niekerk <[email protected]>"]
edition = "2018"

Expand Down
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,21 @@ async fn main() {

sched.add(Job::new("1/10 * * * * *", |uuid, l| {
println!("I run every 10 seconds");
}).unwrap());
}).await.unwrap());

sched.add(Job::new_async("1/7 * * * * *", |uuid, l| Box::pin( async {
println!("I run async every 7 seconds");
})).unwrap());
})).await.unwrap());

sched.add(Job::new("1/30 * * * * *", |uuid, l| {
println!("I run every 30 seconds");
}).unwrap());
}).await.unwrap());

sched.add(
Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
println!("{:?} I'm only run once", chrono::Utc::now());
}).unwrap()
);
).await;

let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
println!("{:?} I'm repeated every 8 seconds", chrono::Utc::now());
Expand All @@ -81,60 +81,60 @@ async fn main() {
Box::pin(async move {
println!("Job {:?} was started, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
}));
})).await;

jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was completed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
}));
})).await;

jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was removed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
}));
sched.add(jj);
})).await;
sched.add(jj).await;

let five_s_job = Job::new("1/5 * * * * *", |_uuid, _l| {
println!("{:?} I run every 5 seconds", chrono::Utc::now());
})
.unwrap();
sched.add(five_s_job);
sched.add(five_s_job).await;

let four_s_job_async = Job::new_async("1/4 * * * * *", |_uuid, _l| Box::pin(async move {
println!("{:?} I run async every 4 seconds", chrono::Utc::now());
})).unwrap();
sched.add(four_s_job_async);
sched.add(four_s_job_async).await;

sched.add(
Job::new("1/30 * * * * *", |_uuid, _l| {
println!("{:?} I run every 30 seconds", chrono::Utc::now());
})
.unwrap(),
);
).await;

sched.add(
Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
println!("{:?} I'm only run once", chrono::Utc::now());
}).unwrap()
);
).await;

sched.add(
Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| Box::pin( async move {
println!("{:?} I'm only run once async", chrono::Utc::now());
})).unwrap()
);
).await;

let jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
println!("{:?} I'm repeated every 8 seconds", chrono::Utc::now());
}).unwrap();
sched.add(jj);
sched.add(jj).await;

let jja = Job::new_repeated_async(Duration::from_secs(7), |_uuid, _l| Box::pin(async move {
println!("{:?} I'm repeated async every 7 seconds", chrono::Utc::now());
})).unwrap();
sched.add(jja);
sched.add(jja).await;

#[cfg(feature = "signal")]
sched.shutdown_on_ctrl_c();
Expand All @@ -145,7 +145,7 @@ async fn main() {
})
}));

sched.start();
sched.start().await;
}
```

Expand Down
116 changes: 63 additions & 53 deletions examples/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@ pub async fn run_example(mut sched: JobScheduler) -> Result<()> {

// Adding a job notification without it being added to the scheduler will automatically add it to
// the job store, but with stopped marking
five_s_job.on_removed_notification_add(
&sched,
Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
info!(
"5s Job {:?} was removed, notification {:?} ran ({:?})",
job_id, notification_id, type_of_notification
);
})
}),
)?;
five_s_job
.on_removed_notification_add(
&sched,
Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
info!(
"5s Job {:?} was removed, notification {:?} ran ({:?})",
job_id, notification_id, type_of_notification
);
})
}),
)
.await?;
let five_s_job_guid = five_s_job.guid();
sched.add(five_s_job)?;
sched.add(five_s_job).await?;

let mut four_s_job_async = Job::new_async("1/4 * * * * *", |uuid, _l| {
Box::pin(async move {
Expand All @@ -53,66 +55,74 @@ pub async fn run_example(mut sched: JobScheduler) -> Result<()> {
Box::pin(async move {
info!("4s Job {:?} ran on start notification {:?} ({:?})", job_id, notification_id, type_of_notification);
info!("This should only run once since we're going to remove this notification immediately.");
info!("Removed? {:?}", four_s_job_async_clone.on_start_notification_remove(&js, &notification_id));
info!("Removed? {:?}", four_s_job_async_clone.on_start_notification_remove(&js, &notification_id).await);
})
}))?;

four_s_job_async.on_done_notification_add(
&sched,
Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
info!(
"4s Job {:?} completed and ran notification {:?} ({:?})",
job_id, notification_id, type_of_notification
);
})
}),
)?;
})).await?;

four_s_job_async
.on_done_notification_add(
&sched,
Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
info!(
"4s Job {:?} completed and ran notification {:?} ({:?})",
job_id, notification_id, type_of_notification
);
})
}),
)
.await?;

let four_s_job_guid = four_s_job_async.guid();
sched.add(four_s_job_async)?;
sched.add(four_s_job_async).await?;

sched.add(
Job::new("1/30 * * * * *", |uuid, _l| {
info!("I run every 30 seconds id {:?}", uuid);
})
.unwrap(),
)?;
sched
.add(
Job::new("1/30 * * * * *", |uuid, _l| {
info!("I run every 30 seconds id {:?}", uuid);
})
.unwrap(),
)
.await?;

info!(
"Sched one shot for {:?}",
chrono::Utc::now()
.checked_add_signed(time::Duration::seconds(10))
.unwrap()
);
sched.add(
Job::new_one_shot(Duration::from_secs(10), |_uuid, _l| {
info!("I'm only run once");
})
.unwrap(),
)?;
sched
.add(
Job::new_one_shot(Duration::from_secs(10), |_uuid, _l| {
info!("I'm only run once");
})
.unwrap(),
)
.await?;

info!(
"Sched one shot async for {:?}",
chrono::Utc::now()
.checked_add_signed(time::Duration::seconds(16))
.unwrap()
);
sched.add(
Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| {
Box::pin(async move {
info!("I'm only run once async");
sched
.add(
Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| {
Box::pin(async move {
info!("I'm only run once async");
})
})
})
.unwrap(),
)?;
.unwrap(),
)
.await?;

let jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
info!("I'm repeated every 8 seconds");
})
.unwrap();
let jj_guid = jj.guid();
sched.add(jj)?;
sched.add(jj).await?;

let jja = Job::new_repeated_async(Duration::from_secs(7), |_uuid, _l| {
Box::pin(async move {
Expand All @@ -121,20 +131,20 @@ pub async fn run_example(mut sched: JobScheduler) -> Result<()> {
})
.unwrap();
let jja_guid = jja.guid();
sched.add(jja)?;
sched.add(jja).await?;

let start = sched.start();
let start = sched.start().await;
if start.is_err() {
error!("Error starting scheduler");
return Ok(());
}
tokio::time::sleep(Duration::from_secs(20)).await;

info!("Remove 4, 5, 7 and 8 sec jobs");
sched.remove(&five_s_job_guid)?;
sched.remove(&four_s_job_guid)?;
sched.remove(&jj_guid)?;
sched.remove(&jja_guid)?;
sched.remove(&five_s_job_guid).await?;
sched.remove(&four_s_job_guid).await?;
sched.remove(&jj_guid).await?;
sched.remove(&jja_guid).await?;

tokio::time::sleep(Duration::from_secs(40)).await;

Expand Down
2 changes: 1 addition & 1 deletion examples/simple_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn main() {
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
let sched = JobScheduler::new();
let sched = JobScheduler::new().await;
let sched = sched.unwrap();
run_example(sched).await;
}
2 changes: 1 addition & 1 deletion examples/simple_job_tokio_in_a_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn start() -> Result<(), Box<dyn Error>> {
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
info!("Creating scheduler");
let sched = JobScheduler::new()?;
let sched = JobScheduler::new().await?;
info!("Run example");
run_example(sched).await;
Ok(())
Expand Down
Loading

0 comments on commit f043783

Please sign in to comment.