diff --git a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
index c1c21897af..3aa5fe192b 100644
--- a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
+++ b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
@@ -16,6 +16,8 @@ use sentry_options::options;
 
 use crate::types::{item_type_name, AggregatedOutcomesBatch, BucketKey, ItemDedupKey};
 
+const OPTIONS_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
+
 #[derive(Debug, Default)]
 struct TraceItemOutcome {
     key_id: u64,
@@ -60,8 +62,12 @@ pub struct OutcomesAggregator<N> {
     message_carried_over: Option<Message<TraceItem>>,
     /// Commit request carried over from a poll where we had a message to retry.
     commit_request_carried_over: Option<CommitRequest>,
-    /// Cached value of the `consumer.use_item_timestamp` option, refreshed on each poll.
+    /// Cached value of the `consumer.use_item_timestamp` option, refreshed at most once per `OPTIONS_REFRESH_INTERVAL`.
     use_item_timestamp: bool,
+    /// Cached value of the `consumer.log_duplicates` option, refreshed at most once per `OPTIONS_REFRESH_INTERVAL`.
+    log_duplicates: bool,
+    /// Last time cached options were refreshed; `None` means never refreshed yet.
+    last_options_refresh: Option<Instant>,
 }
 
 impl<N> OutcomesAggregator<N> {
@@ -82,7 +88,30 @@ impl<N> OutcomesAggregator<N> {
             message_carried_over: None,
             commit_request_carried_over: None,
             use_item_timestamp: false,
+            log_duplicates: false,
+            last_options_refresh: None,
+        }
+    }
+
+    fn refresh_options_if_stale(&mut self) {
+        if self
+            .last_options_refresh
+            .is_some_and(|t| t.elapsed() < OPTIONS_REFRESH_INTERVAL)
+        {
+            return;
         }
+        let snuba_opts = options("snuba").ok();
+        self.use_item_timestamp = snuba_opts
+            .as_ref()
+            .and_then(|o| o.get("consumer.use_item_timestamp").ok())
+            .and_then(|v| v.as_bool())
+            .unwrap_or(false);
+        self.log_duplicates = snuba_opts
+            .as_ref()
+            .and_then(|o| o.get("consumer.log_duplicates").ok())
+            .and_then(|v| v.as_bool())
+            .unwrap_or(false);
+        self.last_options_refresh = Some(Instant::now());
     }
 
     fn is_duplicate(&mut self, trace_item: &TraceItem) -> bool {
@@ -113,7 +142,19 @@ fn is_duplicate(&mut self, trace_item: &TraceItem) -> bool {
                 trace_id,
                 item_id,
             };
-            return self.batch.record_if_duplicate(item_type, dedup_key);
+            let is_dup = self.batch.record_if_duplicate(item_type, dedup_key);
+            if is_dup && self.log_duplicates {
+                let item_type_str = item_type_name(item_type);
+                let item_id_uuid = uuid::Uuid::from_bytes(item_id);
+                tracing::info!(
+                    "duplicate {} trace: org_id:{}, project_id:{}, item_id:{}",
+                    item_type_str,
+                    org_id,
+                    project_id,
+                    item_id_uuid
+                );
+            }
+            return is_dup;
         }
         false
     }
@@ -170,11 +211,7 @@ impl<N: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<TraceItem>
     for OutcomesAggregator<N>
 {
     fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
-        self.use_item_timestamp = options("snuba")
-            .ok()
-            .and_then(|o| o.get("consumer.use_item_timestamp").ok())
-            .and_then(|v| v.as_bool())
-            .unwrap_or(false);
+        self.refresh_options_if_stale();
 
         let commit_request = self.next_step.poll()?;
         self.commit_request_carried_over =
@@ -758,6 +795,8 @@ mod tests {
 
         let mut offset = 0;
         let mut do_submit = |aggregator: &mut OutcomesAggregator<Noop>| {
+            // Bypass the refresh throttle so the test sees option changes immediately.
+            aggregator.last_options_refresh = None;
             aggregator.poll().unwrap();
             aggregator
                 .submit(Message::new_broker_message(
@@ -945,4 +984,75 @@
             Some(&1)
         );
     }
+
+    #[test]
+    fn poll_throttles_option_refresh() {
+        init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
+        let mut aggregator = OutcomesAggregator::new(
+            Noop { last_message: None },
+            500,
+            Duration::from_millis(30_000),
+            60,
+        );
+
+        // First poll: option is true; cached value should pick it up.
+        let guard =
+            override_options(&[("snuba", "consumer.use_item_timestamp", json!(true))]).unwrap();
+        aggregator.poll().unwrap();
+        assert!(aggregator.use_item_timestamp);
+        assert!(aggregator.last_options_refresh.is_some());
+
+        // Flip the option and poll again immediately; throttle should keep cached value.
+        drop(guard);
+        let _guard =
+            override_options(&[("snuba", "consumer.use_item_timestamp", json!(false))]).unwrap();
+        aggregator.poll().unwrap();
+        assert!(
+            aggregator.use_item_timestamp,
+            "throttle should keep stale cached value within OPTIONS_REFRESH_INTERVAL"
+        );
+
+        // Force the refresh window to be elapsed and poll again; cached value updates.
+        aggregator.last_options_refresh = None;
+        aggregator.poll().unwrap();
+        assert!(!aggregator.use_item_timestamp);
+    }
+
+    #[test]
+    fn is_duplicate_logs_when_option_enabled() {
+        init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap();
+        let _guard =
+            override_options(&[("snuba", "consumer.log_duplicates", json!(true))]).unwrap();
+
+        let mut aggregator = OutcomesAggregator::new(
+            Noop { last_message: None },
+            500,
+            Duration::from_millis(30_000),
+            60,
+        );
+
+        // Refresh options so log_duplicates picks up the override.
+        aggregator.poll().unwrap();
+        assert!(aggregator.log_duplicates);
+
+        let item_id: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
+        let span_item = TraceItem {
+            organization_id: 1,
+            project_id: 2,
+            item_id: item_id.clone(),
+            item_type: TraceItemType::Span.into(),
+            ..Default::default()
+        };
+
+        // Counter behavior is unchanged regardless of log_duplicates.
+        assert!(!aggregator.is_duplicate(&span_item));
+        assert!(aggregator.is_duplicate(&span_item));
+        assert_eq!(
+            aggregator
+                .batch
+                .duplicate_item_count
+                .get(&TraceItemType::Span),
+            Some(&1)
+        );
+    }
 }
diff --git a/sentry-options/schemas/snuba/schema.json b/sentry-options/schemas/snuba/schema.json
index c9f3f3a0d0..2ea86bd21c 100644
--- a/sentry-options/schemas/snuba/schema.json
+++ b/sentry-options/schemas/snuba/schema.json
@@ -7,6 +7,11 @@
       "default": false,
       "description": "true to use the item timestamp, false for the received timestamp"
     },
+    "consumer.log_duplicates": {
+      "type": "boolean",
+      "default": false,
+      "description": "true to emit a tracing log each time a duplicate trace item is detected in the accepted-outcomes aggregator"
+    },
     "consumer.blq_enabled": {
       "type": "boolean",
       "default": false,