Skip to content

Commit

Permalink
pd: 🔨 rework RootCommand::start auto-https logic
Browse files Browse the repository at this point in the history
 ## 👀 overview

fixes #3627.

this reorganizes the logic in pd's startup code related to automatically
managed https functionality.

 ## 🎨 background & motivation

this PR, besides cleaning up the `rustls-acme`-related auto-https logic, is
also interested in *creating a state-of-affairs that will dovetail into
pr #3522*. in particular, this expression to start the GRPC serve given a
bound listener...

```rust
tokio::task::Builder::new()
    .name("grpc_server")
    .spawn(grpc_server.serve_with_incoming(tls_incoming))
    .expect("failed to spawn grpc server")
```

...should be adjusted so as to work with an `axum::Router`.

 ### ⚖️  `rustls-acme` and `tokio-rustls-acme`

quoth the #3627 description, citing an earlier comment:

> In the ~year since this code was written, there may be better options.
> `tokio-rustls-acme` seems promising
\- <#3522 (comment)>

for reference, the repositories for each live here, and here:
- <https://github.com/FlorianUekermann/rustls-acme>
- <https://github.com/n0-computer/tokio-rustls-acme>

after some comparison, i have come to the opinion that `rustls-acme` will still
be adequate for our purposes. the latter is a fork of the former, but active
development appears to have continued in the former, and i did not see any
particular "_must-have_" features for us in the latter.

 ## 🎴 changes

this commit moves some of the auto-https related code from the `main`
entrypoint, into standalone functions in `pd::main`.

some constants are defined, to keep control flow clear and to help
facilitate the addition of future options e.g. a flag to control the
LetsEncrypt environment to use.

 ## 🚰 dropping down to `axum`; a brief note on future upgrades

as stated above, we want to switch to an `axum::Router`. this means that
we won't be able to use the `AcmeConfig::incoming` function. the
`rustls-acme` library provides some "low-level" examples this work is
based upon

- <https://github.com/FlorianUekermann/rustls-acme/blob/main/examples/low_level_tokio.rs>
- <https://github.com/FlorianUekermann/rustls-acme/blob/main/examples/low_level_axum.rs>

we also use `tonic` 0.10.2 in pd, and elsewhere in the penumbra
monorepo. tonic isn't using hyper 1.x yet. this was being worked on in
hyperium/tonic#1583, continued on in hyperium/tonic#1595, and tracked in
hyperium/tonic#1579. that work also depends upon hyperium/hyper#3461.

we will need to be wait for tonic to finish its migration over to hyper
1.0, see:
hyperium/tonic#1579 (comment)

this is understandable, but i make note of this situation as a
signpost for our future selves when considering a migration to
recent versions of axum-server, axum, rustls-acme, or hyper.

for now, it's easiest to stay in lock-step with tonic, and we can revisit
the upgrade path(s) at a future date.

===

Refs: #3627
Refs: #3646
Refs: #3522
  • Loading branch information
cratelyn committed Jan 25, 2024
1 parent 54fd722 commit 52e89c6
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 93 deletions.
32 changes: 19 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/bin/pd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,14 @@ console-subscriber = "0.2"
metrics-tracing-context = "0.11.0"
metrics-util = "0.13"
clap = { version = "3", features = ["derive", "env"] }
rustls-acme = "0.6"
atty = "0.2"
fs_extra = "1.3.0"

axum = { version = "0.6.20", features = ["tokio", "http2"] }
axum-server = { version = "0.4.7", features = ["tls-rustls"] }
rustls = "0.20.9"
rustls-acme = { version = "0.6.0", features = ["axum"] }

[dev-dependencies]
penumbra-proof-params = { path = "../../crypto/proof-params", features = [
"bundled-proving-keys",
Expand Down
128 changes: 84 additions & 44 deletions crates/bin/pd/src/auto_https.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,94 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
//! Automatic HTTPS certificate management facilities.
//!
//! See [`axum_acceptor`] for more information.
use pin_project::pin_project;
use rustls_acme::futures_rustls::server::TlsStream;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::TcpStream,
use {
anyhow::Error,
futures::Future,
rustls::ServerConfig,
rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig, AcmeState},
std::{fmt::Debug, path::PathBuf, sync::Arc},
};
use tokio_util::compat::Compat;
use tonic::transport::server::Connected;

/// Wrapper type needed to convert between futures_io and tokio traits
#[pin_project]
pub struct Wrapper {
#[pin]
pub inner: Compat<TlsStream<Compat<TcpStream>>>,
}

impl Connected for Wrapper {
type ConnectInfo = <TcpStream as Connected>::ConnectInfo;
/// Protocols supported by this server, in order of preference.
///
/// See [rfc7301] for more info on ALPN.
///
/// [rfc7301]: https://datatracker.ietf.org/doc/html/rfc7301
//
// We also permit HTTP1.1 for backwards-compatibility, specifically for grpc-web.
const ALPN_PROTOCOLS: [&'static [u8]; 2] = [b"h2", b"http/1.1"];

fn connect_info(&self) -> Self::ConnectInfo {
self.inner.get_ref().get_ref().0.get_ref().connect_info()
}
/// The location of the file-based certificate cache.
// NB: this must not be an absolute path see [Path::join].
const CACHE_DIR: &'static str = "tokio_rustls_acme_cache";

/// If true, use the production Let's Encrypt environment.
///
/// If false, the ACME resolver will use the [staging environment].
///
/// [staging environment]: https://letsencrypt.org/docs/staging-environment/
const PRODUCTION_LETS_ENCRYPT: bool = true;

/// Use ACME to resolve certificates and handle new connections.
///
/// This returns a tuple containing an [`AxumAcceptor`] that may be used with [`axum_server`], and
/// a [`Future`] that represents the background task to poll and log for changes in the
/// certificate environment.
pub fn axum_acceptor(
home: PathBuf,
domain: String,
) -> (AxumAcceptor, impl Future<Output = Result<(), Error>>) {
// Use a file-based cache located within the home directory.
let cache = home.join(CACHE_DIR);
let cache = DirCache::new(cache);

// Create an ACME client, which we will use to resolve certificates.
let state = AcmeConfig::new(vec![domain])
.cache(cache)
.directory_lets_encrypt(PRODUCTION_LETS_ENCRYPT)
.state();

// Define our server configuration, using the ACME certificate resolver.
let mut rustls_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(state.resolver());
rustls_config.alpn_protocols = self::alpn_protocols();
let rustls_config = Arc::new(rustls_config);

// Return our connection acceptor and our background worker task.
let acceptor = state.axum_acceptor(rustls_config.clone());
let worker = self::acme_worker(state);
(acceptor, worker)
}

impl AsyncRead for Wrapper {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
self.project().inner.poll_read(cx, buf)
/// This function defines the task responsible for handling ACME events.
///
/// This function will never return, unless an error is encountered.
#[tracing::instrument(level = "error", skip_all)]
async fn acme_worker<EC, EA>(mut state: AcmeState<EC, EA>) -> Result<(), anyhow::Error>
where
EC: Debug + 'static,
EA: Debug + 'static,
{
use futures::StreamExt;
loop {
match state.next().await {
Some(Ok(ok)) => tracing::debug!("received acme event: {:?}", ok),
Some(Err(err)) => tracing::error!("acme error: {:?}", err),
None => {
debug_assert!(false, "acme worker unexpectedly reached end-of-stream");
tracing::error!("acme worker unexpectedly reached end-of-stream");
anyhow::bail!("unexpected end-of-stream");
}
}
}
}

impl AsyncWrite for Wrapper {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}
/// Returns a vector of the protocols supported by this server.
///
/// This is a convenience method to retrieve an owned copy of [`ALPN_PROTOCOLS`].
fn alpn_protocols() -> Vec<Vec<u8>> {
ALPN_PROTOCOLS.into_iter().map(<[u8]>::to_vec).collect()
}
64 changes: 29 additions & 35 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use metrics_util::layers::Stack;
use anyhow::Context;
use clap::{Parser, Subcommand};
use cnidarium::{StateDelta, Storage};
use futures::stream::TryStreamExt;
use ibc_proto::ibc::core::channel::v1::query_server::QueryServer as ChannelQueryServer;
use ibc_proto::ibc::core::client::v1::query_server::QueryServer as ClientQueryServer;
use ibc_proto::ibc::core::connection::v1::query_server::QueryServer as ConnectionQueryServer;
Expand All @@ -30,7 +29,7 @@ use penumbra_tower_trace::remote_addr;
use rand::Rng;
use rand_core::OsRng;
use tendermint_config::net::Address as TendermintAddress;
use tokio::{net::TcpListener, runtime};
use tokio::runtime;
use tonic::transport::Server;
use tower_http::cors::CorsLayer;
use tracing_subscriber::{prelude::*, EnvFilter};
Expand Down Expand Up @@ -478,39 +477,34 @@ async fn main() -> anyhow::Result<()> {
)));
}

let grpc_server = if let Some(domain) = grpc_auto_https {
use pd::auto_https::Wrapper;
use rustls_acme::{caches::DirCache, AcmeConfig};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};

let mut acme_cache = pd_home.clone();
acme_cache.push("rustls_acme_cache");

let bound_listener = TcpListener::bind(grpc_bind)
.await
.context(format!("Failed to bind HTTPS listener on {}", grpc_bind))?;
let listener = TcpListenerStream::new(bound_listener);
// Configure HTTP2 support for the TLS negotiation; we also permit HTTP1.1
// for backwards-compatibility, specifically for grpc-web.
let alpn_config = vec!["h2".into(), "http/1.1".into()];
let tls_incoming = AcmeConfig::new([domain.as_str()])
.cache(DirCache::new(acme_cache))
.directory_lets_encrypt(true) // Use the production LE environment
.incoming(listener.map_ok(|conn| conn.compat()), alpn_config)
.map_ok(|incoming| Wrapper {
inner: incoming.compat(),
});

tokio::task::Builder::new()
.name("grpc_server")
.spawn(grpc_server.serve_with_incoming(tls_incoming))
.expect("failed to spawn grpc server")
} else {
tokio::task::Builder::new()
.name("grpc_server")
.spawn(grpc_server.serve(grpc_bind))
.expect("failed to spawn grpc server")
// Now we drop down a layer of abstraction, from tonic to axum.
//
// TODO(kate): this is where we may attach additional routes upon this router in the
// future. see #3646 for more information.
let router = grpc_server.into_router();
let make_svc = router.into_make_service();

// Now start the GRPC server, initializing an ACME client to use as a certificate
// resolver if auto-https has been enabled.
macro_rules! spawn_grpc_server {
($server:expr) => {
tokio::task::Builder::new()
.name("grpc_server")
.spawn($server.serve(make_svc))
.expect("failed to spawn grpc server")
};
}
let grpc_server = axum_server::bind(grpc_bind);
let grpc_server = match grpc_auto_https {
Some(domain) => {
let (acceptor, acme_worker) = pd::auto_https::axum_acceptor(pd_home, domain);
// TODO(kate): we should eventually propagate errors from the ACME worker task.
tokio::spawn(acme_worker);
spawn_grpc_server!(grpc_server.acceptor(acceptor))
}
None => {
spawn_grpc_server!(grpc_server)
}
};

// Configure a Prometheus recorder and exporter.
Expand Down

0 comments on commit 52e89c6

Please sign in to comment.