Skip to content

Commit

Permalink
Fix lock issue when adding job
Browse files Browse the repository at this point in the history
  • Loading branch information
mvniekerk committed Jan 13, 2023
1 parent c0eb657 commit ad4643e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-cron-scheduler"
version = "0.8.3"
version = "0.8.4"
authors = ["Michael van Niekerk <[email protected]>"]
edition = "2018"

Expand Down
117 changes: 48 additions & 69 deletions src/job/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,87 +66,66 @@ impl JobCreator {
})
}

pub fn add(context: &Context, mut job: JobLocked) -> Result<Uuid, JobSchedulerError> {
pub async fn add(context: &Context, mut job: JobLocked) -> Result<Uuid, JobSchedulerError> {
let tx = context.job_create_tx.clone();
let mut rx = context.job_created_tx.subscribe();

let (done_tx, done_rx) = std::sync::mpsc::channel();
let data = job.job_data();
let uuid = job.guid();

tokio::spawn(async move {
let data = job.job_data();
let uuid = job.guid();

if let Err(e) = data {
error!("Error getting job data {:?}", e);
if let Err(e) = done_tx.send(Err(e)) {
error!("Could not notify of error {:?}", e);
}
return;
}
let data = data.unwrap();
let job: Box<JobToRunAsync> = Box::new(move |job_id, job_scheduler| {
let job = job.clone();
Box::pin(async move {
let job_done = {
let w = job.0.write();
if let Err(e) = w {
error!("Error getting job {:?}", e);
return;
}
let mut w = w.unwrap();
w.run(job_scheduler)
};
let job_done = job_done.await;
match job_done {
Err(e) => {
error!("Error running job {:?} {:?}", job_id, e);
}
Ok(val) => {
if !val {
error!("Error running job {:?}", job_id);
}
}
if let Err(e) = data {
error!("Error getting job data {e:?}");
return Err(e);
}
let data = data.unwrap();
let job: Box<JobToRunAsync> = Box::new(move |job_id, job_scheduler| {
let job = job.clone();
Box::pin(async move {
let job_done = {
let w = job.0.write();
if let Err(e) = w {
error!("Error getting job {:?}", e);
return;
}
})
});
let done_tx_on_send = done_tx.clone();
tokio::spawn(async move {
let job = Arc::new(RwLock::new(job));
if let Err(_e) = tx.send((data, job)) {
error!("Error sending new job");
if let Err(e) = done_tx_on_send.send(Err(JobSchedulerError::CantAdd)) {
error!("Error sending failure of adding job {:?}", e);
let mut w = w.unwrap();
w.run(job_scheduler)
};
let job_done = job_done.await;
match job_done {
Err(e) => {
error!("Error running job {:?} {:?}", job_id, e);
}
Ok(val) => {
if !val {
error!("Error running job {:?}", job_id);
}
}
}
});
})
});

while let Ok(val) = rx.recv().await {
match val {
Ok(ret_uuid) => {
if ret_uuid == uuid {
if let Err(e) = done_tx.send(Ok(uuid)) {
error!("Could not send successful addition {:?}", e);
}
break;
}
let job = Arc::new(RwLock::new(job));
if let Err(_e) = tx.send((data, job)) {
error!("Error sending new job");
return Err(JobSchedulerError::CantAdd);
}

while let Ok(val) = rx.recv().await {
match val {
Ok(ret_uuid) => {
if ret_uuid == uuid {
return Ok(uuid);
}
Err((e, Some(ret_uuid))) => {
if ret_uuid == uuid {
if let Err(e) = done_tx.send(Err(e)) {
error!("Could not send failure {:?}", e);
}
break;
}
}
Err((e, Some(ret_uuid))) => {
if ret_uuid == uuid {
return Err(e);
}
_ => {}
}
_ => {}
}
});
}

let uuid = done_rx.recv().map_err(|e| {
error!("Could not receive done from add {:?}", e);
JobSchedulerError::CantAdd
})??;
Ok(uuid)
Err(JobSchedulerError::CantAdd)
}
}
6 changes: 4 additions & 2 deletions src/job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
#[cfg(feature = "signal")]
use tokio::signal::unix::SignalKind;
use tokio::sync::RwLock;
use tracing::error;
use tracing::{error, info};
use uuid::Uuid;

pub type ShutdownNotification =
Expand Down Expand Up @@ -265,12 +265,14 @@ impl JobsSchedulerLocked {
pub async fn add(&self, job: JobLocked) -> Result<Uuid, JobSchedulerError> {
let guid = job.guid();
if !self.inited().await {
info!("Uninited");
let mut s = self.clone();
s.init().await?;
}

let context = self.context.clone();
JobCreator::add(&context, job)?;
JobCreator::add(&context, job).await?;
info!("Job creator created");

Ok(guid)
}
Expand Down

0 comments on commit ad4643e

Please sign in to comment.