From e3616fc27b34df58a77730f7fb190dce4eeea1c6 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 22 Jul 2024 16:26:21 +0200 Subject: [PATCH] Store transient tasks separately Rename PersistentTaskType -> CachedTaskType --- crates/turbo-tasks-macros/src/func.rs | 66 +++-- .../turbo-tasks-memory/src/memory_backend.rs | 237 ++++++++++++------ crates/turbo-tasks-memory/src/task.rs | 51 ++-- .../tests/task_statistics.rs | 4 +- crates/turbo-tasks/src/backend.rs | 51 ++-- crates/turbo-tasks/src/lib.rs | 5 +- crates/turbo-tasks/src/manager.rs | 207 +++++++++++++-- crates/turbo-tasks/src/persisted_graph.rs | 18 +- crates/turbo-tasks/src/task/task_input.rs | 6 +- 9 files changed, 480 insertions(+), 165 deletions(-) diff --git a/crates/turbo-tasks-macros/src/func.rs b/crates/turbo-tasks-macros/src/func.rs index d6840ded4fe59a..356a1840bc678c 100644 --- a/crates/turbo-tasks-macros/src/func.rs +++ b/crates/turbo-tasks-macros/src/func.rs @@ -319,13 +319,27 @@ impl TurboFn { let inputs = self.inputs(); parse_quote! { { + let turbo_tasks_transient = #( turbo_tasks::TaskInput::is_transient(&#inputs) ||)* false; + let turbo_tasks_trait = *#trait_type_id_ident; + let turbo_tasks_name = std::borrow::Cow::Borrowed(stringify!(#ident)); + let turbo_tasks_this = #converted_this; + let turbo_tasks_arg = Box::new((#(#inputs,)*)) as Box; <#output as turbo_tasks::task::TaskOutput>::try_from_raw_vc( - turbo_tasks::trait_call( - *#trait_type_id_ident, - std::borrow::Cow::Borrowed(stringify!(#ident)), - #converted_this, - Box::new((#(#inputs,)*)) as Box, - ) + if turbo_tasks_transient { + turbo_tasks::transient_trait_call( + turbo_tasks_trait, + turbo_tasks_name, + turbo_tasks_this, + turbo_tasks_arg, + ) + } else { + turbo_tasks::trait_call( + turbo_tasks_trait, + turbo_tasks_name, + turbo_tasks_this, + turbo_tasks_arg, + ) + } ) } } @@ -346,23 +360,45 @@ impl TurboFn { if let Some(converted_this) = self.converted_this() { parse_quote! { { + let turbo_tasks_transient = #( turbo_tasks::TaskInput::is_transient(&#inputs) ||)* false; + let turbo_tasks_func = *#native_function_id_ident; + let turbo_tasks_this = #converted_this; + let turbo_tasks_arg = Box::new((#(#inputs,)*)) as Box; <#output as turbo_tasks::task::TaskOutput>::try_from_raw_vc( - turbo_tasks::dynamic_this_call( - *#native_function_id_ident, - #converted_this, - Box::new((#(#inputs,)*)) as Box, - ) + if turbo_tasks_transient { + turbo_tasks::transient_dynamic_this_call( + turbo_tasks_func, + turbo_tasks_this, + turbo_tasks_arg, + ) + } else { + turbo_tasks::dynamic_this_call( + turbo_tasks_func, + turbo_tasks_this, + turbo_tasks_arg, + ) + } ) } } } else { parse_quote! { { + let turbo_tasks_transient = #( turbo_tasks::TaskInput::is_transient(&#inputs) ||)* false; + let turbo_tasks_func = *#native_function_id_ident; + let turbo_tasks_arg = Box::new((#(#inputs,)*)) as Box; <#output as turbo_tasks::task::TaskOutput>::try_from_raw_vc( - turbo_tasks::dynamic_call( - *#native_function_id_ident, - Box::new((#(#inputs,)*)) as Box, - ) + if turbo_tasks_transient { + turbo_tasks::transient_dynamic_call( + turbo_tasks_func, + turbo_tasks_arg, + ) + } else { + turbo_tasks::dynamic_call( + turbo_tasks_func, + turbo_tasks_arg, + ) + } ) } } diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 059dbd9a8ca90a..41a70d3cbc0193 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -20,12 +20,13 @@ use tracing::trace_span; use turbo_prehash::{BuildHasherExt, PassThroughHash, PreHashed}; use turbo_tasks::{ backend::{ - Backend, BackendJobId, CellContent, PersistentTaskType, TaskCollectiblesMap, - TaskExecutionSpec, TransientTaskType, + Backend, BackendJobId, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec, + TransientTaskType, }, event::EventListener, util::{IdFactoryWithReuse, NoMoveVec}, CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, + TRANSIENT_TASK_BIT, }; use crate::{ @@ -37,16 +38,19 @@ use crate::{ task_statistics::TaskStatisticsApi, }; -fn prehash_task_type(task_type: PersistentTaskType) -> PreHashed { +fn prehash_task_type(task_type: CachedTaskType) -> PreHashed { BuildHasherDefault::::prehash(&Default::default(), task_type) } pub struct MemoryBackend { - memory_tasks: NoMoveVec, + persistent_tasks: NoMoveVec, + transient_tasks: NoMoveVec, backend_jobs: NoMoveVec, backend_job_id_factory: IdFactoryWithReuse, task_cache: - DashMap>, TaskId, BuildHasherDefault>, + DashMap>, TaskId, BuildHasherDefault>, + transient_task_cache: + DashMap>, TaskId, BuildHasherDefault>, memory_limit: usize, gc_queue: Option, idle_gc_active: AtomicBool, @@ -61,14 +65,17 @@ impl Default for MemoryBackend { impl MemoryBackend { pub fn new(memory_limit: usize) -> Self { + let shard_amount = + (std::thread::available_parallelism().map_or(1, usize::from) * 32).next_power_of_two(); Self { - memory_tasks: NoMoveVec::new(), + persistent_tasks: NoMoveVec::new(), + transient_tasks: NoMoveVec::new(), backend_jobs: NoMoveVec::new(), backend_job_id_factory: IdFactoryWithReuse::new(), - task_cache: DashMap::with_hasher_and_shard_amount( + task_cache: DashMap::with_hasher_and_shard_amount(Default::default(), shard_amount), + transient_task_cache: DashMap::with_hasher_and_shard_amount( Default::default(), - (std::thread::available_parallelism().map_or(1, usize::from) * 32) - .next_power_of_two(), + shard_amount, ), memory_limit, gc_queue: (memory_limit != usize::MAX).then(GcQueue::new), @@ -119,16 +126,33 @@ impl MemoryBackend { for id in self.task_cache.clone().into_read_only().values() { func(*id); } + for id in self.transient_task_cache.clone().into_read_only().values() { + func(*id); + } } #[inline(always)] pub fn with_task(&self, id: TaskId, func: impl FnOnce(&Task) -> T) -> T { - func(self.memory_tasks.get(*id as usize).unwrap()) + let value = *id; + let index = (value & !TRANSIENT_TASK_BIT) as usize; + let item = if value & TRANSIENT_TASK_BIT == 0 { + self.persistent_tasks.get(index) + } else { + self.transient_tasks.get(index) + }; + func(item.unwrap()) } #[inline(always)] pub fn task(&self, id: TaskId) -> &Task { - self.memory_tasks.get(*id as usize).unwrap() + let value = *id; + let index = (value & !TRANSIENT_TASK_BIT) as usize; + let item = if value & TRANSIENT_TASK_BIT == 0 { + self.persistent_tasks.get(index) + } else { + self.transient_tasks.get(index) + }; + item.unwrap() } /// Runs the garbage collection until reaching the target memory. An `idle` @@ -167,18 +191,21 @@ impl MemoryBackend { false } - fn insert_and_connect_fresh_task( + fn insert_and_connect_fresh_task( &self, parent_task: TaskId, task_cache: &DashMap, + task_storage: &NoMoveVec, + task_storage_offset: u32, key: K, new_id: Unused, task: Task, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { let new_id = new_id.into(); + let index = (*new_id - task_storage_offset) as usize; // Safety: We have a fresh task id that nobody knows about yet - unsafe { self.memory_tasks.insert(*new_id as usize, task) }; + unsafe { task_storage.insert(index, task) }; let result_task = match task_cache.entry(key) { Entry::Vacant(entry) => { // This is the most likely case @@ -190,7 +217,7 @@ impl MemoryBackend { let task_id = *entry.get(); drop(entry); unsafe { - self.memory_tasks.remove(*new_id as usize); + task_storage.remove(index); let new_id = Unused::new_unchecked(new_id); turbo_tasks.reuse_persistent_task_id(new_id); } @@ -238,6 +265,74 @@ impl MemoryBackend { pub fn task_statistics(&self) -> &TaskStatisticsApi { &self.task_statistics } + + fn track_cache_hit( + &self, + task_type: &PreHashed, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + self.task_statistics().map(|stats| match &**task_type { + CachedTaskType::ResolveNative { + fn_type: function_id, + this: _, + arg: _, + } + | CachedTaskType::Native { + fn_type: function_id, + this: _, + arg: _, + } => { + stats.increment_cache_hit(*function_id); + } + CachedTaskType::ResolveTrait { + trait_type, + method_name: name, + this, + arg: _, + } => { + // HACK: Resolve the this argument (`self`) in order to attribute the cache hit + // to the concrete trait implementation, rather than the dynamic trait method. + // This ensures cache hits and misses are both attributed to the same thing. + // + // Because this task already resolved, in most cases `self` should either be + // resolved, or already in the process of being resolved. + // + // However, `self` could become unloaded due to cache eviction, and this might + // trigger an otherwise unnecessary re-evalutation. + // + // This is a potentially okay trade-off as long as we don't log statistics by + // default. The alternative would be to store function ids on completed + // ResolveTrait tasks. + let trait_type = *trait_type; + let name = name.clone(); + let this = *this; + let stats = Arc::clone(stats); + turbo_tasks.run_once(Box::pin(async move { + let function_id = + CachedTaskType::resolve_trait_method(trait_type, name, this).await?; + stats.increment_cache_hit(function_id); + Ok(()) + })); + } + }); + } + + fn track_cache_miss(&self, task_type: &PreHashed) { + self.task_statistics().map(|stats| match &**task_type { + CachedTaskType::Native { + fn_type: function_id, + this: _, + arg: _, + } => { + stats.increment_cache_miss(*function_id); + } + CachedTaskType::ResolveTrait { .. } | CachedTaskType::ResolveNative { .. } => { + // these types re-execute themselves as `Native` after + // resolving their arguments, skip counting their + // executions here to avoid double-counting + } + }); + } } impl Backend for MemoryBackend { @@ -536,7 +631,7 @@ impl Backend for MemoryBackend { fn get_or_create_persistent_task( &self, - task_type: PersistentTaskType, + task_type: CachedTaskType, parent_task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { @@ -545,68 +640,10 @@ impl Backend for MemoryBackend { self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks) { // fast pass without creating a new task - self.task_statistics().map(|stats| match &*task_type { - PersistentTaskType::ResolveNative { - fn_type: function_id, - this: _, - arg: _, - } - | PersistentTaskType::Native { - fn_type: function_id, - this: _, - arg: _, - } => { - stats.increment_cache_hit(*function_id); - } - PersistentTaskType::ResolveTrait { - trait_type, - method_name: name, - this, - arg: _, - } => { - // HACK: Resolve the this argument (`self`) in order to attribute the cache hit - // to the concrete trait implementation, rather than the dynamic trait method. - // This ensures cache hits and misses are both attributed to the same thing. - // - // Because this task already resolved, in most cases `self` should either be - // resolved, or already in the process of being resolved. - // - // However, `self` could become unloaded due to cache eviction, and this might - // trigger an otherwise unnecessary re-evalutation. - // - // This is a potentially okay trade-off as long as we don't log statistics by - // default. The alternative would be to store function ids on completed - // ResolveTrait tasks. - let trait_type = *trait_type; - let name = name.clone(); - let this = *this; - let stats = Arc::clone(stats); - turbo_tasks.run_once(Box::pin(async move { - let function_id = - PersistentTaskType::resolve_trait_method(trait_type, name, this) - .await?; - stats.increment_cache_hit(function_id); - Ok(()) - })); - } - }); + self.track_cache_hit(&task_type, turbo_tasks); task } else { - self.task_statistics().map(|stats| match &*task_type { - PersistentTaskType::Native { - fn_type: function_id, - this: _, - arg: _, - } => { - stats.increment_cache_miss(*function_id); - } - PersistentTaskType::ResolveTrait { .. } - | PersistentTaskType::ResolveNative { .. } => { - // these types re-execute themselves as `Native` after - // resolving their arguments, skip counting their - // executions here to avoid double-counting - } - }); + self.track_cache_miss(&task_type); // It's important to avoid overallocating memory as this will go into the task // cache and stay there forever. We can to be as small as possible. let (task_type_hash, task_type) = PreHashed::into_parts(task_type); @@ -622,6 +659,51 @@ impl Backend for MemoryBackend { self.insert_and_connect_fresh_task( parent_task, &self.task_cache, + &self.persistent_tasks, + 0, + task_type, + id, + task, + turbo_tasks, + ) + } + } + + fn get_or_create_transient_task( + &self, + task_type: CachedTaskType, + parent_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId { + let task_type = prehash_task_type(task_type); + if let Some(task) = self.lookup_and_connect_task( + parent_task, + &self.transient_task_cache, + &task_type, + turbo_tasks, + ) { + // fast pass without creating a new task + self.track_cache_hit(&task_type, turbo_tasks); + task + } else { + self.track_cache_miss(&task_type); + // It's important to avoid overallocating memory as this will go into the task + // cache and stay there forever. We can to be as small as possible. + let (task_type_hash, task_type) = PreHashed::into_parts(task_type); + let task_type = Arc::new(PreHashed::new(task_type_hash, task_type)); + // slow pass with key lock + let id = turbo_tasks.get_fresh_transient_task_id(); + let task = Task::new_transient( + // Safety: That task will hold the value, but we are still in + // control of the task + *unsafe { id.get_unchecked() }, + task_type.clone(), + ); + self.insert_and_connect_fresh_task( + parent_task, + &self.transient_task_cache, + &self.transient_tasks, + TRANSIENT_TASK_BIT, task_type, id, task, @@ -654,17 +736,18 @@ impl Backend for MemoryBackend { ) -> TaskId { let id = turbo_tasks.get_fresh_transient_task_id(); let id = id.into(); + let index = (*id - TRANSIENT_TASK_BIT) as usize; match task_type { TransientTaskType::Root(f) => { let task = Task::new_root(id, move || f() as _); // SAFETY: We have a fresh task id where nobody knows about yet - unsafe { self.memory_tasks.insert(*id as usize, task) }; + unsafe { self.transient_tasks.insert(index, task) }; Task::set_root(id, self, turbo_tasks); } TransientTaskType::Once(f) => { let task = Task::new_once(id, f); // SAFETY: We have a fresh task id where nobody knows about yet - unsafe { self.memory_tasks.insert(*id as usize, task) }; + unsafe { self.transient_tasks.insert(index, task) }; Task::set_once(id, self, turbo_tasks); } }; diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index f9ee262ba58f3c..d628802fa7b757 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -21,7 +21,7 @@ use tokio::task_local; use tracing::Span; use turbo_prehash::PreHashed; use turbo_tasks::{ - backend::{PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec}, + backend::{CachedTaskType, TaskCollectiblesMap, TaskExecutionSpec}, event::{Event, EventListener}, get_invalidator, registry, CellId, Invalidator, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, ValueTypeId, @@ -71,15 +71,16 @@ enum TaskType { Once(Box), /// A normal persistent task - Persistent { - ty: Arc>, - }, + Persistent { ty: Arc> }, + + /// A cached transient task + Transient { ty: Arc> }, } enum TaskTypeForDescription { Root, Once, - Persistent(Arc>), + Persistent(Arc>), } impl TaskTypeForDescription { @@ -88,6 +89,7 @@ impl TaskTypeForDescription { TaskType::Root(..) => Self::Root, TaskType::Once(..) => Self::Once, TaskType::Persistent { ty, .. } => Self::Persistent(ty.clone()), + TaskType::Transient { ty, .. } => Self::Persistent(ty.clone()), } } } @@ -98,6 +100,7 @@ impl Debug for TaskType { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), Self::Persistent { ty, .. } => Debug::fmt(ty, f), + Self::Transient { ty } => Debug::fmt(ty, f), } } } @@ -108,6 +111,7 @@ impl Display for TaskType { Self::Root(..) => f.debug_tuple("Root").finish(), Self::Once(..) => f.debug_tuple("Once").finish(), Self::Persistent { ty, .. } => Display::fmt(ty, f), + Self::Transient { ty } => Display::fmt(ty, f), } } } @@ -444,10 +448,7 @@ pub enum GcResult { } impl Task { - pub(crate) fn new_persistent( - id: TaskId, - task_type: Arc>, - ) -> Self { + pub(crate) fn new_persistent(id: TaskId, task_type: Arc>) -> Self { let ty = TaskType::Persistent { ty: task_type }; Self { id, @@ -457,6 +458,16 @@ impl Task { } } + pub(crate) fn new_transient(id: TaskId, task_type: Arc>) -> Self { + let ty = TaskType::Transient { ty: task_type }; + Self { + id, + ty, + state: RwLock::new(TaskMetaState::Full(Box::new(TaskState::new()))), + graph_modification_in_progress_counter: AtomicU32::new(0), + } + } + pub(crate) fn new_root( id: TaskId, functor: impl Fn() -> NativeTaskFuture + Sync + Send + 'static, @@ -492,6 +503,7 @@ impl Task { pub(crate) fn is_pure(&self) -> bool { match &self.ty { TaskType::Persistent { .. } => true, + TaskType::Transient { .. } => true, TaskType::Root(_) => false, TaskType::Once(_) => false, } @@ -500,6 +512,7 @@ impl Task { pub(crate) fn is_once(&self) -> bool { match &self.ty { TaskType::Persistent { .. } => false, + TaskType::Transient { .. } => false, TaskType::Root(_) => false, TaskType::Once(_) => true, } @@ -563,7 +576,7 @@ impl Task { } pub(crate) fn get_function_name(&self) -> Option> { - if let TaskType::Persistent { ty, .. } = &self.ty { + if let TaskType::Persistent { ty, .. } | TaskType::Transient { ty, .. } = &self.ty { Some(ty.get_name()) } else { None @@ -579,14 +592,14 @@ impl Task { TaskTypeForDescription::Root => format!("[{}] root", id), TaskTypeForDescription::Once => format!("[{}] once", id), TaskTypeForDescription::Persistent(ty) => match &***ty { - PersistentTaskType::Native { + CachedTaskType::Native { fn_type: native_fn, this: _, arg: _, } => { format!("[{}] {}", id, registry::get_function(*native_fn).name) } - PersistentTaskType::ResolveNative { + CachedTaskType::ResolveNative { fn_type: native_fn, this: _, arg: _, @@ -597,7 +610,7 @@ impl Task { registry::get_function(*native_fn).name ) } - PersistentTaskType::ResolveTrait { + CachedTaskType::ResolveTrait { trait_type, method_name: fn_name, this: _, @@ -752,8 +765,8 @@ impl Task { mutex.lock().take().expect("Task can only be executed once"), tracing::trace_span!("turbo_tasks::once_task"), ), - TaskType::Persistent { ty, .. } => match &***ty { - PersistentTaskType::Native { + TaskType::Persistent { ty, .. } | TaskType::Transient { ty, .. } => match &***ty { + CachedTaskType::Native { fn_type: native_fn, this, arg, @@ -765,7 +778,7 @@ impl Task { drop(entered); (future, span) } - PersistentTaskType::ResolveNative { + CachedTaskType::ResolveNative { fn_type: ref native_fn_id, this, arg, @@ -775,7 +788,7 @@ impl Task { let span = func.resolve_span(); let entered = span.enter(); let turbo_tasks = turbo_tasks.pin(); - let future = Box::pin(PersistentTaskType::run_resolve_native( + let future = Box::pin(CachedTaskType::run_resolve_native( native_fn_id, *this, &**arg, @@ -784,7 +797,7 @@ impl Task { drop(entered); (future, span) } - PersistentTaskType::ResolveTrait { + CachedTaskType::ResolveTrait { trait_type: trait_type_id, method_name: name, this, @@ -796,7 +809,7 @@ impl Task { let entered = span.enter(); let name = name.clone(); let turbo_tasks = turbo_tasks.pin(); - let future = Box::pin(PersistentTaskType::run_resolve_trait( + let future = Box::pin(CachedTaskType::run_resolve_trait( trait_type_id, name, *this, diff --git a/crates/turbo-tasks-memory/tests/task_statistics.rs b/crates/turbo-tasks-memory/tests/task_statistics.rs index 98c1dae4e8be70..818df674821543 100644 --- a/crates/turbo-tasks-memory/tests/task_statistics.rs +++ b/crates/turbo-tasks-memory/tests/task_statistics.rs @@ -184,13 +184,13 @@ async fn test_no_execution() { .await; } -// Internally, this function uses `PersistentTaskType::Native`. +// Internally, this function uses `CachedTaskType::Native`. #[turbo_tasks::function] fn double(val: u64) -> Vc { Vc::cell(val * 2) } -// Internally, this function uses `PersistentTaskType::ResolveNative`. +// Internally, this function uses `CachedTaskType::ResolveNative`. #[turbo_tasks::function] async fn double_vc(val: Vc) -> Result> { let val = *val.await?; diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index dbe51da7923524..985d7930af54e6 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -34,7 +34,7 @@ pub enum TaskType { /// Tasks that can persist between sessions and potentially /// shared globally - Persistent(PersistentTaskType), + Persistent(CachedTaskType), } type TransientTaskRoot = @@ -67,7 +67,7 @@ impl Debug for TransientTaskType { } #[derive(Debug, PartialEq, Eq, Hash)] -pub enum PersistentTaskType { +pub enum CachedTaskType { /// A normal task execution a native (rust) function Native { fn_type: FunctionId, @@ -95,7 +95,7 @@ pub enum PersistentTaskType { }, } -impl Display for PersistentTaskType { +impl Display for CachedTaskType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.get_name()) } @@ -144,7 +144,7 @@ mod ser { type Value = FunctionAndArg<'de>; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - write!(formatter, "a valid PersistentTaskType") + write!(formatter, "a valid CachedTaskType") } fn visit_seq(self, mut seq: A) -> std::result::Result @@ -167,13 +167,13 @@ mod ser { } } - impl Serialize for PersistentTaskType { + impl Serialize for CachedTaskType { fn serialize(&self, serializer: S) -> std::result::Result where S: ser::Serializer, { match self { - PersistentTaskType::Native { fn_type, this, arg } => { + CachedTaskType::Native { fn_type, this, arg } => { let mut s = serializer.serialize_seq(Some(3))?; s.serialize_element::(&0)?; s.serialize_element(&FunctionAndArg::Borrowed { @@ -183,7 +183,7 @@ mod ser { s.serialize_element(this)?; s.end() } - PersistentTaskType::ResolveNative { fn_type, this, arg } => { + CachedTaskType::ResolveNative { fn_type, this, arg } => { let mut s = serializer.serialize_seq(Some(3))?; s.serialize_element::(&1)?; s.serialize_element(&FunctionAndArg::Borrowed { @@ -193,7 +193,7 @@ mod ser { s.serialize_element(this)?; s.end() } - PersistentTaskType::ResolveTrait { + CachedTaskType::ResolveTrait { trait_type, method_name, this, @@ -218,7 +218,7 @@ mod ser { } } - impl<'de> Deserialize<'de> for PersistentTaskType { + impl<'de> Deserialize<'de> for CachedTaskType { fn deserialize>(deserializer: D) -> Result { #[derive(Deserialize)] enum VariantKind { @@ -228,10 +228,10 @@ mod ser { } struct Visitor; impl<'de> serde::de::Visitor<'de> for Visitor { - type Value = PersistentTaskType; + type Value = CachedTaskType; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - write!(formatter, "a valid PersistentTaskType") + write!(formatter, "a valid CachedTaskType") } fn visit_seq(self, mut seq: A) -> std::result::Result @@ -252,7 +252,7 @@ mod ser { let this = seq .next_element()? .ok_or_else(|| serde::de::Error::invalid_length(2, &self))?; - Ok(PersistentTaskType::Native { fn_type, this, arg }) + Ok(CachedTaskType::Native { fn_type, this, arg }) } 1 => { let FunctionAndArg::Owned { fn_type, arg } = seq @@ -264,7 +264,7 @@ mod ser { let this = seq .next_element()? .ok_or_else(|| serde::de::Error::invalid_length(2, &self))?; - Ok(PersistentTaskType::ResolveNative { fn_type, this, arg }) + Ok(CachedTaskType::ResolveNative { fn_type, this, arg }) } 2 => { let trait_type = seq @@ -284,7 +284,7 @@ mod ser { let arg = seq .next_element_seed(method.arg_deserializer)? .ok_or_else(|| serde::de::Error::invalid_length(3, &self))?; - Ok(PersistentTaskType::ResolveTrait { + Ok(CachedTaskType::ResolveTrait { trait_type, method_name, this, @@ -300,7 +300,7 @@ mod ser { } } -impl PersistentTaskType { +impl CachedTaskType { /// Returns the name of the function in the code. Trait methods are /// formatted as `TraitName::method_name`. /// @@ -308,17 +308,17 @@ impl PersistentTaskType { /// it can return a `&'static str` in many cases. pub fn get_name(&self) -> Cow<'static, str> { match self { - PersistentTaskType::Native { + CachedTaskType::Native { fn_type: native_fn, this: _, arg: _, } - | PersistentTaskType::ResolveNative { + | CachedTaskType::ResolveNative { fn_type: native_fn, this: _, arg: _, } => Cow::Borrowed(®istry::get_function(*native_fn).name), - PersistentTaskType::ResolveTrait { + CachedTaskType::ResolveTrait { trait_type: trait_id, method_name: fn_name, this: _, @@ -522,7 +522,14 @@ pub trait Backend: Sync + Send { fn get_or_create_persistent_task( &self, - task_type: PersistentTaskType, + task_type: CachedTaskType, + parent_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId; + + fn get_or_create_transient_task( + &self, + task_type: CachedTaskType, parent_task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId; @@ -551,7 +558,7 @@ pub trait Backend: Sync + Send { fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi); } -impl PersistentTaskType { +impl CachedTaskType { pub async fn run_resolve_native( fn_id: FunctionId, mut this: Option, @@ -657,7 +664,7 @@ pub(crate) mod tests { fn test_get_name() { crate::register(); assert_eq!( - PersistentTaskType::Native { + CachedTaskType::Native { fn_type: *MOCK_FUNC_TASK_FUNCTION_ID, this: None, arg: Box::new(()), @@ -666,7 +673,7 @@ pub(crate) mod tests { "mock_func_task", ); assert_eq!( - PersistentTaskType::ResolveTrait { + CachedTaskType::ResolveTrait { trait_type: *MOCKTRAIT_TRAIT_TYPE_ID, method_name: "mock_method_task".into(), this: RawVc::TaskOutput(unsafe { TaskId::new_unchecked(1) }), diff --git a/crates/turbo-tasks/src/lib.rs b/crates/turbo-tasks/src/lib.rs index 166404a9000390..dbc013b99aa6e7 100644 --- a/crates/turbo-tasks/src/lib.rs +++ b/crates/turbo-tasks/src/lib.rs @@ -81,7 +81,7 @@ use auto_hash_map::AutoSet; pub use collectibles::CollectiblesSource; pub use completion::{Completion, Completions}; pub use display::ValueToString; -pub use id::{FunctionId, TaskId, TraitTypeId, ValueTypeId}; +pub use id::{FunctionId, TaskId, TraitTypeId, ValueTypeId, TRANSIENT_TASK_BIT}; pub use invalidation::{ DynamicEqHash, InvalidationReason, InvalidationReasonKind, InvalidationReasonSet, }; @@ -90,7 +90,8 @@ pub use magic_any::MagicAny; pub use manager::{ dynamic_call, dynamic_this_call, emit, get_invalidator, mark_finished, mark_stateful, prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call, - turbo_tasks, CurrentCellRef, Invalidator, TurboTasks, TurboTasksApi, TurboTasksBackendApi, + transient_dynamic_call, transient_dynamic_this_call, transient_trait_call, turbo_tasks, + CurrentCellRef, Invalidator, TurboTasks, TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo, }; pub use native_function::NativeFunction; diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index 11d3056e4a871a..6764d0a59d7d86 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -25,7 +25,7 @@ use turbo_tasks_malloc::TurboMalloc; use crate::{ backend::{ - Backend, CellContent, PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec, + Backend, CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec, TransientTaskType, }, capture_future::{self, CaptureFuture}, @@ -44,10 +44,37 @@ use crate::{ }; pub trait TurboTasksCallApi: Sync + Send { + /// Calls a native function with arguments. Resolves arguments when needed + /// with a wrapper task. fn dynamic_call(&self, func: FunctionId, arg: Box) -> RawVc; + fn transient_dynamic_call(&self, func: FunctionId, arg: Box) -> RawVc { + self.dynamic_call(func, arg) + } + /// Calls a native function with arguments. Resolves arguments when needed + /// with a wrapper task. fn dynamic_this_call(&self, func: FunctionId, this: RawVc, arg: Box) -> RawVc; + fn transient_dynamic_this_call( + &self, + func: FunctionId, + this: RawVc, + arg: Box, + ) -> RawVc { + self.dynamic_this_call(func, this, arg) + } + /// Call a native function with arguments. + /// All inputs must be resolved. fn native_call(&self, func: FunctionId, arg: Box) -> RawVc; + fn transient_native_call(&self, func: FunctionId, arg: Box) -> RawVc { + self.native_call(func, arg) + } + /// Call a native function with arguments. + /// All inputs must be resolved. fn this_call(&self, func: FunctionId, this: RawVc, arg: Box) -> RawVc; + fn transient_this_call(&self, func: FunctionId, this: RawVc, arg: Box) -> RawVc { + self.this_call(func, this, arg) + } + /// Calls a trait method with arguments. First input is the `self` object. + /// Uses a wrapper task to resolve fn trait_call( &self, trait_type: TraitTypeId, @@ -55,6 +82,15 @@ pub trait TurboTasksCallApi: Sync + Send { this: RawVc, arg: Box, ) -> RawVc; + fn transient_trait_call( + &self, + trait_type: TraitTypeId, + trait_fn_name: Cow<'static, str>, + this: RawVc, + arg: Box, + ) -> RawVc { + self.trait_call(trait_type, trait_fn_name, this, arg) + } fn run_once( &self, @@ -345,11 +381,21 @@ impl TurboTasks { Ok(rx.await?) } - /// Call a native function with arguments. - /// All inputs must be resolved. pub(crate) fn native_call(&self, func: FunctionId, arg: Box) -> RawVc { RawVc::TaskOutput(self.backend.get_or_create_persistent_task( - PersistentTaskType::Native { + CachedTaskType::Native { + fn_type: func, + this: None, + arg, + }, + current_task("turbo_function calls"), + self, + )) + } + + pub(crate) fn transient_native_call(&self, func: FunctionId, arg: Box) -> RawVc { + RawVc::TaskOutput(self.backend.get_or_create_transient_task( + CachedTaskType::Native { fn_type: func, this: None, arg, @@ -359,11 +405,26 @@ impl TurboTasks { )) } - /// Call a native function with arguments. - /// All inputs must be resolved. pub(crate) fn this_call(&self, func: FunctionId, this: RawVc, arg: Box) -> RawVc { RawVc::TaskOutput(self.backend.get_or_create_persistent_task( - PersistentTaskType::Native { + CachedTaskType::Native { + fn_type: func, + this: Some(this), + arg, + }, + current_task("turbo_function calls"), + self, + )) + } + + pub(crate) fn transient_this_call( + &self, + func: FunctionId, + this: RawVc, + arg: Box, + ) -> RawVc { + RawVc::TaskOutput(self.backend.get_or_create_transient_task( + CachedTaskType::Native { fn_type: func, this: Some(this), arg, @@ -373,14 +434,28 @@ impl TurboTasks { )) } - /// Calls a native function with arguments. Resolves arguments when needed - /// with a wrapper task. pub fn dynamic_call(&self, func: FunctionId, arg: Box) -> RawVc { if registry::get_function(func).arg_meta.is_resolved(&*arg) { self.native_call(func, arg) } else { RawVc::TaskOutput(self.backend.get_or_create_persistent_task( - PersistentTaskType::ResolveNative { + CachedTaskType::ResolveNative { + fn_type: func, + this: None, + arg, + }, + current_task("turbo_function calls"), + self, + )) + } + } + + pub fn transient_dynamic_call(&self, func: FunctionId, arg: Box) -> RawVc { + if registry::get_function(func).arg_meta.is_resolved(&*arg) { + self.transient_native_call(func, arg) + } else { + RawVc::TaskOutput(self.backend.get_or_create_transient_task( + CachedTaskType::ResolveNative { fn_type: func, this: None, arg, @@ -391,8 +466,6 @@ impl TurboTasks { } } - /// Calls a native function with arguments. Resolves arguments when needed - /// with a wrapper task. pub fn dynamic_this_call( &self, func: FunctionId, @@ -403,7 +476,28 @@ impl TurboTasks { self.this_call(func, this, arg) } else { RawVc::TaskOutput(self.backend.get_or_create_persistent_task( - PersistentTaskType::ResolveNative { + CachedTaskType::ResolveNative { + fn_type: func, + this: Some(this), + arg, + }, + current_task("turbo_function calls"), + self, + )) + } + } + + pub fn transient_dynamic_this_call( + &self, + func: FunctionId, + this: RawVc, + arg: Box, + ) -> RawVc { + if this.is_resolved() && registry::get_function(func).arg_meta.is_resolved(&*arg) { + self.transient_this_call(func, this, arg) + } else { + RawVc::TaskOutput(self.backend.get_or_create_transient_task( + CachedTaskType::ResolveNative { fn_type: func, this: Some(this), arg, @@ -414,8 +508,6 @@ impl TurboTasks { } } - /// Calls a trait method with arguments. First input is the `self` object. - /// Uses a wrapper task to resolve pub fn trait_call( &self, trait_type: TraitTypeId, @@ -439,7 +531,41 @@ impl TurboTasks { // create a wrapper task to resolve all inputs RawVc::TaskOutput(self.backend.get_or_create_persistent_task( - PersistentTaskType::ResolveTrait { + CachedTaskType::ResolveTrait { + trait_type, + method_name: trait_fn_name, + this, + arg, + }, + current_task("turbo_function calls"), + self, + )) + } + + pub fn transient_trait_call( + &self, + trait_type: TraitTypeId, + mut trait_fn_name: Cow<'static, str>, + this: RawVc, + arg: Box, + ) -> RawVc { + // avoid creating a wrapper task if self is already resolved + // for resolved cells we already know the value type so we can lookup the + // function + if let RawVc::TaskCell(_, CellId { type_id, .. }) = this { + match get_trait_method(trait_type, type_id, trait_fn_name) { + Ok(native_fn) => { + return self.transient_dynamic_this_call(native_fn, this, arg); + } + Err(name) => { + trait_fn_name = name; + } + } + } + + // create a wrapper task to resolve all inputs + RawVc::TaskOutput(self.backend.get_or_create_transient_task( + CachedTaskType::ResolveTrait { trait_type, method_name: trait_fn_name, this, @@ -829,15 +955,32 @@ impl TurboTasksCallApi for TurboTasks { fn dynamic_call(&self, func: FunctionId, arg: Box) -> RawVc { self.dynamic_call(func, arg) } + fn transient_dynamic_call(&self, func: FunctionId, arg: Box) -> RawVc { + self.transient_dynamic_call(func, arg) + } fn dynamic_this_call(&self, func: FunctionId, this: RawVc, arg: Box) -> RawVc { self.dynamic_this_call(func, this, arg) } + fn transient_dynamic_this_call( + &self, + func: FunctionId, + this: RawVc, + arg: Box, + ) -> RawVc { + self.transient_dynamic_this_call(func, this, arg) + } fn native_call(&self, func: FunctionId, arg: Box) -> RawVc { self.native_call(func, arg) } + fn transient_native_call(&self, func: FunctionId, arg: Box) -> RawVc { + self.transient_native_call(func, arg) + } fn this_call(&self, func: FunctionId, this: RawVc, arg: Box) -> RawVc { self.this_call(func, this, arg) } + fn transient_this_call(&self, func: FunctionId, this: RawVc, arg: Box) -> RawVc { + self.transient_this_call(func, this, arg) + } fn trait_call( &self, trait_type: TraitTypeId, @@ -847,6 +990,15 @@ impl TurboTasksCallApi for TurboTasks { ) -> RawVc { self.trait_call(trait_type, trait_fn_name, this, arg) } + fn transient_trait_call( + &self, + trait_type: TraitTypeId, + trait_fn_name: Cow<'static, str>, + this: RawVc, + arg: Box, + ) -> RawVc { + self.transient_trait_call(trait_type, trait_fn_name, this, arg) + } #[track_caller] fn run_once( @@ -1332,12 +1484,24 @@ pub fn dynamic_call(func: FunctionId, arg: Box) -> RawVc { with_turbo_tasks(|tt| tt.dynamic_call(func, arg)) } +/// Calls [`TurboTasks::transient_dynamic_call`] for the current turbo tasks +/// instance. +pub fn transient_dynamic_call(func: FunctionId, arg: Box) -> RawVc { + with_turbo_tasks(|tt| tt.transient_dynamic_call(func, arg)) +} + /// Calls [`TurboTasks::dynamic_this_call`] for the current turbo tasks /// instance. pub fn dynamic_this_call(func: FunctionId, this: RawVc, arg: Box) -> RawVc { with_turbo_tasks(|tt| tt.dynamic_this_call(func, this, arg)) } +/// Calls [`TurboTasks::transient_dynamic_this_call`] for the current turbo +/// tasks instance. +pub fn transient_dynamic_this_call(func: FunctionId, this: RawVc, arg: Box) -> RawVc { + with_turbo_tasks(|tt| tt.transient_dynamic_this_call(func, this, arg)) +} + /// Calls [`TurboTasks::trait_call`] for the current turbo tasks instance. pub fn trait_call( trait_type: TraitTypeId, @@ -1348,6 +1512,17 @@ pub fn trait_call( with_turbo_tasks(|tt| tt.trait_call(trait_type, trait_fn_name, this, arg)) } +/// Calls [`TurboTasks::transient_trait_call`] for the current turbo tasks +/// instance. +pub fn transient_trait_call( + trait_type: TraitTypeId, + trait_fn_name: Cow<'static, str>, + this: RawVc, + arg: Box, +) -> RawVc { + with_turbo_tasks(|tt| tt.transient_trait_call(trait_type, trait_fn_name, this, arg)) +} + pub fn turbo_tasks() -> Arc { TURBO_TASKS.with(|arc| arc.clone()) } diff --git a/crates/turbo-tasks/src/persisted_graph.rs b/crates/turbo-tasks/src/persisted_graph.rs index dffb1086c79e30..086af29d02d993 100644 --- a/crates/turbo-tasks/src/persisted_graph.rs +++ b/crates/turbo-tasks/src/persisted_graph.rs @@ -2,7 +2,7 @@ use anyhow::Result; use serde::{Deserialize, Serialize}; use crate::{ - backend::{CellContent, PersistentTaskType}, + backend::{CachedTaskType, CellContent}, CellId, RawVc, TaskId, }; @@ -105,14 +105,14 @@ pub trait PersistedGraph: Sync + Send { /// returns false if that were too many fn lookup( &self, - partial_task_type: &PersistentTaskType, + partial_task_type: &CachedTaskType, api: &dyn PersistedGraphApi, ) -> Result; /// lookup one cache entry fn lookup_one( &self, - task_type: &PersistentTaskType, + task_type: &CachedTaskType, api: &dyn PersistedGraphApi, ) -> Result>; @@ -197,9 +197,9 @@ pub trait PersistedGraph: Sync + Send { } pub trait PersistedGraphApi { - fn get_or_create_task_type(&self, ty: PersistentTaskType) -> TaskId; + fn get_or_create_task_type(&self, ty: CachedTaskType) -> TaskId; - fn lookup_task_type(&self, id: TaskId) -> &PersistentTaskType; + fn lookup_task_type(&self, id: TaskId) -> &CachedTaskType; } /* @@ -207,8 +207,8 @@ pub trait PersistedGraphApi { read: data: (TaskId) => (TaskData) - cache: (PersistentTaskType) => (TaskId) - type: (TaskId) => (PersistentTaskType) + cache: (CachedTaskType) => (TaskId) + type: (TaskId) => (CachedTaskType) read_dependents: @@ -238,7 +238,7 @@ impl PersistedGraph for () { fn lookup( &self, - _partial_task_type: &PersistentTaskType, + _partial_task_type: &CachedTaskType, _api: &dyn PersistedGraphApi, ) -> Result { Ok(false) @@ -246,7 +246,7 @@ impl PersistedGraph for () { fn lookup_one( &self, - _task_type: &PersistentTaskType, + _task_type: &CachedTaskType, _api: &dyn PersistedGraphApi, ) -> Result> { Ok(None) diff --git a/crates/turbo-tasks/src/task/task_input.rs b/crates/turbo-tasks/src/task/task_input.rs index ef7cd47b5fe3c5..22ede253865a68 100644 --- a/crates/turbo-tasks/src/task/task_input.rs +++ b/crates/turbo-tasks/src/task/task_input.rs @@ -108,7 +108,7 @@ where } fn is_transient(&self) -> bool { - false + self.node.get_task_id().is_transient() } async fn resolve(&self) -> Result { @@ -125,7 +125,7 @@ where } fn is_transient(&self) -> bool { - false + self.node.node.get_task_id().is_transient() } } @@ -190,7 +190,7 @@ where impl TaskInput for TransientInstance where - T: Sync + Send, + T: Sync + Send + 'static, { fn is_transient(&self) -> bool { true