Skip to content

Commit

Permalink
light-client: Replace Io with async variant (#995)
Browse files Browse the repository at this point in the history
* light-client: Rename ProdIo to RpcIo

The implication of production always has been an unfortunate choice at
it assumes implicit consensus of what a production deployment
environment entails. Instead it does exactly what it says on the tin.

Signed-off-by: xla <[email protected]>

* light-client: Turn denies into warning

To make local development more forgiving while doing larger changes this
turns denies into warnings. This should be reasonably safe as long as CI
keeps using `-Dwarnings`.

Signed-off-by: xla <[email protected]>

* light-client: Replace Io trait with async variant

While invasive this change transforms the light-client to be async all
the way. This touches all places where some I/O is taking place.
Achieved by roping in the async-trait[0] crate and dropping the existing
`block_on` implementation. Lo, this drops the timeout which guarded the
side-effectful operation, which will be reintroduced in a follow-up
change.

[0] https://github.com/dtolnay/async-trait

Signed-off-by: xla <[email protected]>

* light-client: Guard rpc calls with timeout

During the move to async io the block_on implementation was dropped,
which relied on tokio. In said implementation a timeout was enforced to
limit the duration of the execution of the given task. To not
permanently loose that guard rail this change introduces a timeout
implementation based on the futures-timer[0] crate. It supports a wasm
environment, which hasn't been validated or assessed yet but should give
reasonable confidence that it might work in a `no_std` environment,
q.e.d.

[0] https://crates.io/crates/futures-timer

Signed-off-by: xla <[email protected]>

* light-client: Guard rpc calls with timeout

During the move to async io the block_on implementation was dropped,
which relied on tokio. In said implementation a timeout was enforced to
limit the duration of the execution of the given task. To not
permanently loose that guard rail this change introduces a timeout
implementation based on the futures-timer[0] crate. It supports a wasm
environment, which hasn't been validated or assessed yet but should give
reasonable confidence that it might work in a `no_std` environment,
q.e.d.

[0] https://crates.io/crates/futures-timer

Signed-off-by: xla <[email protected]>

* light-client: Add changelog for async Io changes

Signed-off-by: xla <[email protected]>

* light-client: Fix import

Signed-off-by: xla <[email protected]>

* tools: Fix kv-test impl

Signed-off-by: xla <[email protected]>

* tools: Fix rpc-probe

Signed-off-by: xla <[email protected]>

* light-client: Fix blocking supervisor handle

Signed-off-by: xla <[email protected]>

* light-client: Formatting

Signed-off-by: xla <[email protected]>
  • Loading branch information
xla authored Apr 12, 2022
1 parent b23d626 commit 7a8d681
Show file tree
Hide file tree
Showing 23 changed files with 350 additions and 297 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Replace Io with async variant
([#953](https://github.com/informalsystems/tendermint-rs/issues/953))
7 changes: 5 additions & 2 deletions light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ tendermint = { version = "0.24.0-pre.1", path = "../tendermint", default-feature
tendermint-rpc = { version = "0.24.0-pre.1", path = "../rpc", default-features = false }
tendermint-light-client-verifier = { version = "0.24.0-pre.1", path = "../light-client-verifier", default-features = false }

async-recursion = { version = "0.3", default-features = false }
async-trait = { version = "0.1", default-features = false }
contracts = { version = "0.6.2", default-features = false }
crossbeam-channel = { version = "0.4.2", default-features = false }
flex-error = { version = "0.4.4", default-features = false }
flume = { version = "0.10", default-features = false, features = [ "async" ] }
derive_more = { version = "0.99.5", default-features = false, features = ["display"] }
futures = { version = "0.3.4", default-features = false }
futures-timer = { version = "3.0", default-features = false }
serde = { version = "1.0.106", default-features = false }
serde_cbor = { version = "0.11.1", default-features = false, features = ["alloc", "std"] }
serde_derive = { version = "1.0.106", default-features = false }
sled = { version = "0.34.3", optional = true, default-features = false }
static_assertions = { version = "1.1.0", default-features = false }
time = { version = "0.3.5", default-features = false, features = ["std"] }
tokio = { version = "1.0", default-features = false, features = ["rt"], optional = true }
flex-error = { version = "0.4.4", default-features = false }

[dev-dependencies]
tendermint-testgen = { path = "../testgen", default-features = false }
Expand Down
7 changes: 4 additions & 3 deletions light-client/examples/light_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{path::PathBuf, time::Duration};

use futures::executor::block_on;
use gumdrop::Options;
use tendermint::Hash;
use tendermint_light_client::{
Expand Down Expand Up @@ -92,7 +93,7 @@ fn make_instance(
LightClientBuilder::prod(peer_id, rpc_client, Box::new(light_store), options, None);

let builder = if let (Some(height), Some(hash)) = (opts.trusted_height, opts.trusted_hash) {
builder.trust_primary_at(height, hash)
block_on(builder.trust_primary_at(height, hash))
} else {
builder.trust_from_store()
}?;
Expand All @@ -117,10 +118,10 @@ fn sync_cmd(opts: SyncOpts) -> Result<(), Box<dyn std::error::Error>> {

let handle = supervisor.handle();

std::thread::spawn(|| supervisor.run());
std::thread::spawn(|| block_on(supervisor.run()));

loop {
match handle.verify_to_highest() {
match block_on(handle.verify_to_highest()) {
Ok(light_block) => {
println!("[info] synced to block {}", light_block.height());
},
Expand Down
13 changes: 7 additions & 6 deletions light-client/src/builder/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tendermint::{block::Height, Hash};
#[cfg(feature = "rpc-client")]
use {
crate::components::clock::SystemClock,
crate::components::io::ProdIo,
crate::components::io::RpcIo,
crate::components::scheduler,
crate::verifier::{operations::ProdHasher, predicates::ProdPredicates, ProdVerifier},
core::time::Duration,
Expand All @@ -15,7 +15,7 @@ use crate::{
builder::error::Error,
components::{
clock::Clock,
io::{AtHeight, Io},
io::{AsyncIo, AtHeight},
scheduler::Scheduler,
},
light_client::LightClient,
Expand All @@ -42,7 +42,7 @@ pub struct HasTrustedState;
pub struct LightClientBuilder<State> {
peer_id: PeerId,
options: Options,
io: Box<dyn Io>,
io: Box<dyn AsyncIo>,
clock: Box<dyn Clock>,
hasher: Box<dyn Hasher>,
verifier: Box<dyn Verifier>,
Expand Down Expand Up @@ -86,7 +86,7 @@ impl LightClientBuilder<NoTrustedState> {
peer_id,
options,
light_store,
Box::new(ProdIo::new(peer_id, rpc_client, timeout)),
Box::new(RpcIo::new(peer_id, rpc_client, timeout)),
Box::new(ProdHasher),
Box::new(SystemClock),
Box::new(ProdVerifier::default()),
Expand All @@ -101,7 +101,7 @@ impl LightClientBuilder<NoTrustedState> {
peer_id: PeerId,
options: Options,
light_store: Box<dyn LightStore>,
io: Box<dyn Io>,
io: Box<dyn AsyncIo>,
hasher: Box<dyn Hasher>,
clock: Box<dyn Clock>,
verifier: Box<dyn Verifier>,
Expand Down Expand Up @@ -147,14 +147,15 @@ impl LightClientBuilder<NoTrustedState> {
}

/// Set the block from the primary peer at the given height as the trusted state.
pub fn trust_primary_at(
pub async fn trust_primary_at(
self,
trusted_height: Height,
trusted_hash: Hash,
) -> Result<LightClientBuilder<HasTrustedState>, Error> {
let trusted_state = self
.io
.fetch_light_block(AtHeight::At(trusted_height))
.await
.map_err(Error::io)?;

if trusted_state.height() != trusted_height {
Expand Down
116 changes: 58 additions & 58 deletions light-client/src/components/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@
use std::time::Duration;

use async_trait::async_trait;
use flex_error::{define_error, TraceError};
use tendermint_rpc as rpc;
#[cfg(feature = "rpc-client")]
use tendermint_rpc::Client;

use crate::verifier::types::{Height, LightBlock};

#[cfg(feature = "tokio")]
type TimeoutError = flex_error::DisplayOnly<tokio::time::error::Elapsed>;

#[cfg(not(feature = "tokio"))]
type TimeoutError = flex_error::NoSource;
use crate::{
utils::time::TimeError,
verifier::types::{Height, LightBlock},
};

/// Type for selecting either a specific height or the latest one
pub enum AtHeight {
Expand All @@ -37,7 +32,7 @@ define_error! {
#[derive(Debug)]
IoError {
Rpc
[ rpc::Error ]
[ tendermint_rpc::Error ]
| _ | { "rpc error" },

InvalidHeight
Expand All @@ -49,13 +44,9 @@ define_error! {
[ tendermint::Error ]
| _ | { "fetched validator set is invalid" },

Timeout
{ duration: Duration }
[ TimeoutError ]
| e | {
format_args!("task timed out after {} ms",
e.duration.as_millis())
},
Time
[ TimeError ]
| _ | { "time error" },

Runtime
[ TraceError<std::io::Error> ]
Expand All @@ -68,60 +59,67 @@ impl IoErrorDetail {
/// Whether this error means that a timeout occured when querying a node.
pub fn is_timeout(&self) -> Option<Duration> {
match self {
Self::Timeout(e) => Some(e.duration),
Self::Time(e) => e.source.is_timeout(),
_ => None,
}
}
}

/// Interface for fetching light blocks from a full node, typically via the RPC client.
pub trait Io: Send + Sync {
/// Fetch a light block at the given height from a peer
fn fetch_light_block(&self, height: AtHeight) -> Result<LightBlock, IoError>;
#[async_trait]
pub trait AsyncIo: Send + Sync {
async fn fetch_light_block(&self, height: AtHeight) -> Result<LightBlock, IoError>;
}

impl<F: Send + Sync> Io for F
#[async_trait]
impl<F, R> AsyncIo for F
where
F: Fn(AtHeight) -> Result<LightBlock, IoError>,
F: Fn(AtHeight) -> R + Send + Sync,
R: std::future::Future<Output = Result<LightBlock, IoError>> + Send,
{
fn fetch_light_block(&self, height: AtHeight) -> Result<LightBlock, IoError> {
self(height)
async fn fetch_light_block(&self, height: AtHeight) -> Result<LightBlock, IoError> {
self(height).await
}
}

#[cfg(feature = "rpc-client")]
pub use self::prod::ProdIo;
pub use self::rpc::RpcIo;

#[cfg(feature = "rpc-client")]
mod prod {
mod rpc {
use std::time::Duration;

use futures::future::FutureExt;
use tendermint::{
account::Id as TMAccountId, block::signed_header::SignedHeader as TMSignedHeader,
validator::Set as TMValidatorSet,
};
use tendermint_rpc::Paging;
use tendermint_rpc::{Client as _, Paging};

use super::*;
use crate::{utils::block_on, verifier::types::PeerId};
use crate::{utils::time::timeout, verifier::types::PeerId};

/// Production implementation of the Io component, which fetches
/// light blocks from full nodes via RPC.
/// Implementation of the Io component backed by an RPC client, which fetches
/// light blocks from full nodes.
#[derive(Clone, Debug)]
pub struct ProdIo {
pub struct RpcIo {
peer_id: PeerId,
rpc_client: rpc::HttpClient,
timeout: Option<Duration>,
rpc_client: tendermint_rpc::HttpClient,
timeout: Duration,
}

impl Io for ProdIo {
fn fetch_light_block(&self, height: AtHeight) -> Result<LightBlock, IoError> {
let signed_header = self.fetch_signed_header(height)?;
#[async_trait]
impl AsyncIo for RpcIo {
async fn fetch_light_block(&self, height: AtHeight) -> Result<LightBlock, IoError> {
let signed_header = self.fetch_signed_header(height).await?;
let height = signed_header.header.height;
let proposer_address = signed_header.header.proposer_address;

let validator_set = self.fetch_validator_set(height.into(), Some(proposer_address))?;
let next_validator_set = self.fetch_validator_set(height.increment().into(), None)?;
let validator_set = self
.fetch_validator_set(height.into(), Some(proposer_address))
.await?;
let next_validator_set = self
.fetch_validator_set(height.increment().into(), None)
.await?;

let light_block = LightBlock::new(
signed_header,
Expand All @@ -134,39 +132,41 @@ mod prod {
}
}

impl ProdIo {
/// Constructs a new ProdIo component.
impl RpcIo {
/// Constructs a new RpcIo component.
///
/// A peer map which maps peer IDS to their network address must be supplied.
pub fn new(
peer_id: PeerId,
rpc_client: rpc::HttpClient, /* TODO(thane): Generalize over client transport
* (instead of using HttpClient directly) */
rpc_client: tendermint_rpc::HttpClient, /* TODO(thane): Generalize over client
* transport
* (instead of using HttpClient directly) */
timeout: Option<Duration>,
) -> Self {
Self {
peer_id,
rpc_client,
timeout,
timeout: timeout.unwrap_or_else(|| Duration::from_secs(5)),
}
}

fn fetch_signed_header(&self, height: AtHeight) -> Result<TMSignedHeader, IoError> {
async fn fetch_signed_header(&self, height: AtHeight) -> Result<TMSignedHeader, IoError> {
let client = self.rpc_client.clone();
let res = block_on(self.timeout, async move {
match height {
AtHeight::Highest => client.latest_commit().await,
AtHeight::At(height) => client.commit(height).await,
}
})?;
let fetch_commit = match height {
AtHeight::Highest => client.latest_commit().fuse(),
AtHeight::At(height) => client.commit(height).fuse(),
};
let res = timeout(self.timeout, fetch_commit)
.await
.map_err(IoError::time)?;

match res {
Ok(response) => Ok(response.signed_header),
Err(err) => Err(IoError::rpc(err)),
}
}

fn fetch_validator_set(
async fn fetch_validator_set(
&self,
height: AtHeight,
proposer_address: Option<TMAccountId>,
Expand All @@ -179,10 +179,10 @@ mod prod {
};

let client = self.rpc_client.clone();
let response = block_on(self.timeout, async move {
client.validators(height, Paging::All).await
})?
.map_err(IoError::rpc)?;
let response = timeout(self.timeout, client.validators(height, Paging::All).fuse())
.await
.map_err(IoError::time)?
.map_err(IoError::rpc)?;

let validator_set = match proposer_address {
Some(proposer_address) => {
Expand Down
6 changes: 3 additions & 3 deletions light-client/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
use std::{fmt::Debug, time::Duration};

use crossbeam_channel as crossbeam;
use flex_error::{define_error, DisplayError, TraceError};
use flume;

// Re-export for backward compatibility
pub use crate::verifier::errors::ErrorExt;
Expand Down Expand Up @@ -148,11 +148,11 @@ impl ErrorExt for ErrorDetail {
}

impl Error {
pub fn send<T>(_e: crossbeam::SendError<T>) -> Error {
pub fn send<T>(_e: flume::SendError<T>) -> Error {
Error::channel_disconnected()
}

pub fn recv(_e: crossbeam::RecvError) -> Error {
pub fn recv(_e: flume::RecvError) -> Error {
Error::channel_disconnected()
}
}
Loading

0 comments on commit 7a8d681

Please sign in to comment.