identify + Kademlia: Only Bootstrap Node Appears in DHT, No Peer Discovery Beyond Bootstrap #6089
Hi libp2p team 👋, I'm building a P2P network in Rust using libp2p with the following components:

- Gossipsub for pub-sub messaging
- Kademlia for peer discovery
- Identify for exchanging supported protocols and listen addresses
- Ping for keep-alives

A single bootstrap node is used, and the other peers are given its multiaddr on startup. Here are my behaviour, network, and event handler code:

// behaviour.rs
use libp2p::identify;
use libp2p::identity::Keypair;
use libp2p::kad::store::MemoryStore;
use libp2p::request_response::Config;
use libp2p::swarm::StreamProtocol;
use libp2p::{gossipsub, kad, swarm::NetworkBehaviour};
use libp2p::{
relay,
request_response::{cbor, ProtocolSupport},
};
use messages::message::Message;
use messages::types::NodeResponse;
use std::time::Duration;
use tokio::io;
/// Time in seconds for caching duplicate messages in Gossipsub.
const DUPLICATE_CACHE_TIME: u64 = 10;
/// Interval in seconds for Gossipsub heartbeat.
const HEART_BEAT_INTERVAL: u64 = 5;
/// Maximum number of messages that can be sent per RPC.
const MAX_MESSAGES_PER_RPC: usize = 10000;
/// `MyBehaviour` combines the network behaviours for Gossipsub, Kademlia, Relay, Request-Response, Ping, and Identify.
///
/// # Fields
/// - `gossipsub`: Manages Gossipsub pub/sub messaging behavior.
/// - `kademlia`: Handles Kademlia DHT behavior for peer-to-peer node discovery.
/// - `relay_client`: Handles client-side relay behavior for relayed communications.
/// - `request_response_behaviour`: Manages the request/response protocol between nodes using CBOR encoding.
/// - `ping`: Sends periodic pings to keep connections alive.
/// - `identify`: Exchanges supported protocols and listen addresses with connected peers.
#[derive(NetworkBehaviour)]
pub struct MyBehaviour {
pub gossipsub: gossipsub::Behaviour,
pub kademlia: kad::Behaviour<MemoryStore>,
relay_client: relay::client::Behaviour,
pub request_response_behaviour: cbor::Behaviour<Message, NodeResponse>,
pub ping: libp2p::ping::Behaviour,
pub identify: identify::Behaviour,
}
impl MyBehaviour {
/// Creates a new instance of `MyBehaviour` with custom configurations for Gossipsub and Kademlia.
///
/// # Arguments
/// - `key`: The node's `Keypair` used for generating peer identity and signing messages.
/// - `relay_behaviour`: Relay behavior for handling relayed connections.
///
/// # Returns
/// A result containing `MyBehaviour` if successful, or an error wrapped in a `Box<dyn std::error::Error>`.
///
/// # Errors
/// This function will return an error if there is an issue with Gossipsub or Kademlia configuration.
pub fn new(key: Keypair, relay_behaviour: relay::client::Behaviour) -> Result<Self, Box<dyn std::error::Error>> {
// Generate the peer ID from the provided Keypair
let peer_id = key.public().to_peer_id();
// Define a custom message ID function for Gossipsub messages using a SHA-256 hash
let message_id_fn = |message: &gossipsub::Message| {
let s = mishti_crypto::hash256(&message.data);
gossipsub::MessageId::from(s)
};
// Set up Gossipsub configuration with heartbeat interval and message validation
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(HEART_BEAT_INTERVAL))
.validation_mode(gossipsub::ValidationMode::Strict)
.duplicate_cache_time(Duration::from_secs(DUPLICATE_CACHE_TIME))
.message_id_fn(message_id_fn)
.max_messages_per_rpc(Some(MAX_MESSAGES_PER_RPC))
.build()
.map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?;
// Create Gossipsub behavior with message authenticity signed using the provided key
let gossipsub = gossipsub::Behaviour::new(gossipsub::MessageAuthenticity::Signed(key.clone()), gossipsub_config)?;
// Set up Kademlia with a 30-second query timeout and a replication factor of 4
let mut kad_config = kad::Config::new(StreamProtocol::new("/human-network/kad/0.1"));
kad_config.set_query_timeout(Duration::from_secs(30));
kad_config.set_replication_factor(std::num::NonZero::new(4).unwrap());
// Create an in-memory Kademlia DHT store
let store = kad::store::MemoryStore::new(peer_id);
let kademlia = kad::Behaviour::with_config(peer_id, store, kad_config);
// Initialize and return the behaviour with Gossipsub, Kademlia, Relay, Request-Response, Ping, and Identify
Ok(Self {
gossipsub,
kademlia,
relay_client: relay_behaviour,
request_response_behaviour: cbor::Behaviour::new([(StreamProtocol::new("/String"), ProtocolSupport::Full)], Config::default()),
ping: libp2p::ping::Behaviour::new(libp2p::ping::Config::new().with_interval(Duration::from_secs(300)).with_timeout(Duration::from_secs(20))),
identify: identify::Behaviour::new(identify::Config::new("/human-network/identify/0.1".to_string(), key.public())),
})
}
}

// network.rs
use crate::behaviour::MyBehaviour;
use libp2p::gossipsub::IdentTopic;
use libp2p::identity::Keypair;
use libp2p::{noise, tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder};
use messages::NETWORK_TOPIC;
use std::error::Error;
use std::time::Duration;
use tracing::info;
/// Initializes a libp2p swarm with the given identity, bootstrap nodes, and listen port.
///
/// Arguments:
///
/// * `keypair`: An optional `Keypair` used as the swarm's identity. If no `Keypair` is
/// provided, a new identity is generated.
/// * `bootstrap_addresses`: An optional vector of `(PeerId, Multiaddr)` tuples for the
/// bootstrap nodes. These nodes are used to initially connect to the network and discover
/// other peers. If provided, the swarm dials them and starts a Kademlia bootstrap during
/// initialization.
/// * `port`: The UDP port (as a `String`) on which the swarm listens for incoming QUIC
/// connections.
///
/// Returns:
///
/// The function `init_swarm` returns a `Result` containing a `Swarm<MyBehaviour>` or a `Box<dyn
/// Error>`.
#[tracing::instrument(skip(keypair))]
pub async fn init_swarm(keypair: Option<Keypair>, bootstrap_addresses: Option<Vec<(PeerId, Multiaddr)>>, port: String) -> Result<Swarm<MyBehaviour>, Box<dyn Error>> {
// Use the provided keypair for identity, or generate a new Ed25519 keypair.
// The same keypair is needed below for the QUIC transport configuration.
let keypair = keypair.unwrap_or_else(Keypair::generate_ed25519);
let builder = SwarmBuilder::with_existing_identity(keypair.clone());
let mut config = libp2p::quic::Config::new(&keypair);
// `max_idle_timeout` is in milliseconds, so this is 300 ms
config.max_idle_timeout = 300;
config.keep_alive_interval = Duration::from_millis(100);
// Create a libp2p swarm with configuration
let mut swarm = builder
.with_tokio()
.with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)?
.with_quic_config(|_| config)
//.with_quic()
.with_relay_client(noise::Config::new, yamux::Config::default)?
.with_behaviour(|keypair, relay_behaviour| {
if bootstrap_addresses.is_none() {
info!("Bootstrap Peer ID :{}", keypair.public().to_peer_id());
}
MyBehaviour::new(keypair.clone(), relay_behaviour).unwrap()
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
let listen_address = format!("/ip4/0.0.0.0/udp/{}/quic-v1", port);
info!("Listen Address :{:?}", listen_address);
// Add bootstrap nodes if provided
if let Some(ref bootstrap_addresses) = bootstrap_addresses {
for (peer_id, multi_addr) in bootstrap_addresses {
swarm.behaviour_mut().kademlia.add_address(peer_id, multi_addr.clone());
swarm.dial(multi_addr.clone())?;
}
swarm.behaviour_mut().kademlia.bootstrap()?;
}
// Subscribe to the topic (can be done after swarm creation)
swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(NETWORK_TOPIC))?;
swarm.listen_on(listen_address.parse()?)?;
Ok(swarm)
}
#[tracing::instrument(skip(swarm))]
pub async fn subscribe(swarm: &mut Swarm<MyBehaviour>, topic: IdentTopic) -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
Ok(())
}
#[tracing::instrument(skip(swarm))]
pub async fn unsubscribe(swarm: &mut Swarm<MyBehaviour>, topic: IdentTopic) -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().gossipsub.unsubscribe(&topic)?;
Ok(())
}

// event loop (event handlers)
pub async fn start(&mut self) {
let mut discover_tick = time::interval(Duration::from_secs(PEER_DISCOVERY_WAIT_TIME));
loop {
select! {
event = self.swarm.select_next_some() => match event {
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(Event::Message {
propagation_source: peer_id,
message_id: _,
message,
})) => {
let msg = Message::deserialize_from_bytes(&message.data);
match msg {
Ok(msg) => {
info!("Received Gossiped Message {:?} PeerId :{:?}", msg.get_type(),peer_id);
if let Err(e) = self.process_message(msg,peer_id) {
error!(
"Error processing message: {:?} from peer: {:?}, Error: {:?}",
message.data,
peer_id,
e
);
}
}
Err(e) => {
error!(
"Received Invalid Message '{:?}' from peer: {:?}, Size: {}, error: {}",
String::from_utf8_lossy(&message.data),
peer_id,
message.data.len(),
e
);
}
}
}
SwarmEvent::NewListenAddr { address, .. } => {
info!("Local node is listening on {}", address);
}
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(Event::Subscribed { topic, peer_id })) => {
info!("Peer_ID: {:?} Topic Subscribed: {:?}", peer_id, topic);
if self.node_type == NodeType::Relay
&& !self.polled
{
info!("Triggering First Election");
cast_message!(
ActorType::ElectionEngine,
ElectionEngineMessage::FirstElection,ElectionEngineError);
self.polled = true;
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(Event::Unsubscribed { topic, peer_id })) => {
info!("Peer_ID: {:?} Topic Unsubscribed: {:?}", peer_id, topic);
}
SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponseBehaviour(
RequestResonseEvent::Message { peer,message, .. },
)) => {
match message {
request_response::Message::Request { request, channel, .. } => {
let (tx, rx) = oneshot::channel::<NodeResponse>();
if let Message::ForwardMulRequest(request_id, request_to_network) = request {
let _ = self.process_message(Message::ProcessMulRequest(request_id, request_to_network, tx),peer);
match rx.await {
Ok(resp) => {
// Sending the OPRF Response back to the relay node.
let _ = self.swarm.behaviour_mut().request_response_behaviour.send_response(channel, resp);
}
Err(e) => {
error!(
"Error occurred while sending processing OPRF request: {:?}",
e
);
}
};
}
}
request_response::Message::Response { response, .. } => {
// Processing of all forwarded responses
if let NodeResponse::ConstructedProofSecp256k1 { node_idx, peer_id, request_id, proof, method } = response {
info!(
"Proof Received for request id: {:?} from node: {}",
request_id, node_idx
);
cast_message!(
ActorType::AppStateEngine,
AppStateChangeMessage::ProcessReceivedProofSecp256k1(node_idx, peer_id, request_id, Box::new(proof), method),
AppStateEngineError
);
}else if let NodeResponse::ConstructedProofBabyJubJub { node_idx, peer_id, request_id, proof, method } = response {
info!(
"Proof Received for request id: {:?} from node: {}",
request_id, node_idx
);
cast_message!(
ActorType::AppStateEngine,
AppStateChangeMessage::ProcessReceivedProofBabyJubJub(node_idx, peer_id, request_id, Box::new(proof), method),
AppStateEngineError
);
}
else if let NodeResponse::Error { request_id,message} = response.clone() {
info!(
"Received error response from peer {}: {}", peer, message
);
// cast_message!(
// ActorType::AppStateEngine,
// AppStateChangeMessage::ForwardErrorResponse(request_id,response),
// AppStateEngineError
// );
}
}
}
}
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
tracing::info!(peer = %peer_id, ?endpoint, "Established new connection");
if let Some(messages) = self.cached_data.get(&peer_id) {
for message in messages {
let status = self.swarm.behaviour_mut().request_response_behaviour.send_request(
&peer_id,
message.clone(),
);
info!("Forwarding message to peer. Status: {:?}", status);
}
self.cached_data.remove(&peer_id);
}
},
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
info!("Connection closed with peer: {:?} Cause {:?}", peer_id, cause);
if self.bootstrap_addresses.iter().any(|(id, _)| id == &peer_id) {
info!("Lost connection to bootstrap node: {} Cause {:?}", peer_id, cause);
// Re-run bootstrapping as a safety measure
let status = self.swarm.behaviour_mut().kademlia.bootstrap();
info!("Status of bootstrapping :{:?}", status);
}
}
SwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error,
..
} => {
tracing::error!(
"Incoming connection error for connection ID {:?} from local address {:?} to send-back address {:?} with error: {:?}.",
connection_id,
local_addr,
send_back_addr,
error
);
},
SwarmEvent::OutgoingConnectionError {
connection_id,
peer_id,
error,
..
} => {
tracing::error!(
"Failed to establish outgoing connection. Connection ID: {:?}, Peer ID: {:?}, Error: {:?}.",
connection_id,
peer_id,
error
);
if let Some(peer_id) = peer_id {
// Drop any messages cached for a peer we could not reach
self.cached_data.remove(&peer_id);
if self.bootstrap_addresses.iter().any(|(id, _)| id == &peer_id) {
info!("Triggering connecting to bootstrap node");
// Re-run bootstrapping as a safety measure
let status = self.swarm.behaviour_mut().kademlia.bootstrap();
info!("Status of bootstrapping :{:?}", status);
}
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event { peer, result, .. }))=> match result {
Ok(duration) => {
info!("Ping successful with {peer}, duration: {:?}", duration);
if let Some(tx)=self.ping_sender.get(&peer){
if let Err(e)=tx.send(Response::new(None, true, format!("Ping successful with {peer}, duration: {:?}", duration))).await{
error!("Error sending ping response back to the receiver :{}",e.to_string());
};
self.ping_sender.remove(&peer);
}
}
Err(e) => {
error!("Ping failed with {peer}: {:?}", e);
if let Some(tx)=self.ping_sender.get(&peer){
if let Err(e)=tx.send(Response::new(None, false, format!("Ping failed with {peer}: {:?}", e))).await{
error!("Error sending ping response back to the receiver :{}",e.to_string());
};
self.ping_sender.remove(&peer);
}
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(event)) => {
match event {
identify::Event::Received { peer_id, info, .. } => {
info!(
"Received identify info from peer: {:?}, agent: {}, protocol: {:?}, listen_addrs: {:?}",
peer_id, info.agent_version, info.protocol_version, info.listen_addrs
);
if info.listen_addrs.is_empty() {
warn!("No listen addresses found in identify info for peer: {:?}", peer_id);
} else {
for multiaddr in &info.listen_addrs {
// Add to Kademlia
let agent_routing = self.swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr.clone());
match agent_routing {
RoutingUpdate::Failed => error!("IdentifyReceived: Failed to register address to Kademlia"),
RoutingUpdate::Pending => warn!("IdentifyReceived: Register address pending"),
RoutingUpdate::Success => {
info!("Added peer {:?} with address {:?} to Kademlia DHT", peer_id, multiaddr);
}
}
// Dial the peer if not already connected
if !self.swarm.is_connected(&peer_id) {
match self.swarm.dial(multiaddr.clone()) {
Ok(_) => info!("Dialing peer {:?} at {:?}", peer_id, multiaddr),
Err(e) => warn!("Failed to dial peer {:?} at {:?}: {:?}", peer_id, multiaddr, e),
}
} else {
debug!("Peer {:?} already connected, skipping dial", peer_id);
}
}
}
debug!(
"Peer {:?} supports protocols: {:?} listen_addrs: {:?}",
peer_id, info.protocols, info.listen_addrs
);
}
identify::Event::Sent { peer_id, .. } => {
debug!("Sent identify info to peer: {:?}", peer_id);
}
identify::Event::Pushed { peer_id, .. } => {
debug!("Pushed identify info to peer: {:?}", peer_id);
}
identify::Event::Error { peer_id, error, .. } => {
error!("Identify protocol error with peer {:?}: {:?}", peer_id, error);
}
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(event)) => {
match event {
libp2p::kad::Event::OutboundQueryProgressed { result, .. } => {
if let QueryResult::GetClosestPeers(Ok(result)) = result {
info!("DHT discovered peers: {:?}", result.peers);
}
}
_ => {
debug!("received Kademlia event: {:?}", event);
}
}
},
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
if &peer_id == self.swarm.local_peer_id() {
debug!(%peer_id, %address, "ignoring our own external address");
} else {
debug!(%peer_id, %address, "new external address of peer");
let result = self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, address.clone());
match result {
RoutingUpdate::Success => {
debug!(%peer_id, %address, "added peer address to kademlia");
}
RoutingUpdate::Failed => {
warn!(%peer_id, %address, "failed to add peer address to kademlia");
}
RoutingUpdate::Pending => {
debug!(%peer_id, %address, "request to add peer address to kademlia is pending");
}
}
}
}
_ => {
info!("Received event: {:?}", event);
}
},

Each peer runs a small RPC service exposing a method that returns its current connected_peers() list. Here's what I observe:

- ✅ The bootstrap node lists all peers as connected.
- ❌ Each non-bootstrap node lists only the bootstrap node as connected.

So the network forms a star topology, with no lateral connections between peers. All peers use Gossipsub and are subscribed to the same topic. Messages sometimes get dropped or don't propagate reliably, which I believe is due to this connectivity pattern. I'd appreciate your help on this 🙏
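The star-topology observation above can be checked mechanically from the per-node connected_peers() lists that the RPC service returns. A minimal sketch, assuming those lists have already been collected into a map from node id to connected peer ids; the `star_hub` helper and the use of plain `&str` names instead of real `PeerId`s are hypothetical, not part of libp2p or the original code. It returns the hub when every other node is connected only to that hub.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Hypothetical diagnostic: given each node's connected-peer set (for example as
/// reported by a `connected_peers()` RPC), return the hub if the graph is a star,
/// i.e. one node is connected to every other node and every other node is
/// connected only to that hub.
fn star_hub<T: Eq + Hash + Clone>(connected: &HashMap<T, HashSet<T>>) -> Option<T> {
    // A star needs a hub plus at least two leaves.
    if connected.len() < 3 {
        return None;
    }
    connected.iter().find_map(|(hub, hub_peers)| {
        let is_star = connected
            .iter()
            .filter(|(id, _)| *id != hub)
            .all(|(leaf, leaf_peers)| {
                // The hub sees the leaf, and the leaf sees nobody but the hub.
                hub_peers.contains(leaf) && leaf_peers.len() == 1 && leaf_peers.contains(hub)
            });
        if is_star { Some(hub.clone()) } else { None }
    })
}
```

For the situation described in the question (bootstrap connected to a, b, and c, each of which is connected only to bootstrap) it returns `Some("bootstrap")`; as soon as two leaves connect to each other directly, it returns `None`.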
Hi! Also, you have set quite a low idle timeout for QUIC (300 ms; the value recommended in RFC 9308 is 30 s). I could imagine that this also causes some of the issues you are observing with messages being dropped.
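To make the unit issue concrete: in rust-libp2p, `quic::Config::max_idle_timeout` is a `u32` counted in milliseconds, so `config.max_idle_timeout = 300;` means 300 ms, while `config.max_idle_timeout = 30_000;` would match the 30 s mentioned above. The sketch below is plain Rust rather than libp2p API: `idle_timeout_ms` and `quic_timeouts_look_sane` are hypothetical helpers, and the rule that a keep-alive should fire at least twice per idle period is an assumption chosen for illustration.

```rust
use std::time::Duration;

/// Hypothetical helper (not libp2p API): converts a `Duration` into the
/// millisecond `u32` that a field like `quic::Config::max_idle_timeout` expects,
/// saturating at `u32::MAX` instead of overflowing.
fn idle_timeout_ms(timeout: Duration) -> u32 {
    u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX)
}

/// Hypothetical sanity check. Assumptions for illustration: the idle timeout
/// should be at least `floor`, and a keep-alive should fire at least twice
/// within one idle period.
fn quic_timeouts_look_sane(idle_ms: u32, keep_alive: Duration, floor: Duration) -> bool {
    let idle = Duration::from_millis(u64::from(idle_ms));
    idle >= floor && keep_alive * 2 <= idle
}
```

With the values from the question, `quic_timeouts_look_sane(300, Duration::from_millis(100), Duration::from_secs(30))` returns `false`; with a 30 s idle timeout and a 5 s keep-alive it returns `true`. Deriving the field from a `Duration` keeps the unit visible at the call site.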
@elenaf9 I solved the above by enabling Kademlia server mode; otherwise everything was set up correctly.
Would it be possible to make it more explicit in the documentation that client mode is enabled by default, which can prevent peer discovery from working as expected?
I wasn't initially aware of this default behavior and ended up spending quite a bit of time debugging until I stumbled upon this detail here: 92c8cc4. Highlighting it more clearly could really help others avoid the same confusion. Thanks!
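For readers hitting the same issue: according to the libp2p-kad documentation for `kad::Behaviour::set_mode`, a node starts in `Mode::Client` and switches to `Mode::Server` only once the swarm reports a confirmed external address. In client mode it does not accept inbound Kademlia requests, so other peers cannot use it for discovery. Enabling server mode corresponds to calling `kademlia.set_mode(Some(kad::Mode::Server))` (for example on the `kademlia` value in `MyBehaviour::new`), or to registering a confirmed address with `swarm.add_external_address(addr)` so that the automatic switch happens. The block below is not libp2p code. It is a simplified, hypothetical model (the `Mode` enum, `discover`, and integer peer ids are all stand-ins) of how client mode collapses discovery into the star topology described in the question.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Stand-in for Kademlia's client/server mode (NOT the real `libp2p::kad::Mode`).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Mode {
    Client,
    Server,
}

/// Toy model of one bootstrap round: every node already knows `bootstrap` and
/// asks it for peers. Assumptions: the bootstrap node answers only if it is in
/// `Server` mode, and it returns only peers in `Server` mode, because client-mode
/// peers are never admitted to its routing table.
/// Returns the set of peers each non-bootstrap node knows afterwards.
fn discover(bootstrap: u32, modes: &BTreeMap<u32, Mode>) -> BTreeMap<u32, BTreeSet<u32>> {
    let bootstrap_answers = modes.get(&bootstrap) == Some(&Mode::Server);
    // Peers the bootstrap node would hand out in a closest-peers response.
    let servers: BTreeSet<u32> = modes
        .iter()
        .filter(|&(&id, &mode)| id != bootstrap && mode == Mode::Server)
        .map(|(&id, _)| id)
        .collect();

    modes
        .keys()
        .copied()
        .filter(|&id| id != bootstrap)
        .map(|id| {
            let mut known = BTreeSet::from([bootstrap]);
            if bootstrap_answers {
                // Everything the bootstrap node returned, except ourselves.
                known.extend(servers.iter().copied().filter(|&p| p != id));
            }
            (id, known)
        })
        .collect()
}
```

With nodes 0 to 3 all in `Client` mode and node 0 as the bootstrap, every other node ends up knowing only `{0}` (the star). With all four in `Server` mode, node 1 learns `{0, 2, 3}`.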