Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/sinks/util/buffer/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,4 +728,41 @@ mod tests {
// as incremental metrics and this results in an empty buffer.
assert_eq!(buffer.len(), 0);
}

#[test]
fn normalizer_does_not_hold_finalizer_references() {
use vector_common::finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventStatus};

let (batch, mut receiver) = BatchNotifier::new_with_receiver();

let metrics: Vec<Metric> = (0..3)
.map(|i| {
let mut m = sample_counter(i, "production", Incremental, 1.0);
m.add_finalizer(EventFinalizer::new(batch.clone()));
m
})
.collect();

// Drop our handle so only the metrics hold references
drop(batch);

let mut normalizer = MetricNormalizer::<AbsoluteMetricNormalizer>::default();
let mut normalized = Vec::new();
for metric in metrics {
if let Some(m) = normalizer.normalize(metric) {
normalized.push(m);
}
}

// Simulate what the Driver does: mark delivered, then drop finalizers
for mut metric in normalized {
metric.metadata().update_status(EventStatus::Delivered);
metric.metadata_mut().take_finalizers();
}

// The normalization cache must not retain Arc<EventFinalizer> references.
// If it did, the batch notification would never fire and the disk buffer
// could not acknowledge the events, eventually causing a deadlock.
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
}
}
22 changes: 18 additions & 4 deletions src/sinks/util/buffer/metrics/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,19 @@ impl MetricSet {
metric = metric.with_value(new_value);
}
// Insert the updated stored value, or as store a new reference value (if the Metric changed type)
self.insert(metric.clone(), timestamp);
let mut cache_metric = metric.clone();
// Strip finalizers from the clone before caching. The normalization
// cache must not hold Arc<EventFinalizer> references, as that
// prevents the disk buffer from acknowledging events — leading to
// a deadlock once the buffer fills and no new events can replace
// cache entries.
cache_metric.metadata_mut().take_finalizers();
self.insert(cache_metric, timestamp);
}
None => {
self.insert(metric.clone(), timestamp);
let mut cache_metric = metric.clone();
cache_metric.metadata_mut().take_finalizers();
self.insert(cache_metric, timestamp);
}
}
metric.into_absolute()
Expand Down Expand Up @@ -636,13 +645,18 @@ impl MetricSet {
self.insert_with_tracking(metric.series().clone(), new_reference);
Some(metric.into_incremental())
} else {
// Metric changed type, store this and emit nothing
// Metric changed type, store this and emit nothing.
// Strip finalizers — the normalization cache must not hold
// Arc<EventFinalizer> references (see incremental_to_absolute).
metric.metadata_mut().take_finalizers();
self.insert(metric, timestamp);
None
}
}
None => {
// No reference so store this and emit nothing
// No reference so store this and emit nothing.
// Strip finalizers before caching (see incremental_to_absolute).
metric.metadata_mut().take_finalizers();
self.insert(metric, timestamp);
None
}
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/util/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub trait SinkBuilderExt: Stream {
Self: Stream<Item = Metric> + Unpin + Sized,
N: MetricNormalize + Default,
{
match maybe_ttl_secs {
match maybe_ttl_secs.filter(|&ttl| ttl > 0.0) {
None => Normalizer::new(self, N::default()),
Some(ttl) => {
Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64))
Expand Down
Loading