Skip to content

Commit efdbe99

Browse files
authored
Remove executed_in_epoch table. (#21477)
The table is replaced with:
- An in-memory "dirty set" which holds executed but un-checkpointed transaction digests. Transactions are removed from the dirty set by CheckpointExecutor.
- An additional bounded cache intended to lessen the number of db reads by CheckpointBuilder.
- Last-resort reads, which go to the `executed_transactions_to_checkpoint` table.

The only purpose of the `executed_in_epoch` table was to allow CheckpointBuilder to prune transaction dependencies from prior epochs, and the above approach suffices while removing a surprisingly expensive source of db writes.
1 parent 8956589 commit efdbe99

File tree

6 files changed

+141
-105
lines changed

6 files changed

+141
-105
lines changed

crates/sui-core/src/authority.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3317,9 +3317,7 @@ impl AuthorityState {
33173317
);
33183318

33193319
if cfg!(debug_assertions) {
3320-
cur_epoch_store
3321-
.check_all_executed_transactions_in_checkpoint()
3322-
.expect("failed to check all executed transactions in checkpoint");
3320+
cur_epoch_store.check_all_executed_transactions_in_checkpoint();
33233321
}
33243322

33253323
if let Err(err) = self

crates/sui-core/src/authority/authority_per_epoch_store.rs

Lines changed: 48 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,8 @@ pub struct AuthorityEpochTables {
441441
transaction_cert_signatures: DBMap<TransactionDigest, AuthorityStrongQuorumSignInfo>,
442442

443443
/// Transactions that were executed in the current epoch.
444+
#[allow(dead_code)]
445+
#[deprecated]
444446
executed_in_epoch: DBMap<TransactionDigest, ()>,
445447

446448
#[allow(dead_code)]
@@ -751,15 +753,6 @@ impl AuthorityEpochTables {
751753
.safe_iter()
752754
.collect::<Result<_, _>>()?)
753755
}
754-
755-
fn get_all_user_signatures_for_checkpoints(
756-
&self,
757-
) -> SuiResult<HashMap<TransactionDigest, Vec<GenericSignature>>> {
758-
Ok(self
759-
.user_signatures_for_checkpoints
760-
.safe_iter()
761-
.collect::<Result<_, _>>()?)
762-
}
763756
}
764757

765758
pub(crate) const MUTEX_TABLE_SIZE: usize = 1024;
@@ -1402,18 +1395,15 @@ impl AuthorityPerEpochStore {
14021395
tx_digest: &TransactionDigest,
14031396
) -> SuiResult {
14041397
let tables = self.tables()?;
1405-
let mut batch = self.tables()?.executed_in_epoch.batch();
14061398

1407-
batch.insert_batch(&tables.executed_in_epoch, [(tx_digest, ())])?;
1408-
1409-
if !matches!(tx_key, TransactionKey::Digest(_)) {
1410-
batch.insert_batch(&tables.transaction_key_to_digest, [(tx_key, tx_digest)])?;
1411-
}
1412-
batch.write()?;
1399+
self.consensus_output_cache
1400+
.insert_executed_in_epoch(*tx_digest);
14131401

14141402
if !matches!(tx_key, TransactionKey::Digest(_)) {
1403+
tables.transaction_key_to_digest.insert(tx_key, tx_digest)?;
14151404
self.executed_digests_notify_read.notify(tx_key, tx_digest);
14161405
}
1406+
14171407
Ok(())
14181408
}
14191409

@@ -1430,9 +1420,10 @@ impl AuthorityPerEpochStore {
14301420
}
14311421

14321422
pub fn revert_executed_transaction(&self, tx_digest: &TransactionDigest) -> SuiResult {
1423+
self.consensus_output_cache
1424+
.remove_reverted_transaction(tx_digest);
14331425
let tables = self.tables()?;
14341426
let mut batch = tables.effects_signatures.batch();
1435-
batch.delete_batch(&tables.executed_in_epoch, [*tx_digest])?;
14361427
batch.delete_batch(&tables.effects_signatures, [*tx_digest])?;
14371428
batch.write()?;
14381429
Ok(())
@@ -1455,14 +1446,30 @@ impl AuthorityPerEpochStore {
14551446
Ok(())
14561447
}
14571448

1458-
pub fn transactions_executed_in_cur_epoch<'a>(
1449+
pub fn transactions_executed_in_cur_epoch(
14591450
&self,
1460-
digests: impl IntoIterator<Item = &'a TransactionDigest>,
1451+
digests: &[TransactionDigest],
14611452
) -> SuiResult<Vec<bool>> {
1462-
Ok(self
1463-
.tables()?
1464-
.executed_in_epoch
1465-
.multi_contains_keys(digests)?)
1453+
let tables = self.tables()?;
1454+
Ok(do_fallback_lookup(
1455+
digests,
1456+
|digest| {
1457+
if self
1458+
.consensus_output_cache
1459+
.executed_in_current_epoch(digest)
1460+
{
1461+
CacheResult::Hit(true)
1462+
} else {
1463+
CacheResult::Miss
1464+
}
1465+
},
1466+
|digests| {
1467+
tables
1468+
.executed_transactions_to_checkpoint
1469+
.multi_contains_keys(digests)
1470+
.expect("db error")
1471+
},
1472+
))
14661473
}
14671474

14681475
pub fn get_effects_signature(
@@ -1655,6 +1662,9 @@ impl AuthorityPerEpochStore {
16551662
quarantine.update_highest_executed_checkpoint(seq, self, &mut batch)?;
16561663
batch.write()?;
16571664

1665+
self.consensus_output_cache
1666+
.remove_executed_in_epoch(digests);
1667+
16581668
Ok(())
16591669
}
16601670

@@ -4415,31 +4425,23 @@ impl AuthorityPerEpochStore {
44154425
self.signature_verifier.clear_signature_cache();
44164426
}
44174427

4418-
pub(crate) fn check_all_executed_transactions_in_checkpoint(&self) -> SuiResult<()> {
4419-
let tables = self.tables().unwrap();
4420-
4421-
info!("Verifying that all executed transactions are in a checkpoint");
4422-
4423-
let mut executed_iter = tables.executed_in_epoch.safe_iter();
4424-
let mut checkpointed_iter = tables.executed_transactions_to_checkpoint.safe_iter();
4428+
pub(crate) fn check_all_executed_transactions_in_checkpoint(&self) {
4429+
let uncheckpointed_transactions = self
4430+
.consensus_output_cache
4431+
.get_uncheckpointed_transactions();
44254432

4426-
// verify that the two iterators (which are both sorted) are identical
4427-
loop {
4428-
let executed = executed_iter.next().transpose()?;
4429-
let checkpointed = checkpointed_iter.next().transpose()?;
4430-
match (executed, checkpointed) {
4431-
(Some((left, ())), Some((right, _))) => {
4432-
if left != right {
4433-
panic!("Executed transactions and checkpointed transactions do not match: {:?} {:?}", left, right);
4434-
}
4435-
}
4436-
(None, None) => break Ok(()),
4437-
(left, right) => panic!(
4438-
"Executed transactions and checkpointed transactions do not match: {:?} {:?}",
4439-
left, right
4440-
),
4441-
}
4433+
if uncheckpointed_transactions.is_empty() {
4434+
info!("Verified that all executed transactions are in a checkpoint");
4435+
return;
44424436
}
4437+
4438+
// TODO: should this be debug_fatal? It's potentially very serious in that it could
4439+
// indicate that we broke the checkpoint inclusion guarantee, but we won't be able to
4440+
// do anything about it if it happens.
4441+
fatal!(
4442+
"The following transactions were neither reverted nor checkpointed: {:?}",
4443+
uncheckpointed_transactions
4444+
);
44434445
}
44444446
}
44454447

crates/sui-core/src/authority/consensus_quarantine.rs

Lines changed: 70 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// Copyright (c) Mysten Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::collections::{hash_map, BTreeMap, BTreeSet, HashMap, VecDeque};
5-
64
use crate::authority::authority_per_epoch_store::{
75
AuthorityEpochTables, EncG, ExecutionIndicesWithStats, PkG,
86
};
@@ -13,8 +11,12 @@ use crate::epoch::randomness::SINGLETON_KEY;
1311
use dashmap::DashMap;
1412
use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
1513
use fastcrypto_zkp::bn254::zk_login::{JwkId, JWK};
14+
use moka::policy::EvictionPolicy;
15+
use moka::sync::SegmentedCache as MokaCache;
1616
use mysten_common::fatal;
1717
use parking_lot::Mutex;
18+
use rand::seq::SliceRandom;
19+
use std::collections::{hash_map, BTreeMap, BTreeSet, HashMap, VecDeque};
1820
use sui_types::authenticator_state::ActiveJwk;
1921
use sui_types::base_types::{AuthorityName, SequenceNumber};
2022
use sui_types::crypto::RandomnessRound;
@@ -383,6 +385,9 @@ pub(crate) struct ConsensusOutputCache {
383385
pub(super) user_signatures_for_checkpoints:
384386
Mutex<HashMap<TransactionDigest, Vec<GenericSignature>>>,
385387

388+
executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
389+
executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
390+
386391
metrics: Arc<EpochMetrics>,
387392
}
388393

@@ -396,27 +401,30 @@ impl ConsensusOutputCache {
396401
.get_all_deferred_transactions()
397402
.expect("load deferred transactions cannot fail");
398403

399-
if !epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch() {
400-
let shared_version_assignments =
401-
Self::get_all_shared_version_assignments(epoch_start_configuration, tables);
402-
403-
let user_signatures_for_checkpoints = tables
404-
.get_all_user_signatures_for_checkpoints()
405-
.expect("load user signatures for checkpoints cannot fail");
404+
assert!(
405+
epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch(),
406+
"This version of sui-node can only run after data quarantining has been enabled. Please run version 1.45.0 or later to the end of the current epoch and retry"
407+
);
406408

407-
Self {
408-
shared_version_assignments: shared_version_assignments.into_iter().collect(),
409-
deferred_transactions: Mutex::new(deferred_transactions),
410-
user_signatures_for_checkpoints: Mutex::new(user_signatures_for_checkpoints),
411-
metrics,
412-
}
409+
let executed_in_epoch_cache_capacity = if cfg!(msim) {
410+
// Ensure that we test under conditions of constant, frequent,
411+
// and rare cache evictions.
412+
*[2, 100, 50000].choose(&mut rand::thread_rng()).unwrap()
413413
} else {
414-
Self {
415-
shared_version_assignments: Default::default(),
416-
deferred_transactions: Mutex::new(deferred_transactions),
417-
user_signatures_for_checkpoints: Default::default(),
418-
metrics,
419-
}
414+
50_000
415+
};
416+
417+
Self {
418+
shared_version_assignments: Default::default(),
419+
deferred_transactions: Mutex::new(deferred_transactions),
420+
user_signatures_for_checkpoints: Default::default(),
421+
executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
422+
executed_in_epoch_cache: MokaCache::builder(8)
423+
// most queries should be for recent transactions
424+
.max_capacity(executed_in_epoch_cache_capacity)
425+
.eviction_policy(EvictionPolicy::lru())
426+
.build(),
427+
metrics,
420428
}
421429
}
422430

@@ -476,40 +484,49 @@ impl ConsensusOutputCache {
476484
.sub(removed_count as i64);
477485
}
478486

479-
// Used to read pre-existing shared object versions from the database after a crash.
480-
// TODO: remove this once all nodes have upgraded to data-quarantining.
481-
fn get_all_shared_version_assignments(
482-
epoch_start_configuration: &EpochStartConfiguration,
483-
tables: &AuthorityEpochTables,
484-
) -> Vec<(
485-
TransactionKey,
486-
Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
487-
)> {
488-
if epoch_start_configuration.use_version_assignment_tables_v3() {
489-
tables
490-
.assigned_shared_object_versions_v3
491-
.safe_iter()
492-
.collect::<Result<_, _>>()
493-
.expect("db error")
494-
} else {
495-
tables
496-
.assigned_shared_object_versions_v2
497-
.safe_iter()
498-
.collect::<Result<Vec<_>, _>>()
499-
.expect("db error")
500-
.into_iter()
501-
.map(|(key, value)| {
502-
(
503-
key,
504-
value
505-
.into_iter()
506-
.map(|(id, v)| ((id, SequenceNumber::UNKNOWN), v))
507-
.collect::<Vec<_>>(),
508-
)
509-
})
510-
.collect()
487+
pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
488+
self.executed_in_epoch
489+
.read()
490+
.contains_key(digest) ||
491+
// we use get instead of contains key to mark the entry as read
492+
self.executed_in_epoch_cache.get(digest).is_some()
493+
}
494+
495+
// Called by execution
496+
pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
497+
assert!(
498+
self.executed_in_epoch
499+
.read()
500+
.insert(tx_digest, ())
501+
.is_none(),
502+
"transaction already executed"
503+
);
504+
self.executed_in_epoch_cache.insert(tx_digest, ());
505+
}
506+
507+
// CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
508+
// transactions. By the time this is called, the transaction digests will have been committed to
509+
// the `executed_transactions_to_checkpoint` table.
510+
pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
511+
let executed_in_epoch = self.executed_in_epoch.read();
512+
for tx_digest in tx_digests {
513+
executed_in_epoch.remove(tx_digest);
511514
}
512515
}
516+
517+
pub fn remove_reverted_transaction(&self, tx_digest: &TransactionDigest) {
518+
// reverted transactions are not guaranteed to have been executed
519+
self.executed_in_epoch.read().remove(tx_digest);
520+
}
521+
522+
/// At reconfig time, all checkpointed transactions must have been removed from self.executed_in_epoch
523+
pub fn get_uncheckpointed_transactions(&self) -> Vec<TransactionDigest> {
524+
self.executed_in_epoch
525+
.write() // exclusive lock to ensure consistent view
526+
.iter()
527+
.map(|e| *e.key())
528+
.collect()
529+
}
513530
}
514531

515532
/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints

crates/sui-core/src/authority/epoch_start_configuration.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ pub enum EpochFlag {
6868
// This flag indicates whether data quarantining has been enabled from the
6969
// beginning of the epoch.
7070
DataQuarantineFromBeginningOfEpoch = 9,
71+
72+
// Used for `test_epoch_flag_upgrade`.
73+
#[cfg(msim)]
74+
DummyFlag = 10,
7175
}
7276

7377
impl EpochFlag {
@@ -77,6 +81,16 @@ impl EpochFlag {
7781
Self::default_flags_impl()
7882
}
7983

84+
// Return flags that are mandatory for the current version of the code. This is used
85+
// so that `test_epoch_flag_upgrade` can still work correctly even when there are no
86+
// optional flags.
87+
pub fn mandatory_flags() -> Vec<Self> {
88+
vec![
89+
EpochFlag::UseVersionAssignmentTablesV3,
90+
EpochFlag::DataQuarantineFromBeginningOfEpoch,
91+
]
92+
}
93+
8094
/// For situations in which there is no config available (e.g. setting up a downloaded snapshot).
8195
pub fn default_for_no_config() -> Vec<Self> {
8296
Self::default_flags_impl()
@@ -86,6 +100,8 @@ impl EpochFlag {
86100
vec![
87101
EpochFlag::UseVersionAssignmentTablesV3,
88102
EpochFlag::DataQuarantineFromBeginningOfEpoch,
103+
#[cfg(msim)]
104+
EpochFlag::DummyFlag,
89105
]
90106
}
91107
}
@@ -124,6 +140,10 @@ impl fmt::Display for EpochFlag {
124140
EpochFlag::DataQuarantineFromBeginningOfEpoch => {
125141
write!(f, "DataQuarantineFromBeginningOfEpoch")
126142
}
143+
#[cfg(msim)]
144+
EpochFlag::DummyFlag => {
145+
write!(f, "DummyFlag")
146+
}
127147
}
128148
}
129149
}

crates/sui-core/src/checkpoints/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1853,7 +1853,7 @@ impl CheckpointBuilder {
18531853

18541854
let existing_effects = self
18551855
.epoch_store
1856-
.transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1856+
.transactions_executed_in_cur_epoch(effect.dependencies())?;
18571857

18581858
for (dependency, effects_signature_exists) in
18591859
effect.dependencies().iter().zip(existing_effects.iter())

crates/sui-e2e-tests/tests/reconfiguration_tests.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -739,8 +739,7 @@ async fn test_epoch_flag_upgrade() {
739739
return None;
740740
}
741741

742-
// start with only UseVersionAssignmentTablesV3
743-
let flags: Vec<EpochFlag> = vec![EpochFlag::UseVersionAssignmentTablesV3];
742+
let flags: Vec<EpochFlag> = EpochFlag::mandatory_flags();
744743
Some(flags)
745744
});
746745

0 commit comments

Comments
 (0)