From 24838e72c08e9af3c53a71f92f9c747976c75f53 Mon Sep 17 00:00:00 2001 From: viktormarinho Date: Fri, 8 May 2026 18:50:12 -0300 Subject: [PATCH 1/2] feat(runtime): support multiple subscriptions per (connection, type) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TriggerStateManager keyed callback credentials by connectionId, so two TRIGGER_CONFIGURE calls on the same (connectionId, type) overwrote each other's tokens and one disable wiped the entire connection's state. This made it impossible to host independent subscriptions on a shared OAuth connection — e.g. two automations listening to gmail.message.received with different filter params. Add an optional `subscriptionId` to TRIGGER_CONFIGURE input. When present, it disambiguates registrations on the same connection; when absent it collapses to a single legacy slot so older mesh versions keep working. The TriggerStorage interface gains a `list(connectionId)` method so notify can fanout to every active subscription. Both built-in storages (StudioKV, JsonFileStorage) implement it. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/bindings/src/well-known/trigger.ts | 6 + packages/runtime/src/trigger-storage.ts | 109 +++++++--- packages/runtime/src/triggers.test.ts | 223 ++++++++++++++++---- packages/runtime/src/triggers.ts | 203 ++++++++++++------ 4 files changed, 406 insertions(+), 135 deletions(-) diff --git a/packages/bindings/src/well-known/trigger.ts b/packages/bindings/src/well-known/trigger.ts index 87d1cea6e4..ae8a111c23 100644 --- a/packages/bindings/src/well-known/trigger.ts +++ b/packages/bindings/src/well-known/trigger.ts @@ -63,6 +63,11 @@ export type TriggerListOutput = z.infer; * TRIGGER_CONFIGURE Input Schema * * Input for configuring a trigger with parameters. + * + * `subscriptionId` uniquely identifies a single trigger registration so the + * MCP can store many independent subscriptions on the same `(connectionId, + * type)`. Optional for backward compatibility — implementations that omit it + * collapse to a single-subscription-per-connection model. */ export const TriggerConfigureInputSchema = z.object({ type: z.string(), @@ -70,6 +75,7 @@ export const TriggerConfigureInputSchema = z.object({ enabled: z.boolean(), callbackUrl: z.url().optional(), callbackToken: z.string().min(1).optional(), + subscriptionId: z.string().min(1).optional(), }); export type TriggerConfigureInput = z.infer; diff --git a/packages/runtime/src/trigger-storage.ts b/packages/runtime/src/trigger-storage.ts index 4c7a9c8d68..693abae52f 100644 --- a/packages/runtime/src/trigger-storage.ts +++ b/packages/runtime/src/trigger-storage.ts @@ -3,10 +3,18 @@ * * - StudioKV: Persists to Mesh/Studio's KV API (recommended for production) * - JsonFileStorage: Persists to a local JSON file (for dev/simple deployments) + * + * Both implementations key entries by `(connectionId, subscriptionId)` to + * support multiple independent subscriptions per connection. */ import type { TriggerStorage } from "./triggers.ts"; +interface TriggerState { + credentials: { callbackUrl: string; callbackToken: string }; + activeTriggerTypes: string[]; +} + // ============================================================================ // StudioKV — backed by Mesh's /api/kv endpoint // ============================================================================ @@ -23,6 +31,10 @@ interface StudioKVOptions { /** * TriggerStorage backed by Mesh/Studio's org-scoped KV API. * + * Stores one record per subscription, keyed `${prefix}:${connectionId}:${subscriptionId}`. + * The `list(connectionId)` operation issues a prefix scan via + * `/api/kv?prefix=...`. + * * @example * ```typescript * import { createTriggers } from "@decocms/runtime/triggers"; @@ -48,13 +60,19 @@ export class StudioKV implements TriggerStorage { this.prefix = options.prefix ?? "triggers"; } - private key(connectionId: string): string { - return `${this.prefix}:${connectionId}`; + private key(connectionId: string, subscriptionId: string): string { + return `${this.prefix}:${connectionId}:${subscriptionId}`; } - async get(connectionId: string) { + private connectionPrefix(connectionId: string): string { + return `${this.prefix}:${connectionId}:`; + } + + async get(connectionId: string, subscriptionId: string) { const res = await fetch( - `${this.baseUrl}/api/kv/${encodeURIComponent(this.key(connectionId))}`, + `${this.baseUrl}/api/kv/${encodeURIComponent( + this.key(connectionId, subscriptionId), + )}`, { headers: { Authorization: `Bearer ${this.apiKey}` }, }, @@ -67,24 +85,19 @@ export class StudioKV implements TriggerStorage { return null; } - const body = (await res.json()) as { - value?: { - credentials: { callbackUrl: string; callbackToken: string }; - activeTriggerTypes: string[]; - }; - }; + const body = (await res.json()) as { value?: TriggerState }; return body.value ?? null; } async set( connectionId: string, - state: { - credentials: { callbackUrl: string; callbackToken: string }; - activeTriggerTypes: string[]; - }, + subscriptionId: string, + state: TriggerState, ) { const res = await fetch( - `${this.baseUrl}/api/kv/${encodeURIComponent(this.key(connectionId))}`, + `${this.baseUrl}/api/kv/${encodeURIComponent( + this.key(connectionId, subscriptionId), + )}`, { method: "PUT", headers: { @@ -100,9 +113,11 @@ export class StudioKV implements TriggerStorage { } } - async delete(connectionId: string) { + async delete(connectionId: string, subscriptionId: string) { const res = await fetch( - `${this.baseUrl}/api/kv/${encodeURIComponent(this.key(connectionId))}`, + `${this.baseUrl}/api/kv/${encodeURIComponent( + this.key(connectionId, subscriptionId), + )}`, { method: "DELETE", headers: { Authorization: `Bearer ${this.apiKey}` }, @@ -115,6 +130,26 @@ export class StudioKV implements TriggerStorage { ); } } + + async list(connectionId: string) { + const url = new URL(`${this.baseUrl}/api/kv`); + url.searchParams.set("prefix", this.connectionPrefix(connectionId)); + const res = await fetch(url, { + headers: { Authorization: `Bearer ${this.apiKey}` }, + }); + if (!res.ok) { + console.error(`[StudioKV] LIST failed: ${res.status} ${res.statusText}`); + return []; + } + const body = (await res.json()) as { + items?: Array<{ key: string; value: TriggerState }>; + }; + const prefix = this.connectionPrefix(connectionId); + return (body.items ?? []).map((item) => ({ + subscriptionId: item.key.slice(prefix.length), + state: item.value, + })); + } } // ============================================================================ @@ -130,6 +165,8 @@ interface JsonFileStorageOptions { * TriggerStorage backed by a local JSON file. * Suitable for development and single-instance deployments. * + * Records are keyed `${connectionId}:${subscriptionId}` inside the file. + * * @example * ```typescript * import { createTriggers } from "@decocms/runtime/triggers"; @@ -143,18 +180,22 @@ interface JsonFileStorageOptions { */ export class JsonFileStorage implements TriggerStorage { private path: string; - private cache: Map | null = null; + private cache: Map | null = null; constructor(options: JsonFileStorageOptions) { this.path = options.path; } - private async load(): Promise> { + private compositeKey(connectionId: string, subscriptionId: string): string { + return `${connectionId}:${subscriptionId}`; + } + + private async load(): Promise> { if (this.cache) return this.cache; try { const fs = await import("node:fs/promises"); const raw = await fs.readFile(this.path, "utf-8"); - const data = JSON.parse(raw) as Record; + const data = JSON.parse(raw) as Record; this.cache = new Map(Object.entries(data)); } catch (err: unknown) { if ( @@ -176,20 +217,36 @@ export class JsonFileStorage implements TriggerStorage { await fs.writeFile(this.path, JSON.stringify(data, null, 2)); } - async get(connectionId: string) { + async get(connectionId: string, subscriptionId: string) { const map = await this.load(); - return (map.get(connectionId) as any) ?? null; + return map.get(this.compositeKey(connectionId, subscriptionId)) ?? null; } - async set(connectionId: string, state: unknown) { + async set( + connectionId: string, + subscriptionId: string, + state: TriggerState, + ) { const map = await this.load(); - map.set(connectionId, state); + map.set(this.compositeKey(connectionId, subscriptionId), state); await this.save(); } - async delete(connectionId: string) { + async delete(connectionId: string, subscriptionId: string) { const map = await this.load(); - map.delete(connectionId); + map.delete(this.compositeKey(connectionId, subscriptionId)); await this.save(); } + + async list(connectionId: string) { + const map = await this.load(); + const prefix = `${connectionId}:`; + const out: Array<{ subscriptionId: string; state: TriggerState }> = []; + for (const [key, state] of map.entries()) { + if (key.startsWith(prefix)) { + out.push({ subscriptionId: key.slice(prefix.length), state }); + } + } + return out; + } } diff --git a/packages/runtime/src/triggers.test.ts b/packages/runtime/src/triggers.test.ts index c6e5f50415..ccb39e69b7 100644 --- a/packages/runtime/src/triggers.test.ts +++ b/packages/runtime/src/triggers.test.ts @@ -94,6 +94,7 @@ describe("createTriggers", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "test-token-123", + subscriptionId: "sub-1", }, runtimeContext: mockCtx("conn-1"), }); @@ -126,51 +127,108 @@ describe("createTriggers", () => { fetchSpy.mockRestore(); }); - it("disabling one trigger keeps credentials when another is still active", async () => { + it("multiple subscriptions on same connection each get their own callback", async () => { + const configureTool = triggers.tools()[1]; + const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( + new Response("ok", { status: 202 }), + ); + + // Two distinct subscriptions on the same connection, same type, with + // different callback tokens — both should fire on notify. + await configureTool.execute({ + context: { + type: "github.push", + params: { repo: "alice/repo" }, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "token-A", + subscriptionId: "sub-A", + }, + runtimeContext: mockCtx("conn-multi-sub"), + }); + await configureTool.execute({ + context: { + type: "github.push", + params: { repo: "bob/repo" }, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "token-B", + subscriptionId: "sub-B", + }, + runtimeContext: mockCtx("conn-multi-sub"), + }); + + triggers.notify("conn-multi-sub", "github.push", { + repository: { full_name: "x/y" }, + }); + await new Promise((r) => setTimeout(r, 50)); + + const tokens = fetchSpy.mock.calls.map((call) => { + const headers = (call[1] as RequestInit).headers as Record; + return headers.Authorization; + }); + expect(tokens).toContain("Bearer token-A"); + expect(tokens).toContain("Bearer token-B"); + expect(fetchSpy.mock.calls).toHaveLength(2); + + fetchSpy.mockRestore(); + }); + + it("disabling one subscription leaves siblings alive", async () => { const configureTool = triggers.tools()[1]; const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( new Response("ok", { status: 202 }), ); - // Enable two trigger types await configureTool.execute({ context: { type: "github.push", params: {}, enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", - callbackToken: "token-multi", + callbackToken: "token-keep", + subscriptionId: "keep", }, - runtimeContext: mockCtx("conn-multi"), + runtimeContext: mockCtx("conn-sibling"), }); await configureTool.execute({ context: { - type: "github.pull_request.opened", + type: "github.push", params: {}, enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "token-drop", + subscriptionId: "drop", }, - runtimeContext: mockCtx("conn-multi"), + runtimeContext: mockCtx("conn-sibling"), }); - // Disable one — credentials should stay for the other + // Disable the "drop" subscription only await configureTool.execute({ context: { type: "github.push", params: {}, enabled: false, + subscriptionId: "drop", }, - runtimeContext: mockCtx("conn-multi"), + runtimeContext: mockCtx("conn-sibling"), }); - triggers.notify("conn-multi", "github.pull_request.opened", {}); + fetchSpy.mockClear(); + triggers.notify("conn-sibling", "github.push", {}); await new Promise((r) => setTimeout(r, 50)); - expect(fetchSpy).toHaveBeenCalled(); + + const tokens = fetchSpy.mock.calls.map((call) => { + const headers = (call[1] as RequestInit).headers as Record; + return headers.Authorization; + }); + expect(tokens).toEqual(["Bearer token-keep"]); fetchSpy.mockRestore(); }); - it("disabling the last trigger clears credentials", async () => { + it("disabling the last subscription clears credentials", async () => { const configureTool = triggers.tools()[1]; const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( @@ -178,7 +236,6 @@ describe("createTriggers", () => { ); const consoleSpy = spyOn(console, "log").mockImplementation(() => {}); - // Enable a trigger await configureTool.execute({ context: { type: "github.push", @@ -186,16 +243,17 @@ describe("createTriggers", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "token-cleanup", + subscriptionId: "only", }, runtimeContext: mockCtx("conn-cleanup"), }); - // Disable it — last trigger, credentials should be cleared await configureTool.execute({ context: { type: "github.push", params: {}, enabled: false, + subscriptionId: "only", }, runtimeContext: mockCtx("conn-cleanup"), }); @@ -204,19 +262,19 @@ describe("createTriggers", () => { await new Promise((r) => setTimeout(r, 50)); expect(fetchSpy).not.toHaveBeenCalled(); expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("No callback credentials"), + expect.stringContaining("No subscriptions"), ); fetchSpy.mockRestore(); consoleSpy.mockRestore(); }); - it("notify is a no-op when no credentials exist", async () => { + it("notify is a no-op when no subscriptions exist", async () => { const consoleSpy = spyOn(console, "log").mockImplementation(() => {}); triggers.notify("unknown-conn", "github.push", {}); await new Promise((r) => setTimeout(r, 50)); expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("No callback credentials"), + expect.stringContaining("No subscriptions"), ); consoleSpy.mockRestore(); }); @@ -228,7 +286,6 @@ describe("createTriggers", () => { ); const errorSpy = spyOn(console, "error").mockImplementation(() => {}); - // Ensure credentials exist (reuse from prior test state or set up fresh) await configureTool.execute({ context: { type: "github.push", @@ -236,6 +293,7 @@ describe("createTriggers", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "token-err", + subscriptionId: "err", }, runtimeContext: mockCtx("conn-err"), }); @@ -264,6 +322,56 @@ describe("createTriggers", () => { }), ).rejects.toThrow("Connection ID not available"); }); + + it("TRIGGER_CONFIGURE without subscriptionId uses legacy default slot", async () => { + // Backward-compat path: studio versions that don't pass subscriptionId + // collapse to a single sub per connection. Two enables overwrite each + // other (same as the pre-multi-sub behavior). + const t = createTriggers([ + { + type: "github.push" as const, + description: "Push", + params: z.object({ repo: z.string() }), + }, + ]); + const configureTool = t.tools()[1]; + const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( + new Response("ok", { status: 202 }), + ); + + await configureTool.execute({ + context: { + type: "github.push", + params: {}, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "first", + }, + runtimeContext: mockCtx("conn-legacy"), + }); + await configureTool.execute({ + context: { + type: "github.push", + params: {}, + enabled: true, + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "second", + }, + runtimeContext: mockCtx("conn-legacy"), + }); + + t.notify("conn-legacy", "github.push", {}); + await new Promise((r) => setTimeout(r, 50)); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + const headers = (fetchSpy.mock.calls[0][1] as RequestInit).headers as Record< + string, + string + >; + expect(headers.Authorization).toBe("Bearer second"); + + fetchSpy.mockRestore(); + }); }); describe("createTriggers with storage", () => { @@ -271,14 +379,27 @@ describe("createTriggers with storage", () => { data: Map; } { const data = new Map(); + const composite = (connId: string, subId: string) => `${connId}\x1f${subId}`; return { data, - get: async (id) => (data.get(id) as any) ?? null, - set: async (id, state) => { - data.set(id, state); + get: async (connId, subId) => + // biome-ignore lint: test mock + ((data.get(composite(connId, subId)) as any) ?? null), + set: async (connId, subId, state) => { + data.set(composite(connId, subId), state); + }, + delete: async (connId, subId) => { + data.delete(composite(connId, subId)); }, - delete: async (id) => { - data.delete(id); + list: async (connId) => { + const prefix = `${connId}\x1f`; + const out: Array<{ subscriptionId: string; state: any }> = []; + for (const [key, state] of data.entries()) { + if (key.startsWith(prefix)) { + out.push({ subscriptionId: key.slice(prefix.length), state }); + } + } + return out; }, }; } @@ -305,12 +426,14 @@ describe("createTriggers with storage", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "persisted-token", + subscriptionId: "sub1", }, runtimeContext: mockCtx("conn-persist"), }); - expect(storage.data.has("conn-persist")).toBe(true); - const stored = storage.data.get("conn-persist") as any; + expect(storage.data.has("conn-persist\x1fsub1")).toBe(true); + // biome-ignore lint: test mock + const stored = storage.data.get("conn-persist\x1fsub1") as any; expect(stored.credentials.callbackToken).toBe("persisted-token"); expect(stored.activeTriggerTypes).toEqual(["github.push"]); }); @@ -327,25 +450,31 @@ describe("createTriggers with storage", () => { enabled: true, callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "to-delete", + subscriptionId: "sub1", }, runtimeContext: mockCtx("conn-del"), }); - expect(storage.data.has("conn-del")).toBe(true); + expect(storage.data.has("conn-del\x1fsub1")).toBe(true); await configureTool.execute({ - context: { type: "github.push", params: {}, enabled: false }, + context: { + type: "github.push", + params: {}, + enabled: false, + subscriptionId: "sub1", + }, runtimeContext: mockCtx("conn-del"), }); - expect(storage.data.has("conn-del")).toBe(false); + expect(storage.data.has("conn-del\x1fsub1")).toBe(false); }); it("restores credentials from storage on notify after restart", async () => { const storage = createMockStorage(); // Simulate prior session: write state directly to storage - storage.data.set("conn-restart", { + storage.data.set("conn-restart\x1fsub1", { credentials: { callbackUrl: "https://mesh.example.com/api/trigger-callback", callbackToken: "restored-token", @@ -375,37 +504,47 @@ describe("createTriggers with storage", () => { fetchSpy.mockRestore(); }); - it("disable after restart clears persisted credentials from storage", async () => { + it("disable after restart clears the persisted subscription only", async () => { const storage = createMockStorage(); - // Simulate prior session - storage.data.set("conn-disable-restart", { + // Simulate prior session with two siblings + storage.data.set("conn-disable-restart\x1fkeep", { credentials: { callbackUrl: "https://mesh.example.com/api/trigger-callback", - callbackToken: "stale-token", + callbackToken: "keep-token", + }, + activeTriggerTypes: ["github.push"], + }); + storage.data.set("conn-disable-restart\x1fdrop", { + credentials: { + callbackUrl: "https://mesh.example.com/api/trigger-callback", + callbackToken: "drop-token", }, activeTriggerTypes: ["github.push"], }); - // New instance (simulates restart) const t = createTriggers({ definitions: defs, storage }); const configureTool = t.tools()[1]; - // Disable the trigger — should load from storage, then clean up await configureTool.execute({ - context: { type: "github.push", params: {}, enabled: false }, + context: { + type: "github.push", + params: {}, + enabled: false, + subscriptionId: "drop", + }, runtimeContext: mockCtx("conn-disable-restart"), }); - expect(storage.data.has("conn-disable-restart")).toBe(false); + expect(storage.data.has("conn-disable-restart\x1fdrop")).toBe(false); + expect(storage.data.has("conn-disable-restart\x1fkeep")).toBe(true); - // notify should be a no-op - const consoleSpy = spyOn(console, "log").mockImplementation(() => {}); + const fetchSpy = spyOn(globalThis, "fetch").mockResolvedValue( + new Response("ok", { status: 202 }), + ); t.notify("conn-disable-restart", "github.push", {}); await new Promise((r) => setTimeout(r, 50)); - expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("No callback credentials"), - ); - consoleSpy.mockRestore(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + fetchSpy.mockRestore(); }); }); diff --git a/packages/runtime/src/triggers.ts b/packages/runtime/src/triggers.ts index 5608adaa6f..003a2ceb2f 100644 --- a/packages/runtime/src/triggers.ts +++ b/packages/runtime/src/triggers.ts @@ -17,18 +17,43 @@ interface TriggerState { activeTriggerTypes: string[]; } +/** + * Sentinel used when TRIGGER_CONFIGURE arrives without a `subscriptionId`. + * Lets studio upgrade independently of MCPs and lets MCPs serve clients on + * older bindings that don't know how to mint a subscriptionId. The cost is + * that all such legacy registrations collapse to one slot per connection — + * the same single-sub limitation the previous version had. + */ +const LEGACY_SUBSCRIPTION_ID = "__default"; + /** * Storage interface for persisting trigger state across MCP restarts. * - * Implement this with your storage backend (KV, DB, file system, etc.) - * and pass it to `createTriggers({ storage })`. + * Each subscription is a unique (connectionId, subscriptionId) pair. The + * same connectionId may host many independent subscriptions (e.g. one + * per automation listening to the same event type with different filter + * params), each with its own callback credentials. * - * Keys are connection IDs, values are serializable trigger state objects. + * Implement this with your storage backend (KV, DB, file system, etc.) + * and pass it to `createTriggers({ storage })`. The runtime calls `list` + * during webhook fanout to find every active subscription for a given + * connection — implementations should make this fast (e.g. KV + * `list({ prefix })`). */ export interface TriggerStorage { - get(connectionId: string): Promise; - set(connectionId: string, state: TriggerState): Promise; - delete(connectionId: string): Promise; + get( + connectionId: string, + subscriptionId: string, + ): Promise; + set( + connectionId: string, + subscriptionId: string, + state: TriggerState, + ): Promise; + delete(connectionId: string, subscriptionId: string): Promise; + list( + connectionId: string, + ): Promise>; } interface TriggerDef< @@ -53,8 +78,9 @@ interface Triggers { tools(): CreatedTool[]; /** - * Notify Mesh that an event occurred. - * The SDK matches it to stored callback credentials and POSTs to Mesh. + * Notify Mesh that an event occurred. Fans out to every subscription + * registered against `connectionId` whose active types include `type`, + * POSTing the payload to each subscription's callback URL. * Fire-and-forget — errors are logged, not thrown. */ notify( @@ -64,71 +90,114 @@ interface Triggers { ): void; } -// In-memory cache backed by optional persistent storage +// In-memory cache keyed by `${connectionId}:${subscriptionId}`. Persistent +// storage is the source of truth — the cache exists to avoid a KV round +// trip on hot paths and to survive registrations that arrive before any +// notify happens. class TriggerStateManager { - private credentials = new Map(); - private activeTriggers = new Map>(); + private subscriptions = new Map(); + // Tracks whether we've already loaded all subs for a given connectionId + // from storage, so notify() doesn't issue redundant `list` calls. + private listed = new Set(); private storage: TriggerStorage | null; constructor(storage?: TriggerStorage) { this.storage = storage ?? null; } - getCredentials(connectionId: string): CallbackCredentials | undefined { - return this.credentials.get(connectionId); + private cacheKey(connectionId: string, subscriptionId: string): string { + return `${connectionId}\x1f${subscriptionId}`; + } + + private parseCacheKey( + key: string, + ): { connectionId: string; subscriptionId: string } { + const idx = key.indexOf("\x1f"); + return { + connectionId: key.slice(0, idx), + subscriptionId: key.slice(idx + 1), + }; } - async loadFromStorage(connectionId: string): Promise { - if (!this.storage || this.credentials.has(connectionId)) return; - const state = await this.storage.get(connectionId); - if (state) { - this.credentials.set(connectionId, state.credentials); - this.activeTriggers.set(connectionId, new Set(state.activeTriggerTypes)); + async listForConnection( + connectionId: string, + ): Promise> { + if (!this.listed.has(connectionId) && this.storage) { + const records = await this.storage.list(connectionId); + for (const { subscriptionId, state } of records) { + this.subscriptions.set(this.cacheKey(connectionId, subscriptionId), state); + } + this.listed.add(connectionId); } + const out: Array<{ subscriptionId: string; state: TriggerState }> = []; + const prefix = `${connectionId}\x1f`; + for (const [key, state] of this.subscriptions.entries()) { + if (key.startsWith(prefix)) { + const { subscriptionId } = this.parseCacheKey(key); + out.push({ subscriptionId, state }); + } + } + return out; } async enable( connectionId: string, + subscriptionId: string, triggerType: string, newCredentials?: CallbackCredentials, ): Promise { - if (newCredentials) { - this.credentials.set(connectionId, newCredentials); + const key = this.cacheKey(connectionId, subscriptionId); + const existing = this.subscriptions.get(key); + const credentials = newCredentials ?? existing?.credentials; + if (!credentials) { + // First enable for this subscription must include credentials. + // Without them the callback can't be delivered, so refuse loudly. + throw new Error( + `[Triggers] enable(${connectionId}/${subscriptionId}): credentials required on first registration`, + ); } - - const types = this.activeTriggers.get(connectionId) ?? new Set(); + const types = new Set(existing?.activeTriggerTypes ?? []); types.add(triggerType); - this.activeTriggers.set(connectionId, types); - - await this.persist(connectionId); + const state: TriggerState = { + credentials, + activeTriggerTypes: [...types], + }; + this.subscriptions.set(key, state); + if (this.storage) { + await this.storage.set(connectionId, subscriptionId, state); + } } - async disable(connectionId: string, triggerType: string): Promise { - // Ensure state is loaded (may be empty after restart) - await this.loadFromStorage(connectionId); - const types = this.activeTriggers.get(connectionId); - if (types) { - types.delete(triggerType); - if (types.size === 0) { - this.activeTriggers.delete(connectionId); - this.credentials.delete(connectionId); - await this.storage?.delete(connectionId); - return; + async disable( + connectionId: string, + subscriptionId: string, + triggerType: string, + ): Promise { + const key = this.cacheKey(connectionId, subscriptionId); + let existing = this.subscriptions.get(key); + if (!existing && this.storage) { + existing = (await this.storage.get(connectionId, subscriptionId)) ?? undefined; + if (existing) this.subscriptions.set(key, existing); + } + if (!existing) return; + + const types = new Set(existing.activeTriggerTypes); + types.delete(triggerType); + if (types.size === 0) { + this.subscriptions.delete(key); + if (this.storage) { + await this.storage.delete(connectionId, subscriptionId); } + return; } - - await this.persist(connectionId); - } - - private async persist(connectionId: string): Promise { - if (!this.storage) return; - const creds = this.credentials.get(connectionId); - const types = this.activeTriggers.get(connectionId); - if (!creds || !types || types.size === 0) return; - await this.storage.set(connectionId, { - credentials: creds, + const next: TriggerState = { + credentials: existing.credentials, activeTriggerTypes: [...types], - }); + }; + this.subscriptions.set(key, next); + if (this.storage) { + await this.storage.set(connectionId, subscriptionId, next); + } } } @@ -228,6 +297,8 @@ export function createTriggers( throw new Error("Connection ID not available"); } + const subscriptionId = context.subscriptionId ?? LEGACY_SUBSCRIPTION_ID; + if (context.enabled) { const creds = context.callbackUrl && context.callbackToken @@ -236,9 +307,9 @@ export function createTriggers( callbackToken: context.callbackToken, } : undefined; - await state.enable(connectionId, context.type, creds); + await state.enable(connectionId, subscriptionId, context.type, creds); } else { - await state.disable(connectionId, context.type); + await state.disable(connectionId, subscriptionId, context.type); } return { success: true }; @@ -251,29 +322,27 @@ export function createTriggers( }, notify(connectionId, type, data) { - // Try in-memory first, fall back to storage load - const credentials = state.getCredentials(connectionId); - if (credentials) { - deliverCallback(credentials, type, data); - return; - } - - // Attempt async load from storage (fire-and-forget) + // Fanout: deliver to every subscription on this connection whose + // active types include `type`. Fire-and-forget — failures log but + // don't cascade. state - .loadFromStorage(connectionId) - .then(() => { - const loaded = state.getCredentials(connectionId); - if (loaded) { - deliverCallback(loaded, type, data); - } else { + .listForConnection(connectionId) + .then((records) => { + let delivered = 0; + for (const { state: sub } of records) { + if (!sub.activeTriggerTypes.includes(type)) continue; + deliverCallback(sub.credentials, type, data); + delivered++; + } + if (delivered === 0) { console.log( - `[Triggers] No callback credentials for connection=${connectionId}, skipping notify`, + `[Triggers] No subscriptions for connection=${connectionId} type=${type}, skipping notify`, ); } }) .catch((err) => { console.error( - `[Triggers] Failed to load credentials for ${connectionId}:`, + `[Triggers] Failed to fanout for ${connectionId}/${type}:`, err, ); }); From 2a4e59c5717686d44a6abb3a0e83e07b658c109c Mon Sep 17 00:00:00 2001 From: viktormarinho Date: Fri, 8 May 2026 19:04:51 -0300 Subject: [PATCH 2/2] feat(mesh): persist trigger callback tokens per subscription, cascade on connection delete Each automation trigger now lives as its own row in `trigger_callback_tokens`, keyed on the trigger id. Previously the table was constrained to one row per (connection_id, organization_id) and re-running TRIGGER_CONFIGURE for a sibling subscription overwrote the prior token, leaving older triggers with a stale callback the MCP could no longer authenticate. The `id` of `automation_triggers` is now passed to TRIGGER_CONFIGURE as `subscriptionId` so the MCP can store independent records per registration. `trigger-add` pre-allocates the uuid before calling the MCP so the configure call and the DB insert agree on the same id. `connection/delete` now disables every event trigger bound to the connection on its MCP before removing the connection row. Without this cascade the MCP retained orphaned subscription state and continued trying to deliver to invalidated callback tokens. Migration 077 relaxes the unique index from (connection_id, organization_id) to (subscription_id), keeping a non-unique lookup index on (connection_id, organization_id) for fanout queries. Existing rows are backfilled with subscription_id = id; they remain valid until the next TRIGGER_CONFIGURE replaces them. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...rigger-callback-tokens-per-subscription.ts | 81 +++++++++++ apps/mesh/migrations/index.ts | 3 + apps/mesh/src/storage/automations.ts | 44 +++++- .../src/storage/trigger-callback-tokens.ts | 137 ++++++++++++++---- apps/mesh/src/storage/types.ts | 1 + .../tools/automations/configure-trigger.ts | 47 ++++-- apps/mesh/src/tools/automations/delete.ts | 7 +- .../mesh/src/tools/automations/trigger-add.ts | 15 +- apps/mesh/src/tools/connection/delete.ts | 34 +++++ 9 files changed, 320 insertions(+), 49 deletions(-) create mode 100644 apps/mesh/migrations/077-trigger-callback-tokens-per-subscription.ts diff --git a/apps/mesh/migrations/077-trigger-callback-tokens-per-subscription.ts b/apps/mesh/migrations/077-trigger-callback-tokens-per-subscription.ts new file mode 100644 index 0000000000..9fa0b30171 --- /dev/null +++ b/apps/mesh/migrations/077-trigger-callback-tokens-per-subscription.ts @@ -0,0 +1,81 @@ +/** + * Migration 077: Trigger Callback Tokens — per subscription + * + * Each automation trigger now gets its own callback token row, so we can + * disable a single subscription without invalidating its siblings on the + * same connection. Adds `subscription_id` (= automation_triggers.id) and + * relaxes the unique index from `(connection_id, organization_id)` to + * `subscription_id` alone. + * + * Existing rows get backfilled with a placeholder subscription_id derived + * from their primary key — they remain validatable until the next + * trigger toggle, which writes a fresh row keyed by the real trigger id. + */ + +import type { Kysely } from "kysely"; +import { sql } from "kysely"; + +export async function up(db: Kysely): Promise { + // 1. Add subscription_id, nullable initially so we can backfill. + await db.schema + .alterTable("trigger_callback_tokens") + .addColumn("subscription_id", "text") + .execute(); + + // 2. Backfill: existing rows aren't tied to a specific trigger id, so + // use the row's own primary key as a self-referential placeholder. + // Not strictly correct, but keeps the rows queryable until the next + // TRIGGER_CONFIGURE replaces them with proper subscription ids. + await sql`UPDATE trigger_callback_tokens SET subscription_id = id WHERE subscription_id IS NULL`.execute( + db, + ); + + // 3. Make NOT NULL now that everything is populated. + await db.schema + .alterTable("trigger_callback_tokens") + .alterColumn("subscription_id", (col) => col.setNotNull()) + .execute(); + + // 4. Drop the old unique index that prevented multi-row. + await db.schema + .dropIndex("idx_trigger_callback_tokens_connection_org") + .execute(); + + // 5. New unique index on subscription_id — globally unique because + // automation trigger ids are uuids. + await db.schema + .createIndex("idx_trigger_callback_tokens_subscription") + .on("trigger_callback_tokens") + .columns(["subscription_id"]) + .unique() + .execute(); + + // 6. Keep a non-unique lookup index on (connection_id, organization_id) + // for fanout queries during trigger callback validation. + await db.schema + .createIndex("idx_trigger_callback_tokens_connection_org_lookup") + .on("trigger_callback_tokens") + .columns(["connection_id", "organization_id"]) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema + .dropIndex("idx_trigger_callback_tokens_connection_org_lookup") + .execute(); + await db.schema + .dropIndex("idx_trigger_callback_tokens_subscription") + .execute(); + // Restore the original unique index. Note: this may fail if multi-row + // data exists from the new code path — `down` is best-effort. + await db.schema + .createIndex("idx_trigger_callback_tokens_connection_org") + .on("trigger_callback_tokens") + .columns(["connection_id", "organization_id"]) + .unique() + .execute(); + await db.schema + .alterTable("trigger_callback_tokens") + .dropColumn("subscription_id") + .execute(); +} diff --git a/apps/mesh/migrations/index.ts b/apps/mesh/migrations/index.ts index 23f3bf7b97..573615fb19 100644 --- a/apps/mesh/migrations/index.ts +++ b/apps/mesh/migrations/index.ts @@ -75,6 +75,7 @@ import * as migration073backfillbasicusageroles from "./073-backfill-basic-usage import * as migration074sandboxrunnerstatehandlenonunique from "./074-sandbox-runner-state-handle-nonunique.ts"; import * as migration075threadinflightasyncjobs from "./075-thread-inflight-async-jobs.ts"; import * as migration076automationsdropagentjson from "./076-automations-drop-agent-json.ts"; +import * as migration077triggercallbacktokenspersubscription from "./077-trigger-callback-tokens-per-subscription.ts"; /** * Core migrations for the Mesh application. @@ -165,6 +166,8 @@ const migrations: Record = { migration074sandboxrunnerstatehandlenonunique, "075-thread-inflight-async-jobs": migration075threadinflightasyncjobs, "076-automations-drop-agent-json": migration076automationsdropagentjson, + "077-trigger-callback-tokens-per-subscription": + migration077triggercallbacktokenspersubscription, }; export default migrations; diff --git a/apps/mesh/src/storage/automations.ts b/apps/mesh/src/storage/automations.ts index 1a238afe91..b09ea23c12 100644 --- a/apps/mesh/src/storage/automations.ts +++ b/apps/mesh/src/storage/automations.ts @@ -36,6 +36,13 @@ export interface UpdateAutomationInput { } export interface CreateTriggerInput { + /** + * Optional pre-allocated id. Callers that need to reference the trigger + * before insertion (e.g. event triggers calling `configureTriggerOnMcp` + * with a stable subscriptionId) generate the uuid upfront and pass it + * here so the configure call and the insert agree on the id. + */ + id?: string; automation_id: string; type: "cron" | "event"; cron_expression?: string | null; @@ -77,6 +84,15 @@ export interface AutomationsStorage { automationId: string, ): Promise<{ success: boolean }>; listTriggers(automationId: string): Promise; + /** + * List every trigger bound to a connection in an organization. Used + * during connection deletion to disable each subscription on the MCP + * before the connection record disappears. + */ + listTriggersByConnection( + connectionId: string, + organizationId: string, + ): Promise; findTriggerById(triggerId: string): Promise; findActiveEventTriggers( connectionId: string, @@ -335,7 +351,7 @@ class KyselyAutomationsStorage implements AutomationsStorage { async addTrigger(input: CreateTriggerInput): Promise { const now = new Date().toISOString(); - const id = crypto.randomUUID(); + const id = input.id ?? crypto.randomUUID(); const row = { id, @@ -383,6 +399,32 @@ class KyselyAutomationsStorage implements AutomationsStorage { return rows.map(triggerFromDbRow); } + async listTriggersByConnection( + connectionId: string, + organizationId: string, + ): Promise { + const rows = await this.db + .selectFrom("automation_triggers as t") + .innerJoin("automations as a", "a.id", "t.automation_id") + .select([ + "t.id", + "t.automation_id", + "t.type", + "t.cron_expression", + "t.connection_id", + "t.event_type", + "t.params", + "t.last_run_at", + "t.next_run_at", + "t.created_at", + ]) + .where("t.connection_id", "=", connectionId) + .where("a.organization_id", "=", organizationId) + .execute(); + + return rows.map(triggerFromDbRow); + } + async findTriggerById(triggerId: string): Promise { const row = await this.db .selectFrom("automation_triggers") diff --git a/apps/mesh/src/storage/trigger-callback-tokens.ts b/apps/mesh/src/storage/trigger-callback-tokens.ts index 4ad608e4a4..088bb289fa 100644 --- a/apps/mesh/src/storage/trigger-callback-tokens.ts +++ b/apps/mesh/src/storage/trigger-callback-tokens.ts @@ -4,6 +4,12 @@ * Manages opaque callback tokens that external MCPs use to authenticate * trigger callbacks to Mesh. Tokens are stored as SHA-256 hashes; * plaintext is only returned once at creation time. + * + * Each row keys on `subscription_id` (= `automation_triggers.id`) so a + * single connection can host many independent subscriptions, each with + * its own token. Token validation still resolves to (orgId, connId) + * because the trigger callback endpoint fans out by `(connection, type)` + * downstream of token validation. */ import type { Kysely } from "kysely"; @@ -38,40 +44,65 @@ export interface TriggerCallbackTokenStorage { generateTokenPair(): Promise; /** - * Persist a token hash for a connection+organization pair. - * Upserts — safe for concurrent calls. + * Persist a token hash for a specific subscription. Upserts on + * `subscription_id`, so re-running TRIGGER_CONFIGURE for the same + * subscription rotates the token cleanly without orphaning siblings + * on the same connection. */ - persistTokenHash( - organizationId: string, - connectionId: string, - tokenHash: string, - ): Promise; + persistTokenHash(args: { + organizationId: string; + connectionId: string; + subscriptionId: string; + tokenHash: string; + }): Promise; /** - * Create or rotate a callback token for a connection+organization pair. - * Returns the plaintext token (only available at creation time). - * Convenience method that combines generateTokenPair + persistTokenHash. + * Create or rotate a callback token for a subscription. Returns the + * plaintext token (only available at creation time). */ - createOrRotateToken( - organizationId: string, - connectionId: string, - ): Promise; + createOrRotateToken(args: { + organizationId: string; + connectionId: string; + subscriptionId: string; + }): Promise; /** - * Validate a callback token. - * Returns connection and org context if valid, null otherwise. + * Validate a callback token. Returns the connection + org context for + * the row that owns this token, or null if no row matches. */ validateToken( token: string, - ): Promise<{ organizationId: string; connectionId: string } | null>; + ): Promise<{ + organizationId: string; + connectionId: string; + subscriptionId: string; + } | null>; + + /** + * Delete the callback token for one specific subscription. + */ + deleteBySubscription(subscriptionId: string): Promise; /** - * Delete callback token for a connection+organization pair. + * Delete every callback token attached to a connection. Used during + * connection deletion to garbage-collect every subscription that was + * bound to it. */ deleteByConnection( connectionId: string, organizationId: string, ): Promise; + + /** + * List every (subscription, connection) pair tied to a connection so + * the caller can iterate them — e.g. to send TRIGGER_CONFIGURE + * disable to the MCP for each subscription before the connection + * itself is deleted. + */ + listByConnection( + connectionId: string, + organizationId: string, + ): Promise>; } export class KyselyTriggerCallbackTokenStorage @@ -85,11 +116,17 @@ export class KyselyTriggerCallbackTokenStorage return { plaintext, hash }; } - async persistTokenHash( - organizationId: string, - connectionId: string, - tokenHash: string, - ): Promise { + async persistTokenHash({ + organizationId, + connectionId, + subscriptionId, + tokenHash, + }: { + organizationId: string; + connectionId: string; + subscriptionId: string; + tokenHash: string; + }): Promise { const id = crypto.randomUUID(); await this.db .insertInto("trigger_callback_tokens") @@ -97,35 +134,52 @@ export class KyselyTriggerCallbackTokenStorage id, organization_id: organizationId, connection_id: connectionId, + subscription_id: subscriptionId, token_hash: tokenHash, created_at: new Date().toISOString(), }) .onConflict((oc) => - oc.columns(["connection_id", "organization_id"]).doUpdateSet({ + oc.columns(["subscription_id"]).doUpdateSet({ id, + organization_id: organizationId, + connection_id: connectionId, token_hash: tokenHash, }), ) .execute(); } - async createOrRotateToken( - organizationId: string, - connectionId: string, - ): Promise { + async createOrRotateToken({ + organizationId, + connectionId, + subscriptionId, + }: { + organizationId: string; + connectionId: string; + subscriptionId: string; + }): Promise { const { plaintext, hash } = await this.generateTokenPair(); - await this.persistTokenHash(organizationId, connectionId, hash); + await this.persistTokenHash({ + organizationId, + connectionId, + subscriptionId, + tokenHash: hash, + }); return plaintext; } async validateToken( token: string, - ): Promise<{ organizationId: string; connectionId: string } | null> { + ): Promise<{ + organizationId: string; + connectionId: string; + subscriptionId: string; + } | null> { const tokenHash = await hashToken(token); const row = await this.db .selectFrom("trigger_callback_tokens") - .select(["organization_id", "connection_id"]) + .select(["organization_id", "connection_id", "subscription_id"]) .where("token_hash", "=", tokenHash) .executeTakeFirst(); @@ -134,9 +188,17 @@ export class KyselyTriggerCallbackTokenStorage return { organizationId: row.organization_id, connectionId: row.connection_id, + subscriptionId: row.subscription_id, }; } + async deleteBySubscription(subscriptionId: string): Promise { + await this.db + .deleteFrom("trigger_callback_tokens") + .where("subscription_id", "=", subscriptionId) + .execute(); + } + async deleteByConnection( connectionId: string, organizationId: string, @@ -147,4 +209,17 @@ export class KyselyTriggerCallbackTokenStorage .where("organization_id", "=", organizationId) .execute(); } + + async listByConnection( + connectionId: string, + organizationId: string, + ): Promise> { + const rows = await this.db + .selectFrom("trigger_callback_tokens") + .select(["subscription_id"]) + .where("connection_id", "=", connectionId) + .where("organization_id", "=", organizationId) + .execute(); + return rows.map((r) => ({ subscriptionId: r.subscription_id })); + } } diff --git a/apps/mesh/src/storage/types.ts b/apps/mesh/src/storage/types.ts index 5a8c3c1ee2..64ffc80274 100644 --- a/apps/mesh/src/storage/types.ts +++ b/apps/mesh/src/storage/types.ts @@ -996,6 +996,7 @@ export interface TriggerCallbackTokenTable { id: string; organization_id: string; connection_id: string; + subscription_id: string; token_hash: string; created_at: ColumnType; } diff --git a/apps/mesh/src/tools/automations/configure-trigger.ts b/apps/mesh/src/tools/automations/configure-trigger.ts index d5032d7e84..304f84b345 100644 --- a/apps/mesh/src/tools/automations/configure-trigger.ts +++ b/apps/mesh/src/tools/automations/configure-trigger.ts @@ -1,9 +1,20 @@ /** * Shared helper for configuring triggers on MCP connections. * - * Calls TRIGGER_CONFIGURE on the target connection to enable/disable - * an event trigger. When enabling, generates a callback token and URL - * so the external MCP can call back to Mesh when the trigger fires. + * Calls TRIGGER_CONFIGURE on the target connection to enable/disable an + * event trigger. Each trigger is identified by `trigger.id`, which is + * passed to the MCP as `subscriptionId` so multiple subscriptions can + * coexist on the same `(connection, event_type)` without overwriting + * each other's callback credentials. + * + * Persistence and the MCP call are kept in sync with two-phase commits: + * - On enable: generate a token pair, call TRIGGER_CONFIGURE, then + * persist the hash. On timeout we still persist (the MCP may have + * accepted) so future callbacks can authenticate. On a definitive + * error we skip persistence. + * - On disable: call TRIGGER_CONFIGURE, then delete the token row by + * subscription. Sibling subscriptions on the same connection are + * untouched. */ import type { MeshContext } from "@/core/mesh-context"; @@ -22,12 +33,20 @@ export async function configureTriggerOnMcp( if (trigger.type !== "event" || !trigger.connection_id) return { success: true }; + if (!trigger.id) { + return { + success: false, + error: "trigger.id required to configure subscription on MCP", + }; + } + const connection = await ctx.storage.connections.findById( trigger.connection_id, ); if (!connection) return { success: true }; // Connection may have been deleted const organizationId = ctx.organization?.id; + const subscriptionId = trigger.id; try { const mcpClient = await clientFromConnection(connection, ctx, true); @@ -61,6 +80,7 @@ export async function configureTriggerOnMcp( enabled, callbackUrl, callbackToken, + subscriptionId, }), timeoutPromise, ]); @@ -68,11 +88,12 @@ export async function configureTriggerOnMcp( if (timedOut && enabled && tokenStorage && organizationId && tokenHash) { // Timeout is ambiguous — the MCP may still accept the token. // Persist the hash so future callbacks can authenticate. - await tokenStorage.persistTokenHash( + await tokenStorage.persistTokenHash({ organizationId, - trigger.connection_id, + connectionId: trigger.connection_id, + subscriptionId, tokenHash, - ); + }); } // On definitive (non-timeout) failure, skip persistence — // the MCP rejected the call, old token (if any) is still valid. @@ -84,17 +105,15 @@ export async function configureTriggerOnMcp( // creates the trigger record (the MCP is already listening). try { if (enabled && tokenStorage && organizationId && tokenHash) { - await tokenStorage.persistTokenHash( + await tokenStorage.persistTokenHash({ organizationId, - trigger.connection_id, + connectionId: trigger.connection_id, + subscriptionId, tokenHash, - ); + }); } - if (!enabled && tokenStorage && organizationId) { - await tokenStorage.deleteByConnection( - trigger.connection_id, - organizationId, - ); + if (!enabled && tokenStorage) { + await tokenStorage.deleteBySubscription(subscriptionId); } } catch (dbErr) { console.error( diff --git a/apps/mesh/src/tools/automations/delete.ts b/apps/mesh/src/tools/automations/delete.ts index 06663e3079..691826c92a 100644 --- a/apps/mesh/src/tools/automations/delete.ts +++ b/apps/mesh/src/tools/automations/delete.ts @@ -52,7 +52,12 @@ export const AUTOMATION_DELETE = defineTool({ await Promise.allSettled( eventTriggers.map(async (trigger) => { - const result = await configureTriggerOnMcp(ctx, trigger, false); + const result = await configureTriggerOnMcp( + ctx, + trigger, + false, + ctx.storage.triggerCallbackTokens, + ); if (!result.success) { console.warn( `Failed to disable trigger ${trigger.id}: ${result.error}`, diff --git a/apps/mesh/src/tools/automations/trigger-add.ts b/apps/mesh/src/tools/automations/trigger-add.ts index 343f84db97..d07784b669 100644 --- a/apps/mesh/src/tools/automations/trigger-add.ts +++ b/apps/mesh/src/tools/automations/trigger-add.ts @@ -71,6 +71,11 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ } } + // Cron triggers don't need a pre-allocated id; let the storage layer + // generate one. Event triggers need it to call TRIGGER_CONFIGURE + // before the row exists. + let pendingTriggerId: string | undefined; + if (input.type === "event") { if (!input.connection_id) { throw new Error("connection_id is required for event triggers"); @@ -88,9 +93,13 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ throw new Error("Connection not found"); } - // Build a temporary trigger object for configureTriggerOnMcp + // Pre-allocate the trigger id so configureTriggerOnMcp can pass + // it as `subscriptionId`. The same id is reused on the addTrigger + // insert below — keeps the MCP-side subscription record and the + // DB row in lockstep. + const triggerId = crypto.randomUUID(); const tempTrigger = { - id: "", + id: triggerId, automation_id: input.automation_id, type: "event" as const, cron_expression: null, @@ -114,6 +123,7 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ `Failed to configure trigger on connection: ${result.error}`, ); } + pendingTriggerId = triggerId; } // Compute next_run_at for cron triggers @@ -127,6 +137,7 @@ export const AUTOMATION_TRIGGER_ADD = defineTool({ // Insert trigger record const trigger = await ctx.storage.automations.addTrigger({ + id: pendingTriggerId, automation_id: input.automation_id, type: input.type, cron_expression: input.type === "cron" ? input.cron_expression : null, diff --git a/apps/mesh/src/tools/connection/delete.ts b/apps/mesh/src/tools/connection/delete.ts index 6e0a311fd8..3c6dda1887 100644 --- a/apps/mesh/src/tools/connection/delete.ts +++ b/apps/mesh/src/tools/connection/delete.ts @@ -91,6 +91,40 @@ export const COLLECTION_CONNECTIONS_DELETE = defineTool({ } } + // Cascade trigger subscriptions on the MCP side BEFORE deleting the + // connection. Once the connection row is gone we can no longer build + // an MCP client for it (configureTriggerOnMcp short-circuits on a + // missing connection), so the disable signal would never reach the + // MCP and its KV would accumulate orphaned trigger state. + // + // Imported dynamically to avoid a module evaluation cycle between + // tools/index.ts and tools/automations/configure-trigger.ts. + const boundTriggers = await ctx.storage.automations.listTriggersByConnection( + input.id, + organization.id, + ); + const eventTriggers = boundTriggers.filter((t) => t.type === "event"); + if (eventTriggers.length > 0) { + const { configureTriggerOnMcp } = await import( + "../automations/configure-trigger" + ); + await Promise.allSettled( + eventTriggers.map(async (trigger) => { + const result = await configureTriggerOnMcp( + ctx, + trigger, + false, + ctx.storage.triggerCallbackTokens, + ); + if (!result.success) { + console.warn( + `[connection/delete] Failed to disable trigger ${trigger.id} on MCP: ${result.error}`, + ); + } + }), + ); + } + // Delete connection await ctx.storage.connections.delete(input.id);