Skip to content

Commit

Permalink
automatically shrink aggregation lists (#8657)
Browse files Browse the repository at this point in the history
### Description

When unloading tasks, edges from the aggregation structure are removed,
but their memory is not reclaimed as Vecs don't shrink automatically.

This adds a `shrink_amortized` method which shrinks the Vec/HashMap when
capacity is 2 times larger than length.

This also calls `shrink_to_fit` when running GC on a task.

### Testing Instructions

<!--
  Give a quick description of steps to test your changes.
-->
  • Loading branch information
sokra authored Jul 4, 2024
1 parent ca9b635 commit 0df64d5
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 49 deletions.
19 changes: 19 additions & 0 deletions crates/turbo-tasks-auto-hash-map/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,25 @@ impl<K: Eq + Hash, V, H: BuildHasher + Default, const I: usize> AutoMap<K, V, H,
}
}
}

pub fn shrink_amortized(&mut self) {
match self {
AutoMap::List(list) => {
if list.capacity() > list.len() * 3 {
list.shrink_to_fit();
}
}
AutoMap::Map(map) => {
if map.len() <= MIN_HASH_SIZE {
let mut list = SmallVec::with_capacity(map.len());
list.extend(map.drain());
*self = AutoMap::List(list);
} else if map.capacity() > map.len() * 3 {
map.shrink_to_fit();
}
}
}
}
}

impl<K: Eq + Hash, V, H: BuildHasher, const I: usize> AutoMap<K, V, H, I> {
Expand Down
29 changes: 20 additions & 9 deletions crates/turbo-tasks-memory/src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ pub enum AggregationNode<I, D> {
Aggegating(Box<AggegatingNode<I, D>>),
}

impl<I, D> AggregationNode<I, D> {
pub fn new() -> Self {
Self::Leaf {
aggregation_number: 0,
uppers: CountHashSet::new(),
}
}
}

/// The aggregation node structure for aggregating nodes.
pub struct AggegatingNode<I, D> {
aggregation_number: u32,
Expand All @@ -64,6 +55,26 @@ pub struct AggegatingNode<I, D> {
}

impl<I, A> AggregationNode<I, A> {
pub fn new() -> Self {
Self::Leaf {
aggregation_number: 0,
uppers: CountHashSet::new(),
}
}

pub fn shrink_to_fit(&mut self)
where
I: Hash + Eq,
{
match self {
AggregationNode::Leaf { uppers, .. } => uppers.shrink_to_fit(),
AggregationNode::Aggegating(aggregating) => {
aggregating.uppers.shrink_to_fit();
aggregating.followers.shrink_to_fit();
}
}
}

/// Returns the aggregation number of the node.
pub fn aggregation_number(&self) -> u32 {
match self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl<I: Clone + Eq + Hash, D> AggregationNode<I, D> {
None
}
RemoveIfEntryResult::Removed => {
aggregating.followers.shrink_amortized();
let uppers = aggregating.uppers.iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);
self.finish_in_progress(ctx, balance_queue, upper_id);
Expand Down Expand Up @@ -58,6 +59,7 @@ impl<I: Clone + Eq + Hash, D> AggregationNode<I, D> {
match aggregating.followers.remove_if_entry(follower_id) {
RemoveIfEntryResult::PartiallyRemoved => None,
RemoveIfEntryResult::Removed => {
aggregating.followers.shrink_amortized();
let uppers = aggregating.uppers.iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);
Some(PreparedNotifyLostFollower::RemovedFollower {
Expand Down Expand Up @@ -162,6 +164,7 @@ impl<C: AggregationContext> PreparedInternalOperation<C> for PreparedNotifyLostF
return;
}
RemoveIfEntryResult::Removed => {
aggregating.followers.shrink_amortized();
let uppers =
aggregating.uppers.iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);
Expand Down
26 changes: 14 additions & 12 deletions crates/turbo-tasks-memory/src/aggregation/uppers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,16 @@ pub fn remove_upper_count<C: AggregationContext>(
upper_id: &C::NodeRef,
count: usize,
) {
let removed = match &mut *node {
AggregationNode::Leaf { uppers, .. } => uppers.remove_clonable_count(upper_id, count),
let uppers = match &mut *node {
AggregationNode::Leaf { uppers, .. } => uppers,
AggregationNode::Aggegating(aggegating) => {
let AggegatingNode { ref mut uppers, .. } = **aggegating;
uppers.remove_clonable_count(upper_id, count)
uppers
}
};
let removed = uppers.remove_clonable_count(upper_id, count);
if removed {
uppers.shrink_amortized();
on_removed(ctx, balance_queue, node, upper_id);
}
}
Expand All @@ -185,20 +187,20 @@ pub fn remove_positive_upper_count<C: AggregationContext>(
upper_id: &C::NodeRef,
count: usize,
) -> RemovePositiveUpperCountResult {
let RemovePositiveCountResult {
removed,
removed_count,
count,
} = match &mut *node {
AggregationNode::Leaf { uppers, .. } => {
uppers.remove_positive_clonable_count(upper_id, count)
}
let uppers = match &mut *node {
AggregationNode::Leaf { uppers, .. } => uppers,
AggregationNode::Aggegating(aggegating) => {
let AggegatingNode { ref mut uppers, .. } = **aggegating;
uppers.remove_positive_clonable_count(upper_id, count)
uppers
}
};
let RemovePositiveCountResult {
removed,
removed_count,
count,
} = uppers.remove_positive_clonable_count(upper_id, count);
if removed {
uppers.shrink_amortized();
on_removed(ctx, balance_queue, node, upper_id);
}
RemovePositiveUpperCountResult {
Expand Down
10 changes: 10 additions & 0 deletions crates/turbo-tasks-memory/src/count_hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ impl<T: Eq + Hash, H: BuildHasher + Default> CountHashSet<T, H> {
None => 0,
}
}

/// Frees unused memory
pub fn shrink_to_fit(&mut self) {
self.inner.shrink_to_fit();
}

/// Frees unused memory in an amortized way
pub fn shrink_amortized(&mut self) {
self.inner.shrink_amortized()
}
}

impl<T: Eq + Hash + Clone, H: BuildHasher + Default> CountHashSet<T, H> {
Expand Down
65 changes: 37 additions & 28 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1513,42 +1513,51 @@ impl Task {

let mut cells_to_drop = Vec::new();

if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() {
if state.gc.generation > generation {
return false;
}
match self.state_mut() {
TaskMetaStateWriteGuard::Full(mut state) => {
if state.gc.generation > generation {
return false;
}

match &mut state.state_type {
TaskStateType::Done { stateful, edges: _ } => {
if *stateful {
match &mut state.state_type {
TaskStateType::Done { stateful, edges: _ } => {
if *stateful {
return false;
}
}
TaskStateType::Dirty { .. } => {}
_ => {
// GC can't run in this state. We will reschedule it when the execution
// completes.
return false;
}
}
TaskStateType::Dirty { .. } => {}
_ => {
// GC can't run in this state. We will reschedule it when the execution
// completes.
return false;
}
}

// shrinking memory and dropping cells
state.output.dependent_tasks.shrink_to_fit();
state.cells.shrink_to_fit();
for cells in state.cells.values_mut() {
cells.shrink_to_fit();
for cell in cells.iter_mut() {
cells_to_drop.extend(cell.gc_content());
cell.shrink_to_fit();
// shrinking memory and dropping cells
state.aggregation_node.shrink_to_fit();
state.output.dependent_tasks.shrink_to_fit();
state.cells.shrink_to_fit();
for cells in state.cells.values_mut() {
cells.shrink_to_fit();
for cell in cells.iter_mut() {
cells_to_drop.extend(cell.gc_content());
cell.shrink_to_fit();
}
}
}
} else {
return false;
};

drop(cells_to_drop);
drop(state);

true
// Dropping cells outside of the lock
drop(cells_to_drop);

true
}
TaskMetaStateWriteGuard::Partial(mut state) => {
state.aggregation_node.shrink_to_fit();
false
}
_ => false,
}
}

pub(crate) fn gc_state(&self) -> Option<GcTaskState> {
Expand Down

0 comments on commit 0df64d5

Please sign in to comment.