Skip to content

Commit

Permalink
use different task ids for persistent and transient tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Jul 17, 2024
1 parent 4c95906 commit 84301f5
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 59 deletions.
6 changes: 3 additions & 3 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl MemoryBackend {
unsafe {
self.memory_tasks.remove(*new_id as usize);
let new_id = Unused::new_unchecked(new_id);
turbo_tasks.reuse_task_id(new_id);
turbo_tasks.reuse_persistent_task_id(new_id);
}
task_id
}
Expand Down Expand Up @@ -612,7 +612,7 @@ impl Backend for MemoryBackend {
let (task_type_hash, task_type) = PreHashed::into_parts(task_type);
let task_type = Arc::new(PreHashed::new(task_type_hash, task_type));
// slow pass with key lock
let id = turbo_tasks.get_fresh_task_id();
let id = turbo_tasks.get_fresh_persistent_task_id();
let task = Task::new_persistent(
// Safety: That task will hold the value, but we are still in
// control of the task
Expand Down Expand Up @@ -652,7 +652,7 @@ impl Backend for MemoryBackend {
task_type: TransientTaskType,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> TaskId {
let id = turbo_tasks.get_fresh_task_id();
let id = turbo_tasks.get_fresh_transient_task_id();
let id = id.into();
match task_type {
TransientTaskType::Root(f) => {
Expand Down
6 changes: 3 additions & 3 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::{
raw_vc::CellId,
registry,
trait_helpers::{get_trait_method, has_trait, traits},
FunctionId, RawVc, ReadRef, SharedReference, TaskId, TaskIdProvider, TaskIdSet, TraitRef,
TraitTypeId, ValueTypeId, VcValueTrait, VcValueType,
FunctionId, RawVc, ReadRef, SharedReference, TaskId, TaskIdSet, TraitRef, TraitTypeId,
ValueTypeId, VcValueTrait, VcValueType,
};

pub enum TaskType {
Expand Down Expand Up @@ -380,7 +380,7 @@ impl CellContent {

pub trait Backend: Sync + Send {
#[allow(unused_variables)]
fn initialize(&mut self, task_id_provider: &dyn TaskIdProvider) {}
fn initialize(&mut self) {}

#[allow(unused_variables)]
fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {}
Expand Down
8 changes: 8 additions & 0 deletions crates/turbo-tasks/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ impl Debug for TaskId {
}
}

pub const TRANSIENT_TASK_BIT: u32 = 0x8000_0000;

impl TaskId {
pub fn is_transient(&self) -> bool {
**self & TRANSIENT_TASK_BIT != 0
}
}

macro_rules! make_serializable {
($ty:ty, $get_global_name:path, $get_id:path, $visitor_name:ident) => {
impl Serialize for $ty {
Expand Down
30 changes: 26 additions & 4 deletions crates/turbo-tasks/src/id_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ use concurrent_queue::ConcurrentQueue;
/// For ids that may be re-used, see [`IdFactoryWithReuse`].
pub struct IdFactory<T> {
next_id: AtomicU64,
max_id: u64,
_phantom_data: PhantomData<T>,
}

impl<T> IdFactory<T> {
pub const fn new() -> Self {
Self::new_with_range(1, u32::MAX as u64)
}

pub const fn new_with_range(start: u64, max: u64) -> Self {
Self {
next_id: AtomicU64::new(1),
next_id: AtomicU64::new(start as u64),
max_id: max as u64,
_phantom_data: PhantomData,
}
}
Expand All @@ -38,10 +44,18 @@ where
///
/// Panics (best-effort) if the id type overflows.
pub fn get(&self) -> T {
let new_id = self.next_id.fetch_add(1, Ordering::Relaxed);

if new_id > self.max_id {
panic!(
"Max id limit hit while attempting to generate a unique {}",
type_name::<T>(),
)
}

// Safety: u64 will not overflow. This is *very* unlikely to happen (would take
// decades).
let new_id =
unsafe { NonZeroU64::new_unchecked(self.next_id.fetch_add(1, Ordering::Relaxed)) };
let new_id = unsafe { NonZeroU64::new_unchecked(new_id) };

// Use the extra bits of the AtomicU64 as cheap overflow detection when the
// value is less than 64 bits.
Expand Down Expand Up @@ -69,6 +83,13 @@ impl<T> IdFactoryWithReuse<T> {
free_ids: ConcurrentQueue::unbounded(),
}
}

pub const fn new_with_range(start: u64, max: u64) -> Self {
Self {
factory: IdFactory::new_with_range(start, max),
free_ids: ConcurrentQueue::unbounded(),
}
}
}

impl<T> Default for IdFactoryWithReuse<T> {
Expand All @@ -93,7 +114,8 @@ where
///
/// # Safety
///
/// It must be ensured that the id is no longer used
/// It must be ensured that the id is no longer used. Id must be a valid id
/// that was previously returned by `get`.
pub unsafe fn reuse(&self, id: T) {
let _ = self.free_ids.push(id);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ pub use magic_any::MagicAny;
pub use manager::{
dynamic_call, dynamic_this_call, emit, get_invalidator, mark_finished, mark_stateful,
prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call,
turbo_tasks, CurrentCellRef, Invalidator, TaskIdProvider, TurboTasks, TurboTasksApi,
TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo,
turbo_tasks, CurrentCellRef, Invalidator, TurboTasks, TurboTasksApi, TurboTasksBackendApi,
TurboTasksCallApi, Unused, UpdateInfo,
};
pub use native_function::NativeFunction;
pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
Expand Down
74 changes: 27 additions & 47 deletions crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
backend::{Backend, CellContent, PersistentTaskType, TaskExecutionSpec, TransientTaskType},
capture_future::{self, CaptureFuture},
event::{Event, EventListener},
id::{BackendJobId, FunctionId, TraitTypeId},
id::{BackendJobId, FunctionId, TraitTypeId, TRANSIENT_TASK_BIT},
id_factory::IdFactoryWithReuse,
magic_any::MagicAny,
raw_vc::{CellId, RawVc},
Expand Down Expand Up @@ -133,22 +133,6 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
}

pub trait TaskIdProvider {
fn get_fresh_task_id(&self) -> Unused<TaskId>;
fn reuse_task_id(&self, id: Unused<TaskId>);
}

impl TaskIdProvider for IdFactoryWithReuse<TaskId> {
fn get_fresh_task_id(&self) -> Unused<TaskId> {
// Safety: This is a fresh id from the factory
unsafe { Unused::new_unchecked(self.get()) }
}

fn reuse_task_id(&self, id: Unused<TaskId>) {
unsafe { self.reuse(id.into()) }
}
}

/// A wrapper around a value that is unused.
pub struct Unused<T> {
inner: T,
Expand Down Expand Up @@ -179,11 +163,14 @@ impl<T> Unused<T> {
}
}

pub trait TurboTasksBackendApi<B: Backend + 'static>:
TaskIdProvider + TurboTasksCallApi + Sync + Send
{
pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync + Send {
fn pin(&self) -> Arc<dyn TurboTasksBackendApi<B>>;

fn get_fresh_persistent_task_id(&self) -> Unused<TaskId>;
fn get_fresh_transient_task_id(&self) -> Unused<TaskId>;
unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>);
unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>);

fn schedule(&self, task: TaskId);
fn schedule_backend_background_job(&self, id: BackendJobId);
fn schedule_backend_foreground_job(&self, id: BackendJobId);
Expand All @@ -207,26 +194,6 @@ pub trait TurboTasksBackendApi<B: Backend + 'static>:
fn backend(&self) -> &B;
}

impl<B: Backend + 'static> TaskIdProvider for &dyn TurboTasksBackendApi<B> {
fn get_fresh_task_id(&self) -> Unused<TaskId> {
(*self).get_fresh_task_id()
}

fn reuse_task_id(&self, id: Unused<TaskId>) {
(*self).reuse_task_id(id)
}
}

impl TaskIdProvider for &dyn TaskIdProvider {
fn get_fresh_task_id(&self) -> Unused<TaskId> {
(*self).get_fresh_task_id()
}

fn reuse_task_id(&self, id: Unused<TaskId>) {
(*self).reuse_task_id(id)
}
}

#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
pub duration: Duration,
Expand All @@ -240,6 +207,7 @@ pub struct TurboTasks<B: Backend + 'static> {
this: Weak<Self>,
backend: B,
task_id_factory: IdFactoryWithReuse<TaskId>,
transient_task_id_factory: IdFactoryWithReuse<TaskId>,
stopped: AtomicBool,
currently_scheduled_tasks: AtomicUsize,
currently_scheduled_foreground_jobs: AtomicUsize,
Expand Down Expand Up @@ -284,12 +252,16 @@ impl<B: Backend + 'static> TurboTasks<B> {
// so we probably want to make sure that all tasks are joined
// when trying to drop turbo tasks
pub fn new(mut backend: B) -> Arc<Self> {
let task_id_factory = IdFactoryWithReuse::new();
backend.initialize(&task_id_factory);
let task_id_factory =
IdFactoryWithReuse::new_with_range(1, (TRANSIENT_TASK_BIT - 1) as u64);
let transient_task_id_factory =
IdFactoryWithReuse::new_with_range(TRANSIENT_TASK_BIT as u64, u32::MAX as u64);
backend.initialize();
let this = Arc::new_cyclic(|this| Self {
this: this.clone(),
backend,
task_id_factory,
transient_task_id_factory,
stopped: AtomicBool::new(false),
currently_scheduled_tasks: AtomicUsize::new(0),
currently_scheduled_background_jobs: AtomicUsize::new(0),
Expand Down Expand Up @@ -1079,6 +1051,7 @@ impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
fn backend(&self) -> &B {
&self.backend
}

#[track_caller]
fn schedule_backend_background_job(&self, id: BackendJobId) {
self.schedule_background_job(move |this| async move {
Expand Down Expand Up @@ -1166,17 +1139,24 @@ impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
fn program_duration_until(&self, instant: Instant) -> Duration {
instant - self.program_start
}
}

impl<B: Backend + 'static> TaskIdProvider for TurboTasks<B> {
fn get_fresh_task_id(&self) -> Unused<TaskId> {
// Safety: This is a fresh id from the factory
fn get_fresh_persistent_task_id(&self) -> Unused<TaskId> {
// SAFETY: This is a fresh id from the factory
unsafe { Unused::new_unchecked(self.task_id_factory.get()) }
}

fn reuse_task_id(&self, id: Unused<TaskId>) {
fn get_fresh_transient_task_id(&self) -> Unused<TaskId> {
// SAFETY: This is a fresh id from the factory
unsafe { Unused::new_unchecked(self.transient_task_id_factory.get()) }
}

unsafe fn reuse_persistent_task_id(&self, id: Unused<TaskId>) {
unsafe { self.task_id_factory.reuse(id.into()) }
}

unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
unsafe { self.transient_task_id_factory.reuse(id.into()) }
}
}

pub(crate) fn current_task(from: &str) -> TaskId {
Expand Down

0 comments on commit 84301f5

Please sign in to comment.