From 4ff1bb82cdd9ed72771ad83a46ed4bba297d7598 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Wed, 17 Jul 2024 08:57:46 -0700 Subject: [PATCH] Rewrite IdFactory and IdFactoryWithReuse (vercel/turbo#8769) ### Description The real goal here was to extend `IdFactory` to work with 64-bit ids, which I'll need soon for globally unique (and non-reusable) "execution ids" (vercel/turbo#8771) to support the safety requirements of local uncached Vcs. I got a little carried away and essentially rewrote this: - (Debatable if this is an improvement or not) ID generation re-use requires an almost-but-not-entirely-free check of the concurrent queue, so it is now opt-in using the `IdFactoryWithReuse`. - ID generation is always performed with an AtomicU64 (which shouldn't really be any more or less expensive than AtomicU32 on 64 bit architectures). - u32 overflow detection is performed by using a `TryFrom` conversion from a NonZeroU64. Previously we could only detect and panic on the first id generated after overflow. Now we should detect and panic on (basically) all ids generated after overflow. - New versions of `concurrent-queue` make the `unbounded` constructor `const`, which allows us to eliminate the use of `Lazy`. - Add a unit test for overflow detection ### Testing Instructions ``` cargo nextest r -p turbo-tasks -p turbo-tasks-memory ``` --- .../turbo-tasks-memory/src/memory_backend.rs | 6 +- .../src/memory_backend_with_pg.rs | 6 +- crates/turbo-tasks/src/id.rs | 16 ++- crates/turbo-tasks/src/id_factory.rs | 104 +++++++++++++++--- crates/turbo-tasks/src/manager.rs | 8 +- crates/turbo-tasks/src/registry.rs | 4 +- crates/turbo-tasks/src/util.rs | 6 +- 7 files changed, 121 insertions(+), 29 deletions(-) diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index cc2f973988ac9..eccfc45e044cb 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -25,7 +25,7 @@ use turbo_tasks::{ TransientTaskType, }, event::EventListener, - util::{IdFactory, NoMoveVec}, + util::{IdFactoryWithReuse, NoMoveVec}, CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, }; @@ -45,7 +45,7 @@ fn prehash_task_type(task_type: PersistentTaskType) -> PreHashed, backend_jobs: NoMoveVec, - backend_job_id_factory: IdFactory, + backend_job_id_factory: IdFactoryWithReuse, task_cache: DashMap>, TaskId, BuildHasherDefault>, memory_limit: usize, @@ -65,7 +65,7 @@ impl MemoryBackend { Self { memory_tasks: NoMoveVec::new(), backend_jobs: NoMoveVec::new(), - backend_job_id_factory: IdFactory::new(), + backend_job_id_factory: IdFactoryWithReuse::new(), task_cache: DashMap::with_hasher_and_shard_amount( Default::default(), (std::thread::available_parallelism().map_or(1, usize::from) * 32) diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index a8fa8104b8af2..19ee36169a778 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -26,7 +26,7 @@ use turbo_tasks::{ ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph, PersistedGraphApi, ReadTaskState, TaskCell, TaskData, }, - util::{IdFactory, NoMoveVec, SharedError}, + util::{IdFactoryWithReuse, NoMoveVec, SharedError}, CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, }; @@ -131,7 +131,7 @@ pub struct MemoryBackendWithPersistedGraph { pub pg: P, tasks: NoMoveVec, cache: DashMap, - background_job_id_factory: IdFactory, + background_job_id_factory: IdFactoryWithReuse, background_jobs: NoMoveVec, only_known_to_memory_tasks: DashSet, /// Tasks that were selected to persist @@ -154,7 +154,7 @@ pub struct MemoryBackendWithPersistedGraph { impl MemoryBackendWithPersistedGraph

{ pub fn new(pg: P) -> Self { - let background_job_id_factory = IdFactory::new(); + let background_job_id_factory = IdFactoryWithReuse::new(); let persist_job = background_job_id_factory.get(); Self { pg, diff --git a/crates/turbo-tasks/src/id.rs b/crates/turbo-tasks/src/id.rs index 60d60725e786f..37b55723c3835 100644 --- a/crates/turbo-tasks/src/id.rs +++ b/crates/turbo-tasks/src/id.rs @@ -1,7 +1,7 @@ use std::{ fmt::{Debug, Display}, mem::transmute_copy, - num::NonZeroU32, + num::{NonZeroU32, NonZeroU64, TryFromIntError}, ops::Deref, }; @@ -17,6 +17,8 @@ macro_rules! define_id { } impl $name { + /// Constructs a wrapper type from the numeric identifier. + /// /// # Safety /// /// The passed `id` must not be zero. @@ -39,11 +41,23 @@ macro_rules! define_id { } } + /// Converts a numeric identifier to the wrapper type. + /// + /// Panics if the given id value is zero. impl From for $name { fn from(id: u32) -> Self { Self { id: NonZeroU32::new(id).expect("Ids can only be created from non zero values") } } } + + /// Converts a numeric identifier to the wrapper type. + impl TryFrom for $name { + type Error = TryFromIntError; + + fn try_from(id: NonZeroU64) -> Result { + Ok(Self { id: NonZeroU32::try_from(id)? }) + } + } }; ($name:ident) => { define_id!(internal $name); diff --git a/crates/turbo-tasks/src/id_factory.rs b/crates/turbo-tasks/src/id_factory.rs index 87203ff578c26..1abce319654b2 100644 --- a/crates/turbo-tasks/src/id_factory.rs +++ b/crates/turbo-tasks/src/id_factory.rs @@ -1,40 +1,96 @@ use std::{ + any::type_name, marker::PhantomData, - ops::Deref, - sync::atomic::{AtomicU32, Ordering}, + num::NonZeroU64, + sync::atomic::{AtomicU64, Ordering}, }; use concurrent_queue::ConcurrentQueue; -use once_cell::sync::Lazy; +/// A helper for constructing id types like [`FunctionId`][crate::FunctionId]. +/// +/// For ids that may be re-used, see [`IdFactoryWithReuse`]. pub struct IdFactory { - next_id: AtomicU32, - free_ids: Lazy>, - phantom_data: PhantomData, + next_id: AtomicU64, + _phantom_data: PhantomData, } -impl + Deref> Default for IdFactory { +impl IdFactory { + pub const fn new() -> Self { + Self { + next_id: AtomicU64::new(1), + _phantom_data: PhantomData, + } + } +} + +impl Default for IdFactory { fn default() -> Self { Self::new() } } -impl + Deref> IdFactory { +impl IdFactory +where + T: TryFrom, +{ + /// Return a unique new id. + /// + /// Panics (best-effort) if the id type overflows. + pub fn get(&self) -> T { + // Safety: u64 will not overflow. This is *very* unlikely to happen (would take + // decades). + let new_id = + unsafe { NonZeroU64::new_unchecked(self.next_id.fetch_add(1, Ordering::Relaxed)) }; + + // Use the extra bits of the AtomicU64 as cheap overflow detection when the + // value is less than 64 bits. + match new_id.try_into() { + Ok(id) => id, + Err(_) => panic!( + "Overflow detected while attempting to generate a unique {}", + type_name::(), + ), + } + } +} + +/// An [`IdFactory`], but extended with a free list to allow for id reuse for +/// ids such as [`BackendJobId`][crate::backend::BackendJobId]. +pub struct IdFactoryWithReuse { + factory: IdFactory, + free_ids: ConcurrentQueue, +} + +impl IdFactoryWithReuse { pub const fn new() -> Self { Self { - next_id: AtomicU32::new(1), - free_ids: Lazy::new(|| ConcurrentQueue::unbounded()), - phantom_data: PhantomData, + factory: IdFactory::new(), + free_ids: ConcurrentQueue::unbounded(), } } +} + +impl Default for IdFactoryWithReuse { + fn default() -> Self { + Self::new() + } +} +impl IdFactoryWithReuse +where + T: TryFrom, +{ + /// Return a new or potentially reused id. + /// + /// Panics (best-effort) if the id type overflows. pub fn get(&self) -> T { - if let Ok(id) = self.free_ids.pop() { - return id; - } - self.next_id.fetch_add(1, Ordering::Relaxed).into() + self.free_ids.pop().unwrap_or_else(|_| self.factory.get()) } + /// Add an id to the free list, allowing it to be re-used on a subsequent + /// call to [`IdFactoryWithReuse::get`]. + /// /// # Safety /// /// It must be ensured that the id is no longer used @@ -42,3 +98,21 @@ impl + Deref> IdFactory { let _ = self.free_ids.push(id); } } + +#[cfg(test)] +mod tests { + use std::num::NonZeroU8; + + use super::*; + + #[test] + #[should_panic(expected = "Overflow detected")] + fn test_overflow() { + let factory = IdFactory::::new(); + assert_eq!(factory.get(), NonZeroU8::new(1).unwrap()); + assert_eq!(factory.get(), NonZeroU8::new(2).unwrap()); + for _i in 2..256 { + factory.get(); + } + } +} diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index c179b871b5554..88ebb08acff09 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -28,7 +28,7 @@ use crate::{ capture_future::{self, CaptureFuture}, event::{Event, EventListener}, id::{BackendJobId, FunctionId, TraitTypeId}, - id_factory::IdFactory, + id_factory::IdFactoryWithReuse, raw_vc::{CellId, RawVc}, registry, trace::TraceRawVcs, @@ -133,7 +133,7 @@ pub trait TaskIdProvider { fn reuse_task_id(&self, id: Unused); } -impl TaskIdProvider for IdFactory { +impl TaskIdProvider for IdFactoryWithReuse { fn get_fresh_task_id(&self) -> Unused { // Safety: This is a fresh id from the factory unsafe { Unused::new_unchecked(self.get()) } @@ -234,7 +234,7 @@ pub struct UpdateInfo { pub struct TurboTasks { this: Weak, backend: B, - task_id_factory: IdFactory, + task_id_factory: IdFactoryWithReuse, stopped: AtomicBool, currently_scheduled_tasks: AtomicUsize, currently_scheduled_foreground_jobs: AtomicUsize, @@ -279,7 +279,7 @@ impl TurboTasks { // so we probably want to make sure that all tasks are joined // when trying to drop turbo tasks pub fn new(mut backend: B) -> Arc { - let task_id_factory = IdFactory::new(); + let task_id_factory = IdFactoryWithReuse::new(); backend.initialize(&task_id_factory); let this = Arc::new_cyclic(|this| Self { this: this.clone(), diff --git a/crates/turbo-tasks/src/registry.rs b/crates/turbo-tasks/src/registry.rs index df2f3f16a3201..04cc6e995c2ca 100644 --- a/crates/turbo-tasks/src/registry.rs +++ b/crates/turbo-tasks/src/registry.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, hash::Hash, ops::Deref}; +use std::{fmt::Debug, hash::Hash, num::NonZeroU64, ops::Deref}; use dashmap::{mapref::entry::Entry, DashMap}; use once_cell::sync::Lazy; @@ -30,7 +30,7 @@ static TRAIT_TYPES_BY_VALUE: Lazy> = static TRAIT_TYPES: Lazy> = Lazy::new(NoMoveVec::new); fn register_thing< - K: From + Deref + Sync + Send + Copy, + K: TryFrom + Deref + Sync + Send + Copy, V: Clone + Hash + Ord + Eq + Sync + Send + Copy, const INITIAL_CAPACITY_BITS: u32, >( diff --git a/crates/turbo-tasks/src/util.rs b/crates/turbo-tasks/src/util.rs index 7fdf45bf9dc20..8c05fccc829ca 100644 --- a/crates/turbo-tasks/src/util.rs +++ b/crates/turbo-tasks/src/util.rs @@ -14,7 +14,11 @@ use anyhow::{anyhow, Error}; use pin_project_lite::pin_project; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -pub use super::{id_factory::IdFactory, no_move_vec::NoMoveVec, once_map::*}; +pub use super::{ + id_factory::{IdFactory, IdFactoryWithReuse}, + no_move_vec::NoMoveVec, + once_map::*, +}; /// A error struct that is backed by an Arc to allow cloning errors #[derive(Debug, Clone)]