Skip to content

Commit

Permalink
tower-abci: add support for unix domain sockets (#35)
Browse files Browse the repository at this point in the history
* feat: support unix domain sockets

* don't debug print option, only contents of option

* tower-abci(v037): refactor around `Server::run`

* tower-abci(v034): backport uds support

* tower-abci: remove redundant shadowing of stream/sink

* cargo: bump version to `0.10.0`

---------

Co-authored-by: Erwan <[email protected]>
  • Loading branch information
SuperFluffy and erwanor authored Sep 7, 2023
1 parent d30102c commit e897a33
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tower-abci"
version = "0.9.0"
version = "0.10.0"
authors = ["Henry de Valence <[email protected]>"]
edition = "2021"
license = "MIT"
Expand Down
34 changes: 19 additions & 15 deletions examples/kvstore_34/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ use std::{
use bytes::Bytes;
use futures::future::FutureExt;
use structopt::StructOpt;
use tower::{Service, ServiceBuilder};

use tendermint::abci::{Event, EventAttributeIndexExt};

use tendermint::v0_34::abci::response;
use tendermint::v0_34::abci::{Request, Response};
use tendermint::v0_34::abci::{response, Request, Response};
use tower::{Service, ServiceBuilder};

use tower_abci::{
v034::{split, Server},
Expand Down Expand Up @@ -141,6 +138,10 @@ struct Opt {
/// Bind the TCP server to this port.
#[structopt(short, long, default_value = "26658")]
port: u16,

/// Bind the UDS server to this path
#[structopt(long)]
uds: Option<String>,
}

#[tokio::main]
Expand All @@ -157,7 +158,7 @@ async fn main() {
// Hand those components to the ABCI server, but customize request behavior
// for each category -- for instance, apply load-shedding only to mempool
// and info requests, but not to consensus requests.
let server = Server::builder()
let server_builder = Server::builder()
.consensus(consensus)
.snapshot(snapshot)
.mempool(
Expand All @@ -172,13 +173,16 @@ async fn main() {
.buffer(100)
.rate_limit(50, std::time::Duration::from_secs(1))
.service(info),
)
.finish()
.unwrap();

// Run the ABCI server.
server
.listen(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
);

let server = server_builder.finish().unwrap();

if let Some(uds_path) = opt.uds {
server.listen_unix(uds_path).await.unwrap();
} else {
server
.listen_tcp(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
}
}
27 changes: 17 additions & 10 deletions examples/kvstore_37/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ struct Opt {
/// Bind the TCP server to this port.
#[structopt(short, long, default_value = "26658")]
port: u16,

/// Bind the UDS server to this path
#[structopt(long)]
uds: Option<String>,
}

#[tokio::main]
Expand All @@ -161,7 +165,7 @@ async fn main() {
// Hand those components to the ABCI server, but customize request behavior
// for each category -- for instance, apply load-shedding only to mempool
// and info requests, but not to consensus requests.
let server = Server::builder()
let server_builder = Server::builder()
.consensus(consensus)
.snapshot(snapshot)
.mempool(
Expand All @@ -176,13 +180,16 @@ async fn main() {
.buffer(100)
.rate_limit(50, std::time::Duration::from_secs(1))
.service(info),
)
.finish()
.unwrap();

// Run the ABCI server.
server
.listen(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
);

let server = server_builder.finish().unwrap();

if let Some(uds_path) = opt.uds {
server.listen_unix(uds_path).await.unwrap();
} else {
server
.listen_tcp(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
}
}
59 changes: 45 additions & 14 deletions src/v034/server.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use std::convert::{TryFrom, TryInto};
use std::path::Path;

use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{
net::{TcpListener, TcpStream, ToSocketAddrs},
net::{TcpListener, ToSocketAddrs, UnixListener},
select,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{Service, ServiceExt};

use crate::BoxError;
use tendermint::abci::MethodKind;

use tendermint::v0_34::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
};

use crate::BoxError;
/// An ABCI server which listens for connections and forwards requests to four
/// component ABCI [`Service`]s.
pub struct Server<C, M, I, S> {
Expand Down Expand Up @@ -123,29 +126,54 @@ where
ServerBuilder::default()
}

pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
tracing::info!(?addr, "starting ABCI server");
pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
let listener = UnixListener::bind(path)?;
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on uds");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
}
}
}
}

pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
self,
addr: A,
) -> Result<(), BoxError> {
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
tracing::info!(?local_addr, "bound tcp listener");
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on tcp socket");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
tokio::spawn(async move { conn.run(socket).await.unwrap() });
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new tcp connection");
tracing::error!({ %e }, "error accepting new connection");
}
}
}
Expand All @@ -172,14 +200,17 @@ where
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
async fn run(
mut self,
read: impl AsyncReadExt + std::marker::Unpin,
write: impl AsyncWriteExt + std::marker::Unpin,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::v0_34::abci as pb;

let (mut request_stream, mut response_sink) = {
use crate::v034::codec::{Decode, Encode};
let (read, write) = socket.split();
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedWrite::new(write, Encode::<pb::Response>::default()),
Expand Down
59 changes: 44 additions & 15 deletions src/v037/server.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::convert::{TryFrom, TryInto};
use std::path::Path;

use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{
net::{TcpListener, TcpStream, ToSocketAddrs},
net::{TcpListener, ToSocketAddrs, UnixListener},
select,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{Service, ServiceExt};

use tendermint::abci::MethodKind;

use crate::BoxError;
use tendermint::abci::MethodKind;

use tendermint::v0_37::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
Expand Down Expand Up @@ -125,29 +126,54 @@ where
ServerBuilder::default()
}

pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
tracing::info!(?addr, "starting ABCI server");
pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
let listener = UnixListener::bind(path)?;
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on uds");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
}
}
}
}

pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
self,
addr: A,
) -> Result<(), BoxError> {
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
tracing::info!(?local_addr, "bound tcp listener");
let addr = listener.local_addr()?;
tracing::info!(?addr, "ABCI server starting on tcp socket");

loop {
match listener.accept().await {
Ok((socket, _addr)) => {
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
tracing::debug!(?_addr, "accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
tokio::spawn(async move { conn.run(socket).await.unwrap() });
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new tcp connection");
tracing::error!({ %e }, "error accepting new connection");
}
}
}
Expand All @@ -174,14 +200,17 @@ where
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
async fn run(
mut self,
read: impl AsyncReadExt + std::marker::Unpin,
write: impl AsyncWriteExt + std::marker::Unpin,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::v0_37::abci as pb;

let (mut request_stream, mut response_sink) = {
use crate::v037::codec::{Decode, Encode};
let (read, write) = socket.split();
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedWrite::new(write, Encode::<pb::Response>::default()),
Expand Down

0 comments on commit e897a33

Please sign in to comment.