Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 117 additions & 7 deletions rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use sentry_options::options;

use crate::types::{item_type_name, AggregatedOutcomesBatch, BucketKey, ItemDedupKey};

const OPTIONS_REFRESH_INTERVAL: Duration = Duration::from_secs(10);

#[derive(Debug, Default)]
struct TraceItemOutcome {
key_id: u64,
Expand Down Expand Up @@ -60,8 +62,12 @@ pub struct OutcomesAggregator<TNext> {
message_carried_over: Option<Message<AggregatedOutcomesBatch>>,
/// Commit request carried over from a poll where we had a message to retry.
commit_request_carried_over: Option<CommitRequest>,
/// Cached value of the `consumer.use_item_timestamp` option, refreshed on each poll.
/// Cached value of the `consumer.use_item_timestamp` option, refreshed at most once per `OPTIONS_REFRESH_INTERVAL`.
use_item_timestamp: bool,
/// Cached value of the `consumer.log_duplicates` option, refreshed at most once per `OPTIONS_REFRESH_INTERVAL`.
log_duplicates: bool,
/// Last time cached options were refreshed; `None` means never refreshed yet.
last_options_refresh: Option<Instant>,
}

impl<TNext> OutcomesAggregator<TNext> {
Expand All @@ -82,7 +88,30 @@ impl<TNext> OutcomesAggregator<TNext> {
message_carried_over: None,
commit_request_carried_over: None,
use_item_timestamp: false,
log_duplicates: false,
last_options_refresh: None,
}
}

/// Refresh the cached `consumer.*` options, at most once per
/// `OPTIONS_REFRESH_INTERVAL`.
///
/// Both cached flags fall back to `false` when the options registry is
/// unavailable, the key is missing, or the stored value is not a boolean.
fn refresh_options_if_stale(&mut self) {
    // Throttle: keep the cached values while they are still fresh.
    if self
        .last_options_refresh
        .is_some_and(|t| t.elapsed() < OPTIONS_REFRESH_INTERVAL)
    {
        return;
    }
    let snuba_opts = options("snuba").ok();
    // Shared fallback chain for reading a boolean option; avoids
    // duplicating the four-step lookup for every cached flag.
    let read_bool = |key: &str| -> bool {
        snuba_opts
            .as_ref()
            .and_then(|o| o.get(key).ok())
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
    };
    self.use_item_timestamp = read_bool("consumer.use_item_timestamp");
    self.log_duplicates = read_bool("consumer.log_duplicates");
    // Record the refresh time even when the lookup failed, so a broken
    // options backend is not re-queried on every poll.
    self.last_options_refresh = Some(Instant::now());
}

fn is_duplicate(&mut self, trace_item: &TraceItem) -> bool {
Expand Down Expand Up @@ -113,7 +142,19 @@ impl<TNext> OutcomesAggregator<TNext> {
trace_id,
item_id,
};
return self.batch.record_if_duplicate(item_type, dedup_key);
let is_dup = self.batch.record_if_duplicate(item_type, dedup_key);
if is_dup && self.log_duplicates {
let item_type_str = item_type_name(item_type);
let item_id_uuid = uuid::Uuid::from_bytes(item_id);
tracing::info!(
"duplicate {} trace: org_id:{}, project_id:{}, item_id:{}",
item_type_str,
org_id,
project_id,
item_id_uuid
);
}
return is_dup;
}
false
}
Expand Down Expand Up @@ -170,11 +211,7 @@ impl<TNext: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<Kafk
for OutcomesAggregator<TNext>
{
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
self.use_item_timestamp = options("snuba")
.ok()
.and_then(|o| o.get("consumer.use_item_timestamp").ok())
.and_then(|v| v.as_bool())
.unwrap_or(false);
self.refresh_options_if_stale();

let commit_request = self.next_step.poll()?;
self.commit_request_carried_over =
Expand Down Expand Up @@ -758,6 +795,8 @@ mod tests {

let mut offset = 0;
let mut do_submit = |aggregator: &mut OutcomesAggregator<Noop>| {
// Bypass the refresh throttle so the test sees option changes immediately.
aggregator.last_options_refresh = None;
aggregator.poll().unwrap();
aggregator
.submit(Message::new_broker_message(
Expand Down Expand Up @@ -945,4 +984,75 @@ mod tests {
Some(&1)
);
}

#[test]
fn poll_throttles_option_refresh() {
    init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
    let mut aggregator = OutcomesAggregator::new(
        Noop { last_message: None },
        500,
        Duration::from_millis(30_000),
        60,
    );

    // Poll with the option enabled: the fresh aggregator has never
    // refreshed, so the cache must pick up `true` immediately.
    let enabled_guard =
        override_options(&[("snuba", "consumer.use_item_timestamp", json!(true))]).unwrap();
    aggregator.poll().unwrap();
    assert!(aggregator.use_item_timestamp);
    assert!(aggregator.last_options_refresh.is_some());

    // Disable the option and poll again inside the refresh window; the
    // throttle must preserve the stale cached `true`.
    drop(enabled_guard);
    let _disabled_guard =
        override_options(&[("snuba", "consumer.use_item_timestamp", json!(false))]).unwrap();
    aggregator.poll().unwrap();
    assert!(
        aggregator.use_item_timestamp,
        "throttle should keep stale cached value within OPTIONS_REFRESH_INTERVAL"
    );

    // Clearing the refresh timestamp forces the next poll to re-read the
    // options, so the cache now observes `false`.
    aggregator.last_options_refresh = None;
    aggregator.poll().unwrap();
    assert!(!aggregator.use_item_timestamp);
}

#[test]
fn is_duplicate_logs_when_option_enabled() {
    init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
    let _guard =
        override_options(&[("snuba", "consumer.log_duplicates", json!(true))]).unwrap();

    let mut aggregator = OutcomesAggregator::new(
        Noop { last_message: None },
        500,
        Duration::from_millis(30_000),
        60,
    );

    // A poll is required before the aggregator observes the override.
    aggregator.poll().unwrap();
    assert!(aggregator.log_duplicates);

    let span_item = TraceItem {
        organization_id: 1,
        project_id: 2,
        // 16-byte item id (a full UUID worth of bytes).
        item_id: (1u8..=16).collect(),
        item_type: TraceItemType::Span.into(),
        ..Default::default()
    };

    // First sighting is new, second is a duplicate. Enabling logging must
    // leave the duplicate counters untouched.
    assert!(!aggregator.is_duplicate(&span_item));
    assert!(aggregator.is_duplicate(&span_item));
    assert_eq!(
        aggregator
            .batch
            .duplicate_item_count
            .get(&TraceItemType::Span),
        Some(&1)
    );
}
}
5 changes: 5 additions & 0 deletions sentry-options/schemas/snuba/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
"default": false,
"description": "true to use the item timestamp, false for the received timestamp"
},
"consumer.log_duplicates": {
"type": "boolean",
"default": false,
"description": "true to emit a tracing log each time a duplicate trace item is detected in the accepted-outcomes aggregator"
},
"consumer.blq_enabled": {
"type": "boolean",
"default": false,
Expand Down
Loading