Skip to content

Commit

Permalink
Merge pull request #3421 from ProvableHQ/process_mempool_periodically
Browse files Browse the repository at this point in the history
Periodically process the unconfirmed transmissions in the memory pool
  • Loading branch information
zosorock authored Nov 13, 2024
2 parents aab8a83 + 58c33ec commit 01b5d0f
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate tracing;
use snarkos_account::Account;
use snarkos_node_bft::{
BFT,
MAX_BATCH_DELAY_IN_MS,
Primary,
helpers::{
ConsensusReceiver,
Expand Down Expand Up @@ -49,7 +50,7 @@ use colored::Colorize;
use indexmap::IndexMap;
use lru::LruCache;
use parking_lot::Mutex;
use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc};
use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
use tokio::{
sync::{OnceCell, oneshot},
task::JoinHandle,
Expand Down Expand Up @@ -296,7 +297,7 @@ impl<N: Network> Consensus<N> {
.lock()
.insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
}
// Process the unconfirmed solution.
// Queue the unconfirmed solution.
{
let solution_id = solution.id();

Expand All @@ -316,6 +317,12 @@ impl<N: Network> Consensus<N> {
}
}

// Try to process the unconfirmed solutions in the memory pool.
self.process_unconfirmed_solutions().await
}

/// Processes unconfirmed transactions in the memory pool.
pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
// If the memory pool of this node is full, return early.
let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
Expand Down Expand Up @@ -365,7 +372,7 @@ impl<N: Network> Consensus<N> {
.lock()
.insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
}
// Process the unconfirmed transaction.
// Queue the unconfirmed transaction.
{
let transaction_id = transaction.id();

Expand All @@ -391,8 +398,14 @@ impl<N: Network> Consensus<N> {
} else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
}

// Try to process the unconfirmed transactions in the memory pool.
self.process_unconfirmed_transactions().await
}
}

/// Processes unconfirmed transactions in the memory pool.
pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
// If the memory pool of this node is full, return early.
let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
Expand Down Expand Up @@ -455,6 +468,23 @@ impl<N: Network> Consensus<N> {
self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
}
});

// Process the unconfirmed transactions in the memory pool.
let self_ = self.clone();
self.spawn(async move {
loop {
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
// Process the unconfirmed transactions in the memory pool.
if let Err(e) = self_.process_unconfirmed_transactions().await {
warn!("Cannot process unconfirmed transactions - {e}");
}
// Process the unconfirmed solutions in the memory pool.
if let Err(e) = self_.process_unconfirmed_solutions().await {
warn!("Cannot process unconfirmed solutions - {e}");
}
}
});
}

/// Processes the committed subdag and transmissions from the BFT.
Expand Down

0 comments on commit 01b5d0f

Please sign in to comment.