Skip to content

Commit

Permalink
Update libp2p
Browse files Browse the repository at this point in the history
  • Loading branch information
Mubelotix committed May 31, 2024
1 parent a50f68b commit 4952e1a
Show file tree
Hide file tree
Showing 18 changed files with 382 additions and 597 deletions.
850 changes: 322 additions & 528 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion census/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = "0.3"
rand = "0.8"
libp2p = {version="0.51", default-features=false, features=["ed25519", "rsa", "ecdsa", "secp256k1"] }
libp2p-identity = {version="0.2", features=["peerid", "ed25519", "rsa", "ecdsa", "secp256k1"]}
libipld = "0.16"
sha2 = "0.10"
sha2-derive = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion census/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub(crate) use serde::{Serialize, Deserialize};
pub(crate) use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, HttpRequest};
pub(crate) use rand::seq::IteratorRandom;
pub(crate) use sha2_derive::Hashable;
pub(crate) use libp2p::core::identity::PublicKey;
pub(crate) use libp2p_identity::PublicKey;
pub(crate) use futures::future::select;

pub(crate) fn now_ts() -> u64 { SystemTime::now().duration_since(UNIX_EPOCH).expect("System time incorrect").as_secs() }
9 changes: 6 additions & 3 deletions daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ version = "0.1.0"
edition = "2021"

[dependencies]
libp2p = {version="0.52", features=["tcp", "tokio", "noise", "yamux", "macros"]}
libp2p = {version="0.53", features=["tcp", "tokio", "noise", "yamux", "macros"]}
libp2p-identity = "0.2"
libp2p-identify = "0.43"
libp2p-identify = "0.44"
libp2p-tls = "0.4"
libp2p-noise = "0.44"
libp2p-yamux = "0.45"
kamilata = {path = "../kamilata"}
discovery-protocol = {path = "../discovery-protocol"}
libipld = "0.16"
kamilata = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.29", features = ["full"] }
Expand Down
1 change: 0 additions & 1 deletion daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ mod database;

use crate::prelude::*;


#[tokio::main]
async fn main() {
env_logger::init();
Expand Down
25 changes: 13 additions & 12 deletions daemon/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,18 @@ impl Node {
identify,
discovery,
};

let tcp_transport = tcp::tokio::Transport::new(tcp::Config::new());

let transport = tcp_transport
.upgrade(upgrade::Version::V1Lazy)
.authenticate(
noise::Config::new(&keypair).expect("Signing libp2p-noise static DH keypair failed."),
let mut swarm = SwarmBuilder::with_existing_identity(keypair.clone())
.with_tokio()
.with_tcp(
Default::default(),
(libp2p_tls::Config::new, libp2p_noise::Config::new),
libp2p_yamux::Config::default,
)
.multiplex(YamuxConfig::default())
.boxed();

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, peer_id).build();
.expect("Failed to build swarm with transport")
.with_behaviour(|_| behaviour)
.expect("Failed to build swarm with behaviour")
.build();
for listen_addr in &config.listen_addrs {
let Ok(parsed_addr) = listen_addr.parse::<Multiaddr>() else {
error!("Invalid address: {listen_addr}");
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Node {
self.sw.on_identify(&peer_id, info).await;
},
IdentifyEvent::Sent { peer_id } => trace!("Sent identify info to {peer_id}"),
IdentifyEvent::Pushed { peer_id } => trace!("Pushed identify info to {peer_id}"),
IdentifyEvent::Pushed { peer_id, info } => trace!("Pushed identify info {info:?} to {peer_id}"),
IdentifyEvent::Error { peer_id, error } => debug!("Identify error with {peer_id}: {error}"),
},
// Kamilata events
Expand Down Expand Up @@ -218,6 +218,7 @@ impl Node {
SwarmEvent::Dialing { peer_id, connection_id } => debug!("Dialing {peer_id:?} ({connection_id:?})"),
SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => trace!("Incoming connection from {send_back_addr} (local addr: {local_addr}, connection id: {connection_id:?})"),
SwarmEvent::IncomingConnectionError { connection_id, local_addr, send_back_addr, error } => trace!("Incoming connection error from {send_back_addr} (local addr: {local_addr}, connection id: {connection_id:?}, error: {error})"),
other => trace!("Unknown swarm event: {other:?}"),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use tokio::{
net::TcpStream as TokioTcpStream
};
pub use libp2p::{
swarm::{dial_opts::DialOpts, Swarm, SwarmBuilder, SwarmEvent, NetworkBehaviour},
swarm::{dial_opts::DialOpts, Swarm, SwarmEvent, NetworkBehaviour}, SwarmBuilder,
core::upgrade, PeerId, Multiaddr, multiaddr::Protocol, tcp, Transport, yamux::Config as YamuxConfig, noise
};
pub use libp2p_identity::Keypair;
Expand Down
4 changes: 2 additions & 2 deletions discovery-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2021"

[dependencies]
tokio = "1.29"
libp2p = "0.52"
libp2p-identify = "0.43"
libp2p = "0.53"
libp2p-identify = "0.44"
serde = "1.0"
serde_json = "1.0"
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions discovery-protocol/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type ToSwarm = Event;

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(info) => {
let db = Arc::clone(&self.db);
Expand Down Expand Up @@ -156,7 +156,7 @@ impl NetworkBehaviour for Behaviour {
Ok(Handler::new(remote_peer_id, Arc::clone(&self.config), Arc::clone(&self.db)))
}

fn poll(&mut self, _cx: &mut Context<'_>, _params: &mut impl PollParameters) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some((peer_id, event)) = self.events_to_dispatch.pop() {
return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::Any, event });
}
Expand Down
25 changes: 7 additions & 18 deletions discovery-protocol/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,6 @@ pub enum BehaviorToHandlerEvent {
#[derive(Debug)]
pub enum HandlerToBehaviorEvent {}

#[derive(Debug)]
pub enum HandlerError {}

impl std::fmt::Display for HandlerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DiscoveryHandlerError")
}
}

impl std::error::Error for HandlerError {}

pub struct Handler {
remote_peer_id: PeerId,
config: Arc<Config>,
Expand Down Expand Up @@ -49,7 +38,7 @@ impl Handler {
impl ConnectionHandler for Handler {
type FromBehaviour = BehaviorToHandlerEvent;
type ToBehaviour = HandlerToBehaviorEvent;
type Error = HandlerError;
//type Error = HandlerError;
type InboundProtocol = ArcConfig;
type OutboundProtocol = ArcConfig;
type InboundOpenInfo = ();
Expand All @@ -59,8 +48,8 @@ impl ConnectionHandler for Handler {
SubstreamProtocol::new((&self.config).into(), ())
}

fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::Yes
fn connection_keep_alive(&self) -> bool {
true
}

fn on_behaviour_event(&mut self, event: BehaviorToHandlerEvent) {
Expand Down Expand Up @@ -92,11 +81,11 @@ impl ConnectionHandler for Handler {
let e = e.error;
error!("ListenUpgradeError: {e:?}");
},
ConnectionEvent::AddressChange(_) | ConnectionEvent::LocalProtocolsChange(_) | ConnectionEvent::RemoteProtocolsChange(_) => (),
_ => (),
}
}

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionHandlerEvent<ArcConfig, (Request, RequestReplier), HandlerToBehaviorEvent, HandlerError>> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionHandlerEvent<ArcConfig, (Request, RequestReplier), HandlerToBehaviorEvent>> {
if let Some(pending_info) = self.pending_requests.pop() {
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new((&self.config).into(), pending_info),
Expand All @@ -107,7 +96,7 @@ impl ConnectionHandler for Handler {
if let Some(server_task) = self.server_tasks.first_mut() {
match server_task.as_mut().poll(cx) {
Poll::Ready(result) => {
self.server_tasks.remove(0);
drop(self.server_tasks.remove(0));
debug!("Server task finished: {result:?}");
},
Poll::Pending => (),
Expand All @@ -118,7 +107,7 @@ impl ConnectionHandler for Handler {
if let Some(client_task) = self.client_tasks.first_mut() {
match client_task.as_mut().poll(cx) {
Poll::Ready(()) => {
self.client_tasks.remove(0);
drop(self.client_tasks.remove(0));
debug!("Client task finished");
},
Poll::Pending => (),
Expand Down
3 changes: 2 additions & 1 deletion discovery-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub(crate) use futures::{
pub(crate) use libp2p::{
swarm::{
handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionId,
FromSwarm, KeepAlive, NetworkBehaviour, PollParameters, SubstreamProtocol, THandlerInEvent,
FromSwarm, NetworkBehaviour, SubstreamProtocol, THandlerInEvent,
THandlerOutEvent, ToSwarm, NotifyHandler, Stream,
},
core::UpgradeInfo, InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId,
Expand All @@ -20,6 +20,7 @@ pub(crate) use std::{
sync::Arc,
task::{Context, Poll},
io::Error as IoError,
mem::drop
};
pub(crate) use tokio::sync::{RwLock, oneshot::{Sender as OneshotSender, channel as oneshot_channel}};

Expand Down
6 changes: 3 additions & 3 deletions kamilata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ categories = ["network-programming"]
keywords = ["p2p", "search-engine", "distributed-systems", "libp2p"]

[dependencies]
libp2p = {version="0.52", features=["tokio", "yamux", "noise"]}
libp2p = {version="0.53", features=["tokio", "yamux", "noise"]}
tokio = {version="1.29", features=["macros", "sync", "time"]}
futures = "0.3"
protocol = "3.4"
protocol-derive = "3.4"
unsigned-varint = {version="0.7", features = ["codec", "futures", "asynchronous_codec"]}
asynchronous-codec = "0.6"
unsigned-varint = {version="0.8", features = ["codec", "futures", "asynchronous_codec"]}
asynchronous-codec = "0.7"
log = "0.4"
either = "1.8"
async-trait = "0.1"
Expand Down
5 changes: 2 additions & 3 deletions kamilata/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl<const N: usize, S: Store<N>> NetworkBehaviour for KamilataBehaviour<N, S> {
type ConnectionHandler = KamilataHandler<N, S>;
type ToSwarm = KamilataEvent;

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(info) => {
self.connections.entry(info.peer_id).and_modify(|count| *count += 1).or_insert(1);
Expand Down Expand Up @@ -257,8 +257,7 @@ impl<const N: usize, S: Store<N>> NetworkBehaviour for KamilataBehaviour<N, S> {

fn poll(
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
cx: &mut Context<'_>
) -> Poll<ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
// Message handlers first
if let Some((peer_id, event)) = self.handler_event_queue.pop() {
Expand Down
15 changes: 6 additions & 9 deletions kamilata/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct KamilataHandler<const N: usize, S: Store<N>> {
remote_peer_id: PeerId,
db: Arc<Db<N, S>>,
config: Arc<KamilataConfig>,
keep_alive: bool,

rt_handle: tokio::runtime::Handle,

Expand All @@ -67,6 +68,7 @@ impl<const N: usize, S: Store<N>> KamilataHandler<N, S> {
remote_peer_id,
db,
config,
keep_alive: true,
rt_handle: tokio::runtime::Handle::current(),
task_counter: Counter::new(3),
tasks: HashMap::new(),
Expand All @@ -78,7 +80,6 @@ impl<const N: usize, S: Store<N>> KamilataHandler<N, S> {
impl<const N: usize, S: Store<N>> ConnectionHandler for KamilataHandler<N, S> {
type FromBehaviour = BehaviorToHandlerEvent<N, S>;
type ToBehaviour = HandlerToBehaviorEvent;
type Error = ioError;
type InboundProtocol = Either<ArcConfig, DeniedUpgrade>;
type OutboundProtocol = ArcConfig;
type InboundOpenInfo = ();
Expand Down Expand Up @@ -130,11 +131,10 @@ impl<const N: usize, S: Store<N>> ConnectionHandler for KamilataHandler<N, S> {
};
}

fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::Yes
fn connection_keep_alive(&self) -> bool {
self.keep_alive
}

#[warn(implied_bounds_entailment)]
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Expand Down Expand Up @@ -173,7 +173,7 @@ impl<const N: usize, S: Store<N>> ConnectionHandler for KamilataHandler<N, S> {
let error = i.error;
warn!("{} Failed to establish outbound channel with {}: {error:?}. A {} task has been discarded.", self.our_peer_id, self.remote_peer_id, pending_task.name);
},
ConnectionEvent::ListenUpgradeError(_) | ConnectionEvent::AddressChange(_) | ConnectionEvent::LocalProtocolsChange(_) | ConnectionEvent::RemoteProtocolsChange(_) => (),
_ => (),
}
}

Expand All @@ -185,7 +185,6 @@ impl<const N: usize, S: Store<N>> ConnectionHandler for KamilataHandler<N, S> {
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
// It seems this method gets called in a context where the tokio runtime does not exist.
Expand Down Expand Up @@ -241,9 +240,7 @@ impl<const N: usize, S: Store<N>> ConnectionHandler for KamilataHandler<N, S> {
HandlerTaskOutput::Disconnect(disconnect_packet) => {
debug!("{} Disconnected peer {}", self.our_peer_id, self.remote_peer_id);
// TODO: send packet
return Poll::Ready(ConnectionHandlerEvent::Close(
ioError::new(std::io::ErrorKind::Other, disconnect_packet.reason), // TODO error handling
));
self.keep_alive = false;
},
HandlerTaskOutput::None | HandlerTaskOutput::Many(_) => unreachable!(),
}
Expand Down
12 changes: 4 additions & 8 deletions kamilata/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@ pub(crate) use crate::{
behaviour::*, control::*, counter::*, db::*, handler::*, handler_proto::*, packets::*, tasks::*,
};
pub(crate) use either::Either;
pub(crate) use futures::{
future::BoxFuture,
prelude::*,
FutureExt,
};
pub(crate) use futures::{future::BoxFuture, prelude::*, FutureExt};
pub(crate) use libp2p::{
core::{upgrade::DeniedUpgrade, ConnectedPoint, Endpoint, UpgradeInfo},
swarm::{
derive_prelude::FromSwarm, handler::ConnectionEvent, ConnectionDenied, ConnectionHandler,
ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, NetworkBehaviour,
PollParameters, SubstreamProtocol, THandler, THandlerOutEvent, ToSwarm,
ConnectionHandlerEvent, ConnectionId, NetworkBehaviour, Stream, SubstreamProtocol,
THandler, THandlerOutEvent, ToSwarm,
},
InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId,
};
Expand All @@ -37,11 +33,11 @@ pub(crate) use std::{
time::{Duration, Instant},
};
pub(crate) use tokio::{
spawn,
sync::{
mpsc::*,
oneshot::{channel as oneshot_channel, Sender as OneshotSender},
RwLock,
},
time::{sleep, timeout},
spawn
};
2 changes: 1 addition & 1 deletion kamilata/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod search;
mod request_maker;
mod search_request;

pub(self) use crate::prelude::*;
use crate::prelude::*;
pub(crate) use filter_seeder::*;
pub(crate) use request_handler::*;
pub(crate) use filter_leecher::*;
Expand Down
2 changes: 1 addition & 1 deletion kamilata/src/tasks/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async fn search_one<const N: usize, S: Store<N>>(
let (routes_sender, mut routes_receiver) = channel(100);
behaviour_controller.dial_peer_and_message(remote_peer_id, addresses, BehaviorToHandlerEvent::SearchRequest { query, routes_sender, result_sender: search_follower, over_notifier }).await;

let Some(routes) = routes_receiver.recv().await else {return None};
let routes = routes_receiver.recv().await?;
let routes = routes.into_iter().map(|distant_match|
ProviderInfo {
peer_id: distant_match.peer_id.into(),
Expand Down
10 changes: 8 additions & 2 deletions kamilata/tests/common/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

use futures::future;
use libp2p::{identity::{self, Keypair}, core::transport::MemoryTransport, PeerId, Transport, Swarm, Multiaddr, swarm::{SwarmEvent, SwarmBuilder}};
use libp2p::{identity::{self, Keypair}, core::transport::MemoryTransport, PeerId, Transport, Swarm, Multiaddr, swarm::SwarmEvent, SwarmBuilder};

use tokio::sync::{
mpsc::*,
Expand Down Expand Up @@ -113,7 +113,13 @@ impl Client {
// can be observed.
let behaviour = KamilataBehaviour::new_with_config(local_peer_id, config);

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
let mut swarm = SwarmBuilder::with_existing_identity(local_key.clone())
.with_tokio()
.with_other_transport(|_| transport)
.expect("Failed to build swarm with transport")
.with_behaviour(|_| behaviour)
.expect("Failed to build swarm with behaviour")
.build();

// Tell the swarm to listen on all interfaces and a random, OS-assigned port.
let mut addr: Option<Multiaddr> = None;
Expand Down

0 comments on commit 4952e1a

Please sign in to comment.