From 9ee68c2da7dc6f913b8c2dd51dc79272f9fa8548 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 16 Jul 2024 13:49:55 -0700 Subject: [PATCH 1/2] Rewrite IdFactory and IdFactoryWithReuse --- Cargo.lock | 6 +- Cargo.toml | 2 +- .../turbo-tasks-memory/src/memory_backend.rs | 6 +- .../src/memory_backend_with_pg.rs | 6 +- crates/turbo-tasks/src/id.rs | 16 +++- crates/turbo-tasks/src/id_factory.rs | 86 +++++++++++++++---- crates/turbo-tasks/src/manager.rs | 8 +- crates/turbo-tasks/src/registry.rs | 4 +- crates/turbo-tasks/src/util.rs | 6 +- 9 files changed, 107 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16dc4aabb6387..8ea9451c639cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1537,9 +1537,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.1.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ "crossbeam-utils", ] @@ -11537,7 +11537,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.4.6", + "rand 0.8.5", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 7879564321f06..a58e59a6f3613 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -226,7 +226,7 @@ camino = { version = "1.1.4", features = ["serde1"] } chrono = "0.4.23" clap = "4.5.2" clap_complete = "4.5.1" -concurrent-queue = "2.1.0" +concurrent-queue = "2.5.0" console = "0.15.5" console-subscriber = "0.1.8" criterion = "0.4.0" diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index cc2f973988ac9..eccfc45e044cb 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -25,7 +25,7 @@ use turbo_tasks::{ TransientTaskType, }, event::EventListener, - util::{IdFactory, NoMoveVec}, + util::{IdFactoryWithReuse, NoMoveVec}, CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, }; @@ -45,7 +45,7 @@ fn prehash_task_type(task_type: PersistentTaskType) -> PreHashed, backend_jobs: NoMoveVec, - backend_job_id_factory: IdFactory, + backend_job_id_factory: IdFactoryWithReuse, task_cache: DashMap>, TaskId, BuildHasherDefault>, memory_limit: usize, @@ -65,7 +65,7 @@ impl MemoryBackend { Self { memory_tasks: NoMoveVec::new(), backend_jobs: NoMoveVec::new(), - backend_job_id_factory: IdFactory::new(), + backend_job_id_factory: IdFactoryWithReuse::new(), task_cache: DashMap::with_hasher_and_shard_amount( Default::default(), (std::thread::available_parallelism().map_or(1, usize::from) * 32) diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index a8fa8104b8af2..19ee36169a778 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -26,7 +26,7 @@ use turbo_tasks::{ ActivateResult, DeactivateResult, PersistResult, PersistTaskState, PersistedGraph, PersistedGraphApi, ReadTaskState, TaskCell, TaskData, }, - util::{IdFactory, NoMoveVec, SharedError}, + util::{IdFactoryWithReuse, NoMoveVec, SharedError}, CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, }; @@ -131,7 +131,7 @@ pub struct MemoryBackendWithPersistedGraph { pub pg: P, tasks: NoMoveVec, cache: DashMap, - background_job_id_factory: IdFactory, + background_job_id_factory: IdFactoryWithReuse, background_jobs: NoMoveVec, only_known_to_memory_tasks: DashSet, /// Tasks that were selected to persist @@ -154,7 +154,7 @@ pub struct MemoryBackendWithPersistedGraph { impl MemoryBackendWithPersistedGraph

{ pub fn new(pg: P) -> Self { - let background_job_id_factory = IdFactory::new(); + let background_job_id_factory = IdFactoryWithReuse::new(); let persist_job = background_job_id_factory.get(); Self { pg, diff --git a/crates/turbo-tasks/src/id.rs b/crates/turbo-tasks/src/id.rs index 60d60725e786f..37b55723c3835 100644 --- a/crates/turbo-tasks/src/id.rs +++ b/crates/turbo-tasks/src/id.rs @@ -1,7 +1,7 @@ use std::{ fmt::{Debug, Display}, mem::transmute_copy, - num::NonZeroU32, + num::{NonZeroU32, NonZeroU64, TryFromIntError}, ops::Deref, }; @@ -17,6 +17,8 @@ macro_rules! define_id { } impl $name { + /// Constructs a wrapper type from the numeric identifier. + /// /// # Safety /// /// The passed `id` must not be zero. @@ -39,11 +41,23 @@ macro_rules! define_id { } } + /// Converts a numeric identifier to the wrapper type. + /// + /// Panics if the given id value is zero. impl From for $name { fn from(id: u32) -> Self { Self { id: NonZeroU32::new(id).expect("Ids can only be created from non zero values") } } } + + /// Converts a numeric identifier to the wrapper type. + impl TryFrom for $name { + type Error = TryFromIntError; + + fn try_from(id: NonZeroU64) -> Result { + Ok(Self { id: NonZeroU32::try_from(id)? }) + } + } }; ($name:ident) => { define_id!(internal $name); diff --git a/crates/turbo-tasks/src/id_factory.rs b/crates/turbo-tasks/src/id_factory.rs index 87203ff578c26..0c9b0441f8e85 100644 --- a/crates/turbo-tasks/src/id_factory.rs +++ b/crates/turbo-tasks/src/id_factory.rs @@ -1,40 +1,96 @@ use std::{ + any::type_name, marker::PhantomData, - ops::Deref, - sync::atomic::{AtomicU32, Ordering}, + num::NonZeroU64, + sync::atomic::{AtomicU64, Ordering}, }; use concurrent_queue::ConcurrentQueue; -use once_cell::sync::Lazy; +/// A helper for constructing id types like [`FunctionId`][crate::FunctionId]. +/// +/// For ids that may be re-used, see [`IdFactoryWithReuse`]. pub struct IdFactory { - next_id: AtomicU32, - free_ids: Lazy>, - phantom_data: PhantomData, + next_id: AtomicU64, + _phantom_data: PhantomData, } -impl + Deref> Default for IdFactory { +impl IdFactory { + pub const fn new() -> Self { + Self { + next_id: AtomicU64::new(1), + _phantom_data: PhantomData, + } + } +} + +impl Default for IdFactory { fn default() -> Self { Self::new() } } -impl + Deref> IdFactory { +impl IdFactory +where + T: TryFrom, +{ + /// Return a unique new id. + /// + /// Panics (best-effort) if the id type overflows. + pub fn get(&self) -> T { + // Safety: u64 will not overflow. This is *very* unlikely to happen (would take + // decades). + let new_id = + unsafe { NonZeroU64::new_unchecked(self.next_id.fetch_add(1, Ordering::Relaxed)) }; + + // Use the extra bits of the AtomicU64 as cheap overflow detection when the + // value is less than 64 bits. + match new_id.try_into() { + Ok(id) => id, + Err(_) => panic!( + "Overflow detected while attempting to generate a unique {}", + type_name::(), + ), + } + } +} + +/// An [`IdFactory`], but extended with a free list to allow for id reuse for +/// ids such as [`BackendJobId`][crate::backend::BackendJobId]. +pub struct IdFactoryWithReuse { + factory: IdFactory, + free_ids: ConcurrentQueue, +} + +impl IdFactoryWithReuse { pub const fn new() -> Self { Self { - next_id: AtomicU32::new(1), - free_ids: Lazy::new(|| ConcurrentQueue::unbounded()), - phantom_data: PhantomData, + factory: IdFactory::new(), + free_ids: ConcurrentQueue::unbounded(), } } +} + +impl Default for IdFactoryWithReuse { + fn default() -> Self { + Self::new() + } +} +impl IdFactoryWithReuse +where + T: TryFrom, +{ + /// Return a new or potentially reused id. + /// + /// Panics (best-effort) if the id type overflows. pub fn get(&self) -> T { - if let Ok(id) = self.free_ids.pop() { - return id; - } - self.next_id.fetch_add(1, Ordering::Relaxed).into() + self.free_ids.pop().unwrap_or_else(|_| self.factory.get()) } + /// Add an id to the free list, allowing it to be re-used on a subsequent + /// call to [`IdFactoryWithReuse::get`]. + /// /// # Safety /// /// It must be ensured that the id is no longer used diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index c179b871b5554..88ebb08acff09 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -28,7 +28,7 @@ use crate::{ capture_future::{self, CaptureFuture}, event::{Event, EventListener}, id::{BackendJobId, FunctionId, TraitTypeId}, - id_factory::IdFactory, + id_factory::IdFactoryWithReuse, raw_vc::{CellId, RawVc}, registry, trace::TraceRawVcs, @@ -133,7 +133,7 @@ pub trait TaskIdProvider { fn reuse_task_id(&self, id: Unused); } -impl TaskIdProvider for IdFactory { +impl TaskIdProvider for IdFactoryWithReuse { fn get_fresh_task_id(&self) -> Unused { // Safety: This is a fresh id from the factory unsafe { Unused::new_unchecked(self.get()) } @@ -234,7 +234,7 @@ pub struct UpdateInfo { pub struct TurboTasks { this: Weak, backend: B, - task_id_factory: IdFactory, + task_id_factory: IdFactoryWithReuse, stopped: AtomicBool, currently_scheduled_tasks: AtomicUsize, currently_scheduled_foreground_jobs: AtomicUsize, @@ -279,7 +279,7 @@ impl TurboTasks { // so we probably want to make sure that all tasks are joined // when trying to drop turbo tasks pub fn new(mut backend: B) -> Arc { - let task_id_factory = IdFactory::new(); + let task_id_factory = IdFactoryWithReuse::new(); backend.initialize(&task_id_factory); let this = Arc::new_cyclic(|this| Self { this: this.clone(), diff --git a/crates/turbo-tasks/src/registry.rs b/crates/turbo-tasks/src/registry.rs index df2f3f16a3201..04cc6e995c2ca 100644 --- a/crates/turbo-tasks/src/registry.rs +++ b/crates/turbo-tasks/src/registry.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, hash::Hash, ops::Deref}; +use std::{fmt::Debug, hash::Hash, num::NonZeroU64, ops::Deref}; use dashmap::{mapref::entry::Entry, DashMap}; use once_cell::sync::Lazy; @@ -30,7 +30,7 @@ static TRAIT_TYPES_BY_VALUE: Lazy> = static TRAIT_TYPES: Lazy> = Lazy::new(NoMoveVec::new); fn register_thing< - K: From + Deref + Sync + Send + Copy, + K: TryFrom + Deref + Sync + Send + Copy, V: Clone + Hash + Ord + Eq + Sync + Send + Copy, const INITIAL_CAPACITY_BITS: u32, >( diff --git a/crates/turbo-tasks/src/util.rs b/crates/turbo-tasks/src/util.rs index 7fdf45bf9dc20..8c05fccc829ca 100644 --- a/crates/turbo-tasks/src/util.rs +++ b/crates/turbo-tasks/src/util.rs @@ -14,7 +14,11 @@ use anyhow::{anyhow, Error}; use pin_project_lite::pin_project; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -pub use super::{id_factory::IdFactory, no_move_vec::NoMoveVec, once_map::*}; +pub use super::{ + id_factory::{IdFactory, IdFactoryWithReuse}, + no_move_vec::NoMoveVec, + once_map::*, +}; /// A error struct that is backed by an Arc to allow cloning errors #[derive(Debug, Clone)] From 7aa56588a18308d6b449f737a8e62eac137ce7f9 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 16 Jul 2024 14:23:37 -0700 Subject: [PATCH 2/2] Add unit test for overflow detection --- crates/turbo-tasks/src/id_factory.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/crates/turbo-tasks/src/id_factory.rs b/crates/turbo-tasks/src/id_factory.rs index 0c9b0441f8e85..1abce319654b2 100644 --- a/crates/turbo-tasks/src/id_factory.rs +++ b/crates/turbo-tasks/src/id_factory.rs @@ -98,3 +98,21 @@ where let _ = self.free_ids.push(id); } } + +#[cfg(test)] +mod tests { + use std::num::NonZeroU8; + + use super::*; + + #[test] + #[should_panic(expected = "Overflow detected")] + fn test_overflow() { + let factory = IdFactory::::new(); + assert_eq!(factory.get(), NonZeroU8::new(1).unwrap()); + assert_eq!(factory.get(), NonZeroU8::new(2).unwrap()); + for _i in 2..256 { + factory.get(); + } + } +}