Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,45 @@ export class AgentRelayClient {
});
client.child = child;

// Broker may still be connecting to Relaycast. Retry getSession
// with backoff if we get 503 (broker warming up).
// The broker prints "API listening on …" the moment its TCP listener is
// bound, but it still needs to complete a Relaycast handshake before
// `getSession()` will return. Two failure modes to handle:
//
// 1. Broker is alive and warming up — the startup-only API responds
// 503 until the handshake completes. Poll until it succeeds.
// 2. Broker died during the handshake (e.g. Relaycast unreachable) —
// the in-flight fetch sees the socket drop as `TypeError: fetch
// failed`, which is uninformative on its own.
//
// We race each `getSession()` against `brokerExited` so case (2) reports
// as the actual broker exit (with its stderr tail and exit code), not as
// a mystery network error. No backoff for the death case — we know it
// immediately. 503 polling stays simple at 1s intervals.
const brokerExited = new Promise<never>((_, reject) => {
child.once('exit', (code) => {
reject(
new Error(
formatBrokerStartupError(
`Broker process exited with code ${code} during initial handshake`,
child,
{ binaryPath, args, cwd, stdoutLines, stderrLines }
)
)
);
});
});
// Suppress unhandledRejection if the race is won by getSession before
// the broker exits later (e.g. on normal shutdown).
brokerExited.catch(() => {});

let session: SessionInfo | undefined;
for (let attempt = 0; attempt < 10; attempt++) {
try {
session = await client.getSession();
session = await Promise.race([client.getSession(), brokerExited]);
break;
} catch (err) {
const is503 =
err instanceof Error &&
(err.message.includes('503') || err.message.includes('Service Unavailable'));
const message = err instanceof Error ? err.message : String(err);
const is503 = message.includes('503') || message.includes('Service Unavailable');
if (!is503 || attempt >= 9) throw err;
await new Promise((resolve) => setTimeout(resolve, 1000));
}
Expand Down
98 changes: 98 additions & 0 deletions src/cli/lib/broker-lifecycle.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { describe, expect, it } from 'vitest';

import {
classifyBrokerStartError,
classifyBrokerStartStage,
describeError,
} from './broker-lifecycle.js';

describe('describeError', () => {
it('returns plain message for a bare Error', () => {
expect(describeError(new Error('boom'))).toBe('boom');
});

it('unwraps the Node fetch failed cause and surfaces the network code', () => {
// Mirror the shape Node 22 produces: TypeError with a cause carrying .code.
const cause = Object.assign(new Error('connect ECONNREFUSED 127.0.0.1:3889'), {
code: 'ECONNREFUSED',
});
const err = new TypeError('fetch failed', { cause });

const result = describeError(err);
expect(result).toContain('fetch failed');
expect(result).toContain('ECONNREFUSED');
expect(result).toContain('127.0.0.1');
});

it('unwraps DNS errors (ENOTFOUND)', () => {
const cause = Object.assign(new Error('getaddrinfo ENOTFOUND api.agentrelay.com'), {
code: 'ENOTFOUND',
});
const err = new TypeError('fetch failed', { cause });

const result = describeError(err);
expect(result).toContain('ENOTFOUND');
expect(result).toContain('agentrelay.com');
});

it('handles non-Error values without throwing', () => {
expect(describeError('something went wrong')).toBe('something went wrong');
expect(describeError(undefined)).toBe('undefined');
expect(describeError(null)).toBe('null');
});

it('caps the cause-chain walk so a cycle cannot loop forever', () => {
const a = new Error('a') as Error & { cause?: unknown };
const b = new Error('b') as Error & { cause?: unknown };
a.cause = b;
b.cause = a;
// Just needs to terminate — the assertion is the absence of a hang.
expect(typeof describeError(a)).toBe('string');
});
});

describe('classifyBrokerStartError', () => {
it('prefers the underlying network code over the constructor name', () => {
const cause = Object.assign(new Error('connect ECONNREFUSED'), { code: 'ECONNREFUSED' });
const err = new TypeError('fetch failed', { cause });

expect(classifyBrokerStartError(err)).toBe('ECONNREFUSED');
});

it('falls back to the constructor name when no code is present', () => {
expect(classifyBrokerStartError(new Error('whatever'))).toBe('Error');
expect(classifyBrokerStartError(new TypeError('boom'))).toBe('TypeError');
});

it('handles non-Error values', () => {
expect(classifyBrokerStartError('oops')).toBe('string');
expect(classifyBrokerStartError(undefined)).toBe('undefined');
});
});

describe('classifyBrokerStartStage', () => {
it('marks dashboard port conflicts when the dashboard was requested', () => {
const err = Object.assign(new Error('listen EADDRINUSE'), { code: 'EADDRINUSE' });
expect(classifyBrokerStartStage(err, 'listen EADDRINUSE', true)).toBe('dashboard_port');
});

it('marks already-running brokers from the message text', () => {
const message = 'another broker instance is already running in this directory (/tmp/x)';
expect(classifyBrokerStartStage(new Error(message), message, false)).toBe('already_running');
});

it('classifies fetch failures as connect-stage errors', () => {
const cause = Object.assign(new Error('ECONNREFUSED'), { code: 'ECONNREFUSED' });
const err = new TypeError('fetch failed', { cause });
expect(classifyBrokerStartStage(err, 'fetch failed', false)).toBe('connect');
});

it('classifies broker-exited-before-ready as a spawn failure', () => {
const message = 'Broker process exited with code 1 before becoming ready (pid=123; …)';
expect(classifyBrokerStartStage(new Error(message), message, false)).toBe('spawn');
});

it('falls back to startup for everything else', () => {
expect(classifyBrokerStartStage(new Error('???'), '???', false)).toBe('startup');
});
});
90 changes: 87 additions & 3 deletions src/cli/lib/broker-lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { AgentRelayClient } from '@agent-relay/sdk';
import { track } from '@agent-relay/telemetry';

import type { CoreDependencies, CoreProjectPaths, CoreRelay, SpawnedProcess } from '../commands/core.js';
import { buildBundledRelaycastMcpCommand } from './relaycast-mcp-command.js';
import { errorClassName } from './telemetry-helpers.js';

type UpOptions = {
dashboard?: boolean;
Expand Down Expand Up @@ -86,6 +88,84 @@ function toErrorMessage(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}

type ErrorWithCode = { code?: unknown };

function errorCode(err: unknown): string | undefined {
if (!err || typeof err !== 'object') return undefined;
const code = (err as ErrorWithCode).code;
return typeof code === 'string' ? code : undefined;
}

/**
* Extract a human-meaningful detail string from an error, walking `err.cause`.
*
* Node's native `fetch()` throws `TypeError: fetch failed` for any network
* problem and stuffs the real reason (ECONNREFUSED, ENOTFOUND, AbortError,
* UND_ERR_CONNECT_TIMEOUT, …) into `err.cause`. Without unwrapping, every
* outbound HTTP failure looks identical to the user.
*
* Exported for testing.
*/
export function describeError(err: unknown): string {
const top = toErrorMessage(err);
if (!(err instanceof Error) || !err.cause) return top;

// Walk the cause chain and collect the deepest message + any error codes.
const codes: string[] = [];
let detail: string | undefined;
let cursor: unknown = err.cause;
let depth = 0;
while (cursor && depth < 5) {
const code = errorCode(cursor);
if (code && !codes.includes(code)) codes.push(code);
if (cursor instanceof Error && cursor.message) {
detail = cursor.message;
}
cursor = cursor instanceof Error ? cursor.cause : undefined;
depth += 1;
}

const parts = [top];
if (detail && detail !== top) parts.push(detail);
if (codes.length > 0) parts.push(`[${codes.join(', ')}]`);
return parts.join(' — ');
}

/**
* Pick the best `error_class` for telemetry. Prefer a network-style code from
* `err.cause` (ECONNREFUSED etc.) over the generic constructor name (TypeError)
* — a code is more actionable in PostHog and matches the schema's example
* values for `BrokerStartFailedEvent.error_class`.
*
* Exported for testing.
*/
export function classifyBrokerStartError(err: unknown): string {
let cursor: unknown = err;
let depth = 0;
while (cursor && depth < 5) {
const code = errorCode(cursor);
if (code) return code;
cursor = cursor instanceof Error ? cursor.cause : undefined;
depth += 1;
}
return errorClassName(err) ?? 'Error';
}

/** Exported for testing. */
export function classifyBrokerStartStage(
err: unknown,
message: string,
wantsDashboard: boolean
): string {
if (errorCode(err) === 'EADDRINUSE' && wantsDashboard) return 'dashboard_port';
if (isBrokerAlreadyRunningError(message)) return 'already_running';
if (/fetch failed/i.test(message)) return 'connect';
if (/Broker did not report API port/i.test(message)) return 'spawn';
if (/Broker process exited with code/i.test(message)) return 'spawn';
if (/ENOENT/i.test(message) && /broker/i.test(message)) return 'resolve_binary';
return 'startup';
}

async function resolveApiPortWithFallback(
startApiPort: number,
maxAttempts: number,
Expand Down Expand Up @@ -1126,14 +1206,18 @@ export async function runUpCommand(options: UpOptions, deps: CoreDependencies):
await deps.holdOpen();
} catch (err: unknown) {
await shutdownOnce();
const withCode = err as { code?: string };
const message = toErrorMessage(err);
if (withCode.code === 'EADDRINUSE' && wantsDashboard) {
const stage = classifyBrokerStartStage(err, message, wantsDashboard);
track('broker_start_failed', {
stage,
error_class: classifyBrokerStartError(err),
});
if (errorCode(err) === 'EADDRINUSE' && wantsDashboard) {
deps.error(`Dashboard port ${dashboardPort} is already in use.`);
} else if (isBrokerAlreadyRunningError(message)) {
reportAlreadyRunningError(message, paths.dataDir, deps);
} else {
deps.error(`Failed to start broker: ${message}`);
deps.error(`Failed to start broker: ${describeError(err)}`);
}
deps.exit(1);
}
Expand Down
Loading