Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Migration 077: Trigger Callback Tokens — per subscription
*
* Each automation trigger now gets its own callback token row, so we can
* disable a single subscription without invalidating its siblings on the
* same connection. Adds `subscription_id` (= automation_triggers.id) and
* relaxes the unique index from `(connection_id, organization_id)` to
* `subscription_id` alone.
*
* Existing rows get backfilled with a placeholder subscription_id derived
* from their primary key — they remain validatable until the next
* trigger toggle, which writes a fresh row keyed by the real trigger id.
*/

import type { Kysely } from "kysely";
import { sql } from "kysely";

export async function up(db: Kysely<unknown>): Promise<void> {
// 1. Add subscription_id, nullable initially so we can backfill.
await db.schema
.alterTable("trigger_callback_tokens")
.addColumn("subscription_id", "text")
.execute();

// 2. Backfill: existing rows aren't tied to a specific trigger id, so
// use the row's own primary key as a self-referential placeholder.
// Not strictly correct, but keeps the rows queryable until the next
// TRIGGER_CONFIGURE replaces them with proper subscription ids.
await sql`UPDATE trigger_callback_tokens SET subscription_id = id WHERE subscription_id IS NULL`.execute(
db,
);

// 3. Make NOT NULL now that everything is populated.
await db.schema
.alterTable("trigger_callback_tokens")
.alterColumn("subscription_id", (col) => col.setNotNull())
.execute();

// 4. Drop the old unique index that prevented multi-row.
await db.schema
.dropIndex("idx_trigger_callback_tokens_connection_org")
.execute();

// 5. New unique index on subscription_id — globally unique because
// automation trigger ids are uuids.
await db.schema
.createIndex("idx_trigger_callback_tokens_subscription")
.on("trigger_callback_tokens")
.columns(["subscription_id"])
.unique()
.execute();

// 6. Keep a non-unique lookup index on (connection_id, organization_id)
// for fanout queries during trigger callback validation.
await db.schema
.createIndex("idx_trigger_callback_tokens_connection_org_lookup")
.on("trigger_callback_tokens")
.columns(["connection_id", "organization_id"])
.execute();
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema
.dropIndex("idx_trigger_callback_tokens_connection_org_lookup")
.execute();
await db.schema
.dropIndex("idx_trigger_callback_tokens_subscription")
.execute();
// Restore the original unique index. Note: this may fail if multi-row
// data exists from the new code path — `down` is best-effort.
await db.schema
.createIndex("idx_trigger_callback_tokens_connection_org")
.on("trigger_callback_tokens")
.columns(["connection_id", "organization_id"])
.unique()
.execute();
await db.schema
.alterTable("trigger_callback_tokens")
.dropColumn("subscription_id")
.execute();
}
3 changes: 3 additions & 0 deletions apps/mesh/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import * as migration073backfillbasicusageroles from "./073-backfill-basic-usage
import * as migration074sandboxrunnerstatehandlenonunique from "./074-sandbox-runner-state-handle-nonunique.ts";
import * as migration075threadinflightasyncjobs from "./075-thread-inflight-async-jobs.ts";
import * as migration076automationsdropagentjson from "./076-automations-drop-agent-json.ts";
import * as migration077triggercallbacktokenspersubscription from "./077-trigger-callback-tokens-per-subscription.ts";

/**
* Core migrations for the Mesh application.
Expand Down Expand Up @@ -165,6 +166,8 @@ const migrations: Record<string, Migration> = {
migration074sandboxrunnerstatehandlenonunique,
"075-thread-inflight-async-jobs": migration075threadinflightasyncjobs,
"076-automations-drop-agent-json": migration076automationsdropagentjson,
"077-trigger-callback-tokens-per-subscription":
migration077triggercallbacktokenspersubscription,
};

export default migrations;
44 changes: 43 additions & 1 deletion apps/mesh/src/storage/automations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ export interface UpdateAutomationInput {
}

export interface CreateTriggerInput {
/**
* Optional pre-allocated id. Callers that need to reference the trigger
* before insertion (e.g. event triggers calling `configureTriggerOnMcp`
* with a stable subscriptionId) generate the uuid upfront and pass it
* here so the configure call and the insert agree on the id.
*/
id?: string;
automation_id: string;
type: "cron" | "event";
cron_expression?: string | null;
Expand Down Expand Up @@ -77,6 +84,15 @@ export interface AutomationsStorage {
automationId: string,
): Promise<{ success: boolean }>;
listTriggers(automationId: string): Promise<AutomationTrigger[]>;
/**
* List every trigger bound to a connection in an organization. Used
* during connection deletion to disable each subscription on the MCP
* before the connection record disappears.
*/
listTriggersByConnection(
connectionId: string,
organizationId: string,
): Promise<AutomationTrigger[]>;
findTriggerById(triggerId: string): Promise<AutomationTrigger | null>;
findActiveEventTriggers(
connectionId: string,
Expand Down Expand Up @@ -335,7 +351,7 @@ class KyselyAutomationsStorage implements AutomationsStorage {

async addTrigger(input: CreateTriggerInput): Promise<AutomationTrigger> {
const now = new Date().toISOString();
const id = crypto.randomUUID();
const id = input.id ?? crypto.randomUUID();

const row = {
id,
Expand Down Expand Up @@ -383,6 +399,32 @@ class KyselyAutomationsStorage implements AutomationsStorage {
return rows.map(triggerFromDbRow);
}

async listTriggersByConnection(
connectionId: string,
organizationId: string,
): Promise<AutomationTrigger[]> {
const rows = await this.db
.selectFrom("automation_triggers as t")
.innerJoin("automations as a", "a.id", "t.automation_id")
.select([
"t.id",
"t.automation_id",
"t.type",
"t.cron_expression",
"t.connection_id",
"t.event_type",
"t.params",
"t.last_run_at",
"t.next_run_at",
"t.created_at",
])
.where("t.connection_id", "=", connectionId)
.where("a.organization_id", "=", organizationId)
.execute();

return rows.map(triggerFromDbRow);
}

async findTriggerById(triggerId: string): Promise<AutomationTrigger | null> {
const row = await this.db
.selectFrom("automation_triggers")
Expand Down
137 changes: 106 additions & 31 deletions apps/mesh/src/storage/trigger-callback-tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
* Manages opaque callback tokens that external MCPs use to authenticate
* trigger callbacks to Mesh. Tokens are stored as SHA-256 hashes;
* plaintext is only returned once at creation time.
*
* Each row keys on `subscription_id` (= `automation_triggers.id`) so a
* single connection can host many independent subscriptions, each with
* its own token. Token validation still resolves to (orgId, connId)
* because the trigger callback endpoint fans out by `(connection, type)`
* downstream of token validation.
*/

import type { Kysely } from "kysely";
Expand Down Expand Up @@ -38,40 +44,65 @@ export interface TriggerCallbackTokenStorage {
generateTokenPair(): Promise<TokenPair>;

/**
* Persist a token hash for a connection+organization pair.
* Upserts — safe for concurrent calls.
* Persist a token hash for a specific subscription. Upserts on
* `subscription_id`, so re-running TRIGGER_CONFIGURE for the same
* subscription rotates the token cleanly without orphaning siblings
* on the same connection.
*/
persistTokenHash(
organizationId: string,
connectionId: string,
tokenHash: string,
): Promise<void>;
persistTokenHash(args: {
organizationId: string;
connectionId: string;
subscriptionId: string;
tokenHash: string;
}): Promise<void>;

/**
* Create or rotate a callback token for a connection+organization pair.
* Returns the plaintext token (only available at creation time).
* Convenience method that combines generateTokenPair + persistTokenHash.
* Create or rotate a callback token for a subscription. Returns the
* plaintext token (only available at creation time).
*/
createOrRotateToken(
organizationId: string,
connectionId: string,
): Promise<string>;
createOrRotateToken(args: {
organizationId: string;
connectionId: string;
subscriptionId: string;
}): Promise<string>;

/**
* Validate a callback token.
* Returns connection and org context if valid, null otherwise.
* Validate a callback token. Returns the connection + org context for
* the row that owns this token, or null if no row matches.
*/
validateToken(
token: string,
): Promise<{ organizationId: string; connectionId: string } | null>;
): Promise<{
organizationId: string;
connectionId: string;
subscriptionId: string;
} | null>;

/**
* Delete the callback token for one specific subscription.
*/
deleteBySubscription(subscriptionId: string): Promise<void>;

/**
* Delete callback token for a connection+organization pair.
* Delete every callback token attached to a connection. Used during
* connection deletion to garbage-collect every subscription that was
* bound to it.
*/
deleteByConnection(
connectionId: string,
organizationId: string,
): Promise<void>;

/**
* List every (subscription, connection) pair tied to a connection so
* the caller can iterate them — e.g. to send TRIGGER_CONFIGURE
* disable to the MCP for each subscription before the connection
* itself is deleted.
*/
listByConnection(
connectionId: string,
organizationId: string,
): Promise<Array<{ subscriptionId: string }>>;
}

export class KyselyTriggerCallbackTokenStorage
Expand All @@ -85,47 +116,70 @@ export class KyselyTriggerCallbackTokenStorage
return { plaintext, hash };
}

async persistTokenHash(
organizationId: string,
connectionId: string,
tokenHash: string,
): Promise<void> {
async persistTokenHash({
organizationId,
connectionId,
subscriptionId,
tokenHash,
}: {
organizationId: string;
connectionId: string;
subscriptionId: string;
tokenHash: string;
}): Promise<void> {
const id = crypto.randomUUID();
await this.db
.insertInto("trigger_callback_tokens")
.values({
id,
organization_id: organizationId,
connection_id: connectionId,
subscription_id: subscriptionId,
token_hash: tokenHash,
created_at: new Date().toISOString(),
})
.onConflict((oc) =>
oc.columns(["connection_id", "organization_id"]).doUpdateSet({
oc.columns(["subscription_id"]).doUpdateSet({
id,
organization_id: organizationId,
connection_id: connectionId,
token_hash: tokenHash,
}),
)
.execute();
}

async createOrRotateToken(
organizationId: string,
connectionId: string,
): Promise<string> {
async createOrRotateToken({
organizationId,
connectionId,
subscriptionId,
}: {
organizationId: string;
connectionId: string;
subscriptionId: string;
}): Promise<string> {
const { plaintext, hash } = await this.generateTokenPair();
await this.persistTokenHash(organizationId, connectionId, hash);
await this.persistTokenHash({
organizationId,
connectionId,
subscriptionId,
tokenHash: hash,
});
return plaintext;
}

async validateToken(
token: string,
): Promise<{ organizationId: string; connectionId: string } | null> {
): Promise<{
organizationId: string;
connectionId: string;
subscriptionId: string;
} | null> {
const tokenHash = await hashToken(token);

const row = await this.db
.selectFrom("trigger_callback_tokens")
.select(["organization_id", "connection_id"])
.select(["organization_id", "connection_id", "subscription_id"])
.where("token_hash", "=", tokenHash)
.executeTakeFirst();

Expand All @@ -134,9 +188,17 @@ export class KyselyTriggerCallbackTokenStorage
return {
organizationId: row.organization_id,
connectionId: row.connection_id,
subscriptionId: row.subscription_id,
};
}

async deleteBySubscription(subscriptionId: string): Promise<void> {
await this.db
.deleteFrom("trigger_callback_tokens")
.where("subscription_id", "=", subscriptionId)
.execute();
}

async deleteByConnection(
connectionId: string,
organizationId: string,
Expand All @@ -147,4 +209,17 @@ export class KyselyTriggerCallbackTokenStorage
.where("organization_id", "=", organizationId)
.execute();
}

async listByConnection(
connectionId: string,
organizationId: string,
): Promise<Array<{ subscriptionId: string }>> {
const rows = await this.db
.selectFrom("trigger_callback_tokens")
.select(["subscription_id"])
.where("connection_id", "=", connectionId)
.where("organization_id", "=", organizationId)
.execute();
return rows.map((r) => ({ subscriptionId: r.subscription_id }));
}
}
1 change: 1 addition & 0 deletions apps/mesh/src/storage/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,7 @@ export interface TriggerCallbackTokenTable {
id: string;
organization_id: string;
connection_id: string;
subscription_id: string;
token_hash: string;
created_at: ColumnType<Date, Date | string, never>;
}
Expand Down
Loading
Loading