Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion consensus/src/marshal/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl<B: Block, E: Rng + Spawner + Metrics + Clock + GClock + Storage, V: Variant
/// Start the actor.
pub fn start<R, P>(
mut self,
sync_height: u64,
application: impl Reporter<Activity = B>,
buffer: buffered::Mailbox<P, B>,
resolver: (mpsc::Receiver<handler::Message<B>>, R),
Expand All @@ -241,12 +242,13 @@ impl<B: Block, E: Rng + Spawner + Metrics + Clock + GClock + Storage, V: Variant
R: Resolver<Key = handler::Request<B>>,
P: PublicKey,
{
self.context.spawn_ref()(self.run(application, buffer, resolver))
self.context.spawn_ref()(self.run(sync_height, application, buffer, resolver))
}

/// Run the application actor.
async fn run<R, P>(
mut self,
sync_height: u64,
application: impl Reporter<Activity = B>,
mut buffer: buffered::Mailbox<P, B>,
(mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
Expand All @@ -264,6 +266,7 @@ impl<B: Block, E: Rng + Spawner + Metrics + Clock + GClock + Storage, V: Variant
application,
orchestrator,
notifier_rx,
sync_height,
)
.await;
self.context
Expand Down Expand Up @@ -428,6 +431,12 @@ impl<B: Block, E: Rng + Spawner + Metrics + Clock + GClock + Storage, V: Variant
}
}
Orchestration::Repair { height } => {
// While this should never happen, if the height is less than the sync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be sure we cancel all resolution requests if we allow the sync target to update dynamically (or track height + digest).

// height, then we don't need to repair.
if height < sync_height {
continue;
}

// Find the end of the "gap" of missing blocks, starting at `height`
let (_, Some(gap_end)) = self.finalized_blocks.next_gap(height) else {
// No gap found; height-1 is the last known finalized block
Expand Down
18 changes: 15 additions & 3 deletions consensus/src/marshal/finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct Finalizer<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Report

// Metadata store that stores the last indexed height.
metadata: Metadata<R, FixedBytes<1>, u64>,

// The lowest height from which to begin syncing if no metadata exists.
sync_height: u64,
}

impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>>
Expand All @@ -37,6 +40,7 @@ impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>
application: Z,
orchestrator: Orchestrator<B>,
notifier_rx: mpsc::Receiver<()>,
sync_height: u64,
) -> Self {
// Initialize metadata
let metadata = Metadata::init(
Expand All @@ -54,14 +58,22 @@ impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>
orchestrator,
notifier_rx,
metadata,
sync_height,
}
}

/// Run the finalizer, which continuously fetches and processes finalized blocks.
pub async fn run(mut self) {
// Initialize last indexed from metadata store.
// If the key does not exist, we assume the genesis block (height 0) has been indexed.
let mut latest = *self.metadata.get(&LATEST_KEY).unwrap_or(&0);
// Ensure metadata (and `latest`) stores the max of the existing value and `sync_height`.
let current = self.metadata.get(&LATEST_KEY).copied();
let desired = current.map_or(self.sync_height, |h| h.max(self.sync_height));
if current != Some(desired) {
if let Err(e) = self.metadata.put_sync(LATEST_KEY.clone(), desired).await {
error!("failed to update metadata: {e}");
return;
}
}
let mut latest = desired;

// The main loop to process finalized blocks. This loop will hot-spin until a block is
// available, at which point it will process it and continue. If a block is not available,
Expand Down
11 changes: 5 additions & 6 deletions consensus/src/marshal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,15 @@
//! The actor uses a combination of prunable and immutable storage to store blocks and
//! finalizations. Prunable storage is used to store data that is only needed for a short
//! period of time, such as unverified blocks or notarizations. Immutable storage is used to
//! store data that needs to be persisted indefinitely, such as finalized blocks. This allows
//! the actor to keep its storage footprint small while still providing a full history of the
//! chain.
//! store data that needs to be persisted indefinitely, such as finalized blocks.
//!
//! Marshal will store all blocks from a configurable starting height onward. This allows for
//! state sync from a specific height rather than from genesis.
//!
//! ## Limitations and Future Work
//!
//! - Only works with [crate::threshold_simplex] rather than general consensus.
//! - Assumes at-most one notarization per view, incompatible with some consensus protocols.
//! - No state sync supported. Will attempt to sync every block in the history of the chain.
//! - Stores the entire history of the chain, which requires indefinite amounts of disk space.
//! - Uses [`broadcast::buffered`](`commonware_broadcast::buffered`) for broadcasting and receiving
//! uncertified blocks from the network.

Expand Down Expand Up @@ -204,7 +203,7 @@ mod tests {
let application = Application::<B>::default();

// Start the application
actor.start(application.clone(), buffer, resolver);
actor.start(0, application.clone(), buffer, resolver);

(application, mailbox)
}
Expand Down
Loading