Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ If you want to access to your StreamNative Cloud, you will need to have followin
bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json

# Start MCP server with StreamNative Cloud authentication and pre-configured context
# When --pulsar-instance and --pulsar-cluster are provided, context management tools are disabled
# When --pulsar-instance and --pulsar-cluster are provided, context mutation tools are disabled
bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json --pulsar-instance my-instance --pulsar-cluster my-cluster

# Start MCP server with external Kafka
Expand All @@ -118,7 +118,7 @@ docker run -i --rm -e SNMCP_ORGANIZATION=my-org -e SNMCP_KEY_FILE=/key.json -v /
bin/snmcp sse --http-addr :9090 --http-path /mcp --organization my-org --key-file /path/to/key-file.json

# Start MCP server with SSE and pre-configured StreamNative Cloud context
# When --pulsar-instance and --pulsar-cluster are provided, context management tools are disabled
# When --pulsar-instance and --pulsar-cluster are provided, context mutation tools are disabled
bin/snmcp sse --http-addr :9090 --http-path /mcp --organization my-org --key-file /path/to/key-file.json --pulsar-instance my-instance --pulsar-cluster my-cluster

# Start MCP server with SSE and external Kafka
Expand Down Expand Up @@ -292,7 +292,7 @@ Pulsar admin feature gates also register read-only MCP resources for the matchin
| `streamnative-cloud`| Manage StreamNative Cloud context and check resource logs | [streamnative_cloud.md](docs/tools/streamnative_cloud.md) |
| `functions-as-tools` | Dynamically exposes deployed Pulsar Functions as invokable MCP tools, with automatic input/output schema handling. | [functions_as_tools.md](docs/tools/functions_as_tools.md) |

> **Note:** When using `--pulsar-instance` and `--pulsar-cluster` flags together, context management tools (`sncloud_context_use_cluster`) are automatically disabled since the context is pre-configured.
> **Note:** When using `--pulsar-instance` and `--pulsar-cluster` flags together, context mutation tools (`sncloud_context_use_cluster`, `sncloud_context_reset`) are automatically disabled since the context is pre-configured.

You can combine these features as needed using the `--features` flag. For example, to enable only Pulsar client features:
```bash
Expand Down
9 changes: 9 additions & 0 deletions docs/tools/streamnative_cloud.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ If you encounter `ContextNotSetErr`, use `sncloud_context_available_clusters` to

---

#### sncloud_context_reset

Reset the current StreamNative Cloud cluster context. After reset, the session has no bound Pulsar or Kafka cluster connection, and cluster-specific tools will return `ContextNotSetErr` until `sncloud_context_use_cluster` is used again.

- **sncloud_context_reset**
- No parameters required

---

#### sncloud_context_whoami

Display the currently logged-in service account. Returns the name of the authenticated service account and the organization.
Expand Down
10 changes: 10 additions & 0 deletions pkg/cmd/mcp/stdio.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/spf13/cobra"
"github.com/streamnative/streamnative-mcp-server/pkg/common"
"github.com/streamnative/streamnative-mcp-server/pkg/log"
mcpctx "github.com/streamnative/streamnative-mcp-server/pkg/mcp"
)

// NewCmdMcpStdioServer builds the stdio server command.
Expand Down Expand Up @@ -69,6 +70,15 @@ func runStdioServer(configOpts *ServerOptions) error {
return fmt.Errorf("failed to create MCP server: %w", err)
}

ctx = mcpctx.WithSNCloudSession(ctx, mcpServer.SNCloudSession)
ctx = mcpctx.WithPulsarSession(ctx, mcpServer.PulsarSession)
ctx = mcpctx.WithKafkaSession(ctx, mcpServer.KafkaSession)
if configOpts.KeyFile != "" && configOpts.PulsarInstance != "" && configOpts.PulsarCluster != "" {
if err := mcpctx.SetContext(ctx, configOpts.Options, configOpts.PulsarInstance, configOpts.PulsarCluster); err != nil {
return fmt.Errorf("failed to set StreamNative Cloud context: %w", err)
}
}

stdioServer := server.NewStdioServer(mcpServer.MCPServer)
stdioServer.SetErrorLogger(stdLogger)

Expand Down
22 changes: 22 additions & 0 deletions pkg/config/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,28 @@ func NewSNCloudSessionFromOptions(options *Options) (*Session, error) {
return session, nil
}

// SetPulsarClusterContext updates the StreamNative Cloud cluster binding for the session.
func (s *Session) SetPulsarClusterContext(instance, cluster string) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.Ctx.PulsarInstance = instance
s.Ctx.PulsarCluster = cluster
}

// ResetPulsarClusterContext clears the StreamNative Cloud cluster binding for the session.
func (s *Session) ResetPulsarClusterContext() {
s.SetPulsarClusterContext("", "")
}

// GetPulsarClusterContext returns the current StreamNative Cloud cluster binding.
func (s *Session) GetPulsarClusterContext() (string, string) {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.Ctx.PulsarInstance, s.Ctx.PulsarCluster
}

// initializeTokenRefresher initializes the token refresher for the session
func (s *Session) initializeTokenRefresher() error {
s.mutex.Lock()
Expand Down
46 changes: 40 additions & 6 deletions pkg/kafka/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func saslOpt(config *SASLConfig, opts []kgo.Opt) ([]kgo.Opt, error) {

// SetKafkaContext initializes Kafka clients using the provided context.
func (s *Session) SetKafkaContext(ctx KafkaContext) error {
s.mutex.Lock()
defer s.mutex.Unlock()

s.Ctx = ctx
kc := &s.Ctx
var err error
Expand Down Expand Up @@ -209,11 +212,32 @@ func (s *Session) SetKafkaContext(ctx KafkaContext) error {
return nil
}

// ResetKafkaContext clears the current Kafka context and closes the data client.
func (s *Session) ResetKafkaContext() {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.Client != nil {
s.Client.Close()
}

s.Ctx = KafkaContext{}
s.Client = nil
s.AdminClient = nil
s.SchemaRegistryClient = nil
s.ConnectClient = nil
s.Options = nil
}

// GetClient returns a Kafka client with optional overrides.
func (s *Session) GetClient(opts ...kgo.Opt) (*kgo.Client, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.Ctx.BootstrapServers == "" {
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
}

if len(opts) > 0 {
//nolint:gocritic
clientOpts := append(s.Options, opts...)
Expand All @@ -240,6 +264,10 @@ func (s *Session) GetAdminClient() (*kadm.Client, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.Ctx.BootstrapServers == "" {
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
}

if s.AdminClient == nil {
if s.Client == nil {
var err error
Expand All @@ -256,13 +284,16 @@ func (s *Session) GetAdminClient() (*kadm.Client, error) {

// GetSchemaRegistryClient returns the schema registry client.
func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.Ctx.BootstrapServers == "" {
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
}
if s.Ctx.SchemaRegistryURL == "" {
return nil, fmt.Errorf("schema registry not enabled on the current context")
}

s.mutex.Lock()
defer s.mutex.Unlock()

if s.SchemaRegistryClient == nil {
SrOpts := []sr.ClientOpt{}
SrOpts = append(SrOpts, sr.URLs(s.Ctx.SchemaRegistryURL))
Expand All @@ -285,13 +316,16 @@ func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) {

// GetConnectClient returns the Kafka Connect client.
func (s *Session) GetConnectClient() (Connect, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.Ctx.BootstrapServers == "" {
return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first")
}
if s.Ctx.ConnectURL == "" {
return nil, fmt.Errorf("kafka connect not enabled on the current context")
}

s.mutex.Lock()
defer s.mutex.Unlock()

if s.ConnectClient == nil {
var err error
s.ConnectClient, err = NewConnect(&s.Ctx)
Expand Down
61 changes: 61 additions & 0 deletions pkg/kafka/connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2025 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"testing"

"github.com/stretchr/testify/require"
)

const contextNotSetErr = "err: ContextNotSetErr: Please set the cluster context first"

func TestSessionGetKafkaClientsRequireContext(t *testing.T) {
session := &Session{}

_, err := session.GetClient()
require.EqualError(t, err, contextNotSetErr)

_, err = session.GetAdminClient()
require.EqualError(t, err, contextNotSetErr)

_, err = session.GetSchemaRegistryClient()
require.EqualError(t, err, contextNotSetErr)

_, err = session.GetConnectClient()
require.EqualError(t, err, contextNotSetErr)
}

func TestSessionResetKafkaContextClearsContextAndClients(t *testing.T) {
session := &Session{
Ctx: KafkaContext{
BootstrapServers: "localhost:9092",
SchemaRegistryURL: "http://localhost:8081",
ConnectURL: "http://localhost:8083",
},
}

session.ResetKafkaContext()

require.Equal(t, KafkaContext{}, session.Ctx)
require.Nil(t, session.Client)
require.Nil(t, session.AdminClient)
require.Nil(t, session.SchemaRegistryClient)
require.Nil(t, session.ConnectClient)
require.Nil(t, session.Options)

_, err := session.GetAdminClient()
require.EqualError(t, err, contextNotSetErr)
}
3 changes: 2 additions & 1 deletion pkg/mcp/pulsar_functions_as_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string,
// For example: stop the manager, send alerts, implement backoff strategies
}

if s.SNCloudSession.Ctx.Organization == "" || s.SNCloudSession.Ctx.PulsarInstance == "" || s.SNCloudSession.Ctx.PulsarCluster == "" {
instance, cluster := s.SNCloudSession.GetPulsarClusterContext()
if s.SNCloudSession.Ctx.Organization == "" || instance == "" || cluster == "" {
log.Printf("Skipping Pulsar Functions as MCP Tools because both organization, pulsar instance and pulsar cluster are not set")
return
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/mcp/sncontext_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ func RegisterContextTools(s *server.MCPServer, features []string, skipContextToo
mcp.Description("The name of the pulsar cluster to use"),
),
)
// Skip registering context tools if context is already provided
resetContextTool := mcp.NewTool("sncloud_context_reset",
mcp.WithDescription("Reset the current StreamNative Cloud cluster context. After reset, the session has no bound Pulsar or Kafka cluster connection; use `sncloud_context_use_cluster` before calling cluster-specific tools again."),
)
// Skip registering context mutation tools if context is already provided
if !skipContextTools {
s.AddTool(setContextTool, handleSetContext)
s.AddTool(resetContextTool, handleResetContext)
}

// Add available-contexts tool
Expand Down Expand Up @@ -112,6 +116,15 @@ func handleSetContext(ctx context.Context, request mcp.CallToolRequest) (*mcp.Ca
return mcp.NewToolResultText("StreamNative Cloud context set successfully"), nil
}

// handleResetContext handles the reset-context tool request
func handleResetContext(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) {
if err := ResetContext(ctx); err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to reset context: %v", err)), nil
}

return mcp.NewToolResultText("StreamNative Cloud context reset successfully"), nil
}

func handleAvailableContexts(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) {
promptResponse, err := buildSNCloudContextClusterPromptResult(ctx)
if err != nil || promptResponse == nil {
Expand Down
75 changes: 75 additions & 0 deletions pkg/mcp/sncontext_tools_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2025 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mcp

import (
"context"
"testing"

mcpgo "github.com/mark3labs/mcp-go/mcp"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/streamnative-mcp-server/pkg/config"
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
pulsarsession "github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
"github.com/stretchr/testify/require"
)

func TestHandleResetContextClearsSNCloudClusterSessions(t *testing.T) {
snSession, err := config.NewSNCloudSession(config.SNCloudContext{
JWTToken: "token",
APIURL: "https://api.example.com",
LogAPIURL: "https://logs.example.com",
Organization: "org",
PulsarInstance: "instance-a",
PulsarCluster: "cluster-a",
})
require.NoError(t, err)

pulsarSession := &pulsarsession.Session{
Ctx: pulsarsession.PulsarContext{
ServiceURL: "pulsar://pulsar.example.com:6650",
WebServiceURL: "https://pulsar.example.com",
Token: "token",
},
PulsarCtlConfig: &cmdutils.ClusterConfig{WebServiceURL: "https://pulsar.example.com"},
}
kafkaSession := &kafka.Session{
Ctx: kafka.KafkaContext{
BootstrapServers: "kafka.example.com:9093",
SchemaRegistryURL: "https://kafka.example.com/kafka",
ConnectURL: "https://api.example.com/admin/kafkaconnect/",
},
}

ctx := context.Background()
ctx = WithSNCloudSession(ctx, snSession)
ctx = WithPulsarSession(ctx, pulsarSession)
ctx = WithKafkaSession(ctx, kafkaSession)

result, err := handleResetContext(ctx, mcpgo.CallToolRequest{})
require.NoError(t, err)
require.False(t, result.IsError)

require.Empty(t, snSession.Ctx.PulsarInstance)
require.Empty(t, snSession.Ctx.PulsarCluster)
require.Equal(t, pulsarsession.PulsarContext{}, pulsarSession.Ctx)
require.Equal(t, kafka.KafkaContext{}, kafkaSession.Ctx)

_, err = pulsarSession.GetAdminClient()
require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first")

_, err = kafkaSession.GetAdminClient()
require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first")
}
Loading
Loading