diff --git a/Cargo.lock b/Cargo.lock index 9117e35..96ded71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2102,6 +2102,8 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", + "rustls-platform-verifier", "tokio", "tokio-rustls", "tower-service", @@ -2318,6 +2320,32 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "instant-acme" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f05ad37c421b962354c358d347d4a6130151df9407978372d3ad7f0c8f71a64" +dependencies = [ + "async-trait", + "aws-lc-rs", + "base64", + "bytes", + "http", + "http-body", + "http-body-util", + "httpdate", + "hyper", + "hyper-rustls", + "hyper-util", + "rcgen 0.14.8", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", +] + [[package]] name = "intrusive-collections" version = "0.9.7" @@ -5785,12 +5813,16 @@ dependencies = [ "async-trait", "axum", "eyre", + "instant-acme", + "rcgen 0.14.8", "rustls", "rustls-acme", + "time", "tokio", "tokio-stream", "tokio-util", "tracing", + "x509-parser 0.18.1", ] [[package]] @@ -5940,6 +5972,7 @@ dependencies = [ "bytes", "color-eyre", "crossfire", + "dashmap", "enum_dispatch", "eyre", "futures-util", diff --git a/crates/wind-acme/Cargo.toml b/crates/wind-acme/Cargo.toml index 20c5382..d1c57de 100644 --- a/crates/wind-acme/Cargo.toml +++ b/crates/wind-acme/Cargo.toml @@ -7,25 +7,39 @@ description = "ACME certificate management for wind" license = "MIT OR Apache-2.0" [features] -default = ["aws-lc-rs"] -ring = ["rustls/ring", "rustls-acme/ring"] -aws-lc-rs = ["rustls/aws-lc-rs", "rustls-acme/aws-lc-rs"] +default = ["aws-lc-rs", "resolver"] +ring = ["rustls/ring", "rustls-acme?/ring"] +aws-lc-rs = ["rustls/aws-lc-rs", "rustls-acme?/aws-lc-rs"] +# `rustls-acme`-backed resolver flow with background renewal (returns a 
rustls +# cert resolver). Used by the quinn backend. +resolver = ["dep:rustls-acme", "dep:tokio-stream", "dep:arc-swap"] +# One-shot HTTP-01 certificate provisioning + self-signed generation that writes +# the PEM chain/key to disk (for backends that load TLS material from files, e.g. +# quiche/tokio-quiche). Uses `instant-acme` rather than the `rustls-acme` state +# machine that backs the resolver-based flow. +http01 = ["dep:instant-acme", "dep:x509-parser", "dep:rcgen", "dep:time"] [dependencies] # TLS / ACME rustls = { version = "0.23", default-features = false } -rustls-acme = { git = "https://github.com/rust-proxy/rustls-acme", default-features = false, features = ["tower", "webpki-roots"] } +rustls-acme = { git = "https://github.com/rust-proxy/rustls-acme", default-features = false, features = ["tower", "webpki-roots"], optional = true } -# HTTP challenge server +# HTTP challenge server (used by both the resolver and http01 flows) axum = { version = "0.8", features = ["tokio"] } # Async tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt", "fs", "sync"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", optional = true } tokio-util = { version = "0.7", features = ["rt"] } async-trait = "0.1" # Utilities -arc-swap = "1" +arc-swap = { version = "1", optional = true } eyre = "0.6" tracing = "0.1" + +# One-shot HTTP-01 provisioning + self-signed generation (feature = "http01") +instant-acme = { version = "0.8", features = ["rcgen"], optional = true } +x509-parser = { version = "0.18.1", optional = true } +rcgen = { version = "0.14", features = ["aws_lc_rs", "pem"], optional = true } +time = { version = "0.3", optional = true } diff --git a/crates/wind-acme/src/http01.rs b/crates/wind-acme/src/http01.rs new file mode 100644 index 0000000..1eed10c --- /dev/null +++ b/crates/wind-acme/src/http01.rs @@ -0,0 +1,216 @@ +//! One-shot HTTP-01 certificate provisioning via `instant-acme`. +//! +//! 
Unlike the resolver-based [`start_acme`](crate::start_acme) flow (which +//! keeps a `rustls-acme` state machine running for background renewal), this +//! provisions or renews a certificate a single time and writes the PEM +//! certificate chain and private key to disk. Backends that load TLS material +//! from file paths (e.g. the quiche/tokio-quiche QUIC listeners) consume the +//! on-disk PEMs. + +use std::{collections::HashMap, path::Path, sync::Arc}; + +use axum::{ + Router, + extract::{Path as AxumPath, State}, + http::{HeaderValue, StatusCode, header}, + response::{IntoResponse, Response}, + routing::get, +}; +use eyre::{Context, Result}; +use instant_acme::{ + Account, AuthorizationStatus, ChallengeType, Identifier, LetsEncrypt, NewAccount, NewOrder, Order, OrderStatus, RetryPolicy, +}; +use tokio::{net::TcpListener, sync::RwLock}; +use tracing::{debug, info, instrument}; +use x509_parser::pem::parse_x509_pem; + +type ChallengeMap = Arc>>; + +async fn handle_challenge(State(challenges): State, AxumPath(token): AxumPath) -> Response { + let Some(key_auth) = challenges.read().await.get(&token).cloned() else { + return StatusCode::NOT_FOUND.into_response(); + }; + debug!(%token, "serving challenge"); + ( + StatusCode::OK, + [(header::CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"))], + key_auth, + ) + .into_response() +} + +/// Completes all pending HTTP-01 challenges for an order using a single +/// short-lived axum server on port 80. +async fn complete_http01_challenges(order: &mut Order) -> Result<()> { + let challenges: ChallengeMap = Arc::new(RwLock::new(HashMap::new())); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + let listener = TcpListener::bind("0.0.0.0:80").await.context( + "Failed to bind port 80 for ACME challenge. 
Ensure port 80 is open and you are running as root (or use authbind).", + )?; + + let app = Router::new() + .route("/.well-known/acme-challenge/{token}", get(handle_challenge)) + .with_state(challenges.clone()); + + info!("HTTP-01 challenge server listening on :80"); + + let server_handle = tokio::spawn(async move { + axum::serve(listener, app) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.await; + }) + .await + }); + + let mut authorizations = order.authorizations(); + while let Some(result) = authorizations.next().await { + let mut authz = result?; + if authz.status == AuthorizationStatus::Valid { + continue; + } + + let mut challenge = authz + .challenge(ChallengeType::Http01) + .ok_or_else(|| eyre::eyre!("No HTTP-01 challenge found"))?; + + let token = challenge.token.to_string(); + let key_auth = challenge.key_authorization().as_str().to_string(); + + // Register the response before telling Let's Encrypt the challenge is ready. + challenges.write().await.insert(token, key_auth); + challenge.set_ready().await?; + } + + info!("polling for order ready..."); + let status = order.poll_ready(&RetryPolicy::default()).await?; + + let _ = shutdown_tx.send(()); + let _ = server_handle.await; + + if status != OrderStatus::Ready { + eyre::bail!("ACME order invalid or failed: {:?}", status); + } + + Ok(()) +} + +fn cert_not_after(cert_pem: &[u8]) -> Result { + let (_, pem) = parse_x509_pem(cert_pem).map_err(|e| eyre::eyre!("parsing certificate PEM: {e}"))?; + let cert = pem.parse_x509().map_err(|e| eyre::eyre!("parsing certificate DER: {e}"))?; + Ok(cert.validity().not_after.to_datetime()) +} + +fn should_renew(not_after: time::OffsetDateTime, now: time::OffsetDateTime) -> bool { + const RENEW_BEFORE_DAYS: i64 = 30; + not_after <= now + time::Duration::days(RENEW_BEFORE_DAYS) +} + +/// Provision (or renew) an ACME certificate via HTTP-01, writing the PEM cert +/// chain and private key to disk. 
If a fresh certificate already exists on disk +/// (more than 30 days from expiry) this is a no-op. +#[instrument(name = "acme", skip_all, fields(hostname = %hostname))] +pub async fn ensure_acme_cert( + hostname: &str, + email: Option<&str>, + cert_path: &Path, + key_path: &Path, + staging: bool, +) -> Result<()> { + if cert_path.exists() && key_path.exists() { + let cert_pem = tokio::fs::read(cert_path).await.context("read cert file")?; + let not_after = cert_not_after(&cert_pem)?; + let now = time::OffsetDateTime::now_utc(); + let days_left = (not_after - now).whole_days(); + + if !should_renew(not_after, now) { + info!(days_left, not_after = %not_after, "cert fresh, skipping renewal"); + return Ok(()); + } + info!(days_left, not_after = %not_after, "cert expiring soon or expired, renewing"); + } else { + info!("no cert found, provisioning"); + } + + let contact: Vec = email.into_iter().map(|e| format!("mailto:{e}")).collect(); + + let directory_url = if staging { + info!("using Let's Encrypt STAGING directory"); + LetsEncrypt::Staging.url().to_owned() + } else { + LetsEncrypt::Production.url().to_owned() + }; + + let (account, _credentials) = Account::builder()? 
+ .create( + &NewAccount { + contact: &contact.iter().map(String::as_str).collect::>(), + terms_of_service_agreed: true, + only_return_existing: false, + }, + directory_url, + None, + ) + .await + .context("Failed to create ACME account")?; + + let identifiers = vec![Identifier::Dns(hostname.to_string())]; + let mut order = account + .new_order(&NewOrder::new(&identifiers)) + .await + .context("Failed to create ACME order")?; + + let state = order.state(); + if !matches!(state.status, OrderStatus::Pending | OrderStatus::Ready) { + eyre::bail!("Unexpected order state: {:?}", state.status); + } + + if matches!(state.status, OrderStatus::Pending) { + complete_http01_challenges(&mut order).await?; + } + + info!("finalizing order..."); + let private_key_pem = order.finalize().await?; + // `poll_certificate` returns the PEM-encoded certificate chain (leaf + + // intermediates), which is required by quiche/tokio-quiche for H3. + let cert_chain_pem = order.poll_certificate(&RetryPolicy::default()).await?; + + if let Some(parent) = cert_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + if let Some(parent) = key_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + + tokio::fs::write(cert_path, cert_chain_pem).await?; + tokio::fs::write(key_path, private_key_pem).await?; + + info!("cert issued and saved"); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn renews_when_certificate_expires_within_thirty_days() { + let now = time::OffsetDateTime::now_utc(); + assert!(should_renew(now + time::Duration::days(10), now)); + assert!(should_renew(now - time::Duration::days(1), now)); + assert!(!should_renew(now + time::Duration::days(45), now)); + } + + #[test] + fn parses_certificate_not_after_from_pem() { + let not_after = time::OffsetDateTime::now_utc() + time::Duration::days(42); + let mut params = rcgen::CertificateParams::new(vec!["example.com".to_string()]).unwrap(); + params.not_after = not_after; + let key_pair = 
rcgen::KeyPair::generate().unwrap(); + let cert = params.self_signed(&key_pair).unwrap(); + + let parsed = cert_not_after(cert.pem().as_bytes()).unwrap(); + assert_eq!(parsed.unix_timestamp(), not_after.unix_timestamp()); + } +} diff --git a/crates/wind-acme/src/lib.rs b/crates/wind-acme/src/lib.rs index 5256518..d08f7ce 100644 --- a/crates/wind-acme/src/lib.rs +++ b/crates/wind-acme/src/lib.rs @@ -1,71 +1,28 @@ -//! ACME automatic certificate management for the wind proxy framework. +//! ACME certificate management for the wind proxy framework. //! -//! Uses `rustls-acme` to provision and renew TLS certificates from Let's -//! Encrypt via the HTTP-01 challenge. The HTTP-01 challenge server on port 80 -//! is only started when a certificate needs to be issued or renewed, and is -//! shut down once the new certificate has been deployed. - -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; - -use arc_swap::ArcSwapOption; -use async_trait::async_trait; -use axum::Router; -use eyre::{Context, Result}; -use rustls::server::ResolvesServerCert; -use rustls_acme::{AccountCache, AcmeConfig, CertCache, UseChallenge::Http01, caches::DirCache}; -use tokio::sync::watch; -use tokio_stream::StreamExt; -use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; - -/// The PEM blob rustls-acme persists for a certificate: the PKCS#8 private key -/// followed by the certificate chain, all PEM-encoded. Published by -/// [`start_acme_with_cert`] so non-rustls consumers (e.g. the tokio-quiche -/// backend, which needs on-disk cert/key files) can materialise it. -pub type CertPem = Arc>; - -/// A [`rustls_acme`] cache that wraps [`DirCache`] and, in addition to the -/// normal disk persistence, publishes every loaded/stored certificate PEM blob -/// to a [`watch`] channel. 
-struct CapturingCache { - inner: DirCache, - tx: watch::Sender>, -} - -#[async_trait] -impl CertCache for CapturingCache { - type EC = std::io::Error; - - async fn load_cert(&self, domains: &[String], directory_url: &str) -> Result>, Self::EC> { - let cert = self.inner.load_cert(domains, directory_url).await?; - if let Some(pem) = &cert { - let _ = self.tx.send(Some(Arc::new(pem.clone()))); - } - Ok(cert) - } - - async fn store_cert(&self, domains: &[String], directory_url: &str, cert: &[u8]) -> Result<(), Self::EC> { - // Publish before persisting so a waiter is unblocked as early as possible. - let _ = self.tx.send(Some(Arc::new(cert.to_vec()))); - self.inner.store_cert(domains, directory_url, cert).await - } -} - -#[async_trait] -impl AccountCache for CapturingCache { - type EA = std::io::Error; +//! Two strategies are provided behind features: +//! +//! - `resolver` (default): a [`rustls_acme`]-backed background renewal state +//! machine returning a `rustls` cert resolver — see [`resolver`]. Used by the +//! rustls-based (quinn) backend. +//! - `http01`: one-shot HTTP-01 provisioning that writes the PEM chain/key to +//! disk ([`http01`]) plus self-signed generation ([`selfsigned`]). Used by +//! backends that load TLS material from files (quiche/tokio-quiche). +//! +//! [`is_valid_domain`] is always available. - async fn load_account(&self, contact: &[String], directory_url: &str) -> Result>, Self::EA> { - self.inner.load_account(contact, directory_url).await - } +/// One-shot HTTP-01 provisioning that writes PEM cert/key files to disk. +#[cfg(feature = "http01")] +pub mod http01; +/// `rustls-acme`-backed resolver flow with background renewal. +#[cfg(feature = "resolver")] +pub mod resolver; +/// Self-signed certificate generation to disk. 
+#[cfg(feature = "http01")] +pub mod selfsigned; - async fn store_account(&self, contact: &[String], directory_url: &str, account: &[u8]) -> Result<(), Self::EA> { - self.inner.store_account(contact, directory_url, account).await - } -} +#[cfg(feature = "resolver")] +pub use resolver::{CertPem, start_acme, start_acme_with_cert}; /// Check if a domain name is valid for ACME certificate issuance. pub fn is_valid_domain(hostname: &str) -> bool { @@ -84,177 +41,6 @@ pub fn is_valid_domain(hostname: &str) -> bool { && !hostname.ends_with('.') } -/// Start automatic ACME certificate management. -/// -/// Uses `rustls-acme` to automatically provision and renew certificates from -/// Let's Encrypt via HTTP-01 challenges. An HTTP challenge server on port 80 is -/// started **only** when the ACME state machine actually needs to answer a -/// challenge, and is shut down once the new certificate has been deployed. -/// -/// Returns a certificate resolver that can be used with a -/// `rustls::ServerConfig`. -/// -/// This is the resolver-only entry point used by the rustls-based (quinn) -/// backend. Backends that need the certificate as on-disk files (e.g. the -/// tokio-quiche backend) should use [`start_acme_with_cert`] instead. -pub async fn start_acme( - cancel: CancellationToken, - hostname: &str, - acme_email: &str, - cache_dir: &Path, - production: bool, -) -> Result> { - let (resolver, _cert_rx) = start_acme_with_cert(cancel, hostname, acme_email, cache_dir, production).await?; - Ok(resolver) -} - -/// Like [`start_acme`], but additionally returns a [`watch::Receiver`] that is -/// updated with the certificate PEM blob (PKCS#8 private key + certificate -/// chain) whenever a certificate is loaded from cache or freshly -/// issued/renewed. -/// -/// The initial value is `None`; it becomes `Some` once a cached certificate is -/// found or the first issuance completes. 
Consumers that need cert/key files -/// (the tokio-quiche backend) can wait on this and materialise the blob to -/// disk. -pub async fn start_acme_with_cert( - cancel: CancellationToken, - hostname: &str, - acme_email: &str, - cache_dir: &Path, - production: bool, -) -> Result<(Arc, watch::Receiver>)> { - if !is_valid_domain(hostname) { - return Err(eyre::eyre!("Invalid domain name: {hostname}")); - } - - let contact = if !acme_email.is_empty() { - format!("mailto:{acme_email}") - } else { - format!("mailto:admin@{hostname}") - }; - - info!("Starting ACME certificate management for domain: {hostname}"); - - tokio::fs::create_dir_all(cache_dir) - .await - .context("Failed to create ACME cache directory")?; - - let (cert_tx, cert_rx) = watch::channel(None); - let cache = CapturingCache { - inner: DirCache::new(cache_dir.to_path_buf()), - tx: cert_tx, - }; - - let mut state = AcmeConfig::new(vec![hostname.to_string()]) - .contact(vec![contact]) - .cache(cache) - .directory_lets_encrypt(production) - .challenge_type(Http01) - .state(); - - let default_config = state.default_rustls_config(); - let resolver = default_config.cert_resolver.clone(); - - let axum_cancel: ArcSwapOption = None.into(); - let hostname = hostname.to_string(); - - // Drive the ACME state machine in background. Previously this task was - // fire-and-forget: a stream that ended (the rustls-acme state machine - // returning None) or an error storm would simply spin down with no signal - // to the rest of the server, leaving TLS pinned to whatever cached cert - // was last loaded until process restart. We now: - // * propagate `cancel` so shutdown is graceful, - // * track consecutive errors and escalate to error! when they cluster, - // * log a loud message when the loop exits so operators see it. - tokio::spawn(async move { - let mut consecutive_errors: u32 = 0; - let mut total_errors: u64 = 0; - loop { - tokio::select! 
{ - biased; - _ = cancel.cancelled() => { - info!("ACME: cancellation requested for domain {hostname}, shutting down state machine"); - axum_cancel.swap(None).inspect(|v| v.cancel()); - break; - } - event = state.next() => match event { - Some(Ok(event)) => { - consecutive_errors = 0; - match event { - // Requesting certificate for the first time or renewing - rustls_acme::EventOk::AccountCacheStore => { - info!("ACME event: AccountCacheStore"); - } - rustls_acme::EventOk::ValidationChallenge(challenge) => { - info!("ACME event: ValidationChallenge for {}", challenge.url); - let child = Arc::new(cancel.child_token()); - axum_cancel.swap(Some(child.clone())).inspect(|v| v.cancel()); - let http01_service = state.http01_challenge_tower_service(); - let axum_app = - Router::new().route_service("/.well-known/acme-challenge/{challenge_token}", http01_service); - if let Err(e) = spawn_axum(child.child_token(), axum_app).await { - error!("Failed to start ACME HTTP-01 challenge server: {:?}", e); - } - } - rustls_acme::EventOk::DeployedNewCert(_) => { - info!("ACME event: DeployedNewCert"); - axum_cancel.swap(None).inspect(|v| v.cancel()); - } - rustls_acme::EventOk::DeployedCachedCert(_) => { - info!("ACME event: DeployedCachedCert"); - } - _ => info!("ACME event: {:?}", event), - } - } - Some(Err(e)) => { - consecutive_errors = consecutive_errors.saturating_add(1); - total_errors = total_errors.saturating_add(1); - if consecutive_errors >= 3 { - error!( - "ACME error (#{consecutive_errors} in a row, {total_errors} total) for {hostname}: {e:?} \ - — cached certificate (if any) will continue to be served" - ); - } else { - warn!("ACME error for {hostname}: {e:?}"); - } - } - None => { - error!( - "ACME state machine stream ended for {hostname} ({total_errors} errors total). \ - No further renewals will be attempted until the process is restarted." 
- ); - axum_cancel.swap(None).inspect(|v| v.cancel()); - break; - } - } - } - } - info!("ACME background task for {hostname} exited"); - }); - - Ok((resolver, cert_rx)) -} - -async fn spawn_axum(cancel: CancellationToken, router: Router) -> eyre::Result<()> { - let listener = tokio::net::TcpListener::bind("[::]:80") - .await - .context("Failed to bind to port 80 for ACME HTTP-01 challenges")?; - info!("Started ACME HTTP-01 challenge server on port 80"); - tokio::spawn(async move { - tokio::select! { - Err(e) = axum::serve(listener, router) => { - error!("ACME HTTP-01 challenge server error: {:?}", e); - } - _ = cancel.cancelled() => { - info!("ACME HTTP-01 challenge server cancellation requested"); - } - } - info!("ACME certificate deployed, shutting down HTTP-01 challenge server"); - }); - Ok(()) -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/wind-acme/src/resolver.rs b/crates/wind-acme/src/resolver.rs new file mode 100644 index 0000000..476c057 --- /dev/null +++ b/crates/wind-acme/src/resolver.rs @@ -0,0 +1,239 @@ +//! Resolver-based ACME via `rustls-acme` (background renewal state machine). +//! +//! This is the entry point used by the rustls-based (quinn) backend. Backends +//! that need the certificate as on-disk files should use [`crate::http01`]. + +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +use arc_swap::ArcSwapOption; +use async_trait::async_trait; +use axum::Router; +use eyre::{Context, Result}; +use rustls::server::ResolvesServerCert; +use rustls_acme::{AccountCache, AcmeConfig, CertCache, UseChallenge::Http01, caches::DirCache}; +use tokio::sync::watch; +use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +use crate::is_valid_domain; + +/// The PEM blob rustls-acme persists for a certificate: the PKCS#8 private key +/// followed by the certificate chain, all PEM-encoded. Published by +/// [`start_acme_with_cert`] so non-rustls consumers (e.g. 
the tokio-quiche +/// backend, which needs on-disk cert/key files) can materialise it. +pub type CertPem = Arc>; + +/// A [`rustls_acme`] cache that wraps [`DirCache`] and, in addition to the +/// normal disk persistence, publishes every loaded/stored certificate PEM blob +/// to a [`watch`] channel. +struct CapturingCache { + inner: DirCache, + tx: watch::Sender>, +} + +#[async_trait] +impl CertCache for CapturingCache { + type EC = std::io::Error; + + async fn load_cert(&self, domains: &[String], directory_url: &str) -> Result>, Self::EC> { + let cert = self.inner.load_cert(domains, directory_url).await?; + if let Some(pem) = &cert { + let _ = self.tx.send(Some(Arc::new(pem.clone()))); + } + Ok(cert) + } + + async fn store_cert(&self, domains: &[String], directory_url: &str, cert: &[u8]) -> Result<(), Self::EC> { + // Publish before persisting so a waiter is unblocked as early as possible. + let _ = self.tx.send(Some(Arc::new(cert.to_vec()))); + self.inner.store_cert(domains, directory_url, cert).await + } +} + +#[async_trait] +impl AccountCache for CapturingCache { + type EA = std::io::Error; + + async fn load_account(&self, contact: &[String], directory_url: &str) -> Result>, Self::EA> { + self.inner.load_account(contact, directory_url).await + } + + async fn store_account(&self, contact: &[String], directory_url: &str, account: &[u8]) -> Result<(), Self::EA> { + self.inner.store_account(contact, directory_url, account).await + } +} + +/// Start automatic ACME certificate management. +/// +/// Uses `rustls-acme` to automatically provision and renew certificates from +/// Let's Encrypt via HTTP-01 challenges. An HTTP challenge server on port 80 is +/// started **only** when the ACME state machine actually needs to answer a +/// challenge, and is shut down once the new certificate has been deployed. +/// +/// Returns a certificate resolver that can be used with a +/// `rustls::ServerConfig`. 
+/// +/// This is the resolver-only entry point used by the rustls-based (quinn) +/// backend. Backends that need the certificate as on-disk files (e.g. the +/// tokio-quiche backend) should use [`start_acme_with_cert`] instead. +pub async fn start_acme( + cancel: CancellationToken, + hostname: &str, + acme_email: &str, + cache_dir: &Path, + production: bool, +) -> Result> { + let (resolver, _cert_rx) = start_acme_with_cert(cancel, hostname, acme_email, cache_dir, production).await?; + Ok(resolver) +} + +/// Like [`start_acme`], but additionally returns a [`watch::Receiver`] that is +/// updated with the certificate PEM blob (PKCS#8 private key + certificate +/// chain) whenever a certificate is loaded from cache or freshly +/// issued/renewed. +/// +/// The initial value is `None`; it becomes `Some` once a cached certificate is +/// found or the first issuance completes. Consumers that need cert/key files +/// (the tokio-quiche backend) can wait on this and materialise the blob to +/// disk. 
+pub async fn start_acme_with_cert( + cancel: CancellationToken, + hostname: &str, + acme_email: &str, + cache_dir: &Path, + production: bool, +) -> Result<(Arc, watch::Receiver>)> { + if !is_valid_domain(hostname) { + return Err(eyre::eyre!("Invalid domain name: {hostname}")); + } + + let contact = if !acme_email.is_empty() { + format!("mailto:{acme_email}") + } else { + format!("mailto:admin@{hostname}") + }; + + info!("Starting ACME certificate management for domain: {hostname}"); + + tokio::fs::create_dir_all(cache_dir) + .await + .context("Failed to create ACME cache directory")?; + + let (cert_tx, cert_rx) = watch::channel(None); + let cache = CapturingCache { + inner: DirCache::new(cache_dir.to_path_buf()), + tx: cert_tx, + }; + + let mut state = AcmeConfig::new(vec![hostname.to_string()]) + .contact(vec![contact]) + .cache(cache) + .directory_lets_encrypt(production) + .challenge_type(Http01) + .state(); + + let default_config = state.default_rustls_config(); + let resolver = default_config.cert_resolver.clone(); + + let axum_cancel: ArcSwapOption = None.into(); + let hostname = hostname.to_string(); + + // Drive the ACME state machine in background. Previously this task was + // fire-and-forget: a stream that ended (the rustls-acme state machine + // returning None) or an error storm would simply spin down with no signal + // to the rest of the server, leaving TLS pinned to whatever cached cert + // was last loaded until process restart. We now: + // * propagate `cancel` so shutdown is graceful, + // * track consecutive errors and escalate to error! when they cluster, + // * log a loud message when the loop exits so operators see it. + tokio::spawn(async move { + let mut consecutive_errors: u32 = 0; + let mut total_errors: u64 = 0; + loop { + tokio::select! 
{ + biased; + _ = cancel.cancelled() => { + info!("ACME: cancellation requested for domain {hostname}, shutting down state machine"); + axum_cancel.swap(None).inspect(|v| v.cancel()); + break; + } + event = state.next() => match event { + Some(Ok(event)) => { + consecutive_errors = 0; + match event { + // Requesting certificate for the first time or renewing + rustls_acme::EventOk::AccountCacheStore => { + info!("ACME event: AccountCacheStore"); + } + rustls_acme::EventOk::ValidationChallenge(challenge) => { + info!("ACME event: ValidationChallenge for {}", challenge.url); + let child = Arc::new(cancel.child_token()); + axum_cancel.swap(Some(child.clone())).inspect(|v| v.cancel()); + let http01_service = state.http01_challenge_tower_service(); + let axum_app = + Router::new().route_service("/.well-known/acme-challenge/{challenge_token}", http01_service); + if let Err(e) = spawn_axum(child.child_token(), axum_app).await { + error!("Failed to start ACME HTTP-01 challenge server: {:?}", e); + } + } + rustls_acme::EventOk::DeployedNewCert(_) => { + info!("ACME event: DeployedNewCert"); + axum_cancel.swap(None).inspect(|v| v.cancel()); + } + rustls_acme::EventOk::DeployedCachedCert(_) => { + info!("ACME event: DeployedCachedCert"); + } + _ => info!("ACME event: {:?}", event), + } + } + Some(Err(e)) => { + consecutive_errors = consecutive_errors.saturating_add(1); + total_errors = total_errors.saturating_add(1); + if consecutive_errors >= 3 { + error!( + "ACME error (#{consecutive_errors} in a row, {total_errors} total) for {hostname}: {e:?} \ + — cached certificate (if any) will continue to be served" + ); + } else { + warn!("ACME error for {hostname}: {e:?}"); + } + } + None => { + error!( + "ACME state machine stream ended for {hostname} ({total_errors} errors total). \ + No further renewals will be attempted until the process is restarted." 
+ ); + axum_cancel.swap(None).inspect(|v| v.cancel()); + break; + } + } + } + } + info!("ACME background task for {hostname} exited"); + }); + + Ok((resolver, cert_rx)) +} + +async fn spawn_axum(cancel: CancellationToken, router: Router) -> eyre::Result<()> { + let listener = tokio::net::TcpListener::bind("[::]:80") + .await + .context("Failed to bind to port 80 for ACME HTTP-01 challenges")?; + info!("Started ACME HTTP-01 challenge server on port 80"); + tokio::spawn(async move { + tokio::select! { + Err(e) = axum::serve(listener, router) => { + error!("ACME HTTP-01 challenge server error: {:?}", e); + } + _ = cancel.cancelled() => { + info!("ACME HTTP-01 challenge server cancellation requested"); + } + } + info!("ACME certificate deployed, shutting down HTTP-01 challenge server"); + }); + Ok(()) +} diff --git a/crates/wind-acme/src/selfsigned.rs b/crates/wind-acme/src/selfsigned.rs new file mode 100644 index 0000000..9bc9053 --- /dev/null +++ b/crates/wind-acme/src/selfsigned.rs @@ -0,0 +1,47 @@ +//! Self-signed certificate generation for backends that load TLS material from +//! file paths (e.g. the QUIC listeners) and for local development. + +use std::path::Path; + +use eyre::Result; +use tokio::fs; + +/// Generate a self-signed certificate and write it to disk if the files don't +/// already exist. The QUIC listener loads TLS material from file paths, so both +/// backends consume the same on-disk PEMs. 
+pub async fn ensure_self_signed_cert_files(hostname: &str, cert_path: &Path, key_path: &Path) -> Result<()> { + if cert_path.exists() && key_path.exists() { + return Ok(()); + } + let (cert, key_pair) = generate_short_lived_self_signed(hostname)?; + let cert_pem = cert.pem(); + let key_pem = key_pair.serialize_pem(); + + if let Some(parent) = cert_path.parent() { + fs::create_dir_all(parent).await?; + } + if let Some(parent) = key_path.parent() { + fs::create_dir_all(parent).await?; + } + fs::write(cert_path, cert_pem.as_bytes()).await?; + fs::write(key_path, key_pem.as_bytes()).await?; + Ok(()) +} + +/// Generate a self-signed leaf certificate (ECDSA P-256, ~45-day validity). +/// +/// Uses `is_ca = false` so the leaf is a plain end-entity TLS server cert, and +/// a short validity period (≤398 days) to satisfy Chromium's certificate +/// validation requirements. +fn generate_short_lived_self_signed(hostname: &str) -> Result<(rcgen::Certificate, rcgen::KeyPair)> { + let mut params = rcgen::CertificateParams::new(vec![hostname.to_owned()]) + .map_err(|e| eyre::eyre!("creating certificate params: {e}"))?; + params.is_ca = rcgen::IsCa::NoCa; + params.not_after = time::OffsetDateTime::now_utc() + time::Duration::days(45); + let key_pair = + rcgen::KeyPair::generate_for(&rcgen::PKCS_ECDSA_P256_SHA256).map_err(|e| eyre::eyre!("generating key pair: {e}"))?; + let cert = params + .self_signed(&key_pair) + .map_err(|e| eyre::eyre!("generating self-signed certificate: {e}"))?; + Ok((cert, key_pair)) +} diff --git a/crates/wind-core/src/active.rs b/crates/wind-core/src/active.rs new file mode 100644 index 0000000..5c28cb0 --- /dev/null +++ b/crates/wind-core/src/active.rs @@ -0,0 +1,98 @@ +//! Registry of live connections, keyed by a process-unique connection id, used +//! for per-user connection limiting and active kicking. +//! +//! Shared (as a cheap `Arc` handle) between an inbound — which registers each +//! 
connection with a [`CancellationToken`] and deregisters on close — and a
+//! host binary's hooks, which read the per-user count (for limits) and cancel a
+//! user's connections (for kicks, e.g. when a panel removes a user). Used by
+//! both the naive (per-CONNECT-tunnel) and TUIC (per-authenticated-connection)
+//! inbounds.
+
+use std::sync::Arc;
+
+use dashmap::DashMap;
+use tokio_util::sync::CancellationToken;
+
+use crate::UserId;
+
+/// Cheaply cloneable handle to the shared registry of live connections.
+#[derive(Clone, Default)]
+pub struct ActiveConnections {
+	inner: Arc<DashMap<u64, (UserId, CancellationToken)>>,
+}
+
+impl ActiveConnections {
+	pub fn new() -> Self {
+		Self::default()
+	}
+
+	/// Register a live connection. Cancelling `token` closes the connection.
+	pub fn register(&self, conn_id: u64, user: UserId, token: CancellationToken) {
+		self.inner.insert(conn_id, (user, token));
+	}
+
+	/// Remove a connection from the registry (call on connection close).
+	pub fn deregister(&self, conn_id: u64) {
+		self.inner.remove(&conn_id);
+	}
+
+	/// Number of live connections currently attributed to `user`.
+	pub fn count_for(&self, user: &UserId) -> usize {
+		self.inner.iter().filter(|e| &e.value().0 == user).count()
+	}
+
+	/// Cancel every live connection belonging to `user`. Returns how many were
+	/// kicked.
+	pub fn kick_user(&self, user: &UserId) -> usize {
+		let mut kicked = 0;
+		for entry in self.inner.iter().filter(|e| &e.value().0 == user) {
+			entry.value().1.cancel();
+			kicked += 1;
+		}
+		kicked
+	}
+
+	/// Total live connections (diagnostics).
+	pub fn len(&self) -> usize {
+		self.inner.len()
+	}
+
+	/// Whether the registry currently holds no connections.
+	pub fn is_empty(&self) -> bool {
+		self.inner.is_empty()
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn count_and_kick() {
+		let active = ActiveConnections::new();
+		let u1 = UserId::from("u1");
+		let u2 = UserId::from("u2");
+		let t1 = CancellationToken::new();
+		let t2 = CancellationToken::new();
+		let t3 = CancellationToken::new();
+
+		active.register(1, u1.clone(), t1.clone());
+		active.register(2, u1.clone(), t2.clone());
+		active.register(3, u2.clone(), t3.clone());
+
+		assert_eq!(active.count_for(&u1), 2);
+		assert_eq!(active.count_for(&u2), 1);
+
+		// Kicking u1 cancels both of its tokens, not u2's.
+		assert_eq!(active.kick_user(&u1), 2);
+		assert!(t1.is_cancelled());
+		assert!(t2.is_cancelled());
+		assert!(!t3.is_cancelled());
+
+		// Deregister mirrors a connection closing.
+		active.deregister(1);
+		active.deregister(2);
+		assert_eq!(active.count_for(&u1), 0);
+		assert_eq!(active.count_for(&u2), 1);
+	}
+}
diff --git a/crates/wind-core/src/lib.rs b/crates/wind-core/src/lib.rs
index f17bbc2..7dcebaa 100644
--- a/crates/wind-core/src/lib.rs
+++ b/crates/wind-core/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod active;
 pub mod app;
 pub mod dispatcher;
 pub mod hooks;
@@ -9,6 +10,7 @@ pub mod resolve;
 pub mod rule;
 pub mod types;
 
+pub use active::ActiveConnections;
 pub use app::{App, Plugin};
 pub use dispatcher::{AclRouter, Dispatcher, OutboundAction, RouteAction, Router};
 pub use hooks::{
diff --git a/crates/wind-tuic/src/active.rs b/crates/wind-tuic/src/active.rs
index 39a4ff1..5b49f87 100644
--- a/crates/wind-tuic/src/active.rs
+++ b/crates/wind-tuic/src/active.rs
@@ -1,98 +1,7 @@
-//! Registry of live authenticated TUIC connections, keyed by a process-unique
-//! connection id, used for per-user connection limiting and active kicking.
-//!
-//! Shared (as a cheap `Arc` handle) between the inbound — which registers each
-//! 
connection with its [`CancellationToken`] once authenticated and deregisters
-//! on close — and the host binary's hooks, which read the per-user count (for
-//! limits) and cancel a user's connections (for kicks, e.g. when the panel
-//! removes a user). A TUIC connection is exactly one authenticated user, so the
-//! per-connection cancel token is a precise kick handle: cancelling it trips
-//! the `serve_connection` shutdown path, which closes the QUIC connection.
+//! Registry of live authenticated TUIC connections — re-exported from
+//! [`wind_core`], shared with the naive inbound. A TUIC connection is exactly
+//! one authenticated user, so the per-connection cancel token is a precise kick
+//! handle: cancelling it trips the `serve_connection` shutdown path, which
+//! closes the QUIC connection.
 
-use std::sync::Arc;
-
-use dashmap::DashMap;
-use tokio_util::sync::CancellationToken;
-use wind_core::UserId;
-
-#[derive(Clone, Default)]
-pub struct ActiveConnections {
-	inner: Arc<DashMap<u64, (UserId, CancellationToken)>>,
-}
-
-impl ActiveConnections {
-	pub fn new() -> Self {
-		Self::default()
-	}
-
-	/// Register a live connection. Cancelling `token` closes the connection.
-	pub fn register(&self, conn_id: u64, user: UserId, token: CancellationToken) {
-		self.inner.insert(conn_id, (user, token));
-	}
-
-	/// Remove a connection from the registry (call on connection close).
-	pub fn deregister(&self, conn_id: u64) {
-		self.inner.remove(&conn_id);
-	}
-
-	/// Number of live connections currently attributed to `user`.
-	pub fn count_for(&self, user: &UserId) -> usize {
-		self.inner.iter().filter(|e| &e.value().0 == user).count()
-	}
-
-	/// Cancel every live connection belonging to `user`. Returns how many were
-	/// kicked.
-	pub fn kick_user(&self, user: &UserId) -> usize {
-		let mut kicked = 0;
-		for entry in self.inner.iter() {
-			if &entry.value().0 == user {
-				entry.value().1.cancel();
-				kicked += 1;
-			}
-		}
-		kicked
-	}
-
-	/// Total live connections (diagnostics).
-	pub fn len(&self) -> usize {
-		self.inner.len()
-	}
-
-	pub fn is_empty(&self) -> bool {
-		self.inner.is_empty()
-	}
-}
-
-#[cfg(test)]
-mod tests {
-	use super::*;
-
-	#[test]
-	fn count_and_kick() {
-		let active = ActiveConnections::new();
-		let u1 = UserId::from("u1");
-		let u2 = UserId::from("u2");
-		let t1 = CancellationToken::new();
-		let t2 = CancellationToken::new();
-		let t3 = CancellationToken::new();
-
-		active.register(1, u1.clone(), t1.clone());
-		active.register(2, u1.clone(), t2.clone());
-		active.register(3, u2.clone(), t3.clone());
-
-		assert_eq!(active.count_for(&u1), 2);
-		assert_eq!(active.count_for(&u2), 1);
-
-		// Kicking u1 cancels both of its tokens, not u2's.
-		assert_eq!(active.kick_user(&u1), 2);
-		assert!(t1.is_cancelled());
-		assert!(t2.is_cancelled());
-		assert!(!t3.is_cancelled());
-
-		// Deregister mirrors a connection closing.
-		active.deregister(1);
-		active.deregister(2);
-		assert_eq!(active.count_for(&u1), 0);
-		assert_eq!(active.count_for(&u2), 1);
-	}
-}
+pub use wind_core::ActiveConnections;