diff --git a/apps/mesh/migrations/077-trigger-callback-tokens-per-subscription.ts b/apps/mesh/migrations/077-trigger-callback-tokens-per-subscription.ts new file mode 100644 index 0000000000..9fa0b30171 --- /dev/null +++ b/apps/mesh/migrations/077-trigger-callback-tokens-per-subscription.ts @@ -0,0 +1,81 @@ +/** + * Migration 077: Trigger Callback Tokens — per subscription + * + * Each automation trigger now gets its own callback token row, so we can + * disable a single subscription without invalidating its siblings on the + * same connection. Adds `subscription_id` (= automation_triggers.id) and + * relaxes the unique index from `(connection_id, organization_id)` to + * `subscription_id` alone. + * + * Existing rows get backfilled with a placeholder subscription_id derived + * from their primary key — they remain validatable until the next + * trigger toggle, which writes a fresh row keyed by the real trigger id. + */ + +import type { Kysely } from "kysely"; +import { sql } from "kysely"; + +export async function up(db: Kysely): Promise { + // 1. Add subscription_id, nullable initially so we can backfill. + await db.schema + .alterTable("trigger_callback_tokens") + .addColumn("subscription_id", "text") + .execute(); + + // 2. Backfill: existing rows aren't tied to a specific trigger id, so + // use the row's own primary key as a self-referential placeholder. + // Not strictly correct, but keeps the rows queryable until the next + // TRIGGER_CONFIGURE replaces them with proper subscription ids. + await sql`UPDATE trigger_callback_tokens SET subscription_id = id WHERE subscription_id IS NULL`.execute( + db, + ); + + // 3. Make NOT NULL now that everything is populated. + await db.schema + .alterTable("trigger_callback_tokens") + .alterColumn("subscription_id", (col) => col.setNotNull()) + .execute(); + + // 4. Drop the old unique index that prevented multi-row. + await db.schema + .dropIndex("idx_trigger_callback_tokens_connection_org") + .execute(); + + // 5. New unique index on subscription_id — globally unique because + // automation trigger ids are uuids. + await db.schema + .createIndex("idx_trigger_callback_tokens_subscription") + .on("trigger_callback_tokens") + .columns(["subscription_id"]) + .unique() + .execute(); + + // 6. Keep a non-unique lookup index on (connection_id, organization_id) + // for fanout queries during trigger callback validation. + await db.schema + .createIndex("idx_trigger_callback_tokens_connection_org_lookup") + .on("trigger_callback_tokens") + .columns(["connection_id", "organization_id"]) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema + .dropIndex("idx_trigger_callback_tokens_connection_org_lookup") + .execute(); + await db.schema + .dropIndex("idx_trigger_callback_tokens_subscription") + .execute(); + // Restore the original unique index. Note: this may fail if multi-row + // data exists from the new code path — `down` is best-effort. + await db.schema + .createIndex("idx_trigger_callback_tokens_connection_org") + .on("trigger_callback_tokens") + .columns(["connection_id", "organization_id"]) + .unique() + .execute(); + await db.schema + .alterTable("trigger_callback_tokens") + .dropColumn("subscription_id") + .execute(); +} diff --git a/apps/mesh/migrations/index.ts b/apps/mesh/migrations/index.ts index 23f3bf7b97..573615fb19 100644 --- a/apps/mesh/migrations/index.ts +++ b/apps/mesh/migrations/index.ts @@ -75,6 +75,7 @@ import * as migration073backfillbasicusageroles from "./073-backfill-basic-usage import * as migration074sandboxrunnerstatehandlenonunique from "./074-sandbox-runner-state-handle-nonunique.ts"; import * as migration075threadinflightasyncjobs from "./075-thread-inflight-async-jobs.ts"; import * as migration076automationsdropagentjson from "./076-automations-drop-agent-json.ts"; +import * as migration077triggercallbacktokenspersubscription from "./077-trigger-callback-tokens-per-subscription.ts"; /** * Core migrations for the Mesh application. @@ -165,6 +166,8 @@ const migrations: Record = { migration074sandboxrunnerstatehandlenonunique, "075-thread-inflight-async-jobs": migration075threadinflightasyncjobs, "076-automations-drop-agent-json": migration076automationsdropagentjson, + "077-trigger-callback-tokens-per-subscription": + migration077triggercallbacktokenspersubscription, }; export default migrations; diff --git a/apps/mesh/src/storage/automations.ts b/apps/mesh/src/storage/automations.ts index 1a238afe91..b09ea23c12 100644 --- a/apps/mesh/src/storage/automations.ts +++ b/apps/mesh/src/storage/automations.ts @@ -36,6 +36,13 @@ export interface UpdateAutomationInput { } export interface CreateTriggerInput { + /** + * Optional pre-allocated id. Callers that need to reference the trigger + * before insertion (e.g. event triggers calling `configureTriggerOnMcp` + * with a stable subscriptionId) generate the uuid upfront and pass it + * here so the configure call and the insert agree on the id. + */ + id?: string; automation_id: string; type: "cron" | "event"; cron_expression?: string | null; @@ -77,6 +84,15 @@ export interface AutomationsStorage { automationId: string, ): Promise<{ success: boolean }>; listTriggers(automationId: string): Promise; + /** + * List every trigger bound to a connection in an organization. Used + * during connection deletion to disable each subscription on the MCP + * before the connection record disappears. + */ + listTriggersByConnection( + connectionId: string, + organizationId: string, + ): Promise; findTriggerById(triggerId: string): Promise; findActiveEventTriggers( connectionId: string, @@ -335,7 +351,7 @@ class KyselyAutomationsStorage implements AutomationsStorage { async addTrigger(input: CreateTriggerInput): Promise { const now = new Date().toISOString(); - const id = crypto.randomUUID(); + const id = input.id ?? crypto.randomUUID(); const row = { id, @@ -383,6 +399,32 @@ class KyselyAutomationsStorage implements AutomationsStorage { return rows.map(triggerFromDbRow); } + async listTriggersByConnection( + connectionId: string, + organizationId: string, + ): Promise { + const rows = await this.db + .selectFrom("automation_triggers as t") + .innerJoin("automations as a", "a.id", "t.automation_id") + .select([ + "t.id", + "t.automation_id", + "t.type", + "t.cron_expression", + "t.connection_id", + "t.event_type", + "t.params", + "t.last_run_at", + "t.next_run_at", + "t.created_at", + ]) + .where("t.connection_id", "=", connectionId) + .where("a.organization_id", "=", organizationId) + .execute(); + + return rows.map(triggerFromDbRow); + } + async findTriggerById(triggerId: string): Promise { const row = await this.db .selectFrom("automation_triggers") diff --git a/apps/mesh/src/storage/trigger-callback-tokens.ts b/apps/mesh/src/storage/trigger-callback-tokens.ts index 4ad608e4a4..088bb289fa 100644 --- a/apps/mesh/src/storage/trigger-callback-tokens.ts +++ b/apps/mesh/src/storage/trigger-callback-tokens.ts @@ -4,6 +4,12 @@ * Manages opaque callback tokens that external MCPs use to authenticate * trigger callbacks to Mesh. Tokens are stored as SHA-256 hashes; * plaintext is only returned once at creation time. + * + * Each row keys on `subscription_id` (= `automation_triggers.id`) so a + * single connection can host many independent subscriptions, each with + * its own token. Token validation still resolves to (orgId, connId) + * because the trigger callback endpoint fans out by `(connection, type)` + * downstream of token validation. */ import type { Kysely } from "kysely"; @@ -38,40 +44,65 @@ export interface TriggerCallbackTokenStorage { generateTokenPair(): Promise; /** - * Persist a token hash for a connection+organization pair. - * Upserts — safe for concurrent calls. + * Persist a token hash for a specific subscription. Upserts on + * `subscription_id`, so re-running TRIGGER_CONFIGURE for the same + * subscription rotates the token cleanly without orphaning siblings + * on the same connection. */ - persistTokenHash( - organizationId: string, - connectionId: string, - tokenHash: string, - ): Promise; + persistTokenHash(args: { + organizationId: string; + connectionId: string; + subscriptionId: string; + tokenHash: string; + }): Promise; /** - * Create or rotate a callback token for a connection+organization pair. - * Returns the plaintext token (only available at creation time). - * Convenience method that combines generateTokenPair + persistTokenHash. + * Create or rotate a callback token for a subscription. Returns the + * plaintext token (only available at creation time). */ - createOrRotateToken( - organizationId: string, - connectionId: string, - ): Promise; + createOrRotateToken(args: { + organizationId: string; + connectionId: string; + subscriptionId: string; + }): Promise; /** - * Validate a callback token. - * Returns connection and org context if valid, null otherwise. + * Validate a callback token. Returns the connection + org context for + * the row that owns this token, or null if no row matches. */ validateToken( token: string, - ): Promise<{ organizationId: string; connectionId: string } | null>; + ): Promise<{ + organizationId: string; + connectionId: string; + subscriptionId: string; + } | null>; + + /** + * Delete the callback token for one specific subscription. + */ + deleteBySubscription(subscriptionId: string): Promise; /** - * Delete callback token for a connection+organization pair. + * Delete every callback token attached to a connection. Used during + * connection deletion to garbage-collect every subscription that was + * bound to it. */ deleteByConnection( connectionId: string, organizationId: string, ): Promise; + + /** + * List every (subscription, connection) pair tied to a connection so + * the caller can iterate them — e.g. to send TRIGGER_CONFIGURE + * disable to the MCP for each subscription before the connection + * itself is deleted. + */ + listByConnection( + connectionId: string, + organizationId: string, + ): Promise>; } export class KyselyTriggerCallbackTokenStorage @@ -85,11 +116,17 @@ export class KyselyTriggerCallbackTokenStorage return { plaintext, hash }; } - async persistTokenHash( - organizationId: string, - connectionId: string, - tokenHash: string, - ): Promise { + async persistTokenHash({ + organizationId, + connectionId, + subscriptionId, + tokenHash, + }: { + organizationId: string; + connectionId: string; + subscriptionId: string; + tokenHash: string; + }): Promise { const id = crypto.randomUUID(); await this.db .insertInto("trigger_callback_tokens") @@ -97,35 +134,52 @@ export class KyselyTriggerCallbackTokenStorage id, organization_id: organizationId, connection_id: connectionId, + subscription_id: subscriptionId, token_hash: tokenHash, created_at: new Date().toISOString(), }) .onConflict((oc) => - oc.columns(["connection_id", "organization_id"]).doUpdateSet({ + oc.columns(["subscription_id"]).doUpdateSet({ id, + organization_id: organizationId, + connection_id: connectionId, token_hash: tokenHash, }), ) .execute(); } - async createOrRotateToken( - organizationId: string, - connectionId: string, - ): Promise { + async createOrRotateToken({ + organizationId, + connectionId, + subscriptionId, + }: { + organizationId: string; + connectionId: string; + subscriptionId: string; + }): Promise { const { plaintext, hash } = await this.generateTokenPair(); - await this.persistTokenHash(organizationId, connectionId, hash); + await this.persistTokenHash({ + organizationId, + connectionId, + subscriptionId, + tokenHash: hash, + }); return plaintext; } async validateToken( token: string, - ): Promise<{ organizationId: string; connectionId: string } | null> { + ): Promise<{ + organizationId: string; + connectionId: string; + subscriptionId: string; + } | null> { const tokenHash = await hashToken(token); const row = await this.db .selectFrom("trigger_callback_tokens") - .select(["organization_id", "connection_id"]) + .select(["organization_id", "connection_id", "subscription_id"]) .where("token_hash", "=", tokenHash) .executeTakeFirst(); @@ -134,9 +188,17 @@ export class KyselyTriggerCallbackTokenStorage return { organizationId: row.organization_id, connectionId: row.connection_id, + subscriptionId: row.subscription_id, }; } + async deleteBySubscription(subscriptionId: string): Promise { + await this.db + .deleteFrom("trigger_callback_tokens") + .where("subscription_id", "=", subscriptionId) + .execute(); + } + async deleteByConnection( connectionId: string, organizationId: string, @@ -147,4 +209,17 @@ export class KyselyTriggerCallbackTokenStorage .where("organization_id", "=", organizationId) .execute(); } + + async listByConnection( + connectionId: string, + organizationId: string, + ): Promise> { + const rows = await this.db + .selectFrom("trigger_callback_tokens") + .select(["subscription_id"]) + .where("connection_id", "=", connectionId) + .where("organization_id", "=", organizationId) + .execute(); + return rows.map((r) => ({ subscriptionId: r.subscription_id })); + } } diff --git a/apps/mesh/src/storage/types.ts b/apps/mesh/src/storage/types.ts index 5a8c3c1ee2..64ffc80274 100644 --- a/apps/mesh/src/storage/types.ts +++ b/apps/mesh/src/storage/types.ts @@ -996,6 +996,7 @@ export interface TriggerCallbackTokenTable { id: string; organization_id: string; connection_id: string; + subscription_id: string; token_hash: string; created_at: ColumnType; } diff --git a/apps/mesh/src/tools/automations/configure-trigger.ts b/apps/mesh/src/tools/automations/configure-trigger.ts index d5032d7e84..304f84b345 100644 --- a/apps/mesh/src/tools/automations/configure-trigger.ts +++ b/apps/mesh/src/tools/automations/configure-trigger.ts @@ -1,9 +1,20 @@ /** * Shared helper for configuring triggers on MCP connections. * - * Calls TRIGGER_CONFIGURE on the target connection to enable/disable - * an event trigger. When enabling, generates a callback token and URL - * so the external MCP can call back to Mesh when the trigger fires. + * Calls TRIGGER_CONFIGURE on the target connection to enable/disable an + * event trigger. Each trigger is identified by `trigger.id`, which is + * passed to the MCP as `subscriptionId` so multiple subscriptions can + * coexist on the same `(connection, event_type)` without overwriting + * each other's callback credentials. + * + * Persistence and the MCP call are kept in sync with two-phase commits: + * - On enable: generate a token pair, call TRIGGER_CONFIGURE, then + * persist the hash. On timeout we still persist (the MCP may have + * accepted) so future callbacks can authenticate. On a definitive + * error we skip persistence. + * - On disable: call TRIGGER_CONFIGURE, then delete the token row by + * subscription. Sibling subscriptions on the same connection are + * untouched. */ import type { MeshContext } from "@/core/mesh-context"; @@ -22,12 +33,20 @@ export async function configureTriggerOnMcp( if (trigger.type !== "event" || !trigger.connection_id) return { success: true }; + if (!trigger.id) { + return { + success: false, + error: "trigger.id required to configure subscription on MCP", + }; + } + const connection = await ctx.storage.connections.findById( trigger.connection_id, ); if (!connection) return { success: true }; // Connection may have been deleted const organizationId = ctx.organization?.id; + const subscriptionId = trigger.id; try { const mcpClient = await clientFromConnection(connection, ctx, true); @@ -61,6 +80,7 @@ export async function configureTriggerOnMcp( enabled, callbackUrl, callbackToken, + subscriptionId, }), timeoutPromise, ]); @@ -68,11 +88,12 @@ export async function configureTriggerOnMcp( if (timedOut && enabled && tokenStorage && organizationId && tokenHash) { // Timeout is ambiguous — the MCP may still accept the token. // Persist the hash so future callbacks can authenticate. - await tokenStorage.persistTokenHash( + await tokenStorage.persistTokenHash({ organizationId, - trigger.connection_id, + connectionId: trigger.connection_id, + subscriptionId, tokenHash, - ); + }); } // On definitive (non-timeout) failure, skip persistence — // the MCP rejected the call, old token (if any) is still valid. @@ -84,17 +105,15 @@ export async function configureTriggerOnMcp( // creates the trigger record (the MCP is already listening). try { if (enabled && tokenStorage && organizationId && tokenHash) { - await tokenStorage.persistTokenHash( + await tokenStorage.persistTokenHash({ organizationId, - trigger.connection_id, + connectionId: trigger.connection_id, + subscriptionId, tokenHash, - ); + }); } - if (!enabled && tokenStorage && organizationId) { - await tokenStorage.deleteByConnection( - trigger.connection_id, - organizationId, - ); + if (!enabled && tokenStorage) { + await tokenStorage.deleteBySubscription(subscriptionId); } } catch (dbErr) { console.error( diff --git a/apps/mesh/src/tools/automations/delete.ts b/apps/mesh/src/tools/automations/delete.ts index 06663e3079..691826c92a 100644 --- a/apps/mesh/src/tools/automations/delete.ts +++ b/apps/mesh/src/tools/automations/delete.ts @@ -52,7 +52,12 @@ export const AUTOMATION_DELETE = defineTool({ await Promise.allSettled( eventTriggers.map(async (trigger) => { - const result = await configureTriggerOnMcp(ctx, trigger, false); + const result = await configureTriggerOnMcp( + ctx, + trigger, + false, + ctx.storage.triggerCallbackTokens, + ); if (!result.success) { console.warn( `Failed to disable trigger ${trigger.id}: ${result.error}`, diff --git a/apps/mesh/src/tools/automations/trigger-add.ts b/apps/mesh/src/tools/automations/trigger-add.ts index 343f84db97..d07784b669 100644 --- a/apps/mesh/src/tools/automations/trigger-add.ts +++ b/apps/mesh/src/tools/automations/trigger-add.ts @@ -71,6 +71,11 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ } } + // Cron triggers don't need a pre-allocated id; let the storage layer + // generate one. Event triggers need it to call TRIGGER_CONFIGURE + // before the row exists. + let pendingTriggerId: string | undefined; + if (input.type === "event") { if (!input.connection_id) { throw new Error("connection_id is required for event triggers"); @@ -88,9 +93,13 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ throw new Error("Connection not found"); } - // Build a temporary trigger object for configureTriggerOnMcp + // Pre-allocate the trigger id so configureTriggerOnMcp can pass + // it as `subscriptionId`. The same id is reused on the addTrigger + // insert below — keeps the MCP-side subscription record and the + // DB row in lockstep. + const triggerId = crypto.randomUUID(); const tempTrigger = { - id: "", + id: triggerId, automation_id: input.automation_id, type: "event" as const, cron_expression: null, @@ -114,6 +123,7 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ `Failed to configure trigger on connection: ${result.error}`, ); } + pendingTriggerId = triggerId; } // Compute next_run_at for cron triggers @@ -127,6 +137,7 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ // Insert trigger record const trigger = await ctx.storage.automations.addTrigger({ + id: pendingTriggerId, automation_id: input.automation_id, type: input.type, cron_expression: input.type === "cron" ? input.cron_expression : null, diff --git a/apps/mesh/src/tools/connection/delete.ts b/apps/mesh/src/tools/connection/delete.ts index 6e0a311fd8..3c6dda1887 100644 --- a/apps/mesh/src/tools/connection/delete.ts +++ b/apps/mesh/src/tools/connection/delete.ts @@ -91,6 +91,40 @@ export const COLLECTION_CONNECTIONS_DELETE = defineTool({ } } + // Cascade trigger subscriptions on the MCP side BEFORE deleting the + // connection. Once the connection row is gone we can no longer build + // an MCP client for it (configureTriggerOnMcp short-circuits on a + // missing connection), so the disable signal would never reach the + // MCP and its KV would accumulate orphaned trigger state. + // + // Imported dynamically to avoid a module evaluation cycle between + // tools/index.ts and tools/automations/configure-trigger.ts. + const boundTriggers = await ctx.storage.automations.listTriggersByConnection( + input.id, + organization.id, + ); + const eventTriggers = boundTriggers.filter((t) => t.type === "event"); + if (eventTriggers.length > 0) { + const { configureTriggerOnMcp } = await import( + "../automations/configure-trigger" + ); + await Promise.allSettled( + eventTriggers.map(async (trigger) => { + const result = await configureTriggerOnMcp( + ctx, + trigger, + false, + ctx.storage.triggerCallbackTokens, + ); + if (!result.success) { + console.warn( + `[connection/delete] Failed to disable trigger ${trigger.id} on MCP: ${result.error}`, + ); + } + }), + ); + } + // Delete connection await ctx.storage.connections.delete(input.id); diff --git a/packages/bindings/src/well-known/trigger.ts b/packages/bindings/src/well-known/trigger.ts index 87d1cea6e4..ae8a111c23 100644 --- a/packages/bindings/src/well-known/trigger.ts +++ b/packages/bindings/src/well-known/trigger.ts @@ -63,6 +63,11 @@ export type TriggerListOutput = z.infer; * TRIGGER_CONFIGURE Input Schema * * Input for configuring a trigger with parameters. + * + * `subscriptionId` uniquely identifies a single trigger registration so the + * MCP can store many independent subscriptions on the same `(connectionId, + * type)`. Optional for backward compatibility — implementations that omit it + * collapse to a single-subscription-per-connection model. */ export const TriggerConfigureInputSchema = z.object({ type: z.string(), @@ -70,6 +75,7 @@ export const TriggerConfigureInputSchema = z.object({ enabled: z.boolean(), callbackUrl: z.url().optional(), callbackToken: z.string().min(1).optional(), + subscriptionId: z.string().min(1).optional(), }); export type TriggerConfigureInput = z.infer; diff --git a/packages/runtime/src/trigger-storage.ts b/packages/runtime/src/trigger-storage.ts index 4c7a9c8d68..693abae52f 100644 --- a/packages/runtime/src/trigger-storage.ts +++ b/packages/runtime/src/trigger-storage.ts @@ -3,10 +3,18 @@ * * - StudioKV: Persists to Mesh/Studio's KV API (recommended for production) * - JsonFileStorage: Persists to a local JSON file (for dev/simple deployments) + * + * Both implementations key entries by `(connectionId, subscriptionId)` to + * support multiple independent subscriptions per connection. */ import type { TriggerStorage } from "./triggers.ts"; +interface TriggerState { + credentials: { callbackUrl: string; callbackToken: string }; + activeTriggerTypes: string[]; +} + // ============================================================================ // StudioKV — backed by Mesh's /api/kv endpoint // ============================================================================ @@ -23,6 +31,10 @@ interface StudioKVOptions { /** * TriggerStorage backed by Mesh/Studio's org-scoped KV API. * + * Stores one record per subscription, keyed `${prefix}:${connectionId}:${subscriptionId}`. + * The `list(connectionId)` operation issues a prefix scan via + * `/api/kv?prefix=...`. + * * @example * ```typescript * import { createTriggers } from "@decocms/runtime/triggers"; @@ -48,13 +60,19 @@ export class StudioKV implements TriggerStorage { this.prefix = options.prefix ?? "triggers"; } - private key(connectionId: string): string { - return `${this.prefix}:${connectionId}`; + private key(connectionId: string, subscriptionId: string): string { + return `${this.prefix}:${connectionId}:${subscriptionId}`; } - async get(connectionId: string) { + private connectionPrefix(connectionId: string): string { + return `${this.prefix}:${connectionId}:`; + } + + async get(connectionId: string, subscriptionId: string) { const res = await fetch( - `${this.baseUrl}/api/kv/${encodeURIComponent(this.key(connectionId))}`, + `${this.baseUrl}/api/kv/${encodeURIComponent( + this.key(connectionId, subscriptionId), + )}`, { headers: { Authorization: `Bearer ${this.apiKey}` }, }, @@ -67,24 +85,19 @@ export class StudioKV implements TriggerStorage { return null; } - const body = (await res.json()) as { - value?: { - credentials: { callbackUrl: string; callbackToken: string }; - activeTriggerTypes: string[]; - }; - }; + const body = (await res.json()) as { value?: TriggerState }; return body.value ?? null; } async set( connectionId: string, - state: { - credentials: { callbackUrl: string; callbackToken: string }; - activeTriggerTypes: string[]; - }, + subscriptionId: string, + state: TriggerState, ) { const res = await fetch( - `${this.baseUrl}/api/kv/${encodeURIComponent(this.key(connectionId))}`, + `${this.baseUrl}/api/kv/${encodeURIComponent( + this.key(connectionId, subscriptionId), + )}`, { method: "PUT", headers: { @@ -100,9 +113,11 @@ export class StudioKV implements TriggerStorage { } } - async delete(connectionId: string) { + async delete(connectionId: string, subscriptionId: string) { const res = await fetch( - `${this.baseUrl}/api/kv/${encodeURIComponent(this.key(connectionId))}`, + `${this.baseUrl}/api/kv/${encodeURIComponent( + this.key(connectionId, subscriptionId), + )}`, { method: "DELETE", headers: { Authorization: `Bearer ${this.apiKey}` }, @@ -115,6 +130,26 @@ export class StudioKV implements TriggerStorage { ); } } + + async list(connectionId: string) { + const url = new URL(`${this.baseUrl}/api/kv`); + url.searchParams.set("prefix", this.connectionPrefix(connectionId)); + const res = await fetch(url, { + headers: { Authorization: `Bearer ${this.apiKey}` }, + }); + if (!res.ok) { + console.error(`[StudioKV] LIST failed: ${res.status} ${res.statusText}`); + return []; + } + const body = (await res.json()) as { + items?: Array<{ key: string; value: TriggerState }>; + }; + const prefix = this.connectionPrefix(connectionId); + return (body.items ?? []).map((item) => ({ + subscriptionId: item.key.slice(prefix.length), + state: item.value, + })); + } } // ============================================================================ @@ -130,6 +165,8 @@ interface JsonFileStorageOptions { * TriggerStorage backed by a local JSON file. * Suitable for development and single-instance deployments. * + * Records are keyed `${connectionId}:${subscriptionId}` inside the file. + * * @example * ```typescript * import { createTriggers } from "@decocms/runtime/triggers"; @@ -143,18 +180,22 @@ interface JsonFileStorageOptions { */ export class JsonFileStorage implements TriggerStorage { private path: string; - private cache: Map | null = null; + private cache: Map | null = null; constructor(options: JsonFileStorageOptions) { this.path = options.path; } - private async load(): Promise> { + private compositeKey(connectionId: string, subscriptionId: string): string { + return `${connectionId}:${subscriptionId}`; + } + + private async load(): Promise> { if (this.cache) return this.cache; try { const fs = await import("node:fs/promises"); const raw = await fs.readFile(this.path, "utf-8"); - const data = JSON.parse(raw) as Record; + const data = JSON.parse(raw) as Record; this.cache = new Map(Object.entries(data)); } catch (err: unknown) { if ( @@ -176,20 +217,36 @@ export class JsonFileStorage implements TriggerStorage { await fs.writeFile(this.path, JSON.stringify(data, null, 2)); } - async get(connectionId: string) { + async get(connectionId: string, subscriptionId: string) { const map = await this.load(); - return (map.get(connectionId) as any) ?? null; + return map.get(this.compositeKey(connectionId, subscriptionId)) ?? null; } - async set(connectionId: string, state: unknown) { + async set( + connectionId: string, + subscriptionId: string, + state: TriggerState, + ) { const map = await this.load(); - map.set(connectionId, state); + map.set(this.compositeKey(connectionId, subscriptionId), state); await this.save(); } - async delete(connectionId: string) { + async delete(connectionId: string, subscriptionId: string) { const map = await this.load(); - map.delete(connectionId); + map.delete(this.compositeKey(connectionId, subscriptionId)); await this.save(); } + + async list(connectionId: string) { + const map = await this.load(); + const prefix = `${connectionId}:`; + const out: Array<{ subscriptionId: string; state: TriggerState }> = []; + for (const [key, state] of map.entries()) { + if (key.startsWith(prefix)) { + out.push({ subscriptionId: key.slice(prefix.length), state }); + } + } + return out; + } } diff --git a/packages/runtime/src/triggers.test.ts b/packages/runtime/src/triggers.test.ts index c6e5f50415..ccb39e69b7 100644 --- a/packages/runtime/src/triggers.test.ts +++ b/packages/runtime/src/triggers.test.ts @@ -94,6 +94,7 @@ describe("createTriggers", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "test-token-123", + subscriptionId: "sub-1", }, runtimeContext: mockCtx("conn-1"), }); @@ -126,51 +127,108 @@ describe("createTriggers", () => { fetchSpy.mockRestore(); }); - it("disabling one trigger keeps credentials when another is still active", async () => { + it("multiple subscriptions on same connection each get their own callback", async () => { + const configureTool = triggers.tools()[1]; + const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( + new Response("ok", { status: 202 }), + ); + + // Two distinct subscriptions on the same connection, same type, with + // different callback tokens — both should fire on notify. + await configureTool.execute({ + context: { + type: "github.push", + params: { repo: "alice/repo" }, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "token-A", + subscriptionId: "sub-A", + }, + runtimeContext: mockCtx("conn-multi-sub"), + }); + await configureTool.execute({ + context: { + type: "github.push", + params: { repo: "bob/repo" }, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "token-B", + subscriptionId: "sub-B", + }, + runtimeContext: mockCtx("conn-multi-sub"), + }); + + triggers.notify("conn-multi-sub", "github.push", { + repository: { full_name: "x/y" }, + }); + await new Promise((r) => setTimeout(r, 50)); + + const tokens = fetchSpy.mock.calls.map((call) => { + const headers = (call[1] as RequestInit).headers as Record; + return headers.Authorization; + }); + expect(tokens).toContain("Bearer token-A"); + expect(tokens).toContain("Bearer token-B"); + expect(fetchSpy.mock.calls).toHaveLength(2); + + fetchSpy.mockRestore(); + }); + + it("disabling one subscription leaves siblings alive", async () => { const configureTool = triggers.tools()[1]; const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( new Response("ok", { status: 202 }), ); - // Enable two trigger types await configureTool.execute({ context: { type: "github.push", params: {}, enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", - callbackToken: "token-multi", + callbackToken: "token-keep", + subscriptionId: "keep", }, - runtimeContext: mockCtx("conn-multi"), + runtimeContext: mockCtx("conn-sibling"), }); await configureTool.execute({ context: { - type: "github.pull_request.opened", + type: "github.push", params: {}, enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "token-drop", + subscriptionId: "drop", }, - runtimeContext: mockCtx("conn-multi"), + runtimeContext: mockCtx("conn-sibling"), }); - // Disable one — credentials should stay for the other + // Disable the "drop" subscription only await configureTool.execute({ context: { type: "github.push", params: {}, enabled: false, + subscriptionId: "drop", }, - runtimeContext: mockCtx("conn-multi"), + runtimeContext: mockCtx("conn-sibling"), }); - triggers.notify("conn-multi", "github.pull_request.opened", {}); + fetchSpy.mockClear(); + triggers.notify("conn-sibling", "github.push", {}); await new Promise((r) => setTimeout(r, 50)); - expect(fetchSpy).toHaveBeenCalled(); + + const tokens = fetchSpy.mock.calls.map((call) => { + const headers = (call[1] as RequestInit).headers as Record; + return headers.Authorization; + }); + expect(tokens).toEqual(["Bearer token-keep"]); fetchSpy.mockRestore(); }); - it("disabling the last trigger clears credentials", async () => { + it("disabling the last subscription clears credentials", async () => { const configureTool = triggers.tools()[1]; const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( @@ -178,7 +236,6 @@ describe("createTriggers", () => { ); const consoleSpy = spyOn(console, "log").mockImplementation(() => {}); - // Enable a trigger await configureTool.execute({ context: { type: "github.push", @@ -186,16 +243,17 @@ describe("createTriggers", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "token-cleanup", + subscriptionId: "only", }, runtimeContext: mockCtx("conn-cleanup"), }); - // Disable it — last trigger, credentials should be cleared await configureTool.execute({ context: { type: "github.push", params: {}, enabled: false, + subscriptionId: "only", }, runtimeContext: mockCtx("conn-cleanup"), }); @@ -204,19 +262,19 @@ describe("createTriggers", () => { await new Promise((r) => setTimeout(r, 50)); expect(fetchSpy).not.toHaveBeenCalled(); expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("No callback credentials"), + expect.stringContaining("No subscriptions"), ); fetchSpy.mockRestore(); consoleSpy.mockRestore(); }); - it("notify is a no-op when no credentials exist", async () => { + it("notify is a no-op when no subscriptions exist", async () => { const consoleSpy = spyOn(console, "log").mockImplementation(() => {}); triggers.notify("unknown-conn", "github.push", {}); await new Promise((r) => setTimeout(r, 50)); expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("No callback credentials"), + expect.stringContaining("No subscriptions"), ); consoleSpy.mockRestore(); }); @@ -228,7 +286,6 @@ describe("createTriggers", () => { ); const errorSpy = spyOn(console, "error").mockImplementation(() => {}); - // Ensure credentials exist (reuse from prior test state or set up fresh) await configureTool.execute({ context: { type: "github.push", @@ -236,6 +293,7 @@ describe("createTriggers", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "token-err", + subscriptionId: "err", }, runtimeContext: mockCtx("conn-err"), }); @@ -264,6 +322,56 @@ describe("createTriggers", () => { }), ).rejects.toThrow("Connection ID not available"); }); + + it("TRIGGER_CONFIGURE without subscriptionId uses legacy default slot", async () => { + // Backward-compat path: studio versions that don't pass subscriptionId + // collapse to a single sub per connection. Two enables overwrite each + // other (same as the pre-multi-sub behavior). + const t = createTriggers([ + { + type: "github.push" as const, + description: "Push", + params: z.object({ repo: z.string() }), + }, + ]); + const configureTool = t.tools()[1]; + const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( + new Response("ok", { status: 202 }), + ); + + await configureTool.execute({ + context: { + type: "github.push", + params: {}, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "first", + }, + runtimeContext: mockCtx("conn-legacy"), + }); + await configureTool.execute({ + context: { + type: "github.push", + params: {}, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "second", + }, + runtimeContext: mockCtx("conn-legacy"), + }); + + t.notify("conn-legacy", "github.push", {}); + await new Promise((r) => setTimeout(r, 50)); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + const headers = (fetchSpy.mock.calls[0][1] as RequestInit).headers as Record< + string, + string + >; + expect(headers.Authorization).toBe("Bearer second"); + + fetchSpy.mockRestore(); + }); }); describe("createTriggers with storage", () => { @@ -271,14 +379,27 @@ describe("createTriggers with storage", () => { data: Map; } { const data = new Map(); + const composite = (connId: string, subId: string) => `${connId}\x1f${subId}`; return { data, - get: async (id) => (data.get(id) as any) ?? null, - set: async (id, state) => { - data.set(id, state); + get: async (connId, subId) => + // biome-ignore lint: test mock + ((data.get(composite(connId, subId)) as any) ?? null), + set: async (connId, subId, state) => { + data.set(composite(connId, subId), state); + }, + delete: async (connId, subId) => { + data.delete(composite(connId, subId)); }, - delete: async (id) => { - data.delete(id); + list: async (connId) => { + const prefix = `${connId}\x1f`; + const out: Array<{ subscriptionId: string; state: any }> = []; + for (const [key, state] of data.entries()) { + if (key.startsWith(prefix)) { + out.push({ subscriptionId: key.slice(prefix.length), state }); + } + } + return out; }, }; } @@ -305,12 +426,14 @@ describe("createTriggers with storage", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "persisted-token", + subscriptionId: "sub1", }, runtimeContext: mockCtx("conn-persist"), }); - expect(storage.data.has("conn-persist")).toBe(true); - const stored = storage.data.get("conn-persist") as any; + expect(storage.data.has("conn-persist\x1fsub1")).toBe(true); + // biome-ignore lint: test mock + const stored = storage.data.get("conn-persist\x1fsub1") as any; expect(stored.credentials.callbackToken).toBe("persisted-token"); expect(stored.activeTriggerTypes).toEqual(["github.push"]); }); @@ -327,25 +450,31 @@ describe("createTriggers with storage", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "to-delete", + subscriptionId: "sub1", }, runtimeContext: mockCtx("conn-del"), }); - expect(storage.data.has("conn-del")).toBe(true); + expect(storage.data.has("conn-del\x1fsub1")).toBe(true); await configureTool.execute({ - context: { type: "github.push", params: {}, enabled: false }, + context: { + type: "github.push", + params: {}, + enabled: false, + subscriptionId: "sub1", + }, runtimeContext: mockCtx("conn-del"), }); - expect(storage.data.has("conn-del")).toBe(false); + expect(storage.data.has("conn-del\x1fsub1")).toBe(false); }); it("restores credentials from storage on notify after restart", async () => { const storage = createMockStorage(); // Simulate prior session: write state directly to storage - storage.data.set("conn-restart", { + storage.data.set("conn-restart\x1fsub1", { credentials: { callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "restored-token", @@ -375,37 +504,47 @@ describe("createTriggers with storage", () => { fetchSpy.mockRestore(); }); - it("disable after restart clears persisted credentials from storage", async () => { + it("disable after restart clears the persisted subscription only", async () => { const storage = createMockStorage(); - // Simulate prior session - storage.data.set("conn-disable-restart", { + // Simulate prior session with two siblings + storage.data.set("conn-disable-restart\x1fkeep", { credentials: { callbackUrl: "https://mesh.example.com/api/trigger-callback", - callbackToken: "stale-token", + callbackToken: "keep-token", + }, + activeTriggerTypes: ["github.push"], + }); + storage.data.set("conn-disable-restart\x1fdrop", { + credentials: { + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "drop-token", }, activeTriggerTypes: ["github.push"], }); - // New instance (simulates restart) const t = createTriggers({ definitions: defs, storage }); const configureTool = t.tools()[1]; - // Disable the trigger — should load from storage, then clean up await configureTool.execute({ - context: { type: "github.push", params: {}, enabled: false }, + context: { + type: "github.push", + params: {}, + enabled: false, + subscriptionId: "drop", + }, runtimeContext: mockCtx("conn-disable-restart"), }); - expect(storage.data.has("conn-disable-restart")).toBe(false); + expect(storage.data.has("conn-disable-restart\x1fdrop")).toBe(false); + expect(storage.data.has("conn-disable-restart\x1fkeep")).toBe(true); - // notify should be a no-op - const consoleSpy = spyOn(console, "log").mockImplementation(() => {}); + const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( + new Response("ok", { status: 202 }), + ); t.notify("conn-disable-restart", "github.push", {}); await new Promise((r) => setTimeout(r, 50)); - expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("No callback credentials"), - ); - consoleSpy.mockRestore(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + fetchSpy.mockRestore(); }); }); diff --git a/packages/runtime/src/triggers.ts b/packages/runtime/src/triggers.ts index 5608adaa6f..003a2ceb2f 100644 --- a/packages/runtime/src/triggers.ts +++ b/packages/runtime/src/triggers.ts @@ -17,18 +17,43 @@ interface TriggerState { activeTriggerTypes: string[]; } +/** + * Sentinel used when TRIGGER_CONFIGURE arrives without a `subscriptionId`. + * Lets studio upgrade independently of MCPs and lets MCPs serve clients on + * older bindings that don't know how to mint a subscriptionId. The cost is + * that all such legacy registrations collapse to one slot per connection — + * the same single-sub limitation the previous version had. + */ +const LEGACY_SUBSCRIPTION_ID = "__default"; + /** * Storage interface for persisting trigger state across MCP restarts. * - * Implement this with your storage backend (KV, DB, file system, etc.) - * and pass it to `createTriggers({ storage })`. + * Each subscription is a unique (connectionId, subscriptionId) pair. The + * same connectionId may host many independent subscriptions (e.g. one + * per automation listening to the same event type with different filter + * params), each with its own callback credentials. * - * Keys are connection IDs, values are serializable trigger state objects. + * Implement this with your storage backend (KV, DB, file system, etc.) + * and pass it to `createTriggers({ storage })`. The runtime calls `list` + * during webhook fanout to find every active subscription for a given + * connection — implementations should make this fast (e.g. KV + * `list({ prefix })`). */ export interface TriggerStorage { - get(connectionId: string): Promise; - set(connectionId: string, state: TriggerState): Promise; - delete(connectionId: string): Promise; + get( + connectionId: string, + subscriptionId: string, + ): Promise; + set( + connectionId: string, + subscriptionId: string, + state: TriggerState, + ): Promise; + delete(connectionId: string, subscriptionId: string): Promise; + list( + connectionId: string, + ): Promise>; } interface TriggerDef< @@ -53,8 +78,9 @@ interface Triggers { tools(): CreatedTool[]; /** - * Notify Mesh that an event occurred. - * The SDK matches it to stored callback credentials and POSTs to Mesh. + * Notify Mesh that an event occurred. Fans out to every subscription + * registered against `connectionId` whose active types include `type`, + * POSTing the payload to each subscription's callback URL. * Fire-and-forget — errors are logged, not thrown. */ notify( @@ -64,71 +90,114 @@ interface Triggers { ): void; } -// In-memory cache backed by optional persistent storage +// In-memory cache keyed by `${connectionId}:${subscriptionId}`. Persistent +// storage is the source of truth — the cache exists to avoid a KV round +// trip on hot paths and to survive registrations that arrive before any +// notify happens. class TriggerStateManager { - private credentials = new Map(); - private activeTriggers = new Map>(); + private subscriptions = new Map(); + // Tracks whether we've already loaded all subs for a given connectionId + // from storage, so notify() doesn't issue redundant `list` calls. + private listed = new Set(); private storage: TriggerStorage | null; constructor(storage?: TriggerStorage) { this.storage = storage ?? null; } - getCredentials(connectionId: string): CallbackCredentials | undefined { - return this.credentials.get(connectionId); + private cacheKey(connectionId: string, subscriptionId: string): string { + return `${connectionId}\x1f${subscriptionId}`; + } + + private parseCacheKey( + key: string, + ): { connectionId: string; subscriptionId: string } { + const idx = key.indexOf("\x1f"); + return { + connectionId: key.slice(0, idx), + subscriptionId: key.slice(idx + 1), + }; } - async loadFromStorage(connectionId: string): Promise { - if (!this.storage || this.credentials.has(connectionId)) return; - const state = await this.storage.get(connectionId); - if (state) { - this.credentials.set(connectionId, state.credentials); - this.activeTriggers.set(connectionId, new Set(state.activeTriggerTypes)); + async listForConnection( + connectionId: string, + ): Promise> { + if (!this.listed.has(connectionId) && this.storage) { + const records = await this.storage.list(connectionId); + for (const { subscriptionId, state } of records) { + this.subscriptions.set(this.cacheKey(connectionId, subscriptionId), state); + } + this.listed.add(connectionId); } + const out: Array<{ subscriptionId: string; state: TriggerState }> = []; + const prefix = `${connectionId}\x1f`; + for (const [key, state] of this.subscriptions.entries()) { + if (key.startsWith(prefix)) { + const { subscriptionId } = this.parseCacheKey(key); + out.push({ subscriptionId, state }); + } + } + return out; } async enable( connectionId: string, + subscriptionId: string, triggerType: string, newCredentials?: CallbackCredentials, ): Promise { - if (newCredentials) { - this.credentials.set(connectionId, newCredentials); + const key = this.cacheKey(connectionId, subscriptionId); + const existing = this.subscriptions.get(key); + const credentials = newCredentials ?? existing?.credentials; + if (!credentials) { + // First enable for this subscription must include credentials. + // Without them the callback can't be delivered, so refuse loudly. + throw new Error( + `[Triggers] enable(${connectionId}/${subscriptionId}): credentials required on first registration`, + ); } - - const types = this.activeTriggers.get(connectionId) ?? new Set(); + const types = new Set(existing?.activeTriggerTypes ?? []); types.add(triggerType); - this.activeTriggers.set(connectionId, types); - - await this.persist(connectionId); + const state: TriggerState = { + credentials, + activeTriggerTypes: [...types], + }; + this.subscriptions.set(key, state); + if (this.storage) { + await this.storage.set(connectionId, subscriptionId, state); + } } - async disable(connectionId: string, triggerType: string): Promise { - // Ensure state is loaded (may be empty after restart) - await this.loadFromStorage(connectionId); - const types = this.activeTriggers.get(connectionId); - if (types) { - types.delete(triggerType); - if (types.size === 0) { - this.activeTriggers.delete(connectionId); - this.credentials.delete(connectionId); - await this.storage?.delete(connectionId); - return; + async disable( + connectionId: string, + subscriptionId: string, + triggerType: string, + ): Promise { + const key = this.cacheKey(connectionId, subscriptionId); + let existing = this.subscriptions.get(key); + if (!existing && this.storage) { + existing = (await this.storage.get(connectionId, subscriptionId)) ?? undefined; + if (existing) this.subscriptions.set(key, existing); + } + if (!existing) return; + + const types = new Set(existing.activeTriggerTypes); + types.delete(triggerType); + if (types.size === 0) { + this.subscriptions.delete(key); + if (this.storage) { + await this.storage.delete(connectionId, subscriptionId); } + return; } - - await this.persist(connectionId); - } - - private async persist(connectionId: string): Promise { - if (!this.storage) return; - const creds = this.credentials.get(connectionId); - const types = this.activeTriggers.get(connectionId); - if (!creds || !types || types.size === 0) return; - await this.storage.set(connectionId, { - credentials: creds, + const next: TriggerState = { + credentials: existing.credentials, activeTriggerTypes: [...types], - }); + }; + this.subscriptions.set(key, next); + if (this.storage) { + await this.storage.set(connectionId, subscriptionId, next); + } } } @@ -228,6 +297,8 @@ export function createTriggers( throw new Error("Connection ID not available"); } + const subscriptionId = context.subscriptionId ?? LEGACY_SUBSCRIPTION_ID; + if (context.enabled) { const creds = context.callbackUrl && context.callbackToken @@ -236,9 +307,9 @@ export function createTriggers( callbackToken: context.callbackToken, } : undefined; - await state.enable(connectionId, context.type, creds); + await state.enable(connectionId, subscriptionId, context.type, creds); } else { - await state.disable(connectionId, context.type); + await state.disable(connectionId, subscriptionId, context.type); } return { success: true }; @@ -251,29 +322,27 @@ export function createTriggers( }, notify(connectionId, type, data) { - // Try in-memory first, fall back to storage load - const credentials = state.getCredentials(connectionId); - if (credentials) { - deliverCallback(credentials, type, data); - return; - } - - // Attempt async load from storage (fire-and-forget) + // Fanout: deliver to every subscription on this connection whose + // active types include `type`. Fire-and-forget — failures log but + // don't cascade. state - .loadFromStorage(connectionId) - .then(() => { - const loaded = state.getCredentials(connectionId); - if (loaded) { - deliverCallback(loaded, type, data); - } else { + .listForConnection(connectionId) + .then((records) => { + let delivered = 0; + for (const { state: sub } of records) { + if (!sub.activeTriggerTypes.includes(type)) continue; + deliverCallback(sub.credentials, type, data); + delivered++; + } + if (delivered === 0) { console.log( - `[Triggers] No callback credentials for connection=${connectionId}, skipping notify`, + `[Triggers] No subscriptions for connection=${connectionId} type=${type}, skipping notify`, ); } }) .catch((err) => { console.error( - `[Triggers] Failed to load credentials for ${connectionId}:`, + `[Triggers] Failed to fanout for ${connectionId}/${type}:`, err, ); });