Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions src/sentry/dynamic_sampling/tasks/boost_low_volume_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from collections import defaultdict
from collections.abc import Iterator, Mapping, Sequence
from datetime import datetime, timedelta
from typing import TypedDict

import sentry_sdk
from snuba_sdk import (
Expand Down Expand Up @@ -37,6 +36,7 @@
get_redis_client_for_ds,
)
from sentry.dynamic_sampling.tasks.common import (
MEASURE_CONFIGS,
GetActiveOrgs,
are_equal_with_epsilon,
sample_rate_to_float,
Expand All @@ -59,10 +59,8 @@
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.sentry_metrics import indexer
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.silo.base import SiloMode
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import SpanMRI, TransactionMRI
from sentry.snuba.referrer import Referrer
from sentry.tasks.base import instrumented_task
from sentry.tasks.relay import schedule_invalidate_project_config
Expand All @@ -83,14 +81,6 @@
OrgProjectVolumes = tuple[OrganizationId, ProjectId, int, DecisionKeepCount, DecisionDropCount]


class MeasureConfig(TypedDict):
    """Configuration for a sampling measure query.

    Bundles everything needed to build a metrics query for one
    sampling measure: which MRI to count, which use case the query is
    issued under, and any tag filters to apply.
    """

    # Metric resource identifier whose counts are queried
    # (e.g. SpanMRI.COUNT_PER_ROOT_PROJECT.value).
    mri: str
    # Use case the query's tenant is routed under (e.g. UseCaseID.SPANS).
    use_case_id: UseCaseID
    # Tag name -> required value, applied as equality filters;
    # an empty dict means no tag filtering.
    tags: dict[str, str]


@instrumented_task(
name="sentry.dynamic_sampling.tasks.boost_low_volume_projects",
namespace=telemetry_experience_tasks,
Expand Down Expand Up @@ -284,29 +274,6 @@ def fetch_projects_with_total_root_transaction_count_and_rates(
return aggregated_projects


# Configuration for each sampling measure type: maps a SamplingMeasure to
# the MRI, use case, and tag filters used when querying its volumes.
MEASURE_CONFIGS: dict[SamplingMeasure, MeasureConfig] = {
    # SEGMENTS: SpanMRI with is_segment=true filter (replacement for transactions);
    # counts only spans that are segment roots.
    SamplingMeasure.SEGMENTS: {
        "mri": SpanMRI.COUNT_PER_ROOT_PROJECT.value,
        "use_case_id": UseCaseID.SPANS,
        "tags": {"is_segment": "true"},
    },
    # SPANS: SpanMRI without is_segment filter (AM3/project mode - counts all spans).
    SamplingMeasure.SPANS: {
        "mri": SpanMRI.COUNT_PER_ROOT_PROJECT.value,
        "use_case_id": UseCaseID.SPANS,
        "tags": {},
    },
    # TRANSACTIONS: TransactionMRI without tag filters (legacy behavior).
    SamplingMeasure.TRANSACTIONS: {
        "mri": TransactionMRI.COUNT_PER_ROOT_PROJECT.value,
        "use_case_id": UseCaseID.TRANSACTIONS,
        "tags": {},
    },
}


@dynamic_sampling_task
def query_project_counts_by_org(
org_ids: list[int], measure: SamplingMeasure, query_interval: timedelta | None = None
Expand Down
86 changes: 68 additions & 18 deletions src/sentry/dynamic_sampling/tasks/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TypedDict

import sentry_sdk
from snuba_sdk import (
Expand All @@ -21,10 +22,11 @@
from sentry import quotas
from sentry.dynamic_sampling.tasks.constants import CHUNK_SIZE, MAX_ORGS_PER_QUERY
from sentry.dynamic_sampling.tasks.helpers.sliding_window import extrapolate_monthly_volume
from sentry.dynamic_sampling.types import SamplingMeasure
from sentry.sentry_metrics import indexer
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.metrics.naming_layer.mri import SpanMRI, TransactionMRI
from sentry.snuba.referrer import Referrer
from sentry.utils.snuba import raw_snql_query

Expand All @@ -35,6 +37,37 @@
ACTIVE_ORGS_VOLUMES_DEFAULT_GRANULARITY = Granularity(60)


class MeasureConfig(TypedDict):
    """Configuration for a sampling measure query.

    Bundles everything needed to build a metrics query for one
    sampling measure: which MRI to count, which use case the query is
    issued under, and any tag filters to apply.
    """

    # Metric resource identifier whose counts are queried
    # (e.g. SpanMRI.COUNT_PER_ROOT_PROJECT.value).
    mri: str
    # Use case the query's tenant is routed under (e.g. UseCaseID.SPANS).
    use_case_id: UseCaseID
    # Tag name -> required value, applied as equality filters;
    # an empty dict means no tag filtering.
    tags: dict[str, str]


# Configuration for each sampling measure type: maps a SamplingMeasure to
# the MRI, use case, and tag filters used when querying its volumes.
MEASURE_CONFIGS: dict[SamplingMeasure, MeasureConfig] = {
    # SEGMENTS: SpanMRI with is_segment=true filter (replacement for transactions);
    # counts only spans that are segment roots.
    SamplingMeasure.SEGMENTS: {
        "mri": SpanMRI.COUNT_PER_ROOT_PROJECT.value,
        "use_case_id": UseCaseID.SPANS,
        "tags": {"is_segment": "true"},
    },
    # SPANS: SpanMRI without is_segment filter (AM3/project mode - counts all spans).
    SamplingMeasure.SPANS: {
        "mri": SpanMRI.COUNT_PER_ROOT_PROJECT.value,
        "use_case_id": UseCaseID.SPANS,
        "tags": {},
    },
    # TRANSACTIONS: TransactionMRI without tag filters (legacy behavior).
    SamplingMeasure.TRANSACTIONS: {
        "mri": TransactionMRI.COUNT_PER_ROOT_PROJECT.value,
        "use_case_id": UseCaseID.TRANSACTIONS,
        "tags": {},
    },
}


class GetActiveOrgs:
"""
Fetch organizations in batches.
Expand All @@ -50,10 +83,13 @@ def __init__(
max_projects: int | None = None,
time_interval: timedelta = ACTIVE_ORGS_DEFAULT_TIME_INTERVAL,
granularity: Granularity = ACTIVE_ORGS_DEFAULT_GRANULARITY,
measure: SamplingMeasure = SamplingMeasure.TRANSACTIONS,
) -> None:
self.metric_id = indexer.resolve_shared_org(
str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
)
config = MEASURE_CONFIGS[measure]
self.metric_id = indexer.resolve_shared_org(str(config["mri"]))
self.use_case_id = config["use_case_id"]
self.tag_filters = config["tags"]

self.offset = 0
self.last_result: list[tuple[int, int]] = []
self.has_more_results = True
Expand All @@ -72,6 +108,20 @@ def __next__(self) -> list[int]:

if self.has_more_results:
# not enough for the current iteration and data still in the db top it up from db
where_conditions = [
Condition(
Column("timestamp"),
Op.GTE,
datetime.utcnow() - self.time_interval,
),
Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
Condition(Column("metric_id"), Op.EQ, self.metric_id),
]
for tag_name, tag_value in self.tag_filters.items():
tag_string_id = indexer.resolve_shared_org(tag_name)
tag_column = f"tags_raw[{tag_string_id}]"
where_conditions.append(Condition(Column(tag_column), Op.EQ, tag_value))

query = (
Query(
match=Entity(EntityKey.GenericOrgMetricsCounters.value),
Expand All @@ -82,15 +132,7 @@ def __next__(self) -> list[int]:
groupby=[
Column("org_id"),
],
where=[
Condition(
Column("timestamp"),
Op.GTE,
datetime.utcnow() - self.time_interval,
),
Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
Condition(Column("metric_id"), Op.EQ, self.metric_id),
],
where=where_conditions,
orderby=[
OrderBy(Column("org_id"), Direction.ASC),
],
Expand All @@ -104,7 +146,7 @@ def __next__(self) -> list[int]:
app_id="dynamic_sampling",
query=query,
tenant_ids={
"use_case_id": UseCaseID.TRANSACTIONS.value,
"use_case_id": self.use_case_id.value,
"cross_org_query": 1,
},
)
Expand Down Expand Up @@ -200,12 +242,15 @@ def __init__(
granularity: Granularity = ACTIVE_ORGS_VOLUMES_DEFAULT_GRANULARITY,
include_keep: bool = True,
orgs: list[int] | None = None,
measure: SamplingMeasure = SamplingMeasure.TRANSACTIONS,
) -> None:
self.include_keep = include_keep
self.orgs = orgs
self.metric_id = indexer.resolve_shared_org(
str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
)

config = MEASURE_CONFIGS[measure]
self.metric_id = indexer.resolve_shared_org(str(config["mri"]))
self.use_case_id = config["use_case_id"]
self.tag_filters = config["tags"]

if self.include_keep:
decision_string_id = indexer.resolve_shared_org("decision")
Expand Down Expand Up @@ -251,6 +296,11 @@ def __next__(self) -> list[OrganizationDataVolume]:
Condition(Column("metric_id"), Op.EQ, self.metric_id),
]

for tag_name, tag_value in self.tag_filters.items():
tag_string_id = indexer.resolve_shared_org(tag_name)
tag_column = f"tags_raw[{tag_string_id}]"
where.append(Condition(Column(tag_column), Op.EQ, tag_value))

if self.orgs:
where.append(Condition(Column("org_id"), Op.IN, self.orgs))

Expand All @@ -277,7 +327,7 @@ def __next__(self) -> list[OrganizationDataVolume]:
app_id="dynamic_sampling",
query=query,
tenant_ids={
"use_case_id": UseCaseID.TRANSACTIONS.value,
"use_case_id": self.use_case_id.value,
"cross_org_query": 1,
},
)
Expand Down
Loading
Loading