diff --git a/README.md b/README.md index 893be1b..ee1e057 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ If you want to access to your StreamNative Cloud, you will need to have followin bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json # Start MCP server with StreamNative Cloud authentication and pre-configured context -# When --pulsar-instance and --pulsar-cluster are provided, context management tools are disabled +# When --pulsar-instance and --pulsar-cluster are provided, context mutation tools are disabled bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json --pulsar-instance my-instance --pulsar-cluster my-cluster # Start MCP server with external Kafka @@ -118,7 +118,7 @@ docker run -i --rm -e SNMCP_ORGANIZATION=my-org -e SNMCP_KEY_FILE=/key.json -v / bin/snmcp sse --http-addr :9090 --http-path /mcp --organization my-org --key-file /path/to/key-file.json # Start MCP server with SSE and pre-configured StreamNative Cloud context -# When --pulsar-instance and --pulsar-cluster are provided, context management tools are disabled +# When --pulsar-instance and --pulsar-cluster are provided, context mutation tools are disabled bin/snmcp sse --http-addr :9090 --http-path /mcp --organization my-org --key-file /path/to/key-file.json --pulsar-instance my-instance --pulsar-cluster my-cluster # Start MCP server with SSE and external Kafka @@ -292,7 +292,7 @@ Pulsar admin feature gates also register read-only MCP resources for the matchin | `streamnative-cloud`| Manage StreamNative Cloud context and check resource logs | [streamnative_cloud.md](docs/tools/streamnative_cloud.md) | | `functions-as-tools` | Dynamically exposes deployed Pulsar Functions as invokable MCP tools, with automatic input/output schema handling. | [functions_as_tools.md](docs/tools/functions_as_tools.md) | -> **Note:** When using `--pulsar-instance` and `--pulsar-cluster` flags together, context management tools (`sncloud_context_use_cluster`) are automatically disabled since the context is pre-configured. +> **Note:** When using `--pulsar-instance` and `--pulsar-cluster` flags together, context mutation tools (`sncloud_context_use_cluster`, `sncloud_context_reset`) are automatically disabled since the context is pre-configured. You can combine these features as needed using the `--features` flag. For example, to enable only Pulsar client features: ```bash diff --git a/docs/tools/streamnative_cloud.md b/docs/tools/streamnative_cloud.md index a56d2a7..718f365 100644 --- a/docs/tools/streamnative_cloud.md +++ b/docs/tools/streamnative_cloud.md @@ -21,6 +21,15 @@ If you encounter `ContextNotSetErr`, use `sncloud_context_available_clusters` to --- +#### sncloud_context_reset + +Reset the current StreamNative Cloud cluster context. After reset, the session has no bound Pulsar or Kafka cluster connection, and cluster-specific tools will return `ContextNotSetErr` until `sncloud_context_use_cluster` is used again. + +- **sncloud_context_reset** + - No parameters required + +--- + #### sncloud_context_whoami Display the currently logged-in service account. Returns the name of the authenticated service account and the organization. diff --git a/pkg/cmd/mcp/stdio.go b/pkg/cmd/mcp/stdio.go index e13232e..c029231 100644 --- a/pkg/cmd/mcp/stdio.go +++ b/pkg/cmd/mcp/stdio.go @@ -30,6 +30,7 @@ import ( "github.com/spf13/cobra" "github.com/streamnative/streamnative-mcp-server/pkg/common" "github.com/streamnative/streamnative-mcp-server/pkg/log" + mcpctx "github.com/streamnative/streamnative-mcp-server/pkg/mcp" ) // NewCmdMcpStdioServer builds the stdio server command. @@ -69,6 +70,15 @@ func runStdioServer(configOpts *ServerOptions) error { return fmt.Errorf("failed to create MCP server: %w", err) } + ctx = mcpctx.WithSNCloudSession(ctx, mcpServer.SNCloudSession) + ctx = mcpctx.WithPulsarSession(ctx, mcpServer.PulsarSession) + ctx = mcpctx.WithKafkaSession(ctx, mcpServer.KafkaSession) + if configOpts.KeyFile != "" && configOpts.PulsarInstance != "" && configOpts.PulsarCluster != "" { + if err := mcpctx.SetContext(ctx, configOpts.Options, configOpts.PulsarInstance, configOpts.PulsarCluster); err != nil { + return fmt.Errorf("failed to set StreamNative Cloud context: %w", err) + } + } + stdioServer := server.NewStdioServer(mcpServer.MCPServer) stdioServer.SetErrorLogger(stdLogger) diff --git a/pkg/config/apiclient.go b/pkg/config/apiclient.go index 8393fc8..5d6604f 100644 --- a/pkg/config/apiclient.go +++ b/pkg/config/apiclient.go @@ -155,6 +155,28 @@ func NewSNCloudSessionFromOptions(options *Options) (*Session, error) { return session, nil } +// SetPulsarClusterContext updates the StreamNative Cloud cluster binding for the session. +func (s *Session) SetPulsarClusterContext(instance, cluster string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.Ctx.PulsarInstance = instance + s.Ctx.PulsarCluster = cluster +} + +// ResetPulsarClusterContext clears the StreamNative Cloud cluster binding for the session. +func (s *Session) ResetPulsarClusterContext() { + s.SetPulsarClusterContext("", "") +} + +// GetPulsarClusterContext returns the current StreamNative Cloud cluster binding. +func (s *Session) GetPulsarClusterContext() (string, string) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + return s.Ctx.PulsarInstance, s.Ctx.PulsarCluster +} + // initializeTokenRefresher initializes the token refresher for the session func (s *Session) initializeTokenRefresher() error { s.mutex.Lock() diff --git a/pkg/kafka/connection.go b/pkg/kafka/connection.go index 380a4d2..f046aea 100644 --- a/pkg/kafka/connection.go +++ b/pkg/kafka/connection.go @@ -149,6 +149,9 @@ func saslOpt(config *SASLConfig, opts []kgo.Opt) ([]kgo.Opt, error) { // SetKafkaContext initializes Kafka clients using the provided context. func (s *Session) SetKafkaContext(ctx KafkaContext) error { + s.mutex.Lock() + defer s.mutex.Unlock() + s.Ctx = ctx kc := &s.Ctx var err error @@ -209,11 +212,32 @@ func (s *Session) SetKafkaContext(ctx KafkaContext) error { return nil } +// ResetKafkaContext clears the current Kafka context and closes the data client. +func (s *Session) ResetKafkaContext() { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.Client != nil { + s.Client.Close() + } + + s.Ctx = KafkaContext{} + s.Client = nil + s.AdminClient = nil + s.SchemaRegistryClient = nil + s.ConnectClient = nil + s.Options = nil +} + // GetClient returns a Kafka client with optional overrides. func (s *Session) GetClient(opts ...kgo.Opt) (*kgo.Client, error) { s.mutex.Lock() defer s.mutex.Unlock() + if s.Ctx.BootstrapServers == "" { + return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first") + } + if len(opts) > 0 { //nolint:gocritic clientOpts := append(s.Options, opts...) @@ -240,6 +264,10 @@ func (s *Session) GetAdminClient() (*kadm.Client, error) { s.mutex.Lock() defer s.mutex.Unlock() + if s.Ctx.BootstrapServers == "" { + return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first") + } + if s.AdminClient == nil { if s.Client == nil { var err error @@ -256,13 +284,16 @@ func (s *Session) GetAdminClient() (*kadm.Client, error) { // GetSchemaRegistryClient returns the schema registry client. func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.Ctx.BootstrapServers == "" { + return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first") + } if s.Ctx.SchemaRegistryURL == "" { return nil, fmt.Errorf("schema registry not enabled on the current context") } - s.mutex.Lock() - defer s.mutex.Unlock() - if s.SchemaRegistryClient == nil { SrOpts := []sr.ClientOpt{} SrOpts = append(SrOpts, sr.URLs(s.Ctx.SchemaRegistryURL)) @@ -285,13 +316,16 @@ func (s *Session) GetSchemaRegistryClient() (*sr.Client, error) { // GetConnectClient returns the Kafka Connect client. func (s *Session) GetConnectClient() (Connect, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.Ctx.BootstrapServers == "" { + return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first") + } if s.Ctx.ConnectURL == "" { return nil, fmt.Errorf("kafka connect not enabled on the current context") } - s.mutex.Lock() - defer s.mutex.Unlock() - if s.ConnectClient == nil { var err error s.ConnectClient, err = NewConnect(&s.Ctx) diff --git a/pkg/kafka/connection_test.go b/pkg/kafka/connection_test.go new file mode 100644 index 0000000..2975903 --- /dev/null +++ b/pkg/kafka/connection_test.go @@ -0,0 +1,61 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const contextNotSetErr = "err: ContextNotSetErr: Please set the cluster context first" + +func TestSessionGetKafkaClientsRequireContext(t *testing.T) { + session := &Session{} + + _, err := session.GetClient() + require.EqualError(t, err, contextNotSetErr) + + _, err = session.GetAdminClient() + require.EqualError(t, err, contextNotSetErr) + + _, err = session.GetSchemaRegistryClient() + require.EqualError(t, err, contextNotSetErr) + + _, err = session.GetConnectClient() + require.EqualError(t, err, contextNotSetErr) +} + +func TestSessionResetKafkaContextClearsContextAndClients(t *testing.T) { + session := &Session{ + Ctx: KafkaContext{ + BootstrapServers: "localhost:9092", + SchemaRegistryURL: "http://localhost:8081", + ConnectURL: "http://localhost:8083", + }, + } + + session.ResetKafkaContext() + + require.Equal(t, KafkaContext{}, session.Ctx) + require.Nil(t, session.Client) + require.Nil(t, session.AdminClient) + require.Nil(t, session.SchemaRegistryClient) + require.Nil(t, session.ConnectClient) + require.Nil(t, session.Options) + + _, err := session.GetAdminClient() + require.EqualError(t, err, contextNotSetErr) +} diff --git a/pkg/mcp/pulsar_functions_as_tools.go b/pkg/mcp/pulsar_functions_as_tools.go index 1c76afb..36635ce 100644 --- a/pkg/mcp/pulsar_functions_as_tools.go +++ b/pkg/mcp/pulsar_functions_as_tools.go @@ -73,7 +73,8 @@ func (s *Server) PulsarFunctionManagedMcpTools(readOnly bool, features []string, // For example: stop the manager, send alerts, implement backoff strategies } - if s.SNCloudSession.Ctx.Organization == "" || s.SNCloudSession.Ctx.PulsarInstance == "" || s.SNCloudSession.Ctx.PulsarCluster == "" { + instance, cluster := s.SNCloudSession.GetPulsarClusterContext() + if s.SNCloudSession.Ctx.Organization == "" || instance == "" || cluster == "" { log.Printf("Skipping Pulsar Functions as MCP Tools because both organization, pulsar instance and pulsar cluster are not set") return } diff --git a/pkg/mcp/sncontext_tools.go b/pkg/mcp/sncontext_tools.go index cdf0016..27104c0 100644 --- a/pkg/mcp/sncontext_tools.go +++ b/pkg/mcp/sncontext_tools.go @@ -49,9 +49,13 @@ func RegisterContextTools(s *server.MCPServer, features []string, skipContextToo mcp.Description("The name of the pulsar cluster to use"), ), ) - // Skip registering context tools if context is already provided + resetContextTool := mcp.NewTool("sncloud_context_reset", + mcp.WithDescription("Reset the current StreamNative Cloud cluster context. After reset, the session has no bound Pulsar or Kafka cluster connection; use `sncloud_context_use_cluster` before calling cluster-specific tools again."), + ) + // Skip registering context mutation tools if context is already provided if !skipContextTools { s.AddTool(setContextTool, handleSetContext) + s.AddTool(resetContextTool, handleResetContext) } // Add available-contexts tool @@ -112,6 +116,15 @@ func handleSetContext(ctx context.Context, request mcp.CallToolRequest) (*mcp.Ca return mcp.NewToolResultText("StreamNative Cloud context set successfully"), nil } +// handleResetContext handles the reset-context tool request +func handleResetContext(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) { + if err := ResetContext(ctx); err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to reset context: %v", err)), nil + } + + return mcp.NewToolResultText("StreamNative Cloud context reset successfully"), nil +} + func handleAvailableContexts(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) { promptResponse, err := buildSNCloudContextClusterPromptResult(ctx) if err != nil || promptResponse == nil { diff --git a/pkg/mcp/sncontext_tools_test.go b/pkg/mcp/sncontext_tools_test.go new file mode 100644 index 0000000..9ff05d6 --- /dev/null +++ b/pkg/mcp/sncontext_tools_test.go @@ -0,0 +1,75 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mcp + +import ( + "context" + "testing" + + mcpgo "github.com/mark3labs/mcp-go/mcp" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/streamnative-mcp-server/pkg/config" + "github.com/streamnative/streamnative-mcp-server/pkg/kafka" + pulsarsession "github.com/streamnative/streamnative-mcp-server/pkg/pulsar" + "github.com/stretchr/testify/require" +) + +func TestHandleResetContextClearsSNCloudClusterSessions(t *testing.T) { + snSession, err := config.NewSNCloudSession(config.SNCloudContext{ + JWTToken: "token", + APIURL: "https://api.example.com", + LogAPIURL: "https://logs.example.com", + Organization: "org", + PulsarInstance: "instance-a", + PulsarCluster: "cluster-a", + }) + require.NoError(t, err) + + pulsarSession := &pulsarsession.Session{ + Ctx: pulsarsession.PulsarContext{ + ServiceURL: "pulsar://pulsar.example.com:6650", + WebServiceURL: "https://pulsar.example.com", + Token: "token", + }, + PulsarCtlConfig: &cmdutils.ClusterConfig{WebServiceURL: "https://pulsar.example.com"}, + } + kafkaSession := &kafka.Session{ + Ctx: kafka.KafkaContext{ + BootstrapServers: "kafka.example.com:9093", + SchemaRegistryURL: "https://kafka.example.com/kafka", + ConnectURL: "https://api.example.com/admin/kafkaconnect/", + }, + } + + ctx := context.Background() + ctx = WithSNCloudSession(ctx, snSession) + ctx = WithPulsarSession(ctx, pulsarSession) + ctx = WithKafkaSession(ctx, kafkaSession) + + result, err := handleResetContext(ctx, mcpgo.CallToolRequest{}) + require.NoError(t, err) + require.False(t, result.IsError) + + require.Empty(t, snSession.Ctx.PulsarInstance) + require.Empty(t, snSession.Ctx.PulsarCluster) + require.Equal(t, pulsarsession.PulsarContext{}, pulsarSession.Ctx) + require.Equal(t, kafka.KafkaContext{}, kafkaSession.Ctx) + + _, err = pulsarSession.GetAdminClient() + require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first") + + _, err = kafkaSession.GetAdminClient() + require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first") +} diff --git a/pkg/mcp/sncontext_utils.go b/pkg/mcp/sncontext_utils.go index 6865a70..9ece5ef 100644 --- a/pkg/mcp/sncontext_utils.go +++ b/pkg/mcp/sncontext_utils.go @@ -192,9 +192,35 @@ func SetContext(ctx context.Context, options *config.Options, instanceName, clus return fmt.Errorf("failed to change kafka context: %v", err) } + session.SetPulsarClusterContext(instanceName, clusterName) + // TODO: check if need to set log client // if issuer != nil && options.AuthOptions.Store != nil { // } return nil } + +// ResetContext clears StreamNative Cloud cluster bindings and protocol sessions. +func ResetContext(ctx context.Context) error { + session := context2.GetSNCloudSession(ctx) + if session == nil { + return fmt.Errorf("failed to get StreamNative Cloud session") + } + + psession := context2.GetPulsarSession(ctx) + if psession == nil { + return fmt.Errorf("failed to get pulsar session") + } + + ksession := context2.GetKafkaSession(ctx) + if ksession == nil { + return fmt.Errorf("failed to get kafka session") + } + + psession.ResetPulsarContext() + ksession.ResetKafkaContext() + session.ResetPulsarClusterContext() + + return nil +} diff --git a/pkg/mcp/streamnative_resources_log_tools.go b/pkg/mcp/streamnative_resources_log_tools.go index 81e0a2d..37fbc88 100644 --- a/pkg/mcp/streamnative_resources_log_tools.go +++ b/pkg/mcp/streamnative_resources_log_tools.go @@ -123,7 +123,8 @@ func HandleSNCloudLogs(ctx context.Context, request mcp.CallToolRequest) (*mcp.C if session == nil { return nil, fmt.Errorf("failed to get StreamNative Cloud session") } - instance, cluster, organization := session.Ctx.PulsarInstance, session.Ctx.PulsarCluster, session.Ctx.Organization + instance, cluster := session.GetPulsarClusterContext() + organization := session.Ctx.Organization if instance == "" || cluster == "" || organization == "" { return mcp.NewToolResultError("No context is set, please use `sncloud_context_use_cluster` to set the context first."), nil } diff --git a/pkg/pulsar/connection.go b/pkg/pulsar/connection.go index 99df993..55d5379 100644 --- a/pkg/pulsar/connection.go +++ b/pkg/pulsar/connection.go @@ -170,12 +170,30 @@ func (s *Session) SetPulsarContext(ctx PulsarContext) error { return nil } +// ResetPulsarContext clears the current Pulsar context and closes the data client. +func (s *Session) ResetPulsarContext() { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.Client != nil { + s.Client.Close() + } + + s.Ctx = PulsarContext{} + s.Client = nil + s.AdminClient = nil + s.AdminV3Client = nil + s.ClientOptions = pulsar.ClientOptions{} + s.PulsarCtlConfig = nil + s.adminStatusREST = nil +} + // GetAdminClient returns the Pulsar admin v2 client. func (s *Session) GetAdminClient() (cmdutils.Client, error) { s.mutex.RLock() defer s.mutex.RUnlock() - if s.PulsarCtlConfig.WebServiceURL == "" { + if s.PulsarCtlConfig == nil || s.PulsarCtlConfig.WebServiceURL == "" { return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first") } return s.AdminClient, nil @@ -186,7 +204,7 @@ func (s *Session) GetAdminV3Client() (cmdutils.Client, error) { s.mutex.RLock() defer s.mutex.RUnlock() - if s.PulsarCtlConfig.WebServiceURL == "" { + if s.PulsarCtlConfig == nil || s.PulsarCtlConfig.WebServiceURL == "" { return nil, fmt.Errorf("err: ContextNotSetErr: Please set the cluster context first") } return s.AdminV3Client, nil diff --git a/pkg/pulsar/connection_test.go b/pkg/pulsar/connection_test.go index 5b8965d..d0a5a2a 100644 --- a/pkg/pulsar/connection_test.go +++ b/pkg/pulsar/connection_test.go @@ -17,20 +17,23 @@ package pulsar import ( "testing" + pulsarclient "github.com/apache/pulsar-client-go/pulsar" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/stretchr/testify/require" ) +const contextNotSetErr = "err: ContextNotSetErr: Please set the cluster context first" + func TestSessionGetPulsarCtlConfigRequiresWebServiceURL(t *testing.T) { session := &Session{} _, err := session.GetPulsarCtlConfig() - require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first") + require.EqualError(t, err, contextNotSetErr) session.PulsarCtlConfig = &cmdutils.ClusterConfig{} _, err = session.GetPulsarCtlConfig() - require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first") + require.EqualError(t, err, contextNotSetErr) } func TestSessionGetPulsarCtlConfigReturnsCopy(t *testing.T) { @@ -53,12 +56,12 @@ func TestSessionGetAdminStatusClientRequiresWebServiceURL(t *testing.T) { session := &Session{} _, err := session.GetAdminStatusClient() - require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first") + require.EqualError(t, err, contextNotSetErr) session.PulsarCtlConfig = &cmdutils.ClusterConfig{} _, err = session.GetAdminStatusClient() - require.EqualError(t, err, "err: ContextNotSetErr: Please set the cluster context first") + require.EqualError(t, err, contextNotSetErr) } func TestSessionGetAdminStatusClientReturnsCachedClient(t *testing.T) { @@ -77,6 +80,54 @@ func TestSessionGetAdminStatusClientReturnsCachedClient(t *testing.T) { require.Same(t, firstClient, secondClient) } +func TestSessionGetAdminClientsRequireWebServiceURL(t *testing.T) { + session := &Session{} + + _, err := session.GetAdminClient() + require.EqualError(t, err, contextNotSetErr) + + _, err = session.GetAdminV3Client() + require.EqualError(t, err, contextNotSetErr) + + session.PulsarCtlConfig = &cmdutils.ClusterConfig{} + + _, err = session.GetAdminClient() + require.EqualError(t, err, contextNotSetErr) + + _, err = session.GetAdminV3Client() + require.EqualError(t, err, contextNotSetErr) +} + +func TestSessionResetPulsarContextClearsContextAndClients(t *testing.T) { + session := &Session{ + Ctx: PulsarContext{ + ServiceURL: "pulsar://pulsar.example.com:6650", + WebServiceURL: "http://pulsar.example.com:8080", + Token: "token", + }, + PulsarCtlConfig: &cmdutils.ClusterConfig{ + WebServiceURL: "http://pulsar.example.com:8080", + }, + ClientOptions: pulsarclient.ClientOptions{URL: "pulsar://pulsar.example.com:6650"}, + } + + session.ResetPulsarContext() + + require.Equal(t, PulsarContext{}, session.Ctx) + require.Nil(t, session.Client) + require.Nil(t, session.AdminClient) + require.Nil(t, session.AdminV3Client) + require.Equal(t, pulsarclient.ClientOptions{}, session.ClientOptions) + require.Nil(t, session.PulsarCtlConfig) + require.Nil(t, session.adminStatusREST) + + _, err := session.GetAdminClient() + require.EqualError(t, err, contextNotSetErr) + + _, err = session.GetPulsarClient() + require.EqualError(t, err, contextNotSetErr) +} + func TestSessionSetPulsarContextResetsAdminStatusClient(t *testing.T) { session := &Session{ PulsarCtlConfig: &cmdutils.ClusterConfig{