diff --git a/src/sinks/util/buffer/metrics/mod.rs b/src/sinks/util/buffer/metrics/mod.rs index 3e149d820acdf..93e4f46ca9682 100644 --- a/src/sinks/util/buffer/metrics/mod.rs +++ b/src/sinks/util/buffer/metrics/mod.rs @@ -728,4 +728,41 @@ mod tests { // as incremental metrics and this results in an empty buffer. assert_eq!(buffer.len(), 0); } + + #[test] + fn normalizer_does_not_hold_finalizer_references() { + use vector_common::finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventStatus}; + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + + let metrics: Vec = (0..3) + .map(|i| { + let mut m = sample_counter(i, "production", Incremental, 1.0); + m.add_finalizer(EventFinalizer::new(batch.clone())); + m + }) + .collect(); + + // Drop our handle so only the metrics hold references + drop(batch); + + let mut normalizer = MetricNormalizer::::default(); + let mut normalized = Vec::new(); + for metric in metrics { + if let Some(m) = normalizer.normalize(metric) { + normalized.push(m); + } + } + + // Simulate what the Driver does: mark delivered, then drop finalizers + for mut metric in normalized { + metric.metadata().update_status(EventStatus::Delivered); + metric.metadata_mut().take_finalizers(); + } + + // The normalization cache must not retain Arc references. + // If it did, the batch notification would never fire and the disk buffer + // could not acknowledge the events, eventually causing a deadlock. + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + } } diff --git a/src/sinks/util/buffer/metrics/normalize.rs b/src/sinks/util/buffer/metrics/normalize.rs index c244506508f18..09db988006097 100644 --- a/src/sinks/util/buffer/metrics/normalize.rs +++ b/src/sinks/util/buffer/metrics/normalize.rs @@ -590,10 +590,19 @@ impl MetricSet { metric = metric.with_value(new_value); } // Insert the updated stored value, or as store a new reference value (if the Metric changed type) - self.insert(metric.clone(), timestamp); + let mut cache_metric = metric.clone(); + // Strip finalizers from the clone before caching. The normalization + // cache must not hold Arc references, as that + // prevents the disk buffer from acknowledging events — leading to + // a deadlock once the buffer fills and no new events can replace + // cache entries. + cache_metric.metadata_mut().take_finalizers(); + self.insert(cache_metric, timestamp); } None => { - self.insert(metric.clone(), timestamp); + let mut cache_metric = metric.clone(); + cache_metric.metadata_mut().take_finalizers(); + self.insert(cache_metric, timestamp); } } metric.into_absolute() @@ -636,13 +645,18 @@ impl MetricSet { self.insert_with_tracking(metric.series().clone(), new_reference); Some(metric.into_incremental()) } else { - // Metric changed type, store this and emit nothing + // Metric changed type, store this and emit nothing. + // Strip finalizers — the normalization cache must not hold + // Arc references (see incremental_to_absolute). + metric.metadata_mut().take_finalizers(); self.insert(metric, timestamp); None } } None => { - // No reference so store this and emit nothing + // No reference so store this and emit nothing. + // Strip finalizers before caching (see incremental_to_absolute). + metric.metadata_mut().take_finalizers(); self.insert(metric, timestamp); None } diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index 725e20d974782..175cb7a92fc81 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -231,7 +231,7 @@ pub trait SinkBuilderExt: Stream { Self: Stream + Unpin + Sized, N: MetricNormalize + Default, { - match maybe_ttl_secs { + match maybe_ttl_secs.filter(|&ttl| ttl > 0.0) { None => Normalizer::new(self, N::default()), Some(ttl) => { Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64))