
Commit ebb9a2f

Address review comments

1 parent: 29a6095

File tree: 7 files changed, +76 −67 lines

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

bee-api/bee-rest-api/CHANGELOG.md

Lines changed: 5 additions & 1 deletion
@@ -19,7 +19,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Security -->
 
-## 0.2.0 - 2022-XX-XX
+## 0.2.1 - 2022-02-28
+
+- Update `bee-gossip` dependency to 0.5.0;
+
+## 0.2.0 - 2022-01-28
 
 ### Added
 
bee-api/bee-rest-api/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [package]
 name = "bee-rest-api"
-version = "0.2.0"
+version = "0.2.1"
 authors = [ "IOTA Stiftung" ]
 edition = "2021"
 description = "The default REST API implementation for the IOTA Bee node software."
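The changelog entry above also records bumping the `bee-gossip` dependency to 0.5.0; that change is not visible in the hunk shown here. As a rough sketch only, under the assumption that the dependency is declared as a plain version requirement (the actual manifest may use a path dependency or feature flags), the corresponding declaration in this Cargo.toml would look roughly like:

[dependencies]
# Hypothetical declaration; not part of this diff.
bee-gossip = { version = "0.5.0" }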

bee-network/bee-gossip/src/service/event.rs

Lines changed: 2 additions & 2 deletions
@@ -132,9 +132,9 @@ pub enum InternalEvent {
     },
 
     /// The gossip protocol with a peer was stopped.
-    ProtocolStopped {
+    ProtocolStopped {
         /// The peer's id.
-        peer_id: PeerId
+        peer_id: PeerId,
     },
 
     /// A peer didn't answer our repeated calls.
bee-network/bee-gossip/src/service/host.rs

Lines changed: 21 additions & 11 deletions
@@ -1,8 +1,6 @@
 // Copyright 2020-2021 IOTA Stiftung
 // SPDX-License-Identifier: Apache-2.0
 
-use std::time::{SystemTime, UNIX_EPOCH};
-
 use super::{
     command::{Command, CommandReceiver, CommandSender},
     error::Error,
@@ -33,6 +31,8 @@ use rand::Rng;
 use tokio::time::{self, Duration, Instant};
 use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};
 
+use std::time::{SystemTime, UNIX_EPOCH};
+
 const MAX_PEER_STATE_CHECKER_DELAY_MILLIS: u64 = 2000;
 const MAX_DIALS: usize = 3;
 
@@ -436,17 +436,27 @@ async fn process_internal_event(
         // Spin up separate buffered reader and writer to efficiently process the gossip with that peer.
         let (r, w) = substream.split();
 
-        let reader = BufReader::with_capacity(IO_BUFFER_LEN, r);
-        let writer = BufWriter::with_capacity(IO_BUFFER_LEN, w);
+        let inbound_gossip_rx = BufReader::with_capacity(IO_BUFFER_LEN, r);
+        let outbound_gossip_tx = BufWriter::with_capacity(IO_BUFFER_LEN, w);
 
-        let (incoming_tx, incoming_rx) = iota_gossip::channel();
-        let (outgoing_tx, outgoing_rx) = iota_gossip::channel();
+        let (inbound_gossip_tx, gossip_in) = iota_gossip::channel();
+        let (gossip_out, outbound_gossip_rx) = iota_gossip::channel();
 
-        iota_gossip::start_incoming_processor(peer_id, reader, incoming_tx, senders.internal_events.clone());
-        iota_gossip::start_outgoing_processor(peer_id, writer, outgoing_rx, senders.internal_events.clone());
+        iota_gossip::start_inbound_gossip_handler(
+            peer_id,
+            inbound_gossip_rx,
+            inbound_gossip_tx,
+            senders.internal_events.clone(),
+        );
+        iota_gossip::start_outbound_gossip_handler(
+            peer_id,
+            outbound_gossip_tx,
+            outbound_gossip_rx,
+            senders.internal_events.clone(),
+        );
 
         // We store a clone of the gossip send channel in order to send a shutdown signal.
-        let _ = peerlist.update_state(&peer_id, |state| state.set_connected(outgoing_tx.clone()));
+        let _ = peerlist.update_state(&peer_id, |state| state.set_connected(gossip_out.clone()));
 
         // We no longer need to hold the lock.
         drop(peerlist);
@@ -476,8 +486,8 @@ async fn process_internal_event(
             .send(Event::PeerConnected {
                 peer_id,
                 info: peer_info,
-                gossip_in: incoming_rx,
-                gossip_out: outgoing_tx,
+                gossip_in,
+                gossip_out,
             })
             .map_err(|_| Error::SendingEventFailed)?;
     } else {
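After this change, the `Event::PeerConnected` event hands consumers the renamed channel halves directly: `gossip_in` (the inbound message stream) and `gossip_out` (the outbound message sender). A minimal consumer sketch, assuming the crate's `GossipReceiver` is a stream of `Vec<u8>` messages and `GossipSender` is an unbounded sender as suggested by `iota_gossip::channel()` below; the handler name and echo logic are illustrative only:

use tokio_stream::StreamExt;

// Hypothetical consumer of Event::PeerConnected; GossipReceiver and GossipSender
// are the crate's own types and are assumed to be in scope.
async fn handle_peer_connected(mut gossip_in: GossipReceiver, gossip_out: GossipSender) {
    // Forward every inbound gossip message back out until either side stops.
    while let Some(message) = gossip_in.next().await {
        if gossip_out.send(message).is_err() {
            // The outbound handler has shut down; stop processing this peer.
            break;
        }
    }
}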

bee-network/bee-gossip/src/swarm/behaviour.rs

Lines changed: 4 additions & 4 deletions
@@ -45,8 +45,8 @@ impl NetworkBehaviourEventProcess<IdentifyEvent> for SwarmBehaviour {
             IdentifyEvent::Received { peer_id, info } => {
                 trace!("Received Identify response from {}: {:?}.", alias!(peer_id), info,);
 
-                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service host)
-                // through the worker dependencies, hence this can never panic.
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
                 self.internal_sender
                     .send(InternalEvent::PeerIdentified { peer_id })
                     .expect("send internal event");
@@ -60,8 +60,8 @@ impl NetworkBehaviourEventProcess<IdentifyEvent> for SwarmBehaviour {
             IdentifyEvent::Error { peer_id, error } => {
                 debug!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error);
 
-                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service host)
-                // through the worker dependencies, hence this can never panic.
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
                 self.internal_sender
                     .send(InternalEvent::PeerUnreachable { peer_id })
                     .expect("send internal event");

bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs

Lines changed: 42 additions & 47 deletions
@@ -28,85 +28,80 @@ pub fn channel() -> (GossipSender, GossipReceiver) {
     (sender, UnboundedReceiverStream::new(receiver))
 }
 
-pub fn start_incoming_processor(
+pub fn start_inbound_gossip_handler(
     peer_id: PeerId,
-    mut reader: BufReader<ReadHalf<Box<NegotiatedSubstream>>>,
-    incoming_tx: GossipSender,
-    internal_event_sender: InternalEventSender,
+    mut inbound_gossip_rx: BufReader<ReadHalf<Box<NegotiatedSubstream>>>,
+    inbound_gossip_tx: GossipSender,
+    internal_event_tx: InternalEventSender,
 ) {
     tokio::spawn(async move {
-        let mut msg_buf = vec![0u8; MSG_BUFFER_LEN];
+        let mut buf = vec![0u8; MSG_BUFFER_LEN];
 
         loop {
-            if let Some(len) = (&mut reader).read(&mut msg_buf).await.ok().filter(|len| *len > 0) {
-                if incoming_tx.send(msg_buf[..len].to_vec()).is_err() {
-                    debug!("gossip-in: receiver dropped locally.");
+            if let Some(len) = (&mut inbound_gossip_rx)
+                .read(&mut buf)
+                .await
+                .ok()
+                .filter(|len| *len > 0)
+            {
+                if inbound_gossip_tx.send(buf[..len].to_vec()).is_err() {
+                    debug!("Terminating gossip protocol with {}.", alias!(peer_id));
 
-                    // The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do
-                    // to salvage this situation, hence we drop the connection.
                     break;
                 }
            } else {
-                debug!("gossip-in: stream closed remotely.");
+                debug!("Peer {} terminated gossip protocol.", alias!(peer_id));
 
-                // NB: The network service will not shut down before it has received the `ProtocolDropped` event
-                // from all once connected peers, hence if the following send fails, then it
-                // must be considered a bug.
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
+                internal_event_tx
+                    .send(InternalEvent::ProtocolStopped { peer_id })
+                    .expect("send internal event");
 
                 break;
            }
        }
 
-        // Ignore send errors.
-        let _ = internal_event_sender.send(InternalEvent::ProtocolStopped { peer_id });
-
-        // Reasons why this task might end:
-        // (1) The remote dropped the TCP connection.
-        // (2) The local dropped the gossip_in receiver channel.
-
-        debug!("gossip-in: exiting gossip-in processor for {}.", alias!(peer_id));
+        trace!("Dropping gossip stream reader for {}.", alias!(peer_id));
     });
 }
 
-pub fn start_outgoing_processor(
+pub fn start_outbound_gossip_handler(
     peer_id: PeerId,
-    mut writer: BufWriter<WriteHalf<Box<NegotiatedSubstream>>>,
-    outgoing_rx: GossipReceiver,
-    internal_event_sender: InternalEventSender,
+    mut outbound_gossip_tx: BufWriter<WriteHalf<Box<NegotiatedSubstream>>>,
+    outbound_gossip_rx: GossipReceiver,
+    internal_event_tx: InternalEventSender,
 ) {
     tokio::spawn(async move {
-        let mut outgoing_gossip_receiver = outgoing_rx.fuse();
+        let mut outbound_gossip_rx = outbound_gossip_rx.fuse();
 
         // If the gossip sender dropped we end the connection.
-        while let Some(message) = outgoing_gossip_receiver.next().await {
-            // NB: Instead of polling another shutdown channel, we use an empty message
+        while let Some(message) = outbound_gossip_rx.next().await {
+            // Note: Instead of polling another shutdown channel, we use an empty message
             // to signal that we want to end the connection. We use this "trick" whenever the network
             // receives the `DisconnectPeer` command to enforce that the connection will be dropped.
-
             if message.is_empty() {
-                debug!("gossip-out: received shutdown message.");
+                debug!(
+                    "Terminating gossip protocol with {} (received shutdown signal).",
+                    alias!(peer_id)
+                );
+
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
+                internal_event_tx
+                    .send(InternalEvent::ProtocolStopped { peer_id })
+                    .expect("send internal event");
 
-                // NB: The network service will not shut down before it has received the `ConnectionDropped` event
-                // from all once connected peers, hence if the following send fails, then it
-                // must be considered a bug.
                 break;
-            }
+            } else if (&mut outbound_gossip_tx).write_all(&message).await.is_err()
+                || (&mut outbound_gossip_tx).flush().await.is_err()
+            {
+                debug!("Peer {} terminated gossip protocol.", alias!(peer_id));
 
-            // If sending to the stream fails we end the connection.
-            // TODO: buffer for x milliseconds before flushing.
-            if (&mut writer).write_all(&message).await.is_err() || (&mut writer).flush().await.is_err() {
-                debug!("gossip-out: stream closed remotely");
                 break;
            }
        }
 
-        // Ignore send errors.
-        let _ = internal_event_sender.send(InternalEvent::ProtocolStopped { peer_id });
-
-        // Reasons why this task might end:
-        // (1) The local send the shutdown message (len = 0)
-        // (2) The remote dropped the TCP connection.
-
-        debug!("gossip-out: exiting gossip-out processor for {}.", alias!(peer_id));
+        trace!("Dropping gossip stream writer for {}.", alias!(peer_id));
     });
 }
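The outbound handler's comment above describes the shutdown convention: rather than polling a separate shutdown channel, the service sends an empty message through the stored `GossipSender` clone (see `set_connected` in host.rs), which makes the handler emit `InternalEvent::ProtocolStopped` and break out of its write loop. A minimal sketch of the disconnect side under that assumption (the helper name is hypothetical):

// Hypothetical service-side helper; `gossip_out` is the GossipSender clone stored
// for the peer when it connected.
fn request_disconnect(gossip_out: &GossipSender) {
    // An empty message is the agreed shutdown signal: the outbound gossip handler
    // sees `message.is_empty()`, reports `ProtocolStopped`, and drops the writer.
    let _ = gossip_out.send(Vec::new());
}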
