Skip to content

Commit

Permalink
Rewrite IdFactory and IdFactoryWithReuse (vercel/turborepo#8769)
Browse files Browse the repository at this point in the history
### Description

The real goal here was to extend `IdFactory<T>` to work with 64-bit ids, which I'll need soon for globally unique (and non-reusable) "execution ids" (vercel/turborepo#8771) to support the safety requirements of local uncached Vcs.

I got a little carried away and essentially rewrote this:

- (Debatable if this is an improvement or not) ID generation re-use requires an almost-but-not-entirely-free check of the concurrent queue, so it is now opt-in using the `IdFactoryWithReuse`.

- ID generation is always performed with an AtomicU64 (which shouldn't really be any more or less expensive than AtomicU32 on 64 bit architectures).

- u32 overflow detection is performed by using a `TryFrom` conversion from a NonZeroU64. Previously we could only detect and panic on the first id generated after overflow. Now we should detect and panic on (basically) all ids generated after overflow.

- New versions of `concurrent-queue` make the `unbounded` constructor `const`, which allows us to eliminate the use of `Lazy`.

- Add a unit test for overflow detection

### Testing Instructions

```
cargo nextest r -p turbo-tasks -p turbo-tasks-memory
```
  • Loading branch information
bgw committed Jul 17, 2024
1 parent 8ca881b commit 610c84e
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 29 deletions.
6 changes: 3 additions & 3 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use turbo_tasks::{
TransientTaskType,
},
event::EventListener,
util::{IdFactory, NoMoveVec},
util::{IdFactoryWithReuse, NoMoveVec},
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused,
};

Expand All @@ -45,7 +45,7 @@ fn prehash_task_type(task_type: PersistentTaskType) -> PreHashed<PersistentTaskT
pub struct MemoryBackend {
memory_tasks: NoMoveVec<Task, 13>,
backend_jobs: NoMoveVec<Job>,
backend_job_id_factory: IdFactory<BackendJobId>,
backend_job_id_factory: IdFactoryWithReuse<BackendJobId>,
task_cache:
DashMap<Arc<PreHashed<PersistentTaskType>>, TaskId, BuildHasherDefault<PassThroughHash>>,
memory_limit: usize,
Expand All @@ -65,7 +65,7 @@ impl MemoryBackend {
Self {
memory_tasks: NoMoveVec::new(),
backend_jobs: NoMoveVec::new(),
backend_job_id_factory: IdFactory::new(),
backend_job_id_factory: IdFactoryWithReuse::new(),
task_cache: DashMap::with_hasher_and_shard_amount(
Default::default(),
(std::thread::available_parallelism().map_or(1, usize::from) * 32)
Expand Down
6 changes: 3 additions & 3 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use turbo_tasks::{
ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph,
PersistedGraphApi, ReadTaskState, TaskCell, TaskData,
},
util::{IdFactory, NoMoveVec, SharedError},
util::{IdFactoryWithReuse, NoMoveVec, SharedError},
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused,
};

Expand Down Expand Up @@ -131,7 +131,7 @@ pub struct MemoryBackendWithPersistedGraph<P: PersistedGraph + 'static> {
pub pg: P,
tasks: NoMoveVec<Task>,
cache: DashMap<PersistentTaskType, TaskId>,
background_job_id_factory: IdFactory<BackendJobId>,
background_job_id_factory: IdFactoryWithReuse<BackendJobId>,
background_jobs: NoMoveVec<BackgroundJob>,
only_known_to_memory_tasks: DashSet<TaskId>,
/// Tasks that were selected to persist
Expand All @@ -154,7 +154,7 @@ pub struct MemoryBackendWithPersistedGraph<P: PersistedGraph + 'static> {

impl<P: PersistedGraph> MemoryBackendWithPersistedGraph<P> {
pub fn new(pg: P) -> Self {
let background_job_id_factory = IdFactory::new();
let background_job_id_factory = IdFactoryWithReuse::new();
let persist_job = background_job_id_factory.get();
Self {
pg,
Expand Down
16 changes: 15 additions & 1 deletion crates/turbo-tasks/src/id.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
fmt::{Debug, Display},
mem::transmute_copy,
num::NonZeroU32,
num::{NonZeroU32, NonZeroU64, TryFromIntError},
ops::Deref,
};

Expand All @@ -17,6 +17,8 @@ macro_rules! define_id {
}

impl $name {
/// Constructs a wrapper type from the numeric identifier.
///
/// # Safety
///
/// The passed `id` must not be zero.
Expand All @@ -39,11 +41,23 @@ macro_rules! define_id {
}
}

/// Converts a numeric identifier to the wrapper type.
///
/// Panics if the given id value is zero.
impl From<u32> for $name {
fn from(id: u32) -> Self {
Self { id: NonZeroU32::new(id).expect("Ids can only be created from non zero values") }
}
}

/// Converts a numeric identifier to the wrapper type.
impl TryFrom<NonZeroU64> for $name {
type Error = TryFromIntError;

fn try_from(id: NonZeroU64) -> Result<Self, Self::Error> {
Ok(Self { id: NonZeroU32::try_from(id)? })
}
}
};
($name:ident) => {
define_id!(internal $name);
Expand Down
104 changes: 89 additions & 15 deletions crates/turbo-tasks/src/id_factory.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,118 @@
use std::{
any::type_name,
marker::PhantomData,
ops::Deref,
sync::atomic::{AtomicU32, Ordering},
num::NonZeroU64,
sync::atomic::{AtomicU64, Ordering},
};

use concurrent_queue::ConcurrentQueue;
use once_cell::sync::Lazy;

/// A helper for constructing id types like [`FunctionId`][crate::FunctionId].
///
/// For ids that may be re-used, see [`IdFactoryWithReuse`].
pub struct IdFactory<T> {
next_id: AtomicU32,
free_ids: Lazy<ConcurrentQueue<T>>,
phantom_data: PhantomData<T>,
next_id: AtomicU64,
_phantom_data: PhantomData<T>,
}

impl<T: From<u32> + Deref<Target = u32>> Default for IdFactory<T> {
impl<T> IdFactory<T> {
pub const fn new() -> Self {
Self {
next_id: AtomicU64::new(1),
_phantom_data: PhantomData,
}
}
}

impl<T> Default for IdFactory<T> {
fn default() -> Self {
Self::new()
}
}

impl<T: From<u32> + Deref<Target = u32>> IdFactory<T> {
impl<T> IdFactory<T>
where
T: TryFrom<NonZeroU64>,
{
/// Return a unique new id.
///
/// Panics (best-effort) if the id type overflows.
pub fn get(&self) -> T {
// Safety: u64 will not overflow. This is *very* unlikely to happen (would take
// decades).
let new_id =
unsafe { NonZeroU64::new_unchecked(self.next_id.fetch_add(1, Ordering::Relaxed)) };

// Use the extra bits of the AtomicU64 as cheap overflow detection when the
// value is less than 64 bits.
match new_id.try_into() {
Ok(id) => id,
Err(_) => panic!(
"Overflow detected while attempting to generate a unique {}",
type_name::<T>(),
),
}
}
}

/// An [`IdFactory`], but extended with a free list to allow for id reuse for
/// ids such as [`BackendJobId`][crate::backend::BackendJobId].
pub struct IdFactoryWithReuse<T> {
factory: IdFactory<T>,
free_ids: ConcurrentQueue<T>,
}

impl<T> IdFactoryWithReuse<T> {
pub const fn new() -> Self {
Self {
next_id: AtomicU32::new(1),
free_ids: Lazy::new(|| ConcurrentQueue::unbounded()),
phantom_data: PhantomData,
factory: IdFactory::new(),
free_ids: ConcurrentQueue::unbounded(),
}
}
}

impl<T> Default for IdFactoryWithReuse<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> IdFactoryWithReuse<T>
where
T: TryFrom<NonZeroU64>,
{
/// Return a new or potentially reused id.
///
/// Panics (best-effort) if the id type overflows.
pub fn get(&self) -> T {
if let Ok(id) = self.free_ids.pop() {
return id;
}
self.next_id.fetch_add(1, Ordering::Relaxed).into()
self.free_ids.pop().unwrap_or_else(|_| self.factory.get())
}

/// Add an id to the free list, allowing it to be re-used on a subsequent
/// call to [`IdFactoryWithReuse::get`].
///
/// # Safety
///
/// It must be ensured that the id is no longer used
pub unsafe fn reuse(&self, id: T) {
let _ = self.free_ids.push(id);
}
}

#[cfg(test)]
mod tests {
use std::num::NonZeroU8;

use super::*;

#[test]
#[should_panic(expected = "Overflow detected")]
fn test_overflow() {
let factory = IdFactory::<NonZeroU8>::new();
assert_eq!(factory.get(), NonZeroU8::new(1).unwrap());
assert_eq!(factory.get(), NonZeroU8::new(2).unwrap());
for _i in 2..256 {
factory.get();
}
}
}
8 changes: 4 additions & 4 deletions crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
capture_future::{self, CaptureFuture},
event::{Event, EventListener},
id::{BackendJobId, FunctionId, TraitTypeId},
id_factory::IdFactory,
id_factory::IdFactoryWithReuse,
raw_vc::{CellId, RawVc},
registry,
trace::TraceRawVcs,
Expand Down Expand Up @@ -133,7 +133,7 @@ pub trait TaskIdProvider {
fn reuse_task_id(&self, id: Unused<TaskId>);
}

impl TaskIdProvider for IdFactory<TaskId> {
impl TaskIdProvider for IdFactoryWithReuse<TaskId> {
fn get_fresh_task_id(&self) -> Unused<TaskId> {
// Safety: This is a fresh id from the factory
unsafe { Unused::new_unchecked(self.get()) }
Expand Down Expand Up @@ -234,7 +234,7 @@ pub struct UpdateInfo {
pub struct TurboTasks<B: Backend + 'static> {
this: Weak<Self>,
backend: B,
task_id_factory: IdFactory<TaskId>,
task_id_factory: IdFactoryWithReuse<TaskId>,
stopped: AtomicBool,
currently_scheduled_tasks: AtomicUsize,
currently_scheduled_foreground_jobs: AtomicUsize,
Expand Down Expand Up @@ -279,7 +279,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
// so we probably want to make sure that all tasks are joined
// when trying to drop turbo tasks
pub fn new(mut backend: B) -> Arc<Self> {
let task_id_factory = IdFactory::new();
let task_id_factory = IdFactoryWithReuse::new();
backend.initialize(&task_id_factory);
let this = Arc::new_cyclic(|this| Self {
this: this.clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/turbo-tasks/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, hash::Hash, ops::Deref};
use std::{fmt::Debug, hash::Hash, num::NonZeroU64, ops::Deref};

use dashmap::{mapref::entry::Entry, DashMap};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -30,7 +30,7 @@ static TRAIT_TYPES_BY_VALUE: Lazy<DashMap<&'static TraitType, TraitTypeId>> =
static TRAIT_TYPES: Lazy<NoMoveVec<(&'static TraitType, &'static str)>> = Lazy::new(NoMoveVec::new);

fn register_thing<
K: From<u32> + Deref<Target = u32> + Sync + Send + Copy,
K: TryFrom<NonZeroU64> + Deref<Target = u32> + Sync + Send + Copy,
V: Clone + Hash + Ord + Eq + Sync + Send + Copy,
const INITIAL_CAPACITY_BITS: u32,
>(
Expand Down
6 changes: 5 additions & 1 deletion crates/turbo-tasks/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ use anyhow::{anyhow, Error};
use pin_project_lite::pin_project;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub use super::{id_factory::IdFactory, no_move_vec::NoMoveVec, once_map::*};
pub use super::{
id_factory::{IdFactory, IdFactoryWithReuse},
no_move_vec::NoMoveVec,
once_map::*,
};

/// A error struct that is backed by an Arc to allow cloning errors
#[derive(Debug, Clone)]
Expand Down

0 comments on commit 610c84e

Please sign in to comment.