diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index 461356651..cb265e82c 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -260,6 +260,123 @@ describe('AgentRelayClient orchestration payloads', () => { }); }); + it('exposes session mode and pending queue HTTP routes', async () => { + const client = createProtocolClient(); + const request = vi + .spyOn((client as any).transport, 'request') + .mockResolvedValueOnce({ mode: 'human' }) + .mockResolvedValueOnce({ mode: 'passthrough', flushed: 2 }) + .mockResolvedValueOnce({ + pending: [ + { + from: 'Alice', + body: 'one', + target: '#general', + priority: 1, + mode: 'steer', + queued_at_ms: 100, + event_id: 'evt_1', + }, + ], + }) + .mockResolvedValueOnce({ flushed: 1 }); + + await expect(client.getSessionMode('worker a')).resolves.toBe('human'); + await expect(client.setSessionMode('worker a', 'passthrough')).resolves.toEqual({ + mode: 'passthrough', + flushed: 2, + }); + await expect(client.getPending('worker a')).resolves.toHaveLength(1); + await expect(client.flushPending('worker a')).resolves.toEqual({ flushed: 1 }); + + expect(request).toHaveBeenNthCalledWith(1, '/api/spawned/worker%20a/mode'); + expect(request).toHaveBeenNthCalledWith(2, '/api/spawned/worker%20a/mode', { + method: 'PUT', + body: JSON.stringify({ mode: 'passthrough' }), + }); + expect(request).toHaveBeenNthCalledWith(3, '/api/spawned/worker%20a/pending'); + expect(request).toHaveBeenNthCalledWith(4, '/api/spawned/worker%20a/flush', { method: 'POST' }); + }); + + it('exposes input, resize, and snapshot PTY routes', async () => { + const client = createProtocolClient(); + const request = vi + .spyOn((client as any).transport, 'request') + .mockResolvedValueOnce({ name: 'worker', bytes_written: 5 }) + .mockResolvedValueOnce({ name: 'worker', rows: 40, cols: 120 }) + .mockResolvedValueOnce({ + format: 'ansi', + rows: 40, + cols: 120, + cursor: [2, 3], + screen: 'YW5zaQ==', + }); + + await expect(client.sendInput('worker', 'hello')).resolves.toEqual({ + name: 'worker', + bytes_written: 5, + }); + await expect(client.resizePty('worker', 40, 120)).resolves.toEqual({ + name: 'worker', + rows: 40, + cols: 120, + }); + await expect(client.snapshot('worker', 'ansi')).resolves.toMatchObject({ + format: 'ansi', + rows: 40, + cols: 120, + screen: 'YW5zaQ==', + }); + + expect(request).toHaveBeenNthCalledWith(1, '/api/input/worker', { + method: 'POST', + body: JSON.stringify({ data: 'hello' }), + }); + expect(request).toHaveBeenNthCalledWith(2, '/api/resize/worker', { + method: 'POST', + body: JSON.stringify({ rows: 40, cols: 120 }), + }); + expect(request).toHaveBeenNthCalledWith(3, '/api/spawned/worker/snapshot?format=ansi'); + }); + + it('subscribeWorkerStream yields only matching stream chunks', async () => { + const client = createProtocolClient(); + const connect = vi.spyOn((client as any).transport, 'connect').mockImplementation(() => undefined); + const stream = client.subscribeWorkerStream('worker', { stream: 'stdout', sinceSeq: 7 }); + const iterator = stream[Symbol.asyncIterator](); + + const next = iterator.next(); + emitClientEvent(client, { kind: 'worker_stream', name: 'other', stream: 'stdout', chunk: 'skip-other' }); + emitClientEvent(client, { + kind: 'worker_stream', + name: 'worker', + stream: 'stderr', + chunk: 'skip-stderr', + }); + emitClientEvent(client, { kind: 'worker_stream', name: 'worker', stream: 'stdout', chunk: 'first' }); + + await expect(next).resolves.toEqual({ done: false, value: 'first' }); + await iterator.return?.(); + expect(connect).toHaveBeenCalledWith(7); + }); + + it('HTTP protocol errors include response status for CLI adapters', async () => { + const fetchMock = vi.fn( + async () => + new Response(JSON.stringify({ code: 'agent_not_found', message: "no agent named 'ghost'" }), { + status: 404, + headers: { 'Content-Type': 'application/json' }, + }) + ); + const client = new AgentRelayClient({ baseUrl: TEST_BASE_URL, fetch: fetchMock as typeof fetch }); + + await expect(client.getSessionMode('ghost')).rejects.toMatchObject({ + code: 'agent_not_found', + status: 404, + message: "no agent named 'ghost'", + }); + }); + it('buffers broker events and supports query/getLast helpers', () => { const client = createProtocolClient(); diff --git a/packages/sdk/src/__tests__/transport.test.ts b/packages/sdk/src/__tests__/transport.test.ts new file mode 100644 index 000000000..0f98f3023 --- /dev/null +++ b/packages/sdk/src/__tests__/transport.test.ts @@ -0,0 +1,59 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { AgentRelayProtocolError, BrokerTransport } from '../transport.js'; + +const TEST_BASE_URL = 'http://127.0.0.1:3888'; + +afterEach(() => { + vi.restoreAllMocks(); +}); + +describe('BrokerTransport.request', () => { + it('returns undefined for empty successful responses', async () => { + const fetchMock = vi.fn(async () => new Response(null, { status: 204 })); + const transport = new BrokerTransport({ baseUrl: TEST_BASE_URL, fetch: fetchMock as typeof fetch }); + + await expect(transport.request('/empty')).resolves.toBeUndefined(); + }); + + it('returns undefined for content-length zero successful responses', async () => { + const fetchMock = vi.fn( + async () => new Response(null, { status: 200, headers: { 'content-length': '0' } }) + ); + const transport = new BrokerTransport({ baseUrl: TEST_BASE_URL, fetch: fetchMock as typeof fetch }); + + await expect(transport.request('/empty')).resolves.toBeUndefined(); + }); + + it('keeps invalid_response for non-empty malformed JSON responses', async () => { + const fetchMock = vi.fn( + async () => new Response('not-json', { status: 200, headers: { 'content-type': 'application/json' } }) + ); + const transport = new BrokerTransport({ baseUrl: TEST_BASE_URL, fetch: fetchMock as typeof fetch }); + + await expect(transport.request('/bad-json')).rejects.toMatchObject>({ + code: 'invalid_response', + status: 200, + }); + }); + + it('replaces differently-cased API key headers instead of duplicating them', async () => { + const fetchMock = vi.fn( + async () => + new Response(JSON.stringify({ ok: true }), { + status: 200, + headers: { 'content-type': 'application/json' }, + }) + ); + const transport = new BrokerTransport({ + baseUrl: TEST_BASE_URL, + apiKey: 'configured-key', + fetch: fetchMock as typeof fetch, + }); + + await transport.request('/headers', { headers: { 'x-api-key': 'caller-key' } }); + + const init = fetchMock.mock.calls[0]?.[1] as RequestInit; + expect(init.headers).toEqual({ 'X-API-Key': 'configured-key' }); + }); +}); diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 910dab533..61967e2e9 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -23,6 +23,10 @@ import type { BrokerStatus, CrashInsightsResponse, HeadlessProvider, + PendingRelayMessage, + PtySnapshot, + SessionMode, + SnapshotFormat, } from './protocol.js'; import type { AgentTransport, @@ -38,6 +42,8 @@ import type { export interface AgentRelayClientOptions { baseUrl: string; apiKey?: string; + /** Fetch implementation. Defaults to globalThis.fetch. */ + fetch?: typeof globalThis.fetch; /** Timeout in ms for HTTP requests. Default: 30000. */ requestTimeoutMs?: number; } @@ -83,6 +89,18 @@ export interface SessionInfo { uptime_secs: number; } +export interface SetSessionModeResult { + mode: SessionMode; + flushed: number; +} + +export interface WorkerStreamSubscriptionOptions { + /** Filter by stream name, for example `stdout` or `stderr`. Defaults to all streams. */ + stream?: string; + /** Sequence offset to pass to the broker event stream when connecting. */ + sinceSeq?: number; +} + interface BrokerStartupDebugContext { binaryPath: string; args: string[]; @@ -151,6 +169,7 @@ export class AgentRelayClient { this.transport = new BrokerTransport({ baseUrl: options.baseUrl, apiKey: options.apiKey, + fetch: options.fetch, requestTimeoutMs: options.requestTimeoutMs, }); } @@ -379,19 +398,19 @@ export class AgentRelayClient { body: JSON.stringify({ name: input.name, cli: input.cli, - model: input.model, + ...(input.model !== undefined ? { model: input.model } : {}), args: input.args ?? [], - task: input.task, + ...(input.task !== undefined ? { task: input.task } : {}), channels: input.channels ?? [], - cwd: input.cwd, - team: input.team, - agentToken: input.agentToken, - shadowOf: input.shadowOf, - shadowMode: input.shadowMode, - continueFrom: input.continueFrom, - idleThresholdSecs: input.idleThresholdSecs, - restartPolicy: input.restartPolicy, - skipRelayPrompt: input.skipRelayPrompt, + ...(input.cwd !== undefined ? { cwd: input.cwd } : {}), + ...(input.team !== undefined ? { team: input.team } : {}), + ...(input.agentToken !== undefined ? { agentToken: input.agentToken } : {}), + ...(input.shadowOf !== undefined ? { shadowOf: input.shadowOf } : {}), + ...(input.shadowMode !== undefined ? { shadowMode: input.shadowMode } : {}), + ...(input.continueFrom !== undefined ? { continueFrom: input.continueFrom } : {}), + ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), + ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), + ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), }), }); } @@ -409,19 +428,19 @@ export class AgentRelayClient { body: JSON.stringify({ name: input.name, cli: input.provider, - model: input.model, + ...(input.model !== undefined ? { model: input.model } : {}), args: input.args ?? [], - task: input.task, + ...(input.task !== undefined ? { task: input.task } : {}), channels: input.channels ?? [], - cwd: input.cwd, - team: input.team, - agentToken: input.agentToken, - shadowOf: input.shadowOf, - shadowMode: input.shadowMode, - continueFrom: input.continueFrom, - idleThresholdSecs: input.idleThresholdSecs, - restartPolicy: input.restartPolicy, - skipRelayPrompt: input.skipRelayPrompt, + ...(input.cwd !== undefined ? { cwd: input.cwd } : {}), + ...(input.team !== undefined ? { team: input.team } : {}), + ...(input.agentToken !== undefined ? { agentToken: input.agentToken } : {}), + ...(input.shadowOf !== undefined ? { shadowOf: input.shadowOf } : {}), + ...(input.shadowMode !== undefined ? { shadowMode: input.shadowMode } : {}), + ...(input.continueFrom !== undefined ? { continueFrom: input.continueFrom } : {}), + ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), + ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), + ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), transport, }), }); @@ -475,6 +494,135 @@ export class AgentRelayClient { }); } + async getSessionMode(name: string): Promise { + const result = await this.transport.request<{ mode?: unknown }>( + `/api/spawned/${encodeURIComponent(name)}/mode` + ); + if (result.mode !== 'human' && result.mode !== 'passthrough') { + throw new AgentRelayProtocolError({ + code: 'invalid_response', + message: "session mode response missing valid 'mode'", + }); + } + return result.mode; + } + + async setSessionMode(name: string, mode: SessionMode): Promise { + const result = await this.transport.request<{ mode?: unknown; flushed?: unknown }>( + `/api/spawned/${encodeURIComponent(name)}/mode`, + { + method: 'PUT', + body: JSON.stringify({ mode }), + } + ); + if (result.mode !== 'human' && result.mode !== 'passthrough') { + throw new AgentRelayProtocolError({ + code: 'invalid_response', + message: "set session mode response missing valid 'mode'", + }); + } + return { + mode: result.mode, + flushed: typeof result.flushed === 'number' ? result.flushed : 0, + }; + } + + async getPending(name: string): Promise { + const result = await this.transport.request<{ pending?: unknown }>( + `/api/spawned/${encodeURIComponent(name)}/pending` + ); + return Array.isArray(result.pending) ? (result.pending as PendingRelayMessage[]) : []; + } + + async flushPending(name: string): Promise<{ flushed: number }> { + const result = await this.transport.request<{ flushed?: unknown }>( + `/api/spawned/${encodeURIComponent(name)}/flush`, + { method: 'POST' } + ); + return { flushed: typeof result.flushed === 'number' ? result.flushed : 0 }; + } + + async snapshot(name: string, format: SnapshotFormat = 'plain'): Promise { + return this.transport.request( + `/api/spawned/${encodeURIComponent(name)}/snapshot?format=${encodeURIComponent(format)}` + ); + } + + subscribeWorkerStream(name: string, options: WorkerStreamSubscriptionOptions = {}): AsyncIterable { + this.connectEvents(options.sinceSeq); + + return { + [Symbol.asyncIterator]: () => { + const queue: string[] = []; + let pending: + | { + resolve: (result: IteratorResult) => void; + reject: (error: unknown) => void; + } + | undefined; + let done = false; + + const unsubscribe = this.onEvent((event) => { + if ( + event.kind !== 'worker_stream' || + event.name !== name || + (options.stream !== undefined && event.stream !== options.stream) + ) { + return; + } + if (pending) { + const { resolve } = pending; + pending = undefined; + resolve({ done: false, value: event.chunk }); + return; + } + queue.push(event.chunk); + }); + + const close = (): IteratorResult => { + done = true; + unsubscribe(); + if (pending) { + const { resolve } = pending; + pending = undefined; + resolve({ done: true, value: undefined as never }); + } + return { done: true, value: undefined as never }; + }; + + return { + next(): Promise> { + if (queue.length > 0) { + return Promise.resolve({ done: false, value: queue.shift() as string }); + } + if (done) { + return Promise.resolve({ done: true, value: undefined as never }); + } + return new Promise>((resolve, reject) => { + pending = { resolve, reject }; + }); + }, + return(): Promise> { + return Promise.resolve(close()); + }, + throw(error?: unknown): Promise> { + done = true; + unsubscribe(); + if (pending) { + const { reject } = pending; + pending = undefined; + reject(error); + } + return Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + }, + }; + } + // ── Messaging ────────────────────────────────────────────────────── async sendMessage(input: SendMessageInput): Promise<{ event_id: string; targets: string[] }> { diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 0c311a8bc..4848124c3 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -6,7 +6,9 @@ export { type AgentRelayBrokerInitArgs, type AgentRelayClientOptions, type AgentRelaySpawnOptions, + type SetSessionModeResult, type SessionInfo, + type WorkerStreamSubscriptionOptions, } from './client.js'; export * from './models.js'; export { RelayCast, RelayError, AgentClient } from '@relaycast/sdk'; diff --git a/packages/sdk/src/protocol.ts b/packages/sdk/src/protocol.ts index 373930e39..cbcc6fbf9 100644 --- a/packages/sdk/src/protocol.ts +++ b/packages/sdk/src/protocol.ts @@ -2,6 +2,8 @@ export const PROTOCOL_VERSION = 1 as const; export type AgentRuntime = 'pty' | 'headless'; export type HeadlessProvider = 'claude' | 'opencode'; +export type SessionMode = 'passthrough' | 'human'; +export type SnapshotFormat = 'plain' | 'ansi'; export interface RestartPolicy { enabled?: boolean; @@ -40,6 +42,28 @@ export interface RelayDelivery { injection_mode?: MessageInjectionMode; } +export interface PendingRelayMessage { + from: string; + body: string; + target: string; + thread_id?: string; + workspace_id?: string; + workspace_alias?: string; + priority: number; + mode: MessageInjectionMode; + queued_at_ms: number; + event_id?: string; +} + +export interface PtySnapshot { + format: SnapshotFormat; + rows: number; + cols: number; + cursor: [number, number]; + /** Plain text for `format=plain`; base64-encoded ANSI bytes for `format=ansi`. */ + screen: string; +} + export interface ProtocolEnvelope { v: number; type: string; @@ -262,6 +286,12 @@ export type BrokerEvent = event_id: string; timestamp?: unknown; } + | { + kind: 'agent_pending_drained'; + name: string; + count: number; + reason?: string; + } | { kind: 'delivery_injected'; name: string; diff --git a/packages/sdk/src/transport.ts b/packages/sdk/src/transport.ts index 017399794..b8b5170f6 100644 --- a/packages/sdk/src/transport.ts +++ b/packages/sdk/src/transport.ts @@ -14,13 +14,21 @@ import type { BrokerEvent } from './protocol.js'; export class AgentRelayProtocolError extends Error { code: string; retryable: boolean; + status?: number; data?: unknown; - constructor(payload: { code: string; message: string; retryable?: boolean; data?: unknown }) { + constructor(payload: { + code: string; + message: string; + retryable?: boolean; + status?: number; + data?: unknown; + }) { super(payload.message); this.name = 'AgentRelayProtocolError'; this.code = payload.code; this.retryable = payload.retryable ?? false; + this.status = payload.status; this.data = payload.data; } } @@ -28,6 +36,8 @@ export class AgentRelayProtocolError extends Error { export interface BrokerTransportOptions { baseUrl: string; apiKey?: string; + /** Fetch implementation. Defaults to globalThis.fetch. */ + fetch?: typeof globalThis.fetch; /** Timeout in ms for HTTP requests. Default: 30000. */ requestTimeoutMs?: number; /** Maximum number of events to buffer in memory for queryEvents/getLastEvent */ @@ -37,6 +47,7 @@ export interface BrokerTransportOptions { export class BrokerTransport { private readonly baseUrl: string; private readonly apiKey?: string; + private readonly fetchFn: typeof globalThis.fetch; private readonly requestTimeoutMs: number; private readonly maxBufferSize: number; @@ -51,6 +62,7 @@ export class BrokerTransport { constructor(options: BrokerTransportOptions) { this.baseUrl = options.baseUrl.replace(/\/$/, ''); this.apiKey = options.apiKey; + this.fetchFn = options.fetch ?? fetch; this.requestTimeoutMs = options.requestTimeoutMs ?? 30_000; this.maxBufferSize = options.maxBufferSize ?? 1000; } @@ -66,16 +78,16 @@ export class BrokerTransport { // ── HTTP ───────────────────────────────────────────────────────────── async request(path: string, init?: RequestInit): Promise { - const headers = new Headers(init?.headers); - if (!headers.has('Content-Type')) { - headers.set('Content-Type', 'application/json'); + const headers = headersToRecord(init?.headers); + if (init?.body !== undefined && !hasHeader(headers, 'Content-Type')) { + headers['Content-Type'] = 'application/json'; } if (this.apiKey) { - headers.set('X-API-Key', this.apiKey); + setHeader(headers, 'X-API-Key', this.apiKey); } const signal = init?.signal ?? AbortSignal.timeout(this.requestTimeoutMs); - const res = await fetch(`${this.baseUrl}${path}`, { ...init, headers, signal }); + const res = await this.fetchFn(`${this.baseUrl}${path}`, { ...init, headers, signal }); if (!res.ok) { let body: { code?: string; message?: string; error?: string } | undefined; @@ -86,12 +98,31 @@ export class BrokerTransport { } throw new AgentRelayProtocolError({ code: body?.code ?? `http_${res.status}`, - message: body?.message ?? body?.error ?? res.statusText, + message: (body?.message ?? body?.error ?? res.statusText) || `HTTP ${res.status}`, retryable: res.status >= 500, + status: res.status, + data: body, }); } - return res.json() as Promise; + if (isEmptySuccessResponse(res)) { + return undefined as T; + } + + const bodyText = await res.text(); + if (bodyText.length === 0) { + return undefined as T; + } + try { + return JSON.parse(bodyText) as T; + } catch { + throw new AgentRelayProtocolError({ + code: 'invalid_response', + message: 'response was not JSON', + retryable: false, + status: res.status, + }); + } } // ── WebSocket events ───────────────────────────────────────────────── @@ -214,3 +245,49 @@ export class BrokerTransport { return undefined; } } + +function headersToRecord(headersInit: HeadersInit | undefined): Record { + const headers: Record = {}; + if (!headersInit) return headers; + + if (headersInit instanceof Headers) { + headersInit.forEach((value, key) => { + headers[key] = value; + }); + return headers; + } + + if (Array.isArray(headersInit)) { + for (const [key, value] of headersInit) { + headers[key] = value; + } + return headers; + } + + for (const [key, value] of Object.entries(headersInit)) { + headers[key] = value; + } + return headers; +} + +function hasHeader(headers: Record, name: string): boolean { + const lowerName = name.toLowerCase(); + return Object.keys(headers).some((key) => key.toLowerCase() === lowerName); +} + +function setHeader(headers: Record, name: string, value: string): void { + const lowerName = name.toLowerCase(); + for (const key of Object.keys(headers)) { + if (key.toLowerCase() === lowerName && key !== name) { + delete headers[key]; + } + } + headers[name] = value; +} + +function isEmptySuccessResponse(res: Response): boolean { + if (res.status === 204 || res.status === 205) { + return true; + } + return res.headers.get('content-length')?.trim() === '0'; +} diff --git a/src/cli/commands/drive.ts b/src/cli/commands/drive.ts index bdcf902d5..5fb944322 100644 --- a/src/cli/commands/drive.ts +++ b/src/cli/commands/drive.ts @@ -24,6 +24,7 @@ import { Buffer } from 'node:buffer'; +import type { SessionMode } from '@agent-relay/sdk'; import { Command } from 'commander'; import WebSocket from 'ws'; @@ -40,11 +41,12 @@ import { type BrokerConnection, } from '../lib/broker-connection.js'; import { defaultExit, runSignalHandler } from '../lib/exit.js'; +import { createBrokerClient, mapBrokerSdkFailure } from '../lib/sdk-client.js'; type ExitFn = (code: number) => never; /** Wire string for the broker's `SessionMode` enum. */ -export type SessionMode = 'human' | 'passthrough'; +export type { SessionMode }; /** Minimal WebSocket surface we depend on — same shape as `view`'s. */ export interface DriveWebSocket { @@ -157,11 +159,6 @@ function withDefaults(overrides: Partial = {}): DriveDependen }; } -/** Build the `X-API-Key` header set, or an empty object when no key. */ -function authHeaders(connection: BrokerConnection): Record { - return connection.apiKey ? { 'X-API-Key': connection.apiKey } : {}; -} - /** ----- HTTP helpers ----- */ /** `GET /api/spawned/{name}/mode` → `'human' | 'passthrough'` or `null` on failure. */ @@ -170,13 +167,8 @@ export async function getSessionMode( name: string, fetchFn: typeof globalThis.fetch ): Promise { - const url = `${connection.url}/api/spawned/${encodeURIComponent(name)}/mode`; try { - const res = await fetchFn(url, { headers: authHeaders(connection) }); - if (!res.ok) return null; - const body = (await res.json()) as { mode?: unknown }; - if (body.mode === 'human' || body.mode === 'passthrough') return body.mode; - return null; + return await createBrokerClient(connection, fetchFn).getSessionMode(name); } catch { return null; } @@ -198,34 +190,13 @@ export async function setSessionMode( mode: SessionMode, fetchFn: typeof globalThis.fetch ): Promise { - const url = `${connection.url}/api/spawned/${encodeURIComponent(name)}/mode`; try { - const res = await fetchFn(url, { - method: 'PUT', - headers: { ...authHeaders(connection), 'Content-Type': 'application/json' }, - body: JSON.stringify({ mode }), - }); - if (!res.ok) { - let message = `HTTP ${res.status}`; - try { - const body = (await res.json()) as { error?: unknown }; - if (typeof body.error === 'string') message = body.error; - } catch { - // body wasn't JSON; stick with HTTP status - } - return { ok: false, status: res.status, message }; - } - let flushed: number | undefined; - try { - const body = (await res.json()) as { flushed?: unknown }; - if (typeof body.flushed === 'number') flushed = body.flushed; - } catch { - // missing body is fine — mode flip still succeeded - } - return { ok: true, status: res.status, flushed }; + const body = await createBrokerClient(connection, fetchFn).setSessionMode(name, mode); + const flushed = body.flushed; + return { ok: true, status: 200, flushed }; } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - return { ok: false, status: 0, message }; + const failure = mapBrokerSdkFailure(err); + return { ok: false, status: failure.status, message: failure.message }; } } @@ -235,12 +206,8 @@ export async function getPendingCount( name: string, fetchFn: typeof globalThis.fetch ): Promise { - const url = `${connection.url}/api/spawned/${encodeURIComponent(name)}/pending`; try { - const res = await fetchFn(url, { headers: authHeaders(connection) }); - if (!res.ok) return 0; - const body = (await res.json()) as { pending?: unknown }; - return Array.isArray(body.pending) ? body.pending.length : 0; + return (await createBrokerClient(connection, fetchFn).getPending(name)).length; } catch { return 0; } @@ -252,22 +219,12 @@ export async function flushPending( name: string, fetchFn: typeof globalThis.fetch ): Promise<{ ok: boolean; flushed?: number; message?: string }> { - const url = `${connection.url}/api/spawned/${encodeURIComponent(name)}/flush`; try { - const res = await fetchFn(url, { method: 'POST', headers: authHeaders(connection) }); - if (!res.ok) { - return { ok: false, message: `HTTP ${res.status}` }; - } - try { - const body = (await res.json()) as { flushed?: unknown }; - const flushed = typeof body.flushed === 'number' ? body.flushed : undefined; - return { ok: true, flushed }; - } catch { - return { ok: true }; - } + const body = await createBrokerClient(connection, fetchFn).flushPending(name); + return { ok: true, flushed: body.flushed }; } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - return { ok: false, message }; + const failure = mapBrokerSdkFailure(err); + return { ok: false, message: failure.message }; } } @@ -278,20 +235,12 @@ export async function sendInput( data: string, fetchFn: typeof globalThis.fetch ): Promise<{ ok: boolean; message?: string }> { - const url = `${connection.url}/api/input/${encodeURIComponent(name)}`; try { - const res = await fetchFn(url, { - method: 'POST', - headers: { ...authHeaders(connection), 'Content-Type': 'application/json' }, - body: JSON.stringify({ data }), - }); - if (!res.ok) { - return { ok: false, message: `HTTP ${res.status}` }; - } + await createBrokerClient(connection, fetchFn).sendInput(name, data); return { ok: true }; } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - return { ok: false, message }; + const failure = mapBrokerSdkFailure(err); + return { ok: false, message: failure.message }; } } @@ -308,20 +257,12 @@ export async function resizeWorker( cols: number, fetchFn: typeof globalThis.fetch ): Promise<{ ok: boolean; message?: string }> { - const url = `${connection.url}/api/resize/${encodeURIComponent(name)}`; try { - const res = await fetchFn(url, { - method: 'POST', - headers: { ...authHeaders(connection), 'Content-Type': 'application/json' }, - body: JSON.stringify({ rows, cols }), - }); - if (!res.ok) { - return { ok: false, message: `HTTP ${res.status}` }; - } + await createBrokerClient(connection, fetchFn).resizePty(name, rows, cols); return { ok: true }; } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - return { ok: false, message }; + const failure = mapBrokerSdkFailure(err); + return { ok: false, message: failure.message }; } } diff --git a/src/cli/commands/new.ts b/src/cli/commands/new.ts index 362565e42..612c6b97b 100644 --- a/src/cli/commands/new.ts +++ b/src/cli/commands/new.ts @@ -21,9 +21,8 @@ * verbless `-n` alias dispatcher in `bootstrap.ts` calls — single code * path, byte-equivalent alias. * - * The longer-form `spawn` command in `agent-management.ts` also wraps - * the broker's spawn route but goes through the SDK client (with - * broker autostart and many shadow/team/model flags); `new` is the + * The longer-form `spawn` command in `agent-management.ts` layers broker + * autostart and more flags on top of the same SDK client; `new` is the * lighter "I already have a broker, just spawn this" entry point. */ @@ -36,6 +35,7 @@ import { type BrokerConnection, } from '../lib/broker-connection.js'; import { defaultExit } from '../lib/exit.js'; +import { createBrokerClient, mapBrokerSdkFailure } from '../lib/sdk-client.js'; import { buildDefaultAttachChildDeps, buildSpawnAndAttachDeps, @@ -69,10 +69,6 @@ function withDefaults(overrides: Partial = {}): NewDependencies }; } -function authHeaders(connection: BrokerConnection): Record { - return connection.apiKey ? { 'X-API-Key': connection.apiKey } : {}; -} - /** Body shape for `POST /api/spawn`. Mirrors the Rust broker's `listen_api_spawn`. */ export interface SpawnRequestBody { name: string; @@ -95,39 +91,24 @@ export interface SpawnResult { } /** - * POST `/api/spawn` against the broker. Exported so the - * spawn-and-attach helper (and any other caller) can use the same - * fetch / error mapping. + * Spawn through the SDK client against the resolved broker. Exported so + * the spawn-and-attach helper (and any other caller) can use the same + * transport / error mapping. */ export async function spawnAgent( connection: BrokerConnection, body: SpawnRequestBody, fetchFn: typeof globalThis.fetch ): Promise { - const url = `${connection.url}/api/spawn`; try { - const res = await fetchFn(url, { - method: 'POST', - headers: { ...authHeaders(connection), 'Content-Type': 'application/json' }, - body: JSON.stringify(body), - }); - let parsed: Record | undefined; - try { - const json = (await res.json()) as unknown; - if (json && typeof json === 'object' && !Array.isArray(json)) { - parsed = json as Record; - } - } catch { - // empty / non-JSON body — fine - } - if (!res.ok) { - const errMsg = parsed && typeof parsed.error === 'string' ? parsed.error : `HTTP ${res.status}`; - return { ok: false, status: res.status, message: errMsg, body: parsed }; - } - return { ok: true, status: res.status, body: parsed }; + const parsed = (await createBrokerClient(connection, fetchFn).spawnPty(body)) as unknown as Record< + string, + unknown + >; + return { ok: true, status: 200, body: parsed }; } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - return { ok: false, status: 0, message }; + const failure = mapBrokerSdkFailure(err); + return { ok: false, status: failure.status, message: failure.message }; } } diff --git a/src/cli/commands/rm.ts b/src/cli/commands/rm.ts index b268995b3..7f0d29ef9 100644 --- a/src/cli/commands/rm.ts +++ b/src/cli/commands/rm.ts @@ -8,9 +8,9 @@ * `RELAY_BROKER_URL` / `connection.json` chain works as for `view` / * `drive` / `relay`. * - * The longer-form `release` command in `agent-management.ts` does the - * same thing through the SDK client but costs a broker autostart; - * `rm` is the lighter "I already have a broker" entry point. + * The longer-form `release` command in `agent-management.ts` layers + * broker autostart on top of the same SDK client; `rm` is the lighter + * "I already have a broker" entry point. */ import { Command } from 'commander'; @@ -22,6 +22,7 @@ import { type BrokerConnection, } from '../lib/broker-connection.js'; import { defaultExit } from '../lib/exit.js'; +import { createBrokerClient, mapBrokerSdkFailure } from '../lib/sdk-client.js'; type ExitFn = (code: number) => never; @@ -48,10 +49,6 @@ function withDefaults(overrides: Partial = {}): RmDependencies { }; } -function authHeaders(connection: BrokerConnection): Record { - return connection.apiKey ? { 'X-API-Key': connection.apiKey } : {}; -} - /** Outcome of `releaseAgent`. Useful for the `--ephemeral` teardown in `run`. */ export interface ReleaseResult { ok: boolean; @@ -60,7 +57,7 @@ export interface ReleaseResult { } /** - * Issue `DELETE /api/spawned/{name}` against the broker. Returns a + * Release through the SDK client against the resolved broker. Returns a * structured outcome the caller can decide how to surface — `rm` prints * a one-liner, the `--ephemeral` teardown in `run` swallows failures * because the client is already on its way out. @@ -70,23 +67,12 @@ export async function releaseAgent( agentName: string, fetchFn: typeof globalThis.fetch ): Promise { - const url = `${connection.url}/api/spawned/${encodeURIComponent(agentName)}`; try { - const res = await fetchFn(url, { method: 'DELETE', headers: authHeaders(connection) }); - if (!res.ok) { - let message = `HTTP ${res.status}`; - try { - const body = (await res.json()) as { error?: unknown }; - if (typeof body.error === 'string') message = body.error; - } catch { - // not JSON — keep the HTTP status - } - return { ok: false, status: res.status, message }; - } - return { ok: true, status: res.status }; + await createBrokerClient(connection, fetchFn).release(agentName); + return { ok: true, status: 200 }; } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - return { ok: false, status: 0, message }; + const failure = mapBrokerSdkFailure(err); + return { ok: false, status: failure.status, message: failure.message }; } } diff --git a/src/cli/lib/attach.ts b/src/cli/lib/attach.ts index bc8d21d61..360bb91d8 100644 --- a/src/cli/lib/attach.ts +++ b/src/cli/lib/attach.ts @@ -9,6 +9,8 @@ * of code. */ +import { createBrokerClient, mapBrokerSdkFailure } from './sdk-client.js'; + /** Connection metadata used to call the broker's snapshot endpoint. */ export interface AttachSnapshotConnection { /** Broker base URL (no trailing slash). */ @@ -61,40 +63,26 @@ export async function captureAndRenderSnapshot( agentName: string, deps: AttachSnapshotDeps ): Promise { - const baseUrl = connection.url.replace(/\/+$/, ''); - const target = `${baseUrl}/api/spawned/${encodeURIComponent(agentName)}/snapshot?format=ansi`; - const headers: Record = {}; - if (connection.apiKey) { - headers['X-API-Key'] = connection.apiKey; - } - - let res: Response; + let body: unknown; try { - res = await deps.fetch(target, { headers }); + body = await createBrokerClient(connection, deps.fetch).snapshot(agentName, 'ansi'); } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - return { status: 'transport_error', message }; + const failure = mapBrokerSdkFailure(err); + if (failure.status === 404) { + return { status: 'not_found', message: `no agent named '${agentName}'` }; + } + if (failure.status === 409) { + return { + status: 'no_pty', + message: `agent '${agentName}' has no PTY (headless worker — nothing to view)`, + }; + } + if (failure.status === 0 || failure.status === 200) { + return { status: 'transport_error', message: failure.message }; + } + return { status: 'unavailable', message: `snapshot returned HTTP ${failure.status}` }; } - if (res.status === 404) { - return { status: 'not_found', message: `no agent named '${agentName}'` }; - } - if (res.status === 409) { - return { - status: 'no_pty', - message: `agent '${agentName}' has no PTY (headless worker — nothing to view)`, - }; - } - if (!res.ok) { - return { status: 'unavailable', message: `snapshot returned HTTP ${res.status}` }; - } - - let body: unknown; - try { - body = await res.json(); - } catch { - return { status: 'transport_error', message: 'snapshot response was not JSON' }; - } if (typeof body !== 'object' || body === null) { return { status: 'transport_error', message: 'snapshot response was not an object' }; } diff --git a/src/cli/lib/sdk-client.ts b/src/cli/lib/sdk-client.ts new file mode 100644 index 000000000..5930053a5 --- /dev/null +++ b/src/cli/lib/sdk-client.ts @@ -0,0 +1,39 @@ +import { AgentRelayClient, AgentRelayProtocolError } from '@agent-relay/sdk'; + +import type { BrokerConnection } from './broker-connection.js'; + +export function createBrokerClient( + connection: BrokerConnection, + fetchFn?: typeof globalThis.fetch +): AgentRelayClient { + return new AgentRelayClient({ + baseUrl: connection.url, + apiKey: connection.apiKey, + fetch: fetchFn, + }); +} + +export interface BrokerSdkFailure { + status: number; + message: string; +} + +export function mapBrokerSdkFailure(error: unknown): BrokerSdkFailure { + if (error instanceof AgentRelayProtocolError) { + return { + status: error.status ?? parseHttpStatus(error.code) ?? 0, + message: error.message, + }; + } + return { + status: 0, + message: error instanceof Error ? error.message : String(error), + }; +} + +function parseHttpStatus(code: string): number | undefined { + const match = /^http_(\d{3})$/.exec(code); + if (!match) return undefined; + const status = Number(match[1]); + return Number.isInteger(status) ? status : undefined; +} diff --git a/web/content/docs/reference-broker-api.mdx b/web/content/docs/reference-broker-api.mdx index 2dadaf366..85a5f4a50 100644 --- a/web/content/docs/reference-broker-api.mdx +++ b/web/content/docs/reference-broker-api.mdx @@ -107,6 +107,9 @@ status on failure. | `POST` | `/api/resize/{name}` | Resize an agent's PTY. Body `{ "rows": , "cols": }`. | | `POST` | `/api/send` | Inject a relay message into an agent. | +TypeScript SDK equivalents: `client.sendInput(name, data)`, +`client.resizePty(name, rows, cols)`, and `client.sendMessage(input)`. + #### `POST /api/input/{name}` This is the keystroke channel. The `data` string is written to the @@ -205,6 +208,12 @@ curl -s \ "http://127.0.0.1:3888/api/spawned/reviewer/snapshot?format=plain" ``` +TypeScript SDK equivalent: + +```typescript +const snapshot = await client.snapshot('reviewer', 'plain'); +``` + #### `agent-relay-broker dump-pty ` Admin CLI that wraps the snapshot route — useful for "what does this @@ -259,6 +268,10 @@ is dropped (no on-disk persistence). | `GET` | `/api/spawned/{name}/pending` | Snapshot the per-worker pending queue (FIFO, head first). | | `POST` | `/api/spawned/{name}/flush` | Drain the pending queue and inject every message into the worker. FIFO order. | +TypeScript SDK equivalents: `client.getSessionMode(name)`, +`client.setSessionMode(name, mode)`, `client.getPending(name)`, and +`client.flushPending(name)`. + #### `PUT /api/spawned/{name}/mode` ```json @@ -396,6 +409,14 @@ Ephemeral (broadcast only, no replay): | `worker_stream` | A chunk of stdout from a wrapped CLI. Payload contains `stream` (`"stdout"`) and `chunk` (the raw bytes — typically still ANSI-escaped). | | `delivery_active` | High-frequency progress events for in-flight deliveries. | +TypeScript SDK equivalent for one worker's PTY output: + +```typescript +for await (const chunk of client.subscribeWorkerStream('Alice')) { + process.stdout.write(chunk); +} +``` + ## Worked example: control a spawned agent end-to-end ```bash diff --git a/web/content/docs/typescript-sdk.mdx b/web/content/docs/typescript-sdk.mdx index b61423baf..3c9f415fa 100644 --- a/web/content/docs/typescript-sdk.mdx +++ b/web/content/docs/typescript-sdk.mdx @@ -127,6 +127,103 @@ interface Agent { --- +## Low-Level Broker Client + +`AgentRelay` is the high-level orchestration API. Use `AgentRelayClient` +when you need direct broker control over spawned workers, PTYs, session +mode, or the broker event stream. + +```typescript +import { AgentRelayClient } from '@agent-relay/sdk'; + +// Spawn a broker and connect to it. +const client = await AgentRelayClient.spawn({ cwd: '/my/project' }); + +// Or connect to an already-running broker by reading .agent-relay/connection.json. +// const client = AgentRelayClient.connect({ cwd: '/my/project' }); + +// Or connect directly when you already know the broker URL and API key. +// const client = new AgentRelayClient({ +// baseUrl: 'http://127.0.0.1:3888', +// apiKey: process.env.RELAY_BROKER_API_KEY, +// }); +``` + +### Client Options + +| Property | Type | Description | Default | +| ------------------ | ---------------------- | ------------------------------------------------ | ------- | +| `baseUrl` | `string` | Broker listen API base URL. | Required | +| `apiKey` | `string` | API key sent as `X-API-Key`. | None | +| `fetch` | `typeof globalThis.fetch` | Custom fetch implementation for tests/runtimes. | `globalThis.fetch` | +| `requestTimeoutMs` | `number` | HTTP request timeout in milliseconds. | `30000` | + +### Lifecycle and Messaging + +| Method | Broker route | Description | +| ------ | ------------ | ----------- | +| `client.spawnPty(input)` | `POST /api/spawn` | Spawn a PTY-backed worker. | +| `client.spawnProvider(input)` | `POST /api/spawn` | Spawn by provider and transport. | +| `client.spawnClaude(input)` | `POST /api/spawn` | Spawn a Claude worker. | +| `client.spawnOpencode(input)` | `POST /api/spawn` | Spawn an OpenCode worker. | +| `client.release(name, reason?)` | `DELETE /api/spawned/{name}` | Release a worker. | +| `client.listAgents()` | `GET /api/spawned` | List running workers. | +| `client.sendMessage(input)` | `POST /api/send` | Inject a relay message. | +| `client.subscribeChannels(name, channels)` | `POST /api/spawned/{name}/subscribe` | Subscribe a worker to channels. | +| `client.unsubscribeChannels(name, channels)` | `POST /api/spawned/{name}/unsubscribe` | Unsubscribe a worker from channels. | + +### PTY and Session Control + +These methods are the programmatic primitives behind CLI attach flows such +as `drive`, `view`, and `passthrough`. + +| Method | Broker route | Description | +| ------ | ------------ | ----------- | +| `client.sendInput(name, data)` | `POST /api/input/{name}` | Write raw bytes to a worker's PTY stdin. | +| `client.resizePty(name, rows, cols)` | `POST /api/resize/{name}` | Resize a worker PTY. | +| `client.snapshot(name, format?)` | `GET /api/spawned/{name}/snapshot` | Capture the visible PTY screen as `plain` text or base64 `ansi`. | +| `client.getSessionMode(name)` | `GET /api/spawned/{name}/mode` | Read `human` or `passthrough` mode. | +| `client.setSessionMode(name, mode)` | `PUT /api/spawned/{name}/mode` | Set session mode and return `{ mode, flushed }`. | +| `client.getPending(name)` | `GET /api/spawned/{name}/pending` | Read queued relay messages for a worker in `human` mode. | +| `client.flushPending(name)` | `POST /api/spawned/{name}/flush` | Drain queued relay messages into the worker PTY. | +| `client.subscribeWorkerStream(name, options?)` | `GET /ws` | Async iterator over filtered `worker_stream` chunks for one worker. | + +```typescript +import { AgentRelayClient, type SessionMode } from '@agent-relay/sdk'; + +const client = AgentRelayClient.connect({ cwd: '/my/project' }); +const name = 'Reviewer'; + +const previousMode: SessionMode = await client.getSessionMode(name); +await client.setSessionMode(name, 'human'); + +try { + const snapshot = await client.snapshot(name, 'plain'); + console.log(snapshot.screen); + + const pending = await client.getPending(name); + console.log(`Queued messages: ${pending.length}`); + + await client.sendInput(name, 'please continue\n'); + await client.resizePty(name, 40, 120); + await client.flushPending(name); +} finally { + await client.setSessionMode(name, previousMode); +} +``` + +```typescript +for await (const chunk of client.subscribeWorkerStream('Reviewer', { stream: 'stdout' })) { + process.stdout.write(chunk); +} +``` + +`drive`, `view`, and `passthrough` stay in the CLI because they manage raw +terminal mode, keybindings, status lines, signals, and teardown. The SDK +exposes the composable broker primitives those interactive surfaces use. + +--- + ## Human Handles Send messages from a named human or system identity (not a spawned CLI agent):