Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions packages/sdk/src/__tests__/orchestration-upgrades.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,12 @@ describe('AgentRelayClient orchestration payloads', () => {
});
});

it('exposes session mode and pending queue HTTP routes', async () => {
it('exposes inbound delivery mode and pending queue HTTP routes', async () => {
const client = createProtocolClient();
const request = vi
.spyOn((client as any).transport, 'request')
.mockResolvedValueOnce({ mode: 'human' })
.mockResolvedValueOnce({ mode: 'passthrough', flushed: 2 })
.mockResolvedValueOnce({ mode: 'manual_flush' })
.mockResolvedValueOnce({ mode: 'auto_inject', flushed: 2 })
.mockResolvedValueOnce({
pending: [
{
Expand All @@ -281,18 +281,18 @@ describe('AgentRelayClient orchestration payloads', () => {
})
.mockResolvedValueOnce({ flushed: 1 });

await expect(client.getSessionMode('worker a')).resolves.toBe('human');
await expect(client.setSessionMode('worker a', 'passthrough')).resolves.toEqual({
mode: 'passthrough',
await expect(client.getInboundDeliveryMode('worker a')).resolves.toBe('manual_flush');
await expect(client.setInboundDeliveryMode('worker a', 'auto_inject')).resolves.toEqual({
mode: 'auto_inject',
flushed: 2,
});
await expect(client.getPending('worker a')).resolves.toHaveLength(1);
await expect(client.flushPending('worker a')).resolves.toEqual({ flushed: 1 });

expect(request).toHaveBeenNthCalledWith(1, '/api/spawned/worker%20a/mode');
expect(request).toHaveBeenNthCalledWith(2, '/api/spawned/worker%20a/mode', {
expect(request).toHaveBeenNthCalledWith(1, '/api/spawned/worker%20a/delivery-mode');
expect(request).toHaveBeenNthCalledWith(2, '/api/spawned/worker%20a/delivery-mode', {
method: 'PUT',
body: JSON.stringify({ mode: 'passthrough' }),
body: JSON.stringify({ mode: 'auto_inject' }),
});
expect(request).toHaveBeenNthCalledWith(3, '/api/spawned/worker%20a/pending');
expect(request).toHaveBeenNthCalledWith(4, '/api/spawned/worker%20a/flush', { method: 'POST' });
Expand Down Expand Up @@ -370,7 +370,7 @@ describe('AgentRelayClient orchestration payloads', () => {
);
const client = new AgentRelayClient({ baseUrl: TEST_BASE_URL, fetch: fetchMock as typeof fetch });

await expect(client.getSessionMode('ghost')).rejects.toMatchObject({
await expect(client.getInboundDeliveryMode('ghost')).rejects.toMatchObject({
code: 'agent_not_found',
status: 404,
message: "no agent named 'ghost'",
Expand Down
25 changes: 14 additions & 11 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type {
HeadlessProvider,
PendingRelayMessage,
PtySnapshot,
SessionMode,
InboundDeliveryMode,
SnapshotFormat,
} from './protocol.js';
import type {
Expand Down Expand Up @@ -89,8 +89,8 @@ export interface SessionInfo {
uptime_secs: number;
}

export interface SetSessionModeResult {
mode: SessionMode;
export interface SetInboundDeliveryModeResult {
mode: InboundDeliveryMode;
flushed: number;
}

Expand Down Expand Up @@ -494,31 +494,34 @@ export class AgentRelayClient {
});
}

async getSessionMode(name: string): Promise<SessionMode> {
async getInboundDeliveryMode(name: string): Promise<InboundDeliveryMode> {
const result = await this.transport.request<{ mode?: unknown }>(
`/api/spawned/${encodeURIComponent(name)}/mode`
`/api/spawned/${encodeURIComponent(name)}/delivery-mode`
);
if (result.mode !== 'human' && result.mode !== 'passthrough') {
if (result.mode !== 'auto_inject' && result.mode !== 'manual_flush') {
throw new AgentRelayProtocolError({
code: 'invalid_response',
message: "session mode response missing valid 'mode'",
message: "inbound delivery mode response missing valid 'mode'",
});
}
return result.mode;
}

async setSessionMode(name: string, mode: SessionMode): Promise<SetSessionModeResult> {
async setInboundDeliveryMode(
name: string,
mode: InboundDeliveryMode
): Promise<SetInboundDeliveryModeResult> {
const result = await this.transport.request<{ mode?: unknown; flushed?: unknown }>(
`/api/spawned/${encodeURIComponent(name)}/mode`,
`/api/spawned/${encodeURIComponent(name)}/delivery-mode`,
{
method: 'PUT',
body: JSON.stringify({ mode }),
}
);
if (result.mode !== 'human' && result.mode !== 'passthrough') {
if (result.mode !== 'auto_inject' && result.mode !== 'manual_flush') {
throw new AgentRelayProtocolError({
code: 'invalid_response',
message: "set session mode response missing valid 'mode'",
message: "set inbound delivery mode response missing valid 'mode'",
});
}
return {
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export {
type AgentRelayBrokerInitArgs,
type AgentRelayClientOptions,
type AgentRelaySpawnOptions,
type SetSessionModeResult,
type SetInboundDeliveryModeResult,
type SessionInfo,
type WorkerStreamSubscriptionOptions,
} from './client.js';
Expand Down
8 changes: 7 additions & 1 deletion packages/sdk/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export const PROTOCOL_VERSION = 1 as const;

export type AgentRuntime = 'pty' | 'headless';
export type HeadlessProvider = 'claude' | 'opencode';
export type SessionMode = 'passthrough' | 'human';
export type InboundDeliveryMode = 'auto_inject' | 'manual_flush';
export type SnapshotFormat = 'plain' | 'ansi';

export interface RestartPolicy {
Expand Down Expand Up @@ -292,6 +292,12 @@ export type BrokerEvent =
count: number;
reason?: string;
}
| {
kind: 'agent_inbound_delivery_mode_changed';
name: string;
previous_mode: InboundDeliveryMode;
mode: InboundDeliveryMode;
}
| {
kind: 'delivery_injected';
name: string;
Expand Down
9 changes: 8 additions & 1 deletion src/cli/bootstrap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,14 @@ describe('verbless `-n NAME CLI` silent alias', () => {
// that — here we just need the live snapshot.
function knownVerbs(): Set<string> {
const program = createProgram();
return new Set(program.commands.map((c) => c.name()));
const verbs = new Set<string>();
for (const command of program.commands) {
verbs.add(command.name());
for (const alias of command.aliases()) {
verbs.add(alias);
}
}
return verbs;
}

it('recognises `-n NAME CLI`', () => {
Expand Down
3 changes: 3 additions & 0 deletions src/cli/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

dotenvConfig({ quiet: true });

const __filename = fileURLToPath(import.meta.url);

Check warning on line 38 in src/cli/bootstrap.ts

View workflow job for this annotation

GitHub Actions / lint

Variable name `__filename` trimmed as `_filename` must match one of the following formats: camelCase, UPPER_CASE, PascalCase
const __dirname = path.dirname(__filename);

Check warning on line 39 in src/cli/bootstrap.ts

View workflow job for this annotation

GitHub Actions / lint

Variable name `__dirname` trimmed as `_dirname` must match one of the following formats: camelCase, UPPER_CASE, PascalCase

function findPackageJson(startDir: string): string {
let dir = startDir;
Expand Down Expand Up @@ -320,6 +320,9 @@
const verbs = new Set<string>();
for (const command of program.commands) {
verbs.add(command.name());
for (const alias of command.aliases()) {
verbs.add(alias);
}
}
return verbs;
}
Expand Down
64 changes: 32 additions & 32 deletions src/cli/commands/drive.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@
interface FetchScript {
/** Map of route key → handler. Default behaviour returns 200 + sensible body. */
routes?: Record<string, FetchRoute>;
/** Default mode reported by `GET …/mode`. */
initialMode?: 'human' | 'passthrough';
/** Default mode reported by `GET …/delivery-mode`. */
initialMode?: 'manual_flush' | 'auto_inject';
/** Default pending count reported by `GET …/pending`. */
initialPending?: number;
/** Make `PUT …/mode` to `human` fail with this status / body. */
/** Make `PUT …/delivery-mode` to `manual_flush` fail with this status / body. */
modeFlipFailure?: { status: number; error?: string };
/** Make `captureAndRenderSnapshot` return this status. */
snapshotResult?: Awaited<ReturnType<DriveDependencies['captureAndRenderSnapshot']>>;
Expand Down Expand Up @@ -179,7 +179,7 @@
opts.terminalSize === undefined ? { rows: 30, cols: 100 } : opts.terminalSize
);

const initialMode = opts.initialMode ?? 'passthrough';
const initialMode = opts.initialMode ?? 'auto_inject';
const initialPending = opts.initialPending ?? 0;

const defaultRoutes: Record<string, FetchRoute> = {
Expand All @@ -188,12 +188,12 @@
status: 200,
headers: { 'Content-Type': 'application/json' },
}),
'GET /mode': async () =>
'GET /delivery-mode': async () =>
new Response(JSON.stringify({ mode: initialMode }), {
status: 200,
headers: { 'Content-Type': 'application/json' },
}),
'PUT /mode': async (init) => {
'PUT /delivery-mode': async (init) => {
if (opts.modeFlipFailure) {
return new Response(JSON.stringify({ error: opts.modeFlipFailure.error ?? 'fail' }), {
status: opts.modeFlipFailure.status,
Expand Down Expand Up @@ -226,7 +226,7 @@
};
const routes = { ...defaultRoutes, ...(opts.routes ?? {}) };

const fetchFn = vi.fn(async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {

Check warning on line 229 in src/cli/commands/drive.test.ts

View workflow job for this annotation

GitHub Actions / lint

Async arrow function has a complexity of 18. Maximum allowed is 15
const url = typeof input === 'string' ? input : input.toString();
const method = (init?.method ?? 'GET').toUpperCase();
let bodyJson: unknown;
Expand Down Expand Up @@ -255,11 +255,11 @@
}
fetchLog.push({ url, method, body: bodyJson, headers });

// Match by the trailing path segment (`/mode`, `/pending`, `/flush`)
// Match by the trailing path segment (`/delivery-mode`, `/pending`, `/flush`)
// or the `/api/input/...` prefix.
let key: string | null = null;
if (/\/api\/spawned\/[^/]+\/mode$/.test(url)) {
key = `${method} /mode`;
if (/\/api\/spawned\/[^/]+\/delivery-mode$/.test(url)) {
key = `${method} /delivery-mode`;
} else if (/\/api\/spawned\/[^/]+\/pending$/.test(url)) {
key = `${method} /pending`;
} else if (/\/api\/spawned\/[^/]+\/flush$/.test(url)) {
Expand Down Expand Up @@ -436,15 +436,15 @@

describe('renderStatusLine', () => {
it('includes agent name, mode, pending count, and detach hint', () => {
const out = renderStatusLine({ name: 'Alice', mode: 'human', pending: 3, showHelp: false });
const out = renderStatusLine({ name: 'Alice', mode: 'manual_flush', pending: 3, showHelp: false });
expect(out).toContain('drive Alice');
expect(out).toContain('mode=human');
expect(out).toContain('delivery=manual_flush');
expect(out).toContain('pending=3');
expect(out).toContain('Ctrl+B D detach');
});

it('uses save/restore cursor + reverse video so the agent screen is preserved', () => {
const out = renderStatusLine({ name: 'Alice', mode: 'human', pending: 0, showHelp: false });
const out = renderStatusLine({ name: 'Alice', mode: 'manual_flush', pending: 0, showHelp: false });
expect(out.startsWith('\x1b7')).toBe(true); // save cursor
expect(out.endsWith('\x1b8')).toBe(true); // restore cursor
expect(out).toContain('\x1b[7m'); // reverse video
Expand All @@ -454,7 +454,7 @@
it('positions at the given row', () => {
const out = renderStatusLine({
name: 'A',
mode: 'human',
mode: 'manual_flush',
pending: 0,
showHelp: false,
rows: 50,
Expand All @@ -463,22 +463,22 @@
});

it('shows extra hint when help is toggled on', () => {
const out = renderStatusLine({ name: 'A', mode: 'human', pending: 0, showHelp: true });
const out = renderStatusLine({ name: 'A', mode: 'manual_flush', pending: 0, showHelp: true });
expect(out).toContain('hide help');
});
});

describe('runDriveSession', () => {
it('flips to human mode, renders snapshot, opens WS, then restores prior mode on detach', async () => {
const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'passthrough' });
it('flips to manual_flush delivery mode, renders snapshot, opens WS, then restores prior mode on detach', async () => {
const { deps, sockets, fetchLog, stdin } = createHarness({ initialMode: 'auto_inject' });
const sessionPromise = runDriveSession('Alice', {}, deps);
const socket = await openSocket(sockets);
expect(socket.url).toBe('ws://localhost:3889/ws');
expect(socket.headers['X-API-Key']).toBe('k');

// PUT /mode body should be { mode: 'human' }.
const flipCall = fetchLog.find((c) => c.method === 'PUT' && c.url.endsWith('/mode'));
expect(flipCall?.body).toEqual({ mode: 'human' });
// PUT /delivery-mode body should be { mode: 'manual_flush' }.
const flipCall = fetchLog.find((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode'));
expect(flipCall?.body).toEqual({ mode: 'manual_flush' });

// Raw mode should be on after open.
expect(stdin.rawModeCalls.includes(true)).toBe(true);
Expand All @@ -491,10 +491,10 @@
// Raw mode restored.
expect(stdin.rawModeCalls).toEqual([true, false]);

// Last PUT /mode call should restore to 'passthrough' (the prior mode).
const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode'));
// Last PUT /delivery-mode call should restore to 'auto_inject' (the prior mode).
const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode'));
expect(modeCalls).toHaveLength(2);
expect(modeCalls[1].body).toEqual({ mode: 'passthrough' });
expect(modeCalls[1].body).toEqual({ mode: 'auto_inject' });
});

it('aborts before opening the WS when the broker rejects the mode flip', async () => {
Expand All @@ -516,8 +516,8 @@
expect(sockets).toHaveLength(0);
expect(errors[0]?.[0]).toMatch(/no agent named/);
// Best-effort restore PUT should still have fired.
const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode'));
expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'passthrough' }]);
const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode'));
expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'manual_flush' }, { mode: 'auto_inject' }]);
});

it('aborts before opening the WS when the worker has no PTY', async () => {
Expand Down Expand Up @@ -608,7 +608,7 @@
});

it('restores the prior mode even on abnormal WebSocket close', async () => {
const { deps, sockets, fetchLog, errors } = createHarness({ initialMode: 'passthrough' });
const { deps, sockets, fetchLog, errors } = createHarness({ initialMode: 'auto_inject' });
const sessionPromise = runDriveSession('Alice', {}, deps);
const socket = await openSocket(sockets);

Expand All @@ -617,21 +617,21 @@
expect(code).toBe(1);
expect(errors.some((args) => String(args[0]).includes('connection closed'))).toBe(true);

const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode'));
expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'passthrough' }]);
const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode'));
expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'manual_flush' }, { mode: 'auto_inject' }]);
});

it('proceeds when the worker is already in human mode (re-attach scenario)', async () => {
const { deps, sockets, stdin, fetchLog } = createHarness({ initialMode: 'human' });
it('proceeds when the worker is already in manual_flush mode (re-attach scenario)', async () => {
const { deps, sockets, stdin, fetchLog } = createHarness({ initialMode: 'manual_flush' });
const sessionPromise = runDriveSession('Alice', {}, deps);
await openSocket(sockets);

stdin.type(Buffer.from([0x03]));
await sessionPromise;

const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/mode'));
// Restore to 'human' since that was the prior mode.
expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'human' }, { mode: 'human' }]);
const modeCalls = fetchLog.filter((c) => c.method === 'PUT' && c.url.endsWith('/delivery-mode'));
// Restore to 'manual_flush' since that was the prior mode.
expect(modeCalls.map((c) => c.body)).toEqual([{ mode: 'manual_flush' }, { mode: 'manual_flush' }]);
});

it('exits cleanly on SIGINT', async () => {
Expand Down
Loading
Loading