diff --git a/Cargo.lock b/Cargo.lock index d013bc09432ec..bb18fb96c7a97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12435,6 +12435,7 @@ dependencies = [ "warp", "windows 0.58.0", "windows-service", + "windows-sys 0.52.0", "wiremock", "zstd 0.13.2", ] diff --git a/Cargo.toml b/Cargo.toml index 84d89c50096f9..bf9998c89697b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -396,6 +396,7 @@ itertools.workspace = true k8s-openapi = { version = "0.27.0", default-features = false, features = ["v1_31"], optional = true } kube = { version = "3.0.1", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true } listenfd = { version = "1.0.2", default-features = false, optional = true } +libc.workspace = true lru = { version = "0.16.3", default-features = false } maxminddb = { version = "0.27.0", default-features = false, optional = true, features = ["simdutf8"] } md-5 = { version = "0.10", default-features = false, optional = true } @@ -451,6 +452,7 @@ sysinfo = "0.37.2" byteorder = "1.5.0" [target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.52", features = ["Win32_Foundation", "Win32_System_Threading"] } windows-service = "0.8.0" windows = { version = "0.58", features = ["Win32_System_EventLog", "Win32_Foundation", "Win32_System_Com", "Win32_Security", "Win32_Security_Authorization", "Win32_System_Threading", "Win32_Storage_FileSystem"], optional = true } quick-xml = { version = "0.31", default-features = false, features = ["serialize"], optional = true } diff --git a/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md new file mode 100644 index 0000000000000..7e0c7edcf333b --- /dev/null +++ b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md @@ -0,0 +1,5 @@ +Added a new counter metric `component_cpu_usage_ns_total` counting the CPU +time consumed by a transform in nanoseconds. Only supported for sync and +function transforms. + +authors: gwenaskell diff --git a/rfcs/2026-04-13-component-cpu-metric.md b/rfcs/2026-04-13-component-cpu-metric.md new file mode 100644 index 0000000000000..794cb52eb17f3 --- /dev/null +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -0,0 +1,177 @@ +# RFC 2026-04-13 - Per-component CPU time metric for sync transforms + +The current `utilization` gauge measures the fraction of wall-clock time a +component is not idle (i.e., not waiting on its input channel). Because sync +and function transforms can run concurrently across multiple tokio worker +threads, and because wall-clock "not idle" includes time the OS has preempted +the thread, this gauge does not accurately reflect how much CPU a component +actually consumes. This RFC proposes a new **counter** metric, +`component_cpu_usage_ns_total`, that tracks the cumulative CPU time consumed +by a component's transform work in nanoseconds, measured via OS thread-level +CPU clocks. + +## Context + +- The existing `utilization` metric is implemented in `src/utilization.rs`. +- Sync and function transforms are spawned in `src/topology/builder.rs` + via the `Runner` struct (`run_inline` and `run_concurrently` methods). +- The `enable_concurrency` trait method controls whether a transform is + dispatched to parallel `tokio::spawn` tasks (up to + `TRANSFORM_CONCURRENCY_LIMIT`, which defaults to the number of worker + threads). + +## Cross cutting concerns + +- The `utilization` gauge remains as-is. This RFC adds a complementary metric; + it does not replace the existing one. +- Future work could extend this approach to task transforms and sinks. + +## Scope + +### In scope + +- A new `component_cpu_usage_ns_total` counter for **sync and function + transforms** (both inline and concurrent execution paths). +- Two implementation tiers: a wall-clock fallback that works everywhere, and a + precise thread-CPU-time implementation using OS APIs. +- Feasibility analysis of thread-level CPU time measurement. + +### Out of scope + +- Task transforms (async stream-based). Their execution interleaves with the + tokio runtime in ways that make per-poll CPU measurement a distinct problem. + Furthermore, all task transforms in Vector are currently single-threaded (they + do not parallelize work), making the `utilization` metric a good indicator of + their actual usage. +- Sources and sinks. +- Replacing or modifying the existing `utilization` gauge. + +## Pain + +1. **Utilization is misleading under concurrency.** In the concurrent + `run_concurrently` path, the utilization timer stays in "not waiting" state + from the moment events are received (`stop_wait` in `on_events_received`) + until a completed task's output is sent (`start_wait` in `send_outputs`). + The actual CPU work happens on separate `tokio::spawn`'d tasks that the + timer does not track. This means utilization measures **occupancy** (is + there at least one batch in flight?) rather than CPU consumption. + + Concrete example: a concurrent remap with 4 in-flight tasks each taking + 10ms, input arriving every 5ms. Input arrives frequently enough that + `stop_wait` fires before each spawn, keeping the timer in "not waiting" + almost continuously → utilization ≈ 100%. But actual CPU consumption is + 4 × 10ms / 20ms = 2 cores. The utilization gauge cannot distinguish + "2 cores" from "0.3 cores at 100% occupancy." + +2. **No way to detect CPU-bound transforms.** Operators tuning pipelines need to + know which transforms are CPU-bottlenecked. A `cpu_usage_ns_total` counter, + when divided by wall-clock time (in ns), directly gives CPU core utilization + and can exceed 1.0 when a transform genuinely uses multiple cores. + +## Proposal + +### User Experience + +A new counter metric is emitted for every sync/function transform: + +```prometheus +component_cpu_usage_ns_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14207 +``` + +The value is cumulative CPU nanoseconds consumed by the component. Operators +use it to compute CPU core utilization: + +```promql +# Per-component CPU core usage (can exceed 1.0 with concurrency) +rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9 + +# Compare against utilization to separate CPU cost from pipeline pressure +rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9 + / + utilization{component_id="my_remap"} +``` + +This metric is always emitted for sync/function transforms; there is no +configuration knob. + +## Rationale + +- **Direct CPU cost visibility.** Operators can identify which transforms are + CPU-bottlenecked vs. backpressure-limited, enabling informed tuning. +- **Composable with existing metrics.** `rate(component_cpu_usage_ns_total[1m]) / 1e9` + gives CPU cores used; dividing by `utilization` separates CPU from pipeline effects. +- **Low overhead.** Two `clock_gettime` calls per batch (~80ns total on Linux) + is negligible relative to the work `transform_all` performs. +- **No accumulation errors.** The counter stores `u64` nanoseconds; each + increment is exact integer arithmetic. The single `u64 → f64` cast at scrape + time has bounded, non-accumulated error. + +## Drawbacks + +- **Platform-specific code.** The precise implementation uses `cfg`-gated FFI + for Linux, macOS, and Windows. Other platforms fall back to wall-clock time, + giving three maintained code paths plus one fallback. + +## Alternatives + +### Extend the existing utilization gauge + +Add a CPU-time-based "utilization v2" that replaces the current gauge. + +**Rejected because:** The current utilization metric serves a different purpose +(pipeline flow analysis: is this component starved or saturated?). CPU time is a +complementary signal, not a replacement. Conflating them would lose information. + +### Per-event latency histogram + +Emit a histogram of per-event processing time instead of a cumulative counter. + +**Rejected because:** Histograms are expensive at high throughput (Vector +processes millions of events/sec). A counter that increments once per batch is +far cheaper. Per-event latency can be derived from the counter and +`events_sent_total` if needed (`cpu_ns / events = avg cpu ns per event`). + +### `getrusage(RUSAGE_THREAD)` instead of `clock_gettime` + +On Linux, `getrusage(RUSAGE_THREAD)` also provides per-thread CPU time (as +`ru_utime` + `ru_stime`). + +**Not preferred because:** `clock_gettime(CLOCK_THREAD_CPUTIME_ID)` has +nanosecond precision vs. microsecond for `getrusage`. Both are vDSO-accelerated +on modern kernels. The higher precision is worth the identical cost. + +## Outstanding Questions + +1. **User/system split:** Should we report user and system CPU time separately + (as `mode="user"` / `mode="system"` tags) like `host_cpu_seconds_total` + does? The Linux API supports this. It adds cardinality but helps distinguish + transforms that trigger syscalls (e.g., enrichment table lookups) from pure + computation. + +## Plan Of Attack + +- Add `src/cpu_time.rs` module with `thread_cpu_time()` and platform-specific + implementations behind `#[cfg]` gates. Include unit tests that verify the + returned duration is non-zero and monotonically increasing. +- Register `counter!("component_cpu_usage_ns_total")` in `Runner::new` and + instrument `run_inline` with wall-clock timing (Tier 1). +- Instrument `run_concurrently` with wall-clock timing (Tier 1). Verify the + counter increments correctly when multiple tasks run in parallel. +- Switch from `Instant::now()` to `thread_cpu_time()` (Tier 2). Benchmark + the overhead on Linux to confirm it is <100ns per call. +- Add integration test: run a CPU-intensive remap transform, verify + `component_cpu_usage_ns_total` is within 10% of expected CPU time. +- Add documentation for the new metric in the generated component docs. +- Add changelog fragment. + +## Future Improvements + +- Extend `component_cpu_usage_ns_total` to **task transforms** by measuring CPU + time per `poll` of the transform stream. This requires careful accounting to + exclude time spent in the tokio runtime between polls. +- Extend to **sources and sinks** where the component owns a synchronous + processing step (e.g., codec encoding in sinks). +- Expose a derived `**cpu_utilization` gauge\*\* (CPU seconds / wall seconds) + computed by the `UtilizationEmitter` for operators who prefer a ready-to-use + ratio. +- Add `mode="user"` / `mode="system"` tag split for deeper CPU profiling. diff --git a/src/cpu_time.rs b/src/cpu_time.rs new file mode 100644 index 0000000000000..7dc455e5939d2 --- /dev/null +++ b/src/cpu_time.rs @@ -0,0 +1,187 @@ +use std::time::Duration; + +/// An opaque snapshot of thread CPU time. +/// +/// On Linux and macOS this uses `CLOCK_THREAD_CPUTIME_ID`, which measures +/// only the time the calling thread was actually scheduled on a CPU (true CPU +/// time, excluding preemption and context switches to other threads/processes). +/// +/// On Windows this uses `GetThreadTimes`, which provides the same guarantee +/// with 100ns granularity. +/// +/// On other platforms this falls back to wall-clock time via +/// [`std::time::Instant`]. +/// +/// # Usage +/// +/// Call [`ThreadTime::now`] immediately before the work to measure, then call +/// [`ThreadTime::elapsed`] immediately after: +/// +/// ```ignore +/// let t0 = ThreadTime::now(); +/// do_work(); +/// let cpu_time = t0.elapsed(); +/// ``` +/// +/// # Correctness for sync transforms +/// +/// This measurement is accurate for [`crate::transforms::SyncTransform`] +/// because `transform_all` is synchronous and non-yielding: between the two +/// measurement points the worker thread runs only transform code, with no +/// `.await` points that could interleave other tokio tasks. +pub(crate) struct ThreadTime(Inner); + +impl ThreadTime { + /// Captures the current thread CPU time. + #[inline] + pub(crate) fn now() -> Self { + ThreadTime(Inner::now()) + } + + /// Returns the CPU time elapsed since this snapshot was taken. + #[inline] + pub(crate) fn elapsed(&self) -> Duration { + self.0.elapsed() + } +} + +// ── Linux / macOS: CLOCK_THREAD_CPUTIME_ID ──────────────────────────────── + +#[cfg(any(target_os = "linux", target_os = "macos"))] +struct Inner(Duration); + +#[cfg(any(target_os = "linux", target_os = "macos"))] +impl Inner { + fn now() -> Self { + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + // SAFETY: + // - `ts` is a valid, zero-initialised `timespec` on the stack. + // - `CLOCK_THREAD_CPUTIME_ID` is a valid clock ID on Linux ≥ 2.6 and + // macOS ≥ 10.12 (both are baseline requirements for Vector). + // - The return value is intentionally ignored: the only failure modes + // are an invalid clock ID (not the case here) or an invalid pointer + // (not the case here), neither of which can occur. + unsafe { + libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts); + } + Inner(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32)) + } + + #[inline] + fn elapsed(&self) -> Duration { + Self::now().0.saturating_sub(self.0) + } +} + +// ── Windows: GetThreadTimes ─────────────────────────────────────────────── + +#[cfg(target_os = "windows")] +struct Inner(Duration); + +#[cfg(target_os = "windows")] +impl Inner { + fn now() -> Self { + use windows_sys::Win32::Foundation::FILETIME; + use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes}; + + let mut creation = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut exit = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut kernel = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut user = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + + // SAFETY: + // - `GetCurrentThread()` returns a pseudo-handle that is always valid + // and does not need to be closed. + // - All four `FILETIME` pointers are valid, properly aligned, and + // stack-allocated. + // - The return value is intentionally ignored: failure is only possible + // with an invalid handle, which cannot occur with `GetCurrentThread()`. + unsafe { + GetThreadTimes( + GetCurrentThread(), + &mut creation, + &mut exit, + &mut kernel, + &mut user, + ); + } + + // Combine the low/high halves of each FILETIME into a u64, then sum + // kernel + user. FILETIME units are 100-nanosecond intervals. + let kernel_ns = filetime_to_nanos(kernel); + let user_ns = filetime_to_nanos(user); + Inner(Duration::from_nanos(kernel_ns + user_ns)) + } + + #[inline] + fn elapsed(&self) -> Duration { + Self::now().0.saturating_sub(self.0) + } +} + +#[cfg(target_os = "windows")] +#[inline] +fn filetime_to_nanos(ft: windows_sys::Win32::Foundation::FILETIME) -> u64 { + let ticks = ((ft.dwHighDateTime as u64) << 32) | (ft.dwLowDateTime as u64); + ticks * 100 // convert 100ns intervals to nanoseconds +} + +// ── Other platforms: wall-clock fallback ────────────────────────────────── + +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] +struct Inner(std::time::Instant); + +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] +impl Inner { + fn now() -> Self { + Inner(std::time::Instant::now()) + } + + #[inline] + fn elapsed(&self) -> Duration { + self.0.elapsed() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn elapsed_is_non_negative() { + let t0 = ThreadTime::now(); + // Burn a small amount of CPU to ensure the clock advances. + let _: u64 = (0u64..10_000).sum(); + assert!(t0.elapsed() >= Duration::ZERO); + } + + #[test] + fn elapsed_is_monotone() { + // Two consecutive elapsed() calls on the same snapshot must be + // non-decreasing (the clock never goes backwards). + let t0 = ThreadTime::now(); + let _: u64 = (0u64..10_000).sum(); + let first = t0.elapsed(); + let _: u64 = (0u64..10_000).sum(); + let second = t0.elapsed(); + assert!( + second >= first, + "clock went backwards: {second:?} < {first:?}" + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 8d26a3b080ecf..2a876e05ea984 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,7 @@ pub mod aws; pub mod common; pub mod completion; mod convert_config; +pub(crate) mod cpu_time; pub mod encoding_transcode; pub mod enrichment_tables; pub mod extra_context; diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8048b7a6073ed..0e9d818734c8b 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -8,7 +8,7 @@ use std::{ use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered}; use futures_util::stream::FuturesUnordered; -use metrics::gauge; +use metrics::{Counter, counter, gauge}; use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire}; use tokio::{ select, @@ -45,6 +45,7 @@ use crate::{ ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput, }, + cpu_time::ThreadTime, event::{EventArray, EventContainer}, extra_context::ExtraContext, internal_events::EventsReceived, @@ -1130,6 +1131,7 @@ struct Runner { timer_tx: UtilizationComponentSender, latency_recorder: LatencyRecorder, events_received: Registered, + cpu_ns: Counter, } impl Runner { @@ -1149,6 +1151,7 @@ impl Runner { timer_tx, latency_recorder, events_received: register!(EventsReceived), + cpu_ns: counter!("component_cpu_usage_ns_total"), } } @@ -1184,7 +1187,9 @@ impl Runner { self.timer_tx.try_send_start_wait(); while let Some(events) = input_rx.next().await { self.on_events_received(&events); + let t0 = ThreadTime::now(); self.transform.transform_all(events, &mut outputs_buf); + self.cpu_ns.increment(t0.elapsed().as_nanos() as u64); self.send_outputs(&mut outputs_buf) .await .map_err(TaskError::wrapped)?; @@ -1233,10 +1238,13 @@ impl Runner { let mut t = self.transform.clone(); let mut outputs_buf = self.outputs.new_buf_with_capacity(len); + let cpu_ns = self.cpu_ns.clone(); let task = tokio::spawn(async move { + let t0 = ThreadTime::now(); for events in input_arrays { t.transform_all(events, &mut outputs_buf); } + cpu_ns.increment(t0.elapsed().as_nanos() as u64); outputs_buf }.in_current_span()); in_flight.push_back(task); diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 98ed1ffe025d7..c06e3089ee264 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -359,6 +359,12 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + component_cpu_usage_ns_total: { + description: "The CPU time consumed by a component in nanoseconds. Available for sync and function transforms." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } component_discarded_events_total: { description: "The number of events dropped by this component." type: "counter"