From 49bad799e6ac8195183f0f3c53c9e08992074b57 Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 18 Jun 2026 14:52:53 +0800 Subject: [PATCH 1/2] feat(wind-tuic): add ActiveConnections registry for per-user limits + kick Add an optional live-connection registry to the TUIC server core so a host binary can enforce a per-user concurrent-connection limit (count_for) and actively kick a user's live connections (kick_user) - e.g. when a panel removes or rekeys a user. Each authenticated connection registers its cancel token in serve_connection (after the on_authenticated veto) and deregisters on close; cancelling the token trips the existing shutdown path and closes the QUIC connection. Threaded through both the quinn and quiche backends as an Option-defaulting field, so existing callers (e.g. the reference tuic-server) are unaffected. dashmap backs the registry, gated behind the server feature. Co-Authored-By: Claude Opus 4.8 --- crates/wind-tuic/Cargo.toml | 7 +- crates/wind-tuic/src/active.rs | 98 ++++++++++++++++++++++++++ crates/wind-tuic/src/lib.rs | 5 ++ crates/wind-tuic/src/quiche/inbound.rs | 16 ++++- crates/wind-tuic/src/quinn/inbound.rs | 11 ++- crates/wind-tuic/src/server/mod.rs | 31 +++++++- 6 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 crates/wind-tuic/src/active.rs diff --git a/crates/wind-tuic/Cargo.toml b/crates/wind-tuic/Cargo.toml index ba0191b..f412dc7 100644 --- a/crates/wind-tuic/Cargo.toml +++ b/crates/wind-tuic/Cargo.toml @@ -12,7 +12,9 @@ decode = ["tuic-core/decode"] encode = ["tuic-core/encode"] # The server decodes client requests AND encodes UDP response datagrams, so it # needs both codec directions (matching the former wind-tuiche `server` feature). -server = ["decode", "encode"] +# `dashmap` backs the server-only `ActiveConnections` registry (per-user limits + +# active kick). +server = ["decode", "encode", "dep:dashmap"] client = ["encode"] # HTTP/3 masquerade: when a connecting client speaks real HTTP/3 instead of TUIC, @@ -72,6 +74,9 @@ eyre = "0.6" uuid = { version = "1", features = ["v4"] } crossfire = { version = "3", features = ["tokio"] } moka = { version = "0.12", features = ["future"] } +# Server-only live-connection registry (per-user limits + active kick); enabled +# by the `server` feature. +dashmap = { version = "6", optional = true } arc-swap = "1.9.1" serde = { version = "1", features = ["derive"] } enum_dispatch = "0.3" diff --git a/crates/wind-tuic/src/active.rs b/crates/wind-tuic/src/active.rs new file mode 100644 index 0000000..2011c1d --- /dev/null +++ b/crates/wind-tuic/src/active.rs @@ -0,0 +1,98 @@ +//! Registry of live authenticated TUIC connections, keyed by a process-unique +//! connection id, used for per-user connection limiting and active kicking. +//! +//! Shared (as a cheap `Arc` handle) between the inbound — which registers each +//! connection with its [`CancellationToken`] once authenticated and deregisters +//! on close — and the host binary's hooks, which read the per-user count (for +//! limits) and cancel a user's connections (for kicks, e.g. when the panel +//! removes a user). A TUIC connection is exactly one authenticated user, so the +//! per-connection cancel token is a precise kick handle: cancelling it trips the +//! `serve_connection` shutdown path, which closes the QUIC connection. + +use std::sync::Arc; + +use dashmap::DashMap; +use tokio_util::sync::CancellationToken; +use wind_core::UserId; + +#[derive(Clone, Default)] +pub struct ActiveConnections { + inner: Arc>, +} + +impl ActiveConnections { + pub fn new() -> Self { + Self::default() + } + + /// Register a live connection. Cancelling `token` closes the connection. + pub fn register(&self, conn_id: u64, user: UserId, token: CancellationToken) { + self.inner.insert(conn_id, (user, token)); + } + + /// Remove a connection from the registry (call on connection close). + pub fn deregister(&self, conn_id: u64) { + self.inner.remove(&conn_id); + } + + /// Number of live connections currently attributed to `user`. + pub fn count_for(&self, user: &UserId) -> usize { + self.inner.iter().filter(|e| &e.value().0 == user).count() + } + + /// Cancel every live connection belonging to `user`. Returns how many were + /// kicked. + pub fn kick_user(&self, user: &UserId) -> usize { + let mut kicked = 0; + for entry in self.inner.iter() { + if &entry.value().0 == user { + entry.value().1.cancel(); + kicked += 1; + } + } + kicked + } + + /// Total live connections (diagnostics). + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn count_and_kick() { + let active = ActiveConnections::new(); + let u1 = UserId::from("u1"); + let u2 = UserId::from("u2"); + let t1 = CancellationToken::new(); + let t2 = CancellationToken::new(); + let t3 = CancellationToken::new(); + + active.register(1, u1.clone(), t1.clone()); + active.register(2, u1.clone(), t2.clone()); + active.register(3, u2.clone(), t3.clone()); + + assert_eq!(active.count_for(&u1), 2); + assert_eq!(active.count_for(&u2), 1); + + // Kicking u1 cancels both of its tokens, not u2's. + assert_eq!(active.kick_user(&u1), 2); + assert!(t1.is_cancelled()); + assert!(t2.is_cancelled()); + assert!(!t3.is_cancelled()); + + // Deregister mirrors a connection closing. + active.deregister(1); + active.deregister(2); + assert_eq!(active.count_for(&u1), 0); + assert_eq!(active.count_for(&u2), 1); + } +} diff --git a/crates/wind-tuic/src/lib.rs b/crates/wind-tuic/src/lib.rs index a74cb6b..dbc8e84 100644 --- a/crates/wind-tuic/src/lib.rs +++ b/crates/wind-tuic/src/lib.rs @@ -8,6 +8,11 @@ pub mod proto; +#[cfg(feature = "server")] +pub mod active; +#[cfg(feature = "server")] +pub use active::ActiveConnections; + #[cfg(feature = "server")] pub mod server; diff --git a/crates/wind-tuic/src/quiche/inbound.rs b/crates/wind-tuic/src/quiche/inbound.rs index 7a59b29..4269c76 100644 --- a/crates/wind-tuic/src/quiche/inbound.rs +++ b/crates/wind-tuic/src/quiche/inbound.rs @@ -49,6 +49,8 @@ pub struct TuicheInbound { /// Downstream extensibility hooks (auth / traffic stats / connection /// management). Defaults to all-`None` (no behavior change). hooks: InboundHooks, + /// Live-connection registry for per-user connection limits + active kick. + active: Option, } impl TuicheInbound { @@ -108,8 +110,9 @@ impl AbstractInbound for TuicheInbound { let cancel = root_cancel.child_token(); let masquerade = self.masquerade.clone(); let hooks = self.hooks.clone(); + let active = self.active.clone(); conn_tasks.spawn( - crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel, masquerade, hooks) + crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel, masquerade, hooks, active) .instrument(span), ); } @@ -131,6 +134,7 @@ pub struct TuicheInboundBuilder { cancel: Option, masquerade: Option, hooks: InboundHooks, + active: Option, } impl TuicheInboundBuilder { @@ -145,6 +149,7 @@ impl TuicheInboundBuilder { cancel: None, masquerade: None, hooks: InboundHooks::default(), + active: None, } } @@ -155,6 +160,14 @@ impl TuicheInboundBuilder { self } + /// Set the live-connection registry for per-user connection limits + active + /// kick. Each authenticated connection registers itself; `kick_user` drops + /// it. + pub fn active(mut self, active: Option) -> Self { + self.active = active; + self + } + /// Enable the HTTP/3 masquerade: non-TUIC connections are reverse-proxied /// to the configured upstream site instead of being dropped. pub fn masquerade(mut self, masquerade: Option) -> Self { @@ -232,6 +245,7 @@ impl TuicheInboundBuilder { cancel: self.cancel.unwrap_or_default(), masquerade: self.masquerade, hooks: self.hooks, + active: self.active, }) } } diff --git a/crates/wind-tuic/src/quinn/inbound.rs b/crates/wind-tuic/src/quinn/inbound.rs index f854965..0e3ce1b 100644 --- a/crates/wind-tuic/src/quinn/inbound.rs +++ b/crates/wind-tuic/src/quinn/inbound.rs @@ -78,6 +78,11 @@ pub struct TuicInboundOpts { /// Downstream extensibility hooks (auth / traffic stats / connection /// management). Defaults to all-`None` (no behavior change). pub hooks: InboundHooks, + + /// Live-connection registry for per-user connection limits + active kick. + /// Defaults to `None` (no registration). When set, each authenticated + /// connection registers itself and `kick_user` can drop it. + pub active: Option, } impl Default for TuicInboundOpts { @@ -103,6 +108,7 @@ impl Default for TuicInboundOpts { initial_window: 1024 * 1024, masquerade: None, hooks: InboundHooks::default(), + active: None, } } } @@ -230,6 +236,7 @@ impl AbstractInbound for TuicInbound { let zero_rtt = opts.zero_rtt; let masquerade = opts.masquerade.clone(); let hooks = opts.hooks.clone(); + let active = opts.active.clone(); let cb = cb.clone(); let conn_cancel = self.cancel.child_token(); let remote = incoming.remote_address(); @@ -240,7 +247,7 @@ impl AbstractInbound for TuicInbound { // `tasks.close()` + `tasks.wait()` after cancelling). self.ctx.tasks.spawn(spawn_logged( "Connection handler", - handle_connection(incoming, users, auth_timeout, zero_rtt, masquerade, cb, conn_cancel, hooks), + handle_connection(incoming, users, auth_timeout, zero_rtt, masquerade, cb, conn_cancel, hooks, active), ).instrument(span)); } else => { @@ -273,6 +280,7 @@ async fn handle_connection( callback: C, cancel: CancellationToken, hooks: InboundHooks, + active: Option, ) -> eyre::Result<()> { let remote_addr = incoming.remote_address(); @@ -319,6 +327,7 @@ async fn handle_connection( cancel, masquerade, hooks, + active, ) .await; diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index d6ba84c..4ce6005 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -37,7 +37,10 @@ use wind_core::{ }; use wind_quic::{QuicConnection, QuicError}; -use crate::proto::{CmdType, Command, UdpStream}; +use crate::{ + active::ActiveConnections, + proto::{CmdType, Command, UdpStream}, +}; #[cfg(feature = "masquerade")] mod masquerade; @@ -138,6 +141,14 @@ struct InboundCtx { hooks: InboundHooks, /// Per-connection context handed to connection-management hooks. conn_info: ConnInfo, + /// Live-connection registry for per-user limits + active kick. The + /// connection registers itself here once authenticated and deregisters on + /// close; `kick_user` cancels `conn_cancel` to drop it. + active: Option, + /// This connection's cancel token (clone of the one driving + /// `serve_connection`). Cancelling it closes the connection — used as the + /// kick handle stored in `active`. + conn_cancel: CancellationToken, } impl InboundCtx { @@ -283,6 +294,7 @@ pub async fn serve_connection( cancel: CancellationToken, masq: Option, hooks: InboundHooks, + active: Option, ) where C: QuicConnection, CB: InboundCallback, @@ -327,6 +339,8 @@ pub async fn serve_connection( udp_root_cancel, hooks, conn_info, + active, + conn_cancel: cancel.clone(), }); // Per-connection HTTP/3 masquerade router: a parked `run_masquerade` task plus @@ -515,6 +529,12 @@ pub async fn serve_connection( } acceptor_cancel.cancel(); + // Drop this connection from the live-connection registry (no-op if it never + // authenticated and so was never registered). + if let Some(active) = &connection.active { + active.deregister(connection.conn_info.conn_id); + } + // Connection lifecycle: notify the disconnect hook (user is `None` if the // connection never authenticated). if let Some(ch) = &connection.hooks.connection { @@ -842,6 +862,15 @@ async fn handle_auth(connection: &InboundCtx, uuid: Uuid, connection.auth.store(Some(Arc::new(AuthState { user: user.clone() }))); connection.auth_notify.notify_waiters(); + + // Register this now-authenticated connection so the host can enforce a + // per-user limit (`count_for`) and actively kick it (`kick_user`). Done after + // the `on_authenticated` veto so the limit check above only counts *other* + // live connections for this user. + if let Some(active) = &connection.active { + active.register(connection.conn_info.conn_id, user.clone(), connection.conn_cancel.clone()); + } + info!(uuid = %uuid, user = %user, "authenticated"); Ok(()) From 569bd07090d1a37250a6255080802a35a0f6de88 Mon Sep 17 00:00:00 2001 From: iHsin Date: Thu, 18 Jun 2026 14:57:43 +0800 Subject: [PATCH 2/2] style(wind-tuic): rustfmt active.rs doc comment Co-Authored-By: Claude Opus 4.8 --- crates/wind-tuic/src/active.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/wind-tuic/src/active.rs b/crates/wind-tuic/src/active.rs index 2011c1d..39a4ff1 100644 --- a/crates/wind-tuic/src/active.rs +++ b/crates/wind-tuic/src/active.rs @@ -6,8 +6,8 @@ //! on close — and the host binary's hooks, which read the per-user count (for //! limits) and cancel a user's connections (for kicks, e.g. when the panel //! removes a user). A TUIC connection is exactly one authenticated user, so the -//! per-connection cancel token is a precise kick handle: cancelling it trips the -//! `serve_connection` shutdown path, which closes the QUIC connection. +//! per-connection cancel token is a precise kick handle: cancelling it trips +//! the `serve_connection` shutdown path, which closes the QUIC connection. use std::sync::Arc;