diff --git a/.trajectories/completed/2026-05/traj_ybcrij9wg8m1.json b/.trajectories/completed/2026-05/traj_ybcrij9wg8m1.json new file mode 100644 index 000000000..521a71e82 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_ybcrij9wg8m1.json @@ -0,0 +1,93 @@ +{ + "id": "traj_ybcrij9wg8m1", + "version": 1, + "task": { + "title": "Implement agent-relay view read-only stream client (#864 sub-1)", + "source": { + "system": "plain", + "id": "ENG-864" + } + }, + "status": "completed", + "startedAt": "2026-05-18T02:02:07.524Z", + "completedAt": "2026-05-18T02:05:41.120Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-05-18T02:05:33.493Z" + } + ], + "chapters": [ + { + "id": "chap_t9njxz16mexd", + "title": "Work", + "agentName": "default", + "startedAt": "2026-05-18T02:05:33.493Z", + "endedAt": "2026-05-18T02:05:41.120Z", + "events": [ + { + "ts": 1779069933494, + "type": "decision", + "content": "Reuse 'ws' npm package — already a dep: Reuse 'ws' npm package — already a dep", + "raw": { + "question": "Reuse 'ws' npm package — already a dep", + "chosen": "Reuse 'ws' npm package — already a dep", + "alternatives": [], + "reasoning": "The SDK already uses 'ws' v8.18.3 for the broker WebSocket transport. No new deps needed." + }, + "significance": "high" + }, + { + "ts": 1779069934149, + "type": "decision", + "content": "DI-style command module with onSignal-based teardown: DI-style command module with onSignal-based teardown", + "raw": { + "question": "DI-style command module with onSignal-based teardown", + "chosen": "DI-style command module with onSignal-based teardown", + "alternatives": [], + "reasoning": "Matches the project's testing.md convention; ExitSignal pattern lets us cleanly drive SIGINT in tests without actually exiting." + }, + "significance": "high" + }, + { + "ts": 1779069934808, + "type": "decision", + "content": "Pure-function chunk filter (extractMatchingChunk) exported separately: Pure-function chunk filter (extractMatchingChunk) exported separately", + "raw": { + "question": "Pure-function chunk filter (extractMatchingChunk) exported separately", + "chosen": "Pure-function chunk filter (extractMatchingChunk) exported separately", + "alternatives": [], + "reasoning": "Allows unit-testing the worker_stream filter without standing up a WebSocket or any I/O." + }, + "significance": "high" + }, + { + "ts": 1779069935286, + "type": "decision", + "content": "Skip fresh-attach snapshot rendering: Skip fresh-attach snapshot rendering", + "raw": { + "question": "Skip fresh-attach snapshot rendering", + "chosen": "Skip fresh-attach snapshot rendering", + "alternatives": [], + "reasoning": "Issue says nice-to-have; ship in follow-up. dump-pty already covers the use case manually." + }, + "significance": "high" + } + ] + } + ], + "retrospective": { + "summary": "Added agent-relay view read-only PTY stream CLI. WS-based with worker_stream filter, ANSI preserved, Ctrl+C clean exit. 24 unit tests. Reused 'ws' SDK dep.", + "approach": "Standard approach", + "confidence": 0.85 + }, + "commits": [], + "filesChanged": [], + "projectId": "/Users/will/Projects/AgentWorkforce/relay/.claude/worktrees/agent-a65b9343cd7dbadb3", + "tags": [], + "_trace": { + "startRef": "5fc8a131561feedfe990fc265f224101f6f267c4", + "endRef": "5fc8a131561feedfe990fc265f224101f6f267c4" + } +} diff --git a/.trajectories/completed/2026-05/traj_ybcrij9wg8m1.md b/.trajectories/completed/2026-05/traj_ybcrij9wg8m1.md new file mode 100644 index 000000000..26426fb69 --- /dev/null +++ b/.trajectories/completed/2026-05/traj_ybcrij9wg8m1.md @@ -0,0 +1,52 @@ +# Trajectory: Implement agent-relay view read-only stream client (#864 sub-1) + +> **Status:** ✅ Completed +> **Task:** ENG-864 +> **Confidence:** 85% +> **Started:** May 17, 2026 at 10:02 PM +> **Completed:** May 17, 2026 at 10:05 PM + +--- + +## Summary + +Added agent-relay view read-only PTY stream CLI. WS-based with worker_stream filter, ANSI preserved, Ctrl+C clean exit. 24 unit tests. Reused 'ws' SDK dep. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Reuse 'ws' npm package — already a dep + +- **Chose:** Reuse 'ws' npm package — already a dep +- **Reasoning:** The SDK already uses 'ws' v8.18.3 for the broker WebSocket transport. No new deps needed. + +### DI-style command module with onSignal-based teardown + +- **Chose:** DI-style command module with onSignal-based teardown +- **Reasoning:** Matches the project's testing.md convention; ExitSignal pattern lets us cleanly drive SIGINT in tests without actually exiting. + +### Pure-function chunk filter (extractMatchingChunk) exported separately + +- **Chose:** Pure-function chunk filter (extractMatchingChunk) exported separately +- **Reasoning:** Allows unit-testing the worker_stream filter without standing up a WebSocket or any I/O. + +### Skip fresh-attach snapshot rendering + +- **Chose:** Skip fresh-attach snapshot rendering +- **Reasoning:** Issue says nice-to-have; ship in follow-up. dump-pty already covers the use case manually. + +--- + +## Chapters + +### 1. Work + +_Agent: default_ + +- Reuse 'ws' npm package — already a dep: Reuse 'ws' npm package — already a dep +- DI-style command module with onSignal-based teardown: DI-style command module with onSignal-based teardown +- Pure-function chunk filter (extractMatchingChunk) exported separately: Pure-function chunk filter (extractMatchingChunk) exported separately +- Skip fresh-attach snapshot rendering: Skip fresh-attach snapshot rendering diff --git a/.trajectories/index.json b/.trajectories/index.json index 5c4ceb6d4..f0f1f7262 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-05-18T02:01:50.133Z", + "lastUpdated": "2026-05-18T02:05:41.270Z", "trajectories": { "traj_9gq96irkj00s": { "title": "Update relay to use published relaycast Rust reclaim fix", @@ -592,6 +592,13 @@ "completedAt": "2026-05-17T14:33:32.293Z", "path": ".trajectories/completed/2026-05/traj_cbmwd07phhm2.json" }, + "traj_ybcrij9wg8m1": { + "title": "Implement agent-relay view read-only stream client (#864 sub-1)", + "status": "completed", + "startedAt": "2026-05-18T02:02:07.524Z", + "completedAt": "2026-05-18T02:05:41.120Z", + "path": "/Users/will/Projects/AgentWorkforce/relay/.claude/worktrees/agent-a65b9343cd7dbadb3/.trajectories/completed/2026-05/traj_ybcrij9wg8m1.json" + }, "traj_piik8r6zu3i7": { "title": "Issue 867: RelayEventListener", "status": "completed", diff --git a/src/cli/bootstrap.test.ts b/src/cli/bootstrap.test.ts index a8d24055c..b9a3bc0de 100644 --- a/src/cli/bootstrap.test.ts +++ b/src/cli/bootstrap.test.ts @@ -37,6 +37,7 @@ const expectedLeafCommands = [ 'off', 'run', 'connect', + 'view', 'dlq list', 'dlq inspect', 'dlq replay', diff --git a/src/cli/bootstrap.ts b/src/cli/bootstrap.ts index 291a89fd6..bcc3d1609 100644 --- a/src/cli/bootstrap.ts +++ b/src/cli/bootstrap.ts @@ -26,6 +26,7 @@ import { registerSwarmCommands } from './commands/swarm.js'; import { registerConnectCommands } from './commands/connect.js'; import { registerOnCommands } from './commands/on.js'; import { registerDlqCommands } from './commands/dlq.js'; +import { registerViewCommands } from './commands/view.js'; dotenvConfig({ quiet: true }); @@ -281,6 +282,7 @@ export function createProgram(options: { name?: string } = {}): Command { registerOnCommands(program); registerConnectCommands(program); registerDlqCommands(program); + registerViewCommands(program); return program; } diff --git a/src/cli/commands/view.test.ts b/src/cli/commands/view.test.ts new file mode 100644 index 000000000..b247e1e97 --- /dev/null +++ b/src/cli/commands/view.test.ts @@ -0,0 +1,456 @@ +import { Command } from 'commander'; +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { + extractMatchingChunk, + registerViewCommands, + resolveViewBrokerConnection, + runViewSession, + toWsUrl, + type ViewDependencies, + type ViewWebSocket, +} from './view.js'; + +class ExitSignal extends Error { + constructor(public readonly code: number) { + super(`exit:${code}`); + } +} + +type WsListener = (...args: unknown[]) => void; + +class FakeWebSocket implements ViewWebSocket { + readonly url: string; + readonly headers: Record; + readonly listeners = new Map(); + closed = false; + closeCode?: number; + closeReason?: string; + + constructor(url: string, headers: Record) { + this.url = url; + this.headers = headers; + } + + on(event: string, listener: (...args: unknown[]) => void): unknown { + const bucket = this.listeners.get(event) ?? []; + bucket.push(listener); + this.listeners.set(event, bucket); + return this; + } + + emit(event: string, ...args: unknown[]): void { + for (const listener of this.listeners.get(event) ?? []) { + listener(...args); + } + } + + close(code?: number, reason?: string): void { + this.closed = true; + this.closeCode = code; + this.closeReason = reason; + } +} + +interface HarnessOverrides { + env?: NodeJS.ProcessEnv; + connectionFile?: unknown; + defaultStateDir?: string; + /** Override the snapshot helper outcome. Defaults to `{ status: 'ok' }` + * with no writes — most tests don't care about the snapshot path. */ + snapshotResult?: Awaited>; + /** If set, snapshot helper writes this string to `writeChunk` when called. */ + snapshotChunk?: string; +} + +function createHarness(overrides: HarnessOverrides = {}): { + deps: ViewDependencies; + writes: string[]; + errors: unknown[][]; + logs: unknown[][]; + signals: Map void | Promise>; + sockets: FakeWebSocket[]; +} { + const writes: string[] = []; + const errors: unknown[][] = []; + const logs: unknown[][] = []; + const signals = new Map void | Promise>(); + const sockets: FakeWebSocket[] = []; + + const deps: ViewDependencies = { + readConnectionFile: vi.fn(() => overrides.connectionFile ?? null), + getDefaultStateDir: vi.fn(() => overrides.defaultStateDir ?? '/tmp/fake/.agent-relay'), + env: overrides.env ?? {}, + createWebSocket: vi.fn((url: string, headers: Record) => { + const socket = new FakeWebSocket(url, headers); + sockets.push(socket); + return socket; + }), + writeChunk: (chunk: string) => { + writes.push(chunk); + }, + onSignal: (signal, handler) => { + signals.set(signal, handler); + }, + log: (...args: unknown[]) => { + logs.push(args); + }, + error: (...args: unknown[]) => { + errors.push(args); + }, + exit: vi.fn((code: number) => { + throw new ExitSignal(code); + }) as unknown as ViewDependencies['exit'], + // Snapshot path: tests opt in by overriding either the result or the + // emitted chunk via `HarnessOverrides`. Default is `ok` with no write + // so existing tests continue to assert on live-stream writes only. + fetch: vi.fn(async () => new Response('', { status: 200 })) as ViewDependencies['fetch'], + captureAndRenderSnapshot: vi.fn(async (_conn, _name, snapshotDeps) => { + if (overrides.snapshotChunk !== undefined) { + snapshotDeps.writeChunk(overrides.snapshotChunk); + } + return overrides.snapshotResult ?? { status: 'ok' }; + }) as ViewDependencies['captureAndRenderSnapshot'], + }; + + return { deps, writes, errors, logs, signals, sockets }; +} + +afterEach(() => { + vi.restoreAllMocks(); +}); + +describe('extractMatchingChunk', () => { + it('returns the chunk for matching worker_stream events', () => { + const raw = JSON.stringify({ + kind: 'worker_stream', + name: 'Alice', + stream: 'stdout', + chunk: 'hello', + }); + expect(extractMatchingChunk(raw, 'Alice')).toBe('hello'); + }); + + it('filters out events for other agents', () => { + const raw = JSON.stringify({ + kind: 'worker_stream', + name: 'Bob', + stream: 'stdout', + chunk: 'hello', + }); + expect(extractMatchingChunk(raw, 'Alice')).toBeNull(); + }); + + it('filters out events with non-worker_stream kinds', () => { + const raw = JSON.stringify({ + kind: 'agent_spawned', + name: 'Alice', + }); + expect(extractMatchingChunk(raw, 'Alice')).toBeNull(); + }); + + it('returns null for non-JSON input', () => { + expect(extractMatchingChunk('not-json', 'Alice')).toBeNull(); + }); + + it('returns null for JSON payloads missing chunk', () => { + const raw = JSON.stringify({ kind: 'worker_stream', name: 'Alice', stream: 'stdout' }); + expect(extractMatchingChunk(raw, 'Alice')).toBeNull(); + }); + + it('returns null for arrays/non-object JSON payloads', () => { + expect(extractMatchingChunk('[1,2,3]', 'Alice')).toBeNull(); + }); + + it('keeps empty chunks (server sends them to signal flushes)', () => { + const raw = JSON.stringify({ kind: 'worker_stream', name: 'Alice', stream: 'stdout', chunk: '' }); + expect(extractMatchingChunk(raw, 'Alice')).toBe(''); + }); +}); + +describe('toWsUrl', () => { + it('rewrites http://host:port to ws://host:port/ws', () => { + expect(toWsUrl('http://localhost:3889')).toBe('ws://localhost:3889/ws'); + }); + + it('rewrites https://… to wss://…/ws', () => { + expect(toWsUrl('https://broker.example.com')).toBe('wss://broker.example.com/ws'); + }); + + it('handles trailing-slash-stripped input', () => { + expect(toWsUrl('http://localhost:3889')).toBe('ws://localhost:3889/ws'); + }); +}); + +describe('resolveViewBrokerConnection', () => { + it('prefers --broker-url over env and connection.json', () => { + const { deps } = createHarness({ + env: { RELAY_BROKER_URL: 'http://env-host:1234' }, + connectionFile: { url: 'http://file-host:5678', api_key: 'file-key' }, + }); + + const conn = resolveViewBrokerConnection({ brokerUrl: 'http://flag-host:9999' }, deps); + expect(conn).toEqual({ url: 'http://flag-host:9999', apiKey: 'file-key' }); + }); + + it('uses RELAY_BROKER_URL when no flag is provided', () => { + const { deps } = createHarness({ + env: { RELAY_BROKER_URL: 'http://env-host:1234', RELAY_BROKER_API_KEY: 'env-key' }, + connectionFile: { url: 'http://file-host:5678', api_key: 'file-key' }, + }); + + const conn = resolveViewBrokerConnection({}, deps); + expect(conn).toEqual({ url: 'http://env-host:1234', apiKey: 'env-key' }); + }); + + it('falls back to connection.json for both url and api_key', () => { + const { deps } = createHarness({ + env: {}, + connectionFile: { url: 'http://file-host:5678/', api_key: 'file-key' }, + }); + + const conn = resolveViewBrokerConnection({}, deps); + expect(conn).toEqual({ url: 'http://file-host:5678', apiKey: 'file-key' }); + }); + + it('returns null when no source provides a URL', () => { + const { deps } = createHarness({ env: {}, connectionFile: null }); + expect(resolveViewBrokerConnection({}, deps)).toBeNull(); + }); + + it('allows --api-key to override the connection-file key', () => { + const { deps } = createHarness({ + env: {}, + connectionFile: { url: 'http://file-host:5678', api_key: 'file-key' }, + }); + + const conn = resolveViewBrokerConnection({ apiKey: 'flag-key' }, deps); + expect(conn).toEqual({ url: 'http://file-host:5678', apiKey: 'flag-key' }); + }); + + it('returns undefined apiKey when none of the sources have one', () => { + const { deps } = createHarness({ + env: {}, + connectionFile: { url: 'http://file-host:5678' }, + }); + + const conn = resolveViewBrokerConnection({}, deps); + expect(conn).toEqual({ url: 'http://file-host:5678', apiKey: undefined }); + }); +}); + +describe('runViewSession', () => { + it('writes chunks for matching events and ignores others', async () => { + const { deps, writes, sockets } = createHarness({ + connectionFile: { url: 'http://localhost:3889', api_key: 'k' }, + }); + + const sessionPromise = runViewSession('Alice', {}, deps); + // Wait a tick so the WebSocket factory has been called + await new Promise((resolve) => setImmediate(resolve)); + expect(sockets).toHaveLength(1); + const socket = sockets[0]; + expect(socket.url).toBe('ws://localhost:3889/ws'); + expect(socket.headers['X-API-Key']).toBe('k'); + + socket.emit('open'); + socket.emit( + 'message', + Buffer.from(JSON.stringify({ kind: 'worker_stream', name: 'Alice', stream: 'stdout', chunk: 'hi' })) + ); + socket.emit( + 'message', + Buffer.from(JSON.stringify({ kind: 'worker_stream', name: 'Bob', stream: 'stdout', chunk: 'nope' })) + ); + socket.emit( + 'message', + Buffer.from(JSON.stringify({ kind: 'agent_spawned', name: 'Alice', runtime: 'pty' })) + ); + socket.emit('close', 1000, Buffer.from('')); + + const code = await sessionPromise; + expect(code).toBe(0); + expect(writes).toEqual(['hi']); + }); + + it('preserves raw ANSI escape sequences byte-for-byte', async () => { + const { deps, writes, sockets } = createHarness({ + connectionFile: { url: 'http://localhost:3889' }, + }); + const ansi = 'RED\r\n'; + + const sessionPromise = runViewSession('Alice', {}, deps); + await new Promise((resolve) => setImmediate(resolve)); + const socket = sockets[0]; + socket.emit('open'); + socket.emit( + 'message', + JSON.stringify({ kind: 'worker_stream', name: 'Alice', stream: 'stdout', chunk: ansi }) + ); + socket.emit('close', 1000, Buffer.from('')); + + await sessionPromise; + expect(writes).toEqual([ansi]); + }); + + it('exits cleanly on SIGINT without surfacing an error', async () => { + const { deps, sockets, signals } = createHarness({ + connectionFile: { url: 'http://localhost:3889' }, + }); + + const sessionPromise = runViewSession('Alice', {}, deps); + await new Promise((resolve) => setImmediate(resolve)); + const socket = sockets[0]; + socket.emit('open'); + + const sigintHandler = signals.get('SIGINT'); + expect(sigintHandler).toBeDefined(); + await sigintHandler?.(); + + const code = await sessionPromise; + expect(code).toBe(0); + expect(socket.closed).toBe(true); + }); + + it('reports an error and resolves with 1 on abnormal close', async () => { + const { deps, errors, sockets } = createHarness({ + connectionFile: { url: 'http://localhost:3889' }, + }); + + const sessionPromise = runViewSession('Alice', {}, deps); + await new Promise((resolve) => setImmediate(resolve)); + const socket = sockets[0]; + socket.emit('close', 1006, Buffer.from('abnormal')); + + const code = await sessionPromise; + expect(code).toBe(1); + expect(errors.length).toBeGreaterThan(0); + }); + + it('returns 1 when no broker connection can be resolved', async () => { + const { deps, errors } = createHarness({ env: {}, connectionFile: null }); + const code = await runViewSession('Alice', {}, deps); + expect(code).toBe(1); + expect(errors[0]?.[0]).toMatch(/could not locate broker connection/); + }); + + it('omits the X-API-Key header when no api key is available', async () => { + const { deps, sockets } = createHarness({ + connectionFile: { url: 'http://localhost:3889' }, + }); + + const sessionPromise = runViewSession('Alice', {}, deps); + await new Promise((resolve) => setImmediate(resolve)); + const socket = sockets[0]; + expect(socket.headers['X-API-Key']).toBeUndefined(); + socket.emit('close', 1000, Buffer.from('')); + await sessionPromise; + }); + + it('renders the snapshot to stdout BEFORE the WebSocket opens', async () => { + // The snapshot helper writes a clear-screen + welcome banner first; + // the live WS then appends a delta. The user's terminal should see + // the snapshot bytes first, then the live chunk. + const snapshotBytes = '\x1b[2J\x1b[H\x1b[32mWelcome back Will\x1b[0m\n❯\n'; + const { deps, writes, sockets } = createHarness({ + connectionFile: { url: 'http://localhost:3889', api_key: 'k' }, + snapshotChunk: snapshotBytes, + }); + + const sessionPromise = runViewSession('Alice', {}, deps); + await new Promise((resolve) => setImmediate(resolve)); + + // Snapshot must have been written before any WS chunk arrives. + expect(writes).toEqual([snapshotBytes]); + + const socket = sockets[0]; + socket.emit('open'); + socket.emit( + 'message', + JSON.stringify({ kind: 'worker_stream', name: 'Alice', stream: 'stdout', chunk: 'live delta' }) + ); + socket.emit('close', 1000, Buffer.from('')); + + await sessionPromise; + expect(writes).toEqual([snapshotBytes, 'live delta']); + }); + + it('aborts with exit code 1 when the snapshot reports not_found', async () => { + const { deps, errors, sockets } = createHarness({ + connectionFile: { url: 'http://localhost:3889' }, + snapshotResult: { status: 'not_found', message: "no agent named 'Ghost'" }, + }); + + const code = await runViewSession('Ghost', {}, deps); + expect(code).toBe(1); + expect(sockets).toHaveLength(0); // never opened the WS + expect(errors[0]?.[0]).toMatch(/no agent named/); + }); + + it('aborts with exit code 1 when the worker has no PTY', async () => { + const { deps, errors, sockets } = createHarness({ + connectionFile: { url: 'http://localhost:3889' }, + snapshotResult: { + status: 'no_pty', + message: "agent 'Headless' has no PTY (headless worker — nothing to view)", + }, + }); + + const code = await runViewSession('Headless', {}, deps); + expect(code).toBe(1); + expect(sockets).toHaveLength(0); + expect(errors[0]?.[0]).toMatch(/no PTY/); + }); + + it('logs and continues when the snapshot is transiently unavailable', async () => { + // Snapshot fails (broker hiccup, worker crashed mid-snapshot, etc.) + // but the live stream should still attach — the agent may produce + // useful output even if the current screen couldn't be captured. + const { deps, logs, sockets, writes } = createHarness({ + connectionFile: { url: 'http://localhost:3889' }, + snapshotResult: { status: 'unavailable', message: 'snapshot returned HTTP 504' }, + }); + + const sessionPromise = runViewSession('Alice', {}, deps); + await new Promise((resolve) => setImmediate(resolve)); + + expect(sockets).toHaveLength(1); // WS still opened + expect(logs.some((args) => String(args[0]).includes('could not capture initial screen'))).toBe(true); + + const socket = sockets[0]; + socket.emit('open'); + socket.emit( + 'message', + JSON.stringify({ kind: 'worker_stream', name: 'Alice', stream: 'stdout', chunk: 'live' }) + ); + socket.emit('close', 1000, Buffer.from('')); + + const code = await sessionPromise; + expect(code).toBe(0); + expect(writes).toEqual(['live']); + }); +}); + +describe('registerViewCommands', () => { + it('registers a `view` command on the program', () => { + const { deps } = createHarness(); + const program = new Command(); + program.exitOverride(); + registerViewCommands(program, deps); + + const cmd = program.commands.find((c) => c.name() === 'view'); + expect(cmd).toBeDefined(); + expect(cmd?.description()).toMatch(/read-only/i); + }); + + it('wires --broker-url, --api-key, and --state-dir options', () => { + const { deps } = createHarness(); + const program = new Command(); + program.exitOverride(); + registerViewCommands(program, deps); + + const cmd = program.commands.find((c) => c.name() === 'view'); + const flags = cmd?.options.map((opt) => opt.long).filter(Boolean); + expect(flags).toEqual(expect.arrayContaining(['--broker-url', '--api-key', '--state-dir'])); + }); +}); diff --git a/src/cli/commands/view.ts b/src/cli/commands/view.ts new file mode 100644 index 000000000..65d45076b --- /dev/null +++ b/src/cli/commands/view.ts @@ -0,0 +1,326 @@ +/** + * `agent-relay view ` — read-only PTY stream client. + * + * Connects to a running broker's `/ws` WebSocket, filters the event stream + * for `worker_stream` frames matching the requested agent name, and writes + * each chunk's raw bytes to stdout (preserving ANSI escapes). + * + * No keystrokes are forwarded — the broker keeps doing whatever it's doing. + * Ctrl+C exits the client cleanly; the agent keeps running under the broker. + * + * See GitHub issue #864 for the broader vision (drive / relay / new / run). + * This module ships only the `view` verb. + */ + +import fs from 'node:fs'; +import path from 'node:path'; + +import { Command } from 'commander'; +import WebSocket from 'ws'; + +import { getProjectPaths } from '@agent-relay/config'; + +import { + captureAndRenderSnapshot, + type AttachSnapshotConnection, + type AttachSnapshotDeps, +} from '../lib/attach.js'; +import { defaultExit, runSignalHandler } from '../lib/exit.js'; + +type ExitFn = (code: number) => never; + +/** Subset of the broker's `BrokerEvent` we actually care about for `view`. */ +export interface ViewableWorkerStreamEvent { + kind: 'worker_stream'; + name: string; + stream: string; + chunk: string; +} + +/** Connection metadata discovered from `connection.json` or CLI/env overrides. */ +export interface ViewBrokerConnection { + url: string; + apiKey?: string; +} + +export interface ViewWebSocket { + on(event: 'open', listener: () => void): unknown; + on(event: 'message', listener: (data: WebSocket.RawData) => void): unknown; + on(event: 'close', listener: (code: number, reason: Buffer) => void): unknown; + on(event: 'error', listener: (err: Error) => void): unknown; + close(code?: number, reason?: string): void; +} + +export type ViewWebSocketFactory = (url: string, headers: Record) => ViewWebSocket; + +export interface ViewSignalRegistrar { + (signal: NodeJS.Signals, handler: () => void | Promise): void; +} + +export interface ViewDependencies { + /** Reads `/connection.json` and returns parsed JSON, or null. */ + readConnectionFile: (stateDir: string) => unknown; + /** Project paths helper — used to pick the default state dir. */ + getDefaultStateDir: () => string; + /** Environment variables (so tests can inject). */ + env: NodeJS.ProcessEnv; + /** Factory for the WebSocket — overridden in tests with a mock. */ + createWebSocket: ViewWebSocketFactory; + /** Where the PTY chunks get written. Defaults to `process.stdout.write`. */ + writeChunk: (chunk: string) => void; + /** Signal registration (so tests can drive SIGINT without killing the test). */ + onSignal: ViewSignalRegistrar; + log: (...args: unknown[]) => void; + error: (...args: unknown[]) => void; + exit: ExitFn; + /** HTTP client used by the snapshot-on-attach call. Defaults to global `fetch`. */ + fetch: typeof globalThis.fetch; + /** Override for the snapshot-on-attach helper (tests substitute a stub). */ + captureAndRenderSnapshot: ( + connection: AttachSnapshotConnection, + agentName: string, + deps: AttachSnapshotDeps + ) => ReturnType; +} + +function readConnectionFileFromDisk(stateDir: string): unknown { + const connPath = path.join(stateDir, 'connection.json'); + try { + const raw = fs.readFileSync(connPath, 'utf-8'); + return JSON.parse(raw) as unknown; + } catch { + return null; + } +} + +function defaultStateDir(): string { + // Match the Rust broker's discovery convention: `.agent-relay/` under the + // project root (resolved the same way the rest of the CLI does it). + const projectRoot = getProjectPaths().projectRoot; + return path.join(projectRoot, '.agent-relay'); +} + +function withDefaults(overrides: Partial = {}): ViewDependencies { + return { + readConnectionFile: readConnectionFileFromDisk, + getDefaultStateDir: defaultStateDir, + env: process.env, + createWebSocket: (url, headers) => new WebSocket(url, { headers }) as ViewWebSocket, + writeChunk: (chunk) => { + process.stdout.write(chunk); + }, + onSignal: (signal, handler) => { + process.on(signal, () => runSignalHandler(handler)); + }, + log: (...args: unknown[]) => console.error(...args), + error: (...args: unknown[]) => console.error(...args), + exit: defaultExit, + fetch: (input, init) => fetch(input, init), + captureAndRenderSnapshot, + ...overrides, + }; +} + +function isStringObject(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} + +function readString(obj: unknown, key: string): string | undefined { + if (!isStringObject(obj)) return undefined; + const value = obj[key]; + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + return trimmed === '' ? undefined : trimmed; +} + +/** + * Resolve the broker connection to use for `view`, in priority order: + * + * 1. `--broker-url` / `--api-key` CLI flags + * 2. `RELAY_BROKER_URL` / `RELAY_BROKER_API_KEY` environment variables + * 3. `/connection.json` (default `.agent-relay/connection.json`) + * + * Matches the resolution order used by `agent-relay-broker dump-pty` so users + * don't have to learn two patterns. + */ +export function resolveViewBrokerConnection( + options: { brokerUrl?: string; apiKey?: string; stateDir?: string }, + deps: ViewDependencies +): ViewBrokerConnection | null { + const explicitUrl = options.brokerUrl?.trim(); + const envUrl = deps.env.RELAY_BROKER_URL?.trim(); + const stateDir = options.stateDir ? path.resolve(options.stateDir) : deps.getDefaultStateDir(); + const connectionFile = deps.readConnectionFile(stateDir); + const fileUrl = readString(connectionFile, 'url'); + + const resolveApiKey = (): string | undefined => { + const explicit = options.apiKey?.trim(); + if (explicit) return explicit; + const fromEnv = deps.env.RELAY_BROKER_API_KEY?.trim(); + if (fromEnv) return fromEnv; + return readString(connectionFile, 'api_key'); + }; + + const url = explicitUrl ?? envUrl ?? fileUrl; + if (!url) return null; + + return { + url: url.replace(/\/+$/, ''), + apiKey: resolveApiKey(), + }; +} + +/** Convert an `http(s)://host:port` base URL to the matching `ws(s)://…/ws`. */ +export function toWsUrl(baseUrl: string): string { + return `${baseUrl.replace(/^http/, 'ws')}/ws`; +} + +/** + * Inspect a single WebSocket message and, if it's a `worker_stream` event for + * the requested agent, return the raw chunk string. Returns `null` for events + * that don't match (other kinds, other agents, malformed JSON, etc.) so the + * caller can ignore them. + * + * Exported for unit testing the filter in isolation from any WebSocket. + */ +export function extractMatchingChunk(rawMessage: string, agentName: string): string | null { + let parsed: unknown; + try { + parsed = JSON.parse(rawMessage); + } catch { + return null; + } + if (!isStringObject(parsed)) return null; + if (parsed.kind !== 'worker_stream') return null; + if (parsed.name !== agentName) return null; + const chunk = parsed.chunk; + if (typeof chunk !== 'string') return null; + return chunk; +} + +/** + * Open the read-only view stream and run until the WebSocket closes, the + * caller signals SIGINT/SIGTERM, or an unrecoverable error occurs. Resolves + * with the exit code the CLI should propagate. + */ +export async function runViewSession( + agentName: string, + options: { brokerUrl?: string; apiKey?: string; stateDir?: string }, + deps: ViewDependencies +): Promise { + if (!agentName.trim()) { + deps.error('Error: agent name is required'); + return 1; + } + + const connection = resolveViewBrokerConnection(options, deps); + if (!connection) { + deps.error( + 'Error: could not locate broker connection. Pass --broker-url, set RELAY_BROKER_URL, ' + + 'or run from a directory containing .agent-relay/connection.json.' + ); + return 1; + } + + // Render the agent's current screen before the live stream begins, so + // the user sees what's there instead of staring at a blank terminal + // until the agent happens to produce more output. Hard errors + // (`not_found` / `no_pty`) abort — there's nothing meaningful to view. + // Transient errors (`unavailable` / `transport_error`) are surfaced as + // a warning and we fall through to the live stream; the agent may + // still produce useful output even if the snapshot couldn't be served. + const snapshot = await deps.captureAndRenderSnapshot( + { url: connection.url, apiKey: connection.apiKey }, + agentName, + { fetch: deps.fetch, writeChunk: deps.writeChunk } + ); + switch (snapshot.status) { + case 'ok': + break; + case 'not_found': + deps.error(`Error: ${snapshot.message ?? `no agent named '${agentName}'`}`); + return 1; + case 'no_pty': + deps.error(`Error: ${snapshot.message ?? `agent '${agentName}' has no PTY to view`}`); + return 1; + case 'unavailable': + case 'transport_error': + deps.log( + `[view] could not capture initial screen (${snapshot.message ?? snapshot.status}); streaming live output only` + ); + break; + } + + const wsUrl = toWsUrl(connection.url); + const headers: Record = {}; + if (connection.apiKey) { + headers['X-API-Key'] = connection.apiKey; + } + + return new Promise((resolve) => { + let settled = false; + const finish = (code: number) => { + if (settled) return; + settled = true; + try { + socket.close(1000, 'view client exiting'); + } catch { + // best effort — already closed + } + resolve(code); + }; + + const socket = deps.createWebSocket(wsUrl, headers); + + deps.onSignal('SIGINT', () => finish(0)); + deps.onSignal('SIGTERM', () => finish(0)); + + socket.on('open', () => { + deps.log(`[view] streaming ${agentName} from ${connection.url} (Ctrl+C to exit)`); + }); + + socket.on('message', (data) => { + const text = + typeof data === 'string' ? data : Buffer.isBuffer(data) ? data.toString('utf-8') : String(data); + const chunk = extractMatchingChunk(text, agentName); + if (chunk !== null) { + deps.writeChunk(chunk); + } + }); + + socket.on('error', (err: Error) => { + deps.error(`[view] WebSocket error: ${err.message}`); + }); + + socket.on('close', (code: number, reason: Buffer) => { + if (settled) return; + const reasonText = reason && reason.length > 0 ? reason.toString('utf-8') : ''; + if (code === 1000 || code === 1005) { + // Normal closure (server shut down or sent close frame without status) + finish(0); + } else { + deps.error(`[view] connection closed (code: ${code}${reasonText ? `, reason: ${reasonText}` : ''})`); + finish(1); + } + }); + }); +} + +/** Register `agent-relay view ` on the supplied commander program. */ +export function registerViewCommands(program: Command, overrides: Partial = {}): void { + const deps = withDefaults(overrides); + + program + .command('view') + .description("Stream a running agent's PTY output to your terminal (read-only)") + .argument('', 'Agent name to view') + .option('--broker-url ', 'Broker base URL (overrides RELAY_BROKER_URL and connection.json)') + .option('--api-key ', 'Broker API key (overrides RELAY_BROKER_API_KEY and connection.json)') + .option('--state-dir ', 'Directory containing connection.json (default: .agent-relay/)') + .action(async (name: string, options: { brokerUrl?: string; apiKey?: string; stateDir?: string }) => { + const code = await runViewSession(name, options, deps); + if (code !== 0) { + deps.exit(code); + } + }); +} diff --git a/src/cli/lib/attach.test.ts b/src/cli/lib/attach.test.ts new file mode 100644 index 000000000..221a2a00a --- /dev/null +++ b/src/cli/lib/attach.test.ts @@ -0,0 +1,196 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { + captureAndRenderSnapshot, + type AttachSnapshotConnection, + type AttachSnapshotDeps, +} from './attach.js'; + +afterEach(() => { + vi.restoreAllMocks(); +}); + +function makeDeps(overrides: Partial = {}): { + deps: AttachSnapshotDeps; + writes: string[]; +} { + const writes: string[] = []; + const deps: AttachSnapshotDeps = { + fetch: vi.fn(async () => new Response('', { status: 200 })), + writeChunk: (chunk: string) => { + writes.push(chunk); + }, + ...overrides, + }; + return { deps, writes }; +} + +const conn: AttachSnapshotConnection = { url: 'http://localhost:3889', apiKey: 'k' }; + +describe('captureAndRenderSnapshot', () => { + it('writes the decoded ANSI bytes to writeChunk on success', async () => { + const ansi = '\x1b[2J\x1b[H\x1b[32mhello\x1b[0m'; + const screen = Buffer.from(ansi, 'utf-8').toString('base64'); + const { deps, writes } = makeDeps({ + fetch: vi.fn( + async () => + new Response( + JSON.stringify({ + format: 'ansi', + rows: 24, + cols: 80, + cursor: [1, 6], + screen, + }), + { status: 200, headers: { 'content-type': 'application/json' } } + ) + ), + }); + + const result = await captureAndRenderSnapshot(conn, 'Alice', deps); + + expect(result.status).toBe('ok'); + expect(result.rows).toBe(24); + expect(result.cols).toBe(80); + expect(result.cursor).toEqual([1, 6]); + expect(writes).toEqual([ansi]); + }); + + it('hits the snapshot route with format=ansi and the X-API-Key header', async () => { + const fetchMock = vi.fn( + async () => + new Response( + JSON.stringify({ + format: 'ansi', + screen: Buffer.from('x', 'utf-8').toString('base64'), + }), + { status: 200 } + ) + ); + const { deps } = makeDeps({ fetch: fetchMock }); + + await captureAndRenderSnapshot(conn, 'Alice', deps); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, init] = fetchMock.mock.calls[0]; + expect(url).toBe('http://localhost:3889/api/spawned/Alice/snapshot?format=ansi'); + expect((init as RequestInit).headers).toEqual({ 'X-API-Key': 'k' }); + }); + + it('URL-encodes the agent name', async () => { + const fetchMock = vi.fn( + async () => + new Response( + JSON.stringify({ + format: 'ansi', + screen: Buffer.from('', 'utf-8').toString('base64'), + }), + { status: 200 } + ) + ); + const { deps } = makeDeps({ fetch: fetchMock }); + + await captureAndRenderSnapshot(conn, 'agent name/with slash', deps); + + const [url] = fetchMock.mock.calls[0]; + expect(url).toBe('http://localhost:3889/api/spawned/agent%20name%2Fwith%20slash/snapshot?format=ansi'); + }); + + it('omits the X-API-Key header when no api key is set', async () => { + const fetchMock = vi.fn( + async () => + new Response(JSON.stringify({ screen: Buffer.from('', 'utf-8').toString('base64') }), { status: 200 }) + ); + const { deps } = makeDeps({ fetch: fetchMock }); + + await captureAndRenderSnapshot({ url: 'http://localhost:3889' }, 'Alice', deps); + + const [, init] = fetchMock.mock.calls[0]; + expect((init as RequestInit).headers).toEqual({}); + }); + + it('returns not_found on HTTP 404 and does not write', async () => { + const { deps, writes } = makeDeps({ + fetch: vi.fn(async () => new Response('', { status: 404 })), + }); + + const result = await captureAndRenderSnapshot(conn, 'Ghost', deps); + + expect(result.status).toBe('not_found'); + expect(result.message).toContain('Ghost'); + expect(writes).toEqual([]); + }); + + it('returns no_pty on HTTP 409', async () => { + const { deps, writes } = makeDeps({ + fetch: vi.fn(async () => new Response('', { status: 409 })), + }); + + const result = await captureAndRenderSnapshot(conn, 'Headless', deps); + + expect(result.status).toBe('no_pty'); + expect(result.message).toMatch(/headless/i); + expect(writes).toEqual([]); + }); + + it('returns unavailable on 5xx', async () => { + const { deps } = makeDeps({ + fetch: vi.fn(async () => new Response('', { status: 503 })), + }); + + const result = await captureAndRenderSnapshot(conn, 'Alice', deps); + + expect(result.status).toBe('unavailable'); + expect(result.message).toContain('503'); + }); + + it('returns transport_error when fetch itself throws', async () => { + const { deps } = makeDeps({ + fetch: vi.fn(async () => { + throw new Error('network down'); + }), + }); + + const result = await captureAndRenderSnapshot(conn, 'Alice', deps); + + expect(result.status).toBe('transport_error'); + expect(result.message).toBe('network down'); + }); + + it('returns transport_error when the body is not JSON', async () => { + const { deps } = makeDeps({ + fetch: vi.fn(async () => new Response('not json', { status: 200 })), + }); + + const result = await captureAndRenderSnapshot(conn, 'Alice', deps); + + expect(result.status).toBe('transport_error'); + expect(result.message).toMatch(/not JSON/i); + }); + + it('returns transport_error when the screen field is missing', async () => { + const { deps } = makeDeps({ + fetch: vi.fn( + async () => new Response(JSON.stringify({ format: 'ansi', rows: 24, cols: 80 }), { status: 200 }) + ), + }); + + const result = await captureAndRenderSnapshot(conn, 'Alice', deps); + + expect(result.status).toBe('transport_error'); + expect(result.message).toMatch(/missing 'screen' field/); + }); + + it('strips a trailing slash from the connection URL', async () => { + const fetchMock = vi.fn( + async () => + new Response(JSON.stringify({ screen: Buffer.from('', 'utf-8').toString('base64') }), { status: 200 }) + ); + const { deps } = makeDeps({ fetch: fetchMock }); + + await captureAndRenderSnapshot({ url: 'http://localhost:3889/', apiKey: 'k' }, 'Alice', deps); + + const [url] = fetchMock.mock.calls[0]; + expect(url).toBe('http://localhost:3889/api/spawned/Alice/snapshot?format=ansi'); + }); +}); diff --git a/src/cli/lib/attach.ts b/src/cli/lib/attach.ts new file mode 100644 index 000000000..cecd7cffd --- /dev/null +++ b/src/cli/lib/attach.ts @@ -0,0 +1,124 @@ +/** + * Shared helpers for attach-style CLI commands. + * + * `view` (read-only), and future `drive` / `relay` from issue #864, all need + * to render the agent's *current* visible screen before they start streaming + * live updates — otherwise the user attaches to a quiet agent and stares at + * a blank terminal until the agent happens to produce more output. This + * module wraps the broker's snapshot endpoint so each verb gets that for one + * line of code. + */ + +/** Connection metadata used to call the broker's snapshot endpoint. */ +export interface AttachSnapshotConnection { + /** Broker base URL (no trailing slash). */ + url: string; + /** Optional API key — added as an `X-API-Key` header if present. */ + apiKey?: string; +} + +/** Dependencies for `captureAndRenderSnapshot` — injected so tests don't hit + * the network. */ +export interface AttachSnapshotDeps { + /** Native `fetch` by default; swapped out by tests. */ + fetch: typeof globalThis.fetch; + /** Where the ANSI bytes get written. Typically `process.stdout.write`. */ + writeChunk: (chunk: string) => void; +} + +/** Outcome of a snapshot capture. Callers decide whether to bail or continue + * on each variant — `view` aborts on `not_found` / `no_pty`, warns and + * continues on `unavailable` / `transport_error`. */ +export interface AttachSnapshotResult { + status: 'ok' | 'not_found' | 'no_pty' | 'unavailable' | 'transport_error'; + /** Grid dimensions as reported by the broker, if the call succeeded. */ + rows?: number; + cols?: number; + /** Cursor position `[row, col]`, 1-indexed, if the call succeeded. */ + cursor?: [number, number]; + /** Human-readable detail for error variants. */ + message?: string; +} + +/** + * Fetch a worker's current visible screen as ANSI reproduction bytes and + * write them to the caller's output. Callers should invoke this BEFORE + * subscribing to the WebSocket event stream so the user sees the agent's + * current state before live deltas start arriving. + * + * There is a tiny window between the snapshot capture and the live stream + * starting in which the agent's output could be missed (≤10ms in practice). + * Most TUI agents repaint heavily so the next update overwrites anything + * lost; if that becomes a problem we can switch to subscribe-first + + * buffer + drain, but it's not worth the complexity today. + * + * @returns A status describing the outcome. `ok` means the screen was + * rendered; other variants carry a message the caller can surface. + */ +export async function captureAndRenderSnapshot( + connection: AttachSnapshotConnection, + agentName: string, + deps: AttachSnapshotDeps +): Promise { + const baseUrl = connection.url.replace(/\/+$/, ''); + const target = `${baseUrl}/api/spawned/${encodeURIComponent(agentName)}/snapshot?format=ansi`; + const headers: Record = {}; + if (connection.apiKey) { + headers['X-API-Key'] = connection.apiKey; + } + + let res: Response; + try { + res = await deps.fetch(target, { headers }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + return { status: 'transport_error', message }; + } + + if (res.status === 404) { + return { status: 'not_found', message: `no agent named '${agentName}'` }; + } + if (res.status === 409) { + return { + status: 'no_pty', + message: `agent '${agentName}' has no PTY (headless worker — nothing to view)`, + }; + } + if (!res.ok) { + return { status: 'unavailable', message: `snapshot returned HTTP ${res.status}` }; + } + + let body: unknown; + try { + body = await res.json(); + } catch { + return { status: 'transport_error', message: 'snapshot response was not JSON' }; + } + if (typeof body !== 'object' || body === null) { + return { status: 'transport_error', message: 'snapshot response was not an object' }; + } + const obj = body as Record; + const screen = obj.screen; + if (typeof screen !== 'string') { + return { status: 'transport_error', message: "snapshot response missing 'screen' field" }; + } + + // Snapshot bytes are mostly ASCII (escape sequences) plus the cell + // characters which are valid Unicode codepoints (alacritty stores + // chars, not bytes). UTF-8 round-trips cleanly. + const decoded = Buffer.from(screen, 'base64').toString('utf-8'); + deps.writeChunk(decoded); + + const rows = typeof obj.rows === 'number' ? obj.rows : undefined; + const cols = typeof obj.cols === 'number' ? obj.cols : undefined; + const cursorRaw = Array.isArray(obj.cursor) ? obj.cursor : undefined; + const cursor: [number, number] | undefined = + cursorRaw && + cursorRaw.length === 2 && + typeof cursorRaw[0] === 'number' && + typeof cursorRaw[1] === 'number' + ? [cursorRaw[0], cursorRaw[1]] + : undefined; + + return { status: 'ok', rows, cols, cursor }; +} diff --git a/vitest.config.ts b/vitest.config.ts index 4bf8998af..67c5fe329 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -1,22 +1,34 @@ import { defineConfig } from 'vitest/config'; import path from 'node:path'; +// Workspace packages that vitest should resolve via their `src/index.ts` +// instead of falling through to Node's resolver (which requires `dist/` +// to be built first). Every workspace package under `packages/` that +// ships TypeScript and is imported as `@agent-relay/` from any +// test or source file MUST be listed here — otherwise tests will pass +// in CI (because CI runs `npm run build` first) but fail in fresh +// local checkouts that haven't been built yet. +// +// When you add a new workspace package, add it here too. const workspacePackages = [ 'acp-bridge', - 'events', 'agent', 'cloud', 'config', + 'events', 'gateway', + 'github-primitive', 'hooks', 'memory', 'openclaw', 'policy', 'sdk', + 'slack-primitive', 'telemetry', 'trajectory', 'user-directory', 'utils', + 'workflow-types', ] as const; const workspaceAliases = workspacePackages.flatMap((packageName) => { diff --git a/web/content/docs/cli-overview.mdx b/web/content/docs/cli-overview.mdx index 9f175e021..e6c133558 100644 --- a/web/content/docs/cli-overview.mdx +++ b/web/content/docs/cli-overview.mdx @@ -16,7 +16,7 @@ agent-relay --version ## Main command groups - Broker lifecycle: `up`, `status`, `down`, `update`, `uninstall` -- Agent management: `spawn`, `who`, `release`, `set-model`, `agents:logs` +- Agent management: `spawn`, `who`, `release`, `set-model`, `agents:logs`, `view` - Messaging: `send`, `history`, `inbox` - Workflows: `run`, `workflows list` - Cloud: `cloud login`, `cloud connect`, `cloud run`, `cloud status`, `cloud logs`, `cloud sync` diff --git a/web/content/docs/reference-cli.mdx b/web/content/docs/reference-cli.mdx index 232335e04..600b32415 100644 --- a/web/content/docs/reference-cli.mdx +++ b/web/content/docs/reference-cli.mdx @@ -49,3 +49,25 @@ Flags: `agentToken` is `null` unless `--register` successfully minted and injected a fresh Relaycast agent token. `mcp-args` writes side-effect files synchronously when the target CLI requires a config file. For `opencode`, it may write `/opencode.json`; for `cursor`, `cursor-agent`, and `agent`, it may write `/.cursor/mcp.json`; for `gemini`, it may write `/.gemini/trustedFolders.json`. Treat the command as compute plus configure for those CLIs, not as a side-effect-free dry run. + +## `view` + +Stream a running agent's PTY output to your terminal in real time. `view` is read-only: keystrokes are not forwarded to the agent, and the broker keeps doing whatever it was doing. Multiple `view` clients can attach to the same agent simultaneously. Exit with `Ctrl+C` — the agent keeps running under the broker. + +```bash +agent-relay view reviewer +``` + +The client auto-discovers the running broker by reading `.agent-relay/connection.json` in the current directory. On attach, it first captures the agent's current visible screen via `GET /api/spawned/{name}/snapshot?format=ansi` and writes the reproduction bytes to your terminal — so you immediately see whatever the agent is looking at, not a blank window. It then opens a WebSocket to the broker and filters for `worker_stream` events matching the named agent for live deltas. ANSI escape sequences are preserved byte-for-byte so colors and cursor moves render correctly. + +Flags: + +| Flag | Required | Description | +| ---- | -------- | ----------- | +| `--broker-url ` | no | Broker base URL. Falls back to `RELAY_BROKER_URL`, then `.agent-relay/connection.json`. | +| `--api-key ` | no | Broker API key. Falls back to `RELAY_BROKER_API_KEY`, then the `api_key` field in `connection.json`. | +| `--state-dir ` | no | Directory containing `connection.json` to use for auto-discovery. Defaults to `.agent-relay/` under the project root. | + +If the initial snapshot can't be served (broker hiccup, transient timeout) `view` prints a warning to stderr and falls through to the live stream — you may briefly see a blank screen until the agent next produces output. If the agent doesn't exist or has no PTY (headless worker), `view` aborts with an explanatory error. + +`view` is part of the broker-owned agent model described in [issue #864](https://github.com/AgentWorkforce/relay/issues/864). It shares the snapshot-on-attach helper with the upcoming `drive` and `relay` verbs so all three open with a faithful redraw of the agent's current screen.