Skip to content

Commit ddc7c9d

Browse files
committed
Per ingredient sync table
1 parent 7fe054e commit ddc7c9d

File tree

10 files changed

+51
-137
lines changed

10 files changed

+51
-137
lines changed

src/function.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ use std::{any::Any, fmt, ptr::NonNull};
33
use crate::{
44
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
55
cycle::{CycleRecoveryAction, CycleRecoveryStrategy},
6+
function::sync::{ClaimResult, SyncTable},
67
ingredient::fmt_index,
78
key::DatabaseKeyIndex,
89
plumbing::MemoIngredientMap,
910
salsa_struct::SalsaStructInDb,
10-
table::sync::ClaimResult,
1111
table::Table,
1212
views::DatabaseDownCaster,
1313
zalsa::{IngredientIndex, MemoIngredientIndex, Zalsa},
@@ -32,6 +32,7 @@ mod lru;
3232
mod maybe_changed_after;
3333
mod memo;
3434
mod specify;
35+
mod sync;
3536

3637
pub trait Configuration: Any {
3738
const DEBUG_NAME: &'static str;
@@ -119,6 +120,8 @@ pub struct IngredientImpl<C: Configuration> {
119120
/// instances that this downcaster was derived from.
120121
view_caster: DatabaseDownCaster<C::DbView>,
121122

123+
sync_table: SyncTable,
124+
122125
/// When `fetch` and friends executes, they return a reference to the
123126
/// value stored in the memo that is extended to live as long as the `&self`
124127
/// reference we start with. This means that whenever we remove something
@@ -157,6 +160,7 @@ where
157160
lru: lru::Lru::new(lru),
158161
deleted_entries: Default::default(),
159162
view_caster,
163+
sync_table: Default::default(),
160164
}
161165
}
162166

@@ -248,12 +252,10 @@ where
248252
/// Attempts to claim `key_index`, returning `false` if a cycle occurs.
249253
fn wait_for(&self, db: &dyn Database, key_index: Id) -> bool {
250254
let zalsa = db.zalsa();
251-
match zalsa.sync_table_for(key_index).claim(
252-
db,
253-
zalsa,
254-
self.database_key_index(key_index),
255-
self.memo_ingredient_index(zalsa, key_index),
256-
) {
255+
match self
256+
.sync_table
257+
.try_claim(db, zalsa, self.database_key_index(key_index))
258+
{
257259
ClaimResult::Retry | ClaimResult::Claimed(_) => true,
258260
ClaimResult::Cycle => false,
259261
}

src/function/fetch.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use super::{memo::Memo, Configuration, IngredientImpl, VerifyResult};
2+
use crate::function::sync::ClaimResult;
23
use crate::zalsa::MemoIngredientIndex;
34
use crate::{
45
accumulator::accumulated_map::InputAccumulatedValues,
56
runtime::StampedValue,
6-
table::sync::ClaimResult,
77
zalsa::{Zalsa, ZalsaDatabase},
88
zalsa_local::QueryRevisions,
99
AsDynDatabase as _, Id,
@@ -107,12 +107,7 @@ where
107107
let database_key_index = self.database_key_index(id);
108108

109109
// Try to claim this query: if someone else has claimed it already, go back and start again.
110-
let _claim_guard = match zalsa.sync_table_for(id).claim(
111-
db,
112-
zalsa,
113-
database_key_index,
114-
memo_ingredient_index,
115-
) {
110+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
116111
ClaimResult::Retry => return None,
117112
ClaimResult::Cycle => {
118113
// check if there's a provisional value for this query

src/function/maybe_changed_after.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::{
22
accumulator::accumulated_map::InputAccumulatedValues,
33
cycle::{CycleHeads, CycleRecoveryStrategy},
4+
function::sync::ClaimResult,
45
key::DatabaseKeyIndex,
5-
table::sync::ClaimResult,
66
zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase},
77
zalsa_local::{ActiveQueryGuard, QueryEdge, QueryOrigin},
88
AsDynDatabase as _, Id, Revision,
@@ -98,12 +98,7 @@ where
9898
) -> Option<VerifyResult> {
9999
let database_key_index = self.database_key_index(key_index);
100100

101-
let _claim_guard = match zalsa.sync_table_for(key_index).claim(
102-
db,
103-
zalsa,
104-
database_key_index,
105-
memo_ingredient_index,
106-
) {
101+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
107102
ClaimResult::Retry => return None,
108103
ClaimResult::Cycle => match C::CYCLE_STRATEGY {
109104
CycleRecoveryStrategy::Panic => panic!(

src/table/sync.rs renamed to src/function/sync.rs

Lines changed: 35 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,26 @@
11
use std::thread::ThreadId;
22

33
use parking_lot::Mutex;
4+
use rustc_hash::FxHashMap;
45

56
use crate::{
67
key::DatabaseKeyIndex,
78
runtime::{BlockResult, WaitResult},
8-
zalsa::{MemoIngredientIndex, Zalsa},
9-
Database,
9+
zalsa::Zalsa,
10+
Database, Id,
1011
};
1112

12-
use super::util;
13-
1413
/// Tracks the keys that are currently being processed; used to coordinate between
1514
/// worker threads.
1615
#[derive(Default)]
1716
pub(crate) struct SyncTable {
18-
syncs: Mutex<Vec<Option<SyncState>>>,
17+
syncs: Mutex<FxHashMap<Id, SyncState>>,
18+
}
19+
20+
pub(crate) enum ClaimResult<'a> {
21+
Retry,
22+
Cycle,
23+
Claimed(ClaimGuard<'a>),
1924
}
2025

2126
struct SyncState {
@@ -26,44 +31,20 @@ struct SyncState {
2631
anyone_waiting: bool,
2732
}
2833

29-
pub(crate) enum ClaimResult<'a> {
30-
Retry,
31-
Cycle,
32-
Claimed(ClaimGuard<'a>),
33-
}
34-
3534
impl SyncTable {
36-
#[inline]
37-
pub(crate) fn claim<'me>(
35+
pub(crate) fn try_claim<'me>(
3836
&'me self,
3937
db: &'me (impl ?Sized + Database),
4038
zalsa: &'me Zalsa,
4139
database_key_index: DatabaseKeyIndex,
42-
memo_ingredient_index: MemoIngredientIndex,
4340
) -> ClaimResult<'me> {
44-
let mut syncs = self.syncs.lock();
45-
let thread_id = std::thread::current().id();
46-
47-
util::ensure_vec_len(&mut syncs, memo_ingredient_index.as_usize() + 1);
48-
49-
match &mut syncs[memo_ingredient_index.as_usize()] {
50-
None => {
51-
syncs[memo_ingredient_index.as_usize()] = Some(SyncState {
52-
id: thread_id,
53-
anyone_waiting: false,
54-
});
55-
ClaimResult::Claimed(ClaimGuard {
56-
database_key_index,
57-
memo_ingredient_index,
58-
zalsa,
59-
sync_table: self,
60-
_padding: false,
61-
})
62-
}
63-
Some(SyncState {
64-
id: other_id,
65-
anyone_waiting,
66-
}) => {
41+
let mut write = self.syncs.lock();
42+
match write.entry(database_key_index.key_index()) {
43+
std::collections::hash_map::Entry::Occupied(occupied_entry) => {
44+
let &mut SyncState {
45+
id,
46+
ref mut anyone_waiting,
47+
} = occupied_entry.into_mut();
6748
// NB: `Ordering::Relaxed` is sufficient here,
6849
// as there are no loads that are "gated" on this
6950
// value. Everything that is written is also protected
@@ -75,13 +56,25 @@ impl SyncTable {
7556
db.as_dyn_database(),
7657
db.zalsa_local(),
7758
database_key_index,
78-
*other_id,
79-
syncs,
59+
id,
60+
write,
8061
) {
8162
BlockResult::Completed => ClaimResult::Retry,
8263
BlockResult::Cycle => ClaimResult::Cycle,
8364
}
8465
}
66+
std::collections::hash_map::Entry::Vacant(vacant_entry) => {
67+
vacant_entry.insert(SyncState {
68+
id: std::thread::current().id(),
69+
anyone_waiting: false,
70+
});
71+
ClaimResult::Claimed(ClaimGuard {
72+
database_key_index,
73+
zalsa,
74+
sync_table: self,
75+
_padding: false,
76+
})
77+
}
8578
}
8679
}
8780
}
@@ -91,7 +84,6 @@ impl SyncTable {
9184
#[must_use]
9285
pub(crate) struct ClaimGuard<'me> {
9386
database_key_index: DatabaseKeyIndex,
94-
memo_ingredient_index: MemoIngredientIndex,
9587
zalsa: &'me Zalsa,
9688
sync_table: &'me SyncTable,
9789
// Reduce the size of ClaimResult by making more niches available in ClaimGuard; this fits into
@@ -104,7 +96,9 @@ impl ClaimGuard<'_> {
10496
let mut syncs = self.sync_table.syncs.lock();
10597

10698
let SyncState { anyone_waiting, .. } =
107-
syncs[self.memo_ingredient_index.as_usize()].take().unwrap();
99+
syncs.remove(&self.database_key_index.key_index()).unwrap();
100+
101+
drop(syncs);
108102

109103
if anyone_waiting {
110104
self.zalsa.runtime().unblock_queries_blocked_on(

src/input.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
input::singleton::{Singleton, SingletonChoice},
1818
key::DatabaseKeyIndex,
1919
plumbing::{Jar, Stamp},
20-
table::{memo::MemoTable, sync::SyncTable, Slot, Table},
20+
table::{memo::MemoTable, Slot, Table},
2121
zalsa::{IngredientIndex, Zalsa},
2222
zalsa_local::QueryOrigin,
2323
Database, Durability, Id, Revision, Runtime,
@@ -107,7 +107,6 @@ impl<C: Configuration> IngredientImpl<C> {
107107
fields,
108108
stamps,
109109
memos: Default::default(),
110-
syncs: Default::default(),
111110
})
112111
});
113112

@@ -286,9 +285,6 @@ where
286285

287286
/// Memos
288287
memos: MemoTable,
289-
290-
/// Syncs
291-
syncs: SyncTable,
292288
}
293289

294290
impl<C> Value<C>
@@ -322,9 +318,4 @@ where
322318
fn memos_mut(&mut self) -> &mut crate::table::memo::MemoTable {
323319
&mut self.memos
324320
}
325-
326-
#[inline]
327-
unsafe fn syncs(&self, _current_revision: Revision) -> &SyncTable {
328-
&self.syncs
329-
}
330321
}

src/interned.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use crate::ingredient::fmt_index;
66
use crate::plumbing::{IngredientIndices, Jar};
77
use crate::revision::AtomicRevision;
88
use crate::table::memo::MemoTable;
9-
use crate::table::sync::SyncTable;
109
use crate::table::Slot;
1110
use crate::zalsa::{IngredientIndex, Zalsa};
1211
use crate::zalsa_local::QueryOrigin;
@@ -71,7 +70,6 @@ where
7170
{
7271
fields: C::Fields<'static>,
7372
memos: MemoTable,
74-
syncs: SyncTable,
7573

7674
/// The revision the value was first interned in.
7775
first_interned_at: Revision,
@@ -291,7 +289,6 @@ where
291289
let id = zalsa_local.allocate(table, self.ingredient_index, |id| Value::<C> {
292290
fields: unsafe { self.to_internal_data(assemble(id, key)) },
293291
memos: Default::default(),
294-
syncs: Default::default(),
295292
durability: AtomicU8::new(durability.as_u8()),
296293
// Record the revision we are interning in.
297294
first_interned_at: current_revision,
@@ -479,11 +476,6 @@ where
479476
fn memos_mut(&mut self) -> &mut MemoTable {
480477
&mut self.memos
481478
}
482-
483-
#[inline]
484-
unsafe fn syncs(&self, _current_revision: Revision) -> &crate::table::sync::SyncTable {
485-
&self.syncs
486-
}
487479
}
488480

489481
/// A trait for types that hash and compare like `O`.

src/table.rs

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,10 @@ use std::{
1212
use memo::MemoTable;
1313
use parking_lot::Mutex;
1414
use rustc_hash::FxHashMap;
15-
use sync::SyncTable;
1615

1716
use crate::{Id, IngredientIndex, Revision};
1817

1918
pub(crate) mod memo;
20-
pub(crate) mod sync;
21-
mod util;
2219

2320
const PAGE_LEN_BITS: usize = 10;
2421
const PAGE_LEN_MASK: usize = PAGE_LEN - 1;
@@ -44,13 +41,6 @@ pub(crate) trait Slot: Any + Send + Sync {
4441

4542
/// Mutably access the [`MemoTable`] for this slot.
4643
fn memos_mut(&mut self) -> &mut MemoTable;
47-
48-
/// Access the [`SyncTable`][] for this slot.
49-
///
50-
/// # Safety condition
51-
///
52-
/// The current revision MUST be the current revision of the database containing this slot.
53-
unsafe fn syncs(&self, current_revision: Revision) -> &SyncTable;
5444
}
5545

5646
/// [Slot::memos]
@@ -61,17 +51,12 @@ type SlotMemosFn<T> = unsafe fn(&T, current_revision: Revision) -> &MemoTable;
6151
type SlotMemosMutFnRaw = unsafe fn(*mut ()) -> *mut MemoTable;
6252
/// [Slot::memos_mut]
6353
type SlotMemosMutFn<T> = unsafe fn(&mut T) -> &mut MemoTable;
64-
/// [Slot::syncs]
65-
type SlotSyncsFnRaw = unsafe fn(*const (), current_revision: Revision) -> *const SyncTable;
66-
/// [Slot::syncs]
67-
type SlotSyncsFn<T> = unsafe fn(&T, current_revision: Revision) -> &SyncTable;
6854

6955
struct SlotVTable {
7056
layout: Layout,
7157
/// [`Slot`] methods
7258
memos: SlotMemosFnRaw,
7359
memos_mut: SlotMemosMutFnRaw,
74-
syncs: SlotSyncsFnRaw,
7560
/// A drop impl to call when the own page drops
7661
/// SAFETY: The caller is required to supply a correct data pointer to a `Box<PageDataEntry<T>>` and initialized length
7762
drop_impl: unsafe fn(data: *mut (), initialized: usize),
@@ -95,8 +80,6 @@ impl SlotVTable {
9580
memos_mut: unsafe {
9681
mem::transmute::<SlotMemosMutFn<T>, SlotMemosMutFnRaw>(T::memos_mut)
9782
},
98-
// SAFETY: The signatures are compatible
99-
syncs: unsafe { mem::transmute::<SlotSyncsFn<T>, SlotSyncsFnRaw>(T::syncs) },
10083
}
10184
}
10285
}
@@ -245,19 +228,6 @@ impl Table {
245228
unsafe { &mut *(page.slot_vtable.memos_mut)(page.get(slot)) }
246229
}
247230

248-
/// Get the sync table associated with `id`
249-
///
250-
/// # Safety condition
251-
///
252-
/// The parameter `current_revision` MUST be the current revision
253-
/// of the owner of database owning this table.
254-
pub(crate) unsafe fn syncs(&self, id: Id, current_revision: Revision) -> &SyncTable {
255-
let (page, slot) = split_id(id);
256-
let page = &self.pages[page.0];
257-
// SAFETY: We supply a proper slot pointer and the caller is required to pass the `current_revision`.
258-
unsafe { &*(page.slot_vtable.syncs)(page.get(slot), current_revision) }
259-
}
260-
261231
pub(crate) fn slots_of<T: Slot>(&self) -> impl Iterator<Item = &T> + '_ {
262232
self.pages
263233
.iter()

src/table/util.rs

Lines changed: 0 additions & 5 deletions
This file was deleted.

0 commit comments

Comments
 (0)