Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/wind-tuic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ decode = ["tuic-core/decode"]
encode = ["tuic-core/encode"]
# The server decodes client requests AND encodes UDP response datagrams, so it
# needs both codec directions (matching the former wind-tuiche `server` feature).
server = ["decode", "encode"]
# `dashmap` backs the server-only `ActiveConnections` registry (per-user limits +
# active kick).
server = ["decode", "encode", "dep:dashmap"]
client = ["encode"]

# HTTP/3 masquerade: when a connecting client speaks real HTTP/3 instead of TUIC,
Expand Down Expand Up @@ -72,6 +74,9 @@ eyre = "0.6"
uuid = { version = "1", features = ["v4"] }
crossfire = { version = "3", features = ["tokio"] }
moka = { version = "0.12", features = ["future"] }
# Server-only live-connection registry (per-user limits + active kick); enabled
# by the `server` feature.
dashmap = { version = "6", optional = true }
arc-swap = "1.9.1"
serde = { version = "1", features = ["derive"] }
enum_dispatch = "0.3"
Expand Down
98 changes: 98 additions & 0 deletions crates/wind-tuic/src/active.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Registry of live authenticated TUIC connections, keyed by a process-unique
//! connection id, used for per-user connection limiting and active kicking.
//!
//! Shared (as a cheap `Arc` handle) between the inbound — which registers each
//! connection with its [`CancellationToken`] once authenticated and deregisters
//! on close — and the host binary's hooks, which read the per-user count (for
//! limits) and cancel a user's connections (for kicks, e.g. when the panel
//! removes a user). A TUIC connection is exactly one authenticated user, so the
//! per-connection cancel token is a precise kick handle: cancelling it trips
//! the `serve_connection` shutdown path, which closes the QUIC connection.

use std::sync::Arc;

use dashmap::DashMap;
use tokio_util::sync::CancellationToken;
use wind_core::UserId;

#[derive(Clone, Default)]
pub struct ActiveConnections {
inner: Arc<DashMap<u64, (UserId, CancellationToken)>>,
}

impl ActiveConnections {
pub fn new() -> Self {
Self::default()
}

/// Register a live connection. Cancelling `token` closes the connection.
pub fn register(&self, conn_id: u64, user: UserId, token: CancellationToken) {
self.inner.insert(conn_id, (user, token));
}

/// Remove a connection from the registry (call on connection close).
pub fn deregister(&self, conn_id: u64) {
self.inner.remove(&conn_id);
}

/// Number of live connections currently attributed to `user`.
pub fn count_for(&self, user: &UserId) -> usize {
self.inner.iter().filter(|e| &e.value().0 == user).count()
}

/// Cancel every live connection belonging to `user`. Returns how many were
/// kicked.
pub fn kick_user(&self, user: &UserId) -> usize {
let mut kicked = 0;
for entry in self.inner.iter() {
if &entry.value().0 == user {
entry.value().1.cancel();
kicked += 1;
}
}
kicked
}

/// Total live connections (diagnostics).
pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn count_and_kick() {
let active = ActiveConnections::new();
let u1 = UserId::from("u1");
let u2 = UserId::from("u2");
let t1 = CancellationToken::new();
let t2 = CancellationToken::new();
let t3 = CancellationToken::new();

active.register(1, u1.clone(), t1.clone());
active.register(2, u1.clone(), t2.clone());
active.register(3, u2.clone(), t3.clone());

assert_eq!(active.count_for(&u1), 2);
assert_eq!(active.count_for(&u2), 1);

// Kicking u1 cancels both of its tokens, not u2's.
assert_eq!(active.kick_user(&u1), 2);
assert!(t1.is_cancelled());
assert!(t2.is_cancelled());
assert!(!t3.is_cancelled());

// Deregister mirrors a connection closing.
active.deregister(1);
active.deregister(2);
assert_eq!(active.count_for(&u1), 0);
assert_eq!(active.count_for(&u2), 1);
}
}
5 changes: 5 additions & 0 deletions crates/wind-tuic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

pub mod proto;

#[cfg(feature = "server")]
pub mod active;
#[cfg(feature = "server")]
pub use active::ActiveConnections;

#[cfg(feature = "server")]
pub mod server;

Expand Down
16 changes: 15 additions & 1 deletion crates/wind-tuic/src/quiche/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct TuicheInbound {
/// Downstream extensibility hooks (auth / traffic stats / connection
/// management). Defaults to all-`None` (no behavior change).
hooks: InboundHooks,
/// Live-connection registry for per-user connection limits + active kick.
active: Option<crate::active::ActiveConnections>,
}

impl TuicheInbound {
Expand Down Expand Up @@ -108,8 +110,9 @@ impl AbstractInbound for TuicheInbound {
let cancel = root_cancel.child_token();
let masquerade = self.masquerade.clone();
let hooks = self.hooks.clone();
let active = self.active.clone();
conn_tasks.spawn(
crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel, masquerade, hooks)
crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel, masquerade, hooks, active)
.instrument(span),
);
}
Expand All @@ -131,6 +134,7 @@ pub struct TuicheInboundBuilder {
cancel: Option<CancellationToken>,
masquerade: Option<crate::server::MasqueradeConfig>,
hooks: InboundHooks,
active: Option<crate::active::ActiveConnections>,
}

impl TuicheInboundBuilder {
Expand All @@ -145,6 +149,7 @@ impl TuicheInboundBuilder {
cancel: None,
masquerade: None,
hooks: InboundHooks::default(),
active: None,
}
}

Expand All @@ -155,6 +160,14 @@ impl TuicheInboundBuilder {
self
}

/// Set the live-connection registry for per-user connection limits + active
/// kick. Each authenticated connection registers itself; `kick_user` drops
/// it.
pub fn active(mut self, active: Option<crate::active::ActiveConnections>) -> Self {
self.active = active;
self
}

/// Enable the HTTP/3 masquerade: non-TUIC connections are reverse-proxied
/// to the configured upstream site instead of being dropped.
pub fn masquerade(mut self, masquerade: Option<crate::server::MasqueradeConfig>) -> Self {
Expand Down Expand Up @@ -232,6 +245,7 @@ impl TuicheInboundBuilder {
cancel: self.cancel.unwrap_or_default(),
masquerade: self.masquerade,
hooks: self.hooks,
active: self.active,
})
}
}
Expand Down
11 changes: 10 additions & 1 deletion crates/wind-tuic/src/quinn/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ pub struct TuicInboundOpts {
/// Downstream extensibility hooks (auth / traffic stats / connection
/// management). Defaults to all-`None` (no behavior change).
pub hooks: InboundHooks,

/// Live-connection registry for per-user connection limits + active kick.
/// Defaults to `None` (no registration). When set, each authenticated
/// connection registers itself and `kick_user` can drop it.
pub active: Option<crate::active::ActiveConnections>,
}

impl Default for TuicInboundOpts {
Expand All @@ -103,6 +108,7 @@ impl Default for TuicInboundOpts {
initial_window: 1024 * 1024,
masquerade: None,
hooks: InboundHooks::default(),
active: None,
}
}
}
Expand Down Expand Up @@ -230,6 +236,7 @@ impl AbstractInbound for TuicInbound {
let zero_rtt = opts.zero_rtt;
let masquerade = opts.masquerade.clone();
let hooks = opts.hooks.clone();
let active = opts.active.clone();
let cb = cb.clone();
let conn_cancel = self.cancel.child_token();
let remote = incoming.remote_address();
Expand All @@ -240,7 +247,7 @@ impl AbstractInbound for TuicInbound {
// `tasks.close()` + `tasks.wait()` after cancelling).
self.ctx.tasks.spawn(spawn_logged(
"Connection handler",
handle_connection(incoming, users, auth_timeout, zero_rtt, masquerade, cb, conn_cancel, hooks),
handle_connection(incoming, users, auth_timeout, zero_rtt, masquerade, cb, conn_cancel, hooks, active),
).instrument(span));
}
else => {
Expand Down Expand Up @@ -273,6 +280,7 @@ async fn handle_connection<C: InboundCallback>(
callback: C,
cancel: CancellationToken,
hooks: InboundHooks,
active: Option<crate::active::ActiveConnections>,
) -> eyre::Result<()> {
let remote_addr = incoming.remote_address();

Expand Down Expand Up @@ -319,6 +327,7 @@ async fn handle_connection<C: InboundCallback>(
cancel,
masquerade,
hooks,
active,
)
.await;

Expand Down
31 changes: 30 additions & 1 deletion crates/wind-tuic/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use wind_core::{
};
use wind_quic::{QuicConnection, QuicError};

use crate::proto::{CmdType, Command, UdpStream};
use crate::{
active::ActiveConnections,
proto::{CmdType, Command, UdpStream},
};

#[cfg(feature = "masquerade")]
mod masquerade;
Expand Down Expand Up @@ -138,6 +141,14 @@ struct InboundCtx<C: QuicConnection> {
hooks: InboundHooks,
/// Per-connection context handed to connection-management hooks.
conn_info: ConnInfo,
/// Live-connection registry for per-user limits + active kick. The
/// connection registers itself here once authenticated and deregisters on
/// close; `kick_user` cancels `conn_cancel` to drop it.
active: Option<ActiveConnections>,
/// This connection's cancel token (clone of the one driving
/// `serve_connection`). Cancelling it closes the connection — used as the
/// kick handle stored in `active`.
conn_cancel: CancellationToken,
}

impl<C: QuicConnection> InboundCtx<C> {
Expand Down Expand Up @@ -283,6 +294,7 @@ pub async fn serve_connection<C, CB>(
cancel: CancellationToken,
masq: Option<MasqueradeConfig>,
hooks: InboundHooks,
active: Option<ActiveConnections>,
) where
C: QuicConnection,
CB: InboundCallback,
Expand Down Expand Up @@ -327,6 +339,8 @@ pub async fn serve_connection<C, CB>(
udp_root_cancel,
hooks,
conn_info,
active,
conn_cancel: cancel.clone(),
});

// Per-connection HTTP/3 masquerade router: a parked `run_masquerade` task plus
Expand Down Expand Up @@ -515,6 +529,12 @@ pub async fn serve_connection<C, CB>(
}
acceptor_cancel.cancel();

// Drop this connection from the live-connection registry (no-op if it never
// authenticated and so was never registered).
if let Some(active) = &connection.active {
active.deregister(connection.conn_info.conn_id);
}

// Connection lifecycle: notify the disconnect hook (user is `None` if the
// connection never authenticated).
if let Some(ch) = &connection.hooks.connection {
Expand Down Expand Up @@ -842,6 +862,15 @@ async fn handle_auth<C: QuicConnection>(connection: &InboundCtx<C>, uuid: Uuid,

connection.auth.store(Some(Arc::new(AuthState { user: user.clone() })));
connection.auth_notify.notify_waiters();

// Register this now-authenticated connection so the host can enforce a
// per-user limit (`count_for`) and actively kick it (`kick_user`). Done after
// the `on_authenticated` veto so the limit check above only counts *other*
// live connections for this user.
if let Some(active) = &connection.active {
active.register(connection.conn_info.conn_id, user.clone(), connection.conn_cancel.clone());
}

info!(uuid = %uuid, user = %user, "authenticated");

Ok(())
Expand Down
Loading