feat(sdk): support mcp-go in-process transport for MCP servers

- Add InProcessServer field to MCPServerConfig (json:"-", never serialized)
- Add "inprocess" transport type to config, validation, and connection pool
- Add createInProcessClient() using mcp-go client.NewInProcessClient()
- Add Kit.AddInProcessMCPServer() convenience method
- Add Options.InProcessMCPServers for init-time registration
- Export MCPServer type alias (= server.MCPServer) in pkg/kit/types.go
- Add 8 tests covering config, pool, tool manager, and edge cases
- Update SDK README, kit-sdk skill, and www docs
This commit is contained in:
Ed Zynda
2026-04-15 16:29:07 +03:00
parent 398e825df8
commit 0974d37ab2
9 changed files with 521 additions and 1 deletions
+18 -1
View File
@@ -30,6 +30,12 @@ type MCPServerConfig struct {
OAuthClientSecret string `json:"oauthClientSecret,omitempty" yaml:"oauthClientSecret,omitempty"`
OAuthScopes []string `json:"oauthScopes,omitempty" yaml:"oauthScopes,omitempty"`
// InProcessServer holds a live *server.MCPServer for in-process transport.
// When set (and Type is "inprocess"), the connection pool creates an
// in-process client instead of spawning a subprocess or making HTTP calls.
// This field is never serialized — it is only used programmatically via the SDK.
InProcessServer any `json:"-" yaml:"-"`
// Legacy fields for backward compatibility
Transport string `json:"transport,omitempty"`
Args []string `json:"args,omitempty"`
@@ -277,11 +283,18 @@ func (s *MCPServerConfig) GetTransportType() string {
return "stdio"
case "remote":
return "streamable"
case "inprocess":
return "inprocess"
default:
return s.Type
}
}
// Programmatic in-process server detection.
if s.InProcessServer != nil {
return "inprocess"
}
// Backward compatibility: infer transport type
if len(s.Command) > 0 {
return "stdio"
@@ -312,8 +325,12 @@ func (c *Config) Validate() error {
if serverConfig.URL == "" {
return fmt.Errorf("server %s: url is required for %s transport", serverName, transport)
}
case "inprocess":
if serverConfig.InProcessServer == nil {
return fmt.Errorf("server %s: InProcessServer is required for inprocess transport", serverName)
}
default:
return fmt.Errorf("server %s: unsupported transport type '%s'. Supported types: stdio, sse, streamable", serverName, transport)
return fmt.Errorf("server %s: unsupported transport type '%s'. Supported types: stdio, sse, streamable, inprocess", serverName, transport)
}
}
return nil
+19
View File
@@ -12,6 +12,7 @@ import (
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/client/transport"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
// ConnectionPoolConfig defines configuration parameters for the MCP connection pool.
@@ -304,6 +305,8 @@ func (p *MCPConnectionPool) createMCPClient(ctx context.Context, serverName stri
return p.createSSEClient(ctx, serverConfig)
case "streamable":
return p.createStreamableClient(ctx, serverConfig)
case "inprocess":
return p.createInProcessClient(serverConfig)
default:
return nil, fmt.Errorf("unsupported transport type '%s' for server %s", transportType, serverName)
}
@@ -455,6 +458,22 @@ func (p *MCPConnectionPool) createStreamableClient(ctx context.Context, serverCo
return streamableClient, nil
}
// createInProcessClient creates an in-process MCP client that communicates
// directly with an *server.MCPServer in the same process. No subprocess is
// spawned and no network I/O occurs — calls go through JSON marshal →
// MCPServer.HandleMessage → JSON unmarshal, all in-memory.
func (p *MCPConnectionPool) createInProcessClient(serverConfig config.MCPServerConfig) (client.MCPClient, error) {
srv, ok := serverConfig.InProcessServer.(*server.MCPServer)
if !ok {
return nil, fmt.Errorf("InProcessServer must be *server.MCPServer, got %T", serverConfig.InProcessServer)
}
inProcessClient, err := client.NewInProcessClient(srv)
if err != nil {
return nil, fmt.Errorf("failed to create in-process client: %w", err)
}
return inProcessClient, nil
}
// createTokenStore creates a token store for the given server URL.
// If a custom TokenStoreFactory is configured, it is used; otherwise the
// default file-backed token store is created.
+244
View File
@@ -0,0 +1,244 @@
package tools
import (
"context"
"encoding/json"
"strings"
"testing"
"github.com/mark3labs/kit/internal/config"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
// newTestInProcessServer creates a simple MCP server with one tool for testing.
func newTestInProcessServer() *server.MCPServer {
srv := server.NewMCPServer("test-server", "1.0.0",
server.WithToolCapabilities(true),
)
srv.AddTool(
mcp.NewTool("greet",
mcp.WithDescription("Say hello"),
mcp.WithString("name", mcp.Required(), mcp.Description("Name to greet")),
),
func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
name, _ := req.GetArguments()["name"].(string)
return mcp.NewToolResultText("Hello, " + name + "!"), nil
},
)
return srv
}
func TestInProcessTransportType(t *testing.T) {
cfg := config.MCPServerConfig{
Type: "inprocess",
InProcessServer: newTestInProcessServer(),
}
if got := cfg.GetTransportType(); got != "inprocess" {
t.Errorf("GetTransportType() = %q, want %q", got, "inprocess")
}
}
func TestInProcessTransportTypeInferred(t *testing.T) {
// When Type is empty but InProcessServer is set, infer "inprocess".
cfg := config.MCPServerConfig{
InProcessServer: newTestInProcessServer(),
}
if got := cfg.GetTransportType(); got != "inprocess" {
t.Errorf("GetTransportType() = %q, want %q", got, "inprocess")
}
}
func TestInProcessValidation(t *testing.T) {
// Valid: InProcessServer is set.
validCfg := &config.Config{
MCPServers: map[string]config.MCPServerConfig{
"test": {
Type: "inprocess",
InProcessServer: newTestInProcessServer(),
},
},
}
if err := validCfg.Validate(); err != nil {
t.Errorf("expected valid config, got error: %v", err)
}
// Invalid: type is inprocess but InProcessServer is nil.
invalidCfg := &config.Config{
MCPServers: map[string]config.MCPServerConfig{
"test": {
Type: "inprocess",
},
},
}
if err := invalidCfg.Validate(); err == nil {
t.Error("expected validation error for nil InProcessServer, got nil")
}
}
func TestConnectionPoolInProcessClient(t *testing.T) {
pool := NewMCPConnectionPool(DefaultConnectionPoolConfig(), false, nil, nil)
defer func() { _ = pool.Close() }()
ctx := context.Background()
srv := newTestInProcessServer()
cfg := config.MCPServerConfig{
Type: "inprocess",
InProcessServer: srv,
}
conn, err := pool.GetConnection(ctx, "test-inproc", cfg)
if err != nil {
t.Fatalf("GetConnection failed: %v", err)
}
// Verify the connection is healthy and functional.
if !conn.isHealthy {
t.Error("expected connection to be healthy")
}
// List tools to verify the connection works end-to-end.
toolsResp, err := conn.client.ListTools(ctx, mcp.ListToolsRequest{})
if err != nil {
t.Fatalf("ListTools failed: %v", err)
}
if len(toolsResp.Tools) != 1 {
t.Fatalf("expected 1 tool, got %d", len(toolsResp.Tools))
}
if toolsResp.Tools[0].Name != "greet" {
t.Errorf("expected tool name 'greet', got %q", toolsResp.Tools[0].Name)
}
}
func TestConnectionPoolInProcessToolExecution(t *testing.T) {
pool := NewMCPConnectionPool(DefaultConnectionPoolConfig(), false, nil, nil)
defer func() { _ = pool.Close() }()
ctx := context.Background()
srv := newTestInProcessServer()
cfg := config.MCPServerConfig{
Type: "inprocess",
InProcessServer: srv,
}
conn, err := pool.GetConnection(ctx, "test-inproc", cfg)
if err != nil {
t.Fatalf("GetConnection failed: %v", err)
}
// Call the tool.
result, err := conn.client.CallTool(ctx, mcp.CallToolRequest{
Request: mcp.Request{Method: "tools/call"},
Params: mcp.CallToolParams{
Name: "greet",
Arguments: map[string]any{"name": "World"},
},
})
if err != nil {
t.Fatalf("CallTool failed: %v", err)
}
if result.IsError {
t.Error("expected non-error result")
}
if len(result.Content) == 0 {
t.Fatal("expected at least one content block")
}
text, ok := result.Content[0].(mcp.TextContent)
if !ok {
t.Fatalf("expected TextContent, got %T", result.Content[0])
}
if text.Text != "Hello, World!" {
t.Errorf("expected 'Hello, World!', got %q", text.Text)
}
}
func TestMCPToolManagerInProcess(t *testing.T) {
ctx := context.Background()
srv := newTestInProcessServer()
mgr := NewMCPToolManager()
cfg := config.MCPServerConfig{
Type: "inprocess",
InProcessServer: srv,
}
count, err := mgr.AddServer(ctx, "myserver", cfg)
if err != nil {
t.Fatalf("AddServer failed: %v", err)
}
if count != 1 {
t.Errorf("expected 1 tool, got %d", count)
}
tools := mgr.GetTools()
if len(tools) != 1 {
t.Fatalf("expected 1 tool, got %d", len(tools))
}
if tools[0].Name != "myserver__greet" {
t.Errorf("expected tool name 'myserver__greet', got %q", tools[0].Name)
}
// Execute the tool.
input, _ := json.Marshal(map[string]any{"name": "SDK"})
result, err := mgr.ExecuteTool(ctx, "myserver__greet", string(input))
if err != nil {
t.Fatalf("ExecuteTool failed: %v", err)
}
if result.IsError {
t.Error("expected non-error result")
}
if result.Content == "" {
t.Error("expected non-empty result content")
}
// Verify result contains our greeting.
if !strings.Contains(result.Content, "Hello, SDK!") {
t.Errorf("expected 'Hello, SDK!' in result, got %q", result.Content)
}
}
func TestConnectionPoolInProcessInvalidServer(t *testing.T) {
pool := NewMCPConnectionPool(DefaultConnectionPoolConfig(), false, nil, nil)
defer func() { _ = pool.Close() }()
ctx := context.Background()
// Pass a non-*server.MCPServer value.
cfg := config.MCPServerConfig{
Type: "inprocess",
InProcessServer: "not a server",
}
_, err := pool.GetConnection(ctx, "bad", cfg)
if err == nil {
t.Fatal("expected error for invalid InProcessServer type")
}
}
func TestConnectionPoolInProcessReuse(t *testing.T) {
pool := NewMCPConnectionPool(DefaultConnectionPoolConfig(), false, nil, nil)
defer func() { _ = pool.Close() }()
ctx := context.Background()
srv := newTestInProcessServer()
cfg := config.MCPServerConfig{
Type: "inprocess",
InProcessServer: srv,
}
// Get connection twice — should reuse.
conn1, err := pool.GetConnection(ctx, "reuse-test", cfg)
if err != nil {
t.Fatalf("first GetConnection failed: %v", err)
}
conn2, err := pool.GetConnection(ctx, "reuse-test", cfg)
if err != nil {
t.Fatalf("second GetConnection failed: %v", err)
}
if conn1 != conn2 {
t.Error("expected same connection object on reuse")
}
}
+86
View File
@@ -77,6 +77,11 @@ host, err := kit.New(ctx, &kit.Options{
// Compaction
AutoCompact: true, // Auto-compact near context limit
// In-process MCP servers (map name → *kit.MCPServer)
InProcessMCPServers: map[string]*kit.MCPServer{
"docs": mcpSrv,
},
})
```
@@ -112,6 +117,79 @@ response, err := host.Prompt(
)
```
### Dynamic MCP Server Management
Add, remove, and list MCP servers at runtime:
```go
// Add an MCP server at runtime
n, err := host.AddMCPServer(ctx, "github", kit.MCPServerConfig{
Command: "npx",
Args: []string{"-y", "@modelcontextprotocol/server-github"},
})
fmt.Printf("Loaded %d tools from MCP server\n", n)
// List connected MCP servers
for _, s := range host.ListMCPServers() {
fmt.Printf("%s: %d tools\n", s.Name, s.ToolCount)
}
// Disconnect a server and remove its tools
host.RemoveMCPServer("github")
```
### In-Process MCP Servers
Register mcp-go servers that run in the same process — no subprocess spawning,
no network I/O. This is ideal for custom tool servers implemented in Go:
```go
import (
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
// Create an mcp-go server with tools
mcpSrv := server.NewMCPServer("my-tools", "1.0.0",
server.WithToolCapabilities(true),
)
mcpSrv.AddTool(mcp.NewTool("search_docs",
mcp.WithDescription("Search documentation"),
mcp.WithString("query", mcp.Required()),
), searchHandler)
// Option 1: At init time via Options
host, _ := kit.New(ctx, &kit.Options{
InProcessMCPServers: map[string]*kit.MCPServer{
"docs": mcpSrv,
},
})
// Option 2: At runtime
n, err := host.AddInProcessMCPServer(ctx, "docs", mcpSrv)
fmt.Printf("Loaded %d tools from in-process server\n", n)
```
Kit does not take ownership of the server's lifecycle — the caller is responsible for any cleanup. In-process server tools are prefixed the same way as external MCP servers (e.g. `"docs__search_docs"`).
### MCP Prompts
MCP servers can expose prompt templates via the MCP prompts capability.
Kit exposes these through the SDK:
```go
// List prompts from all connected MCP servers
prompts := host.ListMCPPrompts()
for _, p := range prompts {
fmt.Printf("%s/%s: %s\n", p.Server, p.Name, p.Description)
}
// Get a specific prompt with arguments
msg, err := host.GetMCPPrompt(ctx, "server-name", "prompt-name", map[string]string{
"topic": "concurrency",
})
```
### Session Management
Maintain conversation context:
@@ -145,6 +223,13 @@ kit.LLMUsage // {InputTokens, OutputTokens, TotalTokens, ...}
kit.LLMResponse // {Content, FinishReason, Usage}
kit.LLMFilePart // {Filename, Data []byte, MediaType}
// MCP OAuth types
kit.MCPServer // *server.MCPServer for in-process MCP transport
kit.MCPServerConfig // Configuration for an MCP server (stdio, SSE, or in-process)
kit.MCPTokenStore // Persists OAuth tokens for a single MCP server
kit.MCPToken // OAuth token (access token, refresh token, expiry)
kit.MCPTokenStoreFactory // Creates an MCPTokenStore for a given server URL
// Conversion helpers
msgs := kit.ConvertToLLMMessages(&msg) // SDK Message → []LLMMessage
msg := kit.ConvertFromLLMMessage(lMsg) // LLMMessage → SDK Message
@@ -192,6 +277,7 @@ Key `Options` fields for SDK usage:
| `NoSession` | Ephemeral mode (no session persistence) |
| `SessionPath` | Open specific session file |
| `Continue` | Resume most recent session |
| `InProcessMCPServers` | Map of name → `*kit.MCPServer` for in-process MCP servers |
| `Debug` | Enable debug logging |
## Environment Variables
+72
View File
@@ -176,6 +176,41 @@ func (m *Kit) AddMCPServer(ctx context.Context, name string, cfg MCPServerConfig
return m.agent.AddMCPServer(ctx, name, cfg)
}
// AddInProcessMCPServer connects an in-process mcp-go server and makes its
// tools available to the agent immediately. Unlike [AddMCPServer] with a
// command/URL config, this uses mcp-go's in-process transport — no subprocess
// is spawned and no network I/O occurs.
//
// The server must be a *[server.MCPServer] from github.com/mark3labs/mcp-go/server.
// Kit does not take ownership of the server's lifecycle; the caller is responsible
// for any cleanup when the server is no longer needed.
//
// Returns the number of tools loaded from the server.
//
// Example:
//
// import (
// "github.com/mark3labs/mcp-go/mcp"
// "github.com/mark3labs/mcp-go/server"
// )
//
// mcpSrv := server.NewMCPServer("my-tools", "1.0.0",
// server.WithToolCapabilities(true),
// )
// mcpSrv.AddTool(mcp.NewTool("search_docs",
// mcp.WithDescription("Search documentation"),
// mcp.WithString("query", mcp.Required()),
// ), searchHandler)
//
// n, err := k.AddInProcessMCPServer(ctx, "docs", mcpSrv)
func (m *Kit) AddInProcessMCPServer(ctx context.Context, name string, srv *MCPServer) (int, error) {
cfg := MCPServerConfig{
Type: "inprocess",
InProcessServer: srv,
}
return m.agent.AddMCPServer(ctx, name, cfg)
}
// RemoveMCPServer disconnects an MCP server and removes all its tools from
// the agent. After this call the agent will no longer see or be able to call
// tools from the named server.
@@ -814,6 +849,28 @@ type Options struct {
// (e.g. AGENTS.md) from the working directory.
NoContextFiles bool
// InProcessMCPServers registers mcp-go servers that run in the same
// process. Each key is the server name (used to prefix tool names, e.g.
// "docs__search"). The value must be a *[server.MCPServer].
//
// In-process servers bypass subprocess spawning and network I/O entirely.
// Kit does not take ownership of the servers — the caller is responsible
// for any cleanup after [Kit.Close].
//
// Example:
//
// mcpSrv := server.NewMCPServer("my-tools", "1.0.0",
// server.WithToolCapabilities(true),
// )
// mcpSrv.AddTool(mcp.NewTool("search", ...), handler)
//
// host, _ := kit.New(ctx, &kit.Options{
// InProcessMCPServers: map[string]*kit.MCPServer{
// "docs": mcpSrv,
// },
// })
InProcessMCPServers map[string]*MCPServer
// Compaction
AutoCompact bool // Auto-compact when near context limit
CompactionOptions *CompactionOptions // Config for auto-compaction (nil = defaults)
@@ -1091,6 +1148,21 @@ func New(ctx context.Context, opts *Options) (*Kit, error) {
}
}
// Merge in-process MCP servers from Options into the MCP config.
// These are programmatically-provided *server.MCPServer instances that
// bypass subprocess spawning and network I/O.
if len(opts.InProcessMCPServers) > 0 {
if mcpConfig.MCPServers == nil {
mcpConfig.MCPServers = make(map[string]config.MCPServerConfig, len(opts.InProcessMCPServers))
}
for name, srv := range opts.InProcessMCPServers {
mcpConfig.MCPServers[name] = config.MCPServerConfig{
Type: "inprocess",
InProcessServer: srv,
}
}
}
// Pre-create hook registries so the tool wrapper can reference them.
// Hooks registered after New() returns are still invoked because the
// wrapper captures the registries by pointer.
+7
View File
@@ -12,6 +12,7 @@ import (
"github.com/mark3labs/kit/internal/models"
"github.com/mark3labs/kit/internal/session"
"github.com/mark3labs/mcp-go/client/transport"
"github.com/mark3labs/mcp-go/server"
)
// ==== Message Types (internal/message/content.go) ====
@@ -207,6 +208,12 @@ type CompactionOptions = compaction.CompactionOptions
// ==== MCP OAuth Types ====
// MCPServer is an in-process MCP server from the mcp-go library.
// Pass an instance to [Kit.AddInProcessMCPServer] or
// [Options.InProcessMCPServers] to register tools without spawning a
// subprocess or making network calls.
type MCPServer = server.MCPServer
// MCPTokenStore persists OAuth tokens for a single MCP server. Implementations
// must be safe for concurrent use.
//
+40
View File
@@ -112,11 +112,18 @@ host, err := kit.New(ctx, &kit.Options{
MCPTokenStoreFactory: func(serverURL string) (kit.MCPTokenStore, error) {
return myCustomStore(serverURL), nil // custom OAuth token storage
},
// In-Process MCP Servers
InProcessMCPServers: map[string]*kit.MCPServer{
"docs": mcpSrv, // *server.MCPServer from mcp-go — no subprocess needed
},
})
```
**Critical distinction**: `Tools` replaces ALL default tools (core + MCP + extension). `ExtraTools` adds tools alongside the defaults. Use `Tools` to restrict the agent's capabilities; use `ExtraTools` to extend them.
**In-process MCP servers** bypass subprocess spawning entirely. Pass `*server.MCPServer` instances from mcp-go via `InProcessMCPServers` or call `AddInProcessMCPServer()` at runtime.
---
## Prompt Methods
@@ -669,6 +676,38 @@ for _, s := range servers {
`AddMCPServer` is safe to call while the agent is idle. If a turn is in progress, new tools are visible starting from the next LLM step. Tool names are prefixed with the server name (e.g. `"github__create_issue"`).
### In-Process MCP Servers
Register mcp-go servers that run in the same process — no subprocess spawning,
no network I/O:
```go
import (
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
mcpSrv := server.NewMCPServer("my-tools", "1.0.0",
server.WithToolCapabilities(true),
)
mcpSrv.AddTool(mcp.NewTool("search_docs",
mcp.WithDescription("Search documentation"),
mcp.WithString("query", mcp.Required()),
), searchHandler)
// At init time
host, _ := kit.New(ctx, &kit.Options{
InProcessMCPServers: map[string]*kit.MCPServer{
"docs": mcpSrv,
},
})
// Or at runtime
n, err := host.AddInProcessMCPServer(ctx, "docs", mcpSrv)
```
Kit does not own the server lifecycle — the caller handles cleanup. Tools are prefixed as usual (e.g. `"docs__search_docs"`).
### MCP Prompts
Query and expand prompts defined by connected MCP servers:
@@ -920,6 +959,7 @@ kit.MCPTokenStore // interface for custom OAuth token storage
kit.MCPToken // OAuth token struct (access, refresh, expiry)
kit.MCPTokenStoreFactory // func(serverURL string) (MCPTokenStore, error)
kit.ErrMCPNoToken // sentinel error for "no token stored"
kit.MCPServer // *server.MCPServer for in-process MCP transport
kit.MCPServerStatus // {Name string, ToolCount int}
kit.MCPPrompt // {Name, Description, Arguments []MCPPromptArgument, ServerName}
kit.MCPPromptArgument // {Name, Description string, Required bool}
+6
View File
@@ -55,6 +55,11 @@ host, err := kit.New(ctx, &kit.Options{
MCPTokenStoreFactory: func(serverURL string) (kit.MCPTokenStore, error) {
return myStore(serverURL), nil
},
// In-Process MCP Servers
InProcessMCPServers: map[string]*kit.MCPServer{
"docs": mcpSrv, // *server.MCPServer from mcp-go
},
})
```
@@ -86,6 +91,7 @@ host, err := kit.New(ctx, &kit.Options{
| `NoContextFiles` | `bool` | `false` | Disable automatic AGENTS.md loading |
| `SessionManager` | `SessionManager` | — | Custom session backend (advanced) |
| `MCPTokenStoreFactory` | `func` | — | Custom OAuth token storage for MCP servers |
| `InProcessMCPServers` | `map[string]*MCPServer` | — | In-process mcp-go servers (no subprocess) |
## Tool configuration
+29
View File
@@ -149,6 +149,35 @@ err = host.RemoveMCPServer("github")
servers := host.ListMCPServers() // []kit.MCPServerStatus
```
### In-process MCP servers
Register mcp-go servers running in the same process — zero subprocess overhead:
```go
import (
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
mcpSrv := server.NewMCPServer("my-tools", "1.0.0",
server.WithToolCapabilities(true),
)
mcpSrv.AddTool(mcp.NewTool("search_docs",
mcp.WithDescription("Search documentation"),
mcp.WithString("query", mcp.Required()),
), searchHandler)
// At init time
host, _ := kit.New(ctx, &kit.Options{
InProcessMCPServers: map[string]*kit.MCPServer{
"docs": mcpSrv,
},
})
// Or at runtime
n, _ := host.AddInProcessMCPServer(ctx, "docs", mcpSrv)
```
## MCP prompts and resources
Query prompts and resources exposed by connected MCP servers: