diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py index 57c51718..85052364 100644 --- a/src/claude_agent_sdk/__init__.py +++ b/src/claude_agent_sdk/__init__.py @@ -14,7 +14,7 @@ ProcessError, ) from ._internal.session_mutations import rename_session, tag_session -from ._internal.sessions import get_session_messages, list_sessions +from ._internal.sessions import get_session_info, get_session_messages, list_sessions from ._internal.transport import Transport from ._version import __version__ from .client import ClaudeSDKClient @@ -418,6 +418,7 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any: "SdkPluginConfig", # Session listing "list_sessions", + "get_session_info", "get_session_messages", "SDKSessionInfo", "SessionMessage", diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py index a49d2c63..88a18758 100644 --- a/src/claude_agent_sdk/_internal/sessions.py +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -13,6 +13,7 @@ import subprocess import sys import unicodedata +from datetime import datetime from pathlib import Path from typing import Any @@ -395,6 +396,100 @@ def _get_worktree_paths(cwd: str) -> list[str]: return paths +# --------------------------------------------------------------------------- +# Field extraction — shared by list_sessions and get_session_info +# --------------------------------------------------------------------------- + + +def _parse_session_info_from_lite( + session_id: str, + lite: _LiteSessionFile, + project_path: str | None = None, +) -> SDKSessionInfo | None: + """Parses SDKSessionInfo fields from a lite session read (head/tail/stat). + + Returns None for sidechain sessions or metadata-only sessions with no + extractable summary. + + Shared by list_sessions and get_session_info. + """ + head, tail, mtime, size = lite.head, lite.tail, lite.mtime, lite.size + + # Check first line for sidechain sessions + first_newline = head.find("\n") + first_line = head[:first_newline] if first_newline >= 0 else head + if '"isSidechain":true' in first_line or '"isSidechain": true' in first_line: + return None + + # User-set title (customTitle) wins over AI-generated title (aiTitle). + # Head fallback covers short sessions where the title entry may not be in tail. + custom_title = ( + _extract_last_json_string_field(tail, "customTitle") + or _extract_last_json_string_field(head, "customTitle") + or _extract_last_json_string_field(tail, "aiTitle") + or _extract_last_json_string_field(head, "aiTitle") + or None + ) + first_prompt = _extract_first_prompt_from_head(head) or None + # lastPrompt tail entry shows what the user was most recently doing. + summary = ( + custom_title + or _extract_last_json_string_field(tail, "lastPrompt") + or _extract_last_json_string_field(tail, "summary") + or first_prompt + ) + + # Skip metadata-only sessions (no title, no summary, no prompt) + if not summary: + return None + + git_branch = ( + _extract_last_json_string_field(tail, "gitBranch") + or _extract_json_string_field(head, "gitBranch") + or None + ) + session_cwd = _extract_json_string_field(head, "cwd") or project_path or None + # Scope tag extraction to {"type":"tag"} lines — a bare tail scan for + # "tag" would match tool_use inputs (git tag, Docker tags, cloud resource + # tags). Mirrors TS listSessionsImpl.ts / sessionStorage.ts:629. + tag_line = next( + (ln for ln in reversed(tail.split("\n")) if ln.startswith('{"type":"tag"')), + None, + ) + tag = ( + (_extract_last_json_string_field(tag_line, "tag") or None) if tag_line else None + ) + + # created_at from first entry's ISO timestamp (epoch ms). More reliable + # than stat().birthtime which is unsupported on some filesystems. + created_at: int | None = None + first_timestamp = _extract_json_string_field(first_line, "timestamp") + if first_timestamp: + try: + # Python 3.10's fromisoformat doesn't support trailing 'Z' + ts = ( + first_timestamp.replace("Z", "+00:00") + if first_timestamp.endswith("Z") + else first_timestamp + ) + created_at = int(datetime.fromisoformat(ts).timestamp() * 1000) + except ValueError: + pass + + return SDKSessionInfo( + session_id=session_id, + summary=summary, + last_modified=mtime, + file_size=size, + custom_title=custom_title, + first_prompt=first_prompt, + git_branch=git_branch, + cwd=session_cwd, + tag=tag, + created_at=created_at, + ) + + # --------------------------------------------------------------------------- # Core implementation # --------------------------------------------------------------------------- @@ -427,45 +522,9 @@ def _read_sessions_from_dir( if lite is None: continue - head, tail, mtime, size = lite.head, lite.tail, lite.mtime, lite.size - - # Check first line for sidechain sessions - first_newline = head.find("\n") - first_line = head[:first_newline] if first_newline >= 0 else head - if '"isSidechain":true' in first_line or '"isSidechain": true' in first_line: - continue - - custom_title = _extract_last_json_string_field(tail, "customTitle") or None - first_prompt = _extract_first_prompt_from_head(head) or None - summary = ( - custom_title - or _extract_last_json_string_field(tail, "summary") - or first_prompt - ) - - # Skip metadata-only sessions (no title, no summary, no prompt) - if not summary: - continue - - git_branch = ( - _extract_last_json_string_field(tail, "gitBranch") - or _extract_json_string_field(head, "gitBranch") - or None - ) - session_cwd = _extract_json_string_field(head, "cwd") or project_path or None - - results.append( - SDKSessionInfo( - session_id=session_id, - summary=summary, - last_modified=mtime, - file_size=size, - custom_title=custom_title, - first_prompt=first_prompt, - git_branch=git_branch, - cwd=session_cwd, - ) - ) + info = _parse_session_info_from_lite(session_id, lite, project_path) + if info is not None: + results.append(info) return results @@ -634,6 +693,89 @@ def list_sessions( return _list_all_sessions(limit) +# --------------------------------------------------------------------------- +# get_session_info — single-session metadata lookup +# --------------------------------------------------------------------------- + + +def get_session_info( + session_id: str, + directory: str | None = None, +) -> SDKSessionInfo | None: + """Reads metadata for a single session by ID. + + Wraps ``_read_session_lite`` for one file — no O(n) directory scan. + Directory resolution matches ``get_session_messages``: ``directory`` is + the project path; when omitted, all project directories are searched for + the session file. + + Args: + session_id: UUID of the session to look up. + directory: Project directory path (same semantics as + ``list_sessions(directory=...)``). When omitted, all project + directories are searched for the session file. + + Returns: + ``SDKSessionInfo`` for the session, or ``None`` if the session file + is not found, is a sidechain session, or has no extractable summary. + + Example: + Look up a session in a specific project:: + + info = get_session_info( + "550e8400-e29b-41d4-a716-446655440000", + directory="/path/to/project", + ) + if info: + print(info.summary) + + Search all projects for a session:: + + info = get_session_info("550e8400-e29b-41d4-a716-446655440000") + """ + uuid = _validate_uuid(session_id) + if not uuid: + return None + file_name = f"{uuid}.jsonl" + + if directory: + canonical = _canonicalize_path(directory) + project_dir = _find_project_dir(canonical) + if project_dir is not None: + lite = _read_session_lite(project_dir / file_name) + if lite is not None: + return _parse_session_info_from_lite(uuid, lite, canonical) + + # Worktree fallback — matches get_session_messages semantics. + # Sessions may live under a different worktree root. + try: + worktree_paths = _get_worktree_paths(canonical) + except Exception: + worktree_paths = [] + for wt in worktree_paths: + if wt == canonical: + continue + wt_project_dir = _find_project_dir(wt) + if wt_project_dir is not None: + lite = _read_session_lite(wt_project_dir / file_name) + if lite is not None: + return _parse_session_info_from_lite(uuid, lite, wt) + + return None + + # No directory — search all project directories for the session file. + projects_dir = _get_projects_dir() + try: + dirents = [e for e in projects_dir.iterdir() if e.is_dir()] + except OSError: + return None + for entry in dirents: + lite = _read_session_lite(entry / file_name) + if lite is not None: + return _parse_session_info_from_lite(uuid, lite) + return None + + # --------------------------------------------------------------------------- # get_session_messages — full transcript reconstruction # --------------------------------------------------------------------------- @@ -749,7 +891,7 @@ def _build_conversation_chain( ) -> list[_TranscriptEntry]: """Builds the conversation chain by finding the leaf and walking parentUuid. - Returns messages in chronological order (root → leaf). + Returns messages in chronological order (root -> leaf). Note: logicalParentUuid (set on compact_boundary entries) is intentionally NOT followed. This matches VS Code IDE behavior — post-compaction, the diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 23b0c15b..1be8cff8 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -974,21 +974,28 @@ class SDKSessionInfo: summary: Display title for the session — custom title, auto-generated summary, or first prompt. last_modified: Last modified time in milliseconds since epoch. - file_size: Session file size in bytes. - custom_title: User-set session title via /rename. + file_size: Session file size in bytes. Only populated for local + JSONL storage; may be ``None`` for remote storage backends. + custom_title: Session title — user-set custom title or AI-generated title. first_prompt: First meaningful user prompt in the session. git_branch: Git branch at the end of the session. cwd: Working directory for the session. + tag: User-set session tag. + created_at: Creation time in milliseconds since epoch, extracted + from the first entry's ISO timestamp field. More reliable + than stat().birthtime which is unsupported on some filesystems. """ session_id: str summary: str last_modified: int - file_size: int + file_size: int | None = None custom_title: str | None = None first_prompt: str | None = None git_branch: str | None = None cwd: str | None = None + tag: str | None = None + created_at: int | None = None @dataclass diff --git a/tests/test_sessions.py b/tests/test_sessions.py index 39cb7755..3fccf5d9 100644 --- a/tests/test_sessions.py +++ b/tests/test_sessions.py @@ -12,6 +12,7 @@ from claude_agent_sdk import ( SDKSessionInfo, SessionMessage, + get_session_info, get_session_messages, list_sessions, ) @@ -20,11 +21,18 @@ _extract_first_prompt_from_head, _extract_json_string_field, _extract_last_json_string_field, + _parse_session_info_from_lite, + _read_session_lite, _sanitize_path, _simple_hash, _validate_uuid, ) +# Matches the CLI's on-disk JSONL format (JSON.stringify / json.dumps with +# separators). Tag extraction scopes to '{"type":"tag"' (no space after colon) +# at column 0 to avoid matching tool_use inputs — fixtures must use this form. +_COMPACT = {"separators": (",", ":")} + # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @@ -1078,3 +1086,438 @@ def test_creation(self): assert msg.session_id == "sess" assert msg.message == {"role": "user", "content": "hi"} assert msg.parent_tool_use_id is None + + +# --------------------------------------------------------------------------- +# Tag extraction tests (Branch A additions) +# --------------------------------------------------------------------------- + + +class TestTagExtraction: + """Tests for tag field extraction in SDKSessionInfo.""" + + def test_tag_extracted_from_tail(self, claude_config_dir: Path, tmp_path: Path): + """Tag is extracted from the last {type:'tag'} entry in the tail.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + lines = [ + json.dumps({"type": "user", "message": {"content": "hello"}}), + json.dumps({"type": "tag", "tag": "my-tag", "sessionId": sid}, **_COMPACT), + ] + file_path.write_text("\n".join(lines) + "\n") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].tag == "my-tag" + + def test_tag_last_wins(self, claude_config_dir: Path, tmp_path: Path): + """When multiple tag entries exist, the last one wins.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + lines = [ + json.dumps({"type": "user", "message": {"content": "hello"}}), + json.dumps( + {"type": "tag", "tag": "first-tag", "sessionId": sid}, **_COMPACT + ), + json.dumps( + {"type": "tag", "tag": "second-tag", "sessionId": sid}, **_COMPACT + ), + ] + file_path.write_text("\n".join(lines) + "\n") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].tag == "second-tag" + + def test_tag_empty_string_is_none(self, claude_config_dir: Path, tmp_path: Path): + """Empty-string tag (clear marker) resolves to None via 'or None'.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + lines = [ + json.dumps({"type": "user", "message": {"content": "hello"}}), + json.dumps({"type": "tag", "tag": "old-tag", "sessionId": sid}, **_COMPACT), + json.dumps({"type": "tag", "tag": "", "sessionId": sid}, **_COMPACT), + ] + file_path.write_text("\n".join(lines) + "\n") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].tag is None + + def test_tag_absent(self, claude_config_dir: Path, tmp_path: Path): + """Sessions without a tag entry have tag=None.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + _make_session_file(project_dir, first_prompt="hello") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].tag is None + + def test_tag_ignores_tool_use_inputs(self, claude_config_dir: Path, tmp_path: Path): + """Tag extraction is scoped to {type:'tag'} lines — ignores "tag" fields + in tool_use inputs (git tag, Docker tags, cloud resource tags). + + Mirrors TS listSessionsImpl.ts:132 / sessionStorage.ts:629. + """ + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + lines = [ + json.dumps({"type": "user", "message": {"content": "tag this v1.0"}}), + json.dumps( + {"type": "tag", "tag": "real-tag", "sessionId": sid}, **_COMPACT + ), + # A tool_use entry with a "tag" key in its input — must NOT match. + json.dumps( + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "name": "mcp__docker__build", + "input": {"tag": "myapp:v2", "context": "."}, + } + ], + }, + } + ), + ] + file_path.write_text("\n".join(lines) + "\n") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].tag == "real-tag" # NOT "myapp:v2" + + def test_tag_none_when_only_tool_use_tag( + self, claude_config_dir: Path, tmp_path: Path + ): + """Session with no {type:'tag'} entry but tool_use input has tag — returns None.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + lines = [ + json.dumps({"type": "user", "message": {"content": "build docker"}}), + json.dumps( + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "input": {"tag": "prod"}, + } + ], + }, + } + ), + ] + file_path.write_text("\n".join(lines) + "\n") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].tag is None # NOT "prod" + + def test_parse_session_info_from_lite_helper(self, tmp_path: Path): + """Direct test of the refactored _parse_session_info_from_lite helper.""" + sid = str(uuid.uuid4()) + file_path = tmp_path / f"{sid}.jsonl" + lines = [ + json.dumps( + { + "type": "user", + "message": {"content": "test prompt"}, + "cwd": "/workspace", + } + ), + json.dumps( + {"type": "tag", "tag": "experiment", "sessionId": sid}, **_COMPACT + ), + ] + file_path.write_text("\n".join(lines) + "\n") + + lite = _read_session_lite(file_path) + assert lite is not None + info = _parse_session_info_from_lite(sid, lite, "/fallback") + assert info is not None + assert info.session_id == sid + assert info.summary == "test prompt" + assert info.tag == "experiment" + assert info.cwd == "/workspace" # head cwd wins over fallback + + +class TestCreatedAtExtraction: + """Tests for created_at field extraction from first entry timestamp.""" + + def test_created_at_from_iso_timestamp( + self, claude_config_dir: Path, tmp_path: Path + ): + """created_at is parsed from ISO timestamp in first entry (epoch ms).""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + # 2026-01-15T10:30:00.000Z → epoch 1768473000000 ms + lines = [ + json.dumps( + { + "type": "user", + "message": {"content": "hello"}, + "timestamp": "2026-01-15T10:30:00.000Z", + } + ), + json.dumps( + { + "type": "assistant", + "message": {"content": "hi"}, + "timestamp": "2026-01-15T10:35:00.000Z", + } + ), + ] + file_path.write_text("\n".join(lines) + "\n") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + # 2026-01-15T10:30:00Z = 1768473000 seconds = 1768473000000 ms + assert sessions[0].created_at == 1768473000000 + assert isinstance(sessions[0].created_at, int) + + def test_created_at_leq_last_modified( + self, claude_config_dir: Path, tmp_path: Path + ): + """created_at <= last_modified (creation precedes mtime).""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + lines = [ + json.dumps( + { + "type": "user", + "message": {"content": "hello"}, + "timestamp": "2026-01-01T00:00:00.000Z", + } + ), + ] + file_path.write_text("\n".join(lines) + "\n") + # Set mtime to Feb 2026 (well after the Jan timestamp) + os.utime(file_path, (1769904000, 1769904000)) # 2026-02-01 UTC + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].created_at is not None + assert sessions[0].created_at <= sessions[0].last_modified + + def test_created_at_none_when_missing( + self, claude_config_dir: Path, tmp_path: Path + ): + """created_at is None when first entry lacks a timestamp field.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + # _make_session_file doesn't add a timestamp field + _make_session_file(project_dir, first_prompt="no timestamp") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].created_at is None + + def test_created_at_none_on_invalid_format(self, tmp_path: Path): + """Invalid ISO string results in created_at=None (no exception).""" + sid = str(uuid.uuid4()) + file_path = tmp_path / f"{sid}.jsonl" + lines = [ + json.dumps( + { + "type": "user", + "message": {"content": "hello"}, + "timestamp": "not-a-valid-iso-date", + } + ), + ] + file_path.write_text("\n".join(lines) + "\n") + + lite = _read_session_lite(file_path) + assert lite is not None + info = _parse_session_info_from_lite(sid, lite) + assert info is not None + assert info.created_at is None + + def test_created_at_without_z_suffix(self, tmp_path: Path): + """ISO timestamp without Z suffix (with explicit offset) also works.""" + sid = str(uuid.uuid4()) + file_path = tmp_path / f"{sid}.jsonl" + lines = [ + json.dumps( + { + "type": "user", + "message": {"content": "hello"}, + "timestamp": "2026-01-15T10:30:00+00:00", + } + ), + ] + file_path.write_text("\n".join(lines) + "\n") + + lite = _read_session_lite(file_path) + assert lite is not None + info = _parse_session_info_from_lite(sid, lite) + assert info is not None + assert info.created_at == 1768473000000 + assert isinstance(info.created_at, int) + + def test_sdksessioninfo_created_at_default(self): + """SDKSessionInfo has created_at defaulting to None.""" + info = SDKSessionInfo( + session_id="abc", + summary="test", + last_modified=1000, + file_size=42, + ) + assert info.created_at is None + + +# --------------------------------------------------------------------------- +# get_session_info() tests +# --------------------------------------------------------------------------- + + +class TestGetSessionInfo: + """Tests for the get_session_info() single-session lookup.""" + + def test_invalid_session_id(self, claude_config_dir: Path): + """Non-UUID session_id returns None.""" + assert get_session_info("not-a-uuid") is None + assert get_session_info("") is None + + def test_nonexistent_session(self, claude_config_dir: Path): + """Session file not found returns None.""" + sid = str(uuid.uuid4()) + assert get_session_info(sid) is None + + def test_no_config_dir(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Missing config dir returns None.""" + monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(tmp_path / "nonexistent")) + sid = str(uuid.uuid4()) + assert get_session_info(sid) is None + + def test_found_with_directory(self, claude_config_dir: Path, tmp_path: Path): + """Session found in a specific project directory.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid, _ = _make_session_file( + project_dir, first_prompt="hello", git_branch="main" + ) + + info = get_session_info(sid, directory=project_path) + assert info is not None + assert info.session_id == sid + assert info.summary == "hello" + assert info.git_branch == "main" + + def test_found_without_directory(self, claude_config_dir: Path): + """Session found by searching all project directories.""" + project_dir = _make_project_dir(claude_config_dir, "/some/project") + sid, _ = _make_session_file(project_dir, first_prompt="search all") + + info = get_session_info(sid) + assert info is not None + assert info.session_id == sid + assert info.summary == "search all" + + def test_returns_none_for_sidechain(self, claude_config_dir: Path, tmp_path: Path): + """Sidechain sessions return None (filtered by parse helper).""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid, _ = _make_session_file( + project_dir, first_prompt="sidechain", is_sidechain=True + ) + + assert get_session_info(sid, directory=project_path) is None + + def test_directory_not_containing_session( + self, claude_config_dir: Path, tmp_path: Path + ): + """Returns None when directory provided but session not in it.""" + project_a = str(tmp_path / "proj-a") + project_b = str(tmp_path / "proj-b") + Path(project_a).mkdir(parents=True) + Path(project_b).mkdir(parents=True) + dir_a = _make_project_dir(claude_config_dir, os.path.realpath(project_a)) + _make_project_dir(claude_config_dir, os.path.realpath(project_b)) + sid, _ = _make_session_file(dir_a, first_prompt="in A only") + + # Session exists in A but we look in B — should return None + # (no worktree relationship between them) + assert get_session_info(sid, directory=project_b) is None + # But searching all projects finds it + assert get_session_info(sid) is not None + + def test_includes_tag(self, claude_config_dir: Path, tmp_path: Path): + """get_session_info includes the new tag field.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + lines = [ + json.dumps({"type": "user", "message": {"content": "hello"}}), + json.dumps({"type": "tag", "tag": "urgent", "sessionId": sid}, **_COMPACT), + ] + file_path.write_text("\n".join(lines) + "\n") + + info = get_session_info(sid, directory=project_path) + assert info is not None + assert info.tag == "urgent" + + def test_sdksessioninfo_new_fields_defaults(self): + """SDKSessionInfo has tag defaulting to None.""" + info = SDKSessionInfo( + session_id="abc", + summary="test", + last_modified=1000, + file_size=42, + ) + assert info.tag is None