Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "sap-cloud-sdk"
version = "0.18.1"
version = "0.18.2"
description = "SAP Cloud SDK for Python"
readme = "README.md"
license = "Apache-2.0"
Expand Down
3 changes: 2 additions & 1 deletion src/sap_cloud_sdk/agentgateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
]
"""

from sap_cloud_sdk.agentgateway._models import MCPTool
from sap_cloud_sdk.agentgateway._models import AuthResult, MCPTool
from sap_cloud_sdk.agentgateway.agw_client import create_client, AgentGatewayClient
from sap_cloud_sdk.agentgateway.exceptions import (
AgentGatewaySDKError,
Expand All @@ -66,6 +66,7 @@
# Client class
"AgentGatewayClient",
# Data models
"AuthResult",
"MCPTool",
# Exceptions
"AgentGatewaySDKError",
Expand Down
47 changes: 8 additions & 39 deletions src/sap_cloud_sdk/agentgateway/_customer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
- Tool invocation: mTLS + jwt-bearer grant → user-scoped token (principal propagation)
"""

import asyncio
import json
import logging
import os
Expand Down Expand Up @@ -427,16 +426,16 @@ async def _list_server_tools(

async def get_mcp_tools_customer(
credentials: CustomerCredentials,
app_tid: str | None = None,
system_token: str,
) -> list[MCPTool]:
"""List all MCP tools from servers defined in credentials.

Iterates over all integrationDependencies in the credentials file and
discovers tools from each MCP server using mTLS client credentials.
discovers tools from each MCP server using a pre-fetched system token.

Args:
credentials: Customer credentials with integrationDependencies.
app_tid: BTP Application Tenant ID of subscriber (optional).
system_token: Pre-fetched raw system token for authentication.

Returns:
List of MCPTool objects from all servers.
Expand All @@ -453,12 +452,6 @@ async def get_mcp_tools_customer(

logger.info("Discovering tools from %d MCP server(s)", len(dependencies))

# Get system token for discovery
loop = asyncio.get_running_loop()
system_token = await loop.run_in_executor(
None, get_system_token_mtls, credentials, app_tid
)

tools: list[MCPTool] = []

for dep in dependencies:
Expand All @@ -484,52 +477,28 @@ async def get_mcp_tools_customer(


async def call_mcp_tool_customer(
credentials: CustomerCredentials,
tool: MCPTool,
user_token: str | None,
app_tid: str | None = None,
auth_token: str,
**kwargs,
) -> str:
"""Invoke an MCP tool using customer flow.

If user_token is provided, exchanges it for an AGW-scoped token to preserve
user identity for principal propagation. Otherwise, falls back to system token.
Uses a pre-fetched token (either user-scoped or system-scoped) for
authentication against the MCP server.

Args:
credentials: Customer credentials.
tool: MCPTool to invoke.
user_token: User's JWT token for principal propagation (optional).
If None, system token is used instead (no principal propagation).
app_tid: BTP Application Tenant ID of subscriber (optional).
auth_token: Pre-fetched raw access token for authentication.
**kwargs: Tool input parameters.

Returns:
Tool execution result as string.
"""
logger.info("Calling tool '%s' on server '%s'", tool.name, tool.server_name)

loop = asyncio.get_running_loop()

if user_token:
# Exchange user token for AGW-scoped token (with principal propagation)
agw_token = await loop.run_in_executor(
None, exchange_user_token, credentials, user_token, app_tid
)
else:
# TODO: IBD workaround - use system token when user_token is not available.
# This bypasses principal propagation. Remove this fallback once IBD
# supports proper user token flow.
logger.warning(
"No user_token provided - using system token for tool invocation. "
"Principal propagation will NOT work."
)
agw_token = await loop.run_in_executor(
None, get_system_token_mtls, credentials, app_tid
)

async with httpx.AsyncClient(
headers={
"Authorization": f"Bearer {agw_token}",
"Authorization": f"Bearer {auth_token}",
"x-correlation-id": str(uuid.uuid4()),
},
timeout=_HTTP_TIMEOUT,
Expand Down
122 changes: 81 additions & 41 deletions src/sap_cloud_sdk/agentgateway/_lob.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""LoB agent flow - BTP Destination Service based.

LoB agents use BTP Destination Service for credential management:
- Phase 1 (discovery): Client credentials from destination
- Phase 2 (execution): Token exchange with user_token for principal propagation
- Phase 1 (discovery): Client credentials from destination (subscriber.ias fragment)
- Phase 2 (execution): Token exchange with user_token (subscriber.ias.user fragment)
"""

import asyncio
import base64
import logging
import os
import uuid
Expand Down Expand Up @@ -33,6 +34,7 @@
# Label values for fragment discovery
_MCP_LABEL_VALUE = "agw.mcp.server"
_IAS_LABEL_VALUE = "subscriber.ias"
_IAS_USER_LABEL_VALUE = "subscriber.ias.user"

_DESTINATION_INSTANCE = "default"

Expand Down Expand Up @@ -61,16 +63,20 @@ def _fetch_auth_token(
dest_name: str,
tenant_subdomain: str,
options: ConsumptionOptions | None = None,
) -> str:
"""Fetch auth token from destination service.
) -> tuple[str, str]:
"""Fetch raw access token and gateway URL from destination service.

Extracts the raw JWT by base64-decoding the token value field
from the destination service response, and the gateway URL from
the destination's URL property.

Args:
dest_name: Destination name.
tenant_subdomain: Tenant subdomain for multi-tenant lookup.
options: Consumption options (fragment_name, user_token).

Returns:
Authorization header value.
Tuple of (raw_access_token, gateway_url).

Raises:
MCPServerNotFoundError: If no auth token is returned.
Expand All @@ -88,13 +94,14 @@ def _fetch_auth_token(
f"No auth token returned for destination '{dest_name}'"
)

auth = dest.auth_tokens[0].http_header.get("value", "")
if not auth:
raise MCPServerNotFoundError(
f"Empty Authorization header for destination '{dest_name}'"
)
token_value = dest.auth_tokens[0].value
if not token_value:
raise MCPServerNotFoundError(f"Empty token value for destination '{dest_name}'")

token = base64.b64decode(token_value).decode("utf-8")
gateway_url = (dest.url or "").rstrip("/")

return auth
return token, gateway_url


def list_mcp_fragments(tenant_subdomain: str) -> list:
Expand Down Expand Up @@ -146,10 +153,40 @@ def get_ias_fragment_name(tenant_subdomain: str) -> str:
return fragments[0].name


async def get_system_auth(
def get_ias_user_fragment_name(tenant_subdomain: str) -> str:
"""Get the IAS user fragment name for token exchange (principal propagation).

Looks up the IAS user fragment created during subscription by the
sap-managed-runtime-type=subscriber.ias.user label.

Args:
tenant_subdomain: Tenant subdomain for multi-tenant lookup.

Returns:
IAS user fragment name.

Raises:
MCPServerNotFoundError: If no IAS user fragment is found.
"""
client = create_fragment_client(instance=_DESTINATION_INSTANCE)
fragments = client.list_instance_fragments(
filter=ListOptions(
filter_labels=[Label(key=_LABEL_KEY, values=[_IAS_USER_LABEL_VALUE])]
),
tenant=tenant_subdomain,
)
if not fragments:
raise MCPServerNotFoundError(
f"No IAS user fragment found (label {_LABEL_KEY}={_IAS_USER_LABEL_VALUE}) "
f"for tenant '{tenant_subdomain}'"
)
return fragments[0].name


async def fetch_system_auth(
tenant_subdomain: str,
) -> str:
"""Get system-scoped auth (Phase 1 - client credentials).
) -> tuple[str, str]:
"""Fetch system-scoped auth (Phase 1 - client credentials).

Looks up the IAS fragment (subscriber.ias label) and uses it to acquire
a client-credentials token via BTP Destination Service.
Expand All @@ -158,7 +195,7 @@ async def get_system_auth(
tenant_subdomain: Tenant subdomain for multi-tenant lookup.

Returns:
Authorization header value (e.g., "Bearer xxx").
Tuple of (raw_access_token, gateway_url).

Raises:
MCPServerNotFoundError: If no IAS fragment or auth token is found.
Expand All @@ -185,39 +222,42 @@ def _fetch_system_auth_sync():
return await loop.run_in_executor(None, _fetch_system_auth_sync)


async def get_user_auth(
mcp_fragment_name: str,
async def fetch_user_auth(
user_token: str,
tenant_subdomain: str,
) -> str:
"""Get user-scoped auth (Phase 2 - token exchange).
) -> tuple[str, str]:
"""Fetch user-scoped auth (Phase 2 - token exchange).

Looks up the IAS user fragment (subscriber.ias.user label) and uses it
together with the user_token to perform a token exchange via BTP
Destination Service.

Args:
mcp_fragment_name: MCP fragment name for token exchange.
user_token: User's JWT for principal propagation.
tenant_subdomain: Tenant subdomain for multi-tenant lookup.

Returns:
Authorization header value with user identity embedded.
Tuple of (raw_access_token, gateway_url).

Raises:
MCPServerNotFoundError: If no auth token is returned.
MCPServerNotFoundError: If no IAS user fragment or auth token is found.
"""
loop = asyncio.get_running_loop()

def _fetch_user_auth_sync():
ias_user_fragment_name = get_ias_user_fragment_name(tenant_subdomain)
dest_name = _ias_dest_name()

logger.info(
"Exchanging user auth — destination: '%s', fragment: '%s', tenant: '%s'",
dest_name,
mcp_fragment_name,
ias_user_fragment_name,
tenant_subdomain,
)

options = ConsumptionOptions(
user_token=user_token,
fragment_name=mcp_fragment_name,
fragment_name=ias_user_fragment_name,
fragment_level=ConsumptionLevel.INSTANCE,
)

Expand All @@ -227,20 +267,23 @@ def _fetch_user_auth_sync():


async def list_server_tools(
dest_url: str, system_auth: str, fragment_name: str
dest_url: str, auth_token: str, fragment_name: str
) -> list[MCPTool]:
"""List tools from a single MCP server.

Args:
dest_url: MCP endpoint URL.
system_auth: Authorization header for the request.
auth_token: Raw access token for the request.
fragment_name: Fragment name for reference.

Returns:
List of MCPTool objects from this server.
"""
async with httpx.AsyncClient(
headers={"Authorization": system_auth, "x-correlation-id": str(uuid.uuid4())},
headers={
"Authorization": f"Bearer {auth_token}",
"x-correlation-id": str(uuid.uuid4()),
},
timeout=_HTTP_TIMEOUT,
) as http_client:
async with streamable_http_client(dest_url, http_client=http_client) as (
Expand Down Expand Up @@ -273,13 +316,15 @@ async def list_server_tools(

async def get_mcp_tools_lob(
tenant_subdomain: str,
system_token: str,
) -> list[MCPTool]:
"""List all MCP tools using LoB flow (destination-based).

Uses Phase 1 auth (client-scoped) via BTP Destination Service.
Uses a pre-fetched system token for authentication against MCP servers.

Args:
tenant_subdomain: Tenant subdomain for multi-tenant lookup.
system_token: Pre-fetched raw system token (from get_system_auth).

Returns:
List of MCPTool objects from all MCP servers.
Expand Down Expand Up @@ -308,8 +353,7 @@ async def get_mcp_tools_lob(
continue

try:
system_auth = await get_system_auth(tenant_subdomain)
server_tools = await list_server_tools(mcp_url, system_auth, fragment_name)
server_tools = await list_server_tools(mcp_url, system_token, fragment_name)
tools.extend(server_tools)
logger.debug(
"Loaded %d tool(s) from fragment '%s'",
Expand All @@ -328,35 +372,31 @@ async def get_mcp_tools_lob(

async def call_mcp_tool_lob(
tool: MCPTool,
user_token: str,
tenant_subdomain: str,
user_auth_token: str,
**kwargs,
) -> str:
"""Invoke an MCP tool using LoB flow (destination-based).

Uses Phase 2 auth (user-scoped) via token exchange.
Principal propagation ensures LoB systems see user identity.
Uses a pre-fetched user token for principal propagation.

Args:
tool: MCPTool object (from list_mcp_tools).
user_token: User's JWT for principal propagation.
tenant_subdomain: Tenant subdomain for token exchange.
user_auth_token: Pre-fetched raw user token (from get_user_auth).
**kwargs: Tool input parameters.

Returns:
Tool execution result as string.

Raises:
MCPServerNotFoundError: If destination/auth fails.
"""
if not tool.fragment_name:
raise MCPServerNotFoundError(
f"Tool '{tool.name}' missing fragment_name for LoB invocation"
)
user_auth = await get_user_auth(tool.fragment_name, user_token, tenant_subdomain)

async with httpx.AsyncClient(
headers={"Authorization": user_auth, "x-correlation-id": str(uuid.uuid4())},
headers={
"Authorization": f"Bearer {user_auth_token}",
"x-correlation-id": str(uuid.uuid4()),
},
timeout=_HTTP_TIMEOUT,
) as http_client:
async with streamable_http_client(tool.url, http_client=http_client) as (
Expand Down
Loading
Loading