Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ full = [
"limit",
"load",
"load-shed",
"circuit-breaker",
"make",
"ready-cache",
"reconnect",
Expand All @@ -48,6 +49,7 @@ hedge = ["util", "filter", "futures-util", "hdrhistogram", "tokio/time", "tracin
limit = ["tokio/time", "tokio/sync", "tokio-util", "tracing", "pin-project-lite"]
load = ["tokio/time", "tracing", "pin-project-lite"]
load-shed = ["pin-project-lite"]
circuit-breaker = ["tokio/sync", "tokio/time", "pin-project-lite"]
make = ["pin-project-lite", "tokio"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
reconnect = ["make", "tracing"]
Expand Down
75 changes: 75 additions & 0 deletions tower/src/circuit_breaker/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};

use pin_project_lite::pin_project;

use super::{
policy::CircuitPolicy,
service::{CircuitError, CircuitStatus, SharedState},
};

pin_project! {
/// Response future for [`CircuitBreaker`].
///
/// [`CircuitBreaker`]: super::service::CircuitBreaker
pub struct ResponseFuture<F, T, E, P> {
#[pin]
inner: F,
shared: Arc<Mutex<SharedState<P>>>,
_marker: std::marker::PhantomData<fn() -> (T, E)>,
}
}

impl<F, T, E, P> ResponseFuture<F, T, E, P> {
pub(crate) fn new(shared: Arc<Mutex<SharedState<P>>>, inner: F) -> Self {
Self {
inner,
shared,
_marker: std::marker::PhantomData,
}
}
}

impl<F, T, E, P> Future for ResponseFuture<F, T, E, P>
where
F: Future<Output = Result<T, E>>,
P: CircuitPolicy,
{
type Output = Result<T, CircuitError<E>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

match this.inner.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(resp)) => {
let mut s = this.shared.lock().expect("circuit breaker state poisoned");
let should_close = s.policy.on_success();
if should_close && s.status == CircuitStatus::HalfOpen {
s.status = CircuitStatus::Closed;
}
Poll::Ready(Ok(resp))
}
Poll::Ready(Err(e)) => {
let mut s = this.shared.lock().expect("circuit breaker state poisoned");
let should_open = s.policy.on_failure();
match s.status {
// Any failure during a probe reopens immediately —
// the backend is not yet ready regardless of threshold.
CircuitStatus::HalfOpen => {
s.status = CircuitStatus::Open;
}
CircuitStatus::Closed if should_open => {
s.status = CircuitStatus::Open;
}
_ => {}
}
Poll::Ready(Err(CircuitError::Inner(e)))
}
}
}
}
62 changes: 62 additions & 0 deletions tower/src/circuit_breaker/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::time::Duration;

use super::{
policy::{CircuitPolicy, ConsecutiveFailures},
service::CircuitBreaker,
};

/// [`Layer`] that wraps services in a [`CircuitBreaker`].
///
/// Construct with [`CircuitBreakerLayer::new`] for the standard
/// [`ConsecutiveFailures`] policy, or with [`CircuitBreakerLayer::with_policy`]
/// to supply any custom [`CircuitPolicy`].
///
/// [`Layer`]: tower_layer::Layer
#[derive(Clone, Debug)]
pub struct CircuitBreakerLayer<P> {
policy: P,
}

impl CircuitBreakerLayer<ConsecutiveFailures> {
/// Create a layer using the built-in [`ConsecutiveFailures`] policy.
///
/// - `failure_threshold`: consecutive failures before the circuit opens.
/// - `success_threshold`: fraction of probes (0.0–1.0) that must succeed
/// during [`HalfOpen`][crate::circuit_breaker::CircuitStatus::HalfOpen]
/// before the circuit closes again.
/// - `timeout`: how long to stay open before sending the first probe.
pub fn new(failure_threshold: usize, success_threshold: f64, timeout: Duration) -> Self {
Self {
policy: ConsecutiveFailures::new(failure_threshold, success_threshold, timeout),
}
}
}

impl<P: CircuitPolicy + Clone> CircuitBreakerLayer<P> {
/// Create a layer using a custom [`CircuitPolicy`].
///
/// # Example
///
/// ```rust,ignore
/// use tower::circuit_breaker::{CircuitBreakerLayer, ConsecutiveFailures};
/// use std::time::Duration;
///
/// // Using the built-in policy explicitly:
/// let policy = ConsecutiveFailures::new(5, 0.8, Duration::from_secs(30));
/// let layer = CircuitBreakerLayer::with_policy(policy);
///
/// // Or bring your own:
/// let layer = CircuitBreakerLayer::with_policy(MyLatencyPolicy::new());
/// ```
pub fn with_policy(policy: P) -> Self {
Self { policy }
}
}

impl<S, P: CircuitPolicy + Clone> tower_layer::Layer<S> for CircuitBreakerLayer<P> {
type Service = CircuitBreaker<S, P>;

fn layer(&self, inner: S) -> Self::Service {
CircuitBreaker::new(inner, self.policy.clone())
}
}
91 changes: 91 additions & 0 deletions tower/src/circuit_breaker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! Circuit breaker middleware for Tower services.
//!
//! Prevents cascading failures by tracking service health and
//! short-circuiting requests to a failing backend before they hit the
//! network.
//!
//! # States
//!
//! ```text
//! Closed ──(N consecutive failures)──► Open
//! Open ──(timeout elapsed)─────────► HalfOpen (one probe allowed)
//! HalfOpen ──(success rate ≥ threshold)► Closed
//! HalfOpen ──(probe fails)────────────► Open
//! ```
//!
//! - **Closed** — normal operation; all requests pass through.
//! - **Open** — service is unhealthy; requests are rejected immediately
//! with [`CircuitError::Open`], avoiding latency pile-up.
//! - **Half-Open** — after the recovery timeout elapses, one probe request
//! is allowed through. On success the circuit closes; on failure it
//! reopens.
//!
//! # Policies
//!
//! The circuit-breaking logic is separated from the state machine via the
//! [`CircuitPolicy`] trait. The built-in [`ConsecutiveFailures`] policy
//! opens after *N* consecutive failures and closes once enough probes
//! succeed. Implement [`CircuitPolicy`] directly to build latency-based
//! triggers, manual switches, or any other strategy.
//!
//! # Relationship to [`tower::retry::budget`]
//!
//! [`Budget`][budget] and circuit breakers are **complementary**, not
//! competing.
//!
//! - A **retry budget** governs *retry worthiness*: it limits how many
//! retried requests can be issued relative to the originals, preventing
//! retry amplification inside a single client.
//! - A **circuit breaker** governs *traffic admission*: once failure is
//! systemic it stops **all** requests (including first attempts) from
//! reaching the backend, giving it breathing room to recover.
//!
//! Using a circuit breaker without a budget still exposes you to retry
//! storms from clients above; using a budget without a circuit breaker
//! still allows traffic to pile up against a failing backend. The two
//! compose naturally:
//!
//! ```rust,ignore
//! use std::{future, sync::Arc, time::Duration};
//! use tower::{ServiceBuilder, retry::{Policy, budget::TpsBudget}};
//! use tower::circuit_breaker::CircuitBreakerLayer;
//!
//! // Budget caps how many retries each client issues.
//! // Circuit breaker stops all traffic once failure is systemic.
//! let svc = ServiceBuilder::new()
//! .layer(CircuitBreakerLayer::new(5, 0.8, Duration::from_secs(30)))
//! .layer(tower::retry::RetryLayer::new(my_budget_policy))
//! .service_fn(my_backend);
//! ```
//!
//! [budget]: crate::retry::budget
//!
//! # Quick start
//!
//! ```rust,ignore
//! use std::time::Duration;
//! use tower::ServiceBuilder;
//! use tower::circuit_breaker::CircuitBreakerLayer;
//!
//! let svc = ServiceBuilder::new()
//! .layer(CircuitBreakerLayer::new(
//! 5, // open after 5 consecutive failures
//! 0.8, // close when 80 % of probes succeed
//! Duration::from_secs(30), // wait 30 s before sending a probe
//! ))
//! .service_fn(|req: String| async move {
//! Ok::<String, std::io::Error>(req)
//! });
//! ```

mod future;
mod layer;
mod policy;
mod service;

pub use self::{
future::ResponseFuture,
layer::CircuitBreakerLayer,
policy::{CircuitPolicy, ConsecutiveFailures},
service::{CircuitBreaker, CircuitError, CircuitStatus},
};
Loading