Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions slack-mcp/server/lib/event-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,117 @@
*
* Publishes Slack events via trigger callbacks for cross-MCP integration.
* Other MCPs can subscribe to these events to react to Slack activity.
*
* For message-style events (`slack.message.received` and `slack.app_mention`)
* we enrich the payload with:
* - `user_name`: the resolved Slack display name of the author, so the
* subscriber doesn't have to call SLACK_GET_USER_INFO just to address
* the user properly.
* - `thread_messages`: when the incoming event lives in a thread, the
* full set of replies from that thread (oldest → newest, including the
* parent and the bot's own prior replies). Lets a trigger-driven agent
* see the entire conversation in one shot and answer coherently with
* SLACK_REPLY_IN_THREAD without having to fetch history itself.
*/

import type { SlackEvent } from "./types.ts";
import { triggers } from "./trigger-store.ts";
import { getThreadReplies, getUserInfo } from "./slack-client.ts";

interface ThreadMessageSummary {
ts: string;
user?: string;
text: string;
is_bot: boolean;
}

export function publishMessageReceived(
async function resolveUserName(
userId: string | undefined,
): Promise<string | undefined> {
if (!userId) return undefined;
try {
const info = await getUserInfo(userId);
return (
info?.profile?.display_name || info?.real_name || info?.name || undefined
);
} catch {
return undefined;
}
}

async function fetchThreadMessages(
channel: string | undefined,
threadTs: string | undefined,
): Promise<ThreadMessageSummary[] | undefined> {
if (!channel || !threadTs) return undefined;
try {
const replies = await getThreadReplies(channel, threadTs);
return replies
.sort((a, b) => Number.parseFloat(a.ts) - Number.parseFloat(b.ts))
.map((m) => ({
ts: m.ts,
user: m.user,
text: m.text ?? "",
is_bot: Boolean(m.bot_id),
}));
} catch {
return undefined;
}
}

export async function publishMessageReceived(
connectionId: string,
event: SlackEvent,
): void {
extras?: { fallback?: boolean },
): Promise<void> {
const [user_name, thread_messages] = await Promise.all([
resolveUserName(event.user),
fetchThreadMessages(event.channel, event.thread_ts),
]);

triggers.notify(connectionId, "slack.message.received", {
event: "slack.message.received",
channel_id: event.channel,
user_id: event.user,
user_name,
text: event.text ?? "",
ts: event.ts,
thread_ts: event.thread_ts,
is_dm:
event.channel?.startsWith("D") || (event as any).channel_type === "im",
has_files: !!(event as any).files?.length,
thread_messages,
timestamp: new Date().toISOString(),
...(extras?.fallback ? { fallback: true } : {}),
});
console.log(
`[Triggers] Notified slack.message.received: channel=${event.channel}`,
`[Triggers] Notified slack.message.received: channel=${event.channel}${thread_messages ? ` (${thread_messages.length} thread msgs)` : ""}`,
);
}

export function publishAppMention(
export async function publishAppMention(
connectionId: string,
event: SlackEvent,
): void {
): Promise<void> {
const [user_name, thread_messages] = await Promise.all([
resolveUserName(event.user),
fetchThreadMessages(event.channel, event.thread_ts),
]);

triggers.notify(connectionId, "slack.app_mention", {
event: "slack.app_mention",
channel_id: event.channel,
user_id: event.user,
user_name,
text: event.text ?? "",
ts: event.ts,
thread_ts: event.thread_ts,
has_files: !!(event as any).files?.length,
thread_messages,
timestamp: new Date().toISOString(),
});
console.log(
`[Triggers] Notified slack.app_mention: channel=${event.channel}`,
`[Triggers] Notified slack.app_mention: channel=${event.channel}${thread_messages ? ` (${thread_messages.length} thread msgs)` : ""}`,
);
}

Expand Down
36 changes: 30 additions & 6 deletions slack-mcp/server/slack/handlers/eventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ export async function handleSlackEvent(
// Publishing here AND running the LLM caused every message to be
// answered twice (once by the LLM, once by the trigger subscriber).
if (triggerOnly) {
publishAppMention(connectionId, payload);
await publishAppMention(connectionId, payload);
} else {
await handleAppMention(
payload as SlackAppMentionEvent,
Expand All @@ -417,7 +417,7 @@ export async function handleSlackEvent(
break;
case "message":
if (triggerOnly) {
publishMessageReceived(connectionId, payload);
await publishMessageReceived(connectionId, payload);
} else {
await handleMessage(
payload as SlackMessageEvent,
Expand Down Expand Up @@ -598,7 +598,22 @@ async function handleMessage(
const mediaForLLM =
transcriptions.length > 0 ? media.filter((m) => m.type === "image") : media;

if (isDM) {
// A DM that is a reply inside an existing thread continues in that thread;
// a fresh top-level DM starts a brand-new thread under the user's message
// (see handleDirectMessage). Channel messages only reach this branch when
// thread_ts is set AND the bot has participated in that thread.
if (isDM && thread_ts) {
await handleThreadReply(
channel,
user,
fullText,
ts,
thread_ts,
mediaForLLM,
teamConfig,
connectionId,
);
} else if (isDM) {
await handleDirectMessage(
channel,
user,
Expand Down Expand Up @@ -646,16 +661,24 @@ async function handleDirectMessage(
await addReaction(channel, ts, "eyes");
}

// Every top-level DM kicks off a brand-new thread under the user's
// message — bot's thinking message and final reply both live inside that
// thread. Subsequent replies from the user in the thread continue there
// (handled by handleThreadReply), so each subject stays isolated.
const replyTo = ts;

const showThinking = showOnlyFinal
? false
: (teamConfig.responseConfig?.showThinkingMessage ?? true);
const thinkingMsg = showThinking ? await sendThinkingMessage(channel) : null;
const thinkingMsg = showThinking
? await sendThinkingMessage(channel, replyTo)
: null;

if (!showOnlyFinal) {
await removeReaction(channel, ts, "eyes");
}

// Resolve sender name (used both as a prefix for the LLM and as the thread_id)
// Resolve sender name (used as a prefix for the LLM context)
const senderName = await resolveUserName(user);
const senderText =
senderName && senderName !== user
Expand All @@ -673,7 +696,7 @@ async function handleDirectMessage(
if (!(await isLLMAvailable(connectionId))) {
const warningMsg =
"Bot ainda inicializando. Por favor, tente novamente em alguns segundos.";
await sendMessage({ channel, text: warningMsg });
await replyInThread(channel, replyTo, warningMsg);
if (thinkingMsg?.ts) {
await deleteMessage(channel, thinkingMsg.ts);
}
Expand All @@ -687,6 +710,7 @@ async function handleDirectMessage(
try {
await handleLLMCall(connectionId, messages, {
channel,
replyTo,
thinkingMessageTs: thinkingMsg?.ts,
streamingEnabled: enableStreaming,
userName: senderName,
Expand Down
35 changes: 20 additions & 15 deletions slack-mcp/server/slack/handlers/llm-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
deleteMessage,
} from "../../lib/slack-client.ts";
import { formatForSlack, buildResponseBlocks } from "../../lib/format.ts";
import { triggers } from "../../lib/trigger-store.ts";
import { publishMessageReceived } from "../../lib/event-publisher.ts";
import type { MessageWithImages } from "./context-builder.ts";
import { logger } from "../../lib/logger.ts";

Expand Down Expand Up @@ -246,20 +246,25 @@ export async function handleLLMCall(
await deleteMessage(channel, thinkingMessageTs).catch(() => {});
}

const isDM = channel.startsWith("D") || slackEvent.channel_type === "im";

triggers.notify(connectionId, "slack.message.received", {
event: "slack.message.received",
channel_id: channel,
user_id: slackEvent.user,
text: slackEvent.text,
ts: slackEvent.ts,
thread_ts: slackEvent.thread_ts,
is_dm: isDM,
has_files: false,
timestamp: new Date().toISOString(),
fallback: true,
});
// Reuse the standard publisher so the subscriber gets the same
// enriched payload (user_name, thread_messages, etc.) as the
// triggerOnly mode would deliver.
await publishMessageReceived(
connectionId,
{
type: "message",
event_ts: slackEvent.ts,
channel,
user: slackEvent.user,
text: slackEvent.text,
ts: slackEvent.ts,
thread_ts: slackEvent.thread_ts,
...(slackEvent.channel_type
? { channel_type: slackEvent.channel_type }
: {}),
} as Parameters<typeof publishMessageReceived>[1],
{ fallback: true },
);

// Don't throw — the trigger will handle the response
return;
Expand Down
Loading