diff --git a/.trajectories/completed/2026-05/traj_5nzj6v56id4z.json b/.trajectories/completed/2026-05/traj_5nzj6v56id4z.json new file mode 100644 index 000000000..732c8befc --- /dev/null +++ b/.trajectories/completed/2026-05/traj_5nzj6v56id4z.json @@ -0,0 +1,60 @@ +{ + "id": "traj_5nzj6v56id4z", + "version": 1, + "task": { + "title": "Fix PR914 review comments" + }, + "status": "completed", + "startedAt": "2026-05-19T13:20:42.407Z", + "completedAt": "2026-05-19T13:26:28.697Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-19T13:24:02.083Z" + } + ], + "chapters": [ + { + "id": "chap_j8u0dnm0wppl", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-19T13:24:02.083Z", + "endedAt": "2026-05-19T13:26:28.697Z", + "events": [ + { + "ts": 1779197042084, + "type": "decision", + "content": "Use file-descriptor scoped reads for CLI log windows: Use file-descriptor scoped reads for CLI log windows", + "raw": { + "question": "Use file-descriptor scoped reads for CLI log windows", + "chosen": "Use file-descriptor scoped reads for CLI log windows", + "alternatives": [], + "reasoning": "CodeQL flagged stat-then-open TOCTOU; opening once and using fstat/read on that descriptor preserves byte-window semantics while removing path re-resolution between size and read." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Fixed PR914 review comments: descriptor-scoped CLI log tail reads, blocked-on-send last-activity refresh, and regression coverage for default log tail helpers.", + "approach": "Standard approach", + "confidence": 0.93 + }, + "commits": [], + "filesChanged": [ + ".trajectories/index.json", + ".trajectories/completed/2026-05/traj_5nzj6v56id4z.json", + "crates/broker/src/runtime/worker_events.rs", + "src/cli/commands/agent-management.test.ts", + "src/cli/commands/agent-management.ts" + ], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "d186133ca50bdf24cceead6d728472de107da5bd", + "endRef": "d3cca79bde317bcd7631651e86c37f0d3ac8af33", + "traceId": "cfbeca77-3301-4e65-8060-ef5ab966fe9a" + } +} diff --git a/.trajectories/completed/2026-05/traj_81kobstnzzwk.json b/.trajectories/completed/2026-05/traj_81kobstnzzwk.json new file mode 100644 index 000000000..48ab57619 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_81kobstnzzwk.json @@ -0,0 +1,79 @@ +{ + "id": "traj_81kobstnzzwk", + "version": 1, + "task": { + "title": "Orchestrate team review cycle for #892 #893 #894 #895" + }, + "status": "completed", + "startedAt": "2026-05-19T08:16:32.762Z", + "completedAt": "2026-05-19T08:37:32.966Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-19T08:26:35.160Z" + } + ], + "chapters": [ + { + "id": "chap_0wjk79xtmt0m", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-19T08:26:35.160Z", + "endedAt": "2026-05-19T08:37:32.966Z", + "events": [ + { + "ts": 1779179195161, + "type": "decision", + "content": "Bound delivery retries and keep pending until terminal confirmation: Bound delivery retries and keep pending until terminal confirmation", + "raw": { + "question": "Bound delivery retries and keep pending until terminal confirmation", + "chosen": "Bound delivery retries and keep pending until terminal confirmation", + "alternatives": [], + "reasoning": "#892/#894 require delivery attempts to surface as confirmed/failed and watchdog state to distinguish idle from blocked-on-send; keeping pending deliveries until ack/verified/failed gives status, doctor, and lifecycle events one source of truth." + }, + "significance": "high" + }, + { + "ts": 1779179806306, + "type": "reflection", + "content": "Coordinated the worker implementation, review, and signoff loop for the reliability work. This orchestration trajectory records team coordination and skill/lockfile maintenance, not the broker or SDK code changes themselves.", + "raw": { + "confidence": 0.86 + }, + "significance": "high", + "tags": ["confidence:0.86"] + } + ] + } + ], + "retrospective": { + "summary": "Orchestrated worker/reviewer/signoff coordination for reliability issues #892-#895; direct files changed in this trajectory were skills and lockfiles only.", + "approach": "Coordinated implementation and review agents while maintaining Agent Relay skill metadata.", + "confidence": 0.86 + }, + "commits": [], + "filesChanged": [ + ".agents/skills/review-fix-signoff-loop/SKILL.md", + ".agents/skills/running-headless-orchestrator/SKILL.md", + ".agents/skills/setting-up-relayfile/SKILL.md", + ".agents/skills/using-agent-relay/SKILL.md", + ".agents/skills/writing-agent-relay-workflows/SKILL.md", + ".claude/skills/orchestrating-agent-relay/SKILL.md", + ".claude/skills/review-fix-signoff-loop/SKILL.md", + ".claude/skills/running-headless-orchestrator/SKILL.md", + ".claude/skills/setting-up-relayfile/SKILL.md", + ".claude/skills/using-agent-relay/SKILL.md", + ".claude/skills/writing-agent-relay-workflows/SKILL.md", + "package-lock.json", + "prpm.json", + "prpm.lock" + ], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "45c8498bc8b83ea8a5cb465df1798869bbcd2c85", + "endRef": "26ba9fc131badc221ff231a5a2633758f30a0428", + "traceId": "509f2b77-2b1d-44ae-88c7-a0793a4195f5" + } +} diff --git a/.trajectories/completed/2026-05/traj_e1b7ww3un1u3.json b/.trajectories/completed/2026-05/traj_e1b7ww3un1u3.json new file mode 100644 index 000000000..240aa67e1 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_e1b7ww3un1u3.json @@ -0,0 +1,53 @@ +{ + "id": "traj_e1b7ww3un1u3", + "version": 1, + "task": { + "title": "Harden agents logs raw and follow output" + }, + "status": "completed", + "startedAt": "2026-05-19T10:59:00.118Z", + "completedAt": "2026-05-19T11:04:44.466Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-19T11:03:09.871Z" + } + ], + "chapters": [ + { + "id": "chap_rj3r8z5el0ux", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-19T11:03:09.871Z", + "endedAt": "2026-05-19T11:04:44.466Z", + "events": [ + { + "ts": 1779188589872, + "type": "decision", + "content": "Use Buffer reads for raw agents logs and stream byte chunks through cooked follow: Use Buffer reads for raw agents logs and stream byte chunks through cooked follow", + "raw": { + "question": "Use Buffer reads for raw agents logs and stream byte chunks through cooked follow", + "chosen": "Use Buffer reads for raw agents logs and stream byte chunks through cooked follow", + "alternatives": [], + "reasoning": "Raw output must avoid UTF-8 decoding entirely, while cooked follow needs TextDecoder streaming so multibyte codepoints split by polling are reconstructed before ANSI replay" + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Hardened agents:logs raw output to write Buffer bytes unchanged and made cooked follow preserve split ANSI CSI and UTF-8 sequences across poll chunks", + "approach": "Standard approach", + "confidence": 0.92 + }, + "commits": [], + "filesChanged": [], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "9252ae4ea526d039517922fa196e3ce39b18c834", + "endRef": "9252ae4ea526d039517922fa196e3ce39b18c834" + } +} diff --git a/.trajectories/completed/2026-05/traj_e1b7ww3un1u3.md b/.trajectories/completed/2026-05/traj_e1b7ww3un1u3.md new file mode 100644 index 000000000..fff8e7b19 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_e1b7ww3un1u3.md @@ -0,0 +1,33 @@ +# Trajectory: Harden agents logs raw and follow output + +> **Status:** ✅ Completed +> **Confidence:** 92% +> **Started:** May 19, 2026 at 12:59 PM +> **Completed:** May 19, 2026 at 01:04 PM + +--- + +## Summary + +Hardened agents:logs raw output to write Buffer bytes unchanged and made cooked follow preserve split ANSI CSI and UTF-8 sequences across poll chunks + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Use Buffer reads for raw agents logs and stream byte chunks through cooked follow + +- **Chose:** Use Buffer reads for raw agents logs and stream byte chunks through cooked follow +- **Reasoning:** Raw output must avoid UTF-8 decoding entirely, while cooked follow needs TextDecoder streaming so multibyte codepoints split by polling are reconstructed before ANSI replay + +--- + +## Chapters + +### 1. Work + +_Agent: default_ + +- Use Buffer reads for raw agents logs and stream byte chunks through cooked follow: Use Buffer reads for raw agents logs and stream byte chunks through cooked follow diff --git a/.trajectories/completed/2026-05/traj_f1iac9ngymlj.json b/.trajectories/completed/2026-05/traj_f1iac9ngymlj.json new file mode 100644 index 000000000..58a4b5d89 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_f1iac9ngymlj.json @@ -0,0 +1,63 @@ +{ + "id": "traj_f1iac9ngymlj", + "version": 1, + "task": { + "title": "Fix reliability review findings 892-895" + }, + "status": "completed", + "startedAt": "2026-05-19T09:52:54.932Z", + "completedAt": "2026-05-19T10:01:19.068Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-19T09:56:49.151Z" + } + ], + "chapters": [ + { + "id": "chap_xk1tcoquib82", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-19T09:56:49.151Z", + "endedAt": "2026-05-19T10:01:19.068Z", + "events": [ + { + "ts": 1779184609156, + "type": "decision", + "content": "Covered #892 transient blip with present worker: Covered #892 transient blip with present worker", + "raw": { + "question": "Covered #892 transient blip with present worker", + "chosen": "Covered #892 transient blip with present worker", + "alternatives": [], + "reasoning": "The new regression keeps the worker in the registry, kills its process to make every deliver write fail, loops retry_pending_delivery to MAX_DELIVERY_RETRIES, then asserts the typed message_delivery_failed frame arrives on sdk_out_tx." + }, + "significance": "high" + }, + { + "ts": 1779184712770, + "type": "reflection", + "content": "Focused #892 regression is green; production event emission now uses typed BrokerEvent variants and retry deferral is scoped to worker queue/verification windows.", + "raw": { + "confidence": 0.8 + }, + "significance": "high", + "tags": ["confidence:0.8"] + } + ] + } + ], + "retrospective": { + "summary": "Fixed #892 transient delivery blip coverage and tightened delivery event/state handling from the reliability review", + "approach": "Standard approach", + "confidence": 0.9 + }, + "commits": [], + "filesChanged": [], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "422274a461b9d922843cfb489810add669b9d13a", + "endRef": "422274a461b9d922843cfb489810add669b9d13a" + } +} diff --git a/.trajectories/completed/2026-05/traj_f1iac9ngymlj.md b/.trajectories/completed/2026-05/traj_f1iac9ngymlj.md new file mode 100644 index 000000000..d824fe0c8 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_f1iac9ngymlj.md @@ -0,0 +1,34 @@ +# Trajectory: Fix reliability review findings 892-895 + +> **Status:** ✅ Completed +> **Confidence:** 90% +> **Started:** May 19, 2026 at 11:52 AM +> **Completed:** May 19, 2026 at 12:01 PM + +--- + +## Summary + +Fixed #892 transient delivery blip coverage and tightened delivery event/state handling from the reliability review + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Covered #892 transient blip with present worker + +- **Chose:** Covered #892 transient blip with present worker +- **Reasoning:** The new regression keeps the worker in the registry, kills its process to make every deliver write fail, loops retry_pending_delivery to MAX_DELIVERY_RETRIES, then asserts the typed message_delivery_failed frame arrives on sdk_out_tx. + +--- + +## Chapters + +### 1. Work + +_Agent: default_ + +- Covered #892 transient blip with present worker: Covered #892 transient blip with present worker +- Focused #892 regression is green; production event emission now uses typed BrokerEvent variants and retry deferral is scoped to worker queue/verification windows. diff --git a/.trajectories/completed/2026-05/traj_irafiyk6wpw0.json b/.trajectories/completed/2026-05/traj_irafiyk6wpw0.json new file mode 100644 index 000000000..9d5c99adc --- /dev/null +++ b/.trajectories/completed/2026-05/traj_irafiyk6wpw0.json @@ -0,0 +1,63 @@ +{ + "id": "traj_irafiyk6wpw0", + "version": 1, + "task": { + "title": "Fix agents:logs near-unparseable TTY redraw garbage (codex implement, claude review)" + }, + "status": "completed", + "startedAt": "2026-05-19T10:30:38.222Z", + "completedAt": "2026-05-19T10:44:24.757Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-19T10:34:18.265Z" + } + ], + "chapters": [ + { + "id": "chap_celobruq6ana", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-19T10:34:18.265Z", + "endedAt": "2026-05-19T10:44:24.757Z", + "events": [ + { + "ts": 1779186858266, + "type": "decision", + "content": "Cook agents:logs in the CLI with a small ANSI/VT replay helper: Cook agents:logs in the CLI with a small ANSI/VT replay helper", + "raw": { + "question": "Cook agents:logs in the CLI with a small ANSI/VT replay helper", + "chosen": "Cook agents:logs in the CLI with a small ANSI/VT replay helper", + "alternatives": [], + "reasoning": "The broker and workflow runner intentionally persist raw PTY bytes for dashboard/xterm debugging. The CLI can convert that stream to line-oriented output by replaying cursor positioning and clears; this avoids adding a terminal dependency and preserves --raw for exact diagnostics." + }, + "significance": "high" + }, + { + "ts": 1779187442268, + "type": "reflection", + "content": "Implemented CLI-side PTY cooking with raw opt-out; focused tests, typecheck, and build are green after a final rerun", + "raw": { + "confidence": 0.9 + }, + "significance": "high", + "tags": ["confidence:0.9"] + } + ] + } + ], + "retrospective": { + "summary": "Made agents:logs cook raw PTY redraws into line-oriented output by default, added --raw for exact terminal bytes, covered cursor-redraw and raw-mode regressions, and validated with targeted Vitest, typecheck, build, and a compiled before/after sample.", + "approach": "Standard approach", + "confidence": 0.9 + }, + "commits": [], + "filesChanged": [], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "f5dd259e2a7009bcdbc1e9aa30c750ac74e5aeca", + "endRef": "f5dd259e2a7009bcdbc1e9aa30c750ac74e5aeca" + } +} diff --git a/.trajectories/completed/2026-05/traj_lhyrcib40kao.json b/.trajectories/completed/2026-05/traj_lhyrcib40kao.json new file mode 100644 index 000000000..fa251735f --- /dev/null +++ b/.trajectories/completed/2026-05/traj_lhyrcib40kao.json @@ -0,0 +1,65 @@ +{ + "id": "traj_lhyrcib40kao", + "version": 1, + "task": { + "title": "Address PR #914 CodeRabbit reliability review findings" + }, + "status": "completed", + "startedAt": "2026-05-19T11:52:46.110Z", + "completedAt": "2026-05-19T12:07:22.401Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-19T12:07:05.567Z" + } + ], + "chapters": [ + { + "id": "chap_zo07vgxdaby4", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-19T12:07:05.567Z", + "endedAt": "2026-05-19T12:07:22.401Z", + "events": [ + { + "ts": 1779192425569, + "type": "decision", + "content": "Gate terminal delivery events on event_id-aware pending removal: Gate terminal delivery events on event_id-aware pending removal", + "raw": { + "question": "Gate terminal delivery events on event_id-aware pending removal", + "chosen": "Gate terminal delivery events on event_id-aware pending removal", + "alternatives": [], + "reasoning": "Stale worker lifecycle frames can reuse delivery_id after retry/requeue; returning the removed PendingDelivery from clear_pending_delivery_if_event_matches prevents stale frames from emitting typed terminal events or mutating worker state." + }, + "significance": "high" + }, + { + "ts": 1779192425617, + "type": "decision", + "content": "Bump broker/SDK protocol version to 2: Bump broker/SDK protocol version to 2", + "raw": { + "question": "Bump broker/SDK protocol version to 2", + "chosen": "Bump broker/SDK protocol version to 2", + "alternatives": [], + "reasoning": "Agent exit reasons, typed message delivery terminal events, idle since metadata, and blocked-on-send events changed the wire contract relative to v1 clients." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Addressed PR #914 CodeRabbit reliability findings: broker teardown now emits terminal message_delivery_failed for drained deliveries, stale delivery frames are gated by event_id-aware removal, retryable write failures stay queued, SDK delivery waiters use typed terminal events, CLI env/doctor leaks are fixed, protocol is v2, fixtures/tests/changelog updated, and required validation passed.", + "approach": "Standard approach", + "confidence": 0.92 + }, + "commits": [], + "filesChanged": [], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "2fe5c0359dcacf1dc487a4d8041452d5a2a082c8", + "endRef": "2fe5c0359dcacf1dc487a4d8041452d5a2a082c8" + } +} diff --git a/.trajectories/completed/2026-05/traj_lhyrcib40kao.md b/.trajectories/completed/2026-05/traj_lhyrcib40kao.md new file mode 100644 index 000000000..b47a738ee --- /dev/null +++ b/.trajectories/completed/2026-05/traj_lhyrcib40kao.md @@ -0,0 +1,39 @@ +# Trajectory: Address PR #914 CodeRabbit reliability review findings + +> **Status:** ✅ Completed +> **Confidence:** 92% +> **Started:** May 19, 2026 at 01:52 PM +> **Completed:** May 19, 2026 at 02:07 PM + +--- + +## Summary + +Addressed PR #914 CodeRabbit reliability findings: broker teardown now emits terminal message_delivery_failed for drained deliveries, stale delivery frames are gated by event_id-aware removal, retryable write failures stay queued, SDK delivery waiters use typed terminal events, CLI env/doctor leaks are fixed, protocol is v2, fixtures/tests/changelog updated, and required validation passed. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Gate terminal delivery events on event_id-aware pending removal + +- **Chose:** Gate terminal delivery events on event_id-aware pending removal +- **Reasoning:** Stale worker lifecycle frames can reuse delivery_id after retry/requeue; returning the removed PendingDelivery from clear_pending_delivery_if_event_matches prevents stale frames from emitting typed terminal events or mutating worker state. + +### Bump broker/SDK protocol version to 2 + +- **Chose:** Bump broker/SDK protocol version to 2 +- **Reasoning:** Agent exit reasons, typed message delivery terminal events, idle since metadata, and blocked-on-send events changed the wire contract relative to v1 clients. + +--- + +## Chapters + +### 1. Work + +_Agent: default_ + +- Gate terminal delivery events on event_id-aware pending removal: Gate terminal delivery events on event_id-aware pending removal +- Bump broker/SDK protocol version to 2: Bump broker/SDK protocol version to 2 diff --git a/.trajectories/index.json b/.trajectories/index.json index fcf48667c..3eda263c4 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-19T04:27:19.071Z", + "lastUpdated": "2026-05-19T13:26:28.836Z", "trajectories": { "traj_05xg7j388bc4": { "title": "Add browser workflow step integration", @@ -1009,6 +1009,69 @@ "startedAt": "2026-05-19T04:18:25.024Z", "completedAt": "2026-05-19T04:27:18.903Z", "path": "/Users/will/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_zqwco4gl76g3.json" + }, + "traj_81kobstnzzwk": { + "title": "Orchestrate team to fix #892 #893 #894 #895 (codex implement, claude review, claude signoff)", + "status": "completed", + "startedAt": "2026-05-19T08:16:32.762Z", + "completedAt": "2026-05-19T08:37:32.966Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_81kobstnzzwk.json" + }, + "traj_f1iac9ngymlj": { + "title": "Fix reliability review findings 892-895", + "status": "completed", + "startedAt": "2026-05-19T09:52:54.932Z", + "completedAt": "2026-05-19T10:01:19.068Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_f1iac9ngymlj.json" + }, + "traj_irafiyk6wpw0": { + "title": "Fix agents:logs near-unparseable TTY redraw garbage (codex implement, claude review)", + "status": "completed", + "startedAt": "2026-05-19T10:30:38.222Z", + "completedAt": "2026-05-19T10:44:24.757Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_irafiyk6wpw0.json" + }, + "traj_e1b7ww3un1u3": { + "title": "Harden agents logs raw and follow output", + "status": "completed", + "startedAt": "2026-05-19T10:59:00.118Z", + "completedAt": "2026-05-19T11:04:44.466Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_e1b7ww3un1u3.json" + }, + "traj_l97mdks4cooq": { + "title": "Investigate PR 913 clippy failure", + "status": "completed", + "startedAt": "2026-05-19T11:16:32.729Z", + "completedAt": "2026-05-19T11:17:32.013Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_l97mdks4cooq.json" + }, + "traj_rqz29je4z9nf": { + "title": "agents:logs hardening (findings 1&2) — codex fix + claude re-review", + "status": "completed", + "startedAt": "2026-05-19T11:19:40.333Z", + "completedAt": "2026-05-19T11:19:40.433Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_rqz29je4z9nf.json" + }, + "traj_so81rqed6qz9": { + "title": "Remove stale relay snippet installer", + "status": "completed", + "startedAt": "2026-05-19T11:23:42.436Z", + "completedAt": "2026-05-19T11:30:01.924Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_so81rqed6qz9.json" + }, + "traj_lhyrcib40kao": { + "title": "Address PR #914 CodeRabbit reliability review findings", + "status": "completed", + "startedAt": "2026-05-19T11:52:46.110Z", + "completedAt": "2026-05-19T12:07:22.401Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_lhyrcib40kao.json" + }, + "traj_5nzj6v56id4z": { + "title": "Fix PR914 review comments", + "status": "completed", + "startedAt": "2026-05-19T13:20:42.407Z", + "completedAt": "2026-05-19T13:26:28.697Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-05/traj_5nzj6v56id4z.json" } } } diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e04faa34..cc44737a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking Changes +- Broker/SDK wire protocol is now version 2 for delivery terminal events and lifecycle event shape changes. - `relay.spawn({ task })` now returns `success: false` and terminates the agent when task delivery fails after retries. - `agent-relay send` now uses the orchestrator identity by default so `agent-relay replies ` can correlate worker DMs. - The `relay_broker` Rust crate now exposes only `protocol`, `snippets`, and `run_cli`; broker implementation modules are crate-private. @@ -43,6 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Prerelease publishing supports staging releases. - Broker `--api-bind` configures the HTTP/WS bind address. - PTY workers accept `write_pty` messages and report bytes written or worker errors. +- Broker events include delivery confirmation/failure and agent lifecycle health signals for subscribed orchestrators. ### Changed @@ -54,9 +56,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Broker snapshot requests return consistent worker timeout and error-envelope responses. - Rust and TypeScript telemetry disable PostHog reporting when no `AGENT_RELAY_POSTHOG_KEY` is configured. - `agent-relay inbox` shows unread DM content and `direction` metadata, with terminal controls stripped from text summaries. +- `agent-relay who` reports last activity, context budget, and working/idle/blocked-on-send state. +- `agent-relay doctor` reports broker Relaycast auth state and stuck outbound delivery queues. +- Relaycast MCP auto-registers workspace-key sessions as an orchestrator so read tools work without a manual register step. ### Fixed +- Broker worker teardown now emits `message_delivery_failed` for dropped pending deliveries so SDK delivery waiters terminate. +- SDK `sendAndWaitForDelivery` waits for `message_delivery_confirmed` or `message_delivery_failed` instead of treating `delivery_ack` as final. +- Relaycast MCP startup ignores unresolved `${RELAY_*}` environment placeholders before auto-registering. +- PTY context budget detection uses the latest percentage in output and can re-emit after the budget rises. +- `agent-relay agents:logs` now cooks PTY redraws into line-oriented output by default and keeps raw terminal bytes behind `--raw`. +- `agent-relay agents:logs --raw` preserves non-UTF-8 bytes, and follow mode keeps split escape/codepoint sequences intact. - CLI readiness checks use the live VT grid and cursor position to avoid false ready states in alternate screens and menus. - `agent-relay history --from ` returns the newest messages after chronological sorting. - `agent-relay replies --unread` prints nothing when there are no unread messages. @@ -66,6 +77,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Tests treat `better-sqlite3` as optional, improving CI reliability. - `agent-relay doctor` validates partial driver availability correctly. - SDK `sendInput` routes through the PTY worker protocol so input reaches the agent PTY. +- DM delivery retries now end in a surfaced `message_delivery_failed` event instead of silently retrying forever. +- The PTY watchdog marks agents with pending delivery work as blocked-on-send instead of idle. ## [6.2.2] - 2026-05-18 diff --git a/crates/broker/src/listen_api.rs b/crates/broker/src/listen_api.rs index d0e0bc591..6ec9c7cf9 100644 --- a/crates/broker/src/listen_api.rs +++ b/crates/broker/src/listen_api.rs @@ -433,7 +433,7 @@ async fn listen_api_session( ) -> axum::Json { axum::Json(json!({ "broker_version": state.broker_version, - "protocol_version": 1, + "protocol_version": 2, "workspace_key": state.workspace_key, "default_workspace_id": state.default_workspace_id, "mode": if state.persist { "persist" } else { "ephemeral" }, @@ -2347,7 +2347,7 @@ mod auth_tests { assert_eq!(response.status(), StatusCode::OK); let body = response_json(response).await; assert!(body["broker_version"].is_string()); - assert_eq!(body["protocol_version"], 1); + assert_eq!(body["protocol_version"], 2); assert_eq!(body["mode"], "ephemeral"); } diff --git a/crates/broker/src/protocol.rs b/crates/broker/src/protocol.rs index 049941a1c..7ad80fcf8 100644 --- a/crates/broker/src/protocol.rs +++ b/crates/broker/src/protocol.rs @@ -3,7 +3,7 @@ use serde_json::Value; use crate::supervisor::RestartPolicy; -pub const PROTOCOL_VERSION: u32 = 1; +pub const PROTOCOL_VERSION: u32 = 2; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -171,6 +171,12 @@ pub enum BrokerEvent { name: String, code: Option, signal: Option, + #[serde(default)] + reason: Option, + }, + AgentContextLow { + name: String, + pct: u8, }, RelayInbound { event_id: String, @@ -206,6 +212,25 @@ pub enum BrokerEvent { event_id: String, reason: String, }, + MessageDeliveryConfirmed { + name: String, + delivery_id: String, + event_id: String, + from: String, + to: String, + }, + MessageDeliveryFailed { + name: String, + #[serde(default)] + delivery_id: Option, + #[serde(default)] + event_id: Option, + from: String, + to: String, + attempts: u32, + #[serde(rename = "lastError")] + last_error: String, + }, DeliveryQueued { delivery_id: String, agent: String, @@ -240,6 +265,13 @@ pub enum BrokerEvent { AgentIdle { name: String, idle_secs: u64, + #[serde(default)] + since: Option, + }, + AgentBlockedOnSend { + name: String, + blocked_secs: u64, + pending_delivery_count: usize, }, AgentRestarting { name: String, @@ -350,7 +382,7 @@ mod tests { let encoded = serde_json::to_string(&frame).unwrap(); let decoded: ProtocolEnvelope = serde_json::from_str(&encoded).unwrap(); - assert_eq!(decoded.v, 1); + assert_eq!(decoded.v, PROTOCOL_VERSION); assert_eq!(decoded.msg_type, "spawn_agent"); assert_eq!(decoded.request_id.as_deref(), Some("req_1")); } diff --git a/crates/broker/src/pty_worker.rs b/crates/broker/src/pty_worker.rs index 98cac4abf..fb86625b8 100644 --- a/crates/broker/src/pty_worker.rs +++ b/crates/broker/src/pty_worker.rs @@ -1,5 +1,6 @@ use std::{ collections::{HashSet, VecDeque}, + sync::OnceLock, time::{Duration, Instant}, }; @@ -122,6 +123,21 @@ fn output_has_prompt(cli: &str, output: &str) -> bool { }) } +fn detect_context_budget_pct(output: &str) -> Option { + static CONTEXT_RE: OnceLock = OnceLock::new(); + let re = CONTEXT_RE.get_or_init(|| { + regex::Regex::new(r"(?i)\bcontext\s+(\d{1,3})%\s+left\b").expect("context regex compiles") + }); + let mut latest = None; + for captures in re.captures_iter(output) { + latest = captures + .get(1) + .and_then(|m| m.as_str().parse::().ok()) + .map(|pct| pct.min(100) as u8); + } + latest +} + fn evaluate_startup_gate( resolved_cli: &str, startup_output: &str, @@ -355,6 +371,7 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { const NO_OUTPUT_EXIT_TIMEOUT: Duration = Duration::from_secs(120); let mut last_pty_output_time = Instant::now(); let mut reported_idle = false; + let mut last_context_low_pct: Option = None; while running { tokio::select! { @@ -585,6 +602,17 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { ); } } + if let Some(pct) = detect_context_budget_pct(&clean_text) { + if last_context_low_pct.is_some_and(|last| pct > last) { + last_context_low_pct = None; + } + if pct <= 10 && last_context_low_pct != Some(pct) { + let _ = send_frame(&out_tx, "agent_context_low", None, json!({ + "pct": pct, + })).await; + last_context_low_pct = Some(pct); + } + } let startup_ready = startup_gate_ready( &resolved_cli, &startup_output, @@ -1023,12 +1051,17 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // Idle detection: emit agent_idle once when silence exceeds threshold. // Granularity depends on auto_enter_interval tick rate (2s). - if let Some(threshold) = idle_threshold { + if pending_worker_injections.is_empty() + && pending_verifications.is_empty() + && pending_activities.is_empty() + { + if let Some(threshold) = idle_threshold { if let Some(idle_secs) = pty_auto.check_idle_transition(threshold) { let _ = send_frame(&out_tx, "agent_idle", None, json!({ "idle_secs": idle_secs, })).await; } + } } } @@ -1088,16 +1121,33 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // event so the broker or dashboard can decide what to do. let silent_duration = last_pty_output_time.elapsed(); if silent_duration >= NO_OUTPUT_EXIT_TIMEOUT && !reported_idle { - tracing::info!( - target = "agent_relay::worker::pty", - silent_secs = silent_duration.as_secs(), - "watchdog: no PTY output for {}s — marking idle", - silent_duration.as_secs() - ); - let _ = send_frame(&out_tx, "agent_idle", None, json!({ - "reason": "no_output_timeout", - "idle_secs": silent_duration.as_secs(), - })).await; + let pending_count = pending_worker_injections.len() + + pending_verifications.len() + + pending_activities.len(); + if pending_count > 0 { + tracing::warn!( + target = "agent_relay::worker::pty", + silent_secs = silent_duration.as_secs(), + pending_delivery_count = pending_count, + "watchdog: no PTY output while delivery work is pending — marking blocked_on_send" + ); + let _ = send_frame(&out_tx, "agent_blocked_on_send", None, json!({ + "reason": "no_output_timeout", + "blocked_secs": silent_duration.as_secs(), + "pending_delivery_count": pending_count, + })).await; + } else { + tracing::info!( + target = "agent_relay::worker::pty", + silent_secs = silent_duration.as_secs(), + "watchdog: no PTY output for {}s — marking idle", + silent_duration.as_secs() + ); + let _ = send_frame(&out_tx, "agent_idle", None, json!({ + "reason": "no_output_timeout", + "idle_secs": silent_duration.as_secs(), + })).await; + } reported_idle = true; } } @@ -1287,4 +1337,16 @@ mod tests { assert!(!should_block_pending_injection(true, &pending)); } + + #[test] + fn detects_context_budget_percentage() { + assert_eq!(detect_context_budget_pct("Context 6% left"), Some(6)); + assert_eq!(detect_context_budget_pct("context 100% left"), Some(100)); + assert_eq!( + detect_context_budget_pct("Context 7% left\nContext 12% left"), + Some(12) + ); + assert_eq!(detect_context_budget_pct("context 125% left"), Some(100)); + assert_eq!(detect_context_budget_pct("no budget here"), None); + } } diff --git a/crates/broker/src/runtime/api.rs b/crates/broker/src/runtime/api.rs index 84bfb927a..b9b4e6efe 100644 --- a/crates/broker/src/runtime/api.rs +++ b/crates/broker/src/runtime/api.rs @@ -353,12 +353,18 @@ impl BrokerRuntime { "failed to mark released worker offline in relaycast" ); } - let dropped = drop_pending_for_worker(pending_deliveries, &name); - if dropped > 0 { + let dropped = take_pending_for_worker(pending_deliveries, &name); + if !dropped.is_empty() { let _ = send_event( sdk_out_tx, - json!({"kind":"delivery_dropped","name":&name,"count":dropped,"reason":"agent_released"}), + json!({"kind":"delivery_dropped","name":&name,"count":dropped.len(),"reason":"agent_released"}), ).await; + let _ = emit_dropped_delivery_failures( + sdk_out_tx, + &dropped, + "agent_released", + ) + .await; } fail_pending_requests_for_worker(pending_requests, &name, "agent_released"); delivery_states.remove(&name); @@ -1012,7 +1018,27 @@ impl BrokerRuntime { "delivery_id": pd.delivery.delivery_id, "worker_name": pd.worker_name, "event_id": pd.delivery.event_id, + "from": pd.delivery.from, + "to": pd.delivery.target, "attempts": pd.attempts, + "queued_at_ms": pd.queued_at_ms, + "age_ms": unix_timestamp_millis().saturating_sub(pd.queued_at_ms), + "last_error": pd.last_error, + }) + }) + .collect(); + let auth_workspaces: Vec = workspaces + .iter() + .map(|workspace| { + json!({ + "workspace_id": workspace.workspace_id, + "workspace_alias": workspace.workspace_alias, + "self_name": workspace.self_name, + "self_agent_id": workspace.self_agent_id, + "authenticated": true, + "default": default_workspace_id + .as_deref() + .is_some_and(|id| id == workspace.workspace_id), }) }) .collect(); @@ -1021,6 +1047,12 @@ impl BrokerRuntime { "agents": workers.list(), "pending_delivery_count": pending.len(), "pending_deliveries": pending, + "auth": { + "authenticated": !auth_workspaces.is_empty(), + "workspace_count": auth_workspaces.len(), + "default_workspace_id": default_workspace_id, + "workspaces": auth_workspaces, + }, }))); } ListenApiRequest::GetCrashInsights { reply } => { diff --git a/crates/broker/src/runtime/delivery.rs b/crates/broker/src/runtime/delivery.rs index ade5a6f19..061e712d7 100644 --- a/crates/broker/src/runtime/delivery.rs +++ b/crates/broker/src/runtime/delivery.rs @@ -6,6 +6,8 @@ pub(crate) struct PendingDelivery { pub(super) delivery: RelayDelivery, pub(super) attempts: u32, pub(super) next_retry_at: Instant, + pub(super) queued_at_ms: u64, + pub(super) last_error: Option, } /// Serializable snapshot of pending deliveries for crash recovery. @@ -14,6 +16,33 @@ pub(crate) struct PersistedPendingDelivery { pub(super) worker_name: String, pub(super) delivery: RelayDelivery, pub(super) attempts: u32, + #[serde(default)] + pub(super) queued_at_ms: u64, + #[serde(default)] + pub(super) last_error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum DeliveryAttemptOutcome { + Attempted { + worker_name: String, + attempts: u32, + event_id: String, + }, + Failed { + worker_name: String, + delivery_id: String, + event_id: String, + from: String, + to: String, + attempts: u32, + last_error: String, + }, + Noop, +} + +pub(crate) fn unix_timestamp_millis() -> u64 { + chrono::Utc::now().timestamp_millis().max(0) as u64 } pub(crate) fn save_pending_deliveries( @@ -26,6 +55,8 @@ pub(crate) fn save_pending_deliveries( worker_name: pd.worker_name.clone(), delivery: pd.delivery.clone(), attempts: pd.attempts, + queued_at_ms: pd.queued_at_ms, + last_error: pd.last_error.clone(), }) .collect(); let json = serde_json::to_string_pretty(&persisted)?; @@ -58,6 +89,12 @@ pub(crate) fn load_pending_deliveries(path: &Path) -> HashMap, retry_interval: Duration, -) -> Result> { +) -> Result { let pending = match pending_deliveries.get(delivery_id) { Some(pending) => pending.clone(), - None => return Ok(None), + None => return Ok(DeliveryAttemptOutcome::Noop), }; if pending.attempts >= MAX_DELIVERY_RETRIES { - pending_deliveries.remove(delivery_id); - return Ok(None); + let removed = pending_deliveries.remove(delivery_id).unwrap_or(pending); + return Ok(DeliveryAttemptOutcome::Failed { + worker_name: removed.worker_name, + delivery_id: removed.delivery.delivery_id, + event_id: removed.delivery.event_id, + from: removed.delivery.from, + to: removed.delivery.target, + attempts: removed.attempts, + last_error: removed + .last_error + .unwrap_or_else(|| "max delivery retries exceeded".to_string()), + }); } if !workers.has_worker(&pending.worker_name) { - pending_deliveries.remove(delivery_id); - return Ok(None); + let removed = pending_deliveries.remove(delivery_id).unwrap_or(pending); + return Ok(DeliveryAttemptOutcome::Failed { + worker_name: removed.worker_name, + delivery_id: removed.delivery.delivery_id, + event_id: removed.delivery.event_id, + from: removed.delivery.from, + to: removed.delivery.target, + attempts: removed.attempts, + last_error: "recipient gone".to_string(), + }); } match workers @@ -359,30 +419,144 @@ pub(crate) async fn retry_pending_delivery( if let Some(current) = pending_deliveries.get_mut(delivery_id) { current.attempts = current.attempts.saturating_add(1); current.next_retry_at = Instant::now() + retry_interval; - return Ok(Some(( - current.worker_name.clone(), - current.attempts, - current.delivery.event_id.clone(), - ))); + current.last_error = None; + return Ok(DeliveryAttemptOutcome::Attempted { + worker_name: current.worker_name.clone(), + attempts: current.attempts, + event_id: current.delivery.event_id.clone(), + }); } - Ok(None) + Ok(DeliveryAttemptOutcome::Noop) } Err(error) => { - if let Some(current) = pending_deliveries.get_mut(delivery_id) { + let should_fail = if let Some(current) = pending_deliveries.get_mut(delivery_id) { + current.attempts = current.attempts.saturating_add(1); current.next_retry_at = Instant::now() + retry_interval; + current.last_error = Some(error.to_string()); + current.attempts >= MAX_DELIVERY_RETRIES + } else { + false + }; + + if should_fail { + if let Some(removed) = pending_deliveries.remove(delivery_id) { + return Ok(DeliveryAttemptOutcome::Failed { + worker_name: removed.worker_name, + delivery_id: removed.delivery.delivery_id, + event_id: removed.delivery.event_id, + from: removed.delivery.from, + to: removed.delivery.target, + attempts: removed.attempts, + last_error: removed + .last_error + .unwrap_or_else(|| "max delivery retries exceeded".to_string()), + }); + } + return Ok(DeliveryAttemptOutcome::Noop); } - Err(error) + Ok(DeliveryAttemptOutcome::Noop) } } } +pub(crate) async fn emit_delivery_attempt_outcome( + sdk_out_tx: &mpsc::Sender>, + delivery_id: &str, + was_retry: bool, + outcome: DeliveryAttemptOutcome, +) -> Result<()> { + match outcome { + DeliveryAttemptOutcome::Attempted { + worker_name, + attempts, + event_id, + } => { + if was_retry { + send_broker_event( + sdk_out_tx, + BrokerEvent::DeliveryRetry { + name: worker_name, + delivery_id: delivery_id.to_string(), + event_id, + attempts, + }, + ) + .await?; + } + } + DeliveryAttemptOutcome::Failed { + worker_name, + delivery_id, + event_id, + from, + to, + attempts, + last_error, + } => { + send_broker_event( + sdk_out_tx, + BrokerEvent::MessageDeliveryFailed { + name: worker_name, + delivery_id: Some(delivery_id), + event_id: Some(event_id), + from, + to, + attempts, + last_error, + }, + ) + .await?; + } + DeliveryAttemptOutcome::Noop => {} + } + Ok(()) +} + +#[cfg(test)] pub(crate) fn drop_pending_for_worker( pending_deliveries: &mut HashMap, worker_name: &str, ) -> usize { - let before = pending_deliveries.len(); - pending_deliveries.retain(|_, pending| pending.worker_name != worker_name); - before.saturating_sub(pending_deliveries.len()) + take_pending_for_worker(pending_deliveries, worker_name).len() +} + +pub(crate) fn take_pending_for_worker( + pending_deliveries: &mut HashMap, + worker_name: &str, +) -> Vec { + let delivery_ids: Vec = pending_deliveries + .iter() + .filter(|(_, pending)| pending.worker_name == worker_name) + .map(|(delivery_id, _)| delivery_id.clone()) + .collect(); + + delivery_ids + .into_iter() + .filter_map(|delivery_id| pending_deliveries.remove(&delivery_id)) + .collect() +} + +pub(crate) async fn emit_dropped_delivery_failures( + sdk_out_tx: &mpsc::Sender>, + dropped: &[PendingDelivery], + reason: &str, +) -> Result<()> { + for pending in dropped { + send_broker_event( + sdk_out_tx, + BrokerEvent::MessageDeliveryFailed { + name: pending.worker_name.clone(), + delivery_id: Some(pending.delivery.delivery_id.clone()), + event_id: Some(pending.delivery.event_id.clone()), + from: pending.delivery.from.clone(), + to: pending.delivery.target.clone(), + attempts: pending.attempts, + last_error: reason.to_string(), + }, + ) + .await?; + } + Ok(()) } /// Drain every in-flight worker request targeting `worker_name` and @@ -434,11 +608,10 @@ pub(crate) fn clear_pending_delivery_if_event_matches( event_id: Option<&str>, worker_name: &str, worker_signal: &str, -) { +) -> Option { let pending = pending_deliveries.get(delivery_id); if should_clear_pending_delivery_for_event(pending, event_id) { - pending_deliveries.remove(delivery_id); - return; + return pending_deliveries.remove(delivery_id); } if let Some(pending) = pending { @@ -452,4 +625,5 @@ pub(crate) fn clear_pending_delivery_if_event_matches( "ignoring stale delivery lifecycle event due to event_id mismatch" ); } + None } diff --git a/crates/broker/src/runtime/io.rs b/crates/broker/src/runtime/io.rs index 3383add9d..b694cdc7c 100644 --- a/crates/broker/src/runtime/io.rs +++ b/crates/broker/src/runtime/io.rs @@ -29,6 +29,13 @@ pub(crate) async fn send_event( send_frame(tx, "event", None, payload).await } +pub(crate) async fn send_broker_event( + tx: &mpsc::Sender>, + event: BrokerEvent, +) -> Result<()> { + send_event(tx, serde_json::to_value(event)?).await +} + pub(crate) async fn emit_http_api_event_with_timeout( tx: &mpsc::Sender>, payload: Value, diff --git a/crates/broker/src/runtime/maintenance.rs b/crates/broker/src/runtime/maintenance.rs index ba10d9bef..64fef4aa6 100644 --- a/crates/broker/src/runtime/maintenance.rs +++ b/crates/broker/src/runtime/maintenance.rs @@ -57,33 +57,10 @@ impl BrokerRuntime { ) .await { - Ok(Some((worker_name, attempts, event_id))) => { - if was_retry { - let _ = send_event( - sdk_out_tx, - json!({ - "kind":"delivery_retry", - "name": worker_name, - "delivery_id": delivery_id, - "event_id": event_id, - "attempts": attempts, - }), - ) - .await; - } - } - Ok(None) => { - if was_retry { - let _ = send_event( - sdk_out_tx, - json!({ - "kind": "delivery_dropped", - "delivery_id": delivery_id, - "reason": "max_retries_exceeded", - }), - ) - .await; - } + Ok(outcome) => { + let _ = + emit_delivery_attempt_outcome(sdk_out_tx, &delivery_id, was_retry, outcome) + .await; } Err(error) => { let _ = send_error( @@ -106,7 +83,8 @@ impl BrokerRuntime { vec![] } }; - for (name, code, signal) in &exited { + for (name, code, signal, exit_reason) in &exited { + let lifecycle_reason = exit_reason.as_deref().unwrap_or("worker_exited"); // Record crash in insights let (category, description) = crate::crash_insights::CrashInsights::analyze(*code, signal.as_deref()); @@ -166,18 +144,24 @@ impl BrokerRuntime { } Some(RestartDecision::PermanentlyDead { reason }) => { workers.metrics.on_permanent_death(name); - let dropped = drop_pending_for_worker(pending_deliveries, name); - if dropped > 0 { + let dropped = take_pending_for_worker(pending_deliveries, name); + if !dropped.is_empty() { let _ = send_event( sdk_out_tx, json!({ "kind":"delivery_dropped", "name": name, - "count": dropped, + "count": dropped.len(), "reason":"worker_permanently_dead", }), ) .await; + let _ = emit_dropped_delivery_failures( + sdk_out_tx, + &dropped, + "worker_permanently_dead", + ) + .await; } fail_pending_requests_for_worker( pending_requests, @@ -213,24 +197,33 @@ impl BrokerRuntime { } None => { // Not supervised — original behavior - let dropped = drop_pending_for_worker(pending_deliveries, name); - if dropped > 0 { + let dropped = take_pending_for_worker(pending_deliveries, name); + if !dropped.is_empty() { let _ = send_event( sdk_out_tx, json!({ "kind":"delivery_dropped", "name": name, - "count": dropped, + "count": dropped.len(), "reason":"worker_exited", }), ) .await; + let _ = + emit_dropped_delivery_failures(sdk_out_tx, &dropped, "worker_exited") + .await; } fail_pending_requests_for_worker(pending_requests, name, "worker_exited"); delivery_states.remove(name); let _ = send_event( sdk_out_tx, - json!({"kind":"agent_exited","name":name,"code":code,"signal":signal}), + json!({ + "kind":"agent_exited", + "name":name, + "code":code, + "signal":signal, + "reason": lifecycle_reason, + }), ) .await; publish_agent_state_transition( diff --git a/crates/broker/src/runtime/mod.rs b/crates/broker/src/runtime/mod.rs index ddb5af25c..03a8c7f9b 100644 --- a/crates/broker/src/runtime/mod.rs +++ b/crates/broker/src/runtime/mod.rs @@ -28,7 +28,7 @@ use uuid::Uuid; use crate::{ dedup::DedupCache, protocol::{ - AgentRuntime, AgentSpec, HeadlessProvider as ProtocolHeadlessProvider, + AgentRuntime, AgentSpec, BrokerEvent, HeadlessProvider as ProtocolHeadlessProvider, MessageInjectionMode, ProtocolEnvelope, RelayDelivery, PROTOCOL_VERSION, }, relaycast::{ diff --git a/crates/broker/src/runtime/relaycast_events.rs b/crates/broker/src/runtime/relaycast_events.rs index 63f5d5adf..aee75bdfc 100644 --- a/crates/broker/src/runtime/relaycast_events.rs +++ b/crates/broker/src/runtime/relaycast_events.rs @@ -88,12 +88,18 @@ impl BrokerRuntime { match workers.release(&name).await { Ok(()) => { workspace_http.forget_agent_registration(&name); - let dropped = drop_pending_for_worker(pending_deliveries, &name); - if dropped > 0 { + let dropped = take_pending_for_worker(pending_deliveries, &name); + if !dropped.is_empty() { let _ = send_event( sdk_out_tx, - json!({"kind":"delivery_dropped","name":name,"count":dropped,"reason":"agent_released"}), + json!({"kind":"delivery_dropped","name":name,"count":dropped.len(),"reason":"agent_released"}), ).await; + let _ = emit_dropped_delivery_failures( + sdk_out_tx, + &dropped, + "agent_released", + ) + .await; } fail_pending_requests_for_worker( pending_requests, diff --git a/crates/broker/src/runtime/tests.rs b/crates/broker/src/runtime/tests.rs index 60ee0e2eb..e3332cfa2 100644 --- a/crates/broker/src/runtime/tests.rs +++ b/crates/broker/src/runtime/tests.rs @@ -7,7 +7,7 @@ use std::{ }; use crate::protocol::{AgentSpec, MessageInjectionMode, RelayDelivery}; -use crate::worker::{WorkerEvent, WorkerHandle, WorkerRegistry}; +use crate::worker::{AgentWorkState, WorkerEvent, WorkerHandle, WorkerRegistry}; use crate::{ broker::injection_format::format_injection, util::{ @@ -23,16 +23,18 @@ use tokio::sync::mpsc; use super::{ build_agent_state_transition_event, build_http_api_spawn_spec, build_thread_infos, - channels_from_csv, continuity_dir, delivery_retry_interval, derive_ws_base_url_from_http, - display_target_for_dashboard, drop_pending_for_worker, ensure_ephemeral_paths, - extract_mcp_message_ids, http_api_event_emit_timeout, http_api_local_delivery_timeout, - http_api_relaycast_send_timeout, is_relaycast_self_control_target, - is_unknown_worker_error_message, normalize_channel, normalize_initial_task, normalize_sender, - parse_sort_key_from_raw_timestamp, queue_inbound_for_delivery_mode, - relaycast_spawn_control_dedup_key, relaycast_ws_control_dedup_key, - relaycast_ws_should_apply_local_spawn_echo_dedup, relaycast_ws_spawn_token, - sender_is_dashboard_label, should_clear_pending_delivery_for_event, AgentRuntime, - InboundContext, InboundQueueOutcome, PendingDelivery, ProtocolHeadlessProvider, + channels_from_csv, clear_pending_delivery_if_event_matches, continuity_dir, + delivery_retry_interval, derive_ws_base_url_from_http, display_target_for_dashboard, + drop_pending_for_worker, emit_delivery_attempt_outcome, emit_dropped_delivery_failures, + ensure_ephemeral_paths, extract_mcp_message_ids, http_api_event_emit_timeout, + http_api_local_delivery_timeout, http_api_relaycast_send_timeout, + is_relaycast_self_control_target, is_unknown_worker_error_message, normalize_channel, + normalize_initial_task, normalize_sender, parse_sort_key_from_raw_timestamp, + queue_inbound_for_delivery_mode, relaycast_spawn_control_dedup_key, + relaycast_ws_control_dedup_key, relaycast_ws_should_apply_local_spawn_echo_dedup, + relaycast_ws_spawn_token, retry_pending_delivery, sender_is_dashboard_label, + should_clear_pending_delivery_for_event, AgentRuntime, DeliveryAttemptOutcome, InboundContext, + InboundQueueOutcome, PendingDelivery, ProtocolHeadlessProvider, MAX_DELIVERY_RETRIES, }; use crate::dedup::DedupCache; use crate::relaycast::{format_worker_preregistration_error, RelaycastRegistrationError}; @@ -80,6 +82,10 @@ async fn make_worker_registry_with_worker(name: &str) -> WorkerRegistry { child, stdin, spawned_at: Instant::now(), + last_activity_at: Instant::now(), + context_budget_pct: None, + state: AgentWorkState::Working, + exit_reason: None, }, ); registry @@ -197,6 +203,226 @@ async fn inbound_queue_worker_missing_does_not_create_state() { assert!(delivery_states.is_empty()); } +#[tokio::test] +async fn delivery_retry_fails_promptly_when_recipient_is_gone() { + let (tx, _rx) = mpsc::channel::(16); + let mut workers = WorkerRegistry::new( + tx, + Vec::new(), + PathBuf::from("/tmp/agent-relay-broker-tests"), + Instant::now(), + ); + let mut pending_deliveries = HashMap::from([( + "del_gone".to_string(), + PendingDelivery { + worker_name: "ghost".to_string(), + delivery: RelayDelivery { + delivery_id: "del_gone".to_string(), + event_id: "evt_gone".to_string(), + workspace_id: Some("ws_demo".to_string()), + workspace_alias: Some("Demo".to_string()), + from: "Lead".to_string(), + target: "Worker".to_string(), + body: "hello".to_string(), + thread_id: None, + priority: Some(2), + injection_mode: MessageInjectionMode::Wait, + }, + attempts: 3, + next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: Some("failed writing frame".to_string()), + }, + )]); + + let outcome = retry_pending_delivery( + "del_gone", + &mut workers, + &mut pending_deliveries, + Duration::from_millis(1), + ) + .await + .expect("retry should classify missing recipient"); + + assert_eq!( + outcome, + DeliveryAttemptOutcome::Failed { + worker_name: "ghost".to_string(), + delivery_id: "del_gone".to_string(), + event_id: "evt_gone".to_string(), + from: "Lead".to_string(), + to: "Worker".to_string(), + attempts: 3, + last_error: "recipient gone".to_string(), + } + ); + assert!( + pending_deliveries.is_empty(), + "terminal failed deliveries are removed so they cannot retry forever" + ); +} + +#[tokio::test] +async fn delivery_retry_transient_blip_emits_failed_event_for_present_worker() { + let worker_name = "worker-blip"; + let mut workers = make_worker_registry_with_worker(worker_name).await; + { + let handle = workers + .workers + .get_mut(worker_name) + .expect("present worker handle"); + let _ = handle.child.start_kill(); + let _ = handle.child.wait().await; + } + assert!( + workers.has_worker(worker_name), + "transient-blip regression must keep the recipient present" + ); + + let mut pending_deliveries = HashMap::from([( + "del_blip".to_string(), + PendingDelivery { + worker_name: worker_name.to_string(), + delivery: RelayDelivery { + delivery_id: "del_blip".to_string(), + event_id: "evt_blip".to_string(), + workspace_id: Some("ws_demo".to_string()), + workspace_alias: Some("Demo".to_string()), + from: "orchestrator".to_string(), + target: worker_name.to_string(), + body: "transient auth blip".to_string(), + thread_id: None, + priority: Some(2), + injection_mode: MessageInjectionMode::Wait, + }, + attempts: 0, + next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: None, + }, + )]); + + let mut final_outcome = None; + for retry_index in 1..=MAX_DELIVERY_RETRIES { + match retry_pending_delivery( + "del_blip", + &mut workers, + &mut pending_deliveries, + Duration::from_millis(1), + ) + .await + { + Ok(outcome @ DeliveryAttemptOutcome::Failed { attempts, .. }) => { + assert_eq!(attempts, MAX_DELIVERY_RETRIES); + assert!( + retry_index <= MAX_DELIVERY_RETRIES, + "retry loop must terminate within the retry cap" + ); + final_outcome = Some(outcome); + break; + } + Ok(DeliveryAttemptOutcome::Noop) => { + assert!( + retry_index < MAX_DELIVERY_RETRIES, + "the final bounded retry should return a terminal failure" + ); + let pending = pending_deliveries + .get("del_blip") + .expect("delivery remains pending before terminal failure"); + assert_eq!(pending.attempts, retry_index); + assert!(pending + .last_error + .as_deref() + .unwrap_or_default() + .contains("failed writing frame to worker 'worker-blip'")); + } + Ok(other) => panic!("closed worker stdin should not report {other:?}"), + Err(error) => panic!("transient delivery write errors should stay queued: {error}"), + } + } + + let outcome = final_outcome.expect("present worker write blip must terminate as failed"); + assert!( + pending_deliveries.is_empty(), + "terminal failed deliveries are removed so they cannot stall silently" + ); + + let (sdk_out_tx, mut sdk_out_rx) = mpsc::channel(4); + emit_delivery_attempt_outcome(&sdk_out_tx, "del_blip", true, outcome) + .await + .expect("failed outcome should emit to sdk_out_tx"); + + let frame = tokio::time::timeout(Duration::from_secs(1), sdk_out_rx.recv()) + .await + .expect("orchestrator should receive delivery failure event promptly") + .expect("sdk_out_tx should remain open"); + assert_eq!(frame.msg_type, "event"); + assert_eq!(frame.payload["kind"], "message_delivery_failed"); + assert_eq!(frame.payload["name"], worker_name); + assert_eq!(frame.payload["delivery_id"], "del_blip"); + assert_eq!(frame.payload["event_id"], "evt_blip"); + assert_eq!(frame.payload["from"], "orchestrator"); + assert_eq!(frame.payload["to"], worker_name); + assert_eq!( + frame.payload["attempts"].as_u64(), + Some(u64::from(MAX_DELIVERY_RETRIES)) + ); + assert!(frame.payload["lastError"] + .as_str() + .unwrap_or_default() + .contains("failed writing frame to worker 'worker-blip'")); + assert!( + frame.payload.get("last_error").is_none(), + "wire event should use the typed lastError field only" + ); +} + +#[tokio::test] +async fn delivery_retry_success_clears_stale_last_error() { + let worker_name = "worker-clear-error"; + let mut workers = make_worker_registry_with_worker(worker_name).await; + let mut pending_deliveries = HashMap::from([( + "del_clear".to_string(), + PendingDelivery { + worker_name: worker_name.to_string(), + delivery: RelayDelivery { + delivery_id: "del_clear".to_string(), + event_id: "evt_clear".to_string(), + workspace_id: Some("ws_demo".to_string()), + workspace_alias: Some("Demo".to_string()), + from: "orchestrator".to_string(), + target: worker_name.to_string(), + body: "clear stale error".to_string(), + thread_id: None, + priority: Some(2), + injection_mode: MessageInjectionMode::Wait, + }, + attempts: 1, + next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: Some("old transient failure".to_string()), + }, + )]); + + let outcome = retry_pending_delivery( + "del_clear", + &mut workers, + &mut pending_deliveries, + Duration::from_millis(1), + ) + .await + .expect("live worker should accept retry"); + + assert!(matches!(outcome, DeliveryAttemptOutcome::Attempted { .. })); + assert_eq!( + pending_deliveries + .get("del_clear") + .and_then(|pending| pending.last_error.as_ref()), + None + ); + cleanup_worker_registry(workers).await; +} + fn extract_kind_literals(source: &str) -> BTreeSet { let marker = "\"kind\""; let mut kinds = BTreeSet::new(); @@ -957,6 +1183,8 @@ fn drop_pending_for_worker_removes_only_matching_entries() { }, attempts: 1, next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: None, }, ); pending.insert( @@ -977,6 +1205,8 @@ fn drop_pending_for_worker_removes_only_matching_entries() { }, attempts: 1, next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: None, }, ); @@ -986,6 +1216,48 @@ fn drop_pending_for_worker_removes_only_matching_entries() { assert!(!pending.contains_key("del_1")); } +#[tokio::test] +async fn dropped_pending_deliveries_emit_terminal_message_failures() { + let pending = PendingDelivery { + worker_name: "A".to_string(), + delivery: RelayDelivery { + delivery_id: "del_1".to_string(), + event_id: "evt_1".to_string(), + workspace_id: Some("ws_test".to_string()), + workspace_alias: Some("test".to_string()), + from: "Lead".to_string(), + target: "A".to_string(), + body: "hello".to_string(), + thread_id: None, + priority: None, + injection_mode: MessageInjectionMode::Wait, + }, + attempts: 2, + next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: Some("previous blip".to_string()), + }; + let (sdk_out_tx, mut sdk_out_rx) = mpsc::channel(4); + + emit_dropped_delivery_failures(&sdk_out_tx, &[pending], "worker_permanently_dead") + .await + .expect("dropped delivery failure should emit"); + + let frame = tokio::time::timeout(Duration::from_secs(1), sdk_out_rx.recv()) + .await + .expect("terminal failure should be emitted") + .expect("sdk_out_tx should remain open"); + assert_eq!(frame.msg_type, "event"); + assert_eq!(frame.payload["kind"], "message_delivery_failed"); + assert_eq!(frame.payload["name"], "A"); + assert_eq!(frame.payload["delivery_id"], "del_1"); + assert_eq!(frame.payload["event_id"], "evt_1"); + assert_eq!(frame.payload["from"], "Lead"); + assert_eq!(frame.payload["to"], "A"); + assert_eq!(frame.payload["attempts"].as_u64(), Some(2)); + assert_eq!(frame.payload["lastError"], "worker_permanently_dead"); +} + #[test] fn should_clear_pending_delivery_when_event_id_matches() { let pending = PendingDelivery { @@ -1004,6 +1276,8 @@ fn should_clear_pending_delivery_when_event_id_matches() { }, attempts: 1, next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: None, }; assert!(should_clear_pending_delivery_for_event( @@ -1016,6 +1290,43 @@ fn should_clear_pending_delivery_when_event_id_matches() { )); } +#[test] +fn clear_pending_delivery_returns_none_for_stale_event_id() { + let mut pending = HashMap::from([( + "del_1".to_string(), + PendingDelivery { + worker_name: "A".to_string(), + delivery: RelayDelivery { + delivery_id: "del_1".to_string(), + event_id: "evt_current".to_string(), + workspace_id: Some("ws_test".to_string()), + workspace_alias: Some("test".to_string()), + from: "x".to_string(), + target: "#general".to_string(), + body: "hello".to_string(), + thread_id: None, + priority: None, + injection_mode: MessageInjectionMode::Wait, + }, + attempts: 1, + next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: None, + }, + )]); + + let removed = clear_pending_delivery_if_event_matches( + &mut pending, + "del_1", + Some("evt_stale"), + "A", + "delivery_failed", + ); + + assert!(removed.is_none()); + assert!(pending.contains_key("del_1")); +} + #[test] fn should_clear_pending_delivery_without_event_id_for_compatibility() { let pending = PendingDelivery { @@ -1034,6 +1345,8 @@ fn should_clear_pending_delivery_without_event_id_for_compatibility() { }, attempts: 1, next_retry_at: Instant::now(), + queued_at_ms: super::unix_timestamp_millis(), + last_error: None, }; assert!(should_clear_pending_delivery_for_event( diff --git a/crates/broker/src/runtime/worker_events.rs b/crates/broker/src/runtime/worker_events.rs index 1f4ff3fea..abb5a16a4 100644 --- a/crates/broker/src/runtime/worker_events.rs +++ b/crates/broker/src/runtime/worker_events.rs @@ -1,4 +1,5 @@ use super::*; +use crate::worker::AgentWorkState; impl BrokerRuntime { pub(super) async fn handle_worker_event(&mut self, worker_event: WorkerEvent) { @@ -35,18 +36,23 @@ impl BrokerRuntime { return; } - if let Ok(ack) = + let pending_for_confirmation = if let Ok(ack) = serde_json::from_value::(payload.clone()) { - clear_pending_delivery_if_event_matches( + let pending = clear_pending_delivery_if_event_matches( pending_deliveries, &ack.delivery_id, Some(&ack.event_id), &name, "delivery_ack", ); - terminal_failed_deliveries.remove(&ack.delivery_id); - } + if pending.is_some() { + terminal_failed_deliveries.remove(&ack.delivery_id); + } + pending + } else { + None + }; let _ = send_event( sdk_out_tx, json!({ @@ -58,35 +64,41 @@ impl BrokerRuntime { }), ) .await; + if let Some(pending) = pending_for_confirmation { + if let Some(handle) = workers.workers.get_mut(&name) { + handle.last_activity_at = Instant::now(); + handle.state = AgentWorkState::Working; + } + let _ = send_broker_event( + sdk_out_tx, + BrokerEvent::MessageDeliveryConfirmed { + name: name.clone(), + delivery_id: pending.delivery.delivery_id, + event_id: pending.delivery.event_id, + from: pending.delivery.from, + to: pending.delivery.target, + }, + ) + .await; + } } - } else if msg_type == "delivery_queued" { - if let Some(payload) = value.get("payload") { - let _ = send_event( - sdk_out_tx, - json!({ - "kind": msg_type, - "name": name, - "delivery_id": payload.get("delivery_id"), - "event_id": payload.get("event_id"), - "timestamp": payload.get("timestamp"), - }), - ) - .await; - } - } else if msg_type == "delivery_injected" { + } else if msg_type == "delivery_queued" || msg_type == "delivery_injected" { if let Some(payload) = value.get("payload") { let delivery_id = payload .get("delivery_id") .and_then(Value::as_str) .unwrap_or(""); - let event_id = payload.get("event_id").and_then(Value::as_str); - clear_pending_delivery_if_event_matches( - pending_deliveries, - delivery_id, - event_id, - &name, - "delivery_injected", - ); + if let Some(pending) = pending_deliveries.get_mut(delivery_id) { + pending.next_retry_at = Instant::now() + + std::cmp::max( + delivery_retry_interval, + crate::broker::delivery_verification::VERIFICATION_WINDOW, + ); + } + if let Some(handle) = workers.workers.get_mut(&name) { + handle.last_activity_at = Instant::now(); + handle.state = AgentWorkState::Working; + } let _ = send_event( sdk_out_tx, json!({ @@ -116,7 +128,7 @@ impl BrokerRuntime { event_id = %event_id, "delivery verified by echo detection" ); - clear_pending_delivery_if_event_matches( + let pending_for_confirmation = clear_pending_delivery_if_event_matches( pending_deliveries, delivery_id, Some(event_id), @@ -133,9 +145,30 @@ impl BrokerRuntime { }), ) .await; + if let Some(pending) = pending_for_confirmation { + if let Some(handle) = workers.workers.get_mut(&name) { + handle.last_activity_at = Instant::now(); + handle.state = AgentWorkState::Working; + } + let _ = send_broker_event( + sdk_out_tx, + BrokerEvent::MessageDeliveryConfirmed { + name: name.clone(), + delivery_id: pending.delivery.delivery_id, + event_id: pending.delivery.event_id, + from: pending.delivery.from, + to: pending.delivery.target, + }, + ) + .await; + } } } else if msg_type == "delivery_active" { if let Some(payload) = value.get("payload") { + if let Some(handle) = workers.workers.get_mut(&name) { + handle.last_activity_at = Instant::now(); + handle.state = AgentWorkState::Working; + } let _ = send_event( sdk_out_tx, json!({ @@ -170,14 +203,14 @@ impl BrokerRuntime { reason = %reason, "delivery failed — echo not detected" ); - clear_pending_delivery_if_event_matches( + let pending_for_failure = clear_pending_delivery_if_event_matches( pending_deliveries, delivery_id, Some(event_id), &name, "delivery_failed", ); - if !delivery_id.is_empty() { + if pending_for_failure.is_some() && !delivery_id.is_empty() { terminal_failed_deliveries.insert(delivery_id.to_string()); } let _ = send_event( @@ -191,6 +224,25 @@ impl BrokerRuntime { }), ) .await; + if let Some(pending) = pending_for_failure { + if let Some(handle) = workers.workers.get_mut(&name) { + handle.last_activity_at = Instant::now(); + handle.state = AgentWorkState::Working; + } + let _ = send_broker_event( + sdk_out_tx, + BrokerEvent::MessageDeliveryFailed { + name: name.clone(), + delivery_id: Some(pending.delivery.delivery_id), + event_id: Some(pending.delivery.event_id), + from: pending.delivery.from, + to: pending.delivery.target, + attempts: pending.attempts, + last_error: reason.to_string(), + }, + ) + .await; + } } } else if msg_type == "worker_error" { let _ = send_event( @@ -226,6 +278,10 @@ impl BrokerRuntime { ); } } else if msg_type == "worker_stream" { + if let Some(handle) = workers.workers.get_mut(&name) { + handle.last_activity_at = Instant::now(); + handle.state = AgentWorkState::Working; + } let _ = send_event(sdk_out_tx, json!({ "kind": "worker_stream", "name": name, @@ -289,12 +345,18 @@ impl BrokerRuntime { .and_then(|p| p.get("idle_secs")) .and_then(Value::as_u64) .unwrap_or(0); + let since = + chrono::Utc::now() - chrono::Duration::seconds(idle_secs as i64); + if let Some(handle) = workers.workers.get_mut(&name) { + handle.state = AgentWorkState::Idle; + } let _ = send_event( sdk_out_tx, json!({ "kind": "agent_idle", "name": name, "idle_secs": idle_secs, + "since": since, }), ) .await; @@ -305,12 +367,67 @@ impl BrokerRuntime { Some("idle_threshold"), ) .await; + } else if msg_type == "agent_blocked_on_send" { + let blocked_secs = value + .get("payload") + .and_then(|p| p.get("blocked_secs")) + .and_then(Value::as_u64) + .unwrap_or(0); + let pending_delivery_count = value + .get("payload") + .and_then(|p| p.get("pending_delivery_count")) + .and_then(Value::as_u64) + .unwrap_or(0) + as usize; + if let Some(handle) = workers.workers.get_mut(&name) { + handle.last_activity_at = Instant::now(); + handle.state = AgentWorkState::BlockedOnSend; + } + let _ = send_broker_event( + sdk_out_tx, + BrokerEvent::AgentBlockedOnSend { + name: name.clone(), + blocked_secs, + pending_delivery_count, + }, + ) + .await; + publish_agent_state_transition( + ws_control_tx, + &name, + "stuck", + Some("blocked_on_send"), + ) + .await; + } else if msg_type == "agent_context_low" { + let pct = value + .get("payload") + .and_then(|p| p.get("pct")) + .and_then(Value::as_u64) + .unwrap_or(0) + .min(100) as u8; + if let Some(handle) = workers.workers.get_mut(&name) { + handle.context_budget_pct = Some(pct); + handle.last_activity_at = Instant::now(); + } + let _ = send_broker_event( + sdk_out_tx, + BrokerEvent::AgentContextLow { + name: name.clone(), + pct, + }, + ) + .await; } else if msg_type == "agent_exit" { let reason = value .get("payload") .and_then(|p| p.get("reason")) .and_then(Value::as_str) .unwrap_or("unknown"); + if let Some(handle) = workers.workers.get_mut(&name) { + handle.exit_reason = Some(reason.to_string()); + handle.last_activity_at = Instant::now(); + } tracing::info!(agent = %name, reason = %reason, "agent requested exit"); let _ = send_event( sdk_out_tx, diff --git a/crates/broker/src/swarm.rs b/crates/broker/src/swarm.rs index 304bcb51f..f9fba5461 100644 --- a/crates/broker/src/swarm.rs +++ b/crates/broker/src/swarm.rs @@ -16,7 +16,7 @@ use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use tokio::sync::mpsc; use tokio::time::{interval, timeout, Instant}; -const PROTOCOL_VERSION: u32 = 1; +const PROTOCOL_VERSION: u32 = 2; const DEFAULT_PATTERN: &str = "fan-out"; const DEFAULT_TEAMS: usize = 2; const DEFAULT_TIMEOUT: &str = "300s"; diff --git a/crates/broker/src/worker.rs b/crates/broker/src/worker.rs index 0fc9f3e3e..dffe44efa 100644 --- a/crates/broker/src/worker.rs +++ b/crates/broker/src/worker.rs @@ -12,6 +12,7 @@ use crate::{ supervisor::Supervisor, }; use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, @@ -37,6 +38,28 @@ pub(crate) struct WorkerHandle { pub(crate) child: Child, pub(crate) stdin: ChildStdin, pub(crate) spawned_at: Instant, + pub(crate) last_activity_at: Instant, + pub(crate) context_budget_pct: Option, + pub(crate) state: AgentWorkState, + pub(crate) exit_reason: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum AgentWorkState { + Working, + Idle, + BlockedOnSend, +} + +impl AgentWorkState { + pub(crate) fn as_str(self) -> &'static str { + match self { + AgentWorkState::Working => "working", + AgentWorkState::Idle => "idle", + AgentWorkState::BlockedOnSend => "blocked_on_send", + } + } } #[derive(Debug, Clone)] @@ -113,6 +136,11 @@ impl WorkerRegistry { "channels": handle.spec.channels, "parent": handle.parent, "pid": handle.child.id(), + "last_activity_ms": handle.last_activity_at.elapsed().as_millis() as u64, + "last_activity_at": chrono::Utc::now() + - chrono::Duration::from_std(handle.last_activity_at.elapsed()).unwrap_or_default(), + "context_budget_pct": handle.context_budget_pct, + "current_state": handle.state.as_str(), }) }) .collect() @@ -400,6 +428,10 @@ impl WorkerRegistry { child, stdin, spawned_at: Instant::now(), + last_activity_at: Instant::now(), + context_budget_pct: None, + state: AgentWorkState::Working, + exit_reason: None, }; self.workers.insert(spec.name.clone(), handle); @@ -515,7 +547,7 @@ impl WorkerRegistry { pub(crate) async fn reap_exited( &mut self, - ) -> Result, Option)>> { + ) -> Result, Option, Option)>> { let names: Vec = self.workers.keys().cloned().collect(); let mut exited = Vec::new(); for name in names { @@ -580,13 +612,21 @@ impl WorkerRegistry { }; #[cfg(not(unix))] let signal: Option = None; + let reason = self + .workers + .get(&name) + .and_then(|handle| handle.exit_reason.clone()); self.workers.remove(&name); self.initial_tasks.remove(&name); - exited.push((name, code, signal)); + exited.push((name, code, signal, reason)); } else if gone_via_kill0 { + let reason = self + .workers + .get(&name) + .and_then(|handle| handle.exit_reason.clone()); self.workers.remove(&name); self.initial_tasks.remove(&name); - exited.push((name, None, None)); + exited.push((name, None, None, reason)); } } Ok(exited) diff --git a/packages/contracts/fixtures/event-fixtures.json b/packages/contracts/fixtures/event-fixtures.json index 617dc518f..5d5ee0a7e 100644 --- a/packages/contracts/fixtures/event-fixtures.json +++ b/packages/contracts/fixtures/event-fixtures.json @@ -9,17 +9,15 @@ "agent_restarting", "agent_restarted", "agent_permanently_dead", + "agent_context_low", + "agent_blocked_on_send", "delivery_verified", "delivery_failed", + "message_delivery_confirmed", + "message_delivery_failed", "worker_error" ], - "message_lifecycle_states": [ - "queued", - "injected", - "verified", - "failed", - "uncertain" - ], + "message_lifecycle_states": ["queued", "injected", "verified", "failed", "uncertain"], "contract_events": [ { "kind": "relay_inbound", @@ -49,7 +47,8 @@ "timestamp": "2026-02-23T00:00:02.000Z", "payload": { "name": "WorkerA", - "code": 0 + "code": 0, + "reason": "worker_exited" } }, { @@ -142,6 +141,53 @@ "code": "worker_unresponsive", "message": "worker output stream stalled" } + }, + { + "kind": "message_delivery_confirmed", + "eventId": "evt_0013", + "seq": 113, + "timestamp": "2026-02-23T00:00:12.000Z", + "payload": { + "deliveryId": "del_3", + "name": "WorkerA", + "from": "Lead", + "to": "WorkerA" + } + }, + { + "kind": "message_delivery_failed", + "eventId": "evt_0014", + "seq": 114, + "timestamp": "2026-02-23T00:00:13.000Z", + "payload": { + "deliveryId": "del_4", + "name": "WorkerA", + "from": "Lead", + "to": "WorkerA", + "attempts": 10, + "lastError": "recipient gone" + } + }, + { + "kind": "agent_context_low", + "eventId": "evt_0015", + "seq": 115, + "timestamp": "2026-02-23T00:00:14.000Z", + "payload": { + "name": "WorkerA", + "pct": 6 + } + }, + { + "kind": "agent_blocked_on_send", + "eventId": "evt_0016", + "seq": 116, + "timestamp": "2026-02-23T00:00:15.000Z", + "payload": { + "name": "WorkerA", + "blockedSecs": 120, + "pendingDeliveryCount": 1 + } } ], "wave0_timeout_terminal_semantics": { @@ -151,11 +197,6 @@ "late_event_kind": "delivery_ack" }, "wave0_broadcast_whitelist": { - "required_kinds": [ - "relay_inbound", - "agent_spawned", - "agent_exited", - "delivery_ack" - ] + "required_kinds": ["relay_inbound", "agent_spawned", "agent_exited", "delivery_ack"] } } diff --git a/packages/sdk/src/__tests__/contract-fixtures.test.ts b/packages/sdk/src/__tests__/contract-fixtures.test.ts index 9cc395ca4..179619d77 100644 --- a/packages/sdk/src/__tests__/contract-fixtures.test.ts +++ b/packages/sdk/src/__tests__/contract-fixtures.test.ts @@ -49,8 +49,17 @@ function toCurrentSdkBrokerEventShape(event: Record): Record): Record): boolean { case 'agent_spawned': return typeof event.name === 'string' && typeof event.runtime === 'string'; case 'agent_exited': - return typeof event.name === 'string'; + return ( + typeof event.name === 'string' && + (event.code === undefined || typeof event.code === 'number') && + (event.signal === undefined || typeof event.signal === 'string') && + (event.reason === undefined || typeof event.reason === 'string') + ); case 'agent_released': return typeof event.name === 'string'; case 'worker_ready': @@ -148,6 +203,32 @@ function isCurrentSdkBrokerEventShape(event: Record): boolean { return ( typeof event.name === 'string' && typeof event.code === 'string' && typeof event.message === 'string' ); + case 'message_delivery_confirmed': + return ( + typeof event.name === 'string' && + typeof event.delivery_id === 'string' && + typeof event.event_id === 'string' && + typeof event.from === 'string' && + typeof event.to === 'string' + ); + case 'message_delivery_failed': + return ( + typeof event.name === 'string' && + typeof event.delivery_id === 'string' && + typeof event.event_id === 'string' && + typeof event.from === 'string' && + typeof event.to === 'string' && + typeof event.attempts === 'number' && + typeof event.lastError === 'string' + ); + case 'agent_context_low': + return typeof event.name === 'string' && typeof event.pct === 'number'; + case 'agent_blocked_on_send': + return ( + typeof event.name === 'string' && + typeof event.blocked_secs === 'number' && + typeof event.pending_delivery_count === 'number' + ); case 'agent_restarting': case 'agent_restarted': case 'agent_permanently_dead': diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index aa7fe3fe4..84f7fba03 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -1007,7 +1007,7 @@ describe('AgentRelay orchestration handles', () => { } }); - it('sendAndWaitForDelivery waits for delivery ack with typed response', async () => { + it('sendAndWaitForDelivery tracks delivery_ack without resolving before terminal status', async () => { const { client, mock, emit } = createMockFacadeClient(); vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); @@ -1034,6 +1034,49 @@ describe('AgentRelay orchestration handles', () => { delivery_id: 'del_1', event_id: 'evt_1', }); + emit({ + kind: 'message_delivery_failed', + name: 'worker', + delivery_id: 'del_1', + event_id: 'evt_1', + from: 'Lead', + to: 'worker', + attempts: 1, + lastError: 'worker_permanently_dead', + }); + + await expect(wait).resolves.toEqual({ + eventId: 'evt_1', + status: 'failed', + targets: ['worker'], + }); + } finally { + await relay.shutdown(); + } + }); + + it('sendAndWaitForDelivery resolves on message_delivery_confirmed', async () => { + const { client, mock, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const wait = relay.sendAndWaitForDelivery({ + to: 'worker', + text: 'hello', + }); + + await vi.waitFor(() => { + expect(mock.onEvent).toHaveBeenCalledTimes(2); + }); + emit({ + kind: 'message_delivery_confirmed', + name: 'worker', + delivery_id: 'del_1', + event_id: 'evt_1', + from: 'Lead', + to: 'worker', + }); await expect(wait).resolves.toEqual({ eventId: 'evt_1', @@ -1045,6 +1088,81 @@ describe('AgentRelay orchestration handles', () => { } }); + it('sendAndWaitForDelivery ignores legacy delivery_failed until typed terminal event', async () => { + const { client, mock, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const wait = relay.sendAndWaitForDelivery({ + to: 'worker', + text: 'hello', + }); + + await vi.waitFor(() => { + expect(mock.onEvent).toHaveBeenCalledTimes(2); + }); + emit({ + kind: 'delivery_failed', + name: 'worker', + delivery_id: 'del_1', + event_id: 'evt_1', + reason: 'legacy failure', + }); + emit({ + kind: 'message_delivery_confirmed', + name: 'worker', + delivery_id: 'del_1', + event_id: 'evt_1', + from: 'Lead', + to: 'worker', + }); + + await expect(wait).resolves.toEqual({ + eventId: 'evt_1', + status: 'ack', + targets: ['worker'], + }); + } finally { + await relay.shutdown(); + } + }); + + it('sendAndWaitForDelivery fails on message_delivery_failed', async () => { + const { client, mock, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const wait = relay.sendAndWaitForDelivery({ + to: 'worker', + text: 'hello', + }); + + await vi.waitFor(() => { + expect(mock.onEvent).toHaveBeenCalledTimes(2); + }); + emit({ + kind: 'message_delivery_failed', + name: 'worker', + delivery_id: 'del_1', + event_id: 'evt_1', + from: 'Lead', + to: 'worker', + attempts: 10, + lastError: 'recipient gone', + }); + + await expect(wait).resolves.toEqual({ + eventId: 'evt_1', + status: 'failed', + targets: ['worker'], + }); + } finally { + await relay.shutdown(); + } + }); + it('sendAndWaitForDelivery timeout remains terminal in delivery state timeline (Wave 0 contract)', async () => { const { client, mock, emit } = createMockFacadeClient(); vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); @@ -1187,6 +1305,17 @@ describe('AgentRelay orchestration handles', () => { reason: 'broken pipe', }); expect(relay.getDeliveryState('evt-state')?.status).toBe('failed'); + + emit({ + kind: 'message_delivery_failed', + name: 'worker', + event_id: 'evt-terminal', + from: 'Lead', + to: 'worker', + attempts: 1, + lastError: 'worker_permanently_dead', + }); + expect(relay.getDeliveryState('evt-terminal')?.status).toBe('failed'); expect(relay.getDeliveryState('missing-event')).toBeUndefined(); } finally { await relay.shutdown(); @@ -1276,6 +1405,34 @@ describe('Agent.status computed getter', () => { } }); + it('copies agent_exited reason before invoking onAgentExited', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + const exitedReasons: Array = []; + relay.onAgentExited = (agent) => exitedReasons.push(agent.exitReason); + try { + await relay.spawnPty({ + name: 'reason-exited', + cli: 'claude', + channels: ['general'], + }); + + emit({ + kind: 'agent_exited', + name: 'reason-exited', + code: 1, + signal: undefined, + reason: 'worker_exited', + }); + + expect(exitedReasons).toEqual(['worker_exited']); + } finally { + await relay.shutdown(); + } + }); + it('transitions from idle back to ready on worker_stream', async () => { const { client, emit } = createMockFacadeClient(); vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); diff --git a/packages/sdk/src/protocol.ts b/packages/sdk/src/protocol.ts index 4fe28a6fc..edbace5b7 100644 --- a/packages/sdk/src/protocol.ts +++ b/packages/sdk/src/protocol.ts @@ -1,4 +1,4 @@ -export const PROTOCOL_VERSION = 1 as const; +export const PROTOCOL_VERSION = 2 as const; export type AgentRuntime = 'pty' | 'headless'; export type HeadlessProvider = 'claude' | 'opencode'; @@ -151,7 +151,12 @@ export interface PendingDeliveryInfo { delivery_id: string; worker_name: string; event_id: string; + from?: string; + to?: string; attempts: number; + queued_at_ms?: number; + age_ms?: number; + last_error?: string; } export interface BrokerStatus { @@ -166,9 +171,30 @@ export interface BrokerStatus { channels: string[]; parent?: string; pid?: number; + last_activity_at?: string; + last_activity_ms?: number; + context_budget_pct?: number | null; + current_state?: AgentCurrentState; }>; pending_delivery_count: number; pending_deliveries: PendingDeliveryInfo[]; + auth?: BrokerAuthStatus; +} + +export type AgentCurrentState = 'working' | 'idle' | 'blocked_on_send'; + +export interface BrokerAuthStatus { + authenticated: boolean; + workspace_count: number; + default_workspace_id?: string | null; + workspaces: Array<{ + workspace_id: string; + workspace_alias?: string | null; + self_name: string; + self_agent_id: string; + authenticated: boolean; + default: boolean; + }>; } export type BrokerAgentStatus = 'healthy' | 'restarting' | 'dead' | 'released'; @@ -249,6 +275,12 @@ export type BrokerEvent = name: string; code?: number; signal?: string; + reason?: string; + } + | { + kind: 'agent_context_low'; + name: string; + pct: number; } | { kind: 'relay_inbound'; @@ -318,6 +350,24 @@ export type BrokerEvent = event_id: string; reason: string; } + | { + kind: 'message_delivery_confirmed'; + name: string; + delivery_id: string; + event_id: string; + from: string; + to: string; + } + | { + kind: 'message_delivery_failed'; + name: string; + delivery_id?: string; + event_id?: string; + from: string; + to: string; + attempts: number; + lastError: string; + } | { kind: 'delivery_active'; name: string; @@ -376,6 +426,13 @@ export type BrokerEvent = kind: 'agent_idle'; name: string; idle_secs: number; + since?: string; + } + | { + kind: 'agent_blocked_on_send'; + name: string; + blocked_secs: number; + pending_delivery_count: number; } | { kind: 'agent_restarting'; @@ -445,6 +502,14 @@ export type WorkerToBroker = type: 'delivery_ack'; payload: { delivery_id: string; event_id: string }; } + | { + type: 'delivery_verified'; + payload: { delivery_id: string; event_id: string; verification?: string; reason?: string }; + } + | { + type: 'delivery_failed'; + payload: { delivery_id: string; event_id: string; reason: string }; + } | { type: 'worker_stream'; payload: { stream: string; chunk: string }; diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 0faf5f429..93004dfbc 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -182,6 +182,8 @@ export type AgentActivityReason = | 'delivery_active' | 'delivery_ack' | 'delivery_failed' + | 'message_delivery_confirmed' + | 'message_delivery_failed' | 'relay_inbound' | 'agent_idle' | 'agent_exited' @@ -899,6 +901,7 @@ export class AgentRelay { return new Promise((resolve) => { let resolved = false; const ackedTargets = new Set(); + const confirmedTargets = new Set(); // eslint-disable-next-line prefer-const let unsubscribe: (() => void) | undefined; @@ -919,7 +922,15 @@ export class AgentRelay { result.targets.includes(event.name) ) { ackedTargets.add(event.name); - if (ackedTargets.size >= result.targets.length) { + } + + if ( + event.kind === 'message_delivery_confirmed' && + event.event_id === result.event_id && + result.targets.includes(event.name) + ) { + confirmedTargets.add(event.name); + if (confirmedTargets.size >= result.targets.length) { resolved = true; clearTimeout(timer); unsubscribe?.(); @@ -928,7 +939,7 @@ export class AgentRelay { } if ( - event.kind === 'delivery_failed' && + event.kind === 'message_delivery_failed' && event.event_id === result.event_id && result.targets.includes(event.name) ) { @@ -1547,6 +1558,9 @@ export class AgentRelay { // Populate exit info before firing the hook (agent as { exitCode?: number }).exitCode = event.code; (agent as { exitSignal?: string }).exitSignal = event.signal; + if (event.reason !== undefined) { + (agent as { exitReason?: string }).exitReason = event.reason; + } this.onAgentExited?.(agent); this.knownAgents.delete(event.name); this.outputListeners.delete(event.name); @@ -1620,6 +1634,25 @@ export class AgentRelay { this.updateDeliveryState(event.event_id, event.name, 'failed', this.resolveEventTimestamp()); break; } + case 'message_delivery_confirmed': { + this.closeAgentDelivery( + event.name, + 'message_delivery_confirmed', + event.event_id, + event.delivery_id + ); + this.updateDeliveryState(event.event_id, event.name, 'verified', this.resolveEventTimestamp()); + break; + } + case 'message_delivery_failed': { + if (event.event_id) { + this.updateDeliveryState(event.event_id, event.name, 'failed', this.resolveEventTimestamp()); + } + if (event.event_id && event.delivery_id) { + this.closeAgentDelivery(event.name, 'message_delivery_failed', event.event_id, event.delivery_id); + } + break; + } case 'worker_stream': { // Agent producing output is no longer idle this.idleAgents.delete(event.name); @@ -1645,7 +1678,7 @@ export class AgentRelay { break; } } - if (event.kind.startsWith('delivery_')) { + if (event.kind.startsWith('delivery_') || event.kind.startsWith('message_delivery_')) { this.onDeliveryUpdate?.(event); } }); diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index f9062bbeb..32f23bdb3 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -2,7 +2,13 @@ * Shared input/output types for the broker SDK. */ -import type { AgentRuntime, HeadlessProvider, MessageInjectionMode, RestartPolicy } from './protocol.js'; +import type { + AgentCurrentState, + AgentRuntime, + HeadlessProvider, + MessageInjectionMode, + RestartPolicy, +} from './protocol.js'; export interface SpawnPtyInput { name: string; @@ -96,4 +102,8 @@ export interface ListAgent { channels: string[]; parent?: string; pid?: number; + last_activity_at?: string; + last_activity_ms?: number; + context_budget_pct?: number | null; + current_state?: AgentCurrentState; } diff --git a/src/cli/commands/agent-management.test.ts b/src/cli/commands/agent-management.test.ts index c1e99b209..247c7eee0 100644 --- a/src/cli/commands/agent-management.test.ts +++ b/src/cli/commands/agent-management.test.ts @@ -1,3 +1,6 @@ +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; import { Command } from 'commander'; import { describe, expect, it, vi } from 'vitest'; @@ -78,6 +81,7 @@ function createHarness(options?: { nowIso: vi.fn(() => options?.nowIso ?? '2026-02-20T12:00:00.000Z'), killProcess: vi.fn(() => undefined), sleep: vi.fn(async () => undefined), + writeChunk: vi.fn(() => undefined), log: vi.fn(() => undefined), error: vi.fn(() => undefined), exit, @@ -120,6 +124,61 @@ describe('registerAgentManagementCommands', () => { ); }); + it('reads raw log tails through the default filesystem tail helper', async () => { + const projectRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-relay-logs-')); + try { + const logsDir = path.join(projectRoot, '.agent-relay', 'team', 'worker-logs'); + fs.mkdirSync(logsDir, { recursive: true }); + + const expectedTail = Buffer.from('tail bytes\n'); + const logBytes = Buffer.concat([Buffer.alloc(70 * 1024, 'a'), expectedTail]); + fs.writeFileSync(path.join(logsDir, 'WorkerTail.log'), logBytes); + + const chunks: Buffer[] = []; + const program = new Command(); + registerAgentManagementCommands(program, { + getProjectRoot: () => projectRoot, + getDataDir: () => path.join(projectRoot, 'data'), + writeChunk: (chunk) => chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)), + }); + + await runCommand(program, ['agents:logs', 'WorkerTail', '--raw', '--lines', '1']); + + const output = Buffer.concat(chunks); + expect(output.byteLength).toBe(64 * 1024); + expect(output.subarray(-expectedTail.length)).toEqual(expectedTail); + } finally { + fs.rmSync(projectRoot, { recursive: true, force: true }); + } + }); + + it('reads cooked log tails through the default filesystem tail helper', async () => { + const projectRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-relay-logs-')); + try { + const logsDir = path.join(projectRoot, '.agent-relay', 'team', 'worker-logs'); + fs.mkdirSync(logsDir, { recursive: true }); + + fs.writeFileSync( + path.join(logsDir, 'WorkerCookedTail.log'), + `${'x'.repeat(70 * 1024)}\nline one\nline two\n` + ); + + const log = vi.fn(() => undefined); + const program = new Command(); + registerAgentManagementCommands(program, { + getProjectRoot: () => projectRoot, + getDataDir: () => path.join(projectRoot, 'data'), + log, + }); + + await runCommand(program, ['agents:logs', 'WorkerCookedTail', '--lines', '2']); + + expect(log).toHaveBeenCalledWith('line one\nline two'); + } finally { + fs.rmSync(projectRoot, { recursive: true, force: true }); + } + }); + it('spawns an agent using AgentRelayClient and exits 0', async () => { const client = createClientMock({ listAgents: vi.fn(async () => [{ name: 'WorkerA', pid: 4321 }]), @@ -339,8 +398,8 @@ describe('registerAgentManagementCommands', () => { const output = (deps.log as unknown as { mock: { calls: unknown[][] } }).mock.calls .map((call) => call.join(' ')) .join('\n'); - expect(output).toContain('Logs for WorkerLogs'); expect(output).toContain('line-2'); expect(output).toContain('line-3'); + expect(output).not.toContain('Logs for WorkerLogs'); }); }); diff --git a/src/cli/commands/agent-management.ts b/src/cli/commands/agent-management.ts index 199a142fc..802a22817 100644 --- a/src/cli/commands/agent-management.ts +++ b/src/cli/commands/agent-management.ts @@ -59,16 +59,24 @@ export interface AgentManagementDependencies { maxBytes: number, encoding?: BufferEncoding ) => { text: string; size: number }; + readFileBuffer?: (filePath: string) => Buffer; + readFileTailBuffer?: (filePath: string, maxBytes: number) => { buffer: Buffer; size: number }; readFileFrom: ( filePath: string, offset: number, maxBytes: number, encoding?: BufferEncoding ) => { text: string; size: number }; + readFileFromBuffer?: ( + filePath: string, + offset: number, + maxBytes: number + ) => { buffer: Buffer; size: number }; fetch: (url: string, init?: RequestInit) => Promise; nowIso: () => string; killProcess: (pid: number, signal?: NodeJS.Signals | number) => void; sleep: (ms: number) => Promise; + writeChunk: (chunk: string | Uint8Array) => void; log: (...args: unknown[]) => void; error: (...args: unknown[]) => void; exit: ExitFn; @@ -206,11 +214,11 @@ async function waitForBrokerClient( function withDefaults(overrides: Partial = {}): AgentManagementDependencies { const readFileTail = (filePath: string, maxBytes: number, encoding: BufferEncoding = 'utf-8') => { - const stats = fs.statSync(filePath); - const start = Math.max(0, stats.size - maxBytes); - const length = stats.size - start; const fd = fs.openSync(filePath, 'r'); try { + const stats = fs.fstatSync(fd); + const start = Math.max(0, stats.size - maxBytes); + const length = stats.size - start; const buffer = Buffer.alloc(length); fs.readSync(fd, buffer, 0, length, start); return { text: buffer.toString(encoding), size: stats.size }; @@ -218,19 +226,32 @@ function withDefaults(overrides: Partial = {}): Age fs.closeSync(fd); } }; + const readFileTailBuffer = (filePath: string, maxBytes: number) => { + const fd = fs.openSync(filePath, 'r'); + try { + const stats = fs.fstatSync(fd); + const start = Math.max(0, stats.size - maxBytes); + const length = stats.size - start; + const buffer = Buffer.alloc(length); + fs.readSync(fd, buffer, 0, length, start); + return { buffer, size: stats.size }; + } finally { + fs.closeSync(fd); + } + }; const readFileFrom = ( filePath: string, offset: number, maxBytes: number, encoding: BufferEncoding = 'utf-8' ) => { - const stats = fs.statSync(filePath); - if (stats.size <= offset) { - return { text: '', size: stats.size }; - } - const length = Math.min(maxBytes, stats.size - offset); const fd = fs.openSync(filePath, 'r'); try { + const stats = fs.fstatSync(fd); + if (stats.size <= offset) { + return { text: '', size: stats.size }; + } + const length = Math.min(maxBytes, stats.size - offset); const buffer = Buffer.alloc(length); fs.readSync(fd, buffer, 0, length, offset); return { text: buffer.toString(encoding), size: offset + length }; @@ -238,6 +259,21 @@ function withDefaults(overrides: Partial = {}): Age fs.closeSync(fd); } }; + const readFileFromBuffer = (filePath: string, offset: number, maxBytes: number) => { + const fd = fs.openSync(filePath, 'r'); + try { + const stats = fs.fstatSync(fd); + if (stats.size <= offset) { + return { buffer: Buffer.alloc(0), size: stats.size }; + } + const length = Math.min(maxBytes, stats.size - offset); + const buffer = Buffer.alloc(length); + fs.readSync(fd, buffer, 0, length, offset); + return { buffer, size: offset + length }; + } finally { + fs.closeSync(fd); + } + }; return { getProjectRoot: () => getProjectPaths().projectRoot, @@ -248,12 +284,18 @@ function withDefaults(overrides: Partial = {}): Age readTaskFromStdin, fileExists: fs.existsSync, readFile: (filePath, encoding = 'utf-8') => fs.readFileSync(filePath, encoding), + readFileBuffer: (filePath) => fs.readFileSync(filePath), readFileTail, + readFileTailBuffer, readFileFrom, + readFileFromBuffer, fetch: (url, init) => fetch(url, init), nowIso: () => new Date().toISOString(), killProcess: process.kill, sleep: (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)), + writeChunk: (chunk: string | Uint8Array) => { + process.stdout.write(chunk); + }, log: (...args: unknown[]) => console.log(...args), error: (...args: unknown[]) => console.error(...args), exit: defaultExit, @@ -451,12 +493,13 @@ export function registerAgentManagementCommands( .argument('', 'Agent name') .option('-n, --lines ', 'Number of lines to show', '50') .option('-f, --follow', 'Follow log output (like tail -f)') - .option('--plain', 'ANSI-stripped, deduped, line-oriented (greppable)') - .option('--json', 'Structured JSON: { agent, file, lines[] } (sanitized; snapshot only)') + .option('--plain', 'Line-oriented cooked output (default; kept for compatibility)') + .option('--raw', 'Emit original PTY stream with ANSI/control sequences') + .option('--json', 'Structured JSON: { agent, file, lines[] } (cooked; snapshot only)') .action( async ( name: string, - options: { lines?: string; follow?: boolean; plain?: boolean; json?: boolean } + options: { lines?: string; follow?: boolean; plain?: boolean; raw?: boolean; json?: boolean } ) => { await runAgentsLogsCommand(name, options, deps); } diff --git a/src/cli/commands/doctor.test.ts b/src/cli/commands/doctor.test.ts index efe53e19b..97fffb2b7 100644 --- a/src/cli/commands/doctor.test.ts +++ b/src/cli/commands/doctor.test.ts @@ -7,6 +7,12 @@ let tempRoot: string; let dataDir: string; let mockStore: Map; +const sdkMock = vi.hoisted(() => ({ + connect: vi.fn(), + getStatus: vi.fn(), + shutdown: vi.fn(), +})); + // Store availability in an object to ensure closure works correctly across module resets const mockAvailability = { betterAvailable: true, nodeAvailable: true }; @@ -21,6 +27,12 @@ vi.mock('@agent-relay/config', () => ({ }), })); +vi.mock('@agent-relay/sdk', () => ({ + AgentRelayClient: { + connect: sdkMock.connect, + }, +})); + // doctor.ts now reads AGENT_RELAY_STORAGE_TYPE and AGENT_RELAY_STORAGE_PATH // env vars directly instead of importing getStorageConfigFromEnv @@ -44,8 +56,7 @@ vi.mock('better-sqlite3', () => { } if (sql.includes('SELECT value FROM doctor_diagnostics')) { return { - get: (key: string) => - this.store.has(key) ? { value: this.store.get(key) } : undefined, + get: (key: string) => (this.store.has(key) ? { value: this.store.get(key) } : undefined), }; } if (sql.includes('DELETE FROM doctor_diagnostics')) { @@ -84,41 +95,40 @@ vi.mock('node:sqlite', () => { } } - exec(_sql: string) { - // no-op - } - - prepare(sql: string) { - if (sql.includes('INSERT OR REPLACE INTO doctor_diagnostics')) { - return { - run: (key: string, value: string) => { - this.store.set(key, value); - }, - }; - } - if (sql.includes('SELECT value FROM doctor_diagnostics')) { - return { - get: (key: string) => - this.store.has(key) ? { value: this.store.get(key) } : undefined, - }; - } - if (sql.includes('DELETE FROM doctor_diagnostics')) { - return { - run: (key: string) => { - this.store.delete(key); - }, - }; - } - return { - run: () => {}, - get: () => ({ result: 1 }), - }; - } - - close() { - // no-op - } + exec(_sql: string) { + // no-op + } + + prepare(sql: string) { + if (sql.includes('INSERT OR REPLACE INTO doctor_diagnostics')) { + return { + run: (key: string, value: string) => { + this.store.set(key, value); + }, + }; + } + if (sql.includes('SELECT value FROM doctor_diagnostics')) { + return { + get: (key: string) => (this.store.has(key) ? { value: this.store.get(key) } : undefined), + }; + } + if (sql.includes('DELETE FROM doctor_diagnostics')) { + return { + run: (key: string) => { + this.store.delete(key); + }, + }; } + return { + run: () => {}, + get: () => ({ result: 1 }), + }; + } + + close() { + // no-op + } + } return { DatabaseSync: MockNodeSqlite }; }); @@ -151,6 +161,18 @@ beforeEach(() => { process.env.AGENT_RELAY_DOCTOR_NODE_VERSION = '22.1.0'; delete process.env.AGENT_RELAY_DOCTOR_FORCE_NODE_SQLITE; delete process.env.AGENT_RELAY_DOCTOR_NODE_SQLITE_AVAILABLE; + sdkMock.getStatus.mockReset(); + sdkMock.shutdown.mockReset(); + sdkMock.connect.mockReset(); + sdkMock.getStatus.mockResolvedValue({ + auth: { authenticated: false, workspace_count: 0 }, + pending_deliveries: [], + }); + sdkMock.shutdown.mockResolvedValue(undefined); + sdkMock.connect.mockReturnValue({ + getStatus: sdkMock.getStatus, + shutdown: sdkMock.shutdown, + }); vi.resetModules(); }); @@ -284,4 +306,18 @@ describe('doctor diagnostics', () => { expect(output).toContain('unreadable or unwritable'); expect(process.exitCode).toBe(1); }); + + it('shuts down the broker client when status lookup throws', async () => { + process.env.AGENT_RELAY_DOCTOR_FORCE_NODE_SQLITE = '1'; + process.env.AGENT_RELAY_DOCTOR_FORCE_BETTER_SQLITE3 = '1'; + sdkMock.getStatus.mockRejectedValueOnce(new Error('broker unavailable')); + const { logs, restore } = collectLogs(); + const { runDoctor } = await loadDoctor(); + + await runDoctor(); + + restore(); + expect(logs.join('\n')).toContain('Skipped (broker unavailable: broker unavailable)'); + expect(sdkMock.shutdown).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/cli/lib/agent-management-listing.test.ts b/src/cli/lib/agent-management-listing.test.ts index 72eac8b94..e5adef7af 100644 --- a/src/cli/lib/agent-management-listing.test.ts +++ b/src/cli/lib/agent-management-listing.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest'; import { + PtyLogCooker, runAgentsCommand, runAgentsLogsCommand, runWhoCommand, @@ -32,6 +33,7 @@ function createDeps(options?: { : undefined; const shutdown = vi.fn(async () => undefined); const log = vi.fn(() => undefined); + const writeChunk = vi.fn(() => undefined); const error = vi.fn(() => undefined); const exit = vi.fn((code: number) => { throw new Error(`exit:${code}`); @@ -51,6 +53,7 @@ function createDeps(options?: { throw new Error('not implemented'); }), nowIso: vi.fn(() => options?.nowIso ?? '2026-03-04T00:00:00.000Z'), + writeChunk, log, error, exit, @@ -106,6 +109,9 @@ describe('agent-management-listing JSON output', () => { pid: 4321, uptimeSecs: 421, memoryBytes: 1048576, + lastActivity: null, + contextBudgetPct: null, + currentState: 'working', }, ]); }); @@ -125,6 +131,9 @@ describe('agent-management-listing JSON output', () => { pid: 99, uptimeSecs: null, memoryBytes: null, + lastActivity: null, + contextBudgetPct: null, + currentState: 'working', }, ]); }); @@ -145,6 +154,9 @@ describe('agent-management-listing JSON output', () => { pid: 99, uptimeSecs: null, memoryBytes: null, + lastActivity: null, + contextBudgetPct: null, + currentState: 'working', }, ]); }); @@ -165,6 +177,9 @@ describe('agent-management-listing JSON output', () => { pid: 99, uptimeSecs: null, memoryBytes: null, + lastActivity: null, + contextBudgetPct: null, + currentState: 'working', }, ]); }); @@ -265,6 +280,50 @@ describe('toPlainLogLines', () => { it('preserves a single genuine blank line but collapses runs', () => { expect(toPlainLogLines('a\n\n\n b ')).toEqual(['a', '', ' b']); }); + + it('replays cursor-position redraws instead of concatenating repaint fragments', () => { + const ESC = '\u001b'; + const raw = [ + `${ESC}[10;3HSt${ESC}[10;4Hta${ESC}[10;5Har${ESC}[10;6Hrt${ESC}[10;7Hti${ESC}[10;8Hin${ESC}[10;9Hng MCP servers`, + `${ESC}[11;1H•${ESC}[11;3HW${ESC}[11;3HWo${ESC}[11;4Hor${ESC}[11;5Hrk${ESC}[11;6Hki${ESC}[11;7Hin${ESC}[11;8Hng`, + `${ESC}[14;1H•${ESC}[14;3HCalling${ESC}[15;3H└ relaycast.agent.remove(...)`, + ].join(''); + + const cooked = toPlainLogLines(raw).join('\n'); + + expect(cooked).toContain('Starting MCP servers'); + expect(cooked).toContain('• Working'); + expect(cooked).toContain('Calling'); + expect(cooked).toContain('└ relaycast.agent.remove(...)'); + expect(cooked).not.toContain('Sttaarrtti'); + expect(cooked).not.toContain('WWoor'); + }); + + it('drops a leading partial CSI suffix from byte-tailed snapshots', () => { + const ESC = '\u001b'; + + expect(toPlainLogLines(`2H${ESC}[1;1Hready`)).toEqual(['ready']); + }); + + it('carries split CSI and UTF-8 sequences across streamed pushes', () => { + const ESC = '\u001b'; + const euro = Buffer.from('€', 'utf-8'); + const chunks = [ + Buffer.from(`busy\r${ESC}[`, 'utf-8'), + Buffer.concat([Buffer.from('Kdone ', 'utf-8'), euro.subarray(0, 1)]), + Buffer.concat([euro.subarray(1), Buffer.from('\n', 'utf-8')]), + ]; + const full = Buffer.concat(chunks); + + const streamed = new PtyLogCooker(); + const streamedLines = chunks.flatMap((chunk) => streamed.push(chunk)).concat(streamed.finish()); + + const oneShot = new PtyLogCooker(); + const oneShotLines = oneShot.push(full).concat(oneShot.finish()); + + expect(streamedLines).toEqual(['done €']); + expect(streamedLines).toEqual(oneShotLines); + }); }); describe('runAgentsLogsCommand --plain / --json', () => { @@ -280,6 +339,7 @@ describe('runAgentsLogsCommand --plain / --json', () => { function logsDeps() { const log = vi.fn(() => undefined); + const writeChunk = vi.fn(() => undefined); const error = vi.fn(() => undefined); const exit = vi.fn((code: number) => { throw new Error(`exit:${code}`); @@ -297,11 +357,12 @@ describe('runAgentsLogsCommand --plain / --json', () => { throw new Error('not implemented'); }), nowIso: vi.fn(() => '2026-03-04T00:00:00.000Z'), + writeChunk, log, error, exit, }; - return { deps, log, error }; + return { deps, log, writeChunk, error }; } it('--plain emits sanitized, deduped lines with no decorative header', async () => { @@ -328,17 +389,43 @@ describe('runAgentsLogsCommand --plain / --json', () => { expect(log.mock.calls[0][0] as string).not.toContain(ESC); }); - it('default (no flags) sanitizes the header and content', async () => { + it('default (no flags) emits cooked lines without a decorative header', async () => { const { deps, log } = logsDeps(); await runAgentsLogsCommand('WorkerA', {}, deps); const joined = log.mock.calls.map((c) => c[0] as string).join('\n'); - expect(joined).toContain('Logs for WorkerA'); + expect(joined).not.toContain('Logs for WorkerA'); expect(joined).toContain('line one'); expect(joined).not.toContain(ESC); }); + it('--raw emits the unmodified PTY stream through stdout', async () => { + const { deps, log, writeChunk } = logsDeps(); + + await runAgentsLogsCommand('WorkerA', { raw: true }, deps); + + expect(writeChunk).toHaveBeenCalledTimes(1); + expect(writeChunk).toHaveBeenCalledWith(Buffer.from(rawLog, 'utf-8')); + expect(log).not.toHaveBeenCalled(); + }); + + it('--raw emits non-UTF-8 and control bytes byte-identically', async () => { + const { deps, writeChunk } = logsDeps(); + const rawBytes = Buffer.from([0x6f, 0x6b, 0x0a, 0x1b, 0x5b, 0x4b, 0xff, 0x80, 0x00, 0x41]); + deps.readFile = vi.fn(() => { + throw new Error('raw path must not decode the log as UTF-8'); + }); + deps.readFileTailBuffer = vi.fn(() => ({ buffer: rawBytes, size: rawBytes.length })); + + await runAgentsLogsCommand('WorkerA', { raw: true }, deps); + + expect(writeChunk).toHaveBeenCalledTimes(1); + const emitted = writeChunk.mock.calls[0][0]; + expect(Buffer.isBuffer(emitted)).toBe(true); + expect(Buffer.compare(emitted as Buffer, rawBytes)).toBe(0); + }); + it('rejects path traversal agent names before probing or reading files', async () => { const { deps, error } = logsDeps(); @@ -408,6 +495,7 @@ describe('runAgentsLogsCommand --plain / --json', () => { throw new Error('not implemented'); }), nowIso: vi.fn(() => '2026-03-04T00:00:00.000Z'), + writeChunk: vi.fn(() => undefined), log, error, exit, @@ -443,6 +531,7 @@ describe('runAgentsLogsCommand --plain / --json', () => { throw new Error('not implemented'); }), nowIso: vi.fn(() => '2026-03-04T00:00:00.000Z'), + writeChunk: vi.fn(() => undefined), log, error, exit, diff --git a/src/cli/lib/agent-management-listing.ts b/src/cli/lib/agent-management-listing.ts index e5c9d1d5f..eb8444647 100644 --- a/src/cli/lib/agent-management-listing.ts +++ b/src/cli/lib/agent-management-listing.ts @@ -1,4 +1,5 @@ import path from 'node:path'; +import { TextDecoder } from 'node:util'; import { formatTableRow, @@ -17,6 +18,9 @@ export interface ListingWorkerInfo { model?: string; team?: string; pid?: number; + last_activity_at?: string; + context_budget_pct?: number | null; + current_state?: 'working' | 'idle' | 'blocked_on_send'; } interface CloudConfig { @@ -74,14 +78,22 @@ export interface AgentManagementListingDependencies { maxBytes: number, encoding?: BufferEncoding ) => { text: string; size: number }; + readFileBuffer?: (filePath: string) => Buffer; + readFileTailBuffer?: (filePath: string, maxBytes: number) => { buffer: Buffer; size: number }; readFileFrom?: ( filePath: string, offset: number, maxBytes: number, encoding?: BufferEncoding ) => { text: string; size: number }; + readFileFromBuffer?: ( + filePath: string, + offset: number, + maxBytes: number + ) => { buffer: Buffer; size: number }; fetch: (url: string, init?: RequestInit) => Promise; nowIso: () => string; + writeChunk: (chunk: string | Uint8Array) => void; log: (...args: unknown[]) => void; error: (...args: unknown[]) => void; exit: ExitFn; @@ -143,14 +155,6 @@ function getTailByteLimit(lineCount: number): number { return Math.min(MAX_LOG_TAIL_BYTES, Math.max(MIN_LOG_TAIL_BYTES, lineCount * 4096)); } -function tailLinesFromText(text: string, lineCount: number): string[] { - const lines = text.length > 0 ? text.split('\n') : []; - if (lines.length > 0 && lines[lines.length - 1] === '') { - lines.pop(); - } - return lines.slice(-lineCount); -} - function readLogTail( deps: AgentManagementListingDependencies, filePath: string, @@ -164,6 +168,24 @@ function readLogTail( return { text, size: Buffer.byteLength(text, 'utf-8') }; } +function readLogTailBuffer( + deps: AgentManagementListingDependencies, + filePath: string, + lineCount: number +): { buffer: Buffer; size: number } { + const maxBytes = getTailByteLimit(lineCount); + if (deps.readFileTailBuffer) { + return deps.readFileTailBuffer(filePath, maxBytes); + } + if (deps.readFileBuffer) { + const buffer = deps.readFileBuffer(filePath); + return { buffer: buffer.subarray(Math.max(0, buffer.length - maxBytes)), size: buffer.length }; + } + const text = deps.readFile(filePath, 'utf-8'); + const buffer = Buffer.from(text, 'utf-8'); + return { buffer: buffer.subarray(Math.max(0, buffer.length - maxBytes)), size: buffer.length }; +} + function readLogFrom( deps: AgentManagementListingDependencies, filePath: string, @@ -176,6 +198,24 @@ function readLogFrom( return { text: text.slice(offset), size: text.length }; } +function readLogFromBuffer( + deps: AgentManagementListingDependencies, + filePath: string, + offset: number +): { buffer: Buffer; size: number } { + if (deps.readFileFromBuffer) { + return deps.readFileFromBuffer(filePath, offset, MAX_LOG_FOLLOW_BYTES); + } + if (deps.readFileBuffer) { + const buffer = deps.readFileBuffer(filePath); + const start = Math.max(0, Math.min(offset, buffer.length)); + const end = Math.min(buffer.length, start + MAX_LOG_FOLLOW_BYTES); + return { buffer: buffer.subarray(start, end), size: end }; + } + const current = readLogFrom(deps, filePath, offset); + return { buffer: Buffer.from(current.text, 'utf-8'), size: current.size }; +} + function readCloudConfig(deps: AgentManagementListingDependencies): CloudConfig | undefined { const configPath = path.join(deps.getDataDir(), 'cloud-config.json'); if (!deps.fileExists(configPath)) { @@ -390,6 +430,9 @@ export async function runWhoCommand( pid: m?.pid ?? agent.pid ?? null, uptimeSecs: typeof m?.uptime_secs === 'number' ? m.uptime_secs : null, memoryBytes: typeof m?.memory_bytes === 'number' ? m.memory_bytes : null, + lastActivity: agent.last_activity_at ?? null, + contextBudgetPct: typeof agent.context_budget_pct === 'number' ? agent.context_budget_pct : null, + currentState: agent.current_state ?? 'working', }; }) ) @@ -402,6 +445,9 @@ export async function runWhoCommand( pid: number | null; uptimeSecs: number | null; memoryBytes: number | null; + lastActivity: string | null; + contextBudgetPct: number | null; + currentState: 'working' | 'idle' | 'blocked_on_send'; }> ); @@ -418,28 +464,358 @@ export async function runWhoCommand( return; } - deps.log('NAME STATUS CLI PID UPTIME'); - deps.log('-----------------------------------------------------'); + deps.log('NAME STATUS STATE CLI PID UPTIME CONTEXT LAST ACTIVITY'); + deps.log('------------------------------------------------------------------------------------------'); onlineAgents.forEach((agent) => { deps.log( formatTableRow([ { value: tableCell(agent.name, 'unknown'), width: 15 }, { value: tableCell(agent.status), width: 8 }, + { value: tableCell(agent.currentState), width: 16 }, { value: tableCell(agent.cli), width: 8 }, { value: agent.pid != null ? String(agent.pid) : '-', width: 8 }, - { value: agent.uptimeSecs != null ? formatUptimeSecs(agent.uptimeSecs) : '-' }, + { value: agent.uptimeSecs != null ? formatUptimeSecs(agent.uptimeSecs) : '-', width: 11 }, + { value: agent.contextBudgetPct != null ? `${agent.contextBudgetPct}%` : '-', width: 8 }, + { value: tableCell(agent.lastActivity) }, ]) ); }); } +function parseCsiParams(sequence: string): number[] { + const numeric = sequence.replace(/[?<>=]/g, ''); + if (!numeric) return []; + return numeric.split(';').map((part) => (part === '' ? 0 : Number(part) || 0)); +} + +function trimLeadingPartialCsi(raw: string): string { + // eslint-disable-next-line no-control-regex -- scanning for an ESC/C1 CSI opener in raw PTY bytes + const firstEscape = raw.search(/[\x1b\x9b]/); + if (firstEscape <= 0 || firstEscape > 20) { + return raw; + } + const prefix = raw.slice(0, firstEscape); + if (prefix.includes('\n')) { + return raw; + } + return /^[0-?;]*[ -/]*[@-~]$/.test(prefix) ? raw.slice(firstEscape) : raw; +} + +function trailingIncompleteSequenceStart(raw: string): number | undefined { + const csiStart = raw.lastIndexOf('\x9b'); + const escapeStart = raw.lastIndexOf('\x1b'); + const start = Math.max(csiStart, escapeStart); + if (start < 0) return undefined; + if (raw.slice(start).includes('\n')) return undefined; + + if (raw[start] === '\x9b') { + for (let cursor = start + 1; cursor < raw.length; cursor += 1) { + const code = raw.charCodeAt(cursor); + if (code >= 0x40 && code <= 0x7e) return undefined; + } + return start; + } + + const next = raw[start + 1]; + if (next === undefined) return start; + if (next === '[') { + for (let cursor = start + 2; cursor < raw.length; cursor += 1) { + const code = raw.charCodeAt(cursor); + if (code >= 0x40 && code <= 0x7e) return undefined; + } + return start; + } + if (next === ']') { + for (let cursor = start + 2; cursor < raw.length; cursor += 1) { + if (raw[cursor] === '\x07') return undefined; + if (raw[cursor] === '\x1b' && raw[cursor + 1] === '\\') return undefined; + } + return start; + } + return undefined; +} + +export class PtyLogCooker { + private readonly rows = new Map(); + private readonly emitted: string[] = []; + private readonly decoder = new TextDecoder('utf-8'); + private pending = ''; + private row = 0; + private col = 0; + private previousEmitted: string | undefined; + + push(raw: string | Uint8Array): string[] { + const start = this.emitted.length; + this.replayCompleteText(this.decode(raw)); + return this.emitted.slice(start); + } + + finish(): string[] { + const start = this.emitted.length; + this.replay(this.pending + this.decoder.decode()); + this.pending = ''; + this.flushScreenRows(); + return this.emitted.slice(start); + } + + lines(): string[] { + this.finish(); + return [...this.emitted]; + } + + private decode(raw: string | Uint8Array): string { + return typeof raw === 'string' ? raw : this.decoder.decode(raw, { stream: true }); + } + + private replayCompleteText(raw: string): void { + const combined = this.pending + raw; + this.pending = ''; + const pendingStart = trailingIncompleteSequenceStart(combined); + if (pendingStart === undefined) { + this.replay(combined); + return; + } + this.pending = combined.slice(pendingStart); + this.replay(combined.slice(0, pendingStart)); + } + + private replay(raw: string): void { + for (let index = 0; index < raw.length; index += 1) { + const char = raw[index]; + + if (char === '\x1b') { + index = this.skipEscape(raw, index); + continue; + } + + if (char === '\x9b') { + index = this.readCsi(raw, index + 1); + continue; + } + + if (char === '\r') { + this.col = 0; + continue; + } + + if (char === '\n') { + this.emitRow(this.row, true); + this.rows.delete(this.row); + this.row += 1; + this.col = 0; + continue; + } + + if (char === '\b') { + this.col = Math.max(0, this.col - 1); + continue; + } + + if (char === '\t') { + const nextTab = this.col + (8 - (this.col % 8)); + while (this.col < nextTab) { + this.writeChar(' '); + } + continue; + } + + if (char === '\x07' || char < ' ' || char === '\x7f') { + continue; + } + + this.writeChar(char); + } + } + + private skipEscape(raw: string, index: number): number { + const next = raw[index + 1]; + if (next === '[') { + return this.readCsi(raw, index + 2); + } + if (next === ']') { + return this.skipOsc(raw, index + 2); + } + return Math.min(raw.length - 1, index + 1); + } + + private readCsi(raw: string, index: number): number { + let cursor = index; + while (cursor < raw.length) { + const code = raw.charCodeAt(cursor); + if (raw[cursor] === '\n' || raw[cursor] === '\r' || raw[cursor] === '\x1b') { + return cursor - 1; + } + if (code >= 0x40 && code <= 0x7e) { + this.applyCsi(raw.slice(index, cursor), raw[cursor]); + return cursor; + } + cursor += 1; + } + return raw.length - 1; + } + + private skipOsc(raw: string, index: number): number { + let cursor = index; + while (cursor < raw.length) { + if (raw[cursor] === '\n' || raw[cursor] === '\r') { + return cursor - 1; + } + if (raw[cursor] === '\x07') { + return cursor; + } + if (raw[cursor] === '\x1b' && raw[cursor + 1] === '\\') { + return cursor + 1; + } + cursor += 1; + } + return raw.length - 1; + } + + private applyCsi(sequence: string, final: string): void { + const params = parseCsiParams(sequence); + const first = params[0] ?? 0; + + switch (final) { + case 'A': + this.row = Math.max(0, this.row - Math.max(1, first)); + break; + case 'B': + this.row += Math.max(1, first); + break; + case 'C': + this.col += Math.max(1, first); + break; + case 'D': + this.col = Math.max(0, this.col - Math.max(1, first)); + break; + case 'E': + this.row += Math.max(1, first); + this.col = 0; + break; + case 'F': + this.row = Math.max(0, this.row - Math.max(1, first)); + this.col = 0; + break; + case 'G': + this.col = Math.max(0, Math.max(1, first) - 1); + break; + case 'H': + case 'f': + this.row = Math.max(0, Math.max(1, first || 1) - 1); + this.col = Math.max(0, Math.max(1, params[1] ?? 1) - 1); + break; + case 'J': + this.flushScreenRows(); + this.clearScreen(first); + break; + case 'K': + this.clearLine(first); + break; + default: + break; + } + } + + private currentRow(): string[] { + const row = this.rows.get(this.row) ?? []; + this.rows.set(this.row, row); + return row; + } + + private writeChar(char: string): void { + const row = this.currentRow(); + while (row.length < this.col) { + row.push(' '); + } + row[this.col] = char; + this.col += 1; + } + + private clearLine(mode: number): void { + const row = this.currentRow(); + if (mode === 1) { + for (let index = 0; index <= this.col; index += 1) { + row[index] = ' '; + } + return; + } + if (mode === 2) { + this.rows.set(this.row, []); + return; + } + row.length = this.col; + } + + private clearScreen(mode: number): void { + if (mode === 1) { + for (const row of [...this.rows.keys()].filter((key) => key <= this.row)) { + this.rows.delete(row); + } + return; + } + if (mode === 0) { + for (const row of [...this.rows.keys()].filter((key) => key >= this.row)) { + this.rows.delete(row); + } + return; + } + this.rows.clear(); + } + + private flushScreenRows(): void { + for (const row of [...this.rows.keys()].sort((a, b) => a - b)) { + this.emitRow(row); + } + } + + private emitRow(rowNumber: number, preserveBlank = false): void { + const raw = this.rows.get(rowNumber)?.join('') ?? ''; + const clean = sanitizeForTerminal(raw).replace(/\s+$/, ''); + if ( + (clean === '' && (!preserveBlank || this.previousEmitted === undefined)) || + clean === this.previousEmitted + ) { + return; + } + this.emitted.push(clean); + this.previousEmitted = clean; + } +} + /** - * Convert a raw PTY/TTY log capture into greppable, line-oriented plain text: - * strip ANSI/cursor/control escapes, drop lines that were pure escape noise, - * and collapse consecutive identical lines (spinner/redraw frames like - * `⠙ Working(18m 07s)` re-printed every tick). + * Convert a raw PTY/TTY log capture into greppable, line-oriented plain text by + * replaying the small ANSI/VT subset used for redraws, then emitting rendered + * rows with consecutive duplicates collapsed. */ export function toPlainLogLines(raw: string): string[] { + const cooked = new PtyLogCooker(); + cooked.push(trimLeadingPartialCsi(raw)); + return cooked.lines(); +} + +function emitCookedLines(lines: string[], deps: AgentManagementListingDependencies): void { + if (lines.length > 0) { + deps.log(lines.join('\n')); + } +} + +function createCookedLogStreamer(initialText: string): { + push: (raw: string | Uint8Array) => string[]; + reset: () => void; +} { + let cooker = new PtyLogCooker(); + cooker.push(trimLeadingPartialCsi(initialText)); + cooker.finish(); + + return { + push: (raw: string | Uint8Array) => cooker.push(raw), + reset: () => { + cooker = new PtyLogCooker(); + }, + }; +} + +function legacySanitizedLines(raw: string): string[] { const out: string[] = []; let prev: string | undefined; // Drop the single file-terminating newline (its trailing '' element) so a @@ -461,7 +837,7 @@ export function toPlainLogLines(raw: string): string[] { export async function runAgentsLogsCommand( name: string, - options: { lines?: string; follow?: boolean; plain?: boolean; json?: boolean }, + options: { lines?: string; follow?: boolean; plain?: boolean; raw?: boolean; json?: boolean }, deps: AgentManagementListingDependencies ): Promise { if (!isSafeLogAgentName(name)) { @@ -488,36 +864,67 @@ export async function runAgentsLogsCommand( try { const lineCount = parseLogLineCount(options.lines); + + if (options.raw && options.json) { + throw new Error('--raw cannot be combined with --json'); + } + + if (options.raw) { + const snapshot = readLogTailBuffer(deps, logFile, lineCount); + deps.writeChunk(snapshot.buffer); + + if (options.follow) { + let lastSize = snapshot.size; + + await new Promise(() => { + const interval = setInterval(() => { + try { + if (!deps.fileExists(logFile)) { + return; + } + const current = readLogFromBuffer(deps, logFile, lastSize); + if (current.size > lastSize) { + lastSize = current.size; + deps.writeChunk(current.buffer); + } else if (current.size < lastSize) { + lastSize = 0; + } + } catch { + // Ignore read errors during follow, file may be temporarily unavailable + } + }, 500); + + if (typeof process !== 'undefined') { + process.on('SIGINT', () => { + clearInterval(interval); + process.exit(0); + }); + } + }); + } + return; + } + const snapshot = readLogTail(deps, logFile, lineCount); const text = snapshot.text; - // --json: machine-readable snapshot of sanitized, line-oriented output. - // (Snapshot only — combine with the SDK event stream for live tailing.) if (options.json) { + // --json: machine-readable snapshot of cooked, line-oriented output. + // (Snapshot only — combine with the SDK event stream for live tailing.) const plainLines = toPlainLogLines(text).slice(-lineCount); deps.log(JSON.stringify({ agent: name, file: logFile, lines: plainLines }, null, 2)); return; - } - - // --plain: ANSI-stripped, deduped, greppable. No decorative header so the - // output is pure log content for piping into grep/awk. - if (options.plain) { - const plainLines = toPlainLogLines(text).slice(-lineCount); - deps.log(plainLines.join('\n')); } else { - const tail = tailLinesFromText(text, lineCount).map(sanitizeForTerminal).join('\n'); - - deps.log(`Logs for ${sanitizeForTerminalLine(name)} (last ${lineCount} lines):`); - deps.log('─'.repeat(50)); - deps.log(tail || '(empty)'); + // Default and --plain are both cooked and headerless so the command is + // safe to pipe into grep/awk/jq-adjacent tooling. + const plainLines = toPlainLogLines(text); + const cookedLines = plainLines.length > 0 ? plainLines : legacySanitizedLines(text); + emitCookedLines(cookedLines.slice(-lineCount), deps); } if (options.follow) { let lastSize = snapshot.size; - let remainder = ''; - let prevStreamLine: string | undefined = options.plain - ? toPlainLogLines(text).slice(-lineCount).at(-1) - : undefined; + const streamer = createCookedLogStreamer(text); // Poll the log file for new content every 500ms await new Promise(() => { @@ -526,29 +933,14 @@ export async function runAgentsLogsCommand( if (!deps.fileExists(logFile)) { return; } - const current = readLogFrom(deps, logFile, lastSize); + const current = readLogFromBuffer(deps, logFile, lastSize); if (current.size > lastSize) { - const newContent = remainder + current.text; lastSize = current.size; - const newLines = newContent.split('\n'); - // Keep the last element as remainder (may be incomplete line) - remainder = newLines.pop() ?? ''; - for (const line of newLines) { - if (options.plain) { - const clean = sanitizeForTerminal(line).replace(/\s+$/, ''); - if (clean === '' && line.trim() !== '') continue; - if (clean === prevStreamLine) continue; - prevStreamLine = clean; - deps.log(clean); - } else { - deps.log(sanitizeForTerminal(line)); - } - } + emitCookedLines(streamer.push(current.buffer), deps); } else if (current.size < lastSize) { // File was truncated/rotated, reset lastSize = 0; - remainder = ''; - prevStreamLine = undefined; + streamer.reset(); } } catch { // Ignore read errors during follow, file may be temporarily unavailable diff --git a/src/cli/lib/doctor.ts b/src/cli/lib/doctor.ts index ad5b1eab9..27946677e 100644 --- a/src/cli/lib/doctor.ts +++ b/src/cli/lib/doctor.ts @@ -2,6 +2,7 @@ import fs from 'node:fs'; import path from 'node:path'; import { createRequire } from 'node:module'; import { getProjectPaths } from '@agent-relay/config'; +import { AgentRelayClient, type BrokerStatus } from '@agent-relay/sdk'; type SqliteDriver = 'better-sqlite3' | 'node'; @@ -36,6 +37,56 @@ interface DiagnosticDb { close?: () => void; } +async function checkBrokerReliability(): Promise { + let client: AgentRelayClient | undefined; + try { + client = AgentRelayClient.connect({ cwd: process.cwd() }); + const status = await client.getStatus(); + const typedStatus = status as BrokerStatus; + const auth = typedStatus.auth; + const authMessage = auth?.authenticated + ? `Authenticated (${auth.workspace_count} workspace${auth.workspace_count === 1 ? '' : 's'})` + : 'No authenticated Relaycast workspace reported by broker'; + const pending = typedStatus.pending_deliveries ?? []; + const stuck = pending.filter((delivery) => (delivery.age_ms ?? 0) >= 10_000 || delivery.last_error); + return [ + { + name: 'Broker auth', + ok: true, + message: authMessage, + }, + { + name: 'Outbound queues', + ok: stuck.length === 0, + message: + pending.length === 0 + ? 'No pending deliveries' + : `${pending.length} pending deliver${pending.length === 1 ? 'y' : 'ies'}, ${stuck.length} stuck`, + remediation: + stuck.length > 0 + ? 'Check agent-relay who --json for blocked_on_send agents and inspect pending_deliveries in /api/status.' + : undefined, + }, + ]; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + return [ + { + name: 'Broker auth', + ok: true, + message: `Skipped (broker unavailable: ${message})`, + }, + { + name: 'Outbound queues', + ok: true, + message: 'Skipped (broker unavailable)', + }, + ]; + } finally { + await client?.shutdown().catch(() => undefined); + } +} + const require = createRequire(import.meta.url); function formatBytes(bytes: number): string { @@ -97,7 +148,9 @@ async function checkBetterSqlite3(): Promise { const require = createRequire(import.meta.url); const pkg = require('better-sqlite3/package.json'); version = pkg.version ?? 'unknown'; - } catch { /* ignore */ } + } catch { + /* ignore */ + } return { name: 'better-sqlite3', ok: true, @@ -357,7 +410,9 @@ async function checkWriteTest( created_at INTEGER ); `); - const insert = db.prepare('INSERT OR REPLACE INTO doctor_diagnostics (key, value, created_at) VALUES (?, ?, ?)'); + const insert = db.prepare( + 'INSERT OR REPLACE INTO doctor_diagnostics (key, value, created_at) VALUES (?, ?, ?)' + ); insert.run(key, value, Date.now()); db.close?.(); return { @@ -463,7 +518,11 @@ function readInstallationStatus(dataDir: string): InstallationStatus { } try { - const lines = fs.readFileSync(statusPath, 'utf-8').split('\n').map((l) => l.trim()).filter(Boolean); + const lines = fs + .readFileSync(statusPath, 'utf-8') + .split('\n') + .map((l) => l.trim()) + .filter(Boolean); const map: Record = {}; for (const line of lines) { const idx = line.indexOf(':'); @@ -547,6 +606,7 @@ export async function runDoctor(): Promise { results.push(writeResult); const readResult = await checkReadTest(storageType, dbPath, availability, testKey, testValue); results.push(readResult); + results.push(...(await checkBrokerReliability())); printHeader(); printInstallationStatus(installationStatus); diff --git a/src/cli/relaycast-mcp.startup.test.ts b/src/cli/relaycast-mcp.startup.test.ts index ac61c9b18..573bc3c99 100644 --- a/src/cli/relaycast-mcp.startup.test.ts +++ b/src/cli/relaycast-mcp.startup.test.ts @@ -240,7 +240,7 @@ describe('relaycast-mcp startup helpers', () => { apiKey: 'rk_live_env', baseUrl: 'https://api.relaycast.dev///', agentToken: 'at_live_env', - agentName: '', + agentName: 'FallbackClaw', agentType: 'human', strictAgentName: true, }); diff --git a/src/cli/relaycast-mcp.test.ts b/src/cli/relaycast-mcp.test.ts index be796e9b5..8df80e87b 100644 --- a/src/cli/relaycast-mcp.test.ts +++ b/src/cli/relaycast-mcp.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it, vi } from 'vitest'; -import { registerAgentWithRebind } from './relaycast-mcp.js'; +import { optionsFromEnv, registerAgentWithRebind } from './relaycast-mcp.js'; describe('registerAgentWithRebind', () => { it('reuses the pre-registered strict token without re-registering', async () => { @@ -86,3 +86,60 @@ describe('registerAgentWithRebind', () => { }); }); }); + +describe('optionsFromEnv', () => { + it('auto-selects an orchestrator identity when a workspace key is configured', () => { + const previous = { + apiKey: process.env.RELAY_API_KEY, + agentName: process.env.RELAY_AGENT_NAME, + clawName: process.env.RELAY_CLAW_NAME, + }; + process.env.RELAY_API_KEY = 'rk_live_test'; + delete process.env.RELAY_AGENT_NAME; + delete process.env.RELAY_CLAW_NAME; + + try { + expect(optionsFromEnv()).toMatchObject({ + apiKey: 'rk_live_test', + agentName: 'orchestrator', + }); + } finally { + if (previous.apiKey === undefined) delete process.env.RELAY_API_KEY; + else process.env.RELAY_API_KEY = previous.apiKey; + if (previous.agentName === undefined) delete process.env.RELAY_AGENT_NAME; + else process.env.RELAY_AGENT_NAME = previous.agentName; + if (previous.clawName === undefined) delete process.env.RELAY_CLAW_NAME; + else process.env.RELAY_CLAW_NAME = previous.clawName; + } + }); + + it('ignores unresolved template environment placeholders', () => { + const previous = { + apiKey: process.env.RELAY_API_KEY, + agentName: process.env.RELAY_AGENT_NAME, + clawName: process.env.RELAY_CLAW_NAME, + agentToken: process.env.RELAY_AGENT_TOKEN, + }; + process.env.RELAY_API_KEY = '${RELAY_API_KEY}'; + process.env.RELAY_AGENT_NAME = '${RELAY_AGENT_NAME}'; + process.env.RELAY_CLAW_NAME = 'ClawFallback'; + process.env.RELAY_AGENT_TOKEN = '${RELAY_AGENT_TOKEN}'; + + try { + expect(optionsFromEnv()).toMatchObject({ + apiKey: undefined, + agentName: 'ClawFallback', + agentToken: undefined, + }); + } finally { + if (previous.apiKey === undefined) delete process.env.RELAY_API_KEY; + else process.env.RELAY_API_KEY = previous.apiKey; + if (previous.agentName === undefined) delete process.env.RELAY_AGENT_NAME; + else process.env.RELAY_AGENT_NAME = previous.agentName; + if (previous.clawName === undefined) delete process.env.RELAY_CLAW_NAME; + else process.env.RELAY_CLAW_NAME = previous.clawName; + if (previous.agentToken === undefined) delete process.env.RELAY_AGENT_TOKEN; + else process.env.RELAY_AGENT_TOKEN = previous.agentToken; + } + }); +}); diff --git a/src/cli/relaycast-mcp.ts b/src/cli/relaycast-mcp.ts index 1520ab6f4..bee1d40a1 100644 --- a/src/cli/relaycast-mcp.ts +++ b/src/cli/relaycast-mcp.ts @@ -26,7 +26,7 @@ const DEFAULT_SYSTEM_PROMPT = `You are an AI agent in a collaborative workspace ## Getting Started 1. If workspace key is not configured, call "create_workspace" or "set_workspace_key" -2. Call "register" with your agent name to join the workspace +2. When RELAY_API_KEY is provided at startup, this MCP server auto-registers the session as RELAY_AGENT_NAME (or "orchestrator" by default). Otherwise call "register" with your agent name to join the workspace 3. Use "list_channels" to see available channels 4. Use "join_channel" to join channels of interest 5. Use "check_inbox" to see unread messages and mentions @@ -589,13 +589,16 @@ export async function startPatchedStdio(options: PatchedMcpServerOptions): Promi } export function optionsFromEnv(): PatchedMcpServerOptions { + const apiKey = resolveEnv('RELAY_API_KEY'); + const agentName = + resolveEnv('RELAY_AGENT_NAME') ?? resolveEnv('RELAY_CLAW_NAME') ?? (apiKey ? 'orchestrator' : undefined); return { - apiKey: process.env.RELAY_API_KEY, + apiKey, baseUrl: resolveEnv('RELAY_BASE_URL'), - agentToken: process.env.RELAY_AGENT_TOKEN, - agentName: process.env.RELAY_AGENT_NAME ?? process.env.RELAY_CLAW_NAME, - agentType: normalizeAgentType(process.env.RELAY_AGENT_TYPE), - strictAgentName: envFlagEnabled(process.env.RELAY_STRICT_AGENT_NAME), + agentToken: resolveEnv('RELAY_AGENT_TOKEN'), + agentName, + agentType: normalizeAgentType(resolveEnv('RELAY_AGENT_TYPE')), + strictAgentName: envFlagEnabled(resolveEnv('RELAY_STRICT_AGENT_NAME')), }; }