Skip to content

Commit c365421

Browse files
authored
Initial code for transactional outbox pattern (#13)
* Support transactional outbox pattern * Create job ID before add call This makes it easier to get the job ID when the actual job addition is deferred, as when using an outbox pattern. * finish outbox listener * functions to add to the outbox * notify when adding to the outbox * cargo.toml metadata * comment about not ready for use * test fixes
1 parent 1895c3c commit c365421

File tree

12 files changed

+462
-19
lines changed

12 files changed

+462
-19
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"effectum",
4+
"outbox",
45
"stress_test"
56
]
67
resolver = "2"

effectum/Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ thiserror = "1"
3131
time = { version = "0.3", features = ["serde"] }
3232
tokio = { version = "1.32.0", features = ["rt", "macros", "time", "sync"] }
3333
tracing = "0.1.37"
34-
ulid = { version = "1.1.0", features = ["uuid"] }
35-
uuid = "1.4.1"
34+
uuid = { version = "1.7.0", features = ["v7", "serde"] }
3635

3736
[dev-dependencies]
3837
color-eyre = "0.6.2"

effectum/src/add_job.rs

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{borrow::Cow, time::Duration};
22

33
use ahash::{HashMap, HashSet};
4+
use serde::{Deserialize, Serialize};
45
use time::OffsetDateTime;
56
use tracing::{instrument, Span};
67
use uuid::Uuid;
@@ -18,8 +19,13 @@ use crate::{
1819
};
1920

2021
/// A job to be submitted to the queue.
21-
#[derive(Debug, Clone)]
22+
/// Jobs are uniquely identified by their `id`, so adding a job with the same ID twice will fail.
23+
/// If you want to clone the same Job object and submit it multiple times, use [JobBuilder::clone_as_new]
24+
/// to generate a new ID with each clone.
25+
#[derive(Debug, Clone, Serialize, Deserialize)]
2226
pub struct Job {
27+
/// A unique identifier for the job.
28+
pub id: Uuid,
2329
/// The name of the job, which matches the name used in the [JobRunner](crate::JobRunner) for the job.
2430
pub job_type: Cow<'static, str>,
2531
/// Jobs with higher `priority` will be executed first.
@@ -51,10 +57,17 @@ impl Job {
5157
pub fn builder(job_type: impl Into<Cow<'static, str>>) -> JobBuilder {
5258
JobBuilder::new(job_type)
5359
}
60+
61+
/// Clone the [Job] with a new `id`.
62+
pub fn clone_as_new(&self) -> Self {
63+
let mut job = self.clone();
64+
job.id = Uuid::now_v7();
65+
job
66+
}
5467
}
5568

5669
/// `Retries` controls the exponential backoff behavior when retrying failed jobs.
57-
#[derive(Debug, Clone)]
70+
#[derive(Debug, Clone, Serialize, Deserialize)]
5871
pub struct Retries {
5972
/// How many times to retry a job before it is considered to have failed permanently.
6073
pub max_retries: u32,
@@ -83,6 +96,7 @@ impl Default for Retries {
8396
impl Default for Job {
8497
fn default() -> Self {
8598
Self {
99+
id: Uuid::now_v7(),
86100
job_type: Default::default(),
87101
priority: 0,
88102
weight: 1,
@@ -204,7 +218,7 @@ impl JobBuilder {
204218
/// Specified fields of a job to be updated, using the [Queue::update_job] method.
205219
/// All of these fields except the job ID are optional, so the update can set
206220
/// only the desired fields.
207-
#[derive(Debug, Clone)]
221+
#[derive(Debug, Clone, Serialize, Deserialize)]
208222
pub struct JobUpdate {
209223
/// The ID of the job to update
210224
pub id: Uuid,
@@ -474,7 +488,7 @@ mod tests {
474488
use std::{sync::Arc, time::Duration};
475489

476490
use temp_dir::TempDir;
477-
use ulid::Ulid;
491+
use uuid::Uuid;
478492

479493
use crate::{
480494
test_util::{
@@ -675,7 +689,7 @@ mod tests {
675689
let result = test
676690
.queue
677691
.update_job(
678-
JobUpdate::builder(Ulid::new().into())
692+
JobUpdate::builder(Uuid::now_v7().into())
679693
.run_at(test.time.now())
680694
.build(),
681695
)
@@ -777,7 +791,7 @@ mod tests {
777791
#[tokio::test]
778792
async fn cancel_nonexistent_job() {
779793
let test = TestEnvironment::new().await;
780-
let result = test.queue.cancel_job(Ulid::new().into()).await;
794+
let result = test.queue.cancel_job(Uuid::now_v7().into()).await;
781795

782796
assert!(matches!(result, Err(Error::NotFound)));
783797
}

effectum/src/db_writer/add_job.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,10 @@ pub(super) fn execute_add_job_stmt(
4747
now: OffsetDateTime,
4848
status: Option<JobState>,
4949
) -> Result<(i64, Uuid)> {
50-
let external_id: Uuid = ulid::Ulid::new().into();
5150
let run_time = job_config.run_at.unwrap_or(now).unix_timestamp();
5251

5352
jobs_stmt.execute(named_params! {
54-
"$external_id": &external_id,
53+
"$external_id": &job_config.id,
5554
"$job_type": job_config.job_type,
5655
"$priority": job_config.priority,
5756
"$weight": job_config.weight,
@@ -70,7 +69,7 @@ pub(super) fn execute_add_job_stmt(
7069

7170
let job_id = tx.last_insert_rowid();
7271

73-
Ok((job_id, external_id))
72+
Ok((job_id, job_config.id))
7473
}
7574

7675
pub(super) fn execute_add_active_job_stmt(

effectum/src/db_writer/recurring.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use rusqlite::{params, Connection, OptionalExtension, Statement};
44
use time::OffsetDateTime;
55
use tokio::sync::oneshot;
66
use tracing::{event, Level};
7+
use uuid::Uuid;
78

89
use super::{
910
add_job::{execute_add_active_job_stmt, INSERT_ACTIVE_JOBS_QUERY},
@@ -168,9 +169,10 @@ pub(super) fn schedule_next_recurring_job(
168169
tx: &Connection,
169170
now: OffsetDateTime,
170171
insert_job_stmt: &mut Statement,
171-
job: Job,
172+
mut job: Job,
172173
) -> Result<(), Error> {
173174
// Finally, add the version of the job that will actually run the first time.
175+
job.id = Uuid::now_v7();
174176
let (job_id, _) = execute_add_job_stmt(tx, insert_job_stmt, &job, now, None)?;
175177
let mut active_insert_stmt = tx.prepare_cached(INSERT_ACTIVE_JOBS_QUERY)?;
176178
execute_add_active_job_stmt(&mut active_insert_stmt, job_id, &job, now)?;

effectum/src/error.rs

+7
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ pub enum Error {
7272
RecurringJobAlreadyExists(String),
7373
}
7474

75+
impl Error {
76+
/// Returns if the error indicates that an update or cancel request occurred too late
77+
pub fn is_update_too_late(&self) -> bool {
78+
matches!(self, Error::JobRunning | Error::JobFinished)
79+
}
80+
}
81+
7582
impl From<InteractError> for Error {
7683
fn from(e: InteractError) -> Self {
7784
Error::DbInteract(e.to_string())

effectum/src/recurring.rs

+36-6
Original file line numberDiff line numberDiff line change
@@ -1311,11 +1311,21 @@ mod tests {
13111311
interval: Duration::from_secs(10),
13121312
};
13131313
test.queue
1314-
.add_recurring_job("job_id_3".to_string(), schedule.clone(), job.clone(), false)
1314+
.add_recurring_job(
1315+
"job_id_3".to_string(),
1316+
schedule.clone(),
1317+
job.clone_as_new(),
1318+
false,
1319+
)
13151320
.await
13161321
.expect("add_recurring_job");
13171322
test.queue
1318-
.add_recurring_job("job_id_1".to_string(), schedule.clone(), job.clone(), false)
1323+
.add_recurring_job(
1324+
"job_id_1".to_string(),
1325+
schedule.clone(),
1326+
job.clone_as_new(),
1327+
false,
1328+
)
13191329
.await
13201330
.expect("add_recurring_job");
13211331
test.queue
@@ -1328,7 +1338,12 @@ mod tests {
13281338
.await
13291339
.expect("add_recurring_job");
13301340
test.queue
1331-
.add_recurring_job("job_id_2".to_string(), schedule.clone(), job.clone(), false)
1341+
.add_recurring_job(
1342+
"job_id_2".to_string(),
1343+
schedule.clone(),
1344+
job.clone_as_new(),
1345+
false,
1346+
)
13321347
.await
13331348
.expect("add_recurring_job");
13341349

@@ -1354,11 +1369,21 @@ mod tests {
13541369
interval: Duration::from_secs(10),
13551370
};
13561371
test.queue
1357-
.add_recurring_job("job_id_3".to_string(), schedule.clone(), job.clone(), false)
1372+
.add_recurring_job(
1373+
"job_id_3".to_string(),
1374+
schedule.clone(),
1375+
job.clone_as_new(),
1376+
false,
1377+
)
13581378
.await
13591379
.expect("add_recurring_job");
13601380
test.queue
1361-
.add_recurring_job("job_id_1".to_string(), schedule.clone(), job.clone(), false)
1381+
.add_recurring_job(
1382+
"job_id_1".to_string(),
1383+
schedule.clone(),
1384+
job.clone_as_new(),
1385+
false,
1386+
)
13621387
.await
13631388
.expect("add_recurring_job");
13641389
test.queue
@@ -1371,7 +1396,12 @@ mod tests {
13711396
.await
13721397
.expect("add_recurring_job");
13731398
test.queue
1374-
.add_recurring_job("job_id_2".to_string(), schedule.clone(), job.clone(), false)
1399+
.add_recurring_job(
1400+
"job_id_2".to_string(),
1401+
schedule.clone(),
1402+
job.clone_as_new(),
1403+
false,
1404+
)
13751405
.await
13761406
.expect("add_recurring_job");
13771407

effectum/src/worker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ mod tests {
483483
#[should_panic]
484484
async fn worker_without_jobs_should_panic() {
485485
let test = TestEnvironment::new().await;
486-
Worker::builder(&test.queue, test.context.clone())
486+
let _ = Worker::builder(&test.queue, test.context.clone())
487487
.build()
488488
.await
489489
.unwrap();

outbox/Cargo.toml

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[package]
2+
name = "effectum-outbox"
3+
version = "0.1.0"
4+
edition = "2021"
5+
publish = false
6+
authors = ["Daniel Imfeld"]
7+
license = "MIT OR Apache-2.0"
8+
repository = "https://github.com/dimfeld/effectum"
9+
keywords = ["job", "task", "queue", "sqlite"]
10+
categories = ["asynchronous"]
11+
12+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
13+
14+
[dependencies]
15+
effectum = { path = "../effectum" }
16+
futures = "0.3.30"
17+
serde = { version = "1.0.197", features = ["derive"] }
18+
sqlx = { version = "0.7.3", features = ["uuid", "time", "json"] }
19+
thiserror = "1.0.57"
20+
time = "0.3.34"
21+
tokio = { version = "1.36.0", features = ["time", "sync"] }
22+
tracing = { version = "0.1.40", optional = true }
23+
uuid = { version = "1.7.0", features = ["serde"] }
24+
25+
[features]
26+
default=["postgres", "tracing"]
27+
postgres = ["sqlx/postgres"]
28+
tracing = ["dep:tracing"]

outbox/src/README.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# effectum-outbox
2+
3+
An implementation of the transactional outbox pattern for Effectum. This code is not ready for use yet,
4+
and development is mostly on hold until I build the server mode for Effectum, at which time it will
5+
make more sense to finish this.

outbox/src/lib.rs

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#![warn(missing_docs)]
2+
3+
//! Outbox pattern implementation for Effectum
4+
//! This is not ready for use yet.
5+
6+
#[cfg(feature = "postgres")]
7+
pub mod postgres;
8+
9+
#[cfg(feature = "postgres")]
10+
pub use postgres::*;

0 commit comments

Comments
 (0)