Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bee-gossip rewrite #1174

Draft
wants to merge 3 commits into
base: mainnet-develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 298 additions & 76 deletions Cargo.lock

Large diffs are not rendered by default.

4,272 changes: 4,272 additions & 0 deletions Cargo.lock.bak2

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions bee-api/bee-rest-api/src/endpoints/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::endpoints::{config::RestApiConfig, storage::StorageBackend, Bech32Hrp, NetworkId};

use bee_gossip::NetworkCommandSender;
use bee_gossip::GossipManagerCommandTx;
use bee_ledger::workers::consensus::ConsensusWorkerCommand;
use bee_protocol::workers::{
config::ProtocolConfig, MessageRequesterWorker, MessageSubmitterWorkerEvent, PeerManager, RequestedMessages,
Expand Down Expand Up @@ -64,10 +64,10 @@ pub(crate) fn with_peer_manager(
warp::any().map(move || peer_manager.clone())
}

pub(crate) fn with_network_command_sender(
command_sender: ResourceHandle<NetworkCommandSender>,
) -> impl Filter<Extract = (ResourceHandle<NetworkCommandSender>,), Error = Infallible> + Clone {
warp::any().map(move || command_sender.clone())
pub(crate) fn with_gossip_command_tx(
command_tx: ResourceHandle<GossipManagerCommandTx>,
) -> impl Filter<Extract = (ResourceHandle<GossipManagerCommandTx>,), Error = Infallible> + Clone {
warp::any().map(move || command_tx.clone())
}

pub(crate) fn with_node_info(
Expand Down
6 changes: 3 additions & 3 deletions bee-api/bee-rest-api/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use storage::StorageBackend;

use crate::types::body::{DefaultErrorResponse, ErrorBody};

use bee_gossip::NetworkCommandSender;
use bee_gossip::GossipManagerCommandTx;
use bee_ledger::workers::consensus::ConsensusWorker;
use bee_protocol::workers::{
config::ProtocolConfig, MessageRequesterWorker, MessageSubmitterWorker, PeerManager, PeerManagerResWorker,
Expand Down Expand Up @@ -84,7 +84,7 @@ where
let message_requester = node.worker::<MessageRequesterWorker>().unwrap().clone();
let requested_messages = node.resource::<RequestedMessages>();
let peer_manager = node.resource::<PeerManager>();
let network_controller = node.resource::<NetworkCommandSender>();
let gossip_command_tx = node.resource::<GossipManagerCommandTx>();
let node_info = node.info();
let bus = node.bus();

Expand All @@ -102,7 +102,7 @@ where
rest_api_config.clone(),
protocol_config,
peer_manager,
network_controller,
gossip_command_tx,
node_info,
bus,
message_requester,
Expand Down
6 changes: 3 additions & 3 deletions bee-api/bee-rest-api/src/endpoints/routes/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod v1;

use crate::endpoints::{config::RestApiConfig, storage::StorageBackend, Bech32Hrp, NetworkId};

use bee_gossip::NetworkCommandSender;
use bee_gossip::GossipManagerCommandTx;
use bee_ledger::workers::consensus::ConsensusWorkerCommand;
use bee_protocol::workers::{
config::ProtocolConfig, MessageRequesterWorker, MessageSubmitterWorkerEvent, PeerManager, RequestedMessages,
Expand Down Expand Up @@ -35,7 +35,7 @@ pub(crate) fn filter<B: StorageBackend>(
rest_api_config: RestApiConfig,
protocol_config: ProtocolConfig,
peer_manager: ResourceHandle<PeerManager>,
network_command_sender: ResourceHandle<NetworkCommandSender>,
gossip_command_tx: ResourceHandle<GossipManagerCommandTx>,
node_info: ResourceHandle<NodeInfo>,
bus: ResourceHandle<Bus<'static>>,
message_requester: MessageRequesterWorker,
Expand All @@ -53,7 +53,7 @@ pub(crate) fn filter<B: StorageBackend>(
rest_api_config.clone(),
protocol_config,
peer_manager,
network_command_sender,
gossip_command_tx,
node_info,
consensus_worker,
)
Expand Down
28 changes: 14 additions & 14 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
endpoints::{
config::ROUTE_ADD_PEER,
filters::{with_network_command_sender, with_peer_manager},
filters::{with_gossip_command_tx, with_peer_manager},
permission::has_permission,
rejection::CustomRejection,
},
Expand All @@ -15,7 +15,7 @@ use crate::{
},
};

use bee_gossip::{Command::AddPeer, Multiaddr, NetworkCommandSender, PeerId, PeerRelation, Protocol};
use bee_gossip::{GossipManagerCommand::AddPeer, GossipManagerCommandTx, Multiaddr, PeerId, PeerType, Protocol};
use bee_protocol::workers::PeerManager;
use bee_runtime::resource::ResourceHandle;

Expand All @@ -32,24 +32,24 @@ pub(crate) fn filter(
public_routes: Box<[String]>,
allowed_ips: Box<[IpAddr]>,
peer_manager: ResourceHandle<PeerManager>,
network_command_sender: ResourceHandle<NetworkCommandSender>,
gossip_command_tx: ResourceHandle<GossipManagerCommandTx>,
) -> BoxedFilter<(impl Reply,)> {
self::path()
.and(warp::post())
.and(has_permission(ROUTE_ADD_PEER, public_routes, allowed_ips))
.and(warp::body::json())
.and(with_peer_manager(peer_manager))
.and(with_network_command_sender(network_command_sender))
.and_then(
|value, peer_manager, network_controller| async move { add_peer(value, peer_manager, network_controller) },
)
.and(with_gossip_command_tx(gossip_command_tx))
.and_then(|value, peer_manager, gossip_command_tx| async move {
add_peer(value, peer_manager, gossip_command_tx).await
})
.boxed()
}

pub(crate) fn add_peer(
pub(crate) async fn add_peer(
value: JsonValue,
peer_manager: ResourceHandle<PeerManager>,
network_controller: ResourceHandle<NetworkCommandSender>,
gossip_command_tx: ResourceHandle<GossipManagerCommandTx>,
) -> Result<impl Reply, Rejection> {
let multi_address_v = &value["multiAddress"];
let alias_v = &value["alias"];
Expand Down Expand Up @@ -77,7 +77,7 @@ pub(crate) fn add_peer(
}
};

match peer_manager.get(&peer_id) {
match peer_manager.get(&peer_id).await {
Some(peer_entry) => {
let peer_dto = PeerDto::from(peer_entry.0.as_ref());
Ok(warp::reply::with_status(
Expand All @@ -102,11 +102,11 @@ pub(crate) fn add_peer(
)
};

if let Err(e) = network_controller.send(AddPeer {
if let Err(e) = gossip_command_tx.send(AddPeer {
peer_id,
multiaddr: multi_address.clone(),
alias: alias.clone(),
relation: PeerRelation::Known,
peer_addr: multi_address.clone(),
peer_alias: alias.clone(),
peer_type: PeerType::Manual,
}) {
return Err(reject::custom(CustomRejection::NotFound(format!(
"failed to add peer: {}",
Expand Down
8 changes: 4 additions & 4 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod treasury;

use crate::endpoints::{config::RestApiConfig, storage::StorageBackend, Bech32Hrp, NetworkId};

use bee_gossip::NetworkCommandSender;
use bee_gossip::GossipManagerCommandTx;
use bee_ledger::workers::consensus::ConsensusWorkerCommand;
use bee_protocol::workers::{config::ProtocolConfig, MessageSubmitterWorkerEvent, PeerManager};
use bee_runtime::{node::NodeInfo, resource::ResourceHandle};
Expand All @@ -54,15 +54,15 @@ pub(crate) fn filter<B: StorageBackend>(
rest_api_config: RestApiConfig,
protocol_config: ProtocolConfig,
peer_manager: ResourceHandle<PeerManager>,
network_command_sender: ResourceHandle<NetworkCommandSender>,
gossip_command_tx: ResourceHandle<GossipManagerCommandTx>,
node_info: ResourceHandle<NodeInfo>,
consensus_worker: mpsc::UnboundedSender<ConsensusWorkerCommand>,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
add_peer::filter(
public_routes.clone(),
allowed_ips.clone(),
peer_manager.clone(),
network_command_sender.clone(),
gossip_command_tx.clone(),
)
.or(balance_bech32::filter(
public_routes.clone(),
Expand Down Expand Up @@ -155,7 +155,7 @@ pub(crate) fn filter<B: StorageBackend>(
.or(remove_peer::filter(
public_routes.clone(),
allowed_ips.clone(),
network_command_sender,
gossip_command_tx,
))
.or(submit_message::filter(
public_routes.clone(),
Expand Down
6 changes: 3 additions & 3 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ pub(crate) fn filter(
.and(warp::get())
.and(has_permission(ROUTE_PEER, public_routes, allowed_ips))
.and(with_peer_manager(peer_manager))
.and_then(|peer_id, peer_manager| async move { peer(peer_id, peer_manager) })
.and_then(|peer_id, peer_manager| async move { peer(peer_id, peer_manager).await })
.boxed()
}

pub(crate) fn peer(peer_id: PeerId, peer_manager: ResourceHandle<PeerManager>) -> Result<impl Reply, Rejection> {
match peer_manager.get(&peer_id) {
pub(crate) async fn peer(peer_id: PeerId, peer_manager: ResourceHandle<PeerManager>) -> Result<impl Reply, Rejection> {
match peer_manager.get(&peer_id).await {
Some(peer_entry) => Ok(warp::reply::json(&SuccessBody::new(PeerResponse(PeerDto::from(
peer_entry.0.as_ref(),
))))),
Expand Down
6 changes: 3 additions & 3 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ pub(crate) fn filter(
.and(warp::get())
.and(has_permission(ROUTE_PEERS, public_routes, allowed_ips))
.and(with_peer_manager(peer_manager))
.and_then(|peer_manager| async move { peers(peer_manager) })
.and_then(|peer_manager| async move { peers(peer_manager).await })
.boxed()
}

pub(crate) fn peers(peer_manager: ResourceHandle<PeerManager>) -> Result<impl Reply, Infallible> {
pub(crate) async fn peers(peer_manager: ResourceHandle<PeerManager>) -> Result<impl Reply, Infallible> {
let mut peers_dtos = Vec::new();
for peer in peer_manager.get_all() {
for peer in peer_manager.get_all().await {
peers_dtos.push(PeerDto::from(peer.as_ref()));
}
Ok(warp::reply::json(&SuccessBody::new(PeersResponse(peers_dtos))))
Expand Down
12 changes: 6 additions & 6 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/remove_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use crate::endpoints::{
config::ROUTE_REMOVE_PEER, filters::with_network_command_sender, path_params::peer_id, permission::has_permission,
config::ROUTE_REMOVE_PEER, filters::with_gossip_command_tx, path_params::peer_id, permission::has_permission,
rejection::CustomRejection,
};

use bee_gossip::{Command::RemovePeer, NetworkCommandSender, PeerId};
use bee_gossip::{GossipManagerCommand::RemovePeer, GossipManagerCommandTx, PeerId};
use bee_runtime::resource::ResourceHandle;

use warp::{filters::BoxedFilter, http::StatusCode, reject, Filter, Rejection, Reply};
Expand All @@ -23,21 +23,21 @@ fn path() -> impl Filter<Extract = (PeerId,), Error = warp::Rejection> + Clone {
pub(crate) fn filter(
public_routes: Box<[String]>,
allowed_ips: Box<[IpAddr]>,
network_command_sender: ResourceHandle<NetworkCommandSender>,
gossip_command_tx: ResourceHandle<GossipManagerCommandTx>,
) -> BoxedFilter<(impl Reply,)> {
self::path()
.and(warp::delete())
.and(has_permission(ROUTE_REMOVE_PEER, public_routes, allowed_ips))
.and(with_network_command_sender(network_command_sender))
.and(with_gossip_command_tx(gossip_command_tx))
.and_then(remove_peer)
.boxed()
}

pub(crate) async fn remove_peer(
peer_id: PeerId,
network_controller: ResourceHandle<NetworkCommandSender>,
gossip_command_tx: ResourceHandle<GossipManagerCommandTx>,
) -> Result<impl Reply, Rejection> {
if let Err(e) = network_controller.send(RemovePeer { peer_id }) {
if let Err(e) = gossip_command_tx.send(RemovePeer { peer_id }) {
return Err(reject::custom(CustomRejection::NotFound(format!(
"failed to remove peer: {}",
e
Expand Down
2 changes: 1 addition & 1 deletion bee-api/bee-rest-api/src/endpoints/routes/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub async fn is_healthy<B: StorageBackend>(tangle: &Tangle<B>, peer_manager: &Pe
return false;
}

if peer_manager.connected_peers() == 0 {
if peer_manager.connected_peers().await == 0 {
return false;
}

Expand Down
6 changes: 3 additions & 3 deletions bee-api/bee-rest-api/src/endpoints/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod health;

use crate::endpoints::{config::RestApiConfig, storage::StorageBackend, Bech32Hrp, NetworkId};

use bee_gossip::NetworkCommandSender;
use bee_gossip::GossipManagerCommandTx;
use bee_ledger::workers::consensus::ConsensusWorkerCommand;
use bee_protocol::workers::{
config::ProtocolConfig, MessageRequesterWorker, MessageSubmitterWorkerEvent, PeerManager, RequestedMessages,
Expand All @@ -31,7 +31,7 @@ pub(crate) fn filter_all<B: StorageBackend>(
rest_api_config: RestApiConfig,
protocol_config: ProtocolConfig,
peer_manager: ResourceHandle<PeerManager>,
network_command_sender: ResourceHandle<NetworkCommandSender>,
gossip_command_tx: ResourceHandle<GossipManagerCommandTx>,
node_info: ResourceHandle<NodeInfo>,
bus: ResourceHandle<Bus<'static>>,
message_requester: MessageRequesterWorker,
Expand All @@ -49,7 +49,7 @@ pub(crate) fn filter_all<B: StorageBackend>(
rest_api_config,
protocol_config,
peer_manager.clone(),
network_command_sender,
gossip_command_tx,
node_info,
bus,
message_requester,
Expand Down
6 changes: 6 additions & 0 deletions bee-network/bee-autopeering/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security -->

## 0.4.1 - 2022-xx-xx

### Changed

- Use libp2p-core 0.30.2

## 0.4.0 - 2022-02-11

### Added
Expand Down
2 changes: 1 addition & 1 deletion bee-network/bee-autopeering/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ bytes = { version = "1.0", default-features = false }
hash32 = { version = "0.2.1", default-features = false }
hex = { version = "0.4.3", default-features = false }
iota-crypto = { version = "0.9.1", default-features = false, features = [ "ed25519", "random", "sha" ] }
libp2p-core = { version = "0.29.0", default-features = false }
libp2p-core = { version = "0.30.2", default-features = false }
log = { version = "0.4", default-features = false }
num = { version = "0.4.0", default-features = false }
num-derive = { version = "0.3.3", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion bee-network/bee-autopeering/src/peer/peer_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl PeerId {

/// Creates the corresponding `libp2p_core::PeerId` from a crypto.rs ED25519 public key.
pub fn libp2p_peer_id(public_key: &PublicKey) -> libp2p_core::PeerId {
libp2p_core::PeerId::from_public_key(libp2p_public_key(public_key))
libp2p_core::PeerId::from_public_key(&libp2p_public_key(public_key))
}

/// Creates the corresponding `libp2p_core::PublicKey` from a crypto.rs ED25519 public key.
Expand Down
13 changes: 5 additions & 8 deletions bee-network/bee-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ full = [
"libp2p/identify",
"libp2p/mplex",
"libp2p/noise",
"libp2p/ping",
"libp2p/tcp-tokio",
"libp2p/yamux",
"log",
"once_cell",
"rand",
"serde",
"thiserror",
Expand All @@ -43,10 +43,11 @@ bee-runtime = { version = "0.1.1-alpha", path = "../../bee-runtime", default-fea
async-trait = { version = "0.1.51", default-features = false, optional = true }
futures = { version = "0.3.17", default-features = false, optional = true }
hashbrown = { version = "0.11.2", default-features = false, features = [ "ahash", "inline-more" ] }
libp2p = { version = "0.39.1", default-features = false, optional = true }
libp2p-core = { version = "0.29.0", default-features = false }
libp2p = { version = "0.41.0", default-features = false, optional = true }
libp2p-core = { version = "0.30.2", default-features = false }
log = { version = "0.4.14", default-features = false, optional = true }
once_cell = { version = "1.8.0", default-features = false, optional = true }
parking_lot = "0.12.0"
priority-queue = "1.2.1"
rand = { version = "0.8.4", default-features = false, optional = true }
serde = { version = "1.0.130", default-features = false, features = [ "derive" ], optional = true }
thiserror = { version = "1.0.30", default-features = false, optional = true }
Expand All @@ -58,7 +59,3 @@ fern = { version = "0.6.0", default-features = false }
hex = { version = "0.4.3", default-features = false, features = [ "alloc" ] }
serial_test = { version = "0.5.1", default-features = false }
tokio = { version = "1.12.0", default-features = false, features = [ "io-std", "io-util", "macros", "rt", "rt-multi-thread", "signal", "time" ] }

[[example]]
name = "chat"
required-features = [ "full" ]
Loading