Skip to content

Commit 5adb31b

Browse files
authored
Merge pull request #3422 from zkxuerb/feat/banned-peers
Spam connections and forged block locators fix
2 parents c6de459 + ba70c75 commit 5adb31b

File tree

18 files changed

+303
-44
lines changed

18 files changed

+303
-44
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/bft/src/gateway.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ use futures::SinkExt;
6565
use indexmap::{IndexMap, IndexSet};
6666
use parking_lot::{Mutex, RwLock};
6767
use rand::seq::{IteratorRandom, SliceRandom};
68+
#[cfg(not(any(test, feature = "test")))]
69+
use std::net::IpAddr;
6870
use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
6971
use tokio::{
7072
net::TcpStream,
@@ -89,6 +91,12 @@ const MIN_CONNECTED_VALIDATORS: usize = 175;
8991
/// The maximum number of validators to send in a validators response event.
9092
const MAX_VALIDATORS_TO_SEND: usize = 200;
9193

94+
/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
95+
#[cfg(not(any(test, feature = "test")))]
96+
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
97+
/// The amount of time an IP address is prohibited from connecting.
98+
const IP_BAN_TIME_IN_SECS: u64 = 300;
99+
92100
/// Part of the Gateway API that deals with networking.
93101
/// This is a separate trait to allow for easier testing/mocking.
94102
#[async_trait]
@@ -460,6 +468,18 @@ impl<N: Network> Gateway<N> {
460468
Ok(())
461469
}
462470

471+
/// Check whether the given IP address is currently banned.
472+
#[cfg(not(any(test, feature = "test")))]
473+
fn is_ip_banned(&self, ip: IpAddr) -> bool {
474+
self.tcp.banned_peers().is_ip_banned(&ip)
475+
}
476+
477+
/// Insert or update a banned IP.
478+
#[cfg(not(any(test, feature = "test")))]
479+
fn update_ip_ban(&self, ip: IpAddr) {
480+
self.tcp.banned_peers().update_ip_ban(ip);
481+
}
482+
463483
#[cfg(feature = "metrics")]
464484
fn update_metrics(&self) {
465485
metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
@@ -885,6 +905,8 @@ impl<N: Network> Gateway<N> {
885905
self.handle_unauthorized_validators();
886906
// If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
887907
self.handle_min_connected_validators();
908+
// Unban any addresses whose ban time has expired.
909+
self.handle_banned_ips();
888910
}
889911

890912
/// Logs the connected validators.
@@ -965,6 +987,11 @@ impl<N: Network> Gateway<N> {
965987
}
966988
}
967989
}
990+
991+
// Remove addresses whose ban time has expired.
992+
fn handle_banned_ips(&self) {
993+
self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
994+
}
968995
}
969996

970997
#[async_trait]
@@ -1125,6 +1152,26 @@ impl<N: Network> Handshake for Gateway<N> {
11251152
// Perform the handshake.
11261153
let peer_addr = connection.addr();
11271154
let peer_side = connection.side();
1155+
1156+
// Check (or impose) IP-level bans.
1157+
#[cfg(not(any(test, feature = "test")))]
1158+
if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1159+
// If the IP is already banned reject the connection.
1160+
if self.is_ip_banned(peer_addr.ip()) {
1161+
trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
1162+
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
1163+
}
1164+
1165+
let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1166+
1167+
debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1168+
if num_attempts > MAX_CONNECTION_ATTEMPTS {
1169+
self.update_ip_ban(peer_addr.ip());
1170+
trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1171+
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
1172+
}
1173+
}
1174+
11281175
let stream = self.borrow_stream(&mut connection);
11291176

11301177
// If this is an inbound connection, we log it, but don't know the listening address yet.

node/bft/src/sync/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{
2424
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
2525
use snarkos_node_bft_ledger_service::LedgerService;
2626
use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
27+
use snarkos_node_tcp::P2P;
2728
use snarkvm::{
2829
console::{network::Network, types::Field},
2930
ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
@@ -67,7 +68,7 @@ impl<N: Network> Sync<N> {
6768
/// Initializes a new sync instance.
6869
pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
6970
// Initialize the block sync module.
70-
let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone());
71+
let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
7172
// Return the sync instance.
7273
Self {
7374
gateway,

node/router/src/handshake.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,26 @@ impl<N: Network> Router<N> {
101101
Some(peer_addr)
102102
};
103103

104+
// Check (or impose) IP-level bans.
105+
#[cfg(not(any(test, feature = "test")))]
106+
if !self.is_dev() && peer_side == ConnectionSide::Initiator {
107+
// If the IP is already banned reject the connection.
108+
if self.is_ip_banned(peer_addr.ip()) {
109+
trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
110+
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
111+
}
112+
113+
let num_attempts =
114+
self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
115+
116+
debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
117+
if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
118+
self.update_ip_ban(peer_addr.ip());
119+
trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
120+
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
121+
}
122+
}
123+
104124
// Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
105125
let handshake_result = if peer_side == ConnectionSide::Responder {
106126
self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await

node/router/src/heartbeat.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
4343
const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
4444
/// The maximum number of provers to maintain connections with.
4545
const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
46+
/// The amount of time an IP address is prohibited from connecting.
47+
const IP_BAN_TIME_IN_SECS: u64 = 300;
4648

4749
/// Handles the heartbeat request.
4850
fn heartbeat(&self) {
@@ -61,6 +63,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
6163
self.handle_trusted_peers();
6264
// Keep the puzzle request up to date.
6365
self.handle_puzzle_request();
66+
// Unban any addresses whose ban time has expired.
67+
self.handle_banned_ips();
6468
}
6569

6670
/// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
@@ -238,6 +242,7 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
238242
for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
239243
self.router().connect(peer_ip);
240244
}
245+
241246
if self.router().allow_external_peers() {
242247
// Request more peers from the connected peers.
243248
for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
@@ -298,4 +303,9 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
298303
fn handle_puzzle_request(&self) {
299304
// No-op
300305
}
306+
307+
// Remove addresses whose ban time has expired.
308+
fn handle_banned_ips(&self) {
309+
self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
310+
}
301311
}

node/router/src/lib.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ pub use routing::*;
4141

4242
use crate::messages::NodeType;
4343
use snarkos_account::Account;
44-
use snarkos_node_tcp::{Config, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip};
44+
use snarkos_node_tcp::{Config, P2P, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip};
45+
4546
use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
4647

4748
use anyhow::{Result, bail};
4849
use parking_lot::{Mutex, RwLock};
50+
#[cfg(not(any(test, feature = "test")))]
51+
use std::net::IpAddr;
4952
use std::{
5053
collections::{HashMap, HashSet, hash_map::Entry},
5154
future::Future,
@@ -103,10 +106,16 @@ pub struct InnerRouter<N: Network> {
103106
}
104107

105108
impl<N: Network> Router<N> {
109+
/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
110+
#[cfg(not(any(test, feature = "test")))]
111+
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
106112
/// The maximum number of candidate peers permitted to be stored in the node.
107113
const MAXIMUM_CANDIDATE_PEERS: usize = 10_000;
108114
/// The maximum number of connection failures permitted by an inbound connecting peer.
109115
const MAXIMUM_CONNECTION_FAILURES: usize = 5;
116+
/// The maximum amount of connection attempts withing a 10 second threshold
117+
#[cfg(not(any(test, feature = "test")))]
118+
const MAX_CONNECTION_ATTEMPTS: usize = 10;
110119
/// The duration in seconds after which a connected peer is considered inactive or
111120
/// disconnected if no message has been received in the meantime.
112121
const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes
@@ -390,7 +399,8 @@ impl<N: Network> Router<N> {
390399

391400
/// Returns the list of candidate peers.
392401
pub fn candidate_peers(&self) -> HashSet<SocketAddr> {
393-
self.candidate_peers.read().clone()
402+
let banned_ips = self.tcp().banned_peers().get_banned_ips();
403+
self.candidate_peers.read().iter().filter(|peer| !banned_ips.contains(&peer.ip())).copied().collect()
394404
}
395405

396406
/// Returns the list of restricted peers.
@@ -439,6 +449,18 @@ impl<N: Network> Router<N> {
439449
}
440450
}
441451

452+
/// Check whether the given IP address is currently banned.
453+
#[cfg(not(any(test, feature = "test")))]
454+
fn is_ip_banned(&self, ip: IpAddr) -> bool {
455+
self.tcp.banned_peers().is_ip_banned(&ip)
456+
}
457+
458+
/// Insert or update a banned IP.
459+
#[cfg(not(any(test, feature = "test")))]
460+
fn update_ip_ban(&self, ip: IpAddr) {
461+
self.tcp.banned_peers().update_ip_ban(ip);
462+
}
463+
442464
/// Returns the list of metrics for the connected peers.
443465
pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
444466
self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect()

node/src/client/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
106106

107107
// Initialize the ledger service.
108108
let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
109-
// Initialize the sync module.
110-
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
111109
// Determine if the client should allow external peers.
112110
let allow_external_peers = true;
113111

@@ -123,6 +121,10 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
123121
matches!(storage_mode, StorageMode::Development(_)),
124122
)
125123
.await?;
124+
125+
// Initialize the sync module.
126+
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
127+
126128
// Initialize the node.
127129
let mut node = Self {
128130
ledger: ledger.clone(),

node/src/prover/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,6 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
100100

101101
// Initialize the ledger service.
102102
let ledger_service = Arc::new(ProverLedgerService::new());
103-
// Initialize the sync module.
104-
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
105103
// Determine if the prover should allow external peers.
106104
let allow_external_peers = true;
107105
// Determine if the prover should rotate external peers.
@@ -119,6 +117,10 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
119117
matches!(storage_mode, StorageMode::Development(_)),
120118
)
121119
.await?;
120+
121+
// Initialize the sync module.
122+
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
123+
122124
// Compute the maximum number of puzzle instances.
123125
let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
124126
// Initialize the node.

node/src/validator/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,10 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
107107

108108
// Initialize the ledger service.
109109
let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
110-
// Initialize the sync module.
111-
let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service.clone());
112110

113111
// Initialize the consensus.
114112
let mut consensus =
115-
Consensus::new(account.clone(), ledger_service, bft_ip, trusted_validators, storage_mode.clone())?;
113+
Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
116114
// Initialize the primary channels.
117115
let (primary_sender, primary_receiver) = init_primary_channels::<N>();
118116
// Start the consensus.
@@ -133,6 +131,9 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
133131
)
134132
.await?;
135133

134+
// Initialize the sync module.
135+
let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());
136+
136137
// Initialize the node.
137138
let mut node = Self {
138139
ledger: ledger.clone(),

node/sync/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ version = "=3.0.0"
6767
path = "locators"
6868
version = "=3.0.0"
6969

70+
[dependencies.snarkos-node-tcp]
71+
path = "../tcp"
72+
version = "=3.0.0"
73+
7074
[dependencies.snarkvm]
7175
workspace = true
7276

0 commit comments

Comments
 (0)