Skip to content

Commit

Permalink
Merge pull request #105 from who-biz/bodysync-improvements
Browse files Browse the repository at this point in the history
Improvements to BodySync portion of Sync phases
  • Loading branch information
who-biz authored Nov 10, 2023
2 parents 58eb023 + dd07ba0 commit ebf5432
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 193 deletions.
279 changes: 214 additions & 65 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use crate::store;
use crate::txhashset;
use crate::txhashset::{PMMRHandle, TxHashSet};
use crate::types::{
BlockStatus, ChainAdapter, CommitPos, NoStatus, Options, Tip, TxHashsetWriteStatus,
BlockStatus, BlockchainCheckpoints, ChainAdapter, CommitPos, NoStatus, Options, Tip,
TxHashsetWriteStatus,
};
use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::util::RwLock;
Expand All @@ -44,7 +45,10 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

/// Orphan pool size is limited by MAX_ORPHAN_SIZE
pub const MAX_ORPHAN_SIZE: usize = 200;
pub const MAX_ORPHAN_SIZE: usize = 60;

/// How many blocks from chaintip to start looping through orphans
pub const ORPHAN_LOOP_THRESHOLD: u64 = 120;

/// When evicting, very old orphans are evicted first
const MAX_ORPHAN_AGE_SECS: u64 = 300;
Expand Down Expand Up @@ -133,6 +137,13 @@ impl OrphanBlockPool {
.map(|hs| hs.iter().filter_map(|h| orphans.remove(h)).collect())
}

pub fn clear(&self) -> bool {
self.orphans.write().clear();
self.height_idx.write().clear();
self.evicted.store(0, Ordering::Relaxed);
return self.orphans.read().is_empty() && self.height_idx.read().is_empty();
}

pub fn contains(&self, hash: &Hash) -> bool {
let orphans = self.orphans.read();
orphans.contains_key(hash)
Expand Down Expand Up @@ -275,13 +286,56 @@ impl Chain {
Ok(head)
}

/// Processes a single block, then checks for orphans, processing
/// those as well if they're found
/// Processes a single block, then checks for an orphan, regardless
/// of first block processing successfully. Ensures we check orphans
/// once, each time we receive a new block from peers. Behavior is
/// to loop through orphan list continually, if we are near chaintip
/// or if our OrphanBlockPool is nearly full.
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
let height = b.header.height;
let res = self.process_block_single(b, opts);
if res.is_ok() {
self.check_orphans(height + 1);
let block_height = b.header.height;
let orphan_height;
let loop_height = std::cmp::max(block_height, ORPHAN_LOOP_THRESHOLD);
let header_head = self.header_head()?;
let mut loop_orphans = false;
if header_head.height > ORPHAN_LOOP_THRESHOLD {
trace!(
"loop_height({}), ORPHAN_LOOP_THRESHOLD({}), subtracted({})",
loop_height,
ORPHAN_LOOP_THRESHOLD,
(loop_height - ORPHAN_LOOP_THRESHOLD)
);
if loop_height >= (header_head.height - ORPHAN_LOOP_THRESHOLD) {
trace!("threshold check conditon met!");
loop_orphans = true;
}
if self.orphans.len() >= (MAX_ORPHAN_SIZE - 10) {
trace!("orphan.len() conditon met!");
loop_orphans = true;
}
}
let mut res = self.process_block_single(b, opts);
match res {
Ok(_) => {
orphan_height = block_height + 1;
if loop_orphans {
trace!("looping through orphans!");
self.check_orphans_loop(orphan_height);
return res;
}
}
_ => {
orphan_height = self.head()?.height + 1;
}
}
let orphans = self.check_orphans(orphan_height);
if !orphans.is_empty() {
for orphan in orphans {
res = self.process_block_single(orphan.block, orphan.opts);
}
if loop_orphans && res.is_ok() {
trace!("looping through orphans!");
self.check_orphans_loop(orphan_height + 1);
}
}
res
}
Expand All @@ -307,10 +361,76 @@ impl Chain {
}
}

/// Quick check for "known" duplicate block up to and including current chain head.
/// Returns an error if this block is "known".
pub fn is_known(&self, header: &BlockHeader) -> Result<(), Error> {
let head = self.head()?;
if head.hash() == header.hash() {
return Err(ErrorKind::Unfit("duplicate block".to_owned()).into());
}
if header.total_difficulty() <= head.total_difficulty {
if self.block_exists(header.hash())? {
debug!(
"Block {} at {} is unfit at this time, duplicate block",
header.hash(),
header.height,
);
return Err(ErrorKind::Unfit("duplicate block".to_owned()).into());
}
}
Ok(())
}

// Check if the provided block is an orphan.
// If block is an orphan add it to our orphan block pool for deferred processing.
// If this is the "next" block immediately following current head then not an orphan.
// Or if we have the previous full block then not an orphan.
fn check_orphan(&self, block: &Block, opts: Options) -> Result<(), Error> {
let head = self.head()?;
let is_next = block.header.prev_hash == head.last_block_h;
if is_next || self.block_exists(block.header.prev_hash)? {
return Ok(());
}

let block_hash = block.hash();
let orphan = Orphan {
block: block.clone(),
opts,
added: Instant::now(),
};
self.orphans.add(orphan);

debug!(
"is_orphan: {:?}, # orphans {}{}",
block_hash,
self.orphans.len(),
if self.orphans.len_evicted() > 0 {
format!(", # evicted {}", self.orphans.len_evicted())
} else {
String::new()
},
);

Err(ErrorKind::Orphan.into())
}

/// Attempt to add a new block to the chain.
/// Returns true if it has been added to the longest chain
/// or false if it has added to a fork (or orphan?).
fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
// Process the header first.
// If invalid then fail early.
// If valid then continue with block processing with header_head committed to db etc.

self.process_block_header(&b.header, opts)?;

// Check if we already know about this full block.
self.is_known(&b.header)?;

// Check if this block is an orphan.
// Only do this once we know the header PoW is valid.
self.check_orphan(&b, opts)?;

let (maybe_new_head, prev_head) = {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
Expand All @@ -319,72 +439,24 @@ impl Chain {

let prev_head = ctx.batch.head()?;

let maybe_new_head = pipe::process_block(&b, &mut ctx);
let maybe_new_head = pipe::process_block(&b, &mut ctx)?;

// We have flushed txhashset extension changes to disk
// but not yet committed the batch.
// A node shutdown at this point can be catastrophic...
// We prevent this via the stop_lock (see above).
if maybe_new_head.is_ok() {
ctx.batch.commit()?;
}
ctx.batch.commit()?;

// release the lock and let the batch go before post-processing
(maybe_new_head, prev_head)
};

match maybe_new_head {
Ok(head) => {
let status = self.determine_status(head.clone(), prev_head);

// notifying other parts of the system of the update
self.adapter.block_accepted(&b, status, opts);

Ok(head)
}
Err(e) => match e.kind() {
ErrorKind::Orphan => {
let block_hash = b.hash();
let orphan = Orphan {
block: b,
opts: opts,
added: Instant::now(),
};
let status = self.determine_status(maybe_new_head.clone(), prev_head);

self.orphans.add(orphan);
// notifying other parts of the system of the update
self.adapter.block_accepted(&b, status, opts);

debug!(
"process_block: orphan: {:?}, # orphans {}{}",
block_hash,
self.orphans.len(),
if self.orphans.len_evicted() > 0 {
format!(", # evicted {}", self.orphans.len_evicted())
} else {
String::new()
},
);
Err(ErrorKind::Orphan.into())
}
ErrorKind::Unfit(ref msg) => {
debug!(
"Block {} at {} is unfit at this time: {}",
b.hash(),
b.header.height,
msg
);
Err(ErrorKind::Unfit(msg.clone()).into())
}
_ => {
info!(
"Rejected block {} at {}: {:?}",
b.hash(),
b.header.height,
e
);
Err(ErrorKind::Other(format!("{:?}", e)).into())
}
},
}
Ok(maybe_new_head)
}

/// Process a block header received during "header first" propagation.
Expand Down Expand Up @@ -453,10 +525,13 @@ impl Chain {
self.orphans.len_evicted()
}

/// Check for orphans, once a block is successfully added
fn check_orphans(&self, mut height: u64) {
/// Check for ophans, loop through OrphanBlockPool, adding each
/// orphan that we can, consecutively. Returns once we we do not
/// have the next height block in our OrphanBlockPool.
fn check_orphans_loop(&self, mut height: u64) {
let initial_height = height;

let mut loop_iters = 0;
// Is there an orphan in our orphans that we can now process?
loop {
trace!(
Expand All @@ -465,6 +540,9 @@ impl Chain {
self.orphans.len(),
);

loop_iters += 1;
debug!("check_orphans_loop, iters = {}", loop_iters);

let mut orphan_accepted = false;
let mut height_accepted = height;

Expand Down Expand Up @@ -492,7 +570,15 @@ impl Chain {
if orphan_accepted {
// We accepted a block, so see if we can accept any orphans
height = height_accepted + 1;
continue;
if loop_iters >= (MAX_ORPHAN_SIZE / 2) {
// we want to exit loop early to keep locking cheap here
// so break loop & exit func if iters exceed this
break;
} else {
// otherwise keep looping until we hit 30 or fail to find
// next block in orphan list
continue;
}
}
}
break;
Expand All @@ -508,6 +594,44 @@ impl Chain {
}
}

/// Check for orphans, once a block is successfully added
fn check_orphans(&self, height: u64) -> Vec<Orphan> {
//warn!("check_orphans called");
let mut orphans_result: Vec<Orphan> = Vec::new();
trace!(
"check_orphans: at {}, # orphans {}",
height,
self.orphans.len(),
);

if let Some(orphans) = self.orphans.remove_by_height(&height) {
let orphans_len = orphans.len();
//let mut subloop_iter = 0;
for (i, orphan) in orphans.into_iter().enumerate() {
//subloop_iter += 1;
//warn!("sub_loop iterations in check_orphans ({})", subloop_iter);
debug!(
"check_orphans: get block {} at {}{}",
orphan.block.hash(),
height,
if orphans_len > 1 {
format!(", no.{} of {} orphans", i, orphans_len)
} else {
String::new()
},
);
orphans_result.push(orphan);
}
}
orphans_result
}

/// Clear OrphanBlockPool completely, returns true if orphans list
/// and height_idx are empty after clearing, false if failed
pub fn clear_orphans(&self) -> bool {
self.orphans.clear()
}

/// For the given commitment find the unspent output and return the
/// associated Return an error if the output does not exist or has been
/// spent. This querying is done in a way that is consistent with the
Expand Down Expand Up @@ -1432,6 +1556,31 @@ impl Chain {
.block_exists(&h)
.map_err(|e| ErrorKind::StoreErr(e, "chain block exists".to_owned()).into())
}

/// Check block headers against checkpoints hash and height. Returns
/// boolean in Result with 'false' once we are out of checkpointed range.
pub fn check_header_against_checkpoints(&self, header: &BlockHeader) -> Result<bool, Error> {
let checkpoints = BlockchainCheckpoints::new().checkpoints;
let mut within_checkpointed_range = true;
if header.height > checkpoints.last().unwrap().height {
within_checkpointed_range = false;
} else {
for c in &checkpoints {
if header.height == c.height {
if header.hash() == c.block_hash {
info!("Checkpoint successfully passed at height({})! Hashes: header({:?}), checkpoint({:?})",
c.height,
header.hash(),
c.block_hash
);
} else {
return Err(ErrorKind::CheckpointFailure.into());
}
}
}
}
Ok(within_checkpointed_range)
}
}

fn setup_head(
Expand Down
Loading

0 comments on commit ebf5432

Please sign in to comment.