Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,15 @@ behavior, policy enforcement, and timeout/kill behavior):
```bash
python -m pyisolate.conformance
python -m pyisolate.conformance --json
python -m pyisolate.conformance --grade
pyisolate-doctor --grade
```

Use this in CI or admission checks to replace hand-wavy security claims with a
repeatable pass/fail report.
The `--grade` output replaces a vague secure/insecure claim with an 8-point
score over the guarantees that are actually active on the host: free-threading,
eBPF-LSM, cgroup v2, Landlock fallback, no-GIL extension safety, broker crypto,
quota enforcement, and crash isolation. Use it in CI or admission checks to
attach evidence to each guarantee rather than relying on a single pass/fail bit.

### Policy editor

Expand Down
308 changes: 300 additions & 8 deletions pyisolate/conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,43 @@ class ProbeResult:
evidence: dict[str, object]


@dataclass
class GradeComponent:
"""Scored status for one host guarantee area."""

key: str
label: str
score: int
max_score: int
active: bool
details: str
evidence: dict[str, object]


@dataclass
class GradeReport:
"""Machine-readable conformance score for active PyIsolate guarantees."""

score: int
max_score: int
percent: float
generated_at_epoch_s: int
host: str
components: list[GradeComponent]

def to_dict(self) -> dict[str, object]:
payload = asdict(self)
payload["components"] = [asdict(c) for c in self.components]
payload["active_guarantees"] = [c.key for c in self.components if c.active]
payload["inactive_guarantees"] = [
c.key for c in self.components if not c.active
]
return payload

def to_json(self) -> str:
return json.dumps(self.to_dict(), indent=2, sort_keys=True)


@dataclass
class ConformanceReport:
"""Structured result for a full conformance run."""
Expand All @@ -65,6 +102,82 @@ def to_json(self) -> str:
class ConformanceSuite:
"""Runs host-level probes that back PyIsolate guarantee claims."""

def grade(self) -> GradeReport:
"""Return a scored report of which PyIsolate guarantees are active."""

python_build = self._probe_python_build()
bpf_availability = self._probe_bpf_availability()
cgroup_behavior = self._probe_cgroup_behavior()
policy_enforcement = self._probe_policy_enforcement()
quota_enforcement = self._probe_timeout_and_kill_behavior()
ebpf_lsm = self._probe_ebpf_lsm(bpf_availability)
landlock_fallback = self._probe_landlock_fallback(ebpf_lsm.passed)
no_gil_extension_safety = self._probe_no_gil_extension_safety(
python_build, policy_enforcement
)
broker_crypto = self._probe_broker_crypto()
crash_isolation = self._probe_crash_isolation()

probe_components = [
(
"free_threading",
"free-threading",
python_build,
bool(python_build.evidence.get("py_gil_disabled"))
and sys.version_info >= (3, 13),
),
("ebpf_lsm", "eBPF-LSM", ebpf_lsm, ebpf_lsm.passed),
("cgroup_v2", "cgroup v2", cgroup_behavior, cgroup_behavior.passed),
(
"landlock_fallback",
"Landlock fallback",
landlock_fallback,
landlock_fallback.passed,
),
(
"no_gil_extension_safety",
"no-GIL extension safety",
no_gil_extension_safety,
no_gil_extension_safety.passed,
),
("broker_crypto", "broker crypto", broker_crypto, broker_crypto.passed),
(
"quota_enforcement",
"quota enforcement",
quota_enforcement,
quota_enforcement.passed,
),
(
"crash_isolation",
"crash isolation",
crash_isolation,
crash_isolation.passed,
),
]
components = [
GradeComponent(
key=key,
label=label,
score=1 if active else 0,
max_score=1,
active=active,
details=probe.details,
evidence=probe.evidence,
)
for key, label, probe, active in probe_components
]
score = sum(component.score for component in components)
max_score = sum(component.max_score for component in components)
percent = round((score / max_score) * 100, 1) if max_score else 0.0
return GradeReport(
score=score,
max_score=max_score,
percent=percent,
generated_at_epoch_s=int(time.time()),
host=platform.node() or "unknown",
components=components,
)

def run(self) -> ConformanceReport:
probes = [
self._probe_python_build(),
Expand Down Expand Up @@ -123,8 +236,7 @@ def _probe_kernel_capabilities(self) -> ProbeResult:
break
caps_value = int(caps_hex, 16)
present = {
name: bool(caps_value & (1 << bit))
for name, bit in CAPABILITY_BITS.items()
name: bool(caps_value & (1 << bit)) for name, bit in CAPABILITY_BITS.items()
}
passed = all(present.values())
return ProbeResult(
Expand All @@ -151,7 +263,11 @@ def _probe_bpf_availability(self) -> ProbeResult:
if mounts.exists():
for line in mounts.read_text(encoding="utf-8").splitlines():
fields = line.split()
if len(fields) >= 3 and fields[1] == "/sys/fs/bpf" and fields[2] == "bpf":
if (
len(fields) >= 3
and fields[1] == "/sys/fs/bpf"
and fields[2] == "bpf"
):
bpffs_mounted = True
break
bpftool_works = False
Expand All @@ -176,6 +292,50 @@ def _probe_bpf_availability(self) -> ProbeResult:
},
)

def _probe_ebpf_lsm(
self, bpf_availability: ProbeResult | None = None
) -> ProbeResult:
lsm_path = Path("/sys/kernel/security/lsm")
lsm_entries: list[str] = []
if lsm_path.exists():
lsm_entries = [
entry
for entry in lsm_path.read_text(encoding="utf-8").strip().split(",")
if entry
]
if bpf_availability is None:
bpf_availability = self._probe_bpf_availability()
has_bpf_lsm = "bpf" in lsm_entries
passed = has_bpf_lsm and bpf_availability.passed
return ProbeResult(
name="ebpf_lsm",
passed=passed,
required=True,
details="BPF LSM hook and BPF toolchain are available for kernel policy enforcement",
evidence={
"lsm_path": str(lsm_path),
"lsm_entries": lsm_entries,
"bpf_lsm_enabled": has_bpf_lsm,
"bpf_availability": bpf_availability.evidence,
},
)

def _probe_landlock_fallback(self, ebpf_lsm_active: bool = False) -> ProbeResult:
landlock_path = Path("/sys/kernel/security/landlock")
available = landlock_path.exists()
return ProbeResult(
name="landlock_fallback",
passed=available,
required=False,
details="Landlock fallback is available when privileged eBPF-LSM enforcement is inactive",
evidence={
"landlock_path": str(landlock_path),
"available": available,
"fallback_active": available and not ebpf_lsm_active,
"ebpf_lsm_active": ebpf_lsm_active,
},
)

def _probe_cgroup_behavior(self) -> ProbeResult:
from pyisolate import cgroup

Expand All @@ -188,9 +348,9 @@ def _probe_cgroup_behavior(self) -> ProbeResult:
if created:
cgroup.attach_current(cg_path)
threads_file = Path(cg_path) / "cgroup.threads"
attached = threads_file.exists() and str(os.gettid()) in threads_file.read_text(
encoding="utf-8"
)
attached = threads_file.exists() and str(
os.gettid()
) in threads_file.read_text(encoding="utf-8")
cgroup.delete(cg_path)
deleted = not Path(cg_path).exists()
passed = bool(is_v2 and created and attached and deleted)
Expand Down Expand Up @@ -228,6 +388,126 @@ def _probe_policy_enforcement(self) -> ProbeResult:
evidence={"blocked_disallowed_import": blocked_import},
)

def _probe_no_gil_extension_safety(
self,
python_build: ProbeResult | None = None,
policy_enforcement: ProbeResult | None = None,
) -> ProbeResult:
if python_build is None:
python_build = self._probe_python_build()
if policy_enforcement is None:
policy_enforcement = self._probe_policy_enforcement()
blocked_native_loader = False
from pyisolate.errors import PolicyError

with iso.spawn(
"conformance-native-loader", allowed_imports=["math"]
) as sandbox:
sandbox.exec("import ctypes")
try:
sandbox.recv(timeout=1)
except PolicyError:
blocked_native_loader = True
except Exception:
blocked_native_loader = False
gil_disabled = bool(python_build.evidence.get("py_gil_disabled"))
passed = gil_disabled and policy_enforcement.passed and blocked_native_loader
return ProbeResult(
name="no_gil_extension_safety",
passed=passed,
required=True,
details="Free-threaded Python is active and sandbox policy blocks unaudited native loaders",
evidence={
"py_gil_disabled": gil_disabled,
"policy_enforcement_passed": policy_enforcement.passed,
"blocked_ctypes_import": blocked_native_loader,
"compatibility_matrix": "docs/compatibility-matrix.md",
},
)

def _probe_broker_crypto(self) -> ProbeResult:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import x25519
from pyisolate.broker.crypto import CryptoBroker

priv_a = x25519.X25519PrivateKey.generate()
priv_b = x25519.X25519PrivateKey.generate()
priv_a_bytes = priv_a.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
priv_b_bytes = priv_b.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
pub_a = priv_a.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
pub_b = priv_b.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
broker_a = CryptoBroker(priv_a_bytes, pub_b, max_frame_len=64)
broker_b = CryptoBroker(priv_b_bytes, pub_a, max_frame_len=64)
frame = broker_a.frame(b"doctor-grade")
roundtrip = broker_b.unframe(frame) == b"doctor-grade"
replay_blocked = False
oversized_blocked = False
try:
broker_b.unframe(frame)
except ValueError:
replay_blocked = True
large_frame = broker_a.frame(b"x") + (b"x" * 128)
try:
broker_b.unframe(large_frame)
except ValueError:
oversized_blocked = True
passed = roundtrip and replay_blocked and oversized_blocked
return ProbeResult(
name="broker_crypto",
passed=passed,
required=True,
details="Broker channel authenticates AEAD frames and rejects replay/oversized frames",
evidence={
"key_exchange": "X25519",
"aead": "ChaCha20-Poly1305",
"roundtrip": roundtrip,
"replay_blocked": replay_blocked,
"oversized_frame_blocked": oversized_blocked,
},
)

def _probe_crash_isolation(self) -> ProbeResult:
from pyisolate.errors import SandboxError

exception_isolated = False
supervisor_survived = False
with iso.spawn("conformance-crash") as sandbox:
sandbox.exec("raise RuntimeError('guest crash')")
try:
sandbox.recv(timeout=1)
except SandboxError:
exception_isolated = True
except Exception:
exception_isolated = False
with iso.spawn("conformance-crash-survivor") as sandbox:
sandbox.exec("post('alive')")
supervisor_survived = sandbox.recv(timeout=1) == "alive"
passed = exception_isolated and supervisor_survived
return ProbeResult(
name="crash_isolation",
passed=passed,
required=True,
details="Guest exceptions are contained and the supervisor can launch a fresh sandbox",
evidence={
"guest_exception_isolated": exception_isolated,
"supervisor_survived": supervisor_survived,
},
)

def _probe_timeout_and_kill_behavior(self) -> ProbeResult:
from pyisolate.errors import CPUExceeded, TimeoutError

Expand Down Expand Up @@ -272,10 +552,22 @@ def run_conformance_suite() -> ConformanceReport:
def main(argv: list[str] | None = None) -> int:
"""CLI entrypoint for host conformance checks."""

parser = argparse.ArgumentParser(description="Run PyIsolate host conformance checks")
parser.add_argument("--json", action="store_true", help="emit machine-readable JSON")
parser = argparse.ArgumentParser(
description="Run PyIsolate host conformance checks"
)
parser.add_argument(
"--json", action="store_true", help="emit machine-readable JSON"
)
parser.add_argument(
"--grade", action="store_true", help="emit scored guarantee report"
)
args = parser.parse_args(argv)

if args.grade:
grade = ConformanceSuite().grade()
print(grade.to_json())
return 0

report = run_conformance_suite()
if args.json:
print(report.to_json())
Expand Down
Loading
Loading