# Tools whose results should survive micro-compaction because they are
# reference material (re-reading them wastes a tool call and API tokens).
_PRESERVE_RESULT_TOOLS = {"read_file"}


def microcompact(messages: list):
    """Replace old, bulky tool results in *messages* with ``"[cleared]"``.

    The list is mutated in place and nothing is returned.  A ``tool_result``
    part is cleared only when all of the following hold:

    * it is not one of the three most recent tool results in the history;
    * its ``content`` is a string longer than 100 characters;
    * the tool that produced it is not in the preserve-set defined above
      (currently only ``read_file``).

    Tool names are resolved through the ``tool_use`` blocks found in
    assistant turns.  Those blocks may be objects exposing ``.type``,
    ``.id`` and ``.name`` or plain dicts with the same keys; both shapes are
    recognised so that preservation keeps working for dict-shaped histories.
    """
    keep_recent = 3   # the newest results are always left untouched
    min_length = 100  # results at or below this size are not worth clearing

    def field(block, key):
        # A content block is either a dict or an object with attributes.
        if isinstance(block, dict):
            return block.get(key)
        return getattr(block, key, None)

    # tool_use_id -> tool name, needed to honour the preserve-set.
    tool_name_map: dict[str, str] = {}
    # Every tool_result part, in conversation order.
    results = []
    for msg in messages:
        content = msg.get("content")
        if not isinstance(content, list):
            continue
        role = msg.get("role")
        if role == "assistant":
            for block in content:
                if field(block, "type") != "tool_use":
                    continue
                block_id = field(block, "id")
                if block_id is not None:
                    tool_name_map[block_id] = field(block, "name")
        elif role == "user":
            results.extend(
                part for part in content
                if isinstance(part, dict) and part.get("type") == "tool_result"
            )

    if len(results) <= keep_recent:
        return

    for part in results[:-keep_recent]:
        body = part.get("content")
        if not isinstance(body, str) or len(body) <= min_length:
            continue
        # Preserve read_file results — compacting them forces the agent to
        # re-read the file, wasting a tool call.
        if tool_name_map.get(part.get("tool_use_id", "")) in _PRESERVE_RESULT_TOOLS:
            continue
        part["content"] = "[cleared]"
+ if messages and messages[-1]["role"] == "user": + prev = messages[-1]["content"] + if isinstance(prev, str): + messages[-1]["content"] = prev + "\n" + merged + else: + messages.append({"role": "assistant", "content": "Acknowledged."}) + messages.append({"role": "user", "content": merged}) + else: + messages.append({"role": "user", "content": merged}) try: response = client.messages.create( model=MODEL, system=sys_prompt, messages=messages, @@ -500,11 +539,20 @@ def _loop(self, name: str, role: str, prompt: str): time.sleep(POLL_INTERVAL) inbox = self.bus.read_inbox(name) if inbox: + parts = [] for msg in inbox: if msg.get("type") == "shutdown_request": self._set_status(name, "shutdown") return - messages.append({"role": "user", "content": json.dumps(msg)}) + parts.append(json.dumps(msg)) + if parts: + merged = "\n".join(parts) + # Ensure alternation: the last message after the work + # phase is either an assistant turn (natural end) or + # a user turn (tool_results from last tool call). + if messages and messages[-1]["role"] == "user": + messages.append({"role": "assistant", "content": "Idle. Checking inbox."}) + messages.append({"role": "user", "content": merged}) resume = True break unclaimed = [] @@ -520,6 +568,11 @@ def _loop(self, name: str, role: str, prompt: str): messages.insert(0, {"role": "user", "content": f"You are '{name}', role: {role}, team: {team_name}."}) messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."}) + # Ensure user/assistant alternation before injecting the + # auto-claimed task (the last message after work phase + # might be a user turn containing tool_results). + if messages and messages[-1]["role"] == "user": + messages.append({"role": "assistant", "content": "Idle. Looking for tasks."}) messages.append({"role": "user", "content": f"Task #{task['id']}: {task['subject']}\n{task.get('description', '')}"}) messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. 
Working on it."}) @@ -659,15 +712,31 @@ def agent_loop(messages: list): if estimate_tokens(messages) > TOKEN_THRESHOLD: print("[auto-compact triggered]") messages[:] = auto_compact(messages) - # s08: drain background notifications + # s08: drain background notifications + s10: check lead inbox. + # Merge both into a single user turn to maintain strict + # user/assistant alternation required by the Anthropic API. + injected_parts: list[str] = [] notifs = BG.drain() if notifs: txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs) - messages.append({"role": "user", "content": f"\n{txt}\n"}) - # s10: check lead inbox + injected_parts.append(f"\n{txt}\n") inbox = BUS.read_inbox("lead") if inbox: - messages.append({"role": "user", "content": f"{json.dumps(inbox, indent=2)}"}) + injected_parts.append(f"{json.dumps(inbox, indent=2)}") + if injected_parts: + merged_inject = "\n".join(injected_parts) + # If the last message is already a user turn (e.g. tool_results + # from the previous iteration), fold injections into it to avoid + # sending two consecutive user messages. + if messages and messages[-1]["role"] == "user": + prev = messages[-1]["content"] + if isinstance(prev, str): + messages[-1]["content"] = prev + "\n" + merged_inject + elif isinstance(prev, list): + # Previous user turn is tool_results — append as text part. + prev.append({"type": "text", "text": merged_inject}) + else: + messages.append({"role": "user", "content": merged_inject}) # LLM call response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, diff --git a/tests/test_s_full_message_alternation.py b/tests/test_s_full_message_alternation.py new file mode 100644 index 000000000..88d70986f --- /dev/null +++ b/tests/test_s_full_message_alternation.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python3 +""" +Regression tests for s_full.py message alternation and microcompact fixes. + +Covers: + 1. 
import ast
import json
import os
import sys
import textwrap
from pathlib import Path
from types import SimpleNamespace

import pytest

# ---------------------------------------------------------------------------
# Helpers: build mock Anthropic SDK content blocks
# ---------------------------------------------------------------------------

def _make_tool_use_block(block_id: str, name: str, input_dict: dict | None = None):
    """Simulate an Anthropic SDK ToolUseBlock (has .type, .id, .name, .input)."""
    return SimpleNamespace(type="tool_use", id=block_id, name=name,
                           input=input_dict or {})


def _make_text_block(text: str):
    """Simulate a text content block (has .type and .text)."""
    return SimpleNamespace(type="text", text=text)


def _build_tool_history(tool_names: list, payload) -> list:
    """Return alternating assistant(tool_use) / user(tool_result) messages.

    One pair is produced per entry of *tool_names*; the pair at position
    ``i`` uses the id ``tu_<i>``.  *payload* is called as ``payload(i, name)``
    and its return value becomes the tool_result content.
    """
    history = []
    for i, tool_name in enumerate(tool_names):
        block_id = f"tu_{i}"
        history.append({"role": "assistant", "content": [
            _make_tool_use_block(block_id, tool_name),
        ]})
        history.append({"role": "user", "content": [
            {"type": "tool_result", "tool_use_id": block_id,
             "content": payload(i, tool_name)},
        ]})
    return history


def _result_contents(messages: list) -> dict:
    """Map tool_use_id -> current content for every tool_result part."""
    found = {}
    for msg in messages:
        if msg["role"] != "user" or not isinstance(msg["content"], list):
            continue
        for part in msg["content"]:
            if part.get("type") == "tool_result":
                found[part["tool_use_id"]] = part["content"]
    return found


# ---------------------------------------------------------------------------
# Import s_full helpers without triggering the Anthropic client
# ---------------------------------------------------------------------------

@pytest.fixture(autouse=True)
def _patch_env(monkeypatch, tmp_path):
    """Set env vars and CWD before s_full is imported.

    NOTE(review): presumably s_full reads these variables and touches the
    working directory at import time — confirm against agents/s_full.py.
    """
    monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
    monkeypatch.setenv("MODEL_ID", "test-model")
    monkeypatch.setenv("ANTHROPIC_BASE_URL", "http://localhost:9999")
    monkeypatch.chdir(tmp_path)


@pytest.fixture
def s_full():
    """Import (or reimport) s_full after env is patched."""
    # Drop any cached copy so module-level code runs against the patched env.
    sys.modules.pop("agents.s_full", None)
    # Ensure the agents package is importable
    repo_root = Path(__file__).resolve().parent.parent
    if str(repo_root) not in sys.path:
        sys.path.insert(0, str(repo_root))
    import agents.s_full as mod
    return mod


# ===================================================================
# 1. microcompact preserves read_file results
# ===================================================================

class TestMicrocompactPreservesReadFile:
    """microcompact keeps read_file output but still clears other old output."""

    def test_read_file_results_survive(self, s_full):
        """read_file tool_result content must NOT be cleared.

        The read_file calls are the OLDEST entries, so they fall outside the
        keep-last-three window and are real clearing candidates; placing them
        last would make this test pass even without the preserve logic.
        """
        names = ["read_file", "read_file", "bash", "bash", "bash", "bash"]
        msgs = _build_tool_history(
            names, lambda i, name: f"{'x' * 200} result of {name} call {i}")
        before = _result_contents(msgs)

        s_full.microcompact(msgs)
        after = _result_contents(msgs)

        for tid in ("tu_0", "tu_1"):
            assert after[tid] == before[tid], \
                f"read_file result {tid} was incorrectly cleared"
        # Proof that compaction really ran over the old candidates.
        assert after["tu_2"] == "[cleared]"
        # The three most recent results are never touched.
        for tid in ("tu_3", "tu_4", "tu_5"):
            assert after[tid] == before[tid]

    def test_bash_results_cleared(self, s_full):
        """Non-read_file results older than the three most recent are cleared."""
        msgs = _build_tool_history(
            ["bash"] * 6, lambda i, name: f"{'y' * 200} bash output {i}")
        before = _result_contents(msgs)

        s_full.microcompact(msgs)
        after = _result_contents(msgs)

        cleared = [tid for tid, body in after.items() if body == "[cleared]"]
        assert cleared == ["tu_0", "tu_1", "tu_2"], \
            f"Expected exactly the 3 oldest cleared, got {cleared}"
        for tid in ("tu_3", "tu_4", "tu_5"):
            assert after[tid] == before[tid]

    def test_short_results_not_cleared(self, s_full):
        """Results ≤100 chars should never be cleared regardless of tool type."""
        msgs = _build_tool_history(["bash"] * 6, lambda i, name: "short")

        s_full.microcompact(msgs)

        assert set(_result_contents(msgs).values()) == {"short"}
# ===================================================================
# 2. Message alternation: no consecutive user messages
# ===================================================================

def _validate_alternation(messages: list, label: str = ""):
    """Assert that no two consecutive messages share the same role.

    *label* is prefixed to the failure message so the failing scenario can be
    identified.
    """
    for i, (earlier, later) in enumerate(zip(messages, messages[1:]), start=1):
        assert later["role"] != earlier["role"], (
            f"{label}Consecutive {later['role']} messages at index "
            f"{i - 1}–{i}: {earlier!r:.120} / {later!r:.120}")


class TestAgentLoopAlternation:
    """Verify that bg-notification + inbox injection doesn't break alternation.

    agent_loop itself needs a real LLM, so these tests replicate its
    injection step on a plain message list.
    """

    BG_NOTIFICATION = {"task_id": "abc", "status": "completed", "result": "done"}
    BG_LINE = "[bg:abc] completed: done"

    @staticmethod
    def _collect_injections(mod, read_lead_inbox: bool) -> list:
        """Drain background notifications (and optionally lead's inbox)."""
        injected_parts = []
        notifs = mod.BG.drain()
        if notifs:
            txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs)
            injected_parts.append(f"\n{txt}\n")
        if read_lead_inbox:
            inbox = mod.BUS.read_inbox("lead")
            if inbox:
                injected_parts.append(f"{json.dumps(inbox, indent=2)}")
        return injected_parts

    @staticmethod
    def _inject(msgs: list, injected_parts: list) -> str:
        """Fold *injected_parts* into *msgs* the way agent_loop does.

        Returns the merged text ("" when there was nothing to inject).
        """
        if not injected_parts:
            return ""
        merged_inject = "\n".join(injected_parts)
        if msgs and msgs[-1]["role"] == "user":
            prev = msgs[-1]["content"]
            if isinstance(prev, str):
                msgs[-1]["content"] = prev + "\n" + merged_inject
            elif isinstance(prev, list):
                prev.append({"type": "text", "text": merged_inject})
        else:
            msgs.append({"role": "user", "content": merged_inject})
        return merged_inject

    def test_bg_and_inbox_merged_after_assistant(self, s_full):
        """When last msg is assistant, a single user msg should be appended."""
        msgs = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": "ok"},
        ]
        # Simulate bg notification + inbox being available
        s_full.BG.notifications.put(dict(self.BG_NOTIFICATION))
        # Put a message in lead's inbox
        s_full.BUS.send("worker", "lead", "update", "message")

        parts = self._collect_injections(s_full, read_lead_inbox=True)
        merged = self._inject(msgs, parts)

        _validate_alternation(msgs, "bg+inbox after assistant: ")
        # Exactly one new user turn, carrying everything that was injected.
        assert len(msgs) == 3
        assert msgs[-1] == {"role": "user", "content": merged}
        assert self.BG_LINE in msgs[-1]["content"]

    def test_bg_and_inbox_folded_into_user_turn(self, s_full):
        """When last msg is user (tool_results), injections fold in."""
        msgs = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": "ok"},
            {"role": "user", "content": [
                {"type": "tool_result", "tool_use_id": "x", "content": "done"},
            ]},
        ]
        s_full.BG.notifications.put(dict(self.BG_NOTIFICATION))

        parts = self._collect_injections(s_full, read_lead_inbox=False)
        merged = self._inject(msgs, parts)

        _validate_alternation(msgs, "bg folded into tool_results: ")
        # No new message: the text part was appended to the tool_results turn.
        assert len(msgs) == 3
        last_content = msgs[-1]["content"]
        assert isinstance(last_content, list)
        assert last_content[0]["type"] == "tool_result"
        assert last_content[-1] == {"type": "text", "text": merged}
        assert self.BG_LINE in last_content[-1]["text"]


class TestTeammateAlternation:
    """Verify teammate _loop inbox + auto-claim don't produce consecutive user messages."""

    @staticmethod
    def _history_ending_in_tool_results() -> list:
        """A short history whose last turn is a user turn of tool_results."""
        return [
            {"role": "user", "content": "initial prompt"},
            {"role": "assistant", "content": [_make_text_block("working")]},
            {"role": "user", "content": [
                {"type": "tool_result", "tool_use_id": "t1", "content": "ok"},
            ]},
        ]

    def test_inbox_after_tool_results_inserts_assistant(self):
        """When last msg is user (tool_results) and inbox arrives, an assistant
        turn must be inserted before the new user turn."""
        messages = self._history_ending_in_tool_results()
        # Simulate the fixed idle-phase inbox handling:
        inbox_parts = [json.dumps({"type": "message", "content": "hello"})]
        merged = "\n".join(inbox_parts)
        if messages and messages[-1]["role"] == "user":
            messages.append({"role": "assistant", "content": "Idle. Checking inbox."})
        messages.append({"role": "user", "content": merged})

        _validate_alternation(messages, "teammate inbox after tool_results: ")
        assert [m["role"] for m in messages] == \
            ["user", "assistant", "user", "assistant", "user"]
        assert messages[-1]["content"] == merged

    def test_auto_claim_after_tool_results_inserts_assistant(self):
        """Auto-claim after work phase must not produce consecutive user msgs."""
        messages = self._history_ending_in_tool_results()
        # Simulate auto-claim logic (as fixed):
        if messages and messages[-1]["role"] == "user":
            messages.append({"role": "assistant", "content": "Idle. Looking for tasks."})
        messages.append({"role": "user", "content": "Task #1: test"})
        messages.append({"role": "assistant", "content": "Claimed task #1. Working on it."})

        _validate_alternation(messages, "teammate auto-claim: ")
        assert [m["role"] for m in messages] == \
            ["user", "assistant", "user", "assistant", "user", "assistant"]
# ===================================================================
# 3. Source code audit
# ===================================================================

class TestSourceAudit:
    """Static checks on the source to prevent regressions."""

    @pytest.fixture
    def source(self):
        """Return the text of agents/s_full.py, located relative to this file."""
        src_path = Path(__file__).resolve().parent.parent / "agents" / "s_full.py"
        return src_path.read_text(encoding="utf-8")

    @staticmethod
    def _function_source(source: str, name: str) -> str:
        """Return the source text of every function/method called *name*.

        Fails the calling test when no such definition exists, so an audit
        can never pass merely because its target was renamed or removed.
        """
        segments = [
            ast.get_source_segment(source, node)
            for node in ast.walk(ast.parse(source))
            if isinstance(node, ast.FunctionDef) and node.name == name
        ]
        segments = [seg for seg in segments if seg]
        assert segments, f"function {name!r} not found in s_full.py"
        return "\n".join(segments)

    def test_microcompact_has_preserve_tools(self, source):
        """microcompact must reference _PRESERVE_RESULT_TOOLS."""
        assert "_PRESERVE_RESULT_TOOLS" in self._function_source(source, "microcompact")

    def test_no_consecutive_user_append_in_agent_loop(self, source):
        """The agent_loop section must not have two separate
        messages.append(user) calls in sequence without an intervening
        assistant append."""
        body_src = self._function_source(source, "agent_loop")
        # Count direct "messages.append" with role="user" — there should
        # only be ONE unconditional append per iteration (the tool_results).
        # The bg+inbox injection should use conditional folding.
        user_appends = [
            line for line in body_src.splitlines()
            if 'messages.append' in line and '"user"' in line
            and not line.strip().startswith('#')
        ]
        # Should be at most 2 (one conditional for injection, one for results)
        assert len(user_appends) <= 2, \
            f"Too many user-role appends in agent_loop: {user_appends}"

    def test_microcompact_builds_tool_name_map(self, source):
        """microcompact must build tool_name_map to look up tool names."""
        assert "tool_name_map" in self._function_source(source, "microcompact")

    def test_teammate_loop_has_alternation_guards(self, source):
        """The _loop method must check last message role before appending user."""
        # Inspect _loop itself: the same guard also appears in agent_loop, so
        # a whole-file search would pass even if _loop lost its checks.
        loop_src = self._function_source(source, "_loop")
        assert 'messages[-1]["role"] == "user"' in loop_src or \
            "messages[-1]['role'] == 'user'" in loop_src

    def test_preserve_result_tools_contains_read_file(self, source):
        """_PRESERVE_RESULT_TOOLS must include 'read_file'."""
        assert '"read_file"' in source or "'read_file'" in source
        # More specifically, in the _PRESERVE_RESULT_TOOLS definition
        idx = source.index("_PRESERVE_RESULT_TOOLS")
        snippet = source[idx:idx + 100]
        assert "read_file" in snippet