From ef956ee78613364cfe2ddd90f9799f5d872a10d5 Mon Sep 17 00:00:00 2001 From: Derek Zhang Date: Thu, 16 Apr 2026 12:15:09 -0400 Subject: [PATCH 1/3] remove finalizer from cache --- src/sinks/util/buffer/metrics/normalize.rs | 8 +++++++- src/sinks/util/builder.rs | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/sinks/util/buffer/metrics/normalize.rs b/src/sinks/util/buffer/metrics/normalize.rs index c244506508f18..d7cab65b8a966 100644 --- a/src/sinks/util/buffer/metrics/normalize.rs +++ b/src/sinks/util/buffer/metrics/normalize.rs @@ -649,7 +649,13 @@ impl MetricSet { } } - fn insert(&mut self, metric: Metric, timestamp: Option) { + fn insert(&mut self, mut metric: Metric, timestamp: Option) { + // Strip finalizers before caching. The cache is only used for + // normalization state (tracking running totals) and must not hold + // Arc references, as that prevents the disk buffer + // from acknowledging events — leading to a deadlock once the buffer + // fills up and no new events can replace cache entries. + metric.metadata_mut().take_finalizers(); let (series, entry) = MetricEntry::from_metric(metric, timestamp); self.insert_with_tracking(series, entry); } diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index 725e20d974782..175cb7a92fc81 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -231,7 +231,7 @@ pub trait SinkBuilderExt: Stream { Self: Stream + Unpin + Sized, N: MetricNormalize + Default, { - match maybe_ttl_secs { + match maybe_ttl_secs.filter(|&ttl| ttl > 0.0) { None => Normalizer::new(self, N::default()), Some(ttl) => { Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64)) From 2739bab7b12a360c9e5676fb34eb928ee87fd1d1 Mon Sep 17 00:00:00 2001 From: Derek Zhang Date: Thu, 16 Apr 2026 12:58:22 -0400 Subject: [PATCH 2/3] remove PR check --- src/sinks/util/buffer/metrics/mod.rs | 36 ++++++++++++++++++++++ src/sinks/util/buffer/metrics/normalize.rs | 30 +++++++++++------- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/src/sinks/util/buffer/metrics/mod.rs b/src/sinks/util/buffer/metrics/mod.rs index 3e149d820acdf..d9b9f0374495c 100644 --- a/src/sinks/util/buffer/metrics/mod.rs +++ b/src/sinks/util/buffer/metrics/mod.rs @@ -728,4 +728,40 @@ mod tests { // as incremental metrics and this results in an empty buffer. assert_eq!(buffer.len(), 0); } + + #[test] + fn normalizer_does_not_hold_finalizer_references() { + use vector_common::finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventStatus}; + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + + let metrics: Vec = (0..3) + .map(|i| { + let mut m = sample_counter(i, "production", Incremental, 1.0); + m.add_finalizer(EventFinalizer::new(batch.clone())); + m + }) + .collect(); + + // Drop our handle so only the metrics hold references + drop(batch); + + let mut normalizer = MetricNormalizer::::default(); + let mut normalized = Vec::new(); + for metric in metrics { + if let Some(m) = normalizer.normalize(metric) { + normalized.push(m); + } + } + + // Simulate what the Driver does: mark delivered, then drop finalizers + for mut metric in normalized { + metric.metadata().update_status(EventStatus::Delivered); + metric.metadata_mut().take_finalizers(); + } + + // Before the fix, this fails with Err(Empty) — the MetricSet cache + // holds Arc clones, preventing the batch from completing. + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + } } diff --git a/src/sinks/util/buffer/metrics/normalize.rs b/src/sinks/util/buffer/metrics/normalize.rs index d7cab65b8a966..09db988006097 100644 --- a/src/sinks/util/buffer/metrics/normalize.rs +++ b/src/sinks/util/buffer/metrics/normalize.rs @@ -590,10 +590,19 @@ impl MetricSet { metric = metric.with_value(new_value); } // Insert the updated stored value, or as store a new reference value (if the Metric changed type) - self.insert(metric.clone(), timestamp); + let mut cache_metric = metric.clone(); + // Strip finalizers from the clone before caching. The normalization + // cache must not hold Arc references, as that + // prevents the disk buffer from acknowledging events — leading to + // a deadlock once the buffer fills and no new events can replace + // cache entries. + cache_metric.metadata_mut().take_finalizers(); + self.insert(cache_metric, timestamp); } None => { - self.insert(metric.clone(), timestamp); + let mut cache_metric = metric.clone(); + cache_metric.metadata_mut().take_finalizers(); + self.insert(cache_metric, timestamp); } } metric.into_absolute() @@ -636,26 +645,25 @@ impl MetricSet { self.insert_with_tracking(metric.series().clone(), new_reference); Some(metric.into_incremental()) } else { - // Metric changed type, store this and emit nothing + // Metric changed type, store this and emit nothing. + // Strip finalizers — the normalization cache must not hold + // Arc references (see incremental_to_absolute). + metric.metadata_mut().take_finalizers(); self.insert(metric, timestamp); None } } None => { - // No reference so store this and emit nothing + // No reference so store this and emit nothing. + // Strip finalizers before caching (see incremental_to_absolute). + metric.metadata_mut().take_finalizers(); self.insert(metric, timestamp); None } } } - fn insert(&mut self, mut metric: Metric, timestamp: Option) { - // Strip finalizers before caching. The cache is only used for - // normalization state (tracking running totals) and must not hold - // Arc references, as that prevents the disk buffer - // from acknowledging events — leading to a deadlock once the buffer - // fills up and no new events can replace cache entries. - metric.metadata_mut().take_finalizers(); + fn insert(&mut self, metric: Metric, timestamp: Option) { let (series, entry) = MetricEntry::from_metric(metric, timestamp); self.insert_with_tracking(series, entry); } From 6fc74fb3f38ab1f879549d6497f66bab37d1d24b Mon Sep 17 00:00:00 2001 From: Derek Zhang Date: Thu, 16 Apr 2026 13:56:55 -0400 Subject: [PATCH 3/3] adjust comment --- src/sinks/util/buffer/metrics/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sinks/util/buffer/metrics/mod.rs b/src/sinks/util/buffer/metrics/mod.rs index d9b9f0374495c..93e4f46ca9682 100644 --- a/src/sinks/util/buffer/metrics/mod.rs +++ b/src/sinks/util/buffer/metrics/mod.rs @@ -760,8 +760,9 @@ mod tests { metric.metadata_mut().take_finalizers(); } - // Before the fix, this fails with Err(Empty) — the MetricSet cache - // holds Arc clones, preventing the batch from completing. + // The normalization cache must not retain Arc references. + // If it did, the batch notification would never fire and the disk buffer + // could not acknowledge the events, eventually causing a deadlock. assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); } }