diff --git a/README.md b/README.md index 64e53c9..94719de 100644 --- a/README.md +++ b/README.md @@ -144,10 +144,15 @@ behavior, policy enforcement, and timeout/kill behavior): ```bash python -m pyisolate.conformance python -m pyisolate.conformance --json +python -m pyisolate.conformance --grade +pyisolate-doctor --grade ``` -Use this in CI or admission checks to replace hand-wavy security claims with a -repeatable pass/fail report. +The `--grade` output replaces a vague secure/insecure claim with an 8-point +score over the guarantees that are actually active on the host: free-threading, +eBPF-LSM, cgroup v2, Landlock fallback, no-GIL extension safety, broker crypto, +quota enforcement, and crash isolation. Use it in CI or admission checks to +attach evidence to each guarantee rather than relying on a single pass/fail bit. ### Policy editor diff --git a/pyisolate/conformance.py b/pyisolate/conformance.py index 340300a..bcfd195 100644 --- a/pyisolate/conformance.py +++ b/pyisolate/conformance.py @@ -40,6 +40,43 @@ class ProbeResult: evidence: dict[str, object] +@dataclass +class GradeComponent: + """Scored status for one host guarantee area.""" + + key: str + label: str + score: int + max_score: int + active: bool + details: str + evidence: dict[str, object] + + +@dataclass +class GradeReport: + """Machine-readable conformance score for active PyIsolate guarantees.""" + + score: int + max_score: int + percent: float + generated_at_epoch_s: int + host: str + components: list[GradeComponent] + + def to_dict(self) -> dict[str, object]: + payload = asdict(self) + payload["components"] = [asdict(c) for c in self.components] + payload["active_guarantees"] = [c.key for c in self.components if c.active] + payload["inactive_guarantees"] = [ + c.key for c in self.components if not c.active + ] + return payload + + def to_json(self) -> str: + return json.dumps(self.to_dict(), indent=2, sort_keys=True) + + @dataclass class ConformanceReport: """Structured result for a full conformance run.""" @@ -65,6 +102,82 @@ def to_json(self) -> str: class ConformanceSuite: """Runs host-level probes that back PyIsolate guarantee claims.""" + def grade(self) -> GradeReport: + """Return a scored report of which PyIsolate guarantees are active.""" + + python_build = self._probe_python_build() + bpf_availability = self._probe_bpf_availability() + cgroup_behavior = self._probe_cgroup_behavior() + policy_enforcement = self._probe_policy_enforcement() + quota_enforcement = self._probe_timeout_and_kill_behavior() + ebpf_lsm = self._probe_ebpf_lsm(bpf_availability) + landlock_fallback = self._probe_landlock_fallback(ebpf_lsm.passed) + no_gil_extension_safety = self._probe_no_gil_extension_safety( + python_build, policy_enforcement + ) + broker_crypto = self._probe_broker_crypto() + crash_isolation = self._probe_crash_isolation() + + probe_components = [ + ( + "free_threading", + "free-threading", + python_build, + bool(python_build.evidence.get("py_gil_disabled")) + and sys.version_info >= (3, 13), + ), + ("ebpf_lsm", "eBPF-LSM", ebpf_lsm, ebpf_lsm.passed), + ("cgroup_v2", "cgroup v2", cgroup_behavior, cgroup_behavior.passed), + ( + "landlock_fallback", + "Landlock fallback", + landlock_fallback, + landlock_fallback.passed, + ), + ( + "no_gil_extension_safety", + "no-GIL extension safety", + no_gil_extension_safety, + no_gil_extension_safety.passed, + ), + ("broker_crypto", "broker crypto", broker_crypto, broker_crypto.passed), + ( + "quota_enforcement", + "quota enforcement", + quota_enforcement, + quota_enforcement.passed, + ), + ( + "crash_isolation", + "crash isolation", + crash_isolation, + crash_isolation.passed, + ), + ] + components = [ + GradeComponent( + key=key, + label=label, + score=1 if active else 0, + max_score=1, + active=active, + details=probe.details, + evidence=probe.evidence, + ) + for key, label, probe, active in probe_components + ] + score = sum(component.score for component in components) + max_score = sum(component.max_score for component in components) + percent = round((score / max_score) * 100, 1) if max_score else 0.0 + return GradeReport( + score=score, + max_score=max_score, + percent=percent, + generated_at_epoch_s=int(time.time()), + host=platform.node() or "unknown", + components=components, + ) + def run(self) -> ConformanceReport: probes = [ self._probe_python_build(), @@ -123,8 +236,7 @@ def _probe_kernel_capabilities(self) -> ProbeResult: break caps_value = int(caps_hex, 16) present = { - name: bool(caps_value & (1 << bit)) - for name, bit in CAPABILITY_BITS.items() + name: bool(caps_value & (1 << bit)) for name, bit in CAPABILITY_BITS.items() } passed = all(present.values()) return ProbeResult( @@ -151,7 +263,11 @@ def _probe_bpf_availability(self) -> ProbeResult: if mounts.exists(): for line in mounts.read_text(encoding="utf-8").splitlines(): fields = line.split() - if len(fields) >= 3 and fields[1] == "/sys/fs/bpf" and fields[2] == "bpf": + if ( + len(fields) >= 3 + and fields[1] == "/sys/fs/bpf" + and fields[2] == "bpf" + ): bpffs_mounted = True break bpftool_works = False @@ -176,6 +292,50 @@ def _probe_bpf_availability(self) -> ProbeResult: }, ) + def _probe_ebpf_lsm( + self, bpf_availability: ProbeResult | None = None + ) -> ProbeResult: + lsm_path = Path("/sys/kernel/security/lsm") + lsm_entries: list[str] = [] + if lsm_path.exists(): + lsm_entries = [ + entry + for entry in lsm_path.read_text(encoding="utf-8").strip().split(",") + if entry + ] + if bpf_availability is None: + bpf_availability = self._probe_bpf_availability() + has_bpf_lsm = "bpf" in lsm_entries + passed = has_bpf_lsm and bpf_availability.passed + return ProbeResult( + name="ebpf_lsm", + passed=passed, + required=True, + details="BPF LSM hook and BPF toolchain are available for kernel policy enforcement", + evidence={ + "lsm_path": str(lsm_path), + "lsm_entries": lsm_entries, + "bpf_lsm_enabled": has_bpf_lsm, + "bpf_availability": bpf_availability.evidence, + }, + ) + + def _probe_landlock_fallback(self, ebpf_lsm_active: bool = False) -> ProbeResult: + landlock_path = Path("/sys/kernel/security/landlock") + available = landlock_path.exists() + return ProbeResult( + name="landlock_fallback", + passed=available, + required=False, + details="Landlock fallback is available when privileged eBPF-LSM enforcement is inactive", + evidence={ + "landlock_path": str(landlock_path), + "available": available, + "fallback_active": available and not ebpf_lsm_active, + "ebpf_lsm_active": ebpf_lsm_active, + }, + ) + def _probe_cgroup_behavior(self) -> ProbeResult: from pyisolate import cgroup @@ -188,9 +348,9 @@ def _probe_cgroup_behavior(self) -> ProbeResult: if created: cgroup.attach_current(cg_path) threads_file = Path(cg_path) / "cgroup.threads" - attached = threads_file.exists() and str(os.gettid()) in threads_file.read_text( - encoding="utf-8" - ) + attached = threads_file.exists() and str( + os.gettid() + ) in threads_file.read_text(encoding="utf-8") cgroup.delete(cg_path) deleted = not Path(cg_path).exists() passed = bool(is_v2 and created and attached and deleted) @@ -228,6 +388,126 @@ def _probe_policy_enforcement(self) -> ProbeResult: evidence={"blocked_disallowed_import": blocked_import}, ) + def _probe_no_gil_extension_safety( + self, + python_build: ProbeResult | None = None, + policy_enforcement: ProbeResult | None = None, + ) -> ProbeResult: + if python_build is None: + python_build = self._probe_python_build() + if policy_enforcement is None: + policy_enforcement = self._probe_policy_enforcement() + blocked_native_loader = False + from pyisolate.errors import PolicyError + + with iso.spawn( + "conformance-native-loader", allowed_imports=["math"] + ) as sandbox: + sandbox.exec("import ctypes") + try: + sandbox.recv(timeout=1) + except PolicyError: + blocked_native_loader = True + except Exception: + blocked_native_loader = False + gil_disabled = bool(python_build.evidence.get("py_gil_disabled")) + passed = gil_disabled and policy_enforcement.passed and blocked_native_loader + return ProbeResult( + name="no_gil_extension_safety", + passed=passed, + required=True, + details="Free-threaded Python is active and sandbox policy blocks unaudited native loaders", + evidence={ + "py_gil_disabled": gil_disabled, + "policy_enforcement_passed": policy_enforcement.passed, + "blocked_ctypes_import": blocked_native_loader, + "compatibility_matrix": "docs/compatibility-matrix.md", + }, + ) + + def _probe_broker_crypto(self) -> ProbeResult: + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import x25519 + from pyisolate.broker.crypto import CryptoBroker + + priv_a = x25519.X25519PrivateKey.generate() + priv_b = x25519.X25519PrivateKey.generate() + priv_a_bytes = priv_a.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + priv_b_bytes = priv_b.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + pub_a = priv_a.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + pub_b = priv_b.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + broker_a = CryptoBroker(priv_a_bytes, pub_b, max_frame_len=64) + broker_b = CryptoBroker(priv_b_bytes, pub_a, max_frame_len=64) + frame = broker_a.frame(b"doctor-grade") + roundtrip = broker_b.unframe(frame) == b"doctor-grade" + replay_blocked = False + oversized_blocked = False + try: + broker_b.unframe(frame) + except ValueError: + replay_blocked = True + large_frame = broker_a.frame(b"x") + (b"x" * 128) + try: + broker_b.unframe(large_frame) + except ValueError: + oversized_blocked = True + passed = roundtrip and replay_blocked and oversized_blocked + return ProbeResult( + name="broker_crypto", + passed=passed, + required=True, + details="Broker channel authenticates AEAD frames and rejects replay/oversized frames", + evidence={ + "key_exchange": "X25519", + "aead": "ChaCha20-Poly1305", + "roundtrip": roundtrip, + "replay_blocked": replay_blocked, + "oversized_frame_blocked": oversized_blocked, + }, + ) + + def _probe_crash_isolation(self) -> ProbeResult: + from pyisolate.errors import SandboxError + + exception_isolated = False + supervisor_survived = False + with iso.spawn("conformance-crash") as sandbox: + sandbox.exec("raise RuntimeError('guest crash')") + try: + sandbox.recv(timeout=1) + except SandboxError: + exception_isolated = True + except Exception: + exception_isolated = False + with iso.spawn("conformance-crash-survivor") as sandbox: + sandbox.exec("post('alive')") + supervisor_survived = sandbox.recv(timeout=1) == "alive" + passed = exception_isolated and supervisor_survived + return ProbeResult( + name="crash_isolation", + passed=passed, + required=True, + details="Guest exceptions are contained and the supervisor can launch a fresh sandbox", + evidence={ + "guest_exception_isolated": exception_isolated, + "supervisor_survived": supervisor_survived, + }, + ) + def _probe_timeout_and_kill_behavior(self) -> ProbeResult: from pyisolate.errors import CPUExceeded, TimeoutError @@ -272,10 +552,22 @@ def run_conformance_suite() -> ConformanceReport: def main(argv: list[str] | None = None) -> int: """CLI entrypoint for host conformance checks.""" - parser = argparse.ArgumentParser(description="Run PyIsolate host conformance checks") - parser.add_argument("--json", action="store_true", help="emit machine-readable JSON") + parser = argparse.ArgumentParser( + description="Run PyIsolate host conformance checks" + ) + parser.add_argument( + "--json", action="store_true", help="emit machine-readable JSON" + ) + parser.add_argument( + "--grade", action="store_true", help="emit scored guarantee report" + ) args = parser.parse_args(argv) + if args.grade: + grade = ConformanceSuite().grade() + print(grade.to_json()) + return 0 + report = run_conformance_suite() if args.json: print(report.to_json()) diff --git a/pyisolate/doctor.py b/pyisolate/doctor.py index b15a64f..957ad58 100644 --- a/pyisolate/doctor.py +++ b/pyisolate/doctor.py @@ -6,6 +6,7 @@ import json from typing import Any +from .conformance import ConformanceSuite from .nogil import imported_native_extensions, no_gil_readiness_report from .provenance import installation_report_json @@ -42,6 +43,14 @@ def main(argv: list[str] | None = None) -> None: prog="pyisolate-doctor", description="Print PyIsolate build provenance and kernel/no-GIL feature flags.", ) + parser.add_argument( + "--grade", + action="store_true", + help="emit a scored report of active PyIsolate guarantees", + ) + args = parser.parse_args(argv) + if args.grade: + print(ConformanceSuite().grade().to_json()) subparsers = parser.add_subparsers(dest="command") gil_parser = subparsers.add_parser( diff --git a/tests/test_conformance.py b/tests/test_conformance.py index e65f1bd..e4bb594 100644 --- a/tests/test_conformance.py +++ b/tests/test_conformance.py @@ -20,7 +20,9 @@ def test_suite_aggregates_required_and_optional(monkeypatch): monkeypatch.setattr( ConformanceSuite, "_probe_bpf_availability", lambda self: probes[2] ) - monkeypatch.setattr(ConformanceSuite, "_probe_cgroup_behavior", lambda self: probes[3]) + monkeypatch.setattr( + ConformanceSuite, "_probe_cgroup_behavior", lambda self: probes[3] + ) monkeypatch.setattr( ConformanceSuite, "_probe_policy_enforcement", lambda self: probes[4] ) @@ -56,3 +58,115 @@ def test_cli_json_output(monkeypatch, capsys): assert exit_code == 0 assert json.loads(out)["passed"] is True + + +def test_grade_report_scores_named_guarantees(monkeypatch): + monkeypatch.setattr( + ConformanceSuite, + "_probe_python_build", + lambda self: ProbeResult( + "python_build", + True, + True, + "python ok", + {"py_gil_disabled": True}, + ), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_bpf_availability", + lambda self: ProbeResult("bpf_availability", True, True, "bpf ok", {}), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_cgroup_behavior", + lambda self: ProbeResult("cgroup_behavior", True, True, "cgroup ok", {}), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_policy_enforcement", + lambda self: ProbeResult("policy_enforcement", True, True, "policy ok", {}), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_timeout_and_kill_behavior", + lambda self: ProbeResult( + "timeout_and_kill_behavior", True, True, "quota ok", {} + ), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_ebpf_lsm", + lambda self, bpf_availability=None: ProbeResult( + "ebpf_lsm", True, True, "lsm ok", {} + ), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_landlock_fallback", + lambda self, ebpf_lsm_active=False: ProbeResult( + "landlock_fallback", False, False, "no landlock", {} + ), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_no_gil_extension_safety", + lambda self, python_build=None, policy_enforcement=None: ProbeResult( + "no_gil_extension_safety", True, True, "native loaders blocked", {} + ), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_broker_crypto", + lambda self: ProbeResult("broker_crypto", True, True, "crypto ok", {}), + ) + monkeypatch.setattr( + ConformanceSuite, + "_probe_crash_isolation", + lambda self: ProbeResult("crash_isolation", True, True, "crash ok", {}), + ) + monkeypatch.setattr("pyisolate.conformance.sys.version_info", (3, 13, 0)) + + report = ConformanceSuite().grade() + payload = report.to_dict() + + assert report.score == 7 + assert report.max_score == 8 + assert payload["active_guarantees"] == [ + "free_threading", + "ebpf_lsm", + "cgroup_v2", + "no_gil_extension_safety", + "broker_crypto", + "quota_enforcement", + "crash_isolation", + ] + assert payload["inactive_guarantees"] == ["landlock_fallback"] + assert [component["label"] for component in payload["components"]] == [ + "free-threading", + "eBPF-LSM", + "cgroup v2", + "Landlock fallback", + "no-GIL extension safety", + "broker crypto", + "quota enforcement", + "crash isolation", + ] + + +def test_conformance_cli_grade(monkeypatch, capsys): + monkeypatch.setattr( + ConformanceSuite, + "grade", + lambda self: type( + "FakeGrade", + (), + {"to_json": lambda _self: json.dumps({"score": 6, "max_score": 8})}, + )(), + ) + + exit_code = main(["--grade"]) + out = capsys.readouterr().out + + assert exit_code == 0 + assert json.loads(out) == {"score": 6, "max_score": 8} diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 17e2069..307fc1e 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -26,6 +26,23 @@ def test_doctor_cli_output(capsys): assert "features" in payload["kernel"] +def test_doctor_cli_grade_output(monkeypatch, capsys): + from pyisolate.doctor import ConformanceSuite + + monkeypatch.setattr( + ConformanceSuite, + "grade", + lambda self: type( + "FakeGrade", + (), + {"to_json": lambda _self: json.dumps({"score": 5, "max_score": 8})}, + )(), + ) + + main(["--grade"]) + captured = capsys.readouterr() + + assert json.loads(captured.out) == {"score": 5, "max_score": 8} def test_installation_report_exposes_no_gil_axis(): report = installation_report() assert report["no_gil"]["axis"]["mode"] in {