diff --git a/timely/src/worker.rs b/timely/src/worker.rs
index 153f5c775..21b5c021b 100644
--- a/timely/src/worker.rs
+++ b/timely/src/worker.rs
@@ -2,6 +2,7 @@
 
 use std::rc::Rc;
 use std::cell::{RefCell, RefMut};
+use std::cmp::Ordering;
 use std::any::Any;
 use std::str::FromStr;
 use std::time::{Instant, Duration};
@@ -13,10 +14,14 @@ use crate::communication::{Allocate, Data, Push, Pull};
 use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
 use crate::scheduling::{Schedule, Scheduler, Activations};
 use crate::progress::timestamp::{Refines};
-use crate::progress::SubgraphBuilder;
+use crate::progress::{ChangeBatch, SubgraphBuilder};
 use crate::progress::operate::Operate;
+use crate::progress::frontier::{AntichainRef, MutableAntichain};
 use crate::dataflow::scopes::Child;
 use crate::logging::TimelyLogger;
+use crate::order::PartialOrder;
+
+const SCHEDULING_CHANNEL: usize = 0;
 
 /// Different ways in which timely's progress tracking can work.
 ///
@@ -216,12 +221,16 @@ pub struct Worker<A: Allocate> {
     identifiers: Rc<RefCell<usize>>,
     // dataflows: Rc<RefCell<Vec<Wrapper>>>,
     dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
+    frozen_dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
+    scheduling_frontier: Rc<RefCell<SchedulingFrontier>>,
+
     dataflow_counter: Rc<RefCell<usize>>,
     logging: Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>,
     activations: Rc<RefCell<Activations>>,
     active_dataflows: Vec<usize>,
+
     // Temporary storage for channel identifiers during dataflow construction.
     // These are then associated with a dataflow once constructed.
     temp_channel_ids: Rc<RefCell<Vec<usize>>>,
@@ -260,16 +269,21 @@ impl<A: Allocate> Scheduler for Worker<A> {
 
 impl<A: Allocate> Worker<A> {
     /// Allocates a new `Worker` bound to a channel allocator.
-    pub fn new(config: Config, c: A) -> Worker<A> {
+    pub fn new(config: Config, mut c: A) -> Worker<A> {
         let now = Instant::now();
         let index = c.index();
+
+        let scheduling_frontier = SchedulingFrontier::new(&mut c);
+
         Worker {
             config,
             timer: now,
             paths: Default::default(),
             allocator: Rc::new(RefCell::new(c)),
-            identifiers: Default::default(),
+            identifiers: Rc::new(RefCell::new(SCHEDULING_CHANNEL + 1)),
             dataflows: Default::default(),
+            frozen_dataflows: Default::default(),
+            scheduling_frontier: Rc::new(RefCell::new(scheduling_frontier)),
             dataflow_counter: Default::default(),
             logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))),
             activations: Rc::new(RefCell::new(Activations::new(now))),
@@ -332,6 +346,7 @@ impl<A: Allocate> Worker<A> {
     /// ```
     pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {
+        let mut activate_scheduling_frontier = false;
 
         {   // Process channel events. Activate responders.
             let mut allocator = self.allocator.borrow_mut();
             allocator.receive();
@@ -345,13 +360,18 @@
                 // on the basis of non-empty channels.
                 // TODO: This is a sloppy way to deal
                 // with channels that may not be alloc'd.
-                if let Some(path) = paths.get(&channel) {
+                if channel == SCHEDULING_CHANNEL {
+                    activate_scheduling_frontier = true;
+                } else if let Some(path) = paths.get(&channel) {
                     self.activations
                         .borrow_mut()
                         .activate(&path[..]);
                 }
             }
         }
+        if activate_scheduling_frontier {
+            self.scheduling_frontier.borrow_mut().step();
+        }
 
         // Organize activations.
         self.activations
@@ -400,15 +420,35 @@ impl<A: Allocate> Worker<A> {
                         paths.remove(&channel);
                     }
                     entry.remove_entry();
+                    self.scheduling_frontier.borrow_mut().update([(DataflowId::Installed(index), -1)]);
                 }
             }
         }
 
+        // Drop all frozen dataflows that are not beyond the scheduling frontier.
+        {
+            let mut paths = self.paths.borrow_mut();
+            let mut frozen_dataflows = self.frozen_dataflows.borrow_mut();
+            let scheduling_frontier = self.scheduling_frontier.borrow();
+            frozen_dataflows.retain(|id, dataflow| {
+                if !scheduling_frontier.frontier().less_equal(&DataflowId::Installed(*id)) {
+                    // Garbage collect channel_id to path information.
+                    for channel in dataflow.channel_ids.drain(..) {
+                        paths.remove(&channel);
+                    }
+                    false
+                } else {
+                    true
+                }
+            });
+        }
+
         // Clean up, indicate if dataflows remain.
         self.logging.borrow_mut().flush();
         self.allocator.borrow_mut().release();
-        !self.dataflows.borrow().is_empty()
+        !self.dataflows.borrow().is_empty() || !self.frozen_dataflows.borrow().is_empty()
     }
 
     /// Calls `self.step()` as long as `func` evaluates to true.
@@ -671,17 +711,17 @@ impl<A: Allocate> Worker<A> {
 
     /// Drops an identified dataflow.
     ///
-    /// This method removes the identified dataflow, which will no longer be scheduled.
-    /// Various other resources will be cleaned up, though the method is currently in
-    /// public beta rather than expected to work. Please report all crashes and unmet
-    /// expectations!
-    pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
-        if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
-            // Garbage collect channel_id to path information.
-            let mut paths = self.paths.borrow_mut();
-            for channel in entry.channel_ids.drain(..) {
-                paths.remove(&channel);
-            }
+    /// This method immediately stops scheduling the identified dataflow. Once all other
+    /// workers also stop scheduling the identified dataflow, due to an explicit drop or due to
+    /// graceful termination, the worker will proceed with dropping any resources held by this
+    /// dataflow.
+    ///
+    /// If the identified dataflow is in the process of being constructed, this function is a
+    /// no-op.
+    pub fn drop_dataflow(&mut self, id: usize) {
+        if let Some(entry) = self.dataflows.borrow_mut().remove(&id) {
+            self.frozen_dataflows.borrow_mut().insert(id, entry);
+            self.scheduling_frontier.borrow_mut().update([(DataflowId::Installed(id), -1)]);
         }
     }
 
@@ -693,20 +733,27 @@
         *self.dataflow_counter.borrow()
     }
 
-    /// List the current dataflow indices.
+    /// List the current dataflow indices that may be scheduled.
     pub fn installed_dataflows(&self) -> Vec<usize> {
         self.dataflows.borrow().keys().cloned().collect()
     }
 
     /// True if there is at least one dataflow under management.
     pub fn has_dataflows(&self) -> bool {
-        !self.dataflows.borrow().is_empty()
+        !self.dataflows.borrow().is_empty() || !self.frozen_dataflows.borrow().is_empty()
    }
 
     // Acquire a new distinct dataflow identifier.
     fn allocate_dataflow_index(&mut self) -> usize {
         *self.dataflow_counter.borrow_mut() += 1;
-        *self.dataflow_counter.borrow() - 1
+        let new_id = *self.dataflow_counter.borrow() - 1;
+
+        self.scheduling_frontier.borrow_mut().update([
+            (DataflowId::Installed(new_id), 1),
+            (DataflowId::Future(new_id + 1), 1),
+            (DataflowId::Future(new_id), -1)
+        ]);
+        new_id
     }
 }
 
@@ -721,6 +768,8 @@ impl<A: Allocate> Clone for Worker<A> {
             allocator: self.allocator.clone(),
             identifiers: self.identifiers.clone(),
             dataflows: self.dataflows.clone(),
+            frozen_dataflows: self.frozen_dataflows.clone(),
+            scheduling_frontier: self.scheduling_frontier.clone(),
             dataflow_counter: self.dataflow_counter.clone(),
             logging: self.logging.clone(),
             activations: self.activations.clone(),
@@ -776,3 +825,89 @@ impl Drop for Wrapper {
         self.resources = None;
     }
 }
+
+
+#[derive(Debug, Eq, PartialEq, Clone, Abomonation, Serialize, Deserialize)]
+/// A partial order representing the scheduling frontier.
+enum DataflowId {
+    /// A lower bound on identifiers of future dataflows.
+    Future(usize),
+    /// An installed dataflow identifier.
+    Installed(usize),
+}
+
+// A manual implementation of Ord to ensure that it is compatible with the PartialOrder below.
+impl Ord for DataflowId {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self, other) {
+            (Self::Installed(this), Self::Installed(other)) => this.cmp(other),
+            (Self::Future(this), Self::Future(other)) => this.cmp(other),
+            (Self::Future(_), Self::Installed(_)) => Ordering::Less,
+            (Self::Installed(_), Self::Future(_)) => Ordering::Greater,
+        }
+    }
+}
+impl PartialOrd for DataflowId {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Default for DataflowId {
+    fn default() -> Self {
+        Self::Future(0)
+    }
+}
+
+impl PartialOrder for DataflowId {
+    fn less_equal(&self, other: &Self) -> bool {
+        match (self, other) {
+            (Self::Future(lower), Self::Installed(id)) => lower <= id,
+            (Self::Installed(this), Self::Installed(other)) => this == other,
+            (Self::Future(this), Self::Future(other)) => this <= other,
+            (Self::Installed(_), Self::Future(_)) => false,
+        }
+    }
+}
+
+/// Keeps track of the scheduling frontier, broadcasting this worker's local updates to all
+/// other workers.
+struct SchedulingFrontier {
+    frontier: MutableAntichain<DataflowId>,
+    pushers: Vec<Box<dyn Push<Message<ChangeBatch<DataflowId>>>>>,
+    puller: Box<dyn Pull<Message<ChangeBatch<DataflowId>>>>,
+}
+
+impl SchedulingFrontier {
+    fn new<A: Allocate>(alloc: &mut A) -> Self {
+        let (pushers, puller) = alloc.allocate(SCHEDULING_CHANNEL);
+        let mut frontier = MutableAntichain::new();
+        frontier.update_iter([(DataflowId::default(), alloc.peers() as i64)]);
+        Self {
+            frontier,
+            pushers,
+            puller,
+        }
+    }
+
+    fn frontier(&self) -> AntichainRef<'_, DataflowId> {
+        self.frontier.frontier()
+    }
+
+    fn step(&mut self) {
+        // TODO: reuse allocations
+        while let Some(mut msg) = self.puller.recv() {
+            self.frontier.update_iter(msg.as_mut().iter().cloned());
+        }
+    }
+
+    fn update<I: IntoIterator<Item = (DataflowId, i64)>>(&mut self, iter: I) {
+        let mut change_batch = ChangeBatch::new();
+        change_batch.extend(iter.into_iter());
+        // TODO: reduce clones and consolidate updates into one push
+        for pusher in self.pushers.iter_mut() {
+            pusher.send(Message::from_typed(change_batch.clone()));
+            pusher.done();
+        }
+    }
+}
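
A note on the protocol, with a runnable illustration (not part of the diff): the updates issued by `allocate_dataflow_index` and `drop_dataflow` maintain one `Future(_)` or `Installed(_)` element per worker, and a frozen dataflow is reclaimed exactly when no worker's element is `less_equal` its `Installed` id. The sketch below mirrors the private `DataflowId` against timely's public `MutableAntichain`; the `less_equal` rules are copied from the diff, everything else is illustrative scaffolding.

```rust
use timely::order::PartialOrder;
use timely::progress::frontier::MutableAntichain;

/// Mirror of the private `DataflowId` above, for illustration only.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum DataflowId {
    Future(usize),
    Installed(usize),
}

impl PartialOrder for DataflowId {
    // The same rules as the `PartialOrder` impl in the diff.
    fn less_equal(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Future(lower), Self::Installed(id)) => lower <= id,
            (Self::Installed(this), Self::Installed(other)) => this == other,
            (Self::Future(this), Self::Future(other)) => this <= other,
            (Self::Installed(_), Self::Future(_)) => false,
        }
    }
}

fn main() {
    // Two workers, each seeded with the default `Future(0)`, mirroring
    // `SchedulingFrontier::new`, which adds `peers()` copies of it.
    let mut frontier = MutableAntichain::new();
    frontier.update_iter([(DataflowId::Future(0), 2)]);

    // Both workers build dataflow 0: `allocate_dataflow_index` swaps each
    // worker's `Future(0)` for `Installed(0)` plus `Future(1)`.
    for _ in 0..2 {
        frontier.update_iter([
            (DataflowId::Installed(0), 1),
            (DataflowId::Future(1), 1),
            (DataflowId::Future(0), -1),
        ]);
    }
    assert!(frontier.less_equal(&DataflowId::Installed(0)));

    // Worker 0 calls `drop_dataflow(0)`. Worker 1 still schedules the
    // dataflow, so its frozen resources must be retained.
    frontier.update_iter([(DataflowId::Installed(0), -1)]);
    assert!(frontier.less_equal(&DataflowId::Installed(0)));

    // Worker 1 drops it too: `Installed(0)` leaves the frontier, and the
    // `retain` in `step_or_park` may now reclaim the dataflow.
    frontier.update_iter([(DataflowId::Installed(0), -1)]);
    assert!(!frontier.less_equal(&DataflowId::Installed(0)));
}
```

Note that distinct `Installed` ids are deliberately incomparable: one worker dropping dataflow 3 says nothing about dataflow 4, while `Future(k)` keeps every not-yet-built dataflow with id at least `k` beyond the frontier.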
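`SchedulingFrontier::update` leans on `ChangeBatch` consolidating cancelling updates before they are pushed to peers; a minimal sketch of that behavior, assuming `timely` as a dependency:

```rust
use timely::progress::ChangeBatch;

fn main() {
    let mut batch = ChangeBatch::new();
    // As in `SchedulingFrontier::update`, `extend` consumes an iterator
    // of `(element, delta)` pairs.
    batch.extend([(7usize, 1), (7, 1), (7, -2), (3, 1)].into_iter());
    // `iter` compacts the batch first: the deltas for 7 sum to zero and
    // vanish, so only the net update survives.
    assert_eq!(batch.iter().cloned().collect::<Vec<_>>(), vec![(3, 1)]);
}
```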
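Finally, a hypothetical end-to-end sketch of the intended `drop_dataflow` workflow; the dataflow body and the `execute_from_args` harness are placeholder scaffolding, not part of this change:

```rust
use timely::dataflow::operators::{Inspect, ToStream};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // `next_dataflow_index` peeks the id that the next call to
        // `dataflow` will be assigned.
        let id = worker.next_dataflow_index();
        worker.dataflow::<u64, _, _>(|scope| {
            (0..10).to_stream(scope)
                .inspect(|x| println!("seen: {:?}", x));
        });

        // Unschedule the dataflow on this worker: the entry moves to
        // `frozen_dataflows` and the retraction of `Installed(id)` is
        // broadcast to all peers on SCHEDULING_CHANNEL.
        worker.drop_dataflow(id);

        // Resources are reclaimed only once every worker has dropped the
        // dataflow and the scheduling frontier has moved past `Installed(id)`.
        while worker.has_dataflows() {
            worker.step_or_park(None);
        }
    }).unwrap();
}
```

In a single-process run the retraction should round-trip through the local channel within a few calls to `step_or_park`, after which `has_dataflows` returns false and the loop exits.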