From ce9f5d3293329a52cc19d84a8b035c3f3747976f Mon Sep 17 00:00:00 2001 From: axiomatic-aardvark Date: Wed, 4 Oct 2023 16:49:53 +0300 Subject: [PATCH] fix: boot node connections --- Cargo.lock | 115 +++++++++++++++----------- Cargo.toml | 2 +- examples/ping-pong/src/main.rs | 16 ++-- src/graphcast_agent/message_typing.rs | 9 +- src/graphcast_agent/mod.rs | 8 +- src/graphcast_agent/waku_handling.rs | 64 +++++++------- 6 files changed, 123 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4865d2..f4a24f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,9 +88,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -102,15 +102,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" dependencies = [ "utf8parse", ] @@ -126,9 +126,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys", @@ -754,9 +754,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -764,9 +764,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -865,9 +865,9 @@ dependencies = [ [[package]] name = "const-hex" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa72a10d0e914cad6bcad4e7409e68d230c1c2db67896e19a37f758b1fcbdab5" +checksum = "c37be52ef5e3b394db27a2341010685ad5103c72ac15ce2e9420a7e8f93f342c" dependencies = [ "cfg-if", "cpufeatures", @@ -1305,7 +1305,7 @@ checksum = "a4b1e0c257a9e9f25f90ff76d7a68360ed497ee519c8e428d1825ef0000799d4" dependencies = [ "der 0.7.8", "digest", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "rfc6979 0.4.0", "signature 2.1.0", "spki 0.7.2", @@ -1339,9 +1339,9 @@ dependencies = [ [[package]] name = "elliptic-curve" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "968405c8fdc9b3bf4df0a6638858cc0b52462836ab6b1c87377785dd09cf1c0b" +checksum = "d97ca172ae9dc9f9b779a6e3a65d308f2af74e5b8c921299075bdb4a0370e914" dependencies = [ "base16ct 0.2.0", "crypto-bigint 0.5.3", @@ -1430,9 +1430,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ "errno-dragonfly", "libc", @@ -1648,7 +1648,7 @@ dependencies = [ "cargo_metadata 0.17.0", "chrono", "const-hex", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "ethabi", "generic-array", "k256 0.13.1", @@ -1769,7 +1769,7 @@ dependencies = [ "coins-bip32", "coins-bip39", "const-hex", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "eth-keystore", "ethers-core 2.0.10", "rand", @@ -2253,9 +2253,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" [[package]] name = "hashers" @@ -2515,12 +2515,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad227c3af19d4914570ad36d30409928b75967c298feb9ea1969db3a610bb14e" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -2644,7 +2644,7 @@ checksum = "cadb76004ed8e97623117f3df85b17aaa6626ab0b0831e6573f104df16cd1bcc" dependencies = [ "cfg-if", "ecdsa 0.16.8", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "once_cell", "sha2", "signature 2.1.0", @@ -2717,9 +2717,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" [[package]] name = "lock_api" @@ -2770,9 +2770,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "memoffset" @@ -3255,7 +3255,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.1", + "indexmap 2.0.2", ] [[package]] @@ -3470,10 +3470,11 @@ dependencies = [ [[package]] name = "prometheus-http-query" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7970fd6e91b5cb87e9a093657572a896d133879ced7752d2c7635beae29eaba0" +checksum = "6704e3a7a78545b1496524d518658005a6cc308abc90ce5fccf01891ecdc298b" dependencies = [ + "mime", "reqwest", "serde", "serde_json", @@ -3609,13 +3610,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", + "regex-automata 0.3.9", "regex-syntax 0.7.5", ] @@ -3630,9 +3631,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", @@ -3653,9 +3654,9 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64 0.21.4", "bytes", @@ -3682,6 +3683,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-native-tls", "tokio-rustls", @@ -3803,9 +3805,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.14" +version = "0.38.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f" +checksum = "d2f9da0cbd88f9f09e7814e388301c8414c51c62aa6ce1e4b5c551d49d96e531" dependencies = [ "bitflags 2.4.0", "errno", @@ -4594,6 +4596,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "take_mut" version = "0.2.2" @@ -4915,7 +4938,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -5272,9 +5295,9 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" [[package]] name = "waku-bindings" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab4f521ec123865da220d0cd39f2d3deaffca80eaa32089c488ad661198a1d98" +checksum = "c3c52764c1cde43ad4e233ad23b18bbba11f9179a6a4e9b60c660d4f22450289" dependencies = [ "aes-gcm", "base64 0.21.4", diff --git a/Cargo.toml b/Cargo.toml index 1127fc8..d495920 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["graphprotocol", "gossip-network", "sdk", "waku", "p2p"] categories = ["network-programming", "web-programming::http-client"] [dependencies] -waku = { version = "=0.3.1", package = "waku-bindings" } +waku = { version = "=0.3.2", package = "waku-bindings" } slack-morphism = { version = "1.10", features = ["hyper", "axum"] } prost = "0.11" once_cell = "1.17" diff --git a/examples/ping-pong/src/main.rs b/examples/ping-pong/src/main.rs index 738e720..a3339d3 100644 --- a/examples/ping-pong/src/main.rs +++ b/examples/ping-pong/src/main.rs @@ -89,6 +89,8 @@ async fn main() { // if not provided then they are usually generated based on indexer allocations let subtopics: Vec = vec!["ping-pong-content-topic".to_string()]; + let discovery_enr = "enr:-P-4QJI8tS1WTdIQxq_yIrD05oIIW1Xg-tm_qfP0CHfJGnp9dfr6ttQJmHwTNxGEl4Le8Q7YHcmi-kXTtphxFysS11oBgmlkgnY0gmlwhLymh5GKbXVsdGlhZGRyc7hgAC02KG5vZGUtMDEuZG8tYW1zMy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGdl8ALzYobm9kZS0wMS5kby1hbXMzLndha3V2Mi5wcm9kLnN0YXR1c2ltLm5ldAYfQN4DiXNlY3AyNTZrMaEDbl1X_zJIw3EAJGtmHMVn4Z2xhpSoUaP5ElsHKCv7hlWDdGNwgnZfg3VkcIIjKIV3YWt1Mg8".to_string(); + // GraphcastAgentConfig defines the configuration that the SDK expects from all Radios, regardless of their specific functionality let graphcast_agent_config = GraphcastAgentConfig::new( config.private_key.expect("No private key provided"), @@ -106,8 +108,7 @@ async fn main() { None, None, Some(true), - // Example ENR address - Some(vec![String::from("enr:-JK4QBcfVXu2YDeSKdjF2xE5EDM5f5E_1Akpkv_yw_byn1adESxDXVLVjapjDvS_ujx6MgWDu9hqO_Az_CbKLJ8azbMBgmlkgnY0gmlwhAVOUWOJc2VjcDI1NmsxoQOUZIqKLk5xkiH0RAFaMGrziGeGxypJ03kOod1-7Pum3oN0Y3CCfJyDdWRwgiMohXdha3UyDQ")]), + Some(vec![discovery_enr]), None, ) .await @@ -178,10 +179,7 @@ async fn main_loop(agent: &GraphcastAgent, running: Arc) { info!(block = block_number, "🔗 Block number"); if block_number & 2 == 0 { // If block number is even, send ping message - let msg = SimpleMessage::new( - "table".to_string(), - std::env::args().nth(1).unwrap_or("Ping".to_string()), - ); + let msg = SimpleMessage::new("table".to_string(), "Ping".to_string()); if let Err(e) = agent .send_message( // The identifier can be any string that suits your Radio logic @@ -193,6 +191,8 @@ async fn main_loop(agent: &GraphcastAgent, running: Arc) { .await { error!(error = tracing::field::debug(&e), "Failed to send message"); + } else { + debug!("Ping message sent successfully") }; // agent.send_message(msg).await; } else { @@ -217,7 +217,9 @@ async fn main_loop(agent: &GraphcastAgent, running: Arc) { .await { error!(error = tracing::field::debug(&e), "Failed to send message"); - }; + } else { + debug!("Pong message sent successfully") + } }; } diff --git a/src/graphcast_agent/message_typing.rs b/src/graphcast_agent/message_typing.rs index d10ef20..5457c73 100644 --- a/src/graphcast_agent/message_typing.rs +++ b/src/graphcast_agent/message_typing.rs @@ -134,14 +134,7 @@ impl< .map_err(WakuHandlingError::RetrievePeersError) .unwrap_or_default() .iter() - .filter(|&peer| { - // Filter out local peer_id to prevent self dial - peer.peer_id().as_str() - != node_handle - .peer_id() - .expect("Failed to find local node's peer id") - .as_str() - }) + .filter(|&peer| peer.connected()) .map(|peer: &WakuPeerData| { // Filter subscribe to all other peers node_handle diff --git a/src/graphcast_agent/mod.rs b/src/graphcast_agent/mod.rs index 63ff6e9..5c39a8c 100644 --- a/src/graphcast_agent/mod.rs +++ b/src/graphcast_agent/mod.rs @@ -208,6 +208,8 @@ pub struct GraphcastAgent { /// Upon receiving a valid waku signal event of Message type, sender send WakuMessage through mpsc. //TODO: currently agent returns the receiver to radio operator, such that radio handler can process WakuMessage however they want. Ideally we should keep WakuMessage within graphcast agent, but for now radio operator is required call generic decode with the specified types. Later we investigate an approach to dynamically register the types during runtime pub sender: Arc>>, + /// Keeps track of whether Filter protocol is enabled, if false -> we're using Relay protocol + pub filter_protocol_enabled: bool, } impl GraphcastAgent { @@ -335,6 +337,7 @@ impl GraphcastAgent { seen_msg_ids, id_validation, sender, + filter_protocol_enabled: filter_protocol.is_some(), }) } @@ -439,7 +442,8 @@ impl GraphcastAgent { ); // Check network before sending a message - network_check(&self.node_handle).map_err(GraphcastAgentError::WakuNodeError)?; + network_check(&self.node_handle, self.filter_protocol_enabled) + .map_err(GraphcastAgentError::WakuNodeError)?; trace!( address = &wallet_address(&self.graphcast_identity.wallet), "local sender id" @@ -530,6 +534,8 @@ pub fn register_handler( } } }; + + trace!("Registering handler"); waku_set_event_callback(handle_async); Ok(()) } diff --git a/src/graphcast_agent/waku_handling.rs b/src/graphcast_agent/waku_handling.rs index 3460328..9f1ffb4 100644 --- a/src/graphcast_agent/waku_handling.rs +++ b/src/graphcast_agent/waku_handling.rs @@ -90,14 +90,6 @@ pub fn filter_peer_subscriptions( .peers() .map_err(WakuHandlingError::RetrievePeersError)? .iter() - .filter(|&peer| { - // Filter out local peer_id to prevent self dial - peer.peer_id().as_str() - != node_handle - .peer_id() - .expect("Failed to find local node's peer id") - .as_str() - }) .map(|peer: &WakuPeerData| { // subscribe to all other peers let filter_res = node_handle.filter_subscribe( @@ -166,7 +158,7 @@ fn node_config( "PANIC" => WakuLogLevel::Panic, _ => WakuLogLevel::Warn, }, - Err(_) => WakuLogLevel::Error, + Err(_) => WakuLogLevel::Panic, }; let gossipsub_params = GossipSubParams { @@ -495,26 +487,42 @@ pub fn peers_data( } /// Check for peer connectivity, try to reconnect if there are disconnected peers -pub fn network_check(node_handle: &WakuNodeHandle) -> Result<(), WakuHandlingError> { - peers_data(node_handle)? - .iter() - // Get unconnected peers and try to reconnect - .filter(|&peer| !peer.connected()) - .map(|peer: &WakuPeerData| { - debug!( - peer = tracing::field::debug(&peer), - "Disconnected peer data" - ); - node_handle.connect_peer_with_id(peer.peer_id(), None) - }) - .for_each(|res| { - if let Err(e) = res { - debug!( - error = tracing::field::debug(&e), - "Could not connect to peer" - ); +pub fn network_check( + node_handle: &WakuNodeHandle, + filter_protocol_enabled: bool, +) -> Result<(), WakuHandlingError> { + let peers = peers_data(node_handle)?; + + for peer in peers.iter() { + let supports_filter = peer + .protocols() + .iter() + .any(|p| p == "/vac/waku/filter/2.0.0-beta1"); + let supports_lightpush = peer + .protocols() + .iter() + .any(|p| p == "/vac/waku/lightpush/2.0.0-beta1"); + let supports_relay = peer + .protocols() + .iter() + .any(|p| p == "/vac/waku/relay/2.0.0"); + + let should_check_filter = filter_protocol_enabled && supports_filter; + let should_check_relay = !filter_protocol_enabled && supports_relay; + + if supports_lightpush && (should_check_filter || should_check_relay) { + if !peer.connected() { + if let Err(e) = node_handle.connect_peer_with_id(peer.peer_id(), None) { + debug!( + error = tracing::field::debug(&e), + "Could not connect to peer" + ); + } } - }); + } else { + node_handle.disconnect_peer_with_id(peer.peer_id()).unwrap(); + } + } Ok(()) }