Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions changelog.d/prometheus_native_histograms.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Added support for Prometheus native histograms (sparse/exponential histograms).

The `prometheus_remote_write` source now accepts native histograms sent via the
Prometheus remote write protocol, preserving their full resolution and sparse
bucket representation. The `prometheus_remote_write` sink emits native histograms
directly, enabling lossless pass-through of native histogram data between
Prometheus-compatible systems.

For sinks that do not natively support this format (such as the
`prometheus_exporter` text exposition format, InfluxDB, GreptimeDB, and Datadog),
native histograms are automatically converted to classic aggregated histograms.
This conversion is lossy but allows existing pipelines to continue operating.

authors: l1n sanjams2
67 changes: 65 additions & 2 deletions lib/prometheus-parser/proto/prometheus-types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,73 @@ message Sample {
int64 timestamp = 2;
}

// A BucketSpan defines a number of consecutive buckets with their
// offset. Logically, it would be more straightforward to include
// the bucket counts in the Span. However, the protobuf representation
// is more compact if the bucket counts are in a separate array.
message BucketSpan {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}

// A native histogram, also known as a sparse histogram.
// Source: https://github.com/prometheus/prometheus/blob/main/prompb/types.proto
message Histogram {
enum ResetHint {
UNKNOWN = 0; // Need to test for a counter reset explicitly.
YES = 1; // This is the 1st histogram after a counter reset.
NO = 2; // There was no counter reset between this and the previous Histogram.
GAUGE = 3; // This is a gauge histogram where counter resets don't happen.
}

oneof count { // Count of observations in the histogram.
uint64 count_int = 1;
double count_float = 2;
}
double sum = 3; // Sum of observations in the histogram.

// The schema defines the bucket schema. Currently, valid numbers
// are -4 <= n <= 8. They are all for base-2 bucket schemas, where 1
// is a bucket boundary in each case, and then each power of two is
// divided into 2^n logarithmic buckets. Or in other words, each
// bucket boundary is the previous boundary times 2^(2^-n).
sint32 schema = 4;
double zero_threshold = 5; // Breadth of the zero bucket.

oneof zero_count { // Count in zero bucket.
uint64 zero_count_int = 6;
double zero_count_float = 7;
}

// Negative Buckets.
repeated BucketSpan negative_spans = 8 [(nullable) = false];
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double negative_counts = 10; // Absolute count of each bucket.

// Positive Buckets.
repeated BucketSpan positive_spans = 11 [(nullable) = false];
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double positive_counts = 13; // Absolute count of each bucket.

ResetHint reset_hint = 14;

// timestamp is in ms format.
int64 timestamp = 15;
}

// TimeSeries represents samples and labels for a single time series.
message TimeSeries {
repeated Label labels = 1 [(nullable) = false];
repeated Sample samples = 2 [(nullable) = false];
repeated Label labels = 1 [(nullable) = false];
repeated Sample samples = 2 [(nullable) = false];
// field 3 is `exemplars` upstream; reserved here for wire compatibility.
reserved 3;
repeated Histogram histograms = 4 [(nullable) = false];
}

message Label {
Expand Down
114 changes: 113 additions & 1 deletion lib/prometheus-parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,22 @@ pub struct SimpleMetric {
pub value: f64,
}

/// A Prometheus native (exponential) histogram received via remote write.
///
/// The raw proto representation is preserved so that downstream code can convert it into Vector's internal
/// `NativeHistogram` metric type without loss of information.
#[derive(Debug, PartialEq)]
pub struct NativeHistogramMetric {
pub histogram: proto::Histogram,
}

type MetricMap<T> = IndexMap<GroupKey, T>;

#[derive(Debug)]
pub enum GroupKind {
Summary(MetricMap<SummaryMetric>),
Histogram(MetricMap<HistogramMetric>),
NativeHistogram(MetricMap<NativeHistogramMetric>),
Gauge(MetricMap<SimpleMetric>),
Counter(MetricMap<SimpleMetric>),
Untyped(MetricMap<SimpleMetric>),
Expand All @@ -143,7 +153,7 @@ impl GroupKind {
match self {
Self::Counter { .. } => kind == MetricKind::Counter,
Self::Gauge { .. } => kind == MetricKind::Gauge,
Self::Histogram { .. } => kind == MetricKind::Histogram,
Self::Histogram { .. } | Self::NativeHistogram { .. } => kind == MetricKind::Histogram,
Self::Summary { .. } => kind == MetricKind::Summary,
Self::Untyped { .. } => true,
}
Expand Down Expand Up @@ -204,6 +214,17 @@ impl GroupKind {
}));
}
},
// Native histograms only receive data via proto Histogram messages, not float samples.
// If a float sample is pushed to a native histogram group, bounce it back to be
// reprocessed as a separate group.
Self::NativeHistogram(_) => {
return Ok(Some(Metric {
name: metric.name,
timestamp: key.timestamp,
labels: key.labels,
value,
}));
}
Self::Summary(metrics) => match suffix {
"" => {
let quantile = key
Expand Down Expand Up @@ -393,6 +414,39 @@ impl MetricGroupSet {
Ok(())
}

fn insert_native_histogram(
&mut self,
name: &str,
labels: &BTreeMap<String, String>,
histogram: proto::Histogram,
) {
let key = GroupKey {
timestamp: Some(histogram.timestamp),
labels: labels.clone(),
};

// Look up an existing group for this name. If it's already a native histogram group, push
// into it. If it's a classic histogram group with no data yet (created from metadata),
// upgrade it to a native histogram group. Otherwise, create a new group.
match self.0.get_mut(name) {
Some(GroupKind::NativeHistogram(metrics)) => {
metrics.insert(key, NativeHistogramMetric { histogram });
}
Some(GroupKind::Histogram(metrics)) if metrics.is_empty() => {
let mut new_metrics = IndexMap::default();
new_metrics.insert(key, NativeHistogramMetric { histogram });
self.0
.insert(name.into(), GroupKind::NativeHistogram(new_metrics));
}
_ => {
let mut metrics = IndexMap::default();
metrics.insert(key, NativeHistogramMetric { histogram });
self.0
.insert(name.into(), GroupKind::NativeHistogram(metrics));
Comment on lines +441 to +445
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid overwriting classic histogram groups

When a request contains both classic histogram samples (e.g. *_bucket, *_sum, *_count) and a native histogram for the same metric family, this branch replaces the existing GroupKind::Histogram entry with a new GroupKind::NativeHistogram map. Because self.0 is keyed only by metric name, the replacement drops all previously parsed classic histogram points for that family, leading to silent data loss in mixed-encoding remote-write payloads.

Useful? React with 👍 / 👎.

}
}
}

fn finish(self) -> Vec<MetricGroup> {
self.0
.into_iter()
Expand Down Expand Up @@ -431,6 +485,10 @@ pub fn parse_request(
for sample in timeseries.samples {
groups.insert_sample(&name, &labels, sample)?;
}

for histogram in timeseries.histograms {
groups.insert_native_histogram(&name, &labels, histogram);
}
}

Ok(groups.finish())
Expand Down Expand Up @@ -756,6 +814,7 @@ mod test {
samples: vec![
$( proto::Sample { value: $sample as f64, timestamp: $timestamp as i64 }, )*
],
histograms: vec![],
}, )* ],
}
};
Expand Down Expand Up @@ -947,6 +1006,7 @@ mod test {
value: 12345.0,
timestamp: 1395066367500,
}],
histograms: vec![],
}],
};

Expand All @@ -968,4 +1028,56 @@ mod test {
ParserError::MultipleMetricKinds { name } if name == "go_memstats_alloc_bytes"
));
}

#[test]
fn parse_request_native_histogram() {
use proto::histogram::{Count, ResetHint, ZeroCount};

let histogram = proto::Histogram {
count: Some(Count::CountInt(6)),
sum: 18.5,
schema: 0,
zero_threshold: 0.0,
zero_count: Some(ZeroCount::ZeroCountInt(0)),
negative_spans: vec![],
negative_deltas: vec![],
negative_counts: vec![],
positive_spans: vec![proto::BucketSpan {
offset: 1,
length: 3,
}],
positive_deltas: vec![2, 1, -2],
positive_counts: vec![],
reset_hint: ResetHint::No as i32,
timestamp: 1_612_325_106_789,
};

let request = proto::WriteRequest {
metadata: vec![proto::MetricMetadata {
r#type: proto::MetricType::Histogram as i32,
metric_family_name: "request_latency".into(),
help: String::default(),
unit: String::default(),
}],
timeseries: vec![proto::TimeSeries {
labels: vec![proto::Label {
name: "__name__".into(),
value: "request_latency".into(),
}],
samples: vec![],
histograms: vec![histogram],
}],
};

let parsed = parse_request(request, MetadataConflictStrategy::Reject).unwrap();
assert_eq!(parsed.len(), 1);
match_group!(parsed[0], "request_latency", NativeHistogram => |metrics: &MetricMap<NativeHistogramMetric>| {
assert_eq!(metrics.len(), 1);
let (key, metric) = metrics.get_index(0).unwrap();
assert_eq!(key.timestamp, Some(1_612_325_106_789));
assert_eq!(metric.histogram.sum, 18.5);
assert_eq!(metric.histogram.schema, 0);
assert_eq!(metric.histogram.positive_deltas, vec![2, 1, -2]);
});
}
}
50 changes: 50 additions & 0 deletions lib/vector-core/proto/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ message Metric {
Sketch sketch = 15;
AggregatedHistogram3 aggregated_histogram3 = 16;
AggregatedSummary3 aggregated_summary3 = 17;
NativeHistogram native_histogram = 22;
}
string namespace = 11;
uint32 interval_ms = 18;
Expand Down Expand Up @@ -239,3 +240,52 @@ message Sketch {
AgentDDSketch agent_dd_sketch = 1;
}
}

message NativeHistogram {
// Span of consecutive populated buckets.
message BucketSpan {
sint32 offset = 1;
uint32 length = 2;
}

// Reset hint for the histogram.
enum ResetHint {
UNKNOWN = 0;
YES = 1;
NO = 2;
GAUGE = 3;
}

// Total observation count: either integer (counter) or float (gauge).
oneof count {
uint64 count_int = 1;
double count_float = 2;
}

double sum = 3;

// Resolution schema: higher = finer resolution.
sint32 schema = 4;

double zero_threshold = 5;

// Zero bucket count: either integer (counter) or float (gauge).
oneof zero_count {
uint64 zero_count_int = 6;
double zero_count_float = 7;
}

repeated BucketSpan negative_spans = 8;
// For integer histograms, delta-encoded. Mutually exclusive with negative_counts.
repeated sint64 negative_deltas = 9;
// For float histograms, absolute values. Mutually exclusive with negative_deltas.
repeated double negative_counts = 10;

repeated BucketSpan positive_spans = 11;
// For integer histograms, delta-encoded. Mutually exclusive with positive_counts.
repeated sint64 positive_deltas = 12;
// For float histograms, absolute values. Mutually exclusive with positive_deltas.
repeated double positive_counts = 13;

ResetHint reset_hint = 14;
}
Loading
Loading