diff --git a/docs/protocol.md b/docs/protocol.md index dca9491..57c3cbc 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -93,3 +93,25 @@ not match the expected value are rejected to prevent replay. - HKDF provides key separation between channels. - Empty associated data is used by default but can be extended in future revisions. + +## Denial telemetry + +Every denied operation is emitted as a structured `DenialEvent` before the +sandbox receives the corresponding `PolicyError`. The event shape is stable and +JSON-serializable: + +```python +{ + "cell": "", + "capability": "", + "attempted_action": "", + "policy_rule": "", + "kernel_decision": "", + "broker_decision": "", +} +``` + +Sandbox handles expose `get_denial_events()` for inspection, and +`PolicyError.denial_event` carries the event that caused the raised error. +Prometheus export includes aggregate denial counters plus decision-dimensional +samples so denied behavior can be segmented without scraping exception strings. diff --git a/pyisolate/__init__.py b/pyisolate/__init__.py index bcf0eec..2c95249 100644 --- a/pyisolate/__init__.py +++ b/pyisolate/__init__.py @@ -58,6 +58,7 @@ def restore(*args, **kwargs): # type: ignore[no-redef] WallTimeExceeded, ) from .logging import setup_structured_logging # noqa: F401 +from .telemetry import DenialEvent # noqa: F401 try: from .migration import migrate @@ -146,6 +147,7 @@ def migrate(*args, **kwargs): # type: ignore[no-redef] "migrate", "refresh_remote", "setup_structured_logging", + "DenialEvent", "no_gil_readiness_report", "warn_if_unsafe_native_extensions", "bpf", diff --git a/pyisolate/errors.py b/pyisolate/errors.py index 8d3a9d9..6ea80f9 100644 --- a/pyisolate/errors.py +++ b/pyisolate/errors.py @@ -1,7 +1,11 @@ """Exception hierarchy for PyIsolate.""" +from __future__ import annotations + import builtins as _builtins +from .telemetry import DenialEvent + class SandboxError(Exception): """Base class for all sandbox related errors.""" @@ -10,6 +14,10 @@ 
class PolicyError(SandboxError):
    """Raised when a policy violation occurs.

    Attributes:
        denial_event: The structured ``DenialEvent`` passed by the raiser, or
            ``None`` when the error was constructed without one.
    """

    def __init__(
        self,
        message: str = "",
        *,
        denial_event: DenialEvent | None = None,
    ) -> None:
        """Create the error.

        Args:
            message: Human-readable description, forwarded to the base class.
            denial_event: Optional telemetry record to attach; keyword-only.
        """
        # The attribute assignment and the base initializer are independent.
        self.denial_event = denial_event
        super().__init__(message)
import dataclass, field from pathlib import Path from typing import Any, Callable, Iterable, Optional @@ -49,6 +49,7 @@ ) from ..numa import bind_current_thread from ..observability.trace import Tracer +from ..telemetry import DenialEvent from ..policy.model import from_sandbox_policy _thread_local = threading.local() @@ -67,6 +68,48 @@ _BLOCKED_MODULES = {"ctypes", "multiprocessing"} +def _active_sandbox() -> "SandboxThread | None": + return getattr(_thread_local, "sandbox", None) + + +def _deny( + capability: str, + attempted_action: str, + policy_rule: str, + message: str, + *, + kernel_decision: str = "not_evaluated", + broker_decision: str = "deny", +) -> errors.PolicyError: + sandbox = _active_sandbox() + cell = sandbox.name if sandbox is not None else "" + event = DenialEvent( + cell=cell, + capability=capability, + attempted_action=attempted_action, + policy_rule=policy_rule, + kernel_decision=kernel_decision, + broker_decision=broker_decision, + ) + if sandbox is not None: + sandbox._record_denial(event) + return errors.PolicyError(message, denial_event=event) + + +def _format_roots(roots: Iterable[Path]) -> str: + return ",".join(str(root) for root in roots) + + +def _subprocess_command_name(args: object) -> str | None: + if isinstance(args, str): + return args.split(maxsplit=1)[0] if args else "" + if isinstance(args, (list, tuple)): + if not args: + return None + return str(args[0]) + return str(args) + + def _serialize_capability(capability: Any) -> Any: if isinstance(capability, FilesystemCapability): return { @@ -168,6 +211,20 @@ def _blocked_open(file, *args, **kwargs): runtime_policy = getattr(_thread_local, "runtime_policy", None) if fs_cap is not None: if not fs_cap.allows(path): + raise _deny( + "filesystem", + f"open:{path}", + f"capability:filesystem roots={_format_roots(fs_cap.roots)}", + "file access blocked", + ) + elif allowed is not None: + if not any(path.is_relative_to(a) for a in allowed): + raise _deny( + "filesystem", + 
def _blocked_subprocess_run(*args, **kwargs):
    """Policy-checked replacement for ``subprocess.run``.

    The command is taken from the first positional argument, falling back to
    the ``args`` keyword. The checks below run in order; each denial is built
    by ``_deny`` and raised.

    Raises:
        PolicyError: when no ``subprocess_capability`` is set on the thread-local
            state, when a string command is given but the capability's
            ``allow_shell`` is false, or when the command name is not in the
            capability's ``allowed_commands``.
        ValueError: when the command name resolves to ``None`` (empty argv).

    Returns:
        Whatever the capability's ``run(*args, **kwargs)`` returns.
    """
    if args:
        attempted = args[0]
    else:
        attempted = kwargs.get("args")
    command_name = _subprocess_command_name(attempted)
    # A missing or empty name leaves the action suffix blank.
    suffix = command_name if command_name else ""
    action = f"subprocess.run:{suffix}"

    cap = getattr(_thread_local, "subprocess_capability", None)
    if cap is None:
        raise _deny(
            capability="subprocess",
            attempted_action=action,
            policy_rule="deny-by-default",
            message="subprocess access blocked",
        )

    is_shell_string = isinstance(attempted, str)
    if is_shell_string and not cap.allow_shell:
        raise _deny(
            capability="subprocess",
            attempted_action=action,
            policy_rule="capability:subprocess shell=false",
            message="shell string commands are not permitted",
        )

    if command_name is None:
        raise ValueError("empty command")

    permitted = cap.allowed_commands
    if command_name in permitted:
        return cap.run(*args, **kwargs)

    raise _deny(
        capability="subprocess",
        attempted_action=action,
        policy_rule=f"capability:subprocess allowed_commands={','.join(sorted(permitted))}",
        message=f"subprocess blocked: {command_name}",
    )
def get_denial_events(self) -> list[dict[str, str]]:
    """Return the denials recorded on this sandbox as plain dictionaries.

    Every entry of ``self._denial_events`` is converted with its ``to_dict()``
    method, preserving the stored order. A new list is built on each call.
    """
    serialized: list[dict[str, str]] = []
    for recorded in self._denial_events:
        serialized.append(recorded.to_dict())
    return serialized
# Labels a broker or kernel layer may report; a typing aid only, nothing here
# validates the values at runtime.
Decision = Literal["allow", "deny", "not_evaluated", "unavailable"]


@dataclass(frozen=True)
class DenialEvent:
    """Immutable record describing one denied sandbox operation.

    Broker and kernel decisions are kept in separate fields so that consumers
    can tell a userspace broker denial apart from a future eBPF/LSM denial.
    """

    # Identifier of the cell the denial is attributed to.
    cell: str
    # Capability family involved in the denial.
    capability: str
    # Description of what was attempted.
    attempted_action: str
    # Policy rule that produced the denial.
    policy_rule: str
    # Decision reported for the kernel layer.
    kernel_decision: Decision
    # Decision reported for the userspace broker.
    broker_decision: Decision

    def to_dict(self) -> dict[str, str]:
        """Return the event as a plain, JSON-serializable ``dict``.

        Keys are the field names, in declaration order.
        """
        payload = asdict(self)
        return payload
def test_export_contains_denial_metrics():
    """A denied ``open`` must surface in the exported denial metrics.

    Spawns a sandbox, triggers a file-access denial, then checks the exporter
    output for the aggregate counter and the decision-labelled sample.
    """
    expected_fragments = (
        "# HELP pyisolate_denials_total",
        "# TYPE pyisolate_denials_total counter",
        'pyisolate_denials_total{sandbox="metrics-denial"} 1',
        "pyisolate_denial_events_total",
        'capability="filesystem"',
        'kernel_decision="not_evaluated"',
        'broker_decision="deny"',
    )
    sb = iso.spawn("metrics-denial")
    try:
        sb.exec("open('/etc/hosts').read()")
        try:
            sb.recv(timeout=0.5)
        except iso.PolicyError:
            # The denial is the scenario under test; only the metrics are checked.
            pass
        metrics = MetricsExporter().export()
        for fragment in expected_fragments:
            assert fragment in metrics
    finally:
        sb.close()