diff --git a/slack-mcp/server/llm.ts b/slack-mcp/server/llm.ts index 084fa021..8185e3f4 100644 --- a/slack-mcp/server/llm.ts +++ b/slack-mcp/server/llm.ts @@ -1,25 +1,22 @@ /** * LLM Module - AI Agent Integration for Slack MCP * - * The Slack webhook path lives OUTSIDE the @decocms/runtime request context - * (no JWT, no per-request bindings resolution), so the AgentOf() binding - * proxy cannot be picked up from `env.MESH_REQUEST_CONTEXT.state.AGENT`. + * The Slack webhook path runs OUTSIDE @decocms/runtime's request context + * (no JWT, no per-request binding resolution), so the AgentOf() proxy + * cannot be picked up from `env.MESH_REQUEST_CONTEXT.state.AGENT`. We + * also cannot use the runtime's `streamAgent`, which targets + * `/decopilot/runtime/stream` — that endpoint is the "resume a task" + * path and requires a pre-existing `taskId`. * - * Instead we construct an agent client by hand, using the persisted - * `meshApiKey` + `organizationSlug` + `agentId` from Supabase, and call - * `streamAgent` from @decocms/runtime/decopilot — which is the exact same - * machinery the binding proxy uses internally (POST to - * `/api//decopilot/runtime/stream` with AI SDK UIMessage chunks). - * - * This makes the slack-mcp behave identically to a runtime-resolved binding. + * Instead we call the user-facing chat endpoint `/decopilot/stream` + * directly, using the persisted `meshApiKey` for auth, and parse the + * custom SSE stream the endpoint emits (data: { type, text/delta, ... }). + * We deliberately omit `thread_id`: the endpoint rejects any id that + * was not minted by studio, and we already rebuild conversation context + * from Slack history on every webhook (`buildContextMessages` → 1 system + * message), so the agent stays coherent without depending on + * decopilot-side thread memory. */ -import { - streamAgent, - type AgentBindingConfig, - type AgentStreamParams, - type ResolvedAgentClient, -} from "@decocms/runtime/decopilot"; - import { getCachedConnectionConfig } from "./lib/config-cache.ts"; // ============================================================================ @@ -39,21 +36,35 @@ export interface SlackChatMessage { images?: MessageImage[]; } +interface UIMessagePart { + type: string; + text?: string; + url?: string; + filename?: string; + mediaType?: string; +} + +interface UIMessageLike { + role: string; + parts: UIMessagePart[]; +} + +interface AgentClient { + STREAM: (params: { + messages: UIMessageLike[]; + toolApprovalLevel?: "auto" | "readonly" | "plan"; + }) => Promise< + AsyncIterable<{ parts: Array<{ type: string; text?: string }> }> + >; +} + // ============================================================================ // Agent client // ============================================================================ -/** - * Build a ResolvedAgentClient for a given connection. - * - * Mirrors what the runtime's `createAgentProxy` builds for resolved bindings, - * but constructed manually because the Slack webhook path has no per-request - * runtime context. Uses the persisted `meshApiKey` so the call is always - * authenticated without depending on a session token. - */ async function getAgentClient( connectionId: string, -): Promise { +): Promise { const config = await getCachedConnectionConfig(connectionId); const token = config?.meshApiKey ?? config?.meshToken; const orgSlug = config?.organizationSlug ?? config?.organizationId; @@ -61,32 +72,94 @@ async function getAgentClient( return null; } - const streamUrl = `${config.meshUrl}/api/${orgSlug}/decopilot/runtime/stream`; - const agentConfig: AgentBindingConfig = { - __type: "@deco/agent", - id: config.agentId, - }; + const { meshUrl, agentId } = config; + const url = `${meshUrl}/api/${orgSlug}/decopilot/stream`; return { - STREAM: async (params, opts) => { - // We deliberately strip `thread_id` from the request: the decopilot - // endpoint does not auto-create threads — passing an unknown id (the - // user's name, a temp `-`, anything not minted by studio) - // results in 500 "Thread not found". Without a thread_id, the - // decopilot allocates a fresh thread per call. Slack-side context - // is already rebuilt every webhook via `buildContextMessages` - // (channel/thread history → 1 system message), so the agent stays - // coherent within a conversation without depending on decopilot's - // own thread memory. Per-person decopilot memory would require us - // to call a thread-create API and cache the returned id — out of - // scope here. - const { thread_id: _ignored, ...rest } = params; - console.log(`[LLM] streamAgent ${streamUrl} (no thread_id)`); - return await streamAgent(streamUrl, token, agentConfig, rest, opts); + STREAM: async (params) => { + console.log(`[LLM] POST ${url}`); + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${token}`, + Accept: "application/json, text/event-stream", + }, + body: JSON.stringify({ + messages: params.messages, + agent: { id: agentId }, + stream: true, + toolApprovalLevel: params.toolApprovalLevel ?? "auto", + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `decopilot stream failed (${response.status}): ${errorText}`, + ); + } + + return sseResponseToAsyncIterable(response); }, }; } +/** + * Parse the decopilot's custom SSE stream into the same shape the + * AI SDK `readUIMessageStream` would produce: one yield carrying a + * `parts: [{ type: "text", text }]` array with the accumulated text. + * Tool-call events reset the buffer so we never surface them to Slack. + */ +async function* sseResponseToAsyncIterable( + response: Response, +): AsyncGenerator<{ parts: Array<{ type: string; text?: string }> }> { + const reader = response.body!.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let textContent = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || !trimmed.startsWith("data:")) continue; + const data = trimmed.slice("data:".length).trim(); + if (!data || data === "[DONE]") continue; + + try { + const event = JSON.parse(data); + if (event.type === "text-delta" && event.delta) { + textContent += event.delta; + } else if (event.type === "text" && event.text) { + textContent += event.text; + } else if ( + event.type === "tool-call" || + event.type === "tool-input-start" + ) { + textContent = ""; + } else if (event.type === "finish") { + break; + } + } catch { + // ignore parse errors + } + } + } + } finally { + reader.releaseLock(); + } + + yield { parts: [{ type: "text", text: textContent }] }; +} + // ============================================================================ // Public API // ============================================================================ @@ -111,12 +184,10 @@ export async function isAgentAvailableAsync( } /** - * Convert SlackChatMessage[] to the UIMessage format expected by - * @decocms/runtime/decopilot's `streamAgent`. + * Convert SlackChatMessage[] to the UIMessage-like shape the decopilot + * `/stream` endpoint accepts. */ -function toUIMessages( - messages: SlackChatMessage[], -): AgentStreamParams["messages"] { +function toUIMessages(messages: SlackChatMessage[]): UIMessageLike[] { return messages.map((m) => ({ role: m.role, parts: [ @@ -132,13 +203,15 @@ function toUIMessages( } /** - * Stream an agent response via the runtime's decopilot streamAgent — - * the same path the AgentOf() binding would use. + * Stream an agent response. + * + * `threadId` is accepted for caller-side bookkeeping/logs but is NOT sent + * to the decopilot — see the file-level note on thread_id handling. */ export async function streamAgentResponse( connectionId: string, messages: SlackChatMessage[], - threadId?: string, + _threadId?: string, ) { const client = await getAgentClient(connectionId); if (!client) { @@ -154,17 +227,14 @@ export async function streamAgentResponse( return client.STREAM({ messages: toUIMessages(messages), toolApprovalLevel: "auto", - ...(threadId ? { thread_id: threadId } : {}), }); } /** - * Collect the final text from a UIMessage stream. + * Collect the final text from an agent stream. * - * `streamAgent` yields a UIMessage that's progressively rebuilt as chunks - * arrive; each yield carries the cumulative `parts` array. We take the last - * text part of the last yield, which equals the final assistant message. - * Tool-call parts are ignored (we don't surface them to Slack). + * Each yield carries the cumulative `parts` array; we keep the last text + * part of the last yield, which equals the final assistant message. */ export async function collectStreamText( stream: AsyncIterable<{ parts: Array<{ type: string; text?: string }> }>,