feat(mcp): add dynamic MCP server loading and unloading

- Add AddServer/RemoveServer to MCPToolManager for runtime server management
- Add RemoveConnection to MCPConnectionPool for per-server teardown
- Add AddMCPServer/RemoveMCPServer/ListMCPServers to Agent and SDK Kit
- Lazily create connection pool so AddServer works without prior LoadTools
- Wire onToolsChanged callback to trigger agent tool list rebuild
- Make MCPToolManager.Close nil-safe when pool was never initialized

Tests:
- Integration tests with real stdio MCP server (Python echo server)
- Agent-level tests using mock LLM model (no API key needed)
- Unit tests for error paths, callbacks, idempotency, nil safety
- SDK type surface tests
This commit is contained in:
Ed Zynda
2026-04-09 13:54:11 +03:00
parent b87146a284
commit e07c94f49d
9 changed files with 1162 additions and 0 deletions
+53
View File
@@ -834,6 +834,59 @@ func (a *Agent) SetExtraTools(extraTools []fantasy.AgentTool) {
a.rebuildFantasyAgent()
}
// AddMCPServer connects to a new MCP server at runtime and makes its tools
// available to the agent. Returns the number of tools loaded.
// If the agent has no tool manager (no MCP servers were configured at init),
// one is created automatically.
func (a *Agent) AddMCPServer(ctx context.Context, name string, cfg config.MCPServerConfig) (int, error) {
// Ensure MCP tools from initial load are settled first.
a.ensureMCPTools()
if a.toolManager == nil {
a.toolManager = tools.NewMCPToolManager()
a.toolManager.SetModel(a.model)
a.toolManager.SetOnToolsChanged(func() {
a.rebuildFantasyAgent()
})
}
count, err := a.toolManager.AddServer(ctx, name, cfg)
if err != nil {
return 0, err
}
// AddServer's onToolsChanged callback triggers rebuildFantasyAgent,
// but only if it was wired. Ensure rebuild happens regardless.
a.rebuildFantasyAgent()
return count, nil
}
// RemoveMCPServer disconnects an MCP server and removes its tools from the agent.
func (a *Agent) RemoveMCPServer(name string) error {
if a.toolManager == nil {
return fmt.Errorf("no MCP servers loaded")
}
// Ensure MCP tools from initial load are settled first.
a.ensureMCPTools()
err := a.toolManager.RemoveServer(name)
if err != nil {
return err
}
// RemoveServer's onToolsChanged callback triggers rebuildFantasyAgent,
// but ensure rebuild happens regardless.
a.rebuildFantasyAgent()
return nil
}
// GetMCPToolManager returns the underlying MCP tool manager.
// Returns nil if no MCP servers have been configured.
func (a *Agent) GetMCPToolManager() *tools.MCPToolManager {
return a.toolManager
}
// GetLoadingMessage returns the loading message from provider creation.
func (a *Agent) GetLoadingMessage() string {
return a.loadingMessage
+242
View File
@@ -0,0 +1,242 @@
package agent
import (
"context"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
"charm.land/fantasy"
"github.com/mark3labs/kit/internal/config"
)
// mockModel is a minimal LanguageModel that satisfies the interface
// without making real API calls. Used to test tool management wiring.
type mockModel struct{}
func (m *mockModel) Generate(_ context.Context, _ fantasy.Call) (*fantasy.Response, error) {
return &fantasy.Response{}, nil
}
func (m *mockModel) Stream(_ context.Context, _ fantasy.Call) (fantasy.StreamResponse, error) {
return nil, nil
}
func (m *mockModel) GenerateObject(_ context.Context, _ fantasy.ObjectCall) (*fantasy.ObjectResponse, error) {
return &fantasy.ObjectResponse{}, nil
}
func (m *mockModel) StreamObject(_ context.Context, _ fantasy.ObjectCall) (fantasy.ObjectStreamResponse, error) {
return nil, nil
}
func (m *mockModel) Provider() string { return "mock" }
func (m *mockModel) Model() string { return "mock-model" }
// testdataDir returns the absolute path to the tools testdata directory.
func testdataDir(t *testing.T) string {
t.Helper()
_, file, _, ok := runtime.Caller(0)
if !ok {
t.Fatal("cannot determine test file path")
}
return filepath.Join(filepath.Dir(file), "..", "tools", "testdata")
}
// echoServerConfig returns an MCPServerConfig for the test echo MCP server.
func echoServerConfig(t *testing.T) config.MCPServerConfig {
t.Helper()
script := filepath.Join(testdataDir(t), "echo_server.py")
if _, err := os.Stat(script); err != nil {
t.Skipf("echo_server.py not found: %v", err)
}
return config.MCPServerConfig{
Command: []string{"python3", script},
}
}
// newTestAgent creates a minimal Agent with a mock model and no core tools,
// suitable for testing MCP server management without an API key.
func newTestAgent() *Agent {
model := &mockModel{}
a := &Agent{
model: model,
coreTools: nil,
extraTools: nil,
maxSteps: 10,
systemPrompt: "test",
fantasyAgent: fantasy.NewAgent(model),
}
return a
}
func TestAgent_AddMCPServer(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
a := newTestAgent()
defer func() { _ = a.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Initially no MCP tools.
if a.GetMCPToolCount() != 0 {
t.Fatalf("Expected 0 MCP tools initially, got %d", a.GetMCPToolCount())
}
// Add a server.
count, err := a.AddMCPServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("AddMCPServer failed: %v", err)
}
if count != 2 {
t.Errorf("Expected 2 tools, got %d", count)
}
// Verify tools are in the agent's tool list.
if a.GetMCPToolCount() != 2 {
t.Errorf("Expected 2 MCP tools, got %d", a.GetMCPToolCount())
}
allTools := a.GetTools()
toolNames := make(map[string]bool)
for _, tool := range allTools {
toolNames[tool.Info().Name] = true
}
if !toolNames["echo__echo"] {
t.Error("Expected tool 'echo__echo' in agent tools")
}
if !toolNames["echo__greet"] {
t.Error("Expected tool 'echo__greet' in agent tools")
}
// Verify loaded server names.
names := a.GetLoadedServerNames()
found := false
for _, n := range names {
if n == "echo" {
found = true
}
}
if !found {
t.Errorf("Expected 'echo' in loaded server names: %v", names)
}
}
func TestAgent_RemoveMCPServer(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
a := newTestAgent()
defer func() { _ = a.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Add then remove.
_, err := a.AddMCPServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("AddMCPServer failed: %v", err)
}
err = a.RemoveMCPServer("echo")
if err != nil {
t.Fatalf("RemoveMCPServer failed: %v", err)
}
// Verify tools removed.
if a.GetMCPToolCount() != 0 {
t.Errorf("Expected 0 MCP tools after removal, got %d", a.GetMCPToolCount())
}
// Verify agent's tool list has no MCP tools.
for _, tool := range a.GetTools() {
if strings.Contains(tool.Info().Name, "echo__") {
t.Errorf("Found leftover tool after removal: %s", tool.Info().Name)
}
}
}
func TestAgent_RemoveMCPServer_NoToolManager(t *testing.T) {
a := newTestAgent()
defer func() { _ = a.Close() }()
err := a.RemoveMCPServer("nonexistent")
if err == nil {
t.Fatal("Expected error when no tool manager exists")
}
if !strings.Contains(err.Error(), "no MCP servers loaded") {
t.Errorf("Expected 'no MCP servers loaded' error, got: %v", err)
}
}
func TestAgent_AddMCPServer_CreatesToolManager(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
a := newTestAgent()
defer func() { _ = a.Close() }()
// Initially no tool manager.
if a.GetMCPToolManager() != nil {
t.Fatal("Expected nil tool manager initially")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
_, err := a.AddMCPServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("AddMCPServer failed: %v", err)
}
// Tool manager should now exist.
if a.GetMCPToolManager() == nil {
t.Fatal("Expected tool manager to be created by AddMCPServer")
}
}
func TestAgent_AddRemoveAdd_MCP(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
a := newTestAgent()
defer func() { _ = a.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Add → Remove → Add cycle.
_, err := a.AddMCPServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("First add failed: %v", err)
}
err = a.RemoveMCPServer("echo")
if err != nil {
t.Fatalf("Remove failed: %v", err)
}
count, err := a.AddMCPServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("Re-add failed: %v", err)
}
if count != 2 {
t.Errorf("Expected 2 tools on re-add, got %d", count)
}
if a.GetMCPToolCount() != 2 {
t.Errorf("Expected 2 MCP tools after re-add, got %d", a.GetMCPToolCount())
}
}
+21
View File
@@ -595,6 +595,27 @@ func (p *MCPConnectionPool) GetClients() map[string]client.MCPClient {
return clients
}
// RemoveConnection closes and removes a single connection from the pool.
// Returns an error if the connection does not exist or if closing fails.
// Thread-safe for concurrent use.
func (p *MCPConnectionPool) RemoveConnection(serverName string) error {
p.mu.Lock()
defer p.mu.Unlock()
conn, exists := p.connections[serverName]
if !exists {
return fmt.Errorf("connection %q not found in pool", serverName)
}
err := conn.client.Close()
delete(p.connections, serverName)
if p.debugLogger != nil && p.debugLogger.IsDebugEnabled() {
p.debugLogger.LogDebug(fmt.Sprintf("[POOL] Removed connection %s", serverName))
}
return err
}
// Close gracefully shuts down the connection pool, closing all client connections
// and stopping the background health check goroutine. It attempts to close all
// connections even if some fail, logging any errors encountered.
+128
View File
@@ -34,6 +34,11 @@ type MCPToolManager struct {
// onServerLoaded, if non-nil, is called when each server finishes loading.
// Called with server name, tool count, and error (nil on success).
onServerLoaded func(serverName string, toolCount int, err error)
// onToolsChanged, if non-nil, is called after AddServer or RemoveServer
// mutates the tool list. The agent layer uses this to trigger a
// rebuildFantasyAgent so the LLM sees the updated tools.
onToolsChanged func()
}
// toolMapping stores the mapping between prefixed tool names and their original details
@@ -96,6 +101,126 @@ func (m *MCPToolManager) SetOnServerLoaded(cb func(serverName string, toolCount
m.onServerLoaded = cb
}
// SetOnToolsChanged sets the callback that's invoked after AddServer or
// RemoveServer mutates the tool list. The agent layer uses this to trigger
// a rebuild of the fantasy agent so the LLM sees the updated tool set.
func (m *MCPToolManager) SetOnToolsChanged(cb func()) {
m.onToolsChanged = cb
}
// AddServer connects to a new MCP server at runtime and loads its tools.
// The server's tools are immediately available to the agent after this call.
// Returns the number of tools loaded from the server.
//
// If the connection pool has not been initialised yet (i.e. LoadTools was never
// called), AddServer creates one automatically using the manager's current
// configuration.
//
// Returns an error if a server with the same name is already loaded, or if
// the connection or tool loading fails.
func (m *MCPToolManager) AddServer(ctx context.Context, name string, cfg config.MCPServerConfig) (int, error) {
m.mu.Lock()
// Check for duplicate.
if _, exists := m.toolMap[name+"__"]; exists {
m.mu.Unlock()
return 0, fmt.Errorf("MCP server %q is already loaded", name)
}
// More thorough duplicate check: scan toolMap for any key with the server prefix.
prefix := name + "__"
for k := range m.toolMap {
if len(k) >= len(prefix) && k[:len(prefix)] == prefix {
m.mu.Unlock()
return 0, fmt.Errorf("MCP server %q is already loaded", name)
}
}
m.mu.Unlock()
// Lazily create the connection pool if LoadTools was never called.
m.ensureConnectionPool()
count, err := m.loadServerTools(ctx, name, cfg)
if err != nil {
return 0, fmt.Errorf("failed to add MCP server %q: %w", name, err)
}
// Notify listeners.
if m.onServerLoaded != nil {
m.onServerLoaded(name, count, nil)
}
if m.onToolsChanged != nil {
m.onToolsChanged()
}
return count, nil
}
// RemoveServer disconnects an MCP server and removes all its tools.
// After this call the agent will no longer see or be able to call tools from
// the named server. Returns an error if the server is not loaded.
func (m *MCPToolManager) RemoveServer(name string) error {
prefix := name + "__"
m.mu.Lock()
// Check the server actually has tools loaded.
found := false
for k := range m.toolMap {
if len(k) >= len(prefix) && k[:len(prefix)] == prefix {
found = true
break
}
}
if !found {
m.mu.Unlock()
return fmt.Errorf("MCP server %q is not loaded", name)
}
// Remove tools belonging to this server.
newTools := make([]fantasy.AgentTool, 0, len(m.tools))
for _, t := range m.tools {
if len(t.Info().Name) < len(prefix) || t.Info().Name[:len(prefix)] != prefix {
newTools = append(newTools, t)
}
}
m.tools = newTools
// Remove tool mappings.
for k := range m.toolMap {
if len(k) >= len(prefix) && k[:len(prefix)] == prefix {
delete(m.toolMap, k)
}
}
m.mu.Unlock()
// Close the connection in the pool (best-effort).
if m.connectionPool != nil {
_ = m.connectionPool.RemoveConnection(name)
}
if m.onToolsChanged != nil {
m.onToolsChanged()
}
return nil
}
// ensureConnectionPool lazily creates a connection pool if one does not exist.
// This allows AddServer to work even if LoadTools was never called.
func (m *MCPToolManager) ensureConnectionPool() {
if m.connectionPool != nil {
return
}
debug := false
if m.config != nil {
debug = m.config.Debug
}
if m.debugLogger == nil {
m.debugLogger = NewSimpleDebugLogger(debug)
}
m.connectionPool = NewMCPConnectionPool(DefaultConnectionPoolConfig(), m.model, debug, m.authHandler, m.tokenStoreFactory)
m.connectionPool.SetDebugLogger(m.debugLogger)
}
// LoadTools loads tools from all configured MCP servers based on the provided configuration.
// It initializes the connection pool, connects to each configured server, and loads their tools.
// Tools from different servers are prefixed with the server name to avoid naming conflicts.
@@ -299,6 +424,9 @@ func (m *MCPToolManager) GetLoadedServerNames() []string {
// proper cleanup of stdio processes, network connections, and other resources.
// It is safe to call Close multiple times.
func (m *MCPToolManager) Close() error {
if m.connectionPool == nil {
return nil
}
return m.connectionPool.Close()
}
@@ -0,0 +1,323 @@
package tools
import (
"context"
"os"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
"testing"
"time"
"github.com/mark3labs/kit/internal/config"
)
// testdataDir returns the absolute path to the testdata directory.
func testdataDir(t *testing.T) string {
t.Helper()
_, file, _, ok := runtime.Caller(0)
if !ok {
t.Fatal("cannot determine test file path")
}
return filepath.Join(filepath.Dir(file), "testdata")
}
// echoServerConfig returns an MCPServerConfig for the test echo MCP server.
func echoServerConfig(t *testing.T) config.MCPServerConfig {
t.Helper()
script := filepath.Join(testdataDir(t), "echo_server.py")
if _, err := os.Stat(script); err != nil {
t.Skipf("echo_server.py not found: %v", err)
}
return config.MCPServerConfig{
Command: []string{"python3", script},
}
}
// TestMCPToolManager_AddServer_Integration tests adding a real MCP server
// at runtime and verifying tools are loaded.
func TestMCPToolManager_AddServer_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
manager := NewMCPToolManager()
defer func() { _ = manager.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Track callbacks.
var mu sync.Mutex
var loadedServer string
var loadedCount int
toolsChangedCount := 0
manager.SetOnServerLoaded(func(name string, count int, err error) {
mu.Lock()
loadedServer = name
loadedCount = count
mu.Unlock()
})
manager.SetOnToolsChanged(func() {
mu.Lock()
toolsChangedCount++
mu.Unlock()
})
// Add the server.
count, err := manager.AddServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("AddServer failed: %v", err)
}
if count != 2 {
t.Errorf("Expected 2 tools from echo server, got %d", count)
}
// Verify callbacks fired.
mu.Lock()
if loadedServer != "echo" {
t.Errorf("Expected onServerLoaded for 'echo', got %q", loadedServer)
}
if loadedCount != 2 {
t.Errorf("Expected onServerLoaded count=2, got %d", loadedCount)
}
if toolsChangedCount != 1 {
t.Errorf("Expected onToolsChanged called once, got %d", toolsChangedCount)
}
mu.Unlock()
// Verify tools are accessible.
tools := manager.GetTools()
if len(tools) != 2 {
t.Fatalf("Expected 2 tools, got %d", len(tools))
}
// Verify tool names are prefixed.
toolNames := make(map[string]bool)
for _, tool := range tools {
toolNames[tool.Info().Name] = true
}
if !toolNames["echo__echo"] {
t.Error("Expected tool 'echo__echo'")
}
if !toolNames["echo__greet"] {
t.Error("Expected tool 'echo__greet'")
}
// Verify server appears in loaded names.
names := manager.GetLoadedServerNames()
if !slices.Contains(names, "echo") {
t.Errorf("Expected 'echo' in loaded server names, got: %v", names)
}
}
// TestMCPToolManager_RemoveServer_Integration tests removing a real MCP server
// and verifying tools are cleaned up.
func TestMCPToolManager_RemoveServer_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
manager := NewMCPToolManager()
defer func() { _ = manager.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Add the server first.
count, err := manager.AddServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("AddServer failed: %v", err)
}
if count != 2 {
t.Fatalf("Expected 2 tools, got %d", count)
}
var mu sync.Mutex
toolsChangedCount := 0
manager.SetOnToolsChanged(func() {
mu.Lock()
toolsChangedCount++
mu.Unlock()
})
// Remove the server.
err = manager.RemoveServer("echo")
if err != nil {
t.Fatalf("RemoveServer failed: %v", err)
}
// Verify tools are gone.
tools := manager.GetTools()
if len(tools) != 0 {
t.Errorf("Expected 0 tools after removal, got %d", len(tools))
}
// Verify callback fired.
mu.Lock()
if toolsChangedCount != 1 {
t.Errorf("Expected onToolsChanged called once, got %d", toolsChangedCount)
}
mu.Unlock()
// Verify server is gone from loaded names.
names := manager.GetLoadedServerNames()
for _, n := range names {
if n == "echo" {
t.Error("Server 'echo' should not appear in loaded names after removal")
}
}
// Removing again should error.
err = manager.RemoveServer("echo")
if err == nil {
t.Fatal("Expected error removing already-removed server")
}
if !strings.Contains(err.Error(), "not loaded") {
t.Errorf("Expected 'not loaded' error, got: %v", err)
}
}
// TestMCPToolManager_AddRemoveMultiple_Integration tests adding and removing
// multiple servers, verifying tool isolation.
func TestMCPToolManager_AddRemoveMultiple_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
manager := NewMCPToolManager()
defer func() { _ = manager.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Add two servers with the same binary but different names.
count1, err := manager.AddServer(ctx, "server-a", cfg)
if err != nil {
t.Fatalf("AddServer server-a failed: %v", err)
}
count2, err := manager.AddServer(ctx, "server-b", cfg)
if err != nil {
t.Fatalf("AddServer server-b failed: %v", err)
}
totalTools := count1 + count2
if totalTools != 4 {
t.Fatalf("Expected 4 total tools (2+2), got %d", totalTools)
}
tools := manager.GetTools()
if len(tools) != 4 {
t.Fatalf("Expected 4 tools, got %d", len(tools))
}
// Remove server-a, verify server-b tools remain.
err = manager.RemoveServer("server-a")
if err != nil {
t.Fatalf("RemoveServer server-a failed: %v", err)
}
tools = manager.GetTools()
if len(tools) != 2 {
t.Fatalf("Expected 2 tools after removing server-a, got %d", len(tools))
}
// Remaining tools should all be from server-b.
for _, tool := range tools {
if !strings.HasPrefix(tool.Info().Name, "server-b__") {
t.Errorf("Expected tool from server-b, got: %s", tool.Info().Name)
}
}
// Remove server-b.
err = manager.RemoveServer("server-b")
if err != nil {
t.Fatalf("RemoveServer server-b failed: %v", err)
}
tools = manager.GetTools()
if len(tools) != 0 {
t.Errorf("Expected 0 tools after removing all servers, got %d", len(tools))
}
}
// TestMCPToolManager_AddServer_DuplicateDetection_Integration tests that
// adding a server with the same name as an already loaded server errors.
func TestMCPToolManager_AddServer_DuplicateDetection_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
manager := NewMCPToolManager()
defer func() { _ = manager.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Add the server.
_, err := manager.AddServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("First AddServer failed: %v", err)
}
// Try to add again with the same name.
_, err = manager.AddServer(ctx, "echo", cfg)
if err == nil {
t.Fatal("Expected error adding duplicate server")
}
if !strings.Contains(err.Error(), "already loaded") {
t.Errorf("Expected 'already loaded' error, got: %v", err)
}
}
// TestMCPToolManager_AddAfterRemove_Integration tests that a server can be
// re-added after being removed.
func TestMCPToolManager_AddAfterRemove_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
manager := NewMCPToolManager()
defer func() { _ = manager.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cfg := echoServerConfig(t)
// Add, remove, re-add.
_, err := manager.AddServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("First AddServer failed: %v", err)
}
err = manager.RemoveServer("echo")
if err != nil {
t.Fatalf("RemoveServer failed: %v", err)
}
count, err := manager.AddServer(ctx, "echo", cfg)
if err != nil {
t.Fatalf("Re-AddServer failed: %v", err)
}
if count != 2 {
t.Errorf("Expected 2 tools on re-add, got %d", count)
}
tools := manager.GetTools()
if len(tools) != 2 {
t.Errorf("Expected 2 tools after re-add, got %d", len(tools))
}
}
+155
View File
@@ -0,0 +1,155 @@
package tools
import (
"context"
"strings"
"sync"
"testing"
"time"
"github.com/mark3labs/kit/internal/config"
)
// TestMCPToolManager_AddServer_DuplicateName verifies that adding a server
// with a name that already exists returns an error.
func TestMCPToolManager_AddServer_DuplicateName(t *testing.T) {
manager := NewMCPToolManager()
cfg := config.MCPServerConfig{
Command: []string{"non-existent-command"},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// First add will fail (bad command), but let's test the duplicate detection
// by simulating a loaded server via LoadTools first.
loadCfg := &config.Config{
MCPServers: map[string]config.MCPServerConfig{
"test-server": cfg,
},
}
// This will fail to load but creates the connection pool.
_ = manager.LoadTools(ctx, loadCfg)
// Now try to add the same server name — the tools didn't load (bad command),
// so AddServer should not find a duplicate and should fail with connection error.
_, err := manager.AddServer(ctx, "test-server", cfg)
if err == nil {
t.Fatal("Expected error when adding server with bad command, got nil")
}
// It should be a connection error, not a duplicate error.
if strings.Contains(err.Error(), "already loaded") {
t.Fatalf("Should not report duplicate since server failed to load initially: %v", err)
}
}
// TestMCPToolManager_RemoveServer_NotLoaded verifies that removing a server
// that doesn't exist returns an appropriate error.
func TestMCPToolManager_RemoveServer_NotLoaded(t *testing.T) {
manager := NewMCPToolManager()
err := manager.RemoveServer("nonexistent")
if err == nil {
t.Fatal("Expected error when removing non-existent server, got nil")
}
if !strings.Contains(err.Error(), "not loaded") {
t.Errorf("Expected 'not loaded' error, got: %v", err)
}
}
// TestMCPToolManager_AddServer_CreatesConnectionPool verifies that AddServer
// lazily creates a connection pool when LoadTools was never called.
func TestMCPToolManager_AddServer_CreatesConnectionPool(t *testing.T) {
manager := NewMCPToolManager()
// Connection pool should be nil initially.
if manager.connectionPool != nil {
t.Fatal("Expected nil connection pool before any operation")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// AddServer with a bad command — should fail, but the pool should be created.
_, err := manager.AddServer(ctx, "lazy-server", config.MCPServerConfig{
Command: []string{"non-existent-command"},
})
if err == nil {
t.Fatal("Expected error for bad command")
}
// Connection pool should have been created.
if manager.connectionPool == nil {
t.Fatal("Expected connection pool to be created lazily by AddServer")
}
}
// TestMCPToolManager_OnToolsChanged_Callback verifies that the onToolsChanged
// callback fires on RemoveServer (we can't easily test AddServer with a real
// MCP server, but we can test the callback wiring).
func TestMCPToolManager_OnToolsChanged_Callback(t *testing.T) {
manager := NewMCPToolManager()
var mu sync.Mutex
callCount := 0
manager.SetOnToolsChanged(func() {
mu.Lock()
callCount++
mu.Unlock()
})
// RemoveServer on non-existent should NOT fire callback.
_ = manager.RemoveServer("nonexistent")
mu.Lock()
if callCount != 0 {
t.Errorf("Expected 0 callback calls for failed remove, got %d", callCount)
}
mu.Unlock()
}
// TestMCPToolManager_Close_NilPool verifies Close is safe when the connection
// pool was never initialized.
func TestMCPToolManager_Close_NilPool(t *testing.T) {
manager := NewMCPToolManager()
err := manager.Close()
if err != nil {
t.Fatalf("Expected nil error from Close with nil pool, got: %v", err)
}
}
// TestMCPConnectionPool_RemoveConnection_NotFound verifies that removing a
// non-existent connection returns an error.
func TestMCPConnectionPool_RemoveConnection_NotFound(t *testing.T) {
pool := NewMCPConnectionPool(DefaultConnectionPoolConfig(), nil, false, nil, nil)
defer func() { _ = pool.Close() }()
err := pool.RemoveConnection("nonexistent")
if err == nil {
t.Fatal("Expected error for non-existent connection")
}
if !strings.Contains(err.Error(), "not found") {
t.Errorf("Expected 'not found' error, got: %v", err)
}
}
// TestMCPToolManager_EnsureConnectionPool_Idempotent verifies that
// ensureConnectionPool doesn't recreate an existing pool.
func TestMCPToolManager_EnsureConnectionPool_Idempotent(t *testing.T) {
manager := NewMCPToolManager()
// First call creates the pool.
manager.ensureConnectionPool()
pool1 := manager.connectionPool
if pool1 == nil {
t.Fatal("Expected pool to be created")
}
// Second call should be a no-op.
manager.ensureConnectionPool()
pool2 := manager.connectionPool
if pool1 != pool2 {
t.Fatal("Expected ensureConnectionPool to be idempotent")
}
}
+111
View File
@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""Minimal MCP server over stdio for testing. Exposes one tool: echo."""
import json
import sys
def read_message():
"""Read a JSON-RPC message from stdin."""
line = sys.stdin.readline()
if not line:
return None
return json.loads(line.strip())
def write_message(msg):
"""Write a JSON-RPC message to stdout."""
sys.stdout.write(json.dumps(msg) + "\n")
sys.stdout.flush()
def handle(msg):
method = msg.get("method", "")
mid = msg.get("id")
if method == "initialize":
write_message({
"jsonrpc": "2.0",
"id": mid,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "test-echo", "version": "1.0.0"},
},
})
elif method == "notifications/initialized":
pass # no response needed
elif method == "tools/list":
write_message({
"jsonrpc": "2.0",
"id": mid,
"result": {
"tools": [
{
"name": "echo",
"description": "Echoes the input text back.",
"inputSchema": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "Text to echo"}
},
"required": ["text"],
},
},
{
"name": "greet",
"description": "Returns a greeting.",
"inputSchema": {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Name to greet"}
},
"required": ["name"],
},
},
]
},
})
elif method == "tools/call":
tool_name = msg["params"]["name"]
args = msg["params"].get("arguments", {})
if tool_name == "echo":
text = args.get("text", "")
write_message({
"jsonrpc": "2.0",
"id": mid,
"result": {
"content": [{"type": "text", "text": text}]
},
})
elif tool_name == "greet":
name = args.get("name", "World")
write_message({
"jsonrpc": "2.0",
"id": mid,
"result": {
"content": [{"type": "text", "text": f"Hello, {name}!"}]
},
})
else:
write_message({
"jsonrpc": "2.0",
"id": mid,
"error": {"code": -32601, "message": f"Unknown tool: {tool_name}"},
})
elif method == "ping":
write_message({"jsonrpc": "2.0", "id": mid, "result": {}})
else:
if mid is not None:
write_message({
"jsonrpc": "2.0",
"id": mid,
"error": {"code": -32601, "message": f"Unknown method: {method}"},
})
if __name__ == "__main__":
while True:
msg = read_message()
if msg is None:
break
handle(msg)
+73
View File
@@ -146,6 +146,79 @@ func (m *Kit) MCPToolsReady() bool {
return m.agent.MCPToolsReady()
}
// MCPServerStatus describes the runtime state of a loaded MCP server.
type MCPServerStatus struct {
// Name is the configured server name.
Name string
// ToolCount is the number of tools loaded from this server.
ToolCount int
}
// AddMCPServer connects to a new MCP server at runtime and makes its tools
// available to the agent immediately. The server's tools are prefixed with the
// server name (e.g. "myserver__tool_name") to avoid naming conflicts, matching
// the behaviour of servers loaded at initialization.
//
// Returns the number of tools loaded from the server.
//
// AddMCPServer is safe to call while the agent is idle. If a turn is in
// progress ([Kit.IsGenerating] returns true), the new tools will be visible
// starting from the next LLM step.
//
// Example:
//
// n, err := k.AddMCPServer(ctx, "github", kit.MCPServerConfig{
// Command: []string{"npx", "-y", "@modelcontextprotocol/server-github"},
// Environment: map[string]string{"GITHUB_TOKEN": os.Getenv("GITHUB_TOKEN")},
// })
func (m *Kit) AddMCPServer(ctx context.Context, name string, cfg MCPServerConfig) (int, error) {
return m.agent.AddMCPServer(ctx, name, cfg)
}
// RemoveMCPServer disconnects an MCP server and removes all its tools from
// the agent. After this call the agent will no longer see or be able to call
// tools from the named server.
//
// RemoveMCPServer is safe to call while the agent is idle. If a turn is in
// progress, the tools are removed at the next LLM step. Any in-flight tool
// calls to the removed server will fail gracefully.
//
// Returns an error if the named server is not currently loaded.
func (m *Kit) RemoveMCPServer(name string) error {
return m.agent.RemoveMCPServer(name)
}
// ListMCPServers returns the status of all currently loaded MCP servers.
// The returned slice is a snapshot; it is safe to read concurrently.
func (m *Kit) ListMCPServers() []MCPServerStatus {
names := m.agent.GetLoadedServerNames()
if len(names) == 0 {
return nil
}
// Build a tool count per server by scanning tool names for the prefix.
toolNames := m.GetToolNames()
countByServer := make(map[string]int, len(names))
for _, tn := range toolNames {
for _, sn := range names {
prefix := sn + "__"
if len(tn) > len(prefix) && tn[:len(prefix)] == prefix {
countByServer[sn]++
break
}
}
}
result := make([]MCPServerStatus, 0, len(names))
for _, n := range names {
result = append(result, MCPServerStatus{
Name: n,
ToolCount: countByServer[n],
})
}
return result
}
// GetExtensionToolCount returns the number of tools registered by extensions.
func (m *Kit) GetExtensionToolCount() int {
return m.agent.GetExtensionToolCount()
+56
View File
@@ -0,0 +1,56 @@
package kit_test
import (
"testing"
kit "github.com/mark3labs/kit/pkg/kit"
)
// TestMCPServerStatus_TypeSurface verifies the MCPServerStatus type is
// accessible and has the expected fields.
func TestMCPServerStatus_TypeSurface(t *testing.T) {
s := kit.MCPServerStatus{
Name: "test-server",
ToolCount: 5,
}
if s.Name != "test-server" {
t.Errorf("Expected Name 'test-server', got %q", s.Name)
}
if s.ToolCount != 5 {
t.Errorf("Expected ToolCount 5, got %d", s.ToolCount)
}
}
// TestMCPServerConfig_ForDynamicAdd verifies that MCPServerConfig can be
// constructed with the expected fields for dynamic server management.
func TestMCPServerConfig_ForDynamicAdd(t *testing.T) {
// Stdio server config.
stdio := kit.MCPServerConfig{
Command: []string{"npx", "-y", "@modelcontextprotocol/server-github"},
Environment: map[string]string{"GITHUB_TOKEN": "test-token"},
}
if len(stdio.Command) != 3 {
t.Errorf("Expected 3 command parts, got %d", len(stdio.Command))
}
if stdio.Environment["GITHUB_TOKEN"] != "test-token" {
t.Error("Expected GITHUB_TOKEN in environment")
}
// Remote server config.
remote := kit.MCPServerConfig{
URL: "https://mcp.example.com/sse",
Headers: []string{"Authorization: Bearer test"},
}
if remote.URL != "https://mcp.example.com/sse" {
t.Errorf("Unexpected URL: %s", remote.URL)
}
// Config with tool filtering.
filtered := kit.MCPServerConfig{
Command: []string{"some-server"},
AllowedTools: []string{"read", "write"},
}
if len(filtered.AllowedTools) != 2 {
t.Errorf("Expected 2 allowed tools, got %d", len(filtered.AllowedTools))
}
}