From ad4643e351bffe8e50b3bcce59827a749b9a3a10 Mon Sep 17 00:00:00 2001 From: Michael van Niekerk Date: Fri, 13 Jan 2023 19:47:13 +0200 Subject: [PATCH] Fix lock issue when adding job --- Cargo.toml | 2 +- src/job/creator.rs | 117 ++++++++++++++++++------------------------- src/job_scheduler.rs | 6 ++- 3 files changed, 53 insertions(+), 72 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 45f0e28..f6ea46e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-cron-scheduler" -version = "0.8.3" +version = "0.8.4" authors = ["Michael van Niekerk "] edition = "2018" diff --git a/src/job/creator.rs b/src/job/creator.rs index 4916161..12c54a2 100644 --- a/src/job/creator.rs +++ b/src/job/creator.rs @@ -66,87 +66,66 @@ impl JobCreator { }) } - pub fn add(context: &Context, mut job: JobLocked) -> Result { + pub async fn add(context: &Context, mut job: JobLocked) -> Result { let tx = context.job_create_tx.clone(); let mut rx = context.job_created_tx.subscribe(); - let (done_tx, done_rx) = std::sync::mpsc::channel(); + let data = job.job_data(); + let uuid = job.guid(); - tokio::spawn(async move { - let data = job.job_data(); - let uuid = job.guid(); - - if let Err(e) = data { - error!("Error getting job data {:?}", e); - if let Err(e) = done_tx.send(Err(e)) { - error!("Could not notify of error {:?}", e); - } - return; - } - let data = data.unwrap(); - let job: Box = Box::new(move |job_id, job_scheduler| { - let job = job.clone(); - Box::pin(async move { - let job_done = { - let w = job.0.write(); - if let Err(e) = w { - error!("Error getting job {:?}", e); - return; - } - let mut w = w.unwrap(); - w.run(job_scheduler) - }; - let job_done = job_done.await; - match job_done { - Err(e) => { - error!("Error running job {:?} {:?}", job_id, e); - } - Ok(val) => { - if !val { - error!("Error running job {:?}", job_id); - } - } + if let Err(e) = data { + error!("Error getting job data {e:?}"); + return Err(e); + } + let data = data.unwrap(); + let job: Box = Box::new(move |job_id, job_scheduler| { + let job = job.clone(); + Box::pin(async move { + let job_done = { + let w = job.0.write(); + if let Err(e) = w { + error!("Error getting job {:?}", e); + return; } - }) - }); - let done_tx_on_send = done_tx.clone(); - tokio::spawn(async move { - let job = Arc::new(RwLock::new(job)); - if let Err(_e) = tx.send((data, job)) { - error!("Error sending new job"); - if let Err(e) = done_tx_on_send.send(Err(JobSchedulerError::CantAdd)) { - error!("Error sending failure of adding job {:?}", e); + let mut w = w.unwrap(); + w.run(job_scheduler) + }; + let job_done = job_done.await; + match job_done { + Err(e) => { + error!("Error running job {:?} {:?}", job_id, e); + } + Ok(val) => { + if !val { + error!("Error running job {:?}", job_id); + } } } - }); + }) + }); - while let Ok(val) = rx.recv().await { - match val { - Ok(ret_uuid) => { - if ret_uuid == uuid { - if let Err(e) = done_tx.send(Ok(uuid)) { - error!("Could not send successful addition {:?}", e); - } - break; - } + let job = Arc::new(RwLock::new(job)); + if let Err(_e) = tx.send((data, job)) { + error!("Error sending new job"); + return Err(JobSchedulerError::CantAdd); + } + + while let Ok(val) = rx.recv().await { + match val { + Ok(ret_uuid) => { + if ret_uuid == uuid { + return Ok(uuid); } - Err((e, Some(ret_uuid))) => { - if ret_uuid == uuid { - if let Err(e) = done_tx.send(Err(e)) { - error!("Could not send failure {:?}", e); - } - break; - } + } + Err((e, Some(ret_uuid))) => { + if ret_uuid == uuid { + return Err(e); } - _ => {} } + _ => {} } - }); + } - let uuid = done_rx.recv().map_err(|e| { - error!("Could not receive done from add {:?}", e); - JobSchedulerError::CantAdd - })??; - Ok(uuid) + Err(JobSchedulerError::CantAdd) } } diff --git a/src/job_scheduler.rs b/src/job_scheduler.rs index 261c4a8..650290a 100644 --- a/src/job_scheduler.rs +++ b/src/job_scheduler.rs @@ -15,7 +15,7 @@ use std::sync::Arc; #[cfg(feature = "signal")] use tokio::signal::unix::SignalKind; use tokio::sync::RwLock; -use tracing::error; +use tracing::{error, info}; use uuid::Uuid; pub type ShutdownNotification = @@ -265,12 +265,14 @@ impl JobsSchedulerLocked { pub async fn add(&self, job: JobLocked) -> Result { let guid = job.guid(); if !self.inited().await { + info!("Uninited"); let mut s = self.clone(); s.init().await?; } let context = self.context.clone(); - JobCreator::add(&context, job)?; + JobCreator::add(&context, job).await?; + info!("Job creator created"); Ok(guid) }