Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 0 additions & 82 deletions .ai/PLAN.md

This file was deleted.

8 changes: 8 additions & 0 deletions backend/src/components/utils/categorization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const SUPPORTED_CATEGORIES: readonly ComponentCategory[] = [
'input',
'transform',
'ai',
'mcp',
'security',
'it_ops',
'notification',
Expand Down Expand Up @@ -41,6 +42,13 @@ const COMPONENT_CATEGORY_CONFIG: Record<ComponentCategory, ComponentCategoryConf
emoji: '🤖',
icon: 'Brain',
},
mcp: {
label: 'MCP Servers',
color: 'text-teal-600',
description: 'Model Context Protocol servers and tool gateways',
emoji: '🔌',
icon: 'Plug',
},
security: {
label: 'Security Tools',
color: 'text-red-600',
Expand Down
11 changes: 11 additions & 0 deletions backend/src/mcp/internal-mcp.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Body, Controller, Post } from '@nestjs/common';
import { ToolRegistryService } from './tool-registry.service';
import { McpGatewayService } from './mcp-gateway.service';
import { McpAuthService } from './mcp-auth.service';
import {
RegisterComponentToolInput,
Expand All @@ -12,6 +13,7 @@ export class InternalMcpController {
constructor(
private readonly toolRegistry: ToolRegistryService,
private readonly mcpAuthService: McpAuthService,
private readonly mcpGatewayService: McpGatewayService,
) {}

@Post('generate-token')
Expand All @@ -36,18 +38,27 @@ export class InternalMcpController {
@Post('register-component')
async registerComponent(@Body() body: RegisterComponentToolInput) {
await this.toolRegistry.registerComponentTool(body);
await this.mcpGatewayService.refreshServersForRun(body.runId);
return { success: true };
}

@Post('register-remote')
async registerRemote(@Body() body: RegisterRemoteMcpInput) {
await this.toolRegistry.registerRemoteMcp(body);
await this.mcpGatewayService.refreshServersForRun(body.runId);
return { success: true };
}

@Post('register-local')
async registerLocal(@Body() body: RegisterLocalMcpInput) {
await this.toolRegistry.registerLocalMcp(body);
await this.mcpGatewayService.refreshServersForRun(body.runId);
return { success: true };
}

@Post('cleanup')
async cleanupRun(@Body() body: { runId: string }) {
const containerIds = await this.toolRegistry.cleanupRun(body.runId);
return { containerIds };
}
}
134 changes: 120 additions & 14 deletions backend/src/mcp/mcp-gateway.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export class McpGatewayService {

// Cache of servers per runId
private readonly servers = new Map<string, McpServer>();
private readonly registeredToolNames = new Map<string, Set<string>>();

constructor(
private readonly toolRegistry: ToolRegistryService,
Expand Down Expand Up @@ -65,12 +66,42 @@ export class McpGatewayService {
version: '1.0.0',
});

await this.registerTools(server, runId, allowedTools, allowedNodeIds);
const toolSet = new Set<string>();
this.registeredToolNames.set(cacheKey, toolSet);
await this.registerTools(server, runId, allowedTools, allowedNodeIds, toolSet);
this.servers.set(cacheKey, server);

return server;
}

/**
* Refresh tool registrations for any cached servers for a run.
* This is used when tools register after an MCP session has already initialized.
*/
async refreshServersForRun(runId: string): Promise<void> {
const matchingEntries = Array.from(this.servers.entries()).filter(
([key]) => key === runId || key.startsWith(`${runId}:`),
);

if (matchingEntries.length === 0) {
return;
}

this.logger.log(
`Refreshing MCP servers for run ${runId} (${matchingEntries.length} instance(s))`,
);

await Promise.all(
matchingEntries.map(async ([cacheKey, server]) => {
const allowedNodeIds =
cacheKey === runId ? undefined : cacheKey.split(':').slice(1).join(':').split(',');
const toolSet = this.registeredToolNames.get(cacheKey) ?? new Set<string>();
this.registeredToolNames.set(cacheKey, toolSet);
await this.registerTools(server, runId, undefined, allowedNodeIds, toolSet);
Comment on lines +94 to +100

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve allowedTools filter when refreshing MCP servers

When a gateway session is initialized with an x-allowed-tools header, getServerForRun registers tools with that filter. The new refreshServersForRun path re-registers tools with allowedTools set to undefined, so any newly registered tools will be added to existing server instances even if they were created with a narrower tool list. This means clients that intentionally requested a limited tool set can see additional tools after any tool registration event, which breaks the expected scoping for those sessions. Consider storing the original allowedTools per cacheKey and passing it back into registerTools during refresh.

Useful? React with 👍 / 👎.

}),
);
}

private async validateRunAccess(runId: string, organizationId?: string | null) {
const run = await this.workflowRunRepository.findByRunId(runId);
if (!run) {
Expand Down Expand Up @@ -124,8 +155,15 @@ export class McpGatewayService {
runId: string,
allowedTools?: string[],
allowedNodeIds?: string[],
registeredToolNames?: Set<string>,
) {
this.logger.log(
`Registering tools for run ${runId} (allowedNodeIds=${allowedNodeIds?.join(',') ?? 'none'}, allowedTools=${allowedTools?.join(',') ?? 'none'})`,
);
const allRegistered = await this.toolRegistry.getToolsForRun(runId, allowedNodeIds);
this.logger.log(
`Tool registry returned ${allRegistered.length} tool(s) for run ${runId}: ${allRegistered.map((t) => `${t.toolName}:${t.type}`).join(', ') || 'none'}`,
);

// Filter by allowed tools if specified
if (allowedTools && allowedTools.length > 0) {
Expand All @@ -137,11 +175,19 @@ export class McpGatewayService {

// 1. Register Internal Tools
const internalTools = allRegistered.filter((t) => t.type === 'component');
this.logger.log(`Registering ${internalTools.length} internal tool(s) for run ${runId}`);
for (const tool of internalTools) {
if (allowedTools && allowedTools.length > 0 && !allowedTools.includes(tool.toolName)) {
this.logger.log(`Skipping internal tool ${tool.toolName} (not in allowedTools)`);
continue;
}

if (registeredToolNames?.has(tool.toolName)) {
this.logger.log(`Skipping internal tool ${tool.toolName} (already registered)`);
continue;
}

this.logger.log(`Registering internal tool ${tool.toolName} (node=${tool.nodeId})`);
const component = tool.componentId ? componentRegistry.get(tool.componentId) : null;
const inputShape = component ? getToolInputShape(component) : undefined;

Expand Down Expand Up @@ -217,22 +263,37 @@ export class McpGatewayService {
}
},
);
registeredToolNames?.add(tool.toolName);
}

// 2. Register External Tools (Proxied)
const externalSources = allRegistered.filter((t) => t.type !== 'component');
this.logger.log(
`Registering ${externalSources.length} external MCP source(s) for run ${runId}`,
);
for (const source of externalSources) {
try {
this.logger.log(
`Fetching tools from external source ${source.toolName} (type=${source.type}, endpoint=${source.endpoint ?? 'missing'})`,
);
const tools = await this.fetchExternalTools(source);
const prefix = source.toolName;

this.logger.log(`External source ${source.toolName} returned ${tools.length} tool(s)`);
for (const t of tools) {
const proxiedName = `${prefix}__${t.name}`;

if (allowedTools && allowedTools.length > 0 && !allowedTools.includes(proxiedName)) {
this.logger.log(`Skipping proxied tool ${proxiedName} (not in allowedTools)`);
continue;
}

if (registeredToolNames?.has(proxiedName)) {
this.logger.log(`Skipping proxied tool ${proxiedName} (already registered)`);
continue;
}

this.logger.log(`Registering proxied tool ${proxiedName} (source=${source.toolName})`);
server.registerTool(
proxiedName,
{
Expand Down Expand Up @@ -261,6 +322,7 @@ export class McpGatewayService {
}
},
);
registeredToolNames?.add(proxiedName);
}
} catch (error) {
this.logger.error(`Failed to fetch tools from external source ${source.toolName}:`, error);
Expand All @@ -272,21 +334,58 @@ export class McpGatewayService {
* Fetches tools from an external MCP source
*/
private async fetchExternalTools(source: RegisteredTool): Promise<any[]> {
if (!source.endpoint) return [];
if (!source.endpoint) {
this.logger.warn(`Missing endpoint for external source ${source.toolName}`);
return [];
}

const transport = new StreamableHTTPClientTransport(new URL(source.endpoint));
const client = new Client(
{ name: 'shipsec-gateway-client', version: '1.0.0' },
{ capabilities: {} },
);
const MAX_RETRIES = 5;
const RETRY_DELAY_MS = 1000;
let lastError: unknown;

await client.connect(transport);
try {
const response = await client.listTools();
return response.tools;
} finally {
await client.close();
for (let attempt = 1; attempt <= MAX_RETRIES; attempt += 1) {
const sessionId = `stdio-proxy-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const transport = new StreamableHTTPClientTransport(new URL(source.endpoint), {
requestInit: {
headers: {
'Mcp-Session-Id': sessionId,
},
},
});
const client = new Client(
{ name: 'shipsec-gateway-client', version: '1.0.0' },
{ capabilities: {} },
);

this.logger.log(
`Connecting to external MCP source ${source.toolName} at ${source.endpoint} (attempt ${attempt}/${MAX_RETRIES})`,
);

try {
await client.connect(transport);
const response = await client.listTools();
this.logger.log(
`listTools from ${source.toolName} returned ${response.tools?.length ?? 0} tool(s)`,
);
return response.tools;
} catch (error) {
lastError = error;
this.logger.warn(
`listTools failed for ${source.toolName} (attempt ${attempt}/${MAX_RETRIES}): ${error instanceof Error ? error.message : String(error)}`,
);
if (attempt < MAX_RETRIES) {
await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS));
}
} finally {
this.logger.log(`Closing external MCP client for ${source.toolName}`);
await client.close();
}
}

if (lastError) {
throw lastError;
}
return [];
}

/**
Expand All @@ -310,7 +409,14 @@ export class McpGatewayService {
let lastError: unknown;

for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
const transport = new StreamableHTTPClientTransport(new URL(source.endpoint));
const sessionId = `stdio-proxy-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const transport = new StreamableHTTPClientTransport(new URL(source.endpoint), {
requestInit: {
headers: {
'Mcp-Session-Id': sessionId,
},
},
});
const client = new Client(
{ name: 'shipsec-gateway-client', version: '1.0.0' },
{ capabilities: {} },
Expand Down
5 changes: 5 additions & 0 deletions backend/src/mcp/mcp.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import { ApiKeysModule } from '../api-keys/api-keys.module';
useFactory: () => {
// Use the same Redis URL as terminal or a dedicated one
const url = process.env.TOOL_REGISTRY_REDIS_URL ?? process.env.TERMINAL_REDIS_URL;
if (!url) {
console.warn('[MCP] Redis URL not set; tool registry disabled');
} else {
console.info(`[MCP] Tool registry Redis URL: ${url}`);
}
if (!url) {
return null;
}
Expand Down
1 change: 1 addition & 0 deletions backend/src/mcp/tool-registry.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export class ToolRegistryService implements OnModuleDestroy {

async getToolsForRun(runId: string, nodeIds?: string[]): Promise<RegisteredTool[]> {
if (!this.redis) {
this.logger.warn('Redis not configured, tool registry disabled');
return [];
}

Expand Down
Loading
Loading