diff --git a/src/session_store.py b/src/session_store.py index 5f7502a56d..63cac1c142 100644 --- a/src/session_store.py +++ b/src/session_store.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import re from dataclasses import asdict, dataclass from pathlib import Path @@ -15,16 +16,31 @@ class StoredSession: DEFAULT_SESSION_DIR = Path('.port_sessions') +# Session ids become filenames inside the session directory, so they must not be +# able to escape it via path separators or traversal. Restrict to the charset +# produced by legitimate ids (uuid4 hex, `session--`). +_SESSION_ID_PATTERN = re.compile(r'\A[A-Za-z0-9_-]+\Z') + + +def _validate_session_id(session_id: str) -> str: + if not _SESSION_ID_PATTERN.match(session_id): + raise ValueError( + f'invalid session_id {session_id!r}: only [A-Za-z0-9_-] characters are allowed' + ) + return session_id + def save_session(session: StoredSession, directory: Path | None = None) -> Path: + session_id = _validate_session_id(session.session_id) target_dir = directory or DEFAULT_SESSION_DIR target_dir.mkdir(parents=True, exist_ok=True) - path = target_dir / f'{session.session_id}.json' + path = target_dir / f'{session_id}.json' path.write_text(json.dumps(asdict(session), indent=2)) return path def load_session(session_id: str, directory: Path | None = None) -> StoredSession: + session_id = _validate_session_id(session_id) target_dir = directory or DEFAULT_SESSION_DIR data = json.loads((target_dir / f'{session_id}.json').read_text()) return StoredSession( diff --git a/tests/test_security_scope.py b/tests/test_security_scope.py index 59275dda78..e086d1f7b5 100644 --- a/tests/test_security_scope.py +++ b/tests/test_security_scope.py @@ -9,9 +9,58 @@ from src.path_scope import WorkspacePathScope, extract_path_candidates from src.permissions import ToolPermissionContext from src.query_engine import QueryEnginePort +from src.session_store import StoredSession, load_session, save_session from src.tools import execute_tool +class SessionStorePathTraversalTests(unittest.TestCase): + def test_traversal_session_id_is_rejected_on_load(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + store = Path(tmp) / 'store' + store.mkdir() + # A secret living outside the session store that traversal would reach. + (Path(tmp) / 'secret.json').write_text('{}') + + with self.assertRaises(ValueError): + load_session('../secret', directory=store) + + def test_absolute_and_separator_session_ids_are_rejected(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + store = Path(tmp) / 'store' + store.mkdir() + for bad_id in ('/etc/passwd', 'nested/child', '..', 'a/../../b'): + with self.assertRaises(ValueError): + load_session(bad_id, directory=store) + + def test_traversal_session_id_is_rejected_on_save(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + store = Path(tmp) / 'store' + store.mkdir() + session = StoredSession( + session_id='../escape', + messages=('hello',), + input_tokens=1, + output_tokens=2, + ) + with self.assertRaises(ValueError): + save_session(session, directory=store) + + def test_legitimate_session_id_round_trips(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + store = Path(tmp) / 'store' + session = StoredSession( + session_id='session-1775386832313-0', + messages=('hi', 'there'), + input_tokens=3, + output_tokens=4, + ) + path = save_session(session, directory=store) + + self.assertEqual(store, path.parent) + loaded = load_session('session-1775386832313-0', directory=store) + self.assertEqual(session, loaded) + + class WorkspacePathScopeTests(unittest.TestCase): def test_direct_parent_escape_is_denied(self) -> None: with tempfile.TemporaryDirectory() as tmp: