Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
sync_peer tx request selection rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
vorot93 committed May 23, 2020
1 parent 8f278d8 commit edb713a
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ pub struct ChainSync {
sync_start_time: Option<Instant>,
/// Transactions propagation statistics
transactions_stats: TransactionsStats,
/// Unfetched transactions
/// Transactions whose hash has been announced, but that we have not fetched
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>,
/// Enable ancient block downloading
download_old_blocks: bool,
Expand Down Expand Up @@ -1126,33 +1126,45 @@ impl ChainSync {
}
}

// get some peers to give us transaction pool
// ask this peer to give us at least some of the announced but still-unfetched transactions
if !self.unfetched_pooled_transactions.is_empty() {
if let Some(s) = &mut self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions {
if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions {
let now = Instant::now();

let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>();
let mut new_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
while new_asking_pooled_transactions.len() <= 256 {
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
if item.next_fetch < now {
new_asking_pooled_transactions.insert(hash);
item.tries += 1;
if item.tries < 5 {
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
new_unfetched_pooled_transactions.insert(hash, item);
}
let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
if new_asking_pooled_transactions.len() >= 256 {
// can't request any more transactions
break;
}

// if enough time has passed since the last attempt...
if item.next_fetch < now {
// ...queue this hash for requesting
new_asking_pooled_transactions.insert(hash);
item.tries += 1;

// if we just started asking for it, queue it to be asked later on again
if item.tries < 5 {
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
remaining_unfetched_pooled_transactions.insert(hash, item);
} else {
// ...otherwise we assume this transaction does not exist and remove its hash from the request queue
remaining_unfetched_pooled_transactions.remove(&hash);
}
}
}

let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>();
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions);

self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions = Some(new_asking_pooled_transactions);
self.unfetched_pooled_transactions = new_unfetched_pooled_transactions;
self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions);
self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions;

return;
} else {
trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id);
}
}

Expand Down

0 comments on commit edb713a

Please sign in to comment.