Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion .github/scripts/modal-sync-secrets.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
#!/bin/bash
# Sync secrets from GitHub to Modal environment
# Usage: ./modal-sync-secrets.sh <modal-environment> <gh-environment>
# Required env vars: LOGFIRE_TOKEN, GCP_CREDENTIALS_JSON (optional)
# Required env vars:
# LOGFIRE_TOKEN
# GCP_CREDENTIALS_JSON (optional)
# OBSERVABILITY_ENABLED (optional, defaults to false)
# OBSERVABILITY_SHADOW_MODE (optional, defaults to true)
# OBSERVABILITY_OTLP_ENDPOINT (required when OBSERVABILITY_ENABLED=true)
# OBSERVABILITY_OTLP_HEADERS (required when OBSERVABILITY_ENABLED=true)

set -euo pipefail

MODAL_ENV="${1:?Modal environment required}"
GH_ENV="${2:?GitHub environment required}"
OBSERVABILITY_ENABLED="${OBSERVABILITY_ENABLED:-false}"
OBSERVABILITY_SHADOW_MODE="${OBSERVABILITY_SHADOW_MODE:-true}"

echo "Syncing secrets to Modal environment: $MODAL_ENV"

Expand All @@ -25,4 +33,18 @@ if [ -n "${GCP_CREDENTIALS_JSON:-}" ]; then
--force || true
fi

if [ "$OBSERVABILITY_ENABLED" = "true" ]; then
: "${OBSERVABILITY_OTLP_ENDPOINT:?OBSERVABILITY_OTLP_ENDPOINT is required when observability is enabled}"
: "${OBSERVABILITY_OTLP_HEADERS:?OBSERVABILITY_OTLP_HEADERS is required when observability is enabled}"
fi

uv run modal secret create policyengine-observability \
"OBSERVABILITY_ENABLED=$OBSERVABILITY_ENABLED" \
"OBSERVABILITY_SHADOW_MODE=$OBSERVABILITY_SHADOW_MODE" \
"OBSERVABILITY_ENVIRONMENT=$GH_ENV" \
"OBSERVABILITY_OTLP_ENDPOINT=${OBSERVABILITY_OTLP_ENDPOINT:-}" \
"OBSERVABILITY_OTLP_HEADERS=${OBSERVABILITY_OTLP_HEADERS:-}" \
--env="$MODAL_ENV" \
--force || true

echo "Modal secrets synced"
4 changes: 4 additions & 0 deletions .github/workflows/modal-deploy.reusable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ jobs:
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
LOGFIRE_TOKEN: ${{ secrets.LOGFIRE_TOKEN }}
GCP_CREDENTIALS_JSON: ${{ secrets.GCP_CREDENTIALS_JSON }}
OBSERVABILITY_ENABLED: ${{ vars.OBSERVABILITY_ENABLED }}
OBSERVABILITY_SHADOW_MODE: ${{ vars.OBSERVABILITY_SHADOW_MODE }}
OBSERVABILITY_OTLP_ENDPOINT: ${{ vars.OBSERVABILITY_OTLP_ENDPOINT }}
OBSERVABILITY_OTLP_HEADERS: ${{ secrets.OBSERVABILITY_OTLP_HEADERS }}
run: ../../.github/scripts/modal-sync-secrets.sh "${{ inputs.modal_environment }}" "${{ inputs.environment }}"

- name: Deploy simulation API to Modal
Expand Down
2 changes: 1 addition & 1 deletion changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
- bump: patch
changes:
changed:
- Bumped policyengine-core minimum version to 3.23.5 for pandas 3.0 compatibility
- Added baseline Grafana-compatible OTLP observability and stage timing telemetry for the simulation gateway and worker.
1 change: 1 addition & 0 deletions libs/policyengine-fastapi/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"fastapi[standard] >=0.115.8,<0.116.0",
"pyjwt >=2.10.1,<3.0.0",
"opentelemetry-sdk >=1.30.0,<2.0.0",
"opentelemetry-exporter-otlp-proto-http >=1.30.0,<2.0.0",
"sqlmodel >=0.0.22,<0.0.23",
"python-json-logger >=3.2.1,<4.0.0",
"opentelemetry-instrumentation-logging >=0.51b0,<0.52",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
TracerCaptureMode as TracerCaptureMode,
build_observability as build_observability,
get_observability as get_observability,
reset_observability_cache as reset_observability_cache,
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .config import (
ObservabilityConfig as ObservabilityConfig,
parse_bool as parse_bool,
parse_header_value_pairs as parse_header_value_pairs,
)
from .contracts import (
Expand All @@ -28,10 +29,12 @@
Observability as Observability,
NoOpObservability as NoOpObservability,
NoOpSpan as NoOpSpan,
OtlpObservability as OtlpObservability,
)
from .provider import (
build_observability as build_observability,
get_observability as get_observability,
reset_observability_cache as reset_observability_cache,
)
from .stages import (
SimulationStage as SimulationStage,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
import os

from .stages import TracerCaptureMode

Expand Down Expand Up @@ -30,6 +31,14 @@ def parse_header_value_pairs(raw: str | None) -> dict[str, str]:
return headers


def parse_bool(raw: str | bool | None, default: bool = False) -> bool:
if raw is None:
return default
if isinstance(raw, bool):
return raw
return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class ObservabilityConfig:
enabled: bool = False
Expand All @@ -46,3 +55,36 @@ class ObservabilityConfig:
@classmethod
def disabled(cls, service_name: str = "policyengine-observability"):
return cls(enabled=False, service_name=service_name)

@classmethod
def from_env(
cls,
service_name: str,
environment: str = "production",
prefix: str = "OBSERVABILITY_",
) -> "ObservabilityConfig":
return cls(
enabled=parse_bool(os.getenv(f"{prefix}ENABLED"), default=False),
shadow_mode=parse_bool(
os.getenv(f"{prefix}SHADOW_MODE"),
default=True,
),
service_name=os.getenv(f"{prefix}SERVICE_NAME", service_name),
environment=os.getenv(f"{prefix}ENVIRONMENT", environment),
otlp_endpoint=os.getenv(f"{prefix}OTLP_ENDPOINT"),
otlp_headers=parse_header_value_pairs(os.getenv(f"{prefix}OTLP_HEADERS")),
artifact_bucket=os.getenv(f"{prefix}ARTIFACT_BUCKET"),
artifact_prefix=os.getenv(
f"{prefix}ARTIFACT_PREFIX",
"simulation-observability",
),
tracer_capture_mode=TracerCaptureMode(
os.getenv(
f"{prefix}TRACER_CAPTURE_MODE",
TracerCaptureMode.DISABLED.value,
)
),
slow_run_threshold_seconds=float(
os.getenv(f"{prefix}SLOW_RUN_THRESHOLD_SECONDS", "30.0")
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

from contextlib import AbstractContextManager
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import Enum
import json
import logging
from typing import Protocol
from typing import Any, Mapping

Expand All @@ -23,6 +27,71 @@ def add_event(self, name: str, attributes: Mapping[str, Any] | None = None) -> N
return None


class OTelSpan(AbstractContextManager["OTelSpan"]):
def __init__(self, context_manager: AbstractContextManager):
self._context_manager = context_manager
self._span = None

def __enter__(self):
self._span = self._context_manager.__enter__()
return self

def __exit__(self, exc_type, exc_value, traceback):
return self._context_manager.__exit__(exc_type, exc_value, traceback)

def set_attribute(self, key: str, value: Any) -> None:
if self._span is not None:
self._span.set_attribute(key, _normalize_attribute_value(value))

def add_event(self, name: str, attributes: Mapping[str, Any] | None = None) -> None:
if self._span is not None:
self._span.add_event(name, _normalize_attributes(attributes))


class JsonPayloadFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
message = record.msg
if isinstance(message, str):
return message
if isinstance(message, Mapping):
payload = dict(message)
else:
payload = {"message": record.getMessage()}
payload.setdefault("severity", record.levelname)
payload.setdefault("logger", record.name)
return json.dumps(payload, sort_keys=True, default=_json_default)


def _json_default(value: Any) -> Any:
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Enum):
return value.value
return str(value)


def _normalize_attribute_value(value: Any) -> Any:
if value is None:
return None
if isinstance(value, (bool, int, float, str)):
return value
return json.dumps(value, sort_keys=True, default=_json_default)


def _normalize_attributes(
attributes: Mapping[str, Any] | None,
) -> dict[str, Any]:
normalized: dict[str, Any] = {}
if attributes is None:
return normalized

for key, value in attributes.items():
normalized_value = _normalize_attribute_value(value)
if normalized_value is not None:
normalized[key] = normalized_value
return normalized


class Observability(Protocol):
config: ObservabilityConfig

Expand All @@ -46,7 +115,7 @@ def record_artifact_manifest(self, manifest: TracerArtifactManifest) -> None: ..

def span(
self, name: str, attributes: Mapping[str, Any] | None = None
) -> NoOpSpan: ...
) -> AbstractContextManager: ...

def flush(self) -> None: ...

Expand Down Expand Up @@ -82,3 +151,68 @@ def span(self, name: str, attributes: Mapping[str, Any] | None = None) -> NoOpSp

def flush(self) -> None:
return None


@dataclass
class OtlpObservability:
config: ObservabilityConfig
tracer: Any
meter: Any
lifecycle_logger: logging.Logger
tracer_provider: Any
meter_provider: Any
logger_provider: Any = None
counter_cache: dict[str, Any] = field(default_factory=dict)
histogram_cache: dict[str, Any] = field(default_factory=dict)

def emit_lifecycle_event(self, event: SimulationLifecycleEvent) -> None:
payload = event.model_dump(mode="json")
self.lifecycle_logger.info(payload)

def emit_counter(
self,
name: str,
value: int = 1,
attributes: Mapping[str, str] | None = None,
) -> None:
counter = self.counter_cache.get(name)
if counter is None:
counter = self.meter.create_counter(name)
self.counter_cache[name] = counter
counter.add(value, attributes=_normalize_attributes(attributes))

def emit_histogram(
self,
name: str,
value: float,
attributes: Mapping[str, str] | None = None,
) -> None:
histogram = self.histogram_cache.get(name)
if histogram is None:
histogram = self.meter.create_histogram(name)
self.histogram_cache[name] = histogram
histogram.record(value, attributes=_normalize_attributes(attributes))

def record_artifact_manifest(self, manifest: TracerArtifactManifest) -> None:
self.lifecycle_logger.info(
{
"event_name": "simulation.tracer.artifact_manifest",
"manifest": manifest.model_dump(mode="json"),
}
)

def span(self, name: str, attributes: Mapping[str, Any] | None = None) -> OTelSpan:
return OTelSpan(
self.tracer.start_as_current_span(
name,
attributes=_normalize_attributes(attributes),
)
)

def flush(self) -> None:
if self.tracer_provider is not None:
self.tracer_provider.force_flush()
if self.meter_provider is not None:
self.meter_provider.force_flush()
if self.logger_provider is not None:
self.logger_provider.force_flush()
Loading
Loading