From e07c94f49d6eecc83ca77ed0b650c22f74c144b3 Mon Sep 17 00:00:00 2001 From: Ed Zynda Date: Thu, 9 Apr 2026 13:54:11 +0300 Subject: [PATCH] feat(mcp): add dynamic MCP server loading and unloading - Add AddServer/RemoveServer to MCPToolManager for runtime server management - Add RemoveConnection to MCPConnectionPool for per-server teardown - Add AddMCPServer/RemoveMCPServer/ListMCPServers to Agent and SDK Kit - Lazily create connection pool so AddServer works without prior LoadTools - Wire onToolsChanged callback to trigger agent tool list rebuild - Make MCPToolManager.Close nil-safe when pool was never initialized Tests: - Integration tests with real stdio MCP server (Python echo server) - Agent-level tests using mock LLM model (no API key needed) - Unit tests for error paths, callbacks, idempotency, nil safety - SDK type surface tests --- internal/agent/agent.go | 53 +++ internal/agent/agent_mcp_test.go | 242 +++++++++++++ internal/tools/connection_pool.go | 21 ++ internal/tools/mcp.go | 128 +++++++ .../tools/mcp_dynamic_integration_test.go | 323 ++++++++++++++++++ internal/tools/mcp_dynamic_test.go | 155 +++++++++ internal/tools/testdata/echo_server.py | 111 ++++++ pkg/kit/kit.go | 73 ++++ pkg/kit/mcp_dynamic_test.go | 56 +++ 9 files changed, 1162 insertions(+) create mode 100644 internal/agent/agent_mcp_test.go create mode 100644 internal/tools/mcp_dynamic_integration_test.go create mode 100644 internal/tools/mcp_dynamic_test.go create mode 100755 internal/tools/testdata/echo_server.py create mode 100644 pkg/kit/mcp_dynamic_test.go diff --git a/internal/agent/agent.go b/internal/agent/agent.go index ac29e785..09b6d39d 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -834,6 +834,59 @@ func (a *Agent) SetExtraTools(extraTools []fantasy.AgentTool) { a.rebuildFantasyAgent() } +// AddMCPServer connects to a new MCP server at runtime and makes its tools +// available to the agent. Returns the number of tools loaded. +// If the agent has no tool manager (no MCP servers were configured at init), +// one is created automatically. +func (a *Agent) AddMCPServer(ctx context.Context, name string, cfg config.MCPServerConfig) (int, error) { + // Ensure MCP tools from initial load are settled first. + a.ensureMCPTools() + + if a.toolManager == nil { + a.toolManager = tools.NewMCPToolManager() + a.toolManager.SetModel(a.model) + a.toolManager.SetOnToolsChanged(func() { + a.rebuildFantasyAgent() + }) + } + + count, err := a.toolManager.AddServer(ctx, name, cfg) + if err != nil { + return 0, err + } + + // AddServer's onToolsChanged callback triggers rebuildFantasyAgent, + // but only if it was wired. Ensure rebuild happens regardless. + a.rebuildFantasyAgent() + return count, nil +} + +// RemoveMCPServer disconnects an MCP server and removes its tools from the agent. +func (a *Agent) RemoveMCPServer(name string) error { + if a.toolManager == nil { + return fmt.Errorf("no MCP servers loaded") + } + + // Ensure MCP tools from initial load are settled first. + a.ensureMCPTools() + + err := a.toolManager.RemoveServer(name) + if err != nil { + return err + } + + // RemoveServer's onToolsChanged callback triggers rebuildFantasyAgent, + // but ensure rebuild happens regardless. + a.rebuildFantasyAgent() + return nil +} + +// GetMCPToolManager returns the underlying MCP tool manager. +// Returns nil if no MCP servers have been configured. +func (a *Agent) GetMCPToolManager() *tools.MCPToolManager { + return a.toolManager +} + // GetLoadingMessage returns the loading message from provider creation. func (a *Agent) GetLoadingMessage() string { return a.loadingMessage diff --git a/internal/agent/agent_mcp_test.go b/internal/agent/agent_mcp_test.go new file mode 100644 index 00000000..d4bd803e --- /dev/null +++ b/internal/agent/agent_mcp_test.go @@ -0,0 +1,242 @@ +package agent + +import ( + "context" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "charm.land/fantasy" + + "github.com/mark3labs/kit/internal/config" +) + +// mockModel is a minimal LanguageModel that satisfies the interface +// without making real API calls. Used to test tool management wiring. +type mockModel struct{} + +func (m *mockModel) Generate(_ context.Context, _ fantasy.Call) (*fantasy.Response, error) { + return &fantasy.Response{}, nil +} +func (m *mockModel) Stream(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) { + return nil, nil +} +func (m *mockModel) GenerateObject(_ context.Context, _ fantasy.ObjectCall) (*fantasy.ObjectResponse, error) { + return &fantasy.ObjectResponse{}, nil +} +func (m *mockModel) StreamObject(_ context.Context, _ fantasy.ObjectCall) (fantasy.ObjectStreamResponse, error) { + return nil, nil +} +func (m *mockModel) Provider() string { return "mock" } +func (m *mockModel) Model() string { return "mock-model" } + +// testdataDir returns the absolute path to the tools testdata directory. +func testdataDir(t *testing.T) string { + t.Helper() + _, file, _, ok := runtime.Caller(0) + if !ok { + t.Fatal("cannot determine test file path") + } + return filepath.Join(filepath.Dir(file), "..", "tools", "testdata") +} + +// echoServerConfig returns an MCPServerConfig for the test echo MCP server. +func echoServerConfig(t *testing.T) config.MCPServerConfig { + t.Helper() + script := filepath.Join(testdataDir(t), "echo_server.py") + if _, err := os.Stat(script); err != nil { + t.Skipf("echo_server.py not found: %v", err) + } + return config.MCPServerConfig{ + Command: []string{"python3", script}, + } +} + +// newTestAgent creates a minimal Agent with a mock model and no core tools, +// suitable for testing MCP server management without an API key. +func newTestAgent() *Agent { + model := &mockModel{} + a := &Agent{ + model: model, + coreTools: nil, + extraTools: nil, + maxSteps: 10, + systemPrompt: "test", + fantasyAgent: fantasy.NewAgent(model), + } + return a +} + +func TestAgent_AddMCPServer(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + a := newTestAgent() + defer func() { _ = a.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Initially no MCP tools. + if a.GetMCPToolCount() != 0 { + t.Fatalf("Expected 0 MCP tools initially, got %d", a.GetMCPToolCount()) + } + + // Add a server. + count, err := a.AddMCPServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("AddMCPServer failed: %v", err) + } + if count != 2 { + t.Errorf("Expected 2 tools, got %d", count) + } + + // Verify tools are in the agent's tool list. + if a.GetMCPToolCount() != 2 { + t.Errorf("Expected 2 MCP tools, got %d", a.GetMCPToolCount()) + } + + allTools := a.GetTools() + toolNames := make(map[string]bool) + for _, tool := range allTools { + toolNames[tool.Info().Name] = true + } + if !toolNames["echo__echo"] { + t.Error("Expected tool 'echo__echo' in agent tools") + } + if !toolNames["echo__greet"] { + t.Error("Expected tool 'echo__greet' in agent tools") + } + + // Verify loaded server names. + names := a.GetLoadedServerNames() + found := false + for _, n := range names { + if n == "echo" { + found = true + } + } + if !found { + t.Errorf("Expected 'echo' in loaded server names: %v", names) + } +} + +func TestAgent_RemoveMCPServer(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + a := newTestAgent() + defer func() { _ = a.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Add then remove. + _, err := a.AddMCPServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("AddMCPServer failed: %v", err) + } + + err = a.RemoveMCPServer("echo") + if err != nil { + t.Fatalf("RemoveMCPServer failed: %v", err) + } + + // Verify tools removed. + if a.GetMCPToolCount() != 0 { + t.Errorf("Expected 0 MCP tools after removal, got %d", a.GetMCPToolCount()) + } + + // Verify agent's tool list has no MCP tools. + for _, tool := range a.GetTools() { + if strings.Contains(tool.Info().Name, "echo__") { + t.Errorf("Found leftover tool after removal: %s", tool.Info().Name) + } + } +} + +func TestAgent_RemoveMCPServer_NoToolManager(t *testing.T) { + a := newTestAgent() + defer func() { _ = a.Close() }() + + err := a.RemoveMCPServer("nonexistent") + if err == nil { + t.Fatal("Expected error when no tool manager exists") + } + if !strings.Contains(err.Error(), "no MCP servers loaded") { + t.Errorf("Expected 'no MCP servers loaded' error, got: %v", err) + } +} + +func TestAgent_AddMCPServer_CreatesToolManager(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + a := newTestAgent() + defer func() { _ = a.Close() }() + + // Initially no tool manager. + if a.GetMCPToolManager() != nil { + t.Fatal("Expected nil tool manager initially") + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + _, err := a.AddMCPServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("AddMCPServer failed: %v", err) + } + + // Tool manager should now exist. + if a.GetMCPToolManager() == nil { + t.Fatal("Expected tool manager to be created by AddMCPServer") + } +} + +func TestAgent_AddRemoveAdd_MCP(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + a := newTestAgent() + defer func() { _ = a.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Add → Remove → Add cycle. + _, err := a.AddMCPServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("First add failed: %v", err) + } + + err = a.RemoveMCPServer("echo") + if err != nil { + t.Fatalf("Remove failed: %v", err) + } + + count, err := a.AddMCPServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("Re-add failed: %v", err) + } + if count != 2 { + t.Errorf("Expected 2 tools on re-add, got %d", count) + } + if a.GetMCPToolCount() != 2 { + t.Errorf("Expected 2 MCP tools after re-add, got %d", a.GetMCPToolCount()) + } +} diff --git a/internal/tools/connection_pool.go b/internal/tools/connection_pool.go index 3869e25f..bfd45149 100644 --- a/internal/tools/connection_pool.go +++ b/internal/tools/connection_pool.go @@ -595,6 +595,27 @@ func (p *MCPConnectionPool) GetClients() map[string]client.MCPClient { return clients } +// RemoveConnection closes and removes a single connection from the pool. +// Returns an error if the connection does not exist or if closing fails. +// Thread-safe for concurrent use. +func (p *MCPConnectionPool) RemoveConnection(serverName string) error { + p.mu.Lock() + defer p.mu.Unlock() + + conn, exists := p.connections[serverName] + if !exists { + return fmt.Errorf("connection %q not found in pool", serverName) + } + + err := conn.client.Close() + delete(p.connections, serverName) + + if p.debugLogger != nil && p.debugLogger.IsDebugEnabled() { + p.debugLogger.LogDebug(fmt.Sprintf("[POOL] Removed connection %s", serverName)) + } + return err +} + // Close gracefully shuts down the connection pool, closing all client connections // and stopping the background health check goroutine. It attempts to close all // connections even if some fail, logging any errors encountered. diff --git a/internal/tools/mcp.go b/internal/tools/mcp.go index c1c3c9ac..c3db5157 100644 --- a/internal/tools/mcp.go +++ b/internal/tools/mcp.go @@ -34,6 +34,11 @@ type MCPToolManager struct { // onServerLoaded, if non-nil, is called when each server finishes loading. // Called with server name, tool count, and error (nil on success). onServerLoaded func(serverName string, toolCount int, err error) + + // onToolsChanged, if non-nil, is called after AddServer or RemoveServer + // mutates the tool list. The agent layer uses this to trigger a + // rebuildFantasyAgent so the LLM sees the updated tools. + onToolsChanged func() } // toolMapping stores the mapping between prefixed tool names and their original details @@ -96,6 +101,126 @@ func (m *MCPToolManager) SetOnServerLoaded(cb func(serverName string, toolCount m.onServerLoaded = cb } +// SetOnToolsChanged sets the callback that's invoked after AddServer or +// RemoveServer mutates the tool list. The agent layer uses this to trigger +// a rebuild of the fantasy agent so the LLM sees the updated tool set. +func (m *MCPToolManager) SetOnToolsChanged(cb func()) { + m.onToolsChanged = cb +} + +// AddServer connects to a new MCP server at runtime and loads its tools. +// The server's tools are immediately available to the agent after this call. +// Returns the number of tools loaded from the server. +// +// If the connection pool has not been initialised yet (i.e. LoadTools was never +// called), AddServer creates one automatically using the manager's current +// configuration. +// +// Returns an error if a server with the same name is already loaded, or if +// the connection or tool loading fails. +func (m *MCPToolManager) AddServer(ctx context.Context, name string, cfg config.MCPServerConfig) (int, error) { + m.mu.Lock() + // Check for duplicate. + if _, exists := m.toolMap[name+"__"]; exists { + m.mu.Unlock() + return 0, fmt.Errorf("MCP server %q is already loaded", name) + } + // More thorough duplicate check: scan toolMap for any key with the server prefix. + prefix := name + "__" + for k := range m.toolMap { + if len(k) >= len(prefix) && k[:len(prefix)] == prefix { + m.mu.Unlock() + return 0, fmt.Errorf("MCP server %q is already loaded", name) + } + } + m.mu.Unlock() + + // Lazily create the connection pool if LoadTools was never called. + m.ensureConnectionPool() + + count, err := m.loadServerTools(ctx, name, cfg) + if err != nil { + return 0, fmt.Errorf("failed to add MCP server %q: %w", name, err) + } + + // Notify listeners. + if m.onServerLoaded != nil { + m.onServerLoaded(name, count, nil) + } + if m.onToolsChanged != nil { + m.onToolsChanged() + } + + return count, nil +} + +// RemoveServer disconnects an MCP server and removes all its tools. +// After this call the agent will no longer see or be able to call tools from +// the named server. Returns an error if the server is not loaded. +func (m *MCPToolManager) RemoveServer(name string) error { + prefix := name + "__" + + m.mu.Lock() + + // Check the server actually has tools loaded. + found := false + for k := range m.toolMap { + if len(k) >= len(prefix) && k[:len(prefix)] == prefix { + found = true + break + } + } + if !found { + m.mu.Unlock() + return fmt.Errorf("MCP server %q is not loaded", name) + } + + // Remove tools belonging to this server. + newTools := make([]fantasy.AgentTool, 0, len(m.tools)) + for _, t := range m.tools { + if len(t.Info().Name) < len(prefix) || t.Info().Name[:len(prefix)] != prefix { + newTools = append(newTools, t) + } + } + m.tools = newTools + + // Remove tool mappings. + for k := range m.toolMap { + if len(k) >= len(prefix) && k[:len(prefix)] == prefix { + delete(m.toolMap, k) + } + } + m.mu.Unlock() + + // Close the connection in the pool (best-effort). + if m.connectionPool != nil { + _ = m.connectionPool.RemoveConnection(name) + } + + if m.onToolsChanged != nil { + m.onToolsChanged() + } + + return nil +} + +// ensureConnectionPool lazily creates a connection pool if one does not exist. +// This allows AddServer to work even if LoadTools was never called. +func (m *MCPToolManager) ensureConnectionPool() { + if m.connectionPool != nil { + return + } + debug := false + if m.config != nil { + debug = m.config.Debug + } + if m.debugLogger == nil { + m.debugLogger = NewSimpleDebugLogger(debug) + } + m.connectionPool = NewMCPConnectionPool(DefaultConnectionPoolConfig(), m.model, debug, m.authHandler, m.tokenStoreFactory) + m.connectionPool.SetDebugLogger(m.debugLogger) +} + // LoadTools loads tools from all configured MCP servers based on the provided configuration. // It initializes the connection pool, connects to each configured server, and loads their tools. // Tools from different servers are prefixed with the server name to avoid naming conflicts. @@ -299,6 +424,9 @@ func (m *MCPToolManager) GetLoadedServerNames() []string { // proper cleanup of stdio processes, network connections, and other resources. // It is safe to call Close multiple times. func (m *MCPToolManager) Close() error { + if m.connectionPool == nil { + return nil + } return m.connectionPool.Close() } diff --git a/internal/tools/mcp_dynamic_integration_test.go b/internal/tools/mcp_dynamic_integration_test.go new file mode 100644 index 00000000..0a43270d --- /dev/null +++ b/internal/tools/mcp_dynamic_integration_test.go @@ -0,0 +1,323 @@ +package tools + +import ( + "context" + "os" + "path/filepath" + "runtime" + "slices" + "strings" + "sync" + "testing" + "time" + + "github.com/mark3labs/kit/internal/config" +) + +// testdataDir returns the absolute path to the testdata directory. +func testdataDir(t *testing.T) string { + t.Helper() + _, file, _, ok := runtime.Caller(0) + if !ok { + t.Fatal("cannot determine test file path") + } + return filepath.Join(filepath.Dir(file), "testdata") +} + +// echoServerConfig returns an MCPServerConfig for the test echo MCP server. +func echoServerConfig(t *testing.T) config.MCPServerConfig { + t.Helper() + script := filepath.Join(testdataDir(t), "echo_server.py") + if _, err := os.Stat(script); err != nil { + t.Skipf("echo_server.py not found: %v", err) + } + return config.MCPServerConfig{ + Command: []string{"python3", script}, + } +} + +// TestMCPToolManager_AddServer_Integration tests adding a real MCP server +// at runtime and verifying tools are loaded. +func TestMCPToolManager_AddServer_Integration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + manager := NewMCPToolManager() + defer func() { _ = manager.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Track callbacks. + var mu sync.Mutex + var loadedServer string + var loadedCount int + toolsChangedCount := 0 + + manager.SetOnServerLoaded(func(name string, count int, err error) { + mu.Lock() + loadedServer = name + loadedCount = count + mu.Unlock() + }) + manager.SetOnToolsChanged(func() { + mu.Lock() + toolsChangedCount++ + mu.Unlock() + }) + + // Add the server. + count, err := manager.AddServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("AddServer failed: %v", err) + } + + if count != 2 { + t.Errorf("Expected 2 tools from echo server, got %d", count) + } + + // Verify callbacks fired. + mu.Lock() + if loadedServer != "echo" { + t.Errorf("Expected onServerLoaded for 'echo', got %q", loadedServer) + } + if loadedCount != 2 { + t.Errorf("Expected onServerLoaded count=2, got %d", loadedCount) + } + if toolsChangedCount != 1 { + t.Errorf("Expected onToolsChanged called once, got %d", toolsChangedCount) + } + mu.Unlock() + + // Verify tools are accessible. + tools := manager.GetTools() + if len(tools) != 2 { + t.Fatalf("Expected 2 tools, got %d", len(tools)) + } + + // Verify tool names are prefixed. + toolNames := make(map[string]bool) + for _, tool := range tools { + toolNames[tool.Info().Name] = true + } + if !toolNames["echo__echo"] { + t.Error("Expected tool 'echo__echo'") + } + if !toolNames["echo__greet"] { + t.Error("Expected tool 'echo__greet'") + } + + // Verify server appears in loaded names. + names := manager.GetLoadedServerNames() + if !slices.Contains(names, "echo") { + t.Errorf("Expected 'echo' in loaded server names, got: %v", names) + } +} + +// TestMCPToolManager_RemoveServer_Integration tests removing a real MCP server +// and verifying tools are cleaned up. +func TestMCPToolManager_RemoveServer_Integration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + manager := NewMCPToolManager() + defer func() { _ = manager.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Add the server first. + count, err := manager.AddServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("AddServer failed: %v", err) + } + if count != 2 { + t.Fatalf("Expected 2 tools, got %d", count) + } + + var mu sync.Mutex + toolsChangedCount := 0 + manager.SetOnToolsChanged(func() { + mu.Lock() + toolsChangedCount++ + mu.Unlock() + }) + + // Remove the server. + err = manager.RemoveServer("echo") + if err != nil { + t.Fatalf("RemoveServer failed: %v", err) + } + + // Verify tools are gone. + tools := manager.GetTools() + if len(tools) != 0 { + t.Errorf("Expected 0 tools after removal, got %d", len(tools)) + } + + // Verify callback fired. + mu.Lock() + if toolsChangedCount != 1 { + t.Errorf("Expected onToolsChanged called once, got %d", toolsChangedCount) + } + mu.Unlock() + + // Verify server is gone from loaded names. + names := manager.GetLoadedServerNames() + for _, n := range names { + if n == "echo" { + t.Error("Server 'echo' should not appear in loaded names after removal") + } + } + + // Removing again should error. + err = manager.RemoveServer("echo") + if err == nil { + t.Fatal("Expected error removing already-removed server") + } + if !strings.Contains(err.Error(), "not loaded") { + t.Errorf("Expected 'not loaded' error, got: %v", err) + } +} + +// TestMCPToolManager_AddRemoveMultiple_Integration tests adding and removing +// multiple servers, verifying tool isolation. +func TestMCPToolManager_AddRemoveMultiple_Integration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + manager := NewMCPToolManager() + defer func() { _ = manager.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Add two servers with the same binary but different names. + count1, err := manager.AddServer(ctx, "server-a", cfg) + if err != nil { + t.Fatalf("AddServer server-a failed: %v", err) + } + count2, err := manager.AddServer(ctx, "server-b", cfg) + if err != nil { + t.Fatalf("AddServer server-b failed: %v", err) + } + + totalTools := count1 + count2 + if totalTools != 4 { + t.Fatalf("Expected 4 total tools (2+2), got %d", totalTools) + } + + tools := manager.GetTools() + if len(tools) != 4 { + t.Fatalf("Expected 4 tools, got %d", len(tools)) + } + + // Remove server-a, verify server-b tools remain. + err = manager.RemoveServer("server-a") + if err != nil { + t.Fatalf("RemoveServer server-a failed: %v", err) + } + + tools = manager.GetTools() + if len(tools) != 2 { + t.Fatalf("Expected 2 tools after removing server-a, got %d", len(tools)) + } + + // Remaining tools should all be from server-b. + for _, tool := range tools { + if !strings.HasPrefix(tool.Info().Name, "server-b__") { + t.Errorf("Expected tool from server-b, got: %s", tool.Info().Name) + } + } + + // Remove server-b. + err = manager.RemoveServer("server-b") + if err != nil { + t.Fatalf("RemoveServer server-b failed: %v", err) + } + + tools = manager.GetTools() + if len(tools) != 0 { + t.Errorf("Expected 0 tools after removing all servers, got %d", len(tools)) + } +} + +// TestMCPToolManager_AddServer_DuplicateDetection_Integration tests that +// adding a server with the same name as an already loaded server errors. +func TestMCPToolManager_AddServer_DuplicateDetection_Integration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + manager := NewMCPToolManager() + defer func() { _ = manager.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Add the server. + _, err := manager.AddServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("First AddServer failed: %v", err) + } + + // Try to add again with the same name. + _, err = manager.AddServer(ctx, "echo", cfg) + if err == nil { + t.Fatal("Expected error adding duplicate server") + } + if !strings.Contains(err.Error(), "already loaded") { + t.Errorf("Expected 'already loaded' error, got: %v", err) + } +} + +// TestMCPToolManager_AddAfterRemove_Integration tests that a server can be +// re-added after being removed. +func TestMCPToolManager_AddAfterRemove_Integration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + manager := NewMCPToolManager() + defer func() { _ = manager.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cfg := echoServerConfig(t) + + // Add, remove, re-add. + _, err := manager.AddServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("First AddServer failed: %v", err) + } + + err = manager.RemoveServer("echo") + if err != nil { + t.Fatalf("RemoveServer failed: %v", err) + } + + count, err := manager.AddServer(ctx, "echo", cfg) + if err != nil { + t.Fatalf("Re-AddServer failed: %v", err) + } + if count != 2 { + t.Errorf("Expected 2 tools on re-add, got %d", count) + } + + tools := manager.GetTools() + if len(tools) != 2 { + t.Errorf("Expected 2 tools after re-add, got %d", len(tools)) + } +} diff --git a/internal/tools/mcp_dynamic_test.go b/internal/tools/mcp_dynamic_test.go new file mode 100644 index 00000000..257e1ed7 --- /dev/null +++ b/internal/tools/mcp_dynamic_test.go @@ -0,0 +1,155 @@ +package tools + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "github.com/mark3labs/kit/internal/config" +) + +// TestMCPToolManager_AddServer_DuplicateName verifies that adding a server +// with a name that already exists returns an error. +func TestMCPToolManager_AddServer_DuplicateName(t *testing.T) { + manager := NewMCPToolManager() + + cfg := config.MCPServerConfig{ + Command: []string{"non-existent-command"}, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // First add will fail (bad command), but let's test the duplicate detection + // by simulating a loaded server via LoadTools first. + loadCfg := &config.Config{ + MCPServers: map[string]config.MCPServerConfig{ + "test-server": cfg, + }, + } + // This will fail to load but creates the connection pool. + _ = manager.LoadTools(ctx, loadCfg) + + // Now try to add the same server name — the tools didn't load (bad command), + // so AddServer should not find a duplicate and should fail with connection error. + _, err := manager.AddServer(ctx, "test-server", cfg) + if err == nil { + t.Fatal("Expected error when adding server with bad command, got nil") + } + // It should be a connection error, not a duplicate error. + if strings.Contains(err.Error(), "already loaded") { + t.Fatalf("Should not report duplicate since server failed to load initially: %v", err) + } +} + +// TestMCPToolManager_RemoveServer_NotLoaded verifies that removing a server +// that doesn't exist returns an appropriate error. +func TestMCPToolManager_RemoveServer_NotLoaded(t *testing.T) { + manager := NewMCPToolManager() + + err := manager.RemoveServer("nonexistent") + if err == nil { + t.Fatal("Expected error when removing non-existent server, got nil") + } + if !strings.Contains(err.Error(), "not loaded") { + t.Errorf("Expected 'not loaded' error, got: %v", err) + } +} + +// TestMCPToolManager_AddServer_CreatesConnectionPool verifies that AddServer +// lazily creates a connection pool when LoadTools was never called. +func TestMCPToolManager_AddServer_CreatesConnectionPool(t *testing.T) { + manager := NewMCPToolManager() + + // Connection pool should be nil initially. + if manager.connectionPool != nil { + t.Fatal("Expected nil connection pool before any operation") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // AddServer with a bad command — should fail, but the pool should be created. + _, err := manager.AddServer(ctx, "lazy-server", config.MCPServerConfig{ + Command: []string{"non-existent-command"}, + }) + if err == nil { + t.Fatal("Expected error for bad command") + } + + // Connection pool should have been created. + if manager.connectionPool == nil { + t.Fatal("Expected connection pool to be created lazily by AddServer") + } +} + +// TestMCPToolManager_OnToolsChanged_Callback verifies that the onToolsChanged +// callback fires on RemoveServer (we can't easily test AddServer with a real +// MCP server, but we can test the callback wiring). +func TestMCPToolManager_OnToolsChanged_Callback(t *testing.T) { + manager := NewMCPToolManager() + + var mu sync.Mutex + callCount := 0 + manager.SetOnToolsChanged(func() { + mu.Lock() + callCount++ + mu.Unlock() + }) + + // RemoveServer on non-existent should NOT fire callback. + _ = manager.RemoveServer("nonexistent") + + mu.Lock() + if callCount != 0 { + t.Errorf("Expected 0 callback calls for failed remove, got %d", callCount) + } + mu.Unlock() +} + +// TestMCPToolManager_Close_NilPool verifies Close is safe when the connection +// pool was never initialized. +func TestMCPToolManager_Close_NilPool(t *testing.T) { + manager := NewMCPToolManager() + err := manager.Close() + if err != nil { + t.Fatalf("Expected nil error from Close with nil pool, got: %v", err) + } +} + +// TestMCPConnectionPool_RemoveConnection_NotFound verifies that removing a +// non-existent connection returns an error. +func TestMCPConnectionPool_RemoveConnection_NotFound(t *testing.T) { + pool := NewMCPConnectionPool(DefaultConnectionPoolConfig(), nil, false, nil, nil) + defer func() { _ = pool.Close() }() + + err := pool.RemoveConnection("nonexistent") + if err == nil { + t.Fatal("Expected error for non-existent connection") + } + if !strings.Contains(err.Error(), "not found") { + t.Errorf("Expected 'not found' error, got: %v", err) + } +} + +// TestMCPToolManager_EnsureConnectionPool_Idempotent verifies that +// ensureConnectionPool doesn't recreate an existing pool. +func TestMCPToolManager_EnsureConnectionPool_Idempotent(t *testing.T) { + manager := NewMCPToolManager() + + // First call creates the pool. + manager.ensureConnectionPool() + pool1 := manager.connectionPool + if pool1 == nil { + t.Fatal("Expected pool to be created") + } + + // Second call should be a no-op. + manager.ensureConnectionPool() + pool2 := manager.connectionPool + if pool1 != pool2 { + t.Fatal("Expected ensureConnectionPool to be idempotent") + } +} diff --git a/internal/tools/testdata/echo_server.py b/internal/tools/testdata/echo_server.py new file mode 100755 index 00000000..91d42bee --- /dev/null +++ b/internal/tools/testdata/echo_server.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +"""Minimal MCP server over stdio for testing. Exposes one tool: echo.""" +import json +import sys + + +def read_message(): + """Read a JSON-RPC message from stdin.""" + line = sys.stdin.readline() + if not line: + return None + return json.loads(line.strip()) + + +def write_message(msg): + """Write a JSON-RPC message to stdout.""" + sys.stdout.write(json.dumps(msg) + "\n") + sys.stdout.flush() + + +def handle(msg): + method = msg.get("method", "") + mid = msg.get("id") + + if method == "initialize": + write_message({ + "jsonrpc": "2.0", + "id": mid, + "result": { + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "test-echo", "version": "1.0.0"}, + }, + }) + elif method == "notifications/initialized": + pass # no response needed + elif method == "tools/list": + write_message({ + "jsonrpc": "2.0", + "id": mid, + "result": { + "tools": [ + { + "name": "echo", + "description": "Echoes the input text back.", + "inputSchema": { + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to echo"} + }, + "required": ["text"], + }, + }, + { + "name": "greet", + "description": "Returns a greeting.", + "inputSchema": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Name to greet"} + }, + "required": ["name"], + }, + }, + ] + }, + }) + elif method == "tools/call": + tool_name = msg["params"]["name"] + args = msg["params"].get("arguments", {}) + if tool_name == "echo": + text = args.get("text", "") + write_message({ + "jsonrpc": "2.0", + "id": mid, + "result": { + "content": [{"type": "text", "text": text}] + }, + }) + elif tool_name == "greet": + name = args.get("name", "World") + write_message({ + "jsonrpc": "2.0", + "id": mid, + "result": { + "content": [{"type": "text", "text": f"Hello, {name}!"}] + }, + }) + else: + write_message({ + "jsonrpc": "2.0", + "id": mid, + "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}, + }) + elif method == "ping": + write_message({"jsonrpc": "2.0", "id": mid, "result": {}}) + else: + if mid is not None: + write_message({ + "jsonrpc": "2.0", + "id": mid, + "error": {"code": -32601, "message": f"Unknown method: {method}"}, + }) + + +if __name__ == "__main__": + while True: + msg = read_message() + if msg is None: + break + handle(msg) diff --git a/pkg/kit/kit.go b/pkg/kit/kit.go index 25c70d47..eac43583 100644 --- a/pkg/kit/kit.go +++ b/pkg/kit/kit.go @@ -146,6 +146,79 @@ func (m *Kit) MCPToolsReady() bool { return m.agent.MCPToolsReady() } +// MCPServerStatus describes the runtime state of a loaded MCP server. +type MCPServerStatus struct { + // Name is the configured server name. + Name string + // ToolCount is the number of tools loaded from this server. + ToolCount int +} + +// AddMCPServer connects to a new MCP server at runtime and makes its tools +// available to the agent immediately. The server's tools are prefixed with the +// server name (e.g. "myserver__tool_name") to avoid naming conflicts, matching +// the behaviour of servers loaded at initialization. +// +// Returns the number of tools loaded from the server. +// +// AddMCPServer is safe to call while the agent is idle. If a turn is in +// progress ([Kit.IsGenerating] returns true), the new tools will be visible +// starting from the next LLM step. +// +// Example: +// +// n, err := k.AddMCPServer(ctx, "github", kit.MCPServerConfig{ +// Command: []string{"npx", "-y", "@modelcontextprotocol/server-github"}, +// Environment: map[string]string{"GITHUB_TOKEN": os.Getenv("GITHUB_TOKEN")}, +// }) +func (m *Kit) AddMCPServer(ctx context.Context, name string, cfg MCPServerConfig) (int, error) { + return m.agent.AddMCPServer(ctx, name, cfg) +} + +// RemoveMCPServer disconnects an MCP server and removes all its tools from +// the agent. After this call the agent will no longer see or be able to call +// tools from the named server. +// +// RemoveMCPServer is safe to call while the agent is idle. If a turn is in +// progress, the tools are removed at the next LLM step. Any in-flight tool +// calls to the removed server will fail gracefully. +// +// Returns an error if the named server is not currently loaded. +func (m *Kit) RemoveMCPServer(name string) error { + return m.agent.RemoveMCPServer(name) +} + +// ListMCPServers returns the status of all currently loaded MCP servers. +// The returned slice is a snapshot; it is safe to read concurrently. +func (m *Kit) ListMCPServers() []MCPServerStatus { + names := m.agent.GetLoadedServerNames() + if len(names) == 0 { + return nil + } + + // Build a tool count per server by scanning tool names for the prefix. + toolNames := m.GetToolNames() + countByServer := make(map[string]int, len(names)) + for _, tn := range toolNames { + for _, sn := range names { + prefix := sn + "__" + if len(tn) > len(prefix) && tn[:len(prefix)] == prefix { + countByServer[sn]++ + break + } + } + } + + result := make([]MCPServerStatus, 0, len(names)) + for _, n := range names { + result = append(result, MCPServerStatus{ + Name: n, + ToolCount: countByServer[n], + }) + } + return result +} + // GetExtensionToolCount returns the number of tools registered by extensions. func (m *Kit) GetExtensionToolCount() int { return m.agent.GetExtensionToolCount() diff --git a/pkg/kit/mcp_dynamic_test.go b/pkg/kit/mcp_dynamic_test.go new file mode 100644 index 00000000..4bb354ef --- /dev/null +++ b/pkg/kit/mcp_dynamic_test.go @@ -0,0 +1,56 @@ +package kit_test + +import ( + "testing" + + kit "github.com/mark3labs/kit/pkg/kit" +) + +// TestMCPServerStatus_TypeSurface verifies the MCPServerStatus type is +// accessible and has the expected fields. +func TestMCPServerStatus_TypeSurface(t *testing.T) { + s := kit.MCPServerStatus{ + Name: "test-server", + ToolCount: 5, + } + if s.Name != "test-server" { + t.Errorf("Expected Name 'test-server', got %q", s.Name) + } + if s.ToolCount != 5 { + t.Errorf("Expected ToolCount 5, got %d", s.ToolCount) + } +} + +// TestMCPServerConfig_ForDynamicAdd verifies that MCPServerConfig can be +// constructed with the expected fields for dynamic server management. +func TestMCPServerConfig_ForDynamicAdd(t *testing.T) { + // Stdio server config. + stdio := kit.MCPServerConfig{ + Command: []string{"npx", "-y", "@modelcontextprotocol/server-github"}, + Environment: map[string]string{"GITHUB_TOKEN": "test-token"}, + } + if len(stdio.Command) != 3 { + t.Errorf("Expected 3 command parts, got %d", len(stdio.Command)) + } + if stdio.Environment["GITHUB_TOKEN"] != "test-token" { + t.Error("Expected GITHUB_TOKEN in environment") + } + + // Remote server config. + remote := kit.MCPServerConfig{ + URL: "https://mcp.example.com/sse", + Headers: []string{"Authorization: Bearer test"}, + } + if remote.URL != "https://mcp.example.com/sse" { + t.Errorf("Unexpected URL: %s", remote.URL) + } + + // Config with tool filtering. + filtered := kit.MCPServerConfig{ + Command: []string{"some-server"}, + AllowedTools: []string{"read", "write"}, + } + if len(filtered.AllowedTools) != 2 { + t.Errorf("Expected 2 allowed tools, got %d", len(filtered.AllowedTools)) + } +}