1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions Cargo.toml
@@ -396,6 +396,7 @@ itertools.workspace = true
k8s-openapi = { version = "0.27.0", default-features = false, features = ["v1_31"], optional = true }
kube = { version = "3.0.1", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
listenfd = { version = "1.0.2", default-features = false, optional = true }
libc.workspace = true
lru = { version = "0.16.3", default-features = false }
maxminddb = { version = "0.27.0", default-features = false, optional = true, features = ["simdutf8"] }
md-5 = { version = "0.10", default-features = false, optional = true }
@@ -451,6 +452,7 @@ sysinfo = "0.37.2"
byteorder = "1.5.0"

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.52", features = ["Win32_Foundation", "Win32_System_Threading"] }
windows-service = "0.8.0"
windows = { version = "0.58", features = ["Win32_System_EventLog", "Win32_Foundation", "Win32_System_Com", "Win32_Security", "Win32_Security_Authorization", "Win32_System_Threading", "Win32_Storage_FileSystem"], optional = true }
quick-xml = { version = "0.31", default-features = false, features = ["serialize"], optional = true }
@@ -0,0 +1,5 @@
Added a new counter metric `component_cpu_usage_ns_total` counting the CPU
time consumed by a transform in nanoseconds. Only supported for sync and
function transforms.

authors: gwenaskell
177 changes: 177 additions & 0 deletions rfcs/2026-04-13-component-cpu-metric.md
@@ -0,0 +1,177 @@
# RFC 2026-04-13 - Per-component CPU time metric for sync transforms

The current `utilization` gauge measures the fraction of wall-clock time a
component is not idle (i.e., not waiting on its input channel). Because sync
and function transforms can run concurrently across multiple tokio worker
threads, and because wall-clock "not idle" includes time the OS has preempted
the thread, this gauge does not accurately reflect how much CPU a component
actually consumes. This RFC proposes a new **counter** metric,
`component_cpu_usage_ns_total`, that tracks the cumulative CPU time consumed
by a component's transform work in nanoseconds, measured via OS thread-level
CPU clocks.

## Context

- The existing `utilization` metric is implemented in `src/utilization.rs`.
- Sync and function transforms are spawned in `src/topology/builder.rs`
via the `Runner` struct (`run_inline` and `run_concurrently` methods).
- The `enable_concurrency` trait method controls whether a transform is
dispatched to parallel `tokio::spawn` tasks (up to
`TRANSFORM_CONCURRENCY_LIMIT`, which defaults to the number of worker
threads).

## Cross cutting concerns

- The `utilization` gauge remains as-is. This RFC adds a complementary metric;
it does not replace the existing one.
- Future work could extend this approach to task transforms and sinks.

## Scope

### In scope

- A new `component_cpu_usage_ns_total` counter for **sync and function
transforms** (both inline and concurrent execution paths).
- Two implementation tiers: a wall-clock fallback that works everywhere, and a
precise thread-CPU-time implementation using OS APIs.
- Feasibility analysis of thread-level CPU time measurement.

### Out of scope

- Task transforms (async stream-based). Their execution interleaves with the
tokio runtime in ways that make per-poll CPU measurement a distinct problem.
Furthermore, all task transforms in Vector are currently single-threaded (they
do not parallelize work), making the `utilization` metric a good indicator of
their actual usage.
- Sources and sinks.
- Replacing or modifying the existing `utilization` gauge.

## Pain

1. **Utilization is misleading under concurrency.** In the concurrent
`run_concurrently` path, the utilization timer stays in "not waiting" state
from the moment events are received (`stop_wait` in `on_events_received`)
until a completed task's output is sent (`start_wait` in `send_outputs`).
The actual CPU work happens on separate `tokio::spawn`'d tasks that the
timer does not track. This means utilization measures **occupancy** (is
there at least one batch in flight?) rather than CPU consumption.

Concrete example: a concurrent remap with 4 in-flight tasks each taking
10ms, input arriving every 5ms. Input arrives frequently enough that
`stop_wait` fires before each spawn, keeping the timer in "not waiting"
almost continuously → utilization ≈ 100%. But actual CPU consumption is
4 × 10ms / 20ms = 2 cores. The utilization gauge cannot distinguish
"2 cores" from "0.3 cores at 100% occupancy."

2. **No way to detect CPU-bound transforms.** Operators tuning pipelines need to
know which transforms are CPU-bottlenecked. A `cpu_usage_ns_total` counter,
when divided by wall-clock time (in ns), directly gives CPU core utilization
and can exceed 1.0 when a transform genuinely uses multiple cores.
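
The arithmetic behind "CPU cores used" is just the ratio of cumulative CPU
time to wall-clock time. A minimal sketch (the function name is illustrative,
not a Vector API):

```rust
/// Average CPU cores used over a window: cumulative CPU nanoseconds
/// divided by wall-clock nanoseconds. Under concurrency this can
/// exceed 1.0, which an occupancy-style gauge cannot express.
fn cpu_cores_used(cpu_ns: u64, wall_ns: u64) -> f64 {
    cpu_ns as f64 / wall_ns as f64
}
```

With the concrete example from pain point 1 (4 tasks of 10ms CPU each over a
20ms window), `cpu_cores_used(40_000_000, 20_000_000)` gives 2.0 cores, while
the `utilization` gauge would read ~100% either way.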

## Proposal

### User Experience

A new counter metric is emitted for every sync/function transform:

```prometheus
component_cpu_usage_ns_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14207
```

The value is cumulative CPU nanoseconds consumed by the component. Operators
use it to compute CPU core utilization:

```promql

# Per-component CPU core usage (can exceed 1.0 with concurrency)
rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9

# Compare against utilization to separate CPU cost from pipeline pressure
rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9
/
utilization{component_id="my_remap"}
```

This metric is always emitted for sync/function transforms; there is no
configuration knob.

## Rationale

- **Direct CPU cost visibility.** Operators can identify which transforms are
CPU-bottlenecked vs. backpressure-limited, enabling informed tuning.
- **Composable with existing metrics.** `rate(component_cpu_usage_ns_total[1m]) / 1e9`
gives CPU cores used; dividing by `utilization` separates CPU from pipeline effects.
- **Low overhead.** Two `clock_gettime` calls per batch (~80ns total on Linux)
  are negligible relative to the work `transform_all` performs.
- **No accumulation errors.** The counter stores `u64` nanoseconds; each
increment is exact integer arithmetic. The single `u64 → f64` cast at scrape
time has bounded, non-accumulated error.
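
As a minimal sketch of the "no accumulation errors" point (the static counter
here is illustrative storage, not the metrics crate's actual internals):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Illustrative counter storage: each increment is an exact u64 integer
/// addition of nanoseconds. The only u64 -> f64 conversion happens once,
/// at scrape time, so rounding error never accumulates across batches.
static CPU_NS_TOTAL: AtomicU64 = AtomicU64::new(0);

fn record(cpu: Duration) {
    CPU_NS_TOTAL.fetch_add(cpu.as_nanos() as u64, Ordering::Relaxed);
}

fn scrape() -> f64 {
    CPU_NS_TOTAL.load(Ordering::Relaxed) as f64
}
```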

## Drawbacks

- **Platform-specific code.** The precise implementation uses `cfg`-gated FFI
for Linux, macOS, and Windows. Other platforms fall back to wall-clock time,
giving three maintained code paths plus one fallback.

## Alternatives

### Extend the existing utilization gauge

Add a CPU-time-based "utilization v2" that replaces the current gauge.

**Rejected because:** The current utilization metric serves a different purpose
(pipeline flow analysis: is this component starved or saturated?). CPU time is a
complementary signal, not a replacement. Conflating them would lose information.

### Per-event latency histogram

Emit a histogram of per-event processing time instead of a cumulative counter.

**Rejected because:** Histograms are expensive at high throughput (Vector
processes millions of events/sec). A counter that increments once per batch is
far cheaper. Per-event latency can be derived from the counter and
`events_sent_total` if needed (`cpu_ns / events = avg cpu ns per event`).
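
The derivation above is plain integer division over the two counters; a
sketch (function name is illustrative):

```rust
/// Average CPU nanoseconds per event, derived from the cumulative CPU
/// counter and the events counter instead of a per-event histogram.
/// Returns None when no events were sent, avoiding division by zero.
fn avg_cpu_ns_per_event(cpu_ns_total: u64, events_sent_total: u64) -> Option<u64> {
    (events_sent_total > 0).then(|| cpu_ns_total / events_sent_total)
}
```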

### `getrusage(RUSAGE_THREAD)` instead of `clock_gettime`

On Linux, `getrusage(RUSAGE_THREAD)` also provides per-thread CPU time (as
`ru_utime` + `ru_stime`).

**Not preferred because:** `clock_gettime(CLOCK_THREAD_CPUTIME_ID)` has
nanosecond precision vs. microsecond for `getrusage`, and both are
vDSO-accelerated on modern kernels; at identical cost, the higher precision
wins.

## Outstanding Questions

1. **User/system split:** Should we report user and system CPU time separately
(as `mode="user"` / `mode="system"` tags) like `host_cpu_seconds_total`
does? The Linux API supports this. It adds cardinality but helps distinguish
transforms that trigger syscalls (e.g., enrichment table lookups) from pure
computation.

## Plan Of Attack

- Add `src/cpu_time.rs` module with `thread_cpu_time()` and platform-specific
implementations behind `#[cfg]` gates. Include unit tests that verify the
returned duration is non-zero and monotonically increasing.
- Register `counter!("component_cpu_usage_ns_total")` in `Runner::new` and
instrument `run_inline` with wall-clock timing (Tier 1).
- Instrument `run_concurrently` with wall-clock timing (Tier 1). Verify the
counter increments correctly when multiple tasks run in parallel.
- Switch from `Instant::now()` to `thread_cpu_time()` (Tier 2). Benchmark
the overhead on Linux to confirm it is <100ns per call.
- Add integration test: run a CPU-intensive remap transform, verify
`component_cpu_usage_ns_total` is within 10% of expected CPU time.
- Add documentation for the new metric in the generated component docs.
- Add changelog fragment.
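
The Tier 2 overhead check could start as a simple loop benchmark. Shown here
with `Instant::now()` for portability (swap in `thread_cpu_time()` on the
target platform); this is a rough sketch, not a rigorous benchmark:

```rust
use std::time::Instant;

/// Rough per-call overhead estimate: time `n` back-to-back clock reads
/// and return the mean in nanoseconds. `black_box` keeps the reads from
/// being optimized away.
fn mean_ns_per_call(n: u32) -> f64 {
    let start = Instant::now();
    for _ in 0..n {
        std::hint::black_box(Instant::now());
    }
    start.elapsed().as_nanos() as f64 / f64::from(n)
}
```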

## Future Improvements

- Extend `component_cpu_usage_ns_total` to **task transforms** by measuring CPU
time per `poll` of the transform stream. This requires careful accounting to
exclude time spent in the tokio runtime between polls.
- Extend to **sources and sinks** where the component owns a synchronous
processing step (e.g., codec encoding in sinks).
- Expose a derived **`cpu_utilization`** gauge (CPU seconds / wall seconds)
  computed by the `UtilizationEmitter` for operators who prefer a ready-to-use
  ratio.
- Add `mode="user"` / `mode="system"` tag split for deeper CPU profiling.
187 changes: 187 additions & 0 deletions src/cpu_time.rs
@@ -0,0 +1,187 @@
use std::time::Duration;

/// An opaque snapshot of thread CPU time.
///
/// On Linux and macOS this uses `CLOCK_THREAD_CPUTIME_ID`, which measures
/// only the time the calling thread was actually scheduled on a CPU (true CPU
/// time, excluding preemption and context switches to other threads/processes).
///
/// On Windows this uses `GetThreadTimes`, which provides the same guarantee
/// with 100ns granularity.
///
/// On other platforms this falls back to wall-clock time via
/// [`std::time::Instant`].
///
/// # Usage
///
/// Call [`ThreadTime::now`] immediately before the work to measure, then call
/// [`ThreadTime::elapsed`] immediately after:
///
/// ```ignore
/// let t0 = ThreadTime::now();
/// do_work();
/// let cpu_time = t0.elapsed();
/// ```
///
/// # Correctness for sync transforms
///
/// This measurement is accurate for [`crate::transforms::SyncTransform`]
/// because `transform_all` is synchronous and non-yielding: between the two
/// measurement points the worker thread runs only transform code, with no
/// `.await` points that could interleave other tokio tasks.
pub(crate) struct ThreadTime(Inner);

impl ThreadTime {
/// Captures the current thread CPU time.
#[inline]
pub(crate) fn now() -> Self {
ThreadTime(Inner::now())
}

/// Returns the CPU time elapsed since this snapshot was taken.
#[inline]
pub(crate) fn elapsed(&self) -> Duration {
self.0.elapsed()
}
}

// ── Linux / macOS: CLOCK_THREAD_CPUTIME_ID ────────────────────────────────

#[cfg(any(target_os = "linux", target_os = "macos"))]
struct Inner(Duration);

#[cfg(any(target_os = "linux", target_os = "macos"))]
impl Inner {
fn now() -> Self {
let mut ts = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
// SAFETY:
// - `ts` is a valid, zero-initialised `timespec` on the stack.
// - `CLOCK_THREAD_CPUTIME_ID` is a valid clock ID on Linux ≥ 2.6 and
// macOS ≥ 10.12 (both are baseline requirements for Vector).
// - The return value is intentionally ignored: the only failure modes
// are an invalid clock ID (not the case here) or an invalid pointer
// (not the case here), neither of which can occur.
unsafe {
libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts);
}
Inner(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32))
}

#[inline]
fn elapsed(&self) -> Duration {
Self::now().0.saturating_sub(self.0)
}
}

// ── Windows: GetThreadTimes ───────────────────────────────────────────────

#[cfg(target_os = "windows")]
struct Inner(Duration);

#[cfg(target_os = "windows")]
impl Inner {
fn now() -> Self {
use windows_sys::Win32::Foundation::FILETIME;
use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes};

let mut creation = FILETIME {
dwLowDateTime: 0,
dwHighDateTime: 0,
};
let mut exit = FILETIME {
dwLowDateTime: 0,
dwHighDateTime: 0,
};
let mut kernel = FILETIME {
dwLowDateTime: 0,
dwHighDateTime: 0,
};
let mut user = FILETIME {
dwLowDateTime: 0,
dwHighDateTime: 0,
};

// SAFETY:
// - `GetCurrentThread()` returns a pseudo-handle that is always valid
// and does not need to be closed.
// - All four `FILETIME` pointers are valid, properly aligned, and
// stack-allocated.
// - The return value is intentionally ignored: failure is only possible
// with an invalid handle, which cannot occur with `GetCurrentThread()`.
unsafe {
GetThreadTimes(
GetCurrentThread(),
&mut creation,
&mut exit,
&mut kernel,
&mut user,
);
}

// Combine the low/high halves of each FILETIME into a u64, then sum
// kernel + user. FILETIME units are 100-nanosecond intervals.
let kernel_ns = filetime_to_nanos(kernel);
let user_ns = filetime_to_nanos(user);
Inner(Duration::from_nanos(kernel_ns + user_ns))
}

#[inline]
fn elapsed(&self) -> Duration {
Self::now().0.saturating_sub(self.0)
}
}

#[cfg(target_os = "windows")]
#[inline]
fn filetime_to_nanos(ft: windows_sys::Win32::Foundation::FILETIME) -> u64 {
let ticks = ((ft.dwHighDateTime as u64) << 32) | (ft.dwLowDateTime as u64);
ticks * 100 // convert 100ns intervals to nanoseconds
}

// ── Other platforms: wall-clock fallback ──────────────────────────────────

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
struct Inner(std::time::Instant);

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
impl Inner {
fn now() -> Self {
Inner(std::time::Instant::now())
}

#[inline]
fn elapsed(&self) -> Duration {
self.0.elapsed()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn elapsed_is_non_negative() {
let t0 = ThreadTime::now();
// Burn a small amount of CPU to ensure the clock advances.
let _: u64 = (0u64..10_000).sum();
assert!(t0.elapsed() >= Duration::ZERO);
}

#[test]
fn elapsed_is_monotone() {
// Two consecutive elapsed() calls on the same snapshot must be
// non-decreasing (the clock never goes backwards).
let t0 = ThreadTime::now();
let _: u64 = (0u64..10_000).sum();
let first = t0.elapsed();
let _: u64 = (0u64..10_000).sum();
let second = t0.elapsed();
assert!(
second >= first,
"clock went backwards: {second:?} < {first:?}"
);
}
}