Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ agents/
.serena/
.envrc
scripts/update-sdk-apiserver.sh

# Local RALPH authoring loop artifacts
ralph/
scripts/ralph*
scripts/__pycache__/ralph*.pyc
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The server currently negotiates MCP protocol versions `2025-11-25`, `2025-06-18`
- Pulsar Admin operations (topics, namespaces, tenants, schemas, etc.)
- Pulsar Client operations (producers, consumers)
- Functions, Sources, and Sinks management
- Read-only MCP resources for context, catalog, and bounded admin summaries
- **Multiple Connection Options**:
- Connect to StreamNative Cloud with service account authentication
- Connect directly to external Apache Kafka clusters
Expand Down Expand Up @@ -258,8 +259,8 @@ The StreamNative MCP Server allows you to enable or disable specific groups of f

| Feature | Description | Docs |
|--------------------------|--------------------------------------------------|------|
| `all-pulsar` | Enables all Pulsar admin and client tools, without Apache Kafka and StreamNative Cloud tools | |
| `pulsar-admin` | Pulsar administrative operations (all admin tools)| |
| `all-pulsar` | Enables all Pulsar admin and client tools, without Apache Kafka and StreamNative Cloud tools | [pulsar_resources.md](docs/tools/pulsar_resources.md) |
| `pulsar-admin` | Pulsar administrative operations (all admin tools)| [pulsar_resources.md](docs/tools/pulsar_resources.md) |
| `pulsar-client` | Pulsar client operations (produce/consume) | [pulsar_client_consume.md](docs/tools/pulsar_client_consume.md), [pulsar_client_produce.md](docs/tools/pulsar_client_produce.md) |
| `pulsar-admin-brokers` | Manage Pulsar brokers | [pulsar_admin_brokers.md](docs/tools/pulsar_admin_brokers.md) |
| `pulsar-admin-brokers-status` | Check Pulsar broker or proxy status | [pulsar_admin_status.md](docs/tools/pulsar_admin_status.md) |
Expand All @@ -280,6 +281,8 @@ The StreamNative MCP Server allows you to enable or disable specific groups of f
| `pulsar-admin-sources` | Manage Pulsar Sources | [pulsar_admin_sources.md](docs/tools/pulsar_admin_sources.md) |
| `pulsar-admin-topic-policy` | Configure Pulsar topic policies | [pulsar_admin_topic_policy.md](docs/tools/pulsar_admin_topic_policy.md) |

Pulsar admin feature gates also register read-only MCP resources for the matching admin surface. These resources use `pulsar://...` URIs, return JSON snapshots, and stay separate from write-capable tools; see [pulsar_resources.md](docs/tools/pulsar_resources.md) for the supported URI templates and safety boundaries.

---

#### StreamNative Cloud Features
Expand Down
96 changes: 96 additions & 0 deletions docs/tools/pulsar_resources.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Pulsar Resources

The Pulsar resource surface exposes read-only MCP resources for lightweight cluster context, discovery, and bounded admin summaries. It is separate from Pulsar tools: tools are command-oriented operations, while resources only read current state and return JSON snapshots.

## Registration and discovery

Pulsar resources are registered next to the existing Pulsar tool wiring and use the same Pulsar admin feature gates as the matching tool families. No Pulsar resources are registered for unrelated features or for `pulsar-client` alone.

`pulsar://context` and `pulsar://resources` are registered only when at least one Pulsar admin resource family is enabled. `pulsar://resources` returns the catalog of resource URIs and URI templates actually registered for the active feature set.

All resource reads except `pulsar://resources` require a Pulsar session in the request context. Missing sessions return a clear error instead of falling back to environment state.

## Static resources

- `pulsar://context`: current Pulsar session connection metadata with authentication material redacted.
- `pulsar://resources`: catalog of the registered Pulsar resource URIs and URI templates.
- `pulsar://admin/v2/tenants`: tenant names known to the current Pulsar admin endpoint.
- `pulsar://admin/v2/resource-quotas`: default resource quota for new namespace bundles.
- `pulsar://admin/v2/status`: broker or proxy status for the current Pulsar admin endpoint.
- `pulsar://admin/v2/clusters`: cluster names known to the current Pulsar admin endpoint.
- `pulsar://admin/v2/broker-stats/summary`: bounded summary of broker monitoring metrics and load report.
- `pulsar://admin/v2/worker/cluster`: bounded summary of Pulsar Functions workers.
- `pulsar://admin/v2/worker/cluster/leader`: current Pulsar Functions worker leader.
- `pulsar://admin/v2/worker/assignments`: bounded summary of Pulsar Functions worker assignments.
- `pulsar://admin/v2/worker-stats/functionsmetrics`: bounded function instance stats reported by the Pulsar Functions worker.
- `pulsar://admin/v2/worker-stats/metrics`: bounded summary of Pulsar Functions worker monitoring metrics.

All static resources return `application/json`.

## Resource templates

- `pulsar://admin/v2/tenants/{tenant}`: gets tenant configuration.
- `pulsar://admin/v2/tenants/{tenant}/namespaces`: lists namespaces for a tenant.
- `pulsar://admin/v2/namespaces/{tenant}/{namespace}`: gets namespace policies.
- `pulsar://admin/v2/namespaces/{tenant}/{namespace}/topics`: lists topics for a namespace.
- `pulsar://admin/v2/resource-quotas/{tenant}/{namespace}/{bundle}`: gets resource quota for a namespace bundle.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/metadata`: gets parsed topic identity and sanitized topic properties.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/stats`: gets a bounded topic statistics summary without publisher or consumer details.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/partitions`: gets topic partition metadata.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/policies/{policy}`: gets one read-only topic policy value. Supported policies are `retention`, `message-ttl`, `max-producers`, `max-consumers`, `max-unacked-messages-per-consumer`, `max-unacked-messages-per-subscription`, `persistence`, `delayed-delivery`, `dispatch-rate`, `subscription-dispatch-rate`, `deduplication`, `backlog-quotas`, `compaction-threshold`, `publish-rate`, and `inactive-topic-policies`.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/schema`: gets the latest topic schema and version.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/schema/{version}`: gets a specific topic schema version.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/subscriptions`: lists subscriptions for a topic.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/stats`: gets bounded subscription statistics without consumer details.
- `pulsar://admin/v2/{domain}/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/backlog`: gets subscription backlog counters without changing cursor state.
- `pulsar://admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/cursor`: gets persistent topic cursor positions for a subscription.
- `pulsar://admin/v3/functions/{tenant}/{namespace}`: lists Pulsar Functions for a namespace, bounded to the first 50 names.
- `pulsar://admin/v3/functions/{tenant}/{namespace}/{function}/metadata`: gets sanitized Pulsar Function metadata.
- `pulsar://admin/v3/functions/{tenant}/{namespace}/{function}/status`: gets bounded Pulsar Function runtime status without exception detail strings.
- `pulsar://admin/v3/functions/{tenant}/{namespace}/{function}/stats`: gets bounded Pulsar Function statistics and user metric names.
- `pulsar://admin/v3/sources/{tenant}/{namespace}`: lists Pulsar Sources for a namespace, bounded to the first 50 names.
- `pulsar://admin/v3/sources/{tenant}/{namespace}/{source}/metadata`: gets sanitized Pulsar Source metadata.
- `pulsar://admin/v3/sources/{tenant}/{namespace}/{source}/status`: gets bounded Pulsar Source runtime status without exception detail strings.
- `pulsar://admin/v3/sinks/{tenant}/{namespace}`: lists Pulsar Sinks for a namespace, bounded to the first 50 names.
- `pulsar://admin/v3/sinks/{tenant}/{namespace}/{sink}/metadata`: gets sanitized Pulsar Sink metadata.
- `pulsar://admin/v3/sinks/{tenant}/{namespace}/{sink}/status`: gets bounded Pulsar Sink runtime status without exception detail strings.
- `pulsar://admin/v3/packages/{type}/{tenant}/{namespace}`: lists packages by type and namespace, bounded to the first 50 names. Supported package types are `function`, `source`, and `sink`.
- `pulsar://admin/v3/packages/{type}/{tenant}/{namespace}/{package}/versions`: lists package versions, bounded to the first 50 names.
- `pulsar://admin/v3/packages/{type}/{tenant}/{namespace}/{package}/{version}/metadata`: gets sanitized metadata for one package version.
- `pulsar://admin/v2/clusters/{cluster}`: sanitized configuration for a cluster.
- `pulsar://admin/v2/brokers/{cluster}`: lists active brokers for a cluster.
- `pulsar://admin/v2/clusters/{cluster}/failureDomains`: lists failure domains for a cluster.
- `pulsar://admin/v2/clusters/{cluster}/failureDomains/{domain}`: gets a failure domain.
- `pulsar://admin/v2/clusters/{cluster}/namespaceIsolationPolicies`: lists namespace isolation policies for a cluster.
- `pulsar://admin/v2/clusters/{cluster}/namespaceIsolationPolicies/{policy}`: gets a namespace isolation policy.

Template reads return `application/json`. Topic templates accept `domain` values of `persistent` or `non-persistent`; `topic` is the local topic name path segment. Subscription cursor resources are persistent-only because they are backed by topic internal stats. Workload and package templates use Pulsar admin v3 APIs; functions worker resources use the current Pulsar admin v2 worker endpoints.

## Feature gates

The following feature gates register Pulsar resources. Each listed family is also enabled by `pulsar-admin`, `all-pulsar`, or `all`.

| Feature gate | Resource surface |
|--------------|------------------|
| `pulsar-admin-tenants` | tenant collection and tenant configuration resources |
| `pulsar-admin-namespaces` | namespace collection by tenant |
| `pulsar-admin-namespace-policy` | namespace policy resource |
| `pulsar-admin-topics` | namespace topic collection, topic metadata, topic stats summary, and partition metadata resources |
| `pulsar-admin-topic-policy` | read-only topic policy resource |
| `pulsar-admin-schemas` | latest schema and schema version resources |
| `pulsar-admin-subscriptions` | subscription collection, bounded subscription stats, backlog summary, and persistent cursor summary resources |
| `pulsar-admin-resource-quotas` | default resource quota and namespace bundle resource quota resources |
| `pulsar-admin-brokers-status` | broker or proxy status resource |
| `pulsar-admin-clusters` | cluster collection, cluster configuration, failure-domain collection, and failure-domain resources |
| `pulsar-admin-brokers` | broker collection by cluster |
| `pulsar-admin-broker-stats` | broker stats summary resource |
| `pulsar-admin-ns-isolation-policy` | namespace isolation policy collection and policy resources |
| `pulsar-admin-functions` | function collection, metadata, status, and stats resources |
| `pulsar-admin-sources` | source collection, metadata, and status resources |
| `pulsar-admin-sinks` | sink collection, metadata, and status resources |
| `pulsar-admin-packages` | package collection, package version collection, and package metadata resources |
| `pulsar-admin-functions-worker` | functions worker cluster, leader, assignments, function stats, and metrics resources |

## Safety

Resource handlers are read-only regardless of whether write-capable Pulsar tools are enabled. They do not consume messages, commit cursors, clear backlog, unload topics, split bundles, delete resources, start workloads, or stop workloads. They also do not return tokens, auth params, key files, TLS private keys, or secret values.
1 change: 1 addition & 0 deletions pkg/cmd/mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func newMcpServer(_ context.Context, configOpts *ServerOptions, logrusLogger *lo
}
}

mcp.PulsarAddResources(s, configOpts.Features)
mcp.PulsarAdminAddBrokersTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddStatusTools(s, configOpts.ReadOnly, configOpts.Features)
mcp.PulsarAdminAddBrokerStatsTools(s, configOpts.ReadOnly, configOpts.Features)
Expand Down
30 changes: 3 additions & 27 deletions pkg/mcp/builders/pulsar/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@ package pulsar
import (
"context"
"fmt"
"net/http"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
pulsaradminauth "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/auth"
pulsaradminconfig "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders"
Expand Down Expand Up @@ -90,36 +85,17 @@ func (b *PulsarAdminStatusToolBuilder) buildStatusHandler() func(context.Context
return mcp.NewToolResultError("Pulsar session not found in context"), nil
}

cfg, err := session.GetPulsarCtlConfig()
statusClient, err := session.GetAdminStatusClient()
if err != nil {
return b.handleError("get Pulsar configuration", err), nil
}

authProvider, err := pulsaradminauth.GetAuthProvider((*pulsaradminconfig.Config)(cfg))
if err != nil {
return b.handleError("build status auth provider", err), nil
}

statusClient := &rest.Client{
ServiceURL: cfg.WebServiceURL,
VersionInfo: admin.ReleaseVersion,
HTTPClient: &http.Client{
Timeout: admin.DefaultHTTPTimeOutDuration,
Transport: authProvider,
},
return b.handleError("get Pulsar status client", err), nil
}

data, err := statusClient.GetWithQueryParams("/status.html", nil, nil, false)
if err != nil {
return b.handleError("check Pulsar status", err), nil
}

status := strings.TrimSpace(string(data))
if status == "" {
status = string(data)
}

return mcp.NewToolResultText(status), nil
return mcp.NewToolResultText(strings.TrimSpace(string(data))), nil
}
}

Expand Down
Loading
Loading