diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index cb265e82c..aa7fe3fe4 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -260,12 +260,12 @@ describe('AgentRelayClient orchestration payloads', () => { }); }); - it('exposes session mode and pending queue HTTP routes', async () => { + it('exposes inbound delivery mode and pending queue HTTP routes', async () => { const client = createProtocolClient(); const request = vi .spyOn((client as any).transport, 'request') - .mockResolvedValueOnce({ mode: 'human' }) - .mockResolvedValueOnce({ mode: 'passthrough', flushed: 2 }) + .mockResolvedValueOnce({ mode: 'manual_flush' }) + .mockResolvedValueOnce({ mode: 'auto_inject', flushed: 2 }) .mockResolvedValueOnce({ pending: [ { @@ -281,18 +281,18 @@ describe('AgentRelayClient orchestration payloads', () => { }) .mockResolvedValueOnce({ flushed: 1 }); - await expect(client.getSessionMode('worker a')).resolves.toBe('human'); - await expect(client.setSessionMode('worker a', 'passthrough')).resolves.toEqual({ - mode: 'passthrough', + await expect(client.getInboundDeliveryMode('worker a')).resolves.toBe('manual_flush'); + await expect(client.setInboundDeliveryMode('worker a', 'auto_inject')).resolves.toEqual({ + mode: 'auto_inject', flushed: 2, }); await expect(client.getPending('worker a')).resolves.toHaveLength(1); await expect(client.flushPending('worker a')).resolves.toEqual({ flushed: 1 }); - expect(request).toHaveBeenNthCalledWith(1, '/api/spawned/worker%20a/mode'); - expect(request).toHaveBeenNthCalledWith(2, '/api/spawned/worker%20a/mode', { + expect(request).toHaveBeenNthCalledWith(1, '/api/spawned/worker%20a/delivery-mode'); + expect(request).toHaveBeenNthCalledWith(2, '/api/spawned/worker%20a/delivery-mode', { method: 'PUT', - body: JSON.stringify({ mode: 'passthrough' }), + body: JSON.stringify({ mode: 'auto_inject' }), }); expect(request).toHaveBeenNthCalledWith(3, '/api/spawned/worker%20a/pending'); expect(request).toHaveBeenNthCalledWith(4, '/api/spawned/worker%20a/flush', { method: 'POST' }); @@ -370,7 +370,7 @@ describe('AgentRelayClient orchestration payloads', () => { ); const client = new AgentRelayClient({ baseUrl: TEST_BASE_URL, fetch: fetchMock as typeof fetch }); - await expect(client.getSessionMode('ghost')).rejects.toMatchObject({ + await expect(client.getInboundDeliveryMode('ghost')).rejects.toMatchObject({ code: 'agent_not_found', status: 404, message: "no agent named 'ghost'", diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 61967e2e9..430fc4a9d 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -25,7 +25,7 @@ import type { HeadlessProvider, PendingRelayMessage, PtySnapshot, - SessionMode, + InboundDeliveryMode, SnapshotFormat, } from './protocol.js'; import type { @@ -89,8 +89,8 @@ export interface SessionInfo { uptime_secs: number; } -export interface SetSessionModeResult { - mode: SessionMode; +export interface SetInboundDeliveryModeResult { + mode: InboundDeliveryMode; flushed: number; } @@ -494,31 +494,34 @@ export class AgentRelayClient { }); } - async getSessionMode(name: string): Promise { + async getInboundDeliveryMode(name: string): Promise { const result = await this.transport.request<{ mode?: unknown }>( - `/api/spawned/${encodeURIComponent(name)}/mode` + `/api/spawned/${encodeURIComponent(name)}/delivery-mode` ); - if (result.mode !== 'human' && result.mode !== 'passthrough') { + if (result.mode !== 'auto_inject' && result.mode !== 'manual_flush') { throw new AgentRelayProtocolError({ code: 'invalid_response', - message: "session mode response missing valid 'mode'", + message: "inbound delivery mode response missing valid 'mode'", }); } return result.mode; } - async setSessionMode(name: string, mode: SessionMode): Promise { + async setInboundDeliveryMode( + name: string, + mode: InboundDeliveryMode + ): Promise { const result = await this.transport.request<{ mode?: unknown; flushed?: unknown }>( - `/api/spawned/${encodeURIComponent(name)}/mode`, + `/api/spawned/${encodeURIComponent(name)}/delivery-mode`, { method: 'PUT', body: JSON.stringify({ mode }), } ); - if (result.mode !== 'human' && result.mode !== 'passthrough') { + if (result.mode !== 'auto_inject' && result.mode !== 'manual_flush') { throw new AgentRelayProtocolError({ code: 'invalid_response', - message: "set session mode response missing valid 'mode'", + message: "set inbound delivery mode response missing valid 'mode'", }); } return { diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 4848124c3..51101db3b 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -6,7 +6,7 @@ export { type AgentRelayBrokerInitArgs, type AgentRelayClientOptions, type AgentRelaySpawnOptions, - type SetSessionModeResult, + type SetInboundDeliveryModeResult, type SessionInfo, type WorkerStreamSubscriptionOptions, } from './client.js'; diff --git a/packages/sdk/src/protocol.ts b/packages/sdk/src/protocol.ts index cbcc6fbf9..4fe28a6fc 100644 --- a/packages/sdk/src/protocol.ts +++ b/packages/sdk/src/protocol.ts @@ -2,7 +2,7 @@ export const PROTOCOL_VERSION = 1 as const; export type AgentRuntime = 'pty' | 'headless'; export type HeadlessProvider = 'claude' | 'opencode'; -export type SessionMode = 'passthrough' | 'human'; +export type InboundDeliveryMode = 'auto_inject' | 'manual_flush'; export type SnapshotFormat = 'plain' | 'ansi'; export interface RestartPolicy { @@ -292,6 +292,12 @@ export type BrokerEvent = count: number; reason?: string; } + | { + kind: 'agent_inbound_delivery_mode_changed'; + name: string; + previous_mode: InboundDeliveryMode; + mode: InboundDeliveryMode; + } | { kind: 'delivery_injected'; name: string; diff --git a/src/cli/bootstrap.test.ts b/src/cli/bootstrap.test.ts index e3c85b000..fd4661203 100644 --- a/src/cli/bootstrap.test.ts +++ b/src/cli/bootstrap.test.ts @@ -152,7 +152,14 @@ describe('verbless `-n NAME CLI` silent alias', () => { // that — here we just need the live snapshot. function knownVerbs(): Set { const program = createProgram(); - return new Set(program.commands.map((c) => c.name())); + const verbs = new Set(); + for (const command of program.commands) { + verbs.add(command.name()); + for (const alias of command.aliases()) { + verbs.add(alias); + } + } + return verbs; } it('recognises `-n NAME CLI`', () => { diff --git a/src/cli/bootstrap.ts b/src/cli/bootstrap.ts index f6af6b99c..fcf7d0310 100644 --- a/src/cli/bootstrap.ts +++ b/src/cli/bootstrap.ts @@ -320,6 +320,9 @@ function collectTopLevelVerbs(program: Command): Set { const verbs = new Set(); for (const command of program.commands) { verbs.add(command.name()); + for (const alias of command.aliases()) { + verbs.add(alias); + } } return verbs; } diff --git a/src/cli/commands/drive.test.ts b/src/cli/commands/drive.test.ts index c6409e153..968232ac2 100644 --- a/src/cli/commands/drive.test.ts +++ b/src/cli/commands/drive.test.ts @@ -138,11 +138,11 @@ type FetchRoute = (init?: RequestInit) => Promise; interface FetchScript { /** Map of route key → handler. Default behaviour returns 200 + sensible body. */ routes?: Record; - /** Default mode reported by `GET …/mode`. */ - initialMode?: 'human' | 'passthrough'; + /** Default mode reported by `GET …/delivery-mode`. */ + initialMode?: 'manual_flush' | 'auto_inject'; /** Default pending count reported by `GET …/pending`. */ initialPending?: number; - /** Make `PUT …/mode` to `human` fail with this status / body. */ + /** Make `PUT …/delivery-mode` to `manual_flush` fail with this status / body. */ modeFlipFailure?: { status: number; error?: string }; /** Make `captureAndRenderSnapshot` return this status. */ snapshotResult?: Awaited>; @@ -179,7 +179,7 @@ function createHarness(opts: FetchScript = {}): { opts.terminalSize === undefined ? { rows: 30, cols: 100 } : opts.terminalSize ); - const initialMode = opts.initialMode ?? 'passthrough'; + const initialMode = opts.initialMode ?? 'auto_inject'; const initialPending = opts.initialPending ?? 0; const defaultRoutes: Record = { @@ -188,12 +188,12 @@ function createHarness(opts: FetchScript = {}): { status: 200, headers: { 'Content-Type': 'application/json' }, }), - 'GET /mode': async () => + 'GET /delivery-mode': async () => new Response(JSON.stringify({ mode: initialMode }), { status: 200, headers: { 'Content-Type': 'application/json' }, }), - 'PUT /mode': async (init) => { + 'PUT /delivery-mode': async (init) => { if (opts.modeFlipFailure) { return new Response(JSON.stringify({ error: opts.modeFlipFailure.error ?? 'fail' }), { status: opts.modeFlipFailure.status, @@ -255,11 +255,11 @@ function createHarness(opts: FetchScript = {}): { } fetchLog.push({ url, method, body: bodyJson, headers }); - // Match by the trailing path segment (`/mode`, `/pending`, `/flush`) + // Match by the trailing path segment (`/delivery-mode`, `/pending`, `/flush`) // or the `/api/input/...` prefix. let key: string | null = null; - if (/\/api\/spawned\/[^/]+\/mode$/.test(url)) { - key = `${method} /mode`; + if (/\/api\/spawned\/[^/]+\/delivery-mode$/.test(url)) { + key = `${method} /delivery-mode`; } else if (/\/api\/spawned\/[^/]+\/pending$/.test(url)) { key = `${method} /pending`; } else if (/\/api\/spawned\/[^/]+\/flush$/.test(url)) { @@ -436,15 +436,15 @@ describe('KeybindParser', () => { describe('renderStatusLine', () => { it('includes agent name, mode, pending count, and detach hint', () => { - const out = renderStatusLine({ name: 'Alice', mode: 'human', pending: 3, showHelp: false }); + const out = renderStatusLine({ name: 'Alice', mode: 'manual_flush', pending: 3, showHelp: false }); expect(out).toContain('drive Alice'); - expect(out).toContain('mode=human'); + expect(out).toContain('delivery=manual_flush'); expect(out).toContain('pending=3'); expect(out).toContain('Ctrl+B D detach'); }); it('uses save/restore cursor + reverse video so the agent screen is preserved', () => { - const out = renderStatusLine({ name: 'Alice', mode: 'human', pending: 0, showHelp: false }); + const out = renderStatusLine({ name: 'Alice', mode: 'manual_flush', pending: 0, showHelp: false }); expect(out.startsWith('\x1b7')).toBe(true); // save cursor expect(out.endsWith('\x1b8')).toBe(true); // restore cursor expect(out).toContain('\x1b[7m'); // reverse video @@ -454,7 +454,7 @@ describe('renderStatusLine', () => { it('positions at the given row', () => { const out = renderStatusLine({ name: 'A', - mode: 'human', + mode: 'manual_flush', pending: 0, showHelp: false, rows: 50, @@ -463,22 +463,22 @@ describe('renderStatusLine', () => { }); it('shows extra hint when help is toggled on', () => { - const out = renderStatusLine({ name: 'A', mode: 'human', pending: 0, showHelp: true }); + const out = renderStatusLine({ name: 'A', mode: 'manual_flush', pending: 0, showHelp: true }); expect(out).toContain('hide help'); }); }); describe('runDriveSession', () => { - it('flips to human mode, renders snapshot, opens WS, then restores prior mode on detach', async () => { - const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'passthrough' }); + it('flips to manual_flush delivery mode, renders snapshot, opens WS, then restores prior mode on detach', async () => { + const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'auto_inject' }); const sessionPromise = runDriveSession('Alice', {}, deps); const socket = await openSocket(sockets); expect(socket.url).toBe('ws://localhost:3889/ws'); expect(socket.headers['X-API-Key']).toBe('k'); - // PUT /mode body should be { mode: 'human' }. - const flipCall = fetchLog.find((c) => c.method === 'PUT' && c.url.endsWith('/mode')); - expect(flipCall?.body).toEqual({ mode: 'human' }); + // PUT /delivery-mode body should be { mode: 'manual_flush' }. + const flipCall = fetchLog.find((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); + expect(flipCall?.body).toEqual({ mode: 'manual_flush' }); // Raw mode should be on after open. expect(stdin.rawModeCalls.includes(true)).toBe(true); @@ -491,10 +491,10 @@ describe('runDriveSession', () => { // Raw mode restored. expect(stdin.rawModeCalls).toEqual([true, false]); - // Last PUT /mode call should restore to 'passthrough' (the prior mode). - const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); + // Last PUT /delivery-mode call should restore to 'auto_inject' (the prior mode). + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); expect(modeCalls).toHaveLength(2); - expect(modeCalls[1].body).toEqual({ mode: 'passthrough' }); + expect(modeCalls[1].body).toEqual({ mode: 'auto_inject' }); }); it('aborts before opening the WS when the broker rejects the mode flip', async () => { @@ -516,8 +516,8 @@ describe('runDriveSession', () => { expect(sockets).toHaveLength(0); expect(errors[0]?.[0]).toMatch(/no agent named/); // Best-effort restore PUT should still have fired. - const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); - expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'passthrough' }]); + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); + expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'manual_flush' }, { mode: 'auto_inject' }]); }); it('aborts before opening the WS when the worker has no PTY', async () => { @@ -608,7 +608,7 @@ describe('runDriveSession', () => { }); it('restores the prior mode even on abnormal WebSocket close', async () => { - const { deps, sockets, fetchLog, errors } = createHarness({ initialMode: 'passthrough' }); + const { deps, sockets, fetchLog, errors } = createHarness({ initialMode: 'auto_inject' }); const sessionPromise = runDriveSession('Alice', {}, deps); const socket = await openSocket(sockets); @@ -617,21 +617,21 @@ describe('runDriveSession', () => { expect(code).toBe(1); expect(errors.some((args) => String(args[0]).includes('connection closed'))).toBe(true); - const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); - expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'passthrough' }]); + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); + expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'manual_flush' }, { mode: 'auto_inject' }]); }); - it('proceeds when the worker is already in human mode (re-attach scenario)', async () => { - const { deps, sockets, stdin, fetchLog } = createHarness({ initialMode: 'human' }); + it('proceeds when the worker is already in manual_flush mode (re-attach scenario)', async () => { + const { deps, sockets, stdin, fetchLog } = createHarness({ initialMode: 'manual_flush' }); const sessionPromise = runDriveSession('Alice', {}, deps); await openSocket(sockets); stdin.type(Buffer.from([0x03])); await sessionPromise; - const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); - // Restore to 'human' since that was the prior mode. - expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'human' }]); + const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); + // Restore to 'manual_flush' since that was the prior mode. + expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'manual_flush' }, { mode: 'manual_flush' }]); }); it('exits cleanly on SIGINT', async () => { diff --git a/src/cli/commands/drive.ts b/src/cli/commands/drive.ts index 5fb944322..7b9ebbe8e 100644 --- a/src/cli/commands/drive.ts +++ b/src/cli/commands/drive.ts @@ -1,30 +1,30 @@ /** * `agent-relay drive ` — interactive read-write take-over client. * - * Attaches to a running agent, flips it into `human` session mode so the + * Attaches to a running agent, flips it into `manual_flush` inbound delivery mode so the * broker parks new relay messages in a per-worker queue, and forwards your * keystrokes to the worker's PTY. You can drain the queue on demand with * `Ctrl+G` and detach with `Ctrl+B D` (or `Ctrl+C` as a safety alias). - * Detaching restores the worker's previous session mode and leaves the + * Detaching restores the worker's previous inbound delivery mode and leaves the * agent running under the broker — `drive` never kills the worker. * * Sequence of operations on attach: * * 1. Discover broker connection (CLI flag → env → connection.json). - * 2. `GET /api/spawned/{name}/mode` → remember the previous mode. - * 3. `PUT /api/spawned/{name}/mode` → switch to `human`. + * 2. `GET /api/spawned/{name}/delivery-mode` → remember the previous mode. + * 3. `PUT /api/spawned/{name}/delivery-mode` → switch to `manual_flush`. * 4. `captureAndRenderSnapshot` → repaint the agent's current screen. * 5. `GET /api/spawned/{name}/pending` → seed the status-line counter. * 6. Open `/ws`, subscribe to events for this worker. * 7. Switch local stdin to raw mode; forward bytes to `POST /api/input/{name}`. * - * On detach (clean or abnormal), best-effort `PUT .../mode` restores the + * On detach (clean or abnormal), best-effort `PUT .../delivery-mode` restores the * previous mode so the queue doesn't fill up indefinitely. */ import { Buffer } from 'node:buffer'; -import type { SessionMode } from '@agent-relay/sdk'; +import type { InboundDeliveryMode } from '@agent-relay/sdk'; import { Command } from 'commander'; import WebSocket from 'ws'; @@ -45,8 +45,8 @@ import { createBrokerClient, mapBrokerSdkFailure } from '../lib/sdk-client.js'; type ExitFn = (code: number) => never; -/** Wire string for the broker's `SessionMode` enum. */ -export type { SessionMode }; +/** Wire string for the broker's `InboundDeliveryMode` enum. */ +export type { InboundDeliveryMode }; /** Minimal WebSocket surface we depend on — same shape as `view`'s. */ export interface DriveWebSocket { @@ -161,37 +161,37 @@ function withDefaults(overrides: Partial = {}): DriveDependen /** ----- HTTP helpers ----- */ -/** `GET /api/spawned/{name}/mode` → `'human' | 'passthrough'` or `null` on failure. */ -export async function getSessionMode( +/** `GET /api/spawned/{name}/delivery-mode` → `'manual_flush' | 'auto_inject'` or `null` on failure. */ +export async function getInboundDeliveryMode( connection: BrokerConnection, name: string, fetchFn: typeof globalThis.fetch -): Promise { +): Promise { try { - return await createBrokerClient(connection, fetchFn).getSessionMode(name); + return await createBrokerClient(connection, fetchFn).getInboundDeliveryMode(name); } catch { return null; } } -/** Outcome of a `PUT /api/spawned/{name}/mode` call. */ -export interface SetSessionModeResult { +/** Outcome of a `PUT /api/spawned/{name}/delivery-mode` call. */ +export interface SetInboundDeliveryModeResult { ok: boolean; status: number; - /** Server-reported number of pending messages drained on a `human→passthrough` flip. */ + /** Server-reported number of pending messages drained on a `manual_flush→auto_inject` flip. */ flushed?: number; /** Human-readable error message when `ok` is false. */ message?: string; } -export async function setSessionMode( +export async function setInboundDeliveryMode( connection: BrokerConnection, name: string, - mode: SessionMode, + mode: InboundDeliveryMode, fetchFn: typeof globalThis.fetch -): Promise { +): Promise { try { - const body = await createBrokerClient(connection, fetchFn).setSessionMode(name, mode); + const body = await createBrokerClient(connection, fetchFn).setInboundDeliveryMode(name, mode); const flushed = body.flushed; return { ok: true, status: 200, flushed }; } catch (err: unknown) { @@ -412,7 +412,7 @@ export class KeybindParser { */ export function renderStatusLine(opts: { name: string; - mode: SessionMode; + mode: InboundDeliveryMode; pending: number; showHelp: boolean; /** Terminal rows — defaults to 24 if unknown. The status line lands on row N. */ @@ -422,7 +422,7 @@ export function renderStatusLine(opts: { const help = opts.showHelp ? ' | Ctrl+G flush | Ctrl+B D detach | Ctrl+B ? hide help' : ' | Ctrl+G flush | Ctrl+B D detach'; - const text = `[drive ${opts.name} | mode=${opts.mode} | pending=${opts.pending}${help}]`; + const text = `[drive ${opts.name} | delivery=${opts.mode} | pending=${opts.pending}${help}]`; // ESC 7 = save cursor; ESC[;1H = move to bottom row; ESC[2K = clear line; // ESC[7m = reverse video; ESC[0m = reset; ESC 8 = restore cursor. return `\x1b7\x1b[${row};1H\x1b[2K\x1b[7m${text}\x1b[0m\x1b8`; @@ -433,7 +433,7 @@ export function renderStatusLine(opts: { /** * Open a `drive` session. Resolves with the exit code the CLI should * propagate. Cleans up its own stdin raw-mode and best-effort restores - * the worker's previous session mode on any exit path. + * the worker's previous inbound delivery mode on any exit path. */ export async function runDriveSession( agentName: string, @@ -461,19 +461,21 @@ export async function runDriveSession( // Remember the worker's prior mode so we can restore it on detach. // `null` means we couldn't read it (broker hiccup or worker missing); - // we default the restore target to `passthrough` in that case so the + // we default the restore target to `auto_inject` in that case so the // queue doesn't keep growing. - const previousMode = await getSessionMode(connection, name, deps.fetch); + const previousMode = await getInboundDeliveryMode(connection, name, deps.fetch); - // Flip the worker into human mode. If this fails outright, abort + // Flip the worker into manual_flush mode. If this fails outright, abort // before doing anything else — we don't want to redraw the screen // and then silently keep auto-injecting into the agent. - const flip = await setSessionMode(connection, name, 'human', deps.fetch); + const flip = await setInboundDeliveryMode(connection, name, 'manual_flush', deps.fetch); if (!flip.ok) { if (flip.status === 404) { deps.error(`Error: no agent named '${name}'`); } else { - deps.error(`Error: could not switch '${name}' to human mode: ${flip.message ?? 'unknown error'}`); + deps.error( + `Error: could not switch '${name}' to manual_flush mode: ${flip.message ?? 'unknown error'}` + ); } return 1; } @@ -491,11 +493,11 @@ export async function runDriveSession( break; case 'not_found': // Best-effort restore — we did flip the mode above. - await setSessionMode(connection, name, previousMode ?? 'passthrough', deps.fetch); + await setInboundDeliveryMode(connection, name, previousMode ?? 'auto_inject', deps.fetch); deps.error(`Error: ${snapshot.message ?? `no agent named '${name}'`}`); return 1; case 'no_pty': - await setSessionMode(connection, name, previousMode ?? 'passthrough', deps.fetch); + await setInboundDeliveryMode(connection, name, previousMode ?? 'auto_inject', deps.fetch); deps.error(`Error: ${snapshot.message ?? `agent '${name}' has no PTY to drive`}`); return 1; case 'unavailable': @@ -525,7 +527,7 @@ export async function runDriveSession( deps.writeChunk( renderStatusLine({ name, - mode: 'human', + mode: 'manual_flush', pending, showHelp, rows: terminalRows, @@ -667,8 +669,8 @@ export async function runDriveSession( // best effort } // Best-effort: restore the worker's previous mode so we don't - // leave it stuck in human and silently piling up queued messages. - void setSessionMode(connection, name, previousMode ?? 'passthrough', deps.fetch).finally(() => { + // leave it stuck in manual_flush and silently piling up queued messages. + void setInboundDeliveryMode(connection, name, previousMode ?? 'auto_inject', deps.fetch).finally(() => { resolve(code); }); }; diff --git a/src/cli/commands/new.test.ts b/src/cli/commands/new.test.ts index 362a18bb0..250dbd6f6 100644 --- a/src/cli/commands/new.test.ts +++ b/src/cli/commands/new.test.ts @@ -134,7 +134,12 @@ describe('runNew', () => { const { deps, logs, fetchLog } = createHarness(); const code = await runNew('Alice', 'claude', ['--say', 'hi'], {}, deps); expect(code).toBe(0); - expect(fetchLog[0].body).toEqual({ name: 'Alice', cli: 'claude', args: ['--say', 'hi'] }); + expect(fetchLog[0].body).toEqual({ + name: 'Alice', + cli: 'claude', + args: ['--say', 'hi'], + channels: [], + }); expect(logs.some((args) => String(args[0]).includes('Spawned agent: Alice'))).toBe(true); expect(logs.some((args) => String(args[0]).includes('attach with: agent-relay drive Alice'))).toBe(true); }); @@ -179,6 +184,7 @@ describe('runNew', () => { expect(fetchLog[0].body).toEqual({ name: 'Alice', cli: 'claude', + args: [], task: 'fix the bug', team: 'core', model: 'opus', diff --git a/src/cli/commands/new.ts b/src/cli/commands/new.ts index 612c6b97b..d59c765f8 100644 --- a/src/cli/commands/new.ts +++ b/src/cli/commands/new.ts @@ -261,7 +261,7 @@ export function registerNewCommands( .option('--attach', 'After spawning, immediately open a session (default mode: drive)') .option( '--mode ', - 'With --attach: session mode to open (view | drive | passthrough). Ignored without --attach.' + 'With --attach: session to open (view | drive | passthrough). Ignored without --attach.' ) .option('--ephemeral', 'With --attach: release the agent on client exit. Ignored without --attach.') .option('--broker-url ', 'Broker base URL (overrides RELAY_BROKER_URL and connection.json)') diff --git a/src/cli/commands/passthrough.test.ts b/src/cli/commands/passthrough.test.ts index 494f57d9d..9c7f43ff8 100644 --- a/src/cli/commands/passthrough.test.ts +++ b/src/cli/commands/passthrough.test.ts @@ -123,7 +123,7 @@ type FetchRoute = (init?: RequestInit) => Promise; interface FetchScript { routes?: Record; - initialMode?: 'human' | 'passthrough'; + initialMode?: 'manual_flush' | 'auto_inject'; modeFlipFailure?: { status: number; error?: string }; snapshotResult?: Awaited>; terminalSize?: { rows: number; cols: number } | null; @@ -156,7 +156,7 @@ function createHarness(opts: FetchScript = {}): { opts.terminalSize === undefined ? { rows: 30, cols: 100 } : opts.terminalSize ); - const initialMode = opts.initialMode ?? 'passthrough'; + const initialMode = opts.initialMode ?? 'auto_inject'; const defaultRoutes: Record = { 'POST /resize': async () => @@ -164,12 +164,12 @@ function createHarness(opts: FetchScript = {}): { status: 200, headers: { 'Content-Type': 'application/json' }, }), - 'GET /mode': async () => + 'GET /delivery-mode': async () => new Response(JSON.stringify({ mode: initialMode }), { status: 200, headers: { 'Content-Type': 'application/json' }, }), - 'PUT /mode': async (init) => { + 'PUT /delivery-mode': async (init) => { if (opts.modeFlipFailure) { return new Response(JSON.stringify({ error: opts.modeFlipFailure.error ?? 'fail' }), { status: opts.modeFlipFailure.status, @@ -220,8 +220,8 @@ function createHarness(opts: FetchScript = {}): { fetchLog.push({ url, method, body: bodyJson, headers }); let key: string | null = null; - if (/\/api\/spawned\/[^/]+\/mode$/.test(url)) { - key = `${method} /mode`; + if (/\/api\/spawned\/[^/]+\/delivery-mode$/.test(url)) { + key = `${method} /delivery-mode`; } else if (/\/api\/input\/[^/]+$/.test(url)) { key = `${method} /input`; } else if (/\/api\/resize\/[^/]+$/.test(url)) { @@ -301,7 +301,7 @@ describe('classifyWsEvent', () => { ).toEqual({ kind: 'other' }); }); - it('returns other for delivery_queued (no queue in passthrough mode)', () => { + it('returns other for delivery_queued (no queue in passthrough session)', () => { expect(classifyWsEvent(JSON.stringify({ kind: 'delivery_queued', name: 'Alice' }), 'Alice')).toEqual({ kind: 'other', }); @@ -355,16 +355,16 @@ describe('PassthroughKeybindParser', () => { }); describe('renderStatusLine', () => { - it('shows [passthrough name | mode=passthrough] without a pending counter', () => { - const out = renderStatusLine({ name: 'Alice', mode: 'passthrough', showHelp: false }); + it('shows [passthrough name | delivery=auto_inject] without a pending counter', () => { + const out = renderStatusLine({ name: 'Alice', mode: 'auto_inject', showHelp: false }); expect(out).toContain('passthrough Alice'); - expect(out).toContain('mode=passthrough'); + expect(out).toContain('delivery=auto_inject'); expect(out).toContain('Ctrl+B D detach'); expect(out).not.toContain('pending='); }); it('uses save/restore cursor + reverse video', () => { - const out = renderStatusLine({ name: 'A', mode: 'passthrough', showHelp: false }); + const out = renderStatusLine({ name: 'A', mode: 'auto_inject', showHelp: false }); expect(out.startsWith('\x1b7')).toBe(true); expect(out.endsWith('\x1b8')).toBe(true); expect(out).toContain('\x1b[7m'); @@ -374,31 +374,31 @@ describe('renderStatusLine', () => { describe('runPassthroughSession', () => { it('ensures passthrough mode on attach, opens WS, then restores prior mode on detach', async () => { - const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'passthrough' }); + const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'auto_inject' }); const sessionPromise = runPassthroughSession('Alice', {}, deps); const socket = await openSocket(sockets); expect(socket.url).toBe('ws://localhost:3889/ws'); expect(socket.headers['X-API-Key']).toBe('k'); - // After attach (before detach), exactly one PUT /mode should have fired: + // After attach (before detach), exactly one PUT /delivery-mode should have fired: // the "ensure passthrough" call. The restore PUT only fires after detach. - const afterAttach = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); - expect(afterAttach.map((c) => c.body)).toEqual([{ mode: 'passthrough' }]); + const afterAttach = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); + expect(afterAttach.map((c) => c.body)).toEqual([{ mode: 'auto_inject' }]); expect(stdin.rawModeCalls).toEqual([true]); stdin.type(Buffer.from([0x02, 0x44])); // Ctrl+B D const code = await sessionPromise; expect(code).toBe(0); - // After detach, the restore PUT to the prior mode ('passthrough') should + // After detach, the restore PUT to the prior mode should // have fired, and raw mode should be off. - const afterDetach = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); - expect(afterDetach.map((c) => c.body)).toEqual([{ mode: 'passthrough' }, { mode: 'passthrough' }]); + const afterDetach = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); + expect(afterDetach.map((c) => c.body)).toEqual([{ mode: 'auto_inject' }, { mode: 'auto_inject' }]); expect(stdin.rawModeCalls).toEqual([true, false]); }); - it('flips back to passthrough even when the worker was in human mode on attach, then restores to human on detach', async () => { - const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'human' }); + it('flips to auto_inject even when the worker was in manual_flush mode on attach, then restores on detach', async () => { + const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'manual_flush' }); const sessionPromise = runPassthroughSession('Alice', {}, deps); await openSocket(sockets); @@ -406,9 +406,9 @@ describe('runPassthroughSession', () => { await sessionPromise; const flipBodies = fetchLog - .filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')) + .filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')) .map((c) => c.body); - expect(flipBodies).toEqual([{ mode: 'passthrough' }, { mode: 'human' }]); + expect(flipBodies).toEqual([{ mode: 'auto_inject' }, { mode: 'manual_flush' }]); }); it('aborts before opening the WS when the broker rejects the mode flip', async () => { @@ -430,8 +430,8 @@ describe('runPassthroughSession', () => { expect(sockets).toHaveLength(0); expect(errors[0]?.[0]).toMatch(/no agent named/); // Best-effort restore PUT. - const flips = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')); - expect(flips.map((c) => c.body)).toEqual([{ mode: 'passthrough' }, { mode: 'passthrough' }]); + const flips = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')); + expect(flips.map((c) => c.body)).toEqual([{ mode: 'auto_inject' }, { mode: 'auto_inject' }]); }); it('continues with a warning when the snapshot is transiently unavailable', async () => { @@ -474,7 +474,7 @@ describe('runPassthroughSession', () => { }); it('restores the prior mode even on abnormal WebSocket close', async () => { - const { deps, sockets, fetchLog, errors } = createHarness({ initialMode: 'human' }); + const { deps, sockets, fetchLog, errors } = createHarness({ initialMode: 'manual_flush' }); const sessionPromise = runPassthroughSession('Alice', {}, deps); const socket = await openSocket(sockets); @@ -483,8 +483,10 @@ describe('runPassthroughSession', () => { expect(code).toBe(1); expect(errors.some((args) => String(args[0]).includes('connection closed'))).toBe(true); - const flips = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode')).map((c) => c.body); - expect(flips).toEqual([{ mode: 'passthrough' }, { mode: 'human' }]); + const flips = fetchLog + .filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode')) + .map((c) => c.body); + expect(flips).toEqual([{ mode: 'auto_inject' }, { mode: 'manual_flush' }]); }); it('exits cleanly on SIGINT', async () => { @@ -577,7 +579,9 @@ describe('registerPassthroughCommands', () => { registerPassthroughCommands(program, deps); const cmd = program.commands.find((c) => c.name() === 'passthrough'); expect(cmd).toBeDefined(); - expect(cmd?.description()).toMatch(/passthrough mode/i); + expect(cmd?.description()).toMatch(/passthrough/i); + expect(program.commands.find((c) => c.name() === 'relay')).toBeUndefined(); + expect(cmd?.aliases()).not.toContain('relay'); }); it('wires --broker-url, --api-key, and --state-dir', () => { diff --git a/src/cli/commands/passthrough.ts b/src/cli/commands/passthrough.ts index ad04d6cac..2458cc27a 100644 --- a/src/cli/commands/passthrough.ts +++ b/src/cli/commands/passthrough.ts @@ -1,22 +1,22 @@ /** - * `agent-relay passthrough ` — read-write attach in passthrough mode. + * `agent-relay passthrough ` — read-write attach in passthrough session. * * The broker auto-injects inbound relay messages into the agent's PTY * while the human also types; both writers race. That's the point — - * passthrough mode is for observe-and-occasionally-nudge sessions + * passthrough is for observe-and-occasionally-nudge sessions * while the broker does its coordination thing. For exclusive * deterministic control with no auto-inject, use `drive` instead. * - * On attach, ensures the worker is in `passthrough` mode (it's the + * On attach, ensures the worker is in `auto_inject` delivery mode (it's the * broker default, but if someone left a `drive` session the worker may - * be in `human` mode — `passthrough` flips it back for the session's + * be in `manual_flush` mode — `passthrough` flips it back for the session's * duration and restores the prior mode on detach). On detach, restores * the prior mode and leaves the agent running. * * The session loop (snapshot-on-attach, raw stdin, resize forwarding, * detach keybind, Ctrl+C-as-detach safety alias) mirrors the shape of * `drive.ts` minus the pending-queue UI and `Ctrl+G` flush binding - * (there's no queue in passthrough mode). `drive.ts` is the more + * (there's no queue in passthrough session). `drive.ts` is the more * heavily-commented version of the shared shape; this module * duplicates rather than abstracts because the trimmed surface is * small enough that an extra layer of indirection would cost more @@ -40,7 +40,13 @@ import { toWsUrl, } from '../lib/broker-connection.js'; import { defaultExit, runSignalHandler } from '../lib/exit.js'; -import { getSessionMode, resizeWorker, sendInput, setSessionMode, type SessionMode } from './drive.js'; +import { + getInboundDeliveryMode, + resizeWorker, + sendInput, + setInboundDeliveryMode, + type InboundDeliveryMode, +} from './drive.js'; type ExitFn = (code: number) => never; @@ -141,8 +147,8 @@ function isStringObject(value: unknown): value is Record { /** Discriminated union of broker events the `passthrough` client cares * about. No `delivery_queued` / `agent_pending_drained` — there's no - * queue in passthrough mode, so those events (which the broker doesn't - * emit for passthrough-mode workers anyway) would be `other`. */ + * queue in passthrough session, so those events (which the broker doesn't + * emit while the worker is in `auto_inject`) would be `other`. */ export type PassthroughWsEvent = { kind: 'worker_stream'; chunk: string } | { kind: 'other' }; /** @@ -237,17 +243,17 @@ export class PassthroughKeybindParser { /** * Render the bottom-of-terminal status line for `passthrough`. Same * save/restore-cursor trick as `drive`, no pending counter (there - * isn't one in passthrough mode). + * isn't one in passthrough session). */ export function renderStatusLine(opts: { name: string; - mode: SessionMode; + mode: InboundDeliveryMode; showHelp: boolean; rows?: number; }): string { const row = Math.max(opts.rows ?? 24, 1); const help = opts.showHelp ? ' | Ctrl+B D detach | Ctrl+B ? hide help' : ' | Ctrl+B D detach'; - const text = `[passthrough ${opts.name} | mode=${opts.mode}${help}]`; + const text = `[passthrough ${opts.name} | delivery=${opts.mode}${help}]`; return `\x1b7\x1b[${row};1H\x1b[2K\x1b[7m${text}\x1b[0m\x1b8`; } @@ -256,7 +262,7 @@ export function renderStatusLine(opts: { /** * Open a `passthrough` session. Resolves with the exit code the CLI * should propagate. Cleans up its own stdin raw-mode and best-effort - * restores the worker's previous session mode on any exit path. + * restores the worker's previous inbound delivery mode on any exit path. */ export async function runPassthroughSession( agentName: string, @@ -284,24 +290,24 @@ export async function runPassthroughSession( // Remember the worker's prior mode so we can restore on detach. // `null` means we couldn't read it (broker hiccup or worker missing); - // we default the restore target to `passthrough` in that case (which + // we default the restore target to `auto_inject` in that case (which // is also our preferred final state). - const previousMode = await getSessionMode(connection, name, deps.fetch); + const previousMode = await getInboundDeliveryMode(connection, name, deps.fetch); - // If the worker is in `human` mode (e.g. someone left a `drive` - // session), flip it back to `passthrough` for the duration of our + // If the worker is in `manual_flush` mode (e.g. someone left a `drive` + // session), flip it back to `auto_inject` for the duration of our // session. This matches the verb's intent: `agent-relay passthrough alice` // means "watch alice with auto-inject on". If the worker is already - // in `passthrough` we still issue the PUT — it's idempotent on the + // in `auto_inject` we still issue the PUT — it's idempotent on the // broker and gives us an early hard-failure on missing-agent before // we touch the terminal. - const flip = await setSessionMode(connection, name, 'passthrough', deps.fetch); + const flip = await setInboundDeliveryMode(connection, name, 'auto_inject', deps.fetch); if (!flip.ok) { if (flip.status === 404) { deps.error(`Error: no agent named '${name}'`); } else { deps.error( - `Error: could not ensure '${name}' is in passthrough mode: ${flip.message ?? 'unknown error'}` + `Error: could not ensure '${name}' is in passthrough session: ${flip.message ?? 'unknown error'}` ); } return 1; @@ -316,11 +322,11 @@ export async function runPassthroughSession( case 'ok': break; case 'not_found': - await setSessionMode(connection, name, previousMode ?? 'passthrough', deps.fetch); + await setInboundDeliveryMode(connection, name, previousMode ?? 'auto_inject', deps.fetch); deps.error(`Error: ${snapshot.message ?? `no agent named '${name}'`}`); return 1; case 'no_pty': - await setSessionMode(connection, name, previousMode ?? 'passthrough', deps.fetch); + await setInboundDeliveryMode(connection, name, previousMode ?? 'auto_inject', deps.fetch); deps.error(`Error: ${snapshot.message ?? `agent '${name}' has no PTY to attach to`}`); return 1; case 'unavailable': @@ -339,7 +345,7 @@ export async function runPassthroughSession( (typeof snapshot.rows === 'number' && snapshot.rows > 0 ? snapshot.rows : undefined); const paintStatus = (): void => { - deps.writeChunk(renderStatusLine({ name, mode: 'passthrough', showHelp, rows: terminalRows })); + deps.writeChunk(renderStatusLine({ name, mode: 'auto_inject', showHelp, rows: terminalRows })); }; paintStatus(); @@ -447,8 +453,8 @@ export async function runPassthroughSession( // best effort } // Restore the worker's previous mode (no-op if it was already - // passthrough, which is the common case). - void setSessionMode(connection, name, previousMode ?? 'passthrough', deps.fetch).finally(() => { + // auto-inject, which is the common case). + void setInboundDeliveryMode(connection, name, previousMode ?? 'auto_inject', deps.fetch).finally(() => { resolve(code); }); }; @@ -518,7 +524,7 @@ export function registerPassthroughCommands( program .command('passthrough') .description( - 'Watch a running agent in passthrough mode: broker auto-injects inbound relay messages while you type alongside (last-writer-wins)' + 'Watch a running agent in passthrough session: broker auto-injects inbound relay messages while you type alongside (last-writer-wins)' ) .argument('', 'Agent name to attach to') .option('--broker-url ', 'Broker base URL (overrides RELAY_BROKER_URL and connection.json)') diff --git a/src/cli/commands/rm.ts b/src/cli/commands/rm.ts index 7f0d29ef9..9f93f0a2c 100644 --- a/src/cli/commands/rm.ts +++ b/src/cli/commands/rm.ts @@ -6,7 +6,7 @@ * one-line explanatory error on failure. Connection discovery uses the * shared `resolveBrokerConnection` helper so the same `--broker-url` / * `RELAY_BROKER_URL` / `connection.json` chain works as for `view` / - * `drive` / `relay`. + * `drive` / `passthrough`. * * The longer-form `release` command in `agent-management.ts` layers * broker autostart on top of the same SDK client; `rm` is the lighter diff --git a/src/cli/lib/broker-connection.ts b/src/cli/lib/broker-connection.ts index b32260781..d837dbfd7 100644 --- a/src/cli/lib/broker-connection.ts +++ b/src/cli/lib/broker-connection.ts @@ -1,6 +1,6 @@ /** * Shared broker-connection discovery for the attach-style CLI verbs - * (`view`, `drive`, `relay`). + * (`view`, `drive`, `passthrough`). * * Resolution order matches `agent-relay-broker dump-pty` so users don't * have to learn two patterns: diff --git a/src/cli/lib/spawn-and-attach.ts b/src/cli/lib/spawn-and-attach.ts index b5747ddc4..27a656aaa 100644 --- a/src/cli/lib/spawn-and-attach.ts +++ b/src/cli/lib/spawn-and-attach.ts @@ -44,7 +44,11 @@ import { type DriveTerminal, type DriveWebSocket, } from '../commands/drive.js'; -import { runPassthroughSession, type PassthroughDependencies } from '../commands/passthrough.js'; +import { + runPassthroughSession, + type PassthroughDependencies, + type PassthroughWebSocket, +} from '../commands/passthrough.js'; import { runViewSession, type ViewDependencies, type ViewWebSocket } from '../commands/view.js'; export type AttachMode = 'view' | 'drive' | 'passthrough'; @@ -170,7 +174,7 @@ export function buildDefaultAttachChildDeps(): AttachChildDependencies { const passthroughDeps: PassthroughDependencies = { ...sharedConnectionDeps, - createWebSocket: (url, headers) => new WebSocket(url, { headers }) as DriveWebSocket, + createWebSocket: (url, headers) => new WebSocket(url, { headers }) as PassthroughWebSocket, writeChunk: sharedWriteChunk, onSignal: sharedOnSignal, log: sharedLog, diff --git a/src/listen_api.rs b/src/listen_api.rs index ca7f9dbd1..d10f379b7 100644 --- a/src/listen_api.rs +++ b/src/listen_api.rs @@ -10,7 +10,7 @@ use relay_broker::{ multi_workspace::WorkspaceMembershipSummary, protocol::MessageInjectionMode, replay_buffer::ReplayBuffer, - types::{PendingRelayMessage, SessionMode}, + types::{InboundDeliveryMode, PendingRelayMessage}, }; use serde::Deserialize; use serde_json::{json, Value}; @@ -136,64 +136,65 @@ pub enum ListenApiRequest { RenewLease { reply: tokio::sync::oneshot::Sender>, }, - /// `GET /api/spawned/{name}/mode` — read the current session mode. - GetSessionMode { + /// `GET /api/spawned/{name}/delivery-mode` — read the current inbound + /// delivery mode. + GetInboundDeliveryMode { name: String, - reply: tokio::sync::oneshot::Sender>, + reply: tokio::sync::oneshot::Sender>, }, - /// `PUT /api/spawned/{name}/mode` — set the session mode. On a - /// `human → passthrough` transition the broker drains the pending + /// `PUT /api/spawned/{name}/delivery-mode` — set the inbound delivery mode. + /// On a `manual_flush → auto_inject` transition the broker drains the pending /// queue into the worker (via the existing inject path) before /// replying; `flushed` reports how many messages were injected. - SetSessionMode { + SetInboundDeliveryMode { name: String, - mode: SessionMode, - reply: tokio::sync::oneshot::Sender>, + mode: InboundDeliveryMode, + reply: tokio::sync::oneshot::Sender>, }, /// `GET /api/spawned/{name}/pending` — snapshot the per-worker /// pending-message queue (FIFO, head first). GetPending { name: String, - reply: tokio::sync::oneshot::Sender, SessionRouteError>>, + reply: tokio::sync::oneshot::Sender, DeliveryRouteError>>, }, /// `POST /api/spawned/{name}/flush` — drain the pending queue and /// inject every message into the worker via the existing /// fire-and-forget inject path. Does *not* change the mode. FlushPending { name: String, - reply: tokio::sync::oneshot::Sender>, + reply: tokio::sync::oneshot::Sender>, }, } -/// Typed errors for the session-mode HTTP routes. Keeps the broker arm's +/// Typed errors for the inbound-delivery-mode HTTP routes. Keeps the broker arm's /// reply payload structured so the HTTP handler can map cleanly to 404 /// without parsing strings. The "broker channel closed" / "reply dropped" /// failure modes are handled at the HTTP boundary via [`internal_error`], /// so they don't need a variant here. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum SessionRouteError { +pub enum DeliveryRouteError { /// No worker with that name is currently registered with the broker. WorkerNotFound(String), } -impl std::fmt::Display for SessionRouteError { +impl std::fmt::Display for DeliveryRouteError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - SessionRouteError::WorkerNotFound(name) => { + DeliveryRouteError::WorkerNotFound(name) => { write!(f, "agent_not_found: no worker named '{name}'") } } } } -impl std::error::Error for SessionRouteError {} +impl std::error::Error for DeliveryRouteError {} -/// Reply payload for [`ListenApiRequest::SetSessionMode`]. `flushed` +/// Reply payload for [`ListenApiRequest::SetInboundDeliveryMode`]. `flushed` /// is the number of pending messages drained during the transition -/// (always `0` unless we transitioned `human → passthrough`). +/// (always `0` unless we transitioned `manual_flush → auto_inject`). #[derive(Debug, Clone, PartialEq, Eq)] -pub struct SetSessionModeOk { - pub mode: SessionMode, +pub struct SetInboundDeliveryModeOk { + pub mode: InboundDeliveryMode, pub flushed: usize, } @@ -336,8 +337,9 @@ fn listen_api_router_with_auth( routing::get(listen_api_snapshot), ) .route( - "/api/spawned/{name}/mode", - routing::get(listen_api_get_session_mode).put(listen_api_set_session_mode), + "/api/spawned/{name}/delivery-mode", + routing::get(listen_api_get_inbound_delivery_mode) + .put(listen_api_set_inbound_delivery_mode), ) .route( "/api/spawned/{name}/pending", @@ -1163,23 +1165,24 @@ async fn listen_api_snapshot( } // --------------------------------------------------------------------------- -// Session mode (per-agent inject vs. queue, plus pending-queue inspection) +// Inbound delivery mode (per-agent inject vs. queue, plus pending-queue inspection) // -// The broker keeps a `SessionMode` per worker; `Human` mode parks inbound -// relay messages in a FIFO `pending` queue instead of injecting them. +// The broker keeps an `InboundDeliveryMode` per worker; `manual_flush` +// mode parks inbound relay messages in a FIFO `pending` queue instead +// of injecting them. // These four routes are the server-side surface the `agent-relay drive` // client calls to flip modes, inspect the queue, and drain it. // --------------------------------------------------------------------------- -/// `GET /api/spawned/{name}/mode` → `{ "mode": "passthrough" | "human" }`. -async fn listen_api_get_session_mode( +/// `GET /api/spawned/{name}/delivery-mode` → `{ "mode": "auto_inject" | "manual_flush" }`. +async fn listen_api_get_inbound_delivery_mode( axum::extract::State(state): axum::extract::State, axum::extract::Path(name): axum::extract::Path, ) -> (axum::http::StatusCode, axum::Json) { let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); if state .tx - .send(ListenApiRequest::GetSessionMode { + .send(ListenApiRequest::GetInboundDeliveryMode { name: name.clone(), reply: reply_tx, }) @@ -1193,33 +1196,34 @@ async fn listen_api_get_session_mode( axum::http::StatusCode::OK, axum::Json(json!({ "mode": mode.as_wire_str() })), ), - Ok(Err(err)) => session_route_error_to_response(&err), + Ok(Err(err)) => delivery_route_error_to_response(&err), Err(_) => internal_error(), } } #[derive(Debug, Deserialize)] -struct SetSessionModePayload { +struct SetInboundDeliveryModePayload { mode: String, } -/// `PUT /api/spawned/{name}/mode` — body `{ "mode": "passthrough" | "human" }`. +/// `PUT /api/spawned/{name}/delivery-mode` — body +/// `{ "mode": "auto_inject" | "manual_flush" }`. /// -/// On a `human → passthrough` transition the broker drains the pending +/// On a `manual_flush → auto_inject` transition the broker drains the pending /// queue into the worker via the existing inject path *before* replying, -/// so a caller flipping back to passthrough never strands messages. The +/// so a caller flipping back to auto-inject never strands messages. The /// response reports `flushed` (always `0` unless we drained). -async fn listen_api_set_session_mode( +async fn listen_api_set_inbound_delivery_mode( axum::extract::State(state): axum::extract::State, axum::extract::Path(name): axum::extract::Path, - axum::Json(body): axum::Json, + axum::Json(body): axum::Json, ) -> (axum::http::StatusCode, axum::Json) { - let Some(mode) = SessionMode::parse(&body.mode) else { + let Some(mode) = InboundDeliveryMode::parse(&body.mode) else { return api_error( axum::http::StatusCode::BAD_REQUEST, "invalid_mode", format!( - "unsupported session mode '{}' (expected 'passthrough' or 'human')", + "unsupported inbound delivery mode '{}' (expected 'auto_inject' or 'manual_flush')", body.mode ), ); @@ -1228,7 +1232,7 @@ async fn listen_api_set_session_mode( let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); if state .tx - .send(ListenApiRequest::SetSessionMode { + .send(ListenApiRequest::SetInboundDeliveryMode { name: name.clone(), mode, reply: reply_tx, @@ -1246,14 +1250,14 @@ async fn listen_api_set_session_mode( "flushed": ok.flushed, })), ), - Ok(Err(err)) => session_route_error_to_response(&err), + Ok(Err(err)) => delivery_route_error_to_response(&err), Err(_) => internal_error(), } } /// `GET /api/spawned/{name}/pending` → `{ "pending": [ ... ] }`, FIFO -/// (head of queue first). Empty array when the worker is in relay mode -/// or simply has no pending messages. +/// (head of queue first). Empty array when the worker is not in +/// `manual_flush` delivery mode or simply has no pending messages. async fn listen_api_get_pending( axum::extract::State(state): axum::extract::State, axum::extract::Path(name): axum::extract::Path, @@ -1307,7 +1311,7 @@ async fn listen_api_get_pending( axum::Json(json!({ "pending": pending })), ) } - Ok(Err(err)) => session_route_error_to_response(&err), + Ok(Err(err)) => delivery_route_error_to_response(&err), Err(_) => internal_error(), } } @@ -1315,9 +1319,9 @@ async fn listen_api_get_pending( /// `POST /api/spawned/{name}/flush` → `{ "flushed": N }`. /// /// Drains the queue and injects each message into the worker in order -/// using the existing fire-and-forget inject path. The session mode is -/// *not* changed — a caller still in human mode will continue to queue -/// newly-arriving messages. +/// using the existing fire-and-forget inject path. The inbound delivery mode is +/// *not* changed — a caller still in `manual_flush` delivery mode will continue +/// to queue newly-arriving messages. async fn listen_api_flush_pending( axum::extract::State(state): axum::extract::State, axum::extract::Path(name): axum::extract::Path, @@ -1339,19 +1343,19 @@ async fn listen_api_flush_pending( axum::http::StatusCode::OK, axum::Json(json!({ "flushed": flushed })), ), - Ok(Err(err)) => session_route_error_to_response(&err), + Ok(Err(err)) => delivery_route_error_to_response(&err), Err(_) => internal_error(), } } -/// Centralised mapping from [`SessionRouteError`] to HTTP responses for -/// the four session-mode routes. Mirrors +/// Centralised mapping from [`DeliveryRouteError`] to HTTP responses for +/// the four inbound-delivery-mode routes. Mirrors /// [`worker_request_error_to_response`] in shape. -fn session_route_error_to_response( - err: &SessionRouteError, +fn delivery_route_error_to_response( + err: &DeliveryRouteError, ) -> (axum::http::StatusCode, axum::Json) { match err { - SessionRouteError::WorkerNotFound(_) => api_error( + DeliveryRouteError::WorkerNotFound(_) => api_error( axum::http::StatusCode::NOT_FOUND, "agent_not_found", err.to_string(), @@ -1902,12 +1906,12 @@ mod auth_tests { use tower::ServiceExt; use super::{ - listen_api_router_with_auth, ListenApiConfig, ListenApiRequest, SessionRouteError, - SetSessionModeOk, + listen_api_router_with_auth, DeliveryRouteError, ListenApiConfig, ListenApiRequest, + SetInboundDeliveryModeOk, }; use crate::worker_request::RequestWorkerError; use relay_broker::protocol::MessageInjectionMode; - use relay_broker::types::{PendingRelayMessage, SessionMode}; + use relay_broker::types::{InboundDeliveryMode, PendingRelayMessage}; fn test_router( broker_api_key: Option<&str>, @@ -2870,7 +2874,7 @@ mod auth_tests { } // ----------------------------------------------------------------- - // Session mode: four routes that back the `agent-relay drive` + // Inbound delivery mode: four routes that back the `agent-relay drive` // client. The HTTP layer only forwards typed requests over the // broker channel — these tests cover the request shaping and // response mapping, not the broker arms (those live in main.rs and @@ -2878,13 +2882,13 @@ mod auth_tests { // ----------------------------------------------------------------- #[tokio::test] - async fn get_session_mode_route_returns_mode_string() { + async fn get_inbound_delivery_mode_route_returns_mode_string() { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { match rx.recv().await { - Some(ListenApiRequest::GetSessionMode { name, reply }) => { + Some(ListenApiRequest::GetInboundDeliveryMode { name, reply }) => { assert_eq!(name, "worker-a"); - let _ = reply.send(Ok(SessionMode::Human)); + let _ = reply.send(Ok(InboundDeliveryMode::ManualFlush)); } other => panic!("unexpected request: {:?}", other.map(|_| "other")), } @@ -2893,7 +2897,7 @@ mod auth_tests { let response = router .oneshot( Request::builder() - .uri("/api/spawned/worker-a/mode") + .uri("/api/spawned/worker-a/delivery-mode") .method("GET") .header("x-api-key", "secret") .body(Body::empty()) @@ -2904,23 +2908,23 @@ mod auth_tests { assert_eq!(response.status(), StatusCode::OK); let body = response_json(response).await; - assert_eq!(body, json!({ "mode": "human" })); + assert_eq!(body, json!({ "mode": "manual_flush" })); replier.await.expect("replier should complete"); } #[tokio::test] - async fn get_session_mode_route_returns_404_when_worker_missing() { + async fn get_inbound_delivery_mode_route_returns_404_when_worker_missing() { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { - if let Some(ListenApiRequest::GetSessionMode { reply, .. }) = rx.recv().await { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + if let Some(ListenApiRequest::GetInboundDeliveryMode { reply, .. }) = rx.recv().await { + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound("ghost".into()))); } }); let response = router .oneshot( Request::builder() - .uri("/api/spawned/ghost/mode") + .uri("/api/spawned/ghost/delivery-mode") .method("GET") .header("x-api-key", "secret") .body(Body::empty()) @@ -2936,15 +2940,15 @@ mod auth_tests { } #[tokio::test] - async fn set_session_mode_route_forwards_parsed_mode_and_returns_flushed() { + async fn set_inbound_delivery_mode_route_forwards_parsed_mode_and_returns_flushed() { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { match rx.recv().await { - Some(ListenApiRequest::SetSessionMode { name, mode, reply }) => { + Some(ListenApiRequest::SetInboundDeliveryMode { name, mode, reply }) => { assert_eq!(name, "worker-a"); - assert_eq!(mode, SessionMode::Passthrough); - let _ = reply.send(Ok(SetSessionModeOk { - mode: SessionMode::Passthrough, + assert_eq!(mode, InboundDeliveryMode::AutoInject); + let _ = reply.send(Ok(SetInboundDeliveryModeOk { + mode: InboundDeliveryMode::AutoInject, flushed: 3, })); } @@ -2955,11 +2959,11 @@ mod auth_tests { let response = router .oneshot( Request::builder() - .uri("/api/spawned/worker-a/mode") + .uri("/api/spawned/worker-a/delivery-mode") .method("PUT") .header("x-api-key", "secret") .header("content-type", "application/json") - .body(Body::from(json!({ "mode": "passthrough" }).to_string())) + .body(Body::from(json!({ "mode": "auto_inject" }).to_string())) .expect("request should build"), ) .await @@ -2967,18 +2971,18 @@ mod auth_tests { assert_eq!(response.status(), StatusCode::OK); let body = response_json(response).await; - assert_eq!(body, json!({ "mode": "passthrough", "flushed": 3 })); + assert_eq!(body, json!({ "mode": "auto_inject", "flushed": 3 })); replier.await.expect("replier should complete"); } #[tokio::test] - async fn set_session_mode_route_rejects_invalid_mode_without_calling_broker() { + async fn set_inbound_delivery_mode_route_rejects_invalid_mode_without_calling_broker() { let (router, mut rx) = test_router(Some("secret")); let response = router .oneshot( Request::builder() - .uri("/api/spawned/worker-a/mode") + .uri("/api/spawned/worker-a/delivery-mode") .method("PUT") .header("x-api-key", "secret") .header("content-type", "application/json") @@ -2998,22 +3002,45 @@ mod auth_tests { } #[tokio::test] - async fn set_session_mode_route_returns_404_when_worker_missing() { + async fn legacy_mode_route_is_not_registered() { + let (router, mut rx) = test_router(Some("secret")); + + let response = router + .oneshot( + Request::builder() + .uri("/api/spawned/worker-a/mode") + .method("GET") + .header("x-api-key", "secret") + .body(Body::empty()) + .expect("request should build"), + ) + .await + .expect("request should succeed"); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + assert!( + rx.try_recv().is_err(), + "legacy /mode route should not enqueue request" + ); + } + + #[tokio::test] + async fn set_inbound_delivery_mode_route_returns_404_when_worker_missing() { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { - if let Some(ListenApiRequest::SetSessionMode { reply, .. }) = rx.recv().await { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + if let Some(ListenApiRequest::SetInboundDeliveryMode { reply, .. }) = rx.recv().await { + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound("ghost".into()))); } }); let response = router .oneshot( Request::builder() - .uri("/api/spawned/ghost/mode") + .uri("/api/spawned/ghost/delivery-mode") .method("PUT") .header("x-api-key", "secret") .header("content-type", "application/json") - .body(Body::from(json!({ "mode": "human" }).to_string())) + .body(Body::from(json!({ "mode": "manual_flush" }).to_string())) .expect("request should build"), ) .await @@ -3109,7 +3136,7 @@ mod auth_tests { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { if let Some(ListenApiRequest::GetPending { reply, .. }) = rx.recv().await { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound("ghost".into()))); } }); @@ -3167,7 +3194,7 @@ mod auth_tests { let (router, mut rx) = test_router(Some("secret")); let replier = tokio::spawn(async move { if let Some(ListenApiRequest::FlushPending { reply, .. }) = rx.recv().await { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound("ghost".into()))); + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound("ghost".into()))); } }); @@ -3190,11 +3217,11 @@ mod auth_tests { } #[tokio::test] - async fn session_mode_routes_require_auth() { + async fn inbound_delivery_routes_require_auth() { let (router, _rx) = test_router(Some("secret")); for (method, path) in [ - ("GET", "/api/spawned/worker-a/mode"), - ("PUT", "/api/spawned/worker-a/mode"), + ("GET", "/api/spawned/worker-a/delivery-mode"), + ("PUT", "/api/spawned/worker-a/delivery-mode"), ("GET", "/api/spawned/worker-a/pending"), ("POST", "/api/spawned/worker-a/flush"), ] { @@ -3205,7 +3232,7 @@ mod auth_tests { .uri(path) .method(method) .header("content-type", "application/json") - .body(Body::from(json!({ "mode": "passthrough" }).to_string())) + .body(Body::from(json!({ "mode": "auto_inject" }).to_string())) .expect("request should build"), ) .await diff --git a/src/main.rs b/src/main.rs index 59f503727..8fa84b50c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,8 +29,8 @@ use helpers::{ normalize_cli_name, parse_cli_command, strip_ansi, }; use listen_api::{ - broadcast_if_relevant, listen_api_router, ListenApiConfig, ListenApiRequest, SessionRouteError, - SetSessionModeOk, + broadcast_if_relevant, listen_api_router, DeliveryRouteError, ListenApiConfig, + ListenApiRequest, SetInboundDeliveryModeOk, }; use routing::display_target_for_dashboard; @@ -65,8 +65,8 @@ use relay_broker::{ snippets::ensure_relaycast_mcp_config, telemetry::{ActionSource, TelemetryClient, TelemetryEvent}, types::{ - BrokerCommandEvent, BrokerCommandPayload, InboundKind, PendingRelayMessage, SenderKind, - SessionDispatch, SessionMode, SessionState, + BrokerCommandEvent, BrokerCommandPayload, InboundDeliveryDispatch, InboundDeliveryMode, + InboundDeliveryState, InboundKind, PendingRelayMessage, SenderKind, }, }; @@ -1407,16 +1407,16 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { // via the deadline sweep in the `reap_tick` arm below. // // The generic correlation infrastructure lives in `crate::worker_request` - // so each new request/response route (`snapshot_pty`, `mode`, `pending`, - // `flush`, ...) costs about five lines of broker plumbing. + // so each new request/response route (`snapshot_pty`, `delivery-mode`, + // `pending`, `flush`, ...) costs about five lines of broker plumbing. let mut pending_requests: HashMap = HashMap::new(); - // Per-worker session-mode + pending-relay-message queue. Lives + // Per-worker inbound-delivery-mode + pending-relay-message queue. Lives // parallel to `workers.workers` so we can swap modes / inspect / // drain without touching `WorkerHandle` (which holds OS-level - // process state). See `relay_broker::types::SessionState`. Entries + // process state). See `relay_broker::types::InboundDeliveryState`. Entries // are created lazily on first lookup and removed wherever workers // exit (`Release` arm, `worker_exited` frame, `reap_exited` sweep). - let mut session_states: HashMap = HashMap::new(); + let mut delivery_states: HashMap = HashMap::new(); let mut dm_participants_cache: HashMap)> = HashMap::new(); let mut recent_thread_messages: VecDeque = VecDeque::new(); if !pending_deliveries.is_empty() { @@ -1794,7 +1794,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, &name, "agent_released"); - session_states.remove(&name); + delivery_states.remove(&name); state.agents.remove(&name); if paths.persist { let _ = state.save(&paths.state); } let _ = send_event( @@ -1943,16 +1943,16 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ); for worker_name in targets { - // Session-mode gate: when the worker is in human - // mode the broker parks the message in the - // per-worker pending queue instead of injecting, - // and counts it as delivered so the HTTP caller's - // ack semantics are unchanged. We pass the FULL + // Inbound-delivery gate: in `manual_flush` mode + // the broker parks the message in the per-worker + // pending queue instead of injecting, and counts + // it as delivered so the HTTP caller's ack + // semantics are unchanged. We pass the FULL // routing context so the eventual drain reproduces // the original delivery (channel/thread/workspace // /priority/mode), not a stripped-down DM. - match gate_inbound_for_session_mode( - &mut session_states, + match gate_inbound_for_delivery_mode( + &mut delivery_states, &workers, &worker_name, InboundContext { @@ -1974,7 +1974,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { event_id = %event_id, to = %normalized_to, worker = %worker_name, - "queued local delivery (human session mode)" + "queued local delivery (manual_flush inbound delivery mode)" ); let _ = send_event( &sdk_out_tx, @@ -1984,7 +1984,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "event_id":&event_id, "from":&delivery_from, "target":&normalized_to, - "reason":"session_mode_human", + "reason":"inbound_delivery_manual_flush", }), ).await; continue; @@ -2409,27 +2409,27 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "channels": remaining, }))); } - ListenApiRequest::GetSessionMode { name, reply } => { + ListenApiRequest::GetInboundDeliveryMode { name, reply } => { if !workers.has_worker(&name) { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound(name))); } else { - let mode = session_states + let mode = delivery_states .get(&name) .map(|s| s.mode) .unwrap_or_default(); let _ = reply.send(Ok(mode)); } } - ListenApiRequest::SetSessionMode { name, mode, reply } => { + ListenApiRequest::SetInboundDeliveryMode { name, mode, reply } => { if !workers.has_worker(&name) { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound(name))); } else { - let entry = session_states.entry(name.clone()).or_default(); + let entry = delivery_states.entry(name.clone()).or_default(); let previous = entry.mode; entry.mode = mode; let to_flush: Vec = if previous - == SessionMode::Human - && mode == SessionMode::Passthrough + == InboundDeliveryMode::ManualFlush + && mode == InboundDeliveryMode::AutoInject { entry.drain_pending() } else { @@ -2441,7 +2441,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { target = "agent_relay::broker", worker = %name, drained = flushed, - "draining pending queue on human → passthrough transition" + "draining pending queue on manual_flush → auto_inject transition" ); } for queued in to_flush { @@ -2460,13 +2460,13 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { previous_mode = previous.as_wire_str(), mode = mode.as_wire_str(), flushed, - "session mode updated" + "inbound delivery mode updated" ); if previous != mode { let _ = send_event( &sdk_out_tx, json!({ - "kind":"agent_session_mode_changed", + "kind":"agent_inbound_delivery_mode_changed", "name":&name, "previous_mode":previous.as_wire_str(), "mode":mode.as_wire_str(), @@ -2480,18 +2480,18 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "kind":"agent_pending_drained", "name":&name, "count":flushed, - "reason":"mode_transition", + "reason":"delivery_mode_transition", }), ).await; } - let _ = reply.send(Ok(SetSessionModeOk { mode, flushed })); + let _ = reply.send(Ok(SetInboundDeliveryModeOk { mode, flushed })); } } ListenApiRequest::GetPending { name, reply } => { if !workers.has_worker(&name) { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound(name))); } else { - let snapshot = session_states + let snapshot = delivery_states .get(&name) .map(|s| s.pending_snapshot()) .unwrap_or_default(); @@ -2500,9 +2500,9 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { } ListenApiRequest::FlushPending { name, reply } => { if !workers.has_worker(&name) { - let _ = reply.send(Err(SessionRouteError::WorkerNotFound(name))); + let _ = reply.send(Err(DeliveryRouteError::WorkerNotFound(name))); } else { - let to_flush: Vec = session_states + let to_flush: Vec = delivery_states .get_mut(&name) .map(|state| state.drain_pending()) .unwrap_or_default(); @@ -2647,7 +2647,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, &name, "relaycast_release"); - session_states.remove(&name); + delivery_states.remove(&name); telemetry.track(TelemetryEvent::AgentRelease { cli: String::new(), release_reason: "relaycast_release".to_string(), @@ -3195,14 +3195,14 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { } for worker_name in delivery_plan.targets { - // Session-mode gate: mirrors the /api/send gate - // above. Human-mode workers see inbound relaycast - // messages parked in the pending queue rather - // than auto-injected; same full-context capture - // so drains reproduce the original delivery + // Inbound-delivery gate: mirrors the /api/send + // gate above. Manual-flush workers see inbound + // relaycast messages parked in the pending queue + // rather than auto-injected; same full-context + // capture so drains reproduce the original delivery // (channel/thread/workspace). - match gate_inbound_for_session_mode( - &mut session_states, + match gate_inbound_for_delivery_mode( + &mut delivery_states, &workers, &worker_name, InboundContext { @@ -3222,7 +3222,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { target = "agent_relay::broker", event_id = %mapped.event_id, worker = %worker_name, - "queued inbound relay message (human session mode)" + "queued inbound relay message (manual_flush inbound delivery mode)" ); let _ = send_event( &sdk_out_tx, @@ -3232,7 +3232,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { "event_id":&mapped.event_id, "from":&mapped.from, "target":&mapped.target, - "reason":"session_mode_human", + "reason":"inbound_delivery_manual_flush", }), ).await; continue; @@ -3736,7 +3736,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, &name, "worker_exited"); - session_states.remove(&name); + delivery_states.remove(&name); let _ = send_event( &sdk_out_tx, json!({ @@ -3937,7 +3937,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, name, "worker_permanently_dead"); - session_states.remove(name); + delivery_states.remove(name); let _ = send_event( &sdk_out_tx, json!({"kind":"agent_permanently_dead","name":name,"reason":reason}), @@ -3978,7 +3978,7 @@ async fn run_init(cmd: InitCommand, telemetry: TelemetryClient) -> Result<()> { ).await; } fail_pending_requests_for_worker(&mut pending_requests, name, "worker_exited"); - session_states.remove(name); + delivery_states.remove(name); let _ = send_event( &sdk_out_tx, json!({"kind":"agent_exited","name":name,"code":code,"signal":signal}), @@ -4245,7 +4245,7 @@ fn build_agent_metrics(handle: &WorkerHandle) -> AgentMetrics { } } -/// Outcome of [`gate_inbound_for_session_mode`]. Distinguishes the +/// Outcome of [`gate_inbound_for_delivery_mode`]. Distinguishes the /// three cases broker call sites care about: continue with the existing /// inject path, the message was queued (success — caller acks the /// sender), or there's no worker (caller skips this target). @@ -4256,9 +4256,9 @@ enum GateOutcome { WorkerMissing, } -/// Gate an inbound relay message through the per-worker [`SessionMode`]. +/// Gate an inbound relay message through the per-worker [`InboundDeliveryMode`]. /// -/// When the target worker is in [`SessionMode::Human`] the message is +/// When the target worker is in [`InboundDeliveryMode::ManualFlush`] the message is /// appended to the per-worker pending queue and the broker returns /// [`GateOutcome::Queued`], signalling the caller to skip the existing /// inject path. Otherwise the caller proceeds normally. @@ -4285,8 +4285,8 @@ struct InboundContext<'a> { event_id: Option<&'a str>, } -fn gate_inbound_for_session_mode( - session_states: &mut HashMap, +fn gate_inbound_for_delivery_mode( + delivery_states: &mut HashMap, workers: &WorkerRegistry, worker_name: &str, ctx: InboundContext<'_>, @@ -4294,8 +4294,8 @@ fn gate_inbound_for_session_mode( if !workers.has_worker(worker_name) { return GateOutcome::WorkerMissing; } - let state = session_states.entry(worker_name.to_string()).or_default(); - if state.mode == SessionMode::Passthrough { + let state = delivery_states.entry(worker_name.to_string()).or_default(); + if state.mode == InboundDeliveryMode::AutoInject { return GateOutcome::Inject; } let queued_at_ms = chrono::Utc::now().timestamp_millis().max(0) as u64; @@ -4312,18 +4312,18 @@ fn gate_inbound_for_session_mode( event_id: ctx.event_id.map(str::to_string), }; match state.accept_inbound(msg) { - SessionDispatch::Inject => GateOutcome::Inject, - SessionDispatch::Queued { queue_len } => { + InboundDeliveryDispatch::Inject => GateOutcome::Inject, + InboundDeliveryDispatch::Queued { queue_len } => { tracing::debug!( target = "agent_relay::broker", worker = %worker_name, from = %ctx.from, queue_len, - "queued inbound relay message (human mode)" + "queued inbound relay message (manual_flush delivery mode)" ); GateOutcome::Queued } - SessionDispatch::QueuedEvicted { + InboundDeliveryDispatch::QueuedEvicted { queue_len, dropped_from, } => { @@ -4344,7 +4344,7 @@ fn gate_inbound_for_session_mode( /// Inject a previously-queued pending relay message into the worker via /// the existing `queue_and_try_delivery_raw` path. Used by the /// `/api/spawned/{name}/flush` handler and by the auto-drain on a -/// `human → passthrough` transition. Failures are logged but not +/// `manual_flush → auto_inject` transition. Failures are logged but not /// propagated — the broker treats `flush` as best-effort fire-and-forget /// the same way `/api/send` does for individual targets. async fn inject_pending_relay_message( diff --git a/src/types.rs b/src/types.rs index 643ca52a0..782f0f18d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -2,52 +2,51 @@ use serde::{Deserialize, Serialize}; use crate::protocol::MessageInjectionMode; -/// Per-worker session mode controlling how inbound relay messages are +/// Per-worker inbound delivery mode controlling how inbound relay messages are /// dispatched into the wrapped agent's PTY. /// -/// - [`SessionMode::Passthrough`] (default) injects inbound messages -/// directly into the worker. The user's own keystrokes also pass -/// through alongside the broker's auto-injections; both writers race. -/// - [`SessionMode::Human`] holds inbound messages in a per-worker -/// pending queue so a human-driven client can decide when to flush -/// them. +/// - [`InboundDeliveryMode::AutoInject`] (default) injects inbound messages +/// directly into the worker. The user's own keystrokes may also arrive +/// through an attached relay session, so both writers can race. +/// - [`InboundDeliveryMode::ManualFlush`] holds inbound messages in a +/// per-worker pending queue so a client can decide when to flush them. /// /// Mode is broker-side state only; the worker process does not observe it. -/// It resets to [`SessionMode::Passthrough`] on broker restart — there +/// It resets to [`InboundDeliveryMode::AutoInject`] on broker restart — there /// is no disk persistence. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -pub enum SessionMode { +pub enum InboundDeliveryMode { /// Inbound messages auto-inject into the worker's PTY. #[default] - Passthrough, + AutoInject, /// Inbound messages append to the per-worker pending queue and wait /// for an explicit flush. - Human, + ManualFlush, } -impl SessionMode { +impl InboundDeliveryMode { pub fn as_wire_str(&self) -> &'static str { match self { - SessionMode::Passthrough => "passthrough", - SessionMode::Human => "human", + InboundDeliveryMode::AutoInject => "auto_inject", + InboundDeliveryMode::ManualFlush => "manual_flush", } } pub fn parse(value: &str) -> Option { match value.trim().to_ascii_lowercase().as_str() { - "passthrough" => Some(SessionMode::Passthrough), - "human" => Some(SessionMode::Human), + "auto_inject" => Some(InboundDeliveryMode::AutoInject), + "manual_flush" => Some(InboundDeliveryMode::ManualFlush), _ => None, } } } /// A relay message that arrived while a worker was in -/// [`SessionMode::Human`] and therefore got parked in the per-worker -/// pending queue instead of being injected. Drained in FIFO order by -/// `POST /api/spawned/{name}/flush` or the auto-drain on a -/// `human → passthrough` mode transition. +/// [`InboundDeliveryMode::ManualFlush`] and therefore got parked in the +/// per-worker pending queue instead of being injected. Drained in FIFO order +/// by `POST /api/spawned/{name}/flush` or the auto-drain on a +/// `manual_flush → auto_inject` mode transition. /// /// The full delivery context is captured at queue time so a drain /// later produces a byte-for-byte equivalent of the original delivery @@ -205,35 +204,35 @@ pub struct InjectRequest { pub attempts: u32, } -/// Per-worker session bookkeeping owned by the broker. Tracks the -/// current [`SessionMode`] plus the FIFO pending queue for messages -/// captured while in [`SessionMode::Human`]. The broker keeps one of -/// these per spawned worker in a parallel `HashMap` +/// Per-worker inbound delivery bookkeeping owned by the broker. Tracks the +/// current [`InboundDeliveryMode`] plus the FIFO pending queue for messages +/// captured while in [`InboundDeliveryMode::ManualFlush`]. The broker keeps one of +/// these per spawned worker in a parallel `HashMap` /// so the existing `WorkerHandle` (which holds OS-level process state) /// doesn't have to grow. #[derive(Debug, Default)] -pub struct SessionState { - pub mode: SessionMode, +pub struct InboundDeliveryState { + pub mode: InboundDeliveryMode, pub pending: std::collections::VecDeque, } /// Per-worker cap on the pending queue. Prevents unbounded growth when a -/// human-mode session is left open for hours; oldest message is evicted -/// with a `tracing::warn!` (see [`SessionState::push_pending`]). +/// `manual_flush` delivery mode is left open for hours; oldest message is evicted +/// with a `tracing::warn!` (see [`InboundDeliveryState::push_pending`]). pub const MAX_PENDING_PER_WORKER: usize = 256; -/// Outcome of dispatching one inbound relay message through the session -/// gate. Returned by [`SessionState::accept_inbound`] so the broker can +/// Outcome of dispatching one inbound relay message through the delivery +/// gate. Returned by [`InboundDeliveryState::accept_inbound`] so the broker can /// log + telemetry consistently. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum SessionDispatch { - /// Worker is in [`SessionMode::Passthrough`]; the broker should run +pub enum InboundDeliveryDispatch { + /// Worker is in [`InboundDeliveryMode::AutoInject`]; the broker should run /// the existing inject path. Inject, - /// Worker is in [`SessionMode::Human`]; the message was queued. + /// Worker is in [`InboundDeliveryMode::ManualFlush`]; the message was queued. /// `queue_len` is the queue size *after* the push. Queued { queue_len: usize }, - /// Worker is in [`SessionMode::Human`] but the queue was full, so + /// Worker is in [`InboundDeliveryMode::ManualFlush`] but the queue was full, so /// the oldest entry was evicted to make room. `queue_len` is the /// queue size *after* the eviction + push (always equal to the cap). QueuedEvicted { @@ -242,8 +241,8 @@ pub enum SessionDispatch { }, } -impl SessionState { - pub fn new(mode: SessionMode) -> Self { +impl InboundDeliveryState { + pub fn new(mode: InboundDeliveryMode) -> Self { Self { mode, pending: std::collections::VecDeque::new(), @@ -264,31 +263,33 @@ impl SessionState { evicted_from } - /// Gate an inbound relay message through the current session mode. + /// Gate an inbound relay message through the current inbound delivery mode. /// - /// In [`SessionMode::Passthrough`] the message is *not* enqueued; - /// the caller runs the existing inject path. In [`SessionMode::Human`] - /// the message is appended (with FIFO eviction at the cap) and the - /// caller acks the sender without touching the worker's PTY. - pub fn accept_inbound(&mut self, msg: PendingRelayMessage) -> SessionDispatch { + /// In [`InboundDeliveryMode::AutoInject`] the message is *not* enqueued; + /// the caller runs the existing inject path. In + /// [`InboundDeliveryMode::ManualFlush`] the message is appended (with FIFO + /// eviction at the cap) and the caller acks the sender without touching the + /// worker's PTY. + pub fn accept_inbound(&mut self, msg: PendingRelayMessage) -> InboundDeliveryDispatch { match self.mode { - SessionMode::Passthrough => SessionDispatch::Inject, - SessionMode::Human => { + InboundDeliveryMode::AutoInject => InboundDeliveryDispatch::Inject, + InboundDeliveryMode::ManualFlush => { let evicted = self.push_pending(msg); let queue_len = self.pending.len(); match evicted { - Some(dropped_from) => SessionDispatch::QueuedEvicted { + Some(dropped_from) => InboundDeliveryDispatch::QueuedEvicted { queue_len, dropped_from, }, - None => SessionDispatch::Queued { queue_len }, + None => InboundDeliveryDispatch::Queued { queue_len }, } } } } /// Drain the pending queue in FIFO order. Used by `POST /api/flush` - /// and by the auto-drain that runs on a `human → passthrough` transition. + /// and by the auto-drain that runs on a `manual_flush → auto_inject` + /// transition. pub fn drain_pending(&mut self) -> Vec { self.pending.drain(..).collect() } @@ -300,7 +301,7 @@ impl SessionState { } #[cfg(test)] -mod session_tests { +mod inbound_delivery_tests { use super::*; fn msg(from: &str, body: &str) -> PendingRelayMessage { @@ -322,20 +323,23 @@ mod session_tests { } #[test] - fn session_mode_wire_format_matches_serde_round_trip() { + fn inbound_delivery_mode_wire_format_matches_serde_round_trip() { // Guard against `as_wire_str` / `parse` drifting from the // `#[serde(rename_all = "snake_case")]` representation. - for variant in [SessionMode::Passthrough, SessionMode::Human] { + for variant in [ + InboundDeliveryMode::AutoInject, + InboundDeliveryMode::ManualFlush, + ] { let serialized = serde_json::to_string(&variant) - .expect("SessionMode serializes") + .expect("InboundDeliveryMode serializes") .trim_matches('"') .to_string(); assert_eq!(serialized, variant.as_wire_str()); - let parsed = SessionMode::parse(&serialized).expect("wire form parses"); + let parsed = InboundDeliveryMode::parse(&serialized).expect("wire form parses"); assert_eq!(parsed, variant); - let from_serde: SessionMode = + let from_serde: InboundDeliveryMode = serde_json::from_str(&format!("\"{serialized}\"")).expect("serde round-trips"); assert_eq!(from_serde, variant); } @@ -359,37 +363,37 @@ mod session_tests { queued_at_ms: 123_456, event_id: Some("evt_xyz".to_string()), }; - let mut state = SessionState::new(SessionMode::Human); + let mut state = InboundDeliveryState::new(InboundDeliveryMode::ManualFlush); state.accept_inbound(queued.clone()); let drained = state.drain_pending(); assert_eq!(drained, vec![queued]); } #[test] - fn default_mode_is_passthrough() { - let state = SessionState::default(); - assert_eq!(state.mode, SessionMode::Passthrough); + fn default_mode_is_auto_inject() { + let state = InboundDeliveryState::default(); + assert_eq!(state.mode, InboundDeliveryMode::AutoInject); assert!(state.pending.is_empty()); } #[test] - fn passthrough_mode_does_not_queue() { - let mut state = SessionState::new(SessionMode::Passthrough); + fn auto_inject_mode_does_not_queue() { + let mut state = InboundDeliveryState::new(InboundDeliveryMode::AutoInject); let outcome = state.accept_inbound(msg("Alice", "hi")); - assert_eq!(outcome, SessionDispatch::Inject); + assert_eq!(outcome, InboundDeliveryDispatch::Inject); assert!(state.pending.is_empty()); } #[test] - fn human_mode_queues_in_fifo_order() { - let mut state = SessionState::new(SessionMode::Human); + fn manual_flush_mode_queues_in_fifo_order() { + let mut state = InboundDeliveryState::new(InboundDeliveryMode::ManualFlush); assert_eq!( state.accept_inbound(msg("Alice", "one")), - SessionDispatch::Queued { queue_len: 1 } + InboundDeliveryDispatch::Queued { queue_len: 1 } ); assert_eq!( state.accept_inbound(msg("Bob", "two")), - SessionDispatch::Queued { queue_len: 2 } + InboundDeliveryDispatch::Queued { queue_len: 2 } ); let drained = state.drain_pending(); assert_eq!(drained.len(), 2); @@ -400,18 +404,18 @@ mod session_tests { } #[test] - fn human_mode_caps_queue_with_fifo_eviction() { - let mut state = SessionState::new(SessionMode::Human); + fn manual_flush_mode_caps_queue_with_fifo_eviction() { + let mut state = InboundDeliveryState::new(InboundDeliveryMode::ManualFlush); for i in 0..MAX_PENDING_PER_WORKER { assert!(matches!( state.accept_inbound(msg(&format!("u{i}"), "x")), - SessionDispatch::Queued { .. } + InboundDeliveryDispatch::Queued { .. } )); } // Cap reached — next push evicts the oldest ("u0"). let outcome = state.accept_inbound(msg("overflow", "y")); match outcome { - SessionDispatch::QueuedEvicted { + InboundDeliveryDispatch::QueuedEvicted { queue_len, dropped_from, } => { @@ -429,7 +433,7 @@ mod session_tests { #[test] fn pending_snapshot_does_not_mutate() { - let mut state = SessionState::new(SessionMode::Human); + let mut state = InboundDeliveryState::new(InboundDeliveryMode::ManualFlush); state.accept_inbound(msg("Alice", "hi")); let snap = state.pending_snapshot(); assert_eq!(snap.len(), 1); @@ -439,15 +443,26 @@ mod session_tests { #[test] fn parse_round_trips_wire_strings() { assert_eq!( - SessionMode::parse("passthrough"), - Some(SessionMode::Passthrough) + InboundDeliveryMode::parse("auto_inject"), + Some(InboundDeliveryMode::AutoInject) + ); + assert_eq!( + InboundDeliveryMode::parse("MANUAL_FLUSH"), + Some(InboundDeliveryMode::ManualFlush) + ); + assert_eq!( + InboundDeliveryMode::parse(" manual_flush "), + Some(InboundDeliveryMode::ManualFlush) + ); + assert_eq!(InboundDeliveryMode::parse("drive"), None); + assert_eq!(InboundDeliveryMode::parse("passthrough"), None); + assert_eq!(InboundDeliveryMode::parse("human"), None); + // CLI verbs are not inbound delivery mode wire values. + assert_eq!(InboundDeliveryMode::parse("relay"), None); + assert_eq!(InboundDeliveryMode::AutoInject.as_wire_str(), "auto_inject"); + assert_eq!( + InboundDeliveryMode::ManualFlush.as_wire_str(), + "manual_flush" ); - assert_eq!(SessionMode::parse("HUMAN"), Some(SessionMode::Human)); - assert_eq!(SessionMode::parse(" human "), Some(SessionMode::Human)); - assert_eq!(SessionMode::parse("drive"), None); - // No back-compat alias: the prior "relay" wire form must not parse. - assert_eq!(SessionMode::parse("relay"), None); - assert_eq!(SessionMode::Passthrough.as_wire_str(), "passthrough"); - assert_eq!(SessionMode::Human.as_wire_str(), "human"); } } diff --git a/web/content/docs/reference-broker-api.mdx b/web/content/docs/reference-broker-api.mdx index 85a5f4a50..87d898017 100644 --- a/web/content/docs/reference-broker-api.mdx +++ b/web/content/docs/reference-broker-api.mdx @@ -242,56 +242,56 @@ The command prints the screen to stdout and exits 0 on success. On error (unknown worker, broker unreachable, invalid format) it prints a diagnostic to stderr and exits non-zero. -### Session mode +### Inbound delivery mode -Per-agent **session mode** controls how the broker dispatches inbound +Per-agent **inbound delivery mode** controls how the broker dispatches inbound relay messages to a spawned worker. Two modes are supported: -- **`passthrough`** (default) — inbound messages auto-inject into the - worker's PTY. Use this for headless agents that should react to - incoming traffic on their own, or for a `passthrough` client session - where the human types alongside the broker. -- **`human`** — inbound messages are *queued* in a per-worker pending - buffer instead of being injected. A human operator (or the - `agent-relay drive` client) decides when to drain the queue. Useful - when you've taken over an agent's PTY interactively and don't want - background traffic racing your keystrokes. +- **`auto_inject`** (default) — inbound messages auto-inject into the + worker's PTY. Use this for headless agents that should react to incoming + traffic on their own, or for an `agent-relay passthrough` client session where + the human types alongside the broker. +- **`manual_flush`** — inbound messages are *queued* in a per-worker pending + buffer instead of being injected. A human operator, SDK client, or + `agent-relay drive` client decides when to drain the queue. Useful when + you've taken over an agent's PTY interactively and don't want background + traffic racing your keystrokes. Mode is broker-side state only — the worker process never observes it. -Mode resets to `passthrough` when the broker restarts and the pending queue +Mode resets to `auto_inject` when the broker restarts and the pending queue is dropped (no on-disk persistence). | Method | Path | Purpose | | ------ | --------------------------------- | -------------------------------------------------------------------------------- | -| `GET` | `/api/spawned/{name}/mode` | Read the current session mode. Returns `{ "mode": "passthrough" \| "human" }`. | -| `PUT` | `/api/spawned/{name}/mode` | Set the session mode. Body `{ "mode": "passthrough" \| "human" }`. | +| `GET` | `/api/spawned/{name}/delivery-mode` | Read the current inbound delivery mode. Returns `{ "mode": "auto_inject" \| "manual_flush" }`. | +| `PUT` | `/api/spawned/{name}/delivery-mode` | Set the inbound delivery mode. Body `{ "mode": "auto_inject" \| "manual_flush" }`. | | `GET` | `/api/spawned/{name}/pending` | Snapshot the per-worker pending queue (FIFO, head first). | | `POST` | `/api/spawned/{name}/flush` | Drain the pending queue and inject every message into the worker. FIFO order. | -TypeScript SDK equivalents: `client.getSessionMode(name)`, -`client.setSessionMode(name, mode)`, `client.getPending(name)`, and +TypeScript SDK equivalents: `client.getInboundDeliveryMode(name)`, +`client.setInboundDeliveryMode(name, mode)`, `client.getPending(name)`, and `client.flushPending(name)`. -#### `PUT /api/spawned/{name}/mode` +#### `PUT /api/spawned/{name}/delivery-mode` ```json -{ "mode": "human" } +{ "mode": "manual_flush" } ``` -On a `human → passthrough` transition the broker auto-drains the pending +On a `manual_flush → auto_inject` transition the broker auto-drains the pending queue into the worker (via the normal inject path) **before** returning, -so flipping back to `passthrough` never strands queued messages. The response +so flipping back to `auto_inject` never strands queued messages. The response reports how many messages were flushed: ```json -{ "mode": "passthrough", "flushed": 3 } +{ "mode": "auto_inject", "flushed": 3 } ``` -A `passthrough → human` flip or a same-mode noop returns `"flushed": 0`. +An `auto_inject → manual_flush` flip or a same-mode noop returns `"flushed": 0`. Status codes: `200` on success, `400` on a body that isn't -`{ "mode": "passthrough" }` or `{ "mode": "human" }`, `404` if the agent is -not registered. +`{ "mode": "auto_inject" }` or `{ "mode": "manual_flush" }`, `404` if the +agent is not registered. #### `GET /api/spawned/{name}/pending` @@ -337,8 +337,8 @@ oldest entry is evicted with a broker-side warning. #### `POST /api/spawned/{name}/flush` Drains the queue and injects each message into the worker in FIFO order. -The session mode is **not** changed; a caller still in `human` mode will -continue queuing newly-arriving messages. +The inbound delivery mode is **not** changed; a caller still in `manual_flush` +mode will continue queuing newly-arriving messages. ```json { "flushed": 7 } @@ -398,9 +398,9 @@ Durable (replayable via `?sinceSeq=...`): | `delivery_failed` | Message delivery failed. | | `delivery_dropped` | Delivery was dropped (e.g. agent gone). | | `delivery_retry` | Delivery is being retried. | -| `delivery_queued` | Inbound delivery parked in the per-worker pending queue because the worker is in `human` session mode. Payload carries `event_id`, `from`, `target`, and `reason: "session_mode_human"`. | -| `agent_session_mode_changed` | A worker's session mode flipped via `PUT /api/spawned/{name}/mode`. Payload carries `previous_mode` and `mode`. | -| `agent_pending_drained` | The per-worker pending queue was drained. Payload carries `count` and `reason` (`mode_transition` for the auto-drain on `human → passthrough`, `explicit_flush` for `POST .../flush`). | +| `delivery_queued` | Inbound delivery parked in the per-worker pending queue because the worker is in `manual_flush` inbound delivery mode. Payload carries `event_id`, `from`, `target`, and `reason: "inbound_delivery_manual_flush"`. | +| `agent_inbound_delivery_mode_changed` | A worker's inbound delivery mode flipped via `PUT /api/spawned/{name}/delivery-mode`. Payload carries `previous_mode` and `mode`. | +| `agent_pending_drained` | The per-worker pending queue was drained. Payload carries `count` and `reason` (`delivery_mode_transition` for the auto-drain on `manual_flush → auto_inject`, `explicit_flush` for `POST .../flush`). | Ephemeral (broadcast only, no replay): @@ -454,21 +454,21 @@ curl -sX DELETE localhost:3888/api/spawned/Alice \ -H "X-API-Key: $KEY" ``` -## Worked example: take over an agent with session mode +## Worked example: take over an agent with inbound delivery mode -The four session-mode routes back the upcoming `agent-relay drive` -client. The typical drive-mode flow looks like this: +The four inbound-delivery-mode routes back the `agent-relay drive` and +`agent-relay passthrough` clients. The typical drive-mode flow looks like this: ```bash KEY="$RELAY_BROKER_API_KEY" -# 1. Flip Alice into human mode. Now inbound relay traffic +# 1. Flip Alice into manual_flush mode. Now inbound relay traffic # will queue instead of injecting. -curl -sX PUT localhost:3888/api/spawned/Alice/mode \ +curl -sX PUT localhost:3888/api/spawned/Alice/delivery-mode \ -H "X-API-Key: $KEY" \ - -d '{"mode":"human"}' + -d '{"mode":"manual_flush"}' -# 2. Send some messages while Alice is in human mode — +# 2. Send some messages while Alice is in manual_flush mode — # these land in her pending queue, not her PTY. curl -sX POST localhost:3888/api/send \ -H "X-API-Key: $KEY" \ @@ -485,11 +485,11 @@ curl -s localhost:3888/api/spawned/Alice/pending \ curl -sX POST localhost:3888/api/spawned/Alice/flush \ -H "X-API-Key: $KEY" -# 5. Flip Alice back to passthrough mode. Any messages still in the +# 5. Flip Alice back to auto_inject mode. Any messages still in the # queue are drained automatically before this call returns. -curl -sX PUT localhost:3888/api/spawned/Alice/mode \ +curl -sX PUT localhost:3888/api/spawned/Alice/delivery-mode \ -H "X-API-Key: $KEY" \ - -d '{"mode":"passthrough"}' + -d '{"mode":"auto_inject"}' ``` ## Error envelope diff --git a/web/content/docs/reference-cli.mdx b/web/content/docs/reference-cli.mdx index a1bbedf1a..279f8432f 100644 --- a/web/content/docs/reference-cli.mdx +++ b/web/content/docs/reference-cli.mdx @@ -70,11 +70,11 @@ Flags: If the initial snapshot can't be served (broker hiccup, transient timeout) `view` prints a warning to stderr and falls through to the live stream — you may briefly see a blank screen until the agent next produces output. If the agent doesn't exist or has no PTY (headless worker), `view` aborts with an explanatory error. -`view` shares the snapshot-on-attach helper with the `drive` and `relay` verbs so all three open with a faithful redraw of the agent's current screen. +`view` shares the snapshot-on-attach helper with the `drive` and `passthrough` verbs so all three open with a faithful redraw of the agent's current screen. ## `drive` -Take interactive control of a running agent. `drive` flips the worker's session mode to `human` so the broker parks new inbound relay messages in a per-worker FIFO queue, forwards your keystrokes to the worker's PTY, lets you drain the queue on demand, and detaches cleanly without killing the agent. +Take interactive control of a running agent. `drive` flips the worker's inbound delivery mode to `manual_flush` so the broker parks new inbound relay messages in a per-worker FIFO queue, forwards your keystrokes to the worker's PTY, lets you drain the queue on demand, and detaches cleanly without killing the agent. ```bash agent-relay drive reviewer @@ -82,15 +82,15 @@ agent-relay drive reviewer The client auto-discovers the running broker the same way `view` does (`--broker-url` → `RELAY_BROKER_URL` → `.agent-relay/connection.json`). On attach it: -1. Records the worker's current session mode so it can be restored on detach. -2. Calls `PUT /api/spawned/{name}/mode` with `{ "mode": "human" }` to start queueing relay messages. +1. Records the worker's current inbound delivery mode so it can be restored on detach. +2. Calls `PUT /api/spawned/{name}/delivery-mode` with `{ "mode": "manual_flush" }` to start queueing relay messages. 3. Captures and renders the agent's current visible screen via `GET /api/spawned/{name}/snapshot?format=ansi`. 4. Calls `GET /api/spawned/{name}/pending` once to seed the status-line counter. 5. Opens a WebSocket to `/ws` and subscribes to `worker_stream`, `delivery_queued`, and `agent_pending_drained` events for this agent. 6. Switches local stdin to raw mode and forwards keystrokes to `POST /api/input/{name}`. 7. Syncs the agent's PTY dimensions to your local terminal via `POST /api/resize/{name}` and forwards every subsequent `SIGWINCH`. Without this, a TUI in the agent would render into whatever 24×80 box the PTY was spawned with regardless of how big your terminal actually is. Skipped entirely when stdout isn't a TTY. -On detach (clean or abnormal), `drive` best-effort restores the worker's previous session mode so the queue doesn't fill up indefinitely, and leaves the agent running under the broker. +On detach (clean or abnormal), `drive` best-effort restores the worker's previous inbound delivery mode so the queue doesn't fill up indefinitely, and leaves the agent running under the broker. Flags: @@ -105,15 +105,15 @@ Keybinds: | Keys | Action | | ---- | ------ | | `Ctrl+G` | Flush the pending queue (`POST /api/spawned/{name}/flush`). The broker injects each queued message in FIFO order and emits `agent_pending_drained`. | -| `Ctrl+B` then `D` / `d` / `Ctrl+D` | Detach cleanly. Restores the worker's prior session mode, closes the WebSocket, exits 0. The parser accepts uppercase, lowercase, or `Ctrl+D` after the `Ctrl+B` prefix. | +| `Ctrl+B` then `D` / `d` / `Ctrl+D` | Detach cleanly. Restores the worker's prior inbound delivery mode, closes the WebSocket, exits 0. The parser accepts uppercase, lowercase, or `Ctrl+D` after the `Ctrl+B` prefix. | | `Ctrl+B` then `?` | Toggle a small help hint inside the status line. | | `Ctrl+C` | Safety alias for detach. `drive` never kills the agent — use `agent-relay agents:kill` for that. | | `Ctrl+B` then any other key | Forwarded to the agent unchanged so TUI apps that use `Ctrl+B` themselves aren't deprived. | -Status line: `drive` paints a one-line summary on the bottom row of your terminal using ANSI save/restore-cursor escapes so the agent's output isn't disturbed. It shows the agent name, current session mode, and the number of relay messages currently queued. The line repaints whenever the queue size changes (`delivery_queued` / `agent_pending_drained`) or the agent emits new output. +Status line: `drive` paints a one-line summary on the bottom row of your terminal using ANSI save/restore-cursor escapes so the agent's output isn't disturbed. It shows the agent name, current inbound delivery mode, and the number of relay messages currently queued. The line repaints whenever the queue size changes (`delivery_queued` / `agent_pending_drained`) or the agent emits new output. ``` -[drive reviewer | mode=human | pending=3 | Ctrl+G flush | Ctrl+B D detach] +[drive reviewer | delivery=manual_flush | pending=3 | Ctrl+G flush | Ctrl+B D detach] ``` Example session: @@ -132,13 +132,13 @@ The auto-inject sibling is [`passthrough`](#passthrough) below. ## `passthrough` -Watch a running agent in passthrough mode. Where [`drive`](#drive) flips the worker into `human` mode and parks inbound relay messages in a queue, `passthrough` leaves the worker in `passthrough` mode (the broker default) so the broker auto-injects inbound relay messages into the agent's PTY *while you also type*. Both streams race — that's the verb's whole point. Use `passthrough` when you want to observe and occasionally nudge while the broker does its coordination thing; use `drive` when you want exclusive deterministic control. +Watch a running agent with auto-injection enabled. Where [`drive`](#drive) flips the worker into `manual_flush` and parks inbound relay messages in a queue, `passthrough` leaves the worker in `auto_inject` mode (the broker default) so the broker auto-injects inbound relay messages into the agent's PTY *while you also type*. Both streams race — that's the verb's whole point. Use `passthrough` when you want to observe and occasionally nudge while the broker does its coordination thing; use `drive` when you want exclusive deterministic control. ```bash agent-relay passthrough reviewer ``` -Connection discovery, snapshot-on-attach, resize forwarding, and detach semantics are identical to `drive`. On attach, `passthrough` issues an idempotent `PUT /api/spawned/{name}/mode` with `{ "mode": "passthrough" }` so a worker that someone left in `human` mode is moved back to passthrough for the duration of your session, and the prior mode is restored on detach. +Connection discovery, snapshot-on-attach, resize forwarding, and detach semantics are identical to `drive`. On attach, `passthrough` issues an idempotent `PUT /api/spawned/{name}/delivery-mode` with `{ "mode": "auto_inject" }` so a worker that someone left in `manual_flush` mode is moved back to auto-inject for the duration of your session, and the prior mode is restored on detach. Flags: @@ -152,15 +152,15 @@ Keybinds: | Keys | Action | | ---- | ------ | -| `Ctrl+B` then `D` / `d` / `Ctrl+D` | Detach cleanly. Restores the worker's prior session mode, closes the WebSocket, exits 0. | +| `Ctrl+B` then `D` / `d` / `Ctrl+D` | Detach cleanly. Restores the worker's prior inbound delivery mode, closes the WebSocket, exits 0. | | `Ctrl+B` then `?` | Toggle the inline help hint in the status line. | | `Ctrl+C` | Safety alias for detach — `passthrough` never kills the agent. | | `Ctrl+B` then any other key | Forwarded to the agent unchanged. | -`passthrough` does *not* bind `Ctrl+G` (there's no queue to flush in passthrough mode — that's a `drive` concept). The status line is similar to `drive`'s but without the pending counter: +`passthrough` does *not* bind `Ctrl+G` (there's no queue to flush in `auto_inject` mode — that's a `drive` concept). The status line is similar to `drive`'s but without the pending counter: ``` -[passthrough reviewer | mode=passthrough | Ctrl+B D detach] +[passthrough reviewer | delivery=auto_inject | Ctrl+B D detach] ``` `passthrough` is single-driver-assumed in the same way `drive` is — last writer wins on keystrokes between the broker's auto-inject and your typing. @@ -206,7 +206,7 @@ Flags: | `--team ` | no | Team name for the agent. | | `--model ` | no | Model override (e.g. `opus`, `sonnet`, `gpt-4o`). | | `--attach` | no | After spawning, immediately open a session (default mode: `drive`). | -| `--mode ` | no | With `--attach`: session mode (`view`, `drive`, or `passthrough`). Defaults to `drive`. | +| `--mode ` | no | With `--attach`: session to open (`view`, `drive`, or `passthrough`). Defaults to `drive`. | | `--ephemeral` | no | With `--attach`: release the agent on client exit. | | `--broker-url ` | no | Broker base URL. Falls back to `RELAY_BROKER_URL`, then `.agent-relay/connection.json`. | | `--api-key ` | no | Broker API key. Falls back to `RELAY_BROKER_API_KEY`, then the `api_key` field in `connection.json`. | diff --git a/web/content/docs/typescript-sdk.mdx b/web/content/docs/typescript-sdk.mdx index 3c9f415fa..70bf178d5 100644 --- a/web/content/docs/typescript-sdk.mdx +++ b/web/content/docs/typescript-sdk.mdx @@ -130,7 +130,7 @@ interface Agent { ## Low-Level Broker Client `AgentRelay` is the high-level orchestration API. Use `AgentRelayClient` -when you need direct broker control over spawned workers, PTYs, session +when you need direct broker control over spawned workers, PTYs, inbound delivery mode, or the broker event stream. ```typescript @@ -172,7 +172,7 @@ const client = await AgentRelayClient.spawn({ cwd: '/my/project' }); | `client.subscribeChannels(name, channels)` | `POST /api/spawned/{name}/subscribe` | Subscribe a worker to channels. | | `client.unsubscribeChannels(name, channels)` | `POST /api/spawned/{name}/unsubscribe` | Unsubscribe a worker from channels. | -### PTY and Session Control +### PTY and Delivery Control These methods are the programmatic primitives behind CLI attach flows such as `drive`, `view`, and `passthrough`. @@ -182,20 +182,20 @@ as `drive`, `view`, and `passthrough`. | `client.sendInput(name, data)` | `POST /api/input/{name}` | Write raw bytes to a worker's PTY stdin. | | `client.resizePty(name, rows, cols)` | `POST /api/resize/{name}` | Resize a worker PTY. | | `client.snapshot(name, format?)` | `GET /api/spawned/{name}/snapshot` | Capture the visible PTY screen as `plain` text or base64 `ansi`. | -| `client.getSessionMode(name)` | `GET /api/spawned/{name}/mode` | Read `human` or `passthrough` mode. | -| `client.setSessionMode(name, mode)` | `PUT /api/spawned/{name}/mode` | Set session mode and return `{ mode, flushed }`. | -| `client.getPending(name)` | `GET /api/spawned/{name}/pending` | Read queued relay messages for a worker in `human` mode. | +| `client.getInboundDeliveryMode(name)` | `GET /api/spawned/{name}/delivery-mode` | Read `manual_flush` or `auto_inject`. | +| `client.setInboundDeliveryMode(name, mode)` | `PUT /api/spawned/{name}/delivery-mode` | Set inbound delivery mode and return `{ mode, flushed }`. | +| `client.getPending(name)` | `GET /api/spawned/{name}/pending` | Read queued relay messages for a worker in `manual_flush` mode. | | `client.flushPending(name)` | `POST /api/spawned/{name}/flush` | Drain queued relay messages into the worker PTY. | | `client.subscribeWorkerStream(name, options?)` | `GET /ws` | Async iterator over filtered `worker_stream` chunks for one worker. | ```typescript -import { AgentRelayClient, type SessionMode } from '@agent-relay/sdk'; +import { AgentRelayClient, type InboundDeliveryMode } from '@agent-relay/sdk'; const client = AgentRelayClient.connect({ cwd: '/my/project' }); const name = 'Reviewer'; -const previousMode: SessionMode = await client.getSessionMode(name); -await client.setSessionMode(name, 'human'); +const previousMode: InboundDeliveryMode = await client.getInboundDeliveryMode(name); +await client.setInboundDeliveryMode(name, 'manual_flush'); try { const snapshot = await client.snapshot(name, 'plain'); @@ -208,7 +208,7 @@ try { await client.resizePty(name, 40, 120); await client.flushPending(name); } finally { - await client.setSessionMode(name, previousMode); + await client.setInboundDeliveryMode(name, previousMode); } ```