Files
Ed Zynda fd960921ca refactor: address code audit findings across SDK, cmd, and internals
- Remove deprecated GenerateWithLoopAndStreaming and TreeManager
  AppendFantasyMessage / AddFantasyMessages / GetFantasyMessages to
  close the SDK leakage caused by the kit.TreeManager type alias
- Switch extensionAPI method signatures to local Extension* aliases so
  pkg.go.dev signatures no longer expose internal package names
- Bundle runNormalMode dependencies into a runModeDeps struct, shrinking
  the runNonInteractive and runInteractive call sites from 40+ positional
  args to (ctx, deps)
- Add generic subscribeTyped[E Event] helper and collapse ~30 typed OnXxx
  wrappers in pkg/kit/events.go onto it (public signatures unchanged)
- Extract setupBashPipes / interpretBashExit in internal/core/bash.go to
  deduplicate the buffered and streaming execution paths
- Extract resolveAutoRouteAPIKey and wrapProviderErr helpers in
  internal/models/providers.go and uniformly apply them across every
  createXxxProvider site
- Reimplement internal/extensions/watcher.go as a thin wrapper over the
  general-purpose internal/watcher.ContentWatcher, eliminating ~130 LOC
  of duplicated fsnotify logic while preserving the existing test API
- Add ctx.Err() pre-flight checks in executeRead / Write / Edit / Ls so
  cancellation actually short-circuits pure file-IO tools
2026-06-06 19:22:05 +03:00

1342 lines
47 KiB
Go

package agent
import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"charm.land/fantasy"
"github.com/mark3labs/kit/internal/config"
"github.com/mark3labs/kit/internal/core"
"github.com/mark3labs/kit/internal/message"
"github.com/mark3labs/kit/internal/models"
"github.com/mark3labs/kit/internal/tools"
)
// AgentConfig holds configuration options for creating a new Agent.
// NewAgent reads every field; rebuildFantasyAgent later reuses only the
// ModelConfig, SystemPrompt and MaxSteps values stored on the Agent.
type AgentConfig struct {
// ModelConfig selects and configures the LLM provider. It is passed to
// models.CreateProvider, its ModelString is parsed to derive the provider
// type, and its generation parameters (MaxTokens, Temperature, TopP, TopK,
// FrequencyPenalty, PresencePenalty) are forwarded as agent options when set.
ModelConfig *models.ProviderConfig
// MCPConfig lists external MCP servers. When it is non-nil and contains at
// least one server, NewAgent starts loading their tools in a background
// goroutine.
MCPConfig *config.Config
// SystemPrompt is applied to the agent when non-empty.
SystemPrompt string
// MaxSteps, when greater than zero, is installed as a step-count stop
// condition for the agent loop.
MaxSteps int
// StreamingEnabled forces GenerateWithCallbacks onto the streaming path
// even when no callbacks are registered.
StreamingEnabled bool
// DebugLogger, if non-nil, is handed to the MCP tool manager. It is only
// consulted when MCP servers are configured.
DebugLogger tools.DebugLogger
// AuthHandler handles OAuth authorization for remote MCP servers.
// When set, remote transports are configured with OAuth support.
// If nil, remote MCP servers that require OAuth will fail to connect.
AuthHandler tools.MCPAuthHandler
// TokenStoreFactory, if non-nil, creates a custom token store for each
// remote MCP server's OAuth tokens. When nil, the default file-based
// token store is used.
TokenStoreFactory tools.TokenStoreFactory
// CoreTools overrides the default core tool set. When non-empty it is used
// as-is, regardless of DisableCoreTools. When empty, core.AllTools() is
// used unless DisableCoreTools is true. This allows SDK users to provide a
// custom tool set (e.g. CodingTools or tools with a custom WorkDir).
CoreTools []fantasy.AgentTool
// DisableCoreTools, when true and CoreTools is empty, prevents loading any
// core tools, so the agent starts with no core tools at all (useful for
// simple chat completions). It has no effect when CoreTools is non-empty.
DisableCoreTools bool
// ToolWrapper is an optional function that wraps the combined tool list
// before it is passed to the LLM agent. Used by the extensions system
// to intercept tool calls/results. It is re-applied every time the agent
// is rebuilt.
ToolWrapper func([]fantasy.AgentTool) []fantasy.AgentTool
// ExtraTools are additional tools to include alongside core and MCP tools.
// Used by extensions to register custom tools.
ExtraTools []fantasy.AgentTool
// OnMCPServerLoaded, if non-nil, is called when each MCP server finishes
// loading (successfully or with error). The callback receives the server
// name, tool count, and any error. Called from the background goroutine.
OnMCPServerLoaded func(serverName string, toolCount int, err error)
// MCPTaskConfig configures task-augmented tools/call execution. The
// zero value preserves historical synchronous-only behaviour for any
// server that didn't advertise task support during initialize.
MCPTaskConfig tools.MCPTaskConfig
}
// ToolCallHandler is a function type for handling tool calls as they happen.
// It is invoked once a tool call has been fully parsed, with the call ID, the
// tool name and the tool's argument input, immediately before the matching
// ToolExecutionHandler start notification.
type ToolCallHandler func(toolCallID, toolName, toolArgs string)
// ToolExecutionHandler is a function type for handling tool execution start/end events.
// isStarting is true when the call is about to execute and false once its
// result has arrived; the finish notification carries the arguments that were
// recorded for the same toolCallID and is delivered before ToolResultHandler.
type ToolExecutionHandler func(toolCallID, toolName, toolArgs string, isStarting bool)
// ToolResultHandler is a function type for handling tool results.
// toolArgs are the arguments recorded for the same toolCallID when the call
// was announced. The metadata parameter carries optional structured data
// (e.g. file diff info) from the tool execution, JSON-encoded. It may be empty.
type ToolResultHandler func(toolCallID, toolName, toolArgs, result, metadata string, isError bool)
// ResponseHandler is a function type for handling LLM responses.
// It is invoked once after a successful generation with the text of the final
// response; the text may be empty (e.g. reasoning-only responses).
type ResponseHandler func(content string)
// StreamingResponseHandler is a function type for handling streaming LLM responses.
// It receives each text delta as it arrives from the stream.
type StreamingResponseHandler func(content string)
// ToolCallContentHandler is a function type for handling content that accompanies tool calls.
// It is invoked at the end of a step whose content has both non-empty text and
// at least one tool call, and receives that text.
type ToolCallContentHandler func(content string)
// ReasoningDeltaHandler is a function type for handling streaming reasoning/thinking deltas.
// Only the delta text is forwarded; the reasoning block ID is not.
type ReasoningDeltaHandler func(delta string)
// ReasoningCompleteHandler is a function type for handling reasoning/thinking completion.
// It is invoked from the stream's reasoning-end event and takes no arguments.
type ReasoningCompleteHandler func()
// ToolCallStartHandler is a function type for handling the moment when the LLM
// begins generating tool call arguments. The tool name is known but the full
// argument JSON is still streaming. toolCallID identifies the call in the
// later delta/end notifications.
type ToolCallStartHandler func(toolCallID, toolName string)
// ToolCallDeltaHandler is a function type for handling streamed fragments of
// tool call arguments as they arrive from the LLM. delta is one fragment of
// the arguments for the call identified by toolCallID.
type ToolCallDeltaHandler func(toolCallID, delta string)
// ToolCallEndHandler is a function type for handling the end of tool argument
// streaming, before the tool call is parsed and execution begins.
type ToolCallEndHandler func(toolCallID string)
// ToolOutputHandler is a function type for handling streaming tool output chunks.
// Used by tools like bash to stream output as it arrives rather than waiting
// for the command to complete. The isStderr flag indicates if the chunk
// contains stderr output.
// The handler is not called by the agent directly: GenerateWithCallbacks
// stores it in the context via core.ContextWithToolOutputCallback.
// Note: This is an alias for core.ToolOutputCallback to avoid import cycles.
type ToolOutputHandler = core.ToolOutputCallback
// PasswordPromptHandler is a function type for password prompts.
// Used by the bash tool when sudo requires a password. The handler receives
// a prompt message and returns the password and whether it was cancelled.
// GenerateWithCallbacks stores it in the context via
// core.ContextWithPasswordPrompt.
// Note: This is an alias for core.PasswordPromptCallback.
type PasswordPromptHandler = core.PasswordPromptCallback
// StepMessagesHandler is a function type for persisting messages after each
// complete step in a multi-step agent turn. The handler receives the messages
// produced by the step (typically an assistant message with tool calls followed
// by a tool-role message with results, or a final assistant message with text).
// This enables incremental session persistence so that progress is saved as
// it happens rather than only at the end of the turn.
// It is only invoked for steps that produced at least one message, and it runs
// before the step-finish cancellation check so completed work is still
// persisted when the context has already been cancelled.
type StepMessagesHandler func(stepMessages []fantasy.Message)
// StepUsageHandler is a function type for handling token usage after each
// complete step in a multi-step agent turn. This enables real-time cost
// tracking during long-running tool-calling conversations.
// The values are the step's own usage figures, not running totals.
type StepUsageHandler func(inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens int64)
// StepStartHandler is called when a new LLM step begins within a turn.
// stepNumber is the number reported by the underlying agent.
type StepStartHandler func(stepNumber int)
// StepFinishHandler is called when a step completes with full context.
// stepNumber is the number most recently reported at step start, hasToolCalls
// reports whether the step's content contained tool calls, finishReason is the
// step's finish reason as a string, and usage is the step's token usage.
type StepFinishHandler func(stepNumber int, hasToolCalls bool, finishReason string, usage fantasy.Usage)
// TextStartHandler is called when the LLM begins generating text content.
// id is the identifier the stream assigned to the text block.
type TextStartHandler func(id string)
// TextEndHandler is called when the LLM finishes generating text content.
// id is the identifier the stream assigned to the text block.
type TextEndHandler func(id string)
// ReasoningStartHandler is called when the LLM begins reasoning/thinking.
// Only the block id is forwarded; the reasoning content itself is not.
type ReasoningStartHandler func(id string)
// WarningsHandler is called when the LLM provider returns warnings.
// Each entry is the message text of one provider warning.
type WarningsHandler func(warnings []string)
// SourceHandler is called when the LLM references a source.
// sourceType is the string form of the source's type.
type SourceHandler func(sourceType, id, url, title string)
// StreamFinishHandler is called when a per-step LLM stream completes.
// finishReason is the string form of the stream's finish reason.
type StreamFinishHandler func(usage fantasy.Usage, finishReason string)
// ErrorHandler is called when an agent-level error occurs.
// Unlike the other stream events it is delivered without first checking
// whether the context has been cancelled.
type ErrorHandler func(err error)
// RetryHandler is called when the LLM request is retried.
// err is the provider error that triggered the retry.
type RetryHandler func(attempt int, err error)
// PrepareStepHandler is called between steps to allow message modification.
// It receives the step number and current messages, and returns replacement
// messages (or nil to keep unchanged). Any pending steering messages have
// already been appended to the list it receives.
type PrepareStepHandler func(stepNumber int, messages []fantasy.Message) []fantasy.Message
// GenerateCallbacks consolidates all callback functions for
// GenerateWithCallbacks into a single struct, replacing what was previously
// 16+ positional callback parameters. New fields default to nil, so adding
// new callbacks does not break existing callers.
// Every field is optional; a nil handler is simply skipped.
type GenerateCallbacks struct {
// OnToolCall fires when a tool call has been parsed and is about to execute.
OnToolCall ToolCallHandler
// OnToolExecution fires twice per tool call: with isStarting=true right
// after OnToolCall and with isStarting=false when the result arrives.
OnToolExecution ToolExecutionHandler
// OnToolResult fires with the extracted result text once a tool finishes.
OnToolResult ToolResultHandler
// OnResponse fires once after a successful generation, on both the
// streaming and non-streaming paths, even when the text is empty.
OnResponse ResponseHandler
// OnToolCallContent fires at step finish when the step carries text
// alongside tool calls.
OnToolCallContent ToolCallContentHandler
// OnStreamingResponse receives each streamed text delta.
OnStreamingResponse StreamingResponseHandler
// OnReasoningDelta receives each streamed reasoning delta.
OnReasoningDelta ReasoningDeltaHandler
// OnReasoningComplete fires on the stream's reasoning-end event.
OnReasoningComplete ReasoningCompleteHandler
// OnToolOutput is injected into the context for core tools (e.g. bash)
// rather than being invoked by the agent itself.
OnToolOutput ToolOutputHandler
// OnStepMessages receives the messages of each completed step for
// incremental persistence.
OnStepMessages StepMessagesHandler
// OnStepUsage receives the token usage of each completed step.
OnStepUsage StepUsageHandler
// OnPasswordPrompt is injected into the context for the bash tool.
OnPasswordPrompt PasswordPromptHandler
// OnToolCallStart, OnToolCallDelta and OnToolCallEnd track the streaming
// of a tool call's arguments before the call is parsed.
OnToolCallStart ToolCallStartHandler
OnToolCallDelta ToolCallDeltaHandler
OnToolCallEnd ToolCallEndHandler
// New callbacks for previously unwired Fantasy lifecycle events.
// OnStepStart fires when a step begins.
OnStepStart StepStartHandler
// OnStepFinish fires last at step finish, after OnToolCallContent and
// OnStepUsage.
OnStepFinish StepFinishHandler
// OnTextStart and OnTextEnd bracket a streamed text block.
OnTextStart TextStartHandler
OnTextEnd TextEndHandler
// OnReasoningStart fires when a reasoning block begins.
OnReasoningStart ReasoningStartHandler
// OnWarnings receives the message text of provider warnings.
OnWarnings WarningsHandler
// OnSource fires when the model references a source.
OnSource SourceHandler
// OnStreamFinish fires when a per-step stream completes.
OnStreamFinish StreamFinishHandler
// OnError receives agent-level errors reported by the stream.
OnError ErrorHandler
// OnRetry fires when the provider request is retried.
OnRetry RetryHandler
// OnPrepareStep may replace the message list between steps; it runs after
// any steering messages have been appended.
OnPrepareStep PrepareStepHandler
}
// Agent represents an AI agent with core tool integration using the LLM library.
// Core tools (bash, read, write, edit, grep, find, ls) are registered as direct
// AgentTool implementations — no MCP layer, no serialization overhead.
// Additional tools from external MCP servers can be loaded alongside core tools.
//
// When MCP servers are configured, tool loading happens in the background so the
// agent (and UI) can start immediately. The first LLM call automatically waits
// for MCP tools to finish loading before proceeding.
type Agent struct {
// toolManager owns the MCP server connections and their tools. NewAgent
// leaves it nil when no MCP servers are configured.
toolManager *tools.MCPToolManager
// fantasyAgent is the underlying agent that executes generations. It is
// replaced wholesale by rebuildFantasyAgent whenever the tool set changes.
fantasyAgent fantasy.Agent
// model is the language model returned by the provider.
model fantasy.LanguageModel
providerCloser io.Closer // optional cleanup for providers like kronk
// maxSteps and systemPrompt mirror the AgentConfig values and are re-applied
// on every rebuild.
maxSteps int
systemPrompt string
// loadingMessage is the message reported by the provider at creation time.
loadingMessage string
// providerType is the provider parsed from the model string, or "default"
// when no model string is set or it cannot be parsed.
providerType string
// streamingEnabled forces the streaming path in GenerateWithCallbacks.
streamingEnabled bool
// coreTools and extraTools are kept so the full tool list can be
// reassembled (core + MCP + extra) on rebuild.
coreTools []fantasy.AgentTool
extraTools []fantasy.AgentTool
toolWrapper func([]fantasy.AgentTool) []fantasy.AgentTool // stored for SetModel rebuild
// providerOptions and modelConfig are stored for rebuilding the fantasy
// agent when MCP tools arrive asynchronously or on SetModel.
providerOptions fantasy.ProviderOptions
// skipMaxOutputTokens suppresses the max-output-tokens option for providers
// that do not support it.
skipMaxOutputTokens bool
modelConfig *models.ProviderConfig
// authHandler and tokenStoreFactory are stored from AgentConfig so that
// AddMCPServer() can propagate them when creating a new MCPToolManager
// at runtime (i.e. when no MCP servers were configured at init time).
authHandler tools.MCPAuthHandler
tokenStoreFactory tools.TokenStoreFactory
// mcpTaskConfig is stored from AgentConfig so AddMCPServer() can
// propagate it to a lazily-created MCPToolManager.
mcpTaskConfig tools.MCPTaskConfig
// mcpReady is closed when background MCP tool loading completes (success
// or failure). nil when no MCP servers are configured; ensureMCPTools also
// resets it to nil once the loaded tools have been integrated.
mcpReady chan struct{}
// mcpErr holds any error from background MCP loading. It is written by the
// loader goroutine before mcpReady is closed.
mcpErr error
// promptMu serializes runtime updates to systemPrompt and the
// accompanying fantasy agent rebuild so concurrent SetSystemPrompt
// callers (e.g. Kit.applyComposedSystemPrompt invoked from multiple
// goroutines) don't race on a.systemPrompt / a.fantasyAgent.
promptMu sync.Mutex
}
// GenerateWithLoopResult contains the result and conversation history from an agent interaction.
// When GenerateWithCallbacks returns a non-nil result together with an error
// (a partial result after cancellation or failure), only ConversationMessages
// and PersistedMessageCount are populated.
type GenerateWithLoopResult struct {
// FinalResponse is the last message generated by the model
FinalResponse *fantasy.Response
// ConversationMessages contains all messages in the conversation including tool calls and results.
// In a partial result it holds the original input messages followed by the
// messages of every step that completed before the error.
ConversationMessages []fantasy.Message
// Messages contains the conversation as custom content blocks
Messages []message.Message
// TotalUsage contains aggregate token usage across all steps
TotalUsage fantasy.Usage
// StopReason is the LLM provider's finish reason for the final response.
StopReason string
// PersistedMessageCount is the number of new messages (beyond the original
// input) that were already persisted incrementally via OnStepMessages during
// generation. The caller should skip these when doing post-generation
// persistence to avoid duplicates. It stays zero on the non-streaming path.
PersistedMessageCount int
}
// NewAgent creates a new Agent with core tools and optional MCP tool integration.
//
// Core tool selection follows AgentConfig: a non-empty CoreTools list is used
// as-is; otherwise DisableCoreTools yields no core tools (chat-only mode);
// otherwise core.AllTools() is registered. ExtraTools are appended and the
// optional ToolWrapper is applied to the combined list.
//
// If MCP servers are configured, their tools are loaded in the background —
// the agent is returned immediately and is usable with the initial tool set.
// The first LLM call automatically waits for MCP tools to finish loading and
// rebuilds the agent with the full tool set. ctx is also used by that
// background load, so it must outlive the call to NewAgent.
//
// It returns an error if agentConfig is nil or the model provider cannot be
// created; the provider error is wrapped so callers can inspect it with
// errors.Is / errors.As.
func NewAgent(ctx context.Context, agentConfig *AgentConfig) (*Agent, error) {
	if agentConfig == nil {
		return nil, fmt.Errorf("agent config must not be nil")
	}

	// Create the LLM provider.
	providerResult, err := models.CreateProvider(ctx, agentConfig.ModelConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create model provider: %w", err)
	}

	// Register core tools (direct AgentTool implementations, no MCP overhead).
	var coreTools []fantasy.AgentTool
	switch {
	case len(agentConfig.CoreTools) > 0:
		// Custom tools provided - use them.
		coreTools = agentConfig.CoreTools
	case agentConfig.DisableCoreTools:
		// Explicitly zero tools - chat-only mode.
	default:
		coreTools = core.AllTools()
	}

	// Build the initial tool list: core tools + extension tools (no MCP yet).
	// The core slice is copied so appending never touches the caller's array.
	allTools := make([]fantasy.AgentTool, len(coreTools))
	copy(allTools, coreTools)
	if len(agentConfig.ExtraTools) > 0 {
		allTools = append(allTools, agentConfig.ExtraTools...)
	}

	// Apply tool wrapper (extension interception layer) if configured.
	if agentConfig.ToolWrapper != nil {
		allTools = agentConfig.ToolWrapper(allTools)
	}

	agentOpts := buildAgentOptions(agentConfig, providerResult, allTools)
	fantasyAgent := fantasy.NewAgent(providerResult.Model, agentOpts...)

	// Determine provider type from the model string; fall back to "default"
	// when it is missing or cannot be parsed.
	providerType := "default"
	if agentConfig.ModelConfig != nil && agentConfig.ModelConfig.ModelString != "" {
		if p, _, err := models.ParseModelString(agentConfig.ModelConfig.ModelString); err == nil {
			providerType = p
		}
	}

	a := &Agent{
		fantasyAgent:        fantasyAgent,
		model:               providerResult.Model,
		providerCloser:      providerResult.Closer,
		maxSteps:            agentConfig.MaxSteps,
		systemPrompt:        agentConfig.SystemPrompt,
		loadingMessage:      providerResult.Message,
		providerType:        providerType,
		streamingEnabled:    agentConfig.StreamingEnabled,
		coreTools:           coreTools,
		extraTools:          agentConfig.ExtraTools,
		toolWrapper:         agentConfig.ToolWrapper,
		providerOptions:     providerResult.ProviderOptions,
		skipMaxOutputTokens: providerResult.SkipMaxOutputTokens,
		modelConfig:         agentConfig.ModelConfig,
		authHandler:         agentConfig.AuthHandler,
		tokenStoreFactory:   agentConfig.TokenStoreFactory,
		mcpTaskConfig:       agentConfig.MCPTaskConfig,
	}

	// Start MCP tool loading in the background if servers are configured.
	// The mcpReady channel is closed when loading completes (success or failure).
	mcpConfig := agentConfig.MCPConfig
	if mcpConfig == nil || len(mcpConfig.MCPServers) == 0 {
		return a, nil
	}

	toolManager := tools.NewMCPToolManager()
	if agentConfig.AuthHandler != nil {
		toolManager.SetAuthHandler(agentConfig.AuthHandler)
	}
	if agentConfig.TokenStoreFactory != nil {
		toolManager.SetTokenStoreFactory(agentConfig.TokenStoreFactory)
	}
	if agentConfig.DebugLogger != nil {
		toolManager.SetDebugLogger(agentConfig.DebugLogger)
	}
	// Set per-server loaded callback if provided.
	if agentConfig.OnMCPServerLoaded != nil {
		toolManager.SetOnServerLoaded(agentConfig.OnMCPServerLoaded)
	}
	// Apply task-augmented tool execution config (zero value = no-op).
	toolManager.SetTaskConfig(agentConfig.MCPTaskConfig)

	a.toolManager = toolManager
	a.mcpReady = make(chan struct{})
	go func() {
		// mcpErr is written before the channel is closed, so any reader that
		// has received from mcpReady observes it.
		defer close(a.mcpReady)
		// mcpConfig was captured above so a later reassignment of
		// agentConfig.MCPConfig by the caller cannot affect the load.
		if err := toolManager.LoadTools(ctx, mcpConfig); err != nil {
			a.mcpErr = err
			fmt.Printf("Warning: Failed to load MCP tools: %v\n", err)
		}
	}()

	return a, nil
}
// WaitForMCPTools blocks until background MCP tool loading completes and
// returns the error recorded by the loader, if any.
//
// It returns nil when no MCP servers were configured (nothing was ever loaded,
// so no error was recorded). It is safe to call multiple times, including after
// the loaded tools have been integrated and the ready channel has been cleared:
// the recorded error is still reported in that case.
func (a *Agent) WaitForMCPTools() error {
	// Read the field once: receiving from a channel that was reset to nil
	// between a nil check and the receive would block forever.
	if ready := a.mcpReady; ready != nil {
		<-ready
	}
	return a.mcpErr
}
// MCPToolsReady reports, without blocking, whether MCP tool loading is done.
// It is true when no loading is pending (the ready channel is nil) or when the
// ready channel has been closed, and false while loading is still in flight.
// Intended for UI status display.
func (a *Agent) MCPToolsReady() bool {
	if a.mcpReady != nil {
		select {
		case <-a.mcpReady:
			// Closed: loading has finished.
		default:
			// Still open: loading is in progress.
			return false
		}
	}
	return true
}
// ensureMCPTools blocks until background MCP loading has finished and then,
// if the tool manager reports at least one tool, rebuilds the fantasy agent so
// those tools are included. It is invoked lazily ahead of an LLM call.
//
// Once it has run, the ready channel is cleared, which turns every later call
// into an immediate no-op and prevents a second rebuild.
// NOTE(review): the channel field is cleared without synchronization —
// confirm callers never run this concurrently with other readers of mcpReady.
func (a *Agent) ensureMCPTools() {
	if a.mcpReady == nil {
		// Nothing pending: either MCP was never configured or the tools
		// were already integrated.
		return
	}
	<-a.mcpReady

	if mgr := a.toolManager; mgr != nil {
		if loaded := mgr.GetTools(); len(loaded) > 0 {
			a.rebuildFantasyAgent()
		}
	}

	a.mcpReady = nil
}
// rebuildFantasyAgent replaces a.fantasyAgent with a fresh agent built from the
// state stored on the Agent. The tool list is assembled in the order core
// tools, MCP tools (when a tool manager exists), extra tools, and is then
// passed through the tool wrapper if one is set. Model, provider options,
// system prompt, max steps and model config are taken from the Agent's fields.
func (a *Agent) rebuildFantasyAgent() {
	// Start from a private copy of the core tools so the appends below never
	// write into the stored slice.
	combined := append(make([]fantasy.AgentTool, 0, len(a.coreTools)), a.coreTools...)

	if mgr := a.toolManager; mgr != nil {
		combined = append(combined, mcpToolsToAgentTools(mgr.GetTools(), mgr)...)
	}
	if len(a.extraTools) > 0 {
		combined = append(combined, a.extraTools...)
	}
	if wrap := a.toolWrapper; wrap != nil {
		combined = wrap(combined)
	}

	cfg := AgentConfig{
		ModelConfig:  a.modelConfig,
		SystemPrompt: a.systemPrompt,
		MaxSteps:     a.maxSteps,
	}
	provider := models.ProviderResult{
		Model:               a.model,
		ProviderOptions:     a.providerOptions,
		SkipMaxOutputTokens: a.skipMaxOutputTokens,
	}

	a.fantasyAgent = fantasy.NewAgent(a.model, buildAgentOptions(&cfg, &provider, combined)...)
}
// buildAgentOptions translates the configuration, provider result and combined
// tool list into the option slice handed to fantasy.NewAgent.
//
// Options are emitted only for values that are actually set: a non-empty
// system prompt, a non-empty tool list, a positive MaxSteps (as a step-count
// stop condition), non-nil provider options, and — when ModelConfig is
// present — a positive MaxTokens (unless the provider opts out via
// SkipMaxOutputTokens) plus each non-nil sampling parameter.
func buildAgentOptions(agentConfig *AgentConfig, providerResult *models.ProviderResult, allTools []fantasy.AgentTool) []fantasy.AgentOption {
	var opts []fantasy.AgentOption

	if prompt := agentConfig.SystemPrompt; prompt != "" {
		opts = append(opts, fantasy.WithSystemPrompt(prompt))
	}
	if len(allTools) > 0 {
		opts = append(opts, fantasy.WithTools(allTools...))
	}
	// A positive step limit becomes the loop's stop condition.
	if limit := agentConfig.MaxSteps; limit > 0 {
		opts = append(opts, fantasy.WithStopConditions(fantasy.StepCountIs(limit)))
	}
	// Provider-specific options (e.g. OpenAI Responses API reasoning settings).
	if providerResult.ProviderOptions != nil {
		opts = append(opts, fantasy.WithProviderOptions(providerResult.ProviderOptions))
	}

	// Generation parameters are only available with a model config.
	mc := agentConfig.ModelConfig
	if mc == nil {
		return opts
	}

	// Some providers (e.g. Codex OAuth) reject max_output_tokens.
	if !providerResult.SkipMaxOutputTokens && mc.MaxTokens > 0 {
		opts = append(opts, fantasy.WithMaxOutputTokens(int64(mc.MaxTokens)))
	}
	if mc.Temperature != nil {
		opts = append(opts, fantasy.WithTemperature(float64(*mc.Temperature)))
	}
	if mc.TopP != nil {
		opts = append(opts, fantasy.WithTopP(float64(*mc.TopP)))
	}
	if mc.TopK != nil {
		opts = append(opts, fantasy.WithTopK(int64(*mc.TopK)))
	}
	if mc.FrequencyPenalty != nil {
		opts = append(opts, fantasy.WithFrequencyPenalty(float64(*mc.FrequencyPenalty)))
	}
	if mc.PresencePenalty != nil {
		opts = append(opts, fantasy.WithPresencePenalty(float64(*mc.PresencePenalty)))
	}
	return opts
}
// GenerateWithLoop is the positional-argument form of GenerateWithCallbacks.
// It packs the five supplied handlers into a GenerateCallbacks value, leaves
// every other callback unset, and delegates. Any handler may be nil.
func (a *Agent) GenerateWithLoop(ctx context.Context, messages []fantasy.Message,
	onToolCall ToolCallHandler, onToolExecution ToolExecutionHandler, onToolResult ToolResultHandler,
	onResponse ResponseHandler, onToolCallContent ToolCallContentHandler,
) (*GenerateWithLoopResult, error) {
	var callbacks GenerateCallbacks
	callbacks.OnToolCall = onToolCall
	callbacks.OnToolExecution = onToolExecution
	callbacks.OnToolResult = onToolResult
	callbacks.OnResponse = onResponse
	callbacks.OnToolCallContent = onToolCallContent

	return a.GenerateWithCallbacks(ctx, messages, callbacks)
}
// GenerateWithCallbacks runs a single agent turn over messages and reports
// progress through the handlers in cb. The underlying fantasy agent drives the
// tool-call loop; this method adapts its events to kit's callback interface.
//
// Before calling the model it waits for background MCP tool loading, injects
// cb.OnToolOutput and cb.OnPasswordPrompt into ctx for the core tools, splits
// messages into prompt, files and history, and applies message-level cache
// control to the history.
//
// The streaming path is used when streaming is enabled or when any handler
// that can only be served by the stream is set; otherwise the plain Generate
// call is used. cb.OnResponse fires on both paths after a successful call,
// even when the response text is empty.
//
// On a streaming error (including cancellation) the returned result is non-nil
// if at least one step completed: it carries the input messages followed by
// the messages of the completed steps, plus PersistedMessageCount, and is
// returned together with the error. Otherwise the result is nil.
func (a *Agent) GenerateWithCallbacks(ctx context.Context, messages []fantasy.Message,
	cb GenerateCallbacks,
) (*GenerateWithLoopResult, error) {
	// Wait for background MCP tool loading to complete and rebuild the
	// fantasy agent with the full tool set. This is a no-op when no MCP
	// servers are configured or tools have already been integrated.
	a.ensureMCPTools()

	// Inject tool output handler into context for use by core tools (e.g., bash).
	if cb.OnToolOutput != nil {
		ctx = core.ContextWithToolOutputCallback(ctx, cb.OnToolOutput)
	}
	// Inject password prompt handler into context for use by bash tool.
	if cb.OnPasswordPrompt != nil {
		ctx = core.ContextWithPasswordPrompt(ctx, cb.OnPasswordPrompt)
	}

	// The agent requires the current user input as Prompt, with prior messages
	// as history. Files (e.g. clipboard images) are passed via the Files field
	// so the agent includes them in the API request.
	prompt, files, history := splitPromptAndHistory(messages)

	// Apply message-level cache control for Anthropic models.
	// This avoids type conflicts with provider-level options.
	history = applyCacheControlToMessages(history)

	// The agent only exposes tool/step callbacks on AgentStreamCall, so the
	// stream is required whenever a handler that depends on those events is
	// set. OnResponse, OnToolOutput and OnPasswordPrompt are served on both
	// paths and therefore do not force streaming.
	hasCallbacks := cb.OnToolCall != nil || cb.OnToolExecution != nil || cb.OnToolResult != nil ||
		cb.OnToolCallContent != nil || cb.OnStreamingResponse != nil || cb.OnReasoningDelta != nil ||
		cb.OnReasoningComplete != nil || cb.OnStepMessages != nil || cb.OnStepUsage != nil ||
		cb.OnToolCallStart != nil || cb.OnToolCallDelta != nil || cb.OnToolCallEnd != nil ||
		cb.OnStepStart != nil || cb.OnStepFinish != nil || cb.OnTextStart != nil ||
		cb.OnTextEnd != nil || cb.OnReasoningStart != nil || cb.OnWarnings != nil ||
		cb.OnSource != nil || cb.OnStreamFinish != nil || cb.OnError != nil ||
		cb.OnRetry != nil || cb.OnPrepareStep != nil

	// Non-streaming path with no stream-only callbacks — use the simpler
	// Generate call.
	if !a.streamingEnabled && !hasCallbacks {
		result, err := a.fantasyAgent.Generate(ctx, fantasy.AgentCall{
			Prompt:   prompt,
			Files:    files,
			Messages: history,
		})
		if err != nil {
			return nil, err
		}
		// Fire the response callback so callers can reset streaming state.
		if cb.OnResponse != nil {
			cb.OnResponse(result.Response.Content.Text())
		}
		return convertAgentResult(result, messages), nil
	}

	// Track tool call args per-ToolCallID so parallel tool calls in a single
	// step don't clobber each other. Without this, OnToolResult callbacks would
	// all see the args of the last OnToolCall in the step. The mutex guards
	// against the possibility that the underlying streaming layer dispatches
	// callbacks from multiple goroutines.
	toolCallArgs := make(map[string]string)
	var toolCallArgsMu sync.Mutex

	// Track completed step messages so we can return partial results on
	// cancellation. The agent's Stream() discards accumulated steps when it
	// returns an error, but the OnStepFinish callback fires for every step
	// that completed before the error occurred.
	var completedStepMessages []fantasy.Message
	// persistedCount tracks how many new messages (beyond the original input)
	// were persisted incrementally via cb.OnStepMessages, so the caller can
	// skip them during post-generation persistence.
	var persistedCount int
	// stepCounter tracks the current step number for StepStart/StepFinish events.
	var stepCounter int

	streamCall := fantasy.AgentStreamCall{
		Prompt:   prompt,
		Files:    files,
		Messages: history,

		// Tool input streaming callbacks — fire during tool argument generation.
		OnToolInputStart: func(id, toolName string) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnToolCallStart != nil {
				cb.OnToolCallStart(id, toolName)
			}
			return nil
		},
		OnToolInputDelta: func(id, delta string) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnToolCallDelta != nil {
				cb.OnToolCallDelta(id, delta)
			}
			return nil
		},
		OnToolInputEnd: func(id string) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnToolCallEnd != nil {
				cb.OnToolCallEnd(id)
			}
			return nil
		},

		// Text start/end callbacks.
		OnTextStart: func(id string) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnTextStart != nil {
				cb.OnTextStart(id)
			}
			return nil
		},
		OnTextEnd: func(id string) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnTextEnd != nil {
				cb.OnTextEnd(id)
			}
			return nil
		},

		// Reasoning start callback.
		OnReasoningStart: func(id string, _ fantasy.ReasoningContent) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnReasoningStart != nil {
				cb.OnReasoningStart(id)
			}
			return nil
		},
		// Reasoning/thinking streaming callback; only the delta is forwarded.
		OnReasoningDelta: func(_, delta string) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnReasoningDelta != nil {
				cb.OnReasoningDelta(delta)
			}
			return nil
		},
		// Reasoning/thinking complete callback.
		OnReasoningEnd: func(_ string, _ fantasy.ReasoningContent) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnReasoningComplete != nil {
				cb.OnReasoningComplete()
			}
			return nil
		},

		// Text streaming callback.
		OnTextDelta: func(_, text string) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnStreamingResponse != nil {
				cb.OnStreamingResponse(text)
			}
			return nil
		},

		// Warnings callback; only the message text of each warning is forwarded.
		OnWarnings: func(warnings []fantasy.CallWarning) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnWarnings != nil {
				strs := make([]string, len(warnings))
				for i, w := range warnings {
					strs[i] = w.Message
				}
				cb.OnWarnings(strs)
			}
			return nil
		},

		// Source callback.
		OnSource: func(source fantasy.SourceContent) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnSource != nil {
				cb.OnSource(string(source.SourceType), source.ID, source.URL, source.Title)
			}
			return nil
		},

		// Stream finish callback (per-step stream completion).
		OnStreamFinish: func(usage fantasy.Usage, finishReason fantasy.FinishReason, _ fantasy.ProviderMetadata) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			if cb.OnStreamFinish != nil {
				cb.OnStreamFinish(usage, string(finishReason))
			}
			return nil
		},

		// Error callback; delivered even when the context is cancelled.
		OnError: func(err error) {
			if cb.OnError != nil {
				cb.OnError(err)
			}
		},

		// Step start callback.
		OnStepStart: func(stepNumber int) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			stepCounter = stepNumber
			if cb.OnStepStart != nil {
				cb.OnStepStart(stepNumber)
			}
			return nil
		},

		// Tool call complete - the tool has been parsed and is about to execute.
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			toolCallArgsMu.Lock()
			toolCallArgs[tc.ToolCallID] = tc.Input
			toolCallArgsMu.Unlock()

			// Notify about the tool call.
			if cb.OnToolCall != nil {
				cb.OnToolCall(tc.ToolCallID, tc.ToolName, tc.Input)
			}
			// Notify tool execution starting.
			if cb.OnToolExecution != nil {
				cb.OnToolExecution(tc.ToolCallID, tc.ToolName, tc.Input, true)
			}
			return nil
		},

		// Tool result - tool execution completed.
		OnToolResult: func(tr fantasy.ToolResultContent) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			// Look up the args recorded for this specific tool call. Delete
			// the entry so the map doesn't accumulate across steps.
			toolCallArgsMu.Lock()
			args := toolCallArgs[tr.ToolCallID]
			delete(toolCallArgs, tr.ToolCallID)
			toolCallArgsMu.Unlock()

			// Notify tool execution finished.
			if cb.OnToolExecution != nil {
				cb.OnToolExecution(tr.ToolCallID, tr.ToolName, args, false)
			}
			if cb.OnToolResult != nil {
				// Extract result text and error status.
				resultText, isError := extractToolResultText(tr)
				cb.OnToolResult(tr.ToolCallID, tr.ToolName, args, resultText, tr.ClientMetadata, isError)
			}
			return nil
		},

		// Step callbacks for content that accompanies tool calls.
		OnStepFinish: func(step fantasy.StepResult) error {
			// Accumulate messages from completed steps so they can be
			// persisted even if a later step is cancelled.
			completedStepMessages = append(completedStepMessages, step.Messages...)

			// Persist step messages incrementally so progress is saved as it
			// happens. This deliberately runs before the cancellation check.
			if cb.OnStepMessages != nil && len(step.Messages) > 0 {
				cb.OnStepMessages(step.Messages)
				persistedCount += len(step.Messages)
			}

			if err := ctx.Err(); err != nil {
				return err
			}

			// Check if step has text content alongside tool calls.
			text := step.Content.Text()
			toolCalls := step.Content.ToolCalls()
			if text != "" && len(toolCalls) > 0 && cb.OnToolCallContent != nil {
				cb.OnToolCallContent(text)
			}

			// Emit step usage for real-time cost tracking.
			if cb.OnStepUsage != nil {
				cb.OnStepUsage(step.Usage.InputTokens, step.Usage.OutputTokens,
					step.Usage.CacheReadTokens, step.Usage.CacheCreationTokens)
			}

			// Emit unified step finish event.
			if cb.OnStepFinish != nil {
				cb.OnStepFinish(stepCounter, len(toolCalls) > 0, string(step.FinishReason), step.Usage)
			}
			return nil
		},
	}

	// Wire up PrepareStep to handle both steering and the OnPrepareStep hook.
	// Steering drains its channel first, then the OnPrepareStep hook runs
	// against the (possibly already steered) messages.
	steerCh := steerChFromContext(ctx)
	onConsumed := steerConsumedFromContext(ctx)
	hasSteering := steerCh != nil
	hasPrepareStepHook := cb.OnPrepareStep != nil
	if hasSteering || hasPrepareStepHook {
		streamCall.PrepareStep = func(
			stepCtx context.Context,
			opts fantasy.PrepareStepFunctionOptions,
		) (context.Context, fantasy.PrepareStepResult, error) {
			result := fantasy.PrepareStepResult{
				Model:    opts.Model,
				Messages: opts.Messages,
			}

			// Phase 1: Drain steering channel (if present) without blocking.
			if hasSteering {
				var steered []SteerMessage
			drain:
				for {
					select {
					case msg, ok := <-steerCh:
						if !ok {
							// A closed channel would otherwise yield zero
							// values forever and never reach the default case.
							break drain
						}
						steered = append(steered, msg)
					default:
						break drain
					}
				}
				for _, sm := range steered {
					result.Messages = append(result.Messages,
						fantasy.NewUserMessage(sm.Text, sm.Files...))
				}
				if len(steered) > 0 && onConsumed != nil {
					onConsumed(len(steered))
				}
			}

			// Phase 2: Run OnPrepareStep hook (if registered). A nil return
			// keeps the current messages.
			if hasPrepareStepHook {
				if replacement := cb.OnPrepareStep(opts.StepNumber, result.Messages); replacement != nil {
					result.Messages = replacement
				}
			}

			// Apply message-level cache control for Anthropic models.
			result.Messages = applyCacheControlToMessages(result.Messages)
			return stepCtx, result, nil
		}
	}

	// Wire OnRetry callback if provided. Fantasy doesn't pass an attempt
	// counter, so one is kept here: it starts at 1 for the first retry and
	// counts every retry observed during this call.
	if cb.OnRetry != nil {
		var retryAttempt int
		streamCall.OnRetry = func(err *fantasy.ProviderError, _ time.Duration) {
			retryAttempt++
			// Avoid handing the callback a non-nil error interface that
			// wraps a nil *fantasy.ProviderError.
			var retryErr error
			if err != nil {
				retryErr = err
			}
			cb.OnRetry(retryAttempt, retryErr)
		}
	}

	result, err := a.fantasyAgent.Stream(ctx, streamCall)
	if err != nil {
		// On cancellation (or any error), return a partial result containing
		// messages from completed steps so the caller can persist tool calls
		// and results that finished before the failure. The original input
		// messages are included so the caller sees the full conversation up
		// to that point.
		if len(completedStepMessages) > 0 {
			partialMessages := make([]fantasy.Message, 0, len(messages)+len(completedStepMessages))
			partialMessages = append(partialMessages, messages...)
			partialMessages = append(partialMessages, completedStepMessages...)
			return &GenerateWithLoopResult{
				ConversationMessages:  partialMessages,
				PersistedMessageCount: persistedCount,
			}, err
		}
		return nil, err
	}

	// Fire the response callback so callers (e.g. the TUI) can reset streaming
	// state. This must fire even when the response text is empty (e.g.
	// reasoning-only responses) so the UI properly resets the stream component
	// and avoids duplicate content on the next flush.
	if cb.OnResponse != nil {
		cb.OnResponse(result.Response.Content.Text())
	}

	r := convertAgentResult(result, messages)
	r.PersistedMessageCount = persistedCount
	return r, nil
}
// splitPromptAndHistory separates the current turn's input from the rest of
// the conversation. The agent call takes the current input as a prompt string
// (plus optional file attachments) and the prior messages as history, so this
// helper locates the most recent user message and returns:
//
//   - the first non-empty text part of that message as the prompt,
//   - every file part of that message, in order,
//   - a freshly allocated slice holding all other messages (those before and
//     after the chosen user message).
//
// When no user message exists, the first text part of the final message is
// used as the prompt and the preceding messages (a sub-slice of the input) are
// the history. If the final message has no text part either, the prompt is
// empty and the input is returned unchanged as history. An empty input yields
// ("", nil, nil).
func splitPromptAndHistory(messages []fantasy.Message) (string, []fantasy.FilePart, []fantasy.Message) {
	total := len(messages)
	if total == 0 {
		return "", nil, nil
	}

	// Scan from the end for the most recent user-authored message.
	userIdx := -1
	for idx := total - 1; idx >= 0; idx-- {
		if messages[idx].Role == fantasy.MessageRoleUser {
			userIdx = idx
			break
		}
	}

	// Fallback: no user message at all, so borrow the text of the last message.
	if userIdx < 0 {
		tail := messages[total-1]
		for _, part := range tail.Content {
			text, isText := part.(fantasy.TextPart)
			if !isText {
				continue
			}
			return text.Text, nil, messages[:total-1]
		}
		return "", nil, messages
	}

	var (
		prompt      string
		attachments []fantasy.FilePart
	)
	for _, part := range messages[userIdx].Content {
		if text, isText := part.(fantasy.TextPart); isText {
			// Only the first non-empty text part becomes the prompt.
			if prompt == "" {
				prompt = text.Text
			}
			continue
		}
		if file, isFile := part.(fantasy.FilePart); isFile {
			attachments = append(attachments, file)
		}
	}

	// Copy into a new slice so the caller's backing array is never modified.
	rest := make([]fantasy.Message, 0, total-1)
	rest = append(rest, messages[:userIdx]...)
	rest = append(rest, messages[userIdx+1:]...)
	return prompt, attachments, rest
}
// convertAgentResult translates a fantasy.AgentResult into the package's
// GenerateWithLoopResult. The conversation is originalMessages followed by
// the messages of every step in result.Steps, in order; the same sequence is
// also converted element-by-element through message.FromLLMMessage to fill
// the Messages field. FinalResponse points at result.Response (it is not
// copied), and StopReason is that response's finish reason as a string.
func convertAgentResult(result *fantasy.AgentResult, originalMessages []fantasy.Message) *GenerateWithLoopResult {
	// Start with the caller's messages, then append what each step produced.
	var conversation []fantasy.Message
	conversation = append(conversation, originalMessages...)
	for _, stepResult := range result.Steps {
		conversation = append(conversation, stepResult.Messages...)
	}

	// Mirror the conversation as custom content blocks.
	var blocks []message.Message
	for _, llmMsg := range conversation {
		blocks = append(blocks, message.FromLLMMessage(llmMsg))
	}

	out := &GenerateWithLoopResult{}
	out.FinalResponse = &result.Response
	out.ConversationMessages = conversation
	out.Messages = blocks
	out.TotalUsage = result.TotalUsage
	out.StopReason = string(result.Response.FinishReason)
	return out
}
// extractToolResultText returns the display text of a tool result together
// with a flag reporting whether the result is an error.
//
//   - nil result: ("", false)
//   - error result: the error's message, true
//   - text result: the text passed through extractMCPContentText, false
//     (plain text comes back unchanged; an MCP JSON envelope is unwrapped)
//   - anything else: the value formatted with %v, false
func extractToolResultText(tr fantasy.ToolResultContent) (string, bool) {
	switch out := tr.Result.(type) {
	case nil:
		return "", false
	case fantasy.ToolResultOutputContentError:
		return out.Error.Error(), true
	case fantasy.ToolResultOutputContentText:
		// External MCP tools wrap their text in JSON; unwrap when present.
		return extractMCPContentText(out.Text), false
	default:
		// Unknown output type: fall back to a generic rendering.
		return fmt.Sprintf("%v", out), false
	}
}
// extractMCPContentText unwraps the human-readable text from an MCP tool
// result encoded as JSON, e.g.
//
//	{"content":[{"type":"text","text":"..."}], "_meta":{...}}
//
// All non-empty entries of type "text" are joined with newlines. The input is
// returned unchanged when it does not start with '{' (ignoring leading
// whitespace), when it is not valid JSON of the expected shape, or when it
// carries no non-empty text entries.
func extractMCPContentText(result string) string {
	// Cheap pre-filter: only JSON objects can be MCP envelopes.
	if !strings.HasPrefix(strings.TrimSpace(result), "{") {
		return result
	}

	var envelope struct {
		Content []struct {
			Type string `json:"type"`
			Text string `json:"text"`
		} `json:"content"`
	}
	if err := json.Unmarshal([]byte(result), &envelope); err != nil {
		return result
	}

	var joined strings.Builder
	for _, item := range envelope.Content {
		if item.Type != "text" || item.Text == "" {
			continue
		}
		// Every appended entry is non-empty, so a non-zero length means a
		// previous entry exists and needs a separator.
		if joined.Len() > 0 {
			joined.WriteByte('\n')
		}
		joined.WriteString(item.Text)
	}
	if joined.Len() == 0 {
		return result
	}
	return joined.String()
}
// GetTools returns a new slice with every tool currently available to the
// agent: the core tools first, then the MCP tools exposed by the tool manager
// (when one exists), then the extra tools registered by extensions. The
// returned slice is never nil and does not share a backing array with the
// agent's core tool list.
func (a *Agent) GetTools() []fantasy.AgentTool {
	combined := append(make([]fantasy.AgentTool, 0, len(a.coreTools)), a.coreTools...)
	if mgr := a.toolManager; mgr != nil {
		combined = append(combined, mcpToolsToAgentTools(mgr.GetTools(), mgr)...)
	}
	// Appending an empty extraTools slice is a no-op, so no length check is needed.
	combined = append(combined, a.extraTools...)
	return combined
}
// GetCoreToolCount returns the number of core tools held by the agent, i.e.
// the length of its coreTools slice. MCP and extension tools are not counted.
func (a *Agent) GetCoreToolCount() int {
return len(a.coreTools)
}
// GetMCPToolCount reports how many tools the MCP tool manager currently
// exposes. It returns 0 when the agent has no tool manager.
func (a *Agent) GetMCPToolCount() int {
	mgr := a.toolManager
	if mgr == nil {
		return 0
	}
	return len(mgr.GetTools())
}
// GetExtensionToolCount returns the number of tools registered by extensions,
// i.e. the length of the agent's extraTools slice.
func (a *Agent) GetExtensionToolCount() int {
return len(a.extraTools)
}
// SetExtraTools replaces the agent's extra tools (e.g. extension-registered
// tools) and rebuilds the internal agent with the updated tool list. The
// model, system prompt, and all other configuration are preserved.
//
// The slice is stored as passed (it is not copied), so the caller should not
// modify it afterwards.
func (a *Agent) SetExtraTools(extraTools []fantasy.AgentTool) {
a.extraTools = extraTools
// Rebuild so the new tool list is picked up by the underlying agent.
a.rebuildFantasyAgent()
}
// AddMCPServer registers a new MCP server with the agent at runtime so its
// tools become available, and returns the tool count reported by the tool
// manager's AddServer. When the agent has no tool manager yet (no MCP servers
// were configured at init), one is created and wired with the agent's auth
// handler, token store factory, task config, and a tools-changed callback
// that rebuilds the underlying agent. On failure it returns 0 and the error
// from AddServer.
func (a *Agent) AddMCPServer(ctx context.Context, name string, cfg config.MCPServerConfig) (int, error) {
	// Let the initial MCP tool load settle before touching the manager.
	a.ensureMCPTools()

	if a.toolManager == nil {
		mgr := tools.NewMCPToolManager()
		a.toolManager = mgr
		if handler := a.authHandler; handler != nil {
			mgr.SetAuthHandler(handler)
		}
		if factory := a.tokenStoreFactory; factory != nil {
			mgr.SetTokenStoreFactory(factory)
		}
		mgr.SetTaskConfig(a.mcpTaskConfig)
		mgr.SetOnToolsChanged(func() {
			a.rebuildFantasyAgent()
		})
	}

	loaded, addErr := a.toolManager.AddServer(ctx, name, cfg)
	if addErr != nil {
		return 0, addErr
	}

	// The tools-changed callback may already have rebuilt the agent, but it
	// is only present if it was wired; rebuild unconditionally to be sure.
	a.rebuildFantasyAgent()
	return loaded, nil
}
// RemoveMCPServer removes the named MCP server through the tool manager and
// rebuilds the underlying agent so its tools are no longer offered. It
// returns an error when the agent has no tool manager or when the tool
// manager's RemoveServer fails.
func (a *Agent) RemoveMCPServer(name string) error {
	if a.toolManager == nil {
		return fmt.Errorf("no MCP servers loaded")
	}

	// Let the initial MCP tool load settle before removing anything.
	a.ensureMCPTools()

	if removeErr := a.toolManager.RemoveServer(name); removeErr != nil {
		return removeErr
	}

	// The tools-changed callback normally triggers a rebuild; do it
	// explicitly so the tool list is refreshed regardless.
	a.rebuildFantasyAgent()
	return nil
}
// GetMCPToolManager returns the agent's MCP tool manager. The result is nil
// while the agent has no tool manager; AddMCPServer creates one on demand.
func (a *Agent) GetMCPToolManager() *tools.MCPToolManager {
return a.toolManager
}
// GetLoadingMessage returns the agent's stored loading message. According to
// the original note, this message originates from provider creation; where it
// is assigned is not visible in this part of the file.
func (a *Agent) GetLoadingMessage() string {
return a.loadingMessage
}
// GetLoadedServerNames returns the MCP server names the tool manager reports
// as loaded. It returns nil when the agent has no tool manager.
func (a *Agent) GetLoadedServerNames() []string {
	mgr := a.toolManager
	if mgr == nil {
		return nil
	}
	return mgr.GetLoadedServerNames()
}
// GetMCPPrompts returns the prompts the tool manager has discovered from the
// connected MCP servers. It returns nil when the agent has no tool manager;
// otherwise the tool manager's result is passed through as-is.
func (a *Agent) GetMCPPrompts() []tools.MCPPrompt {
	mgr := a.toolManager
	if mgr == nil {
		return nil
	}
	return mgr.GetPrompts()
}
// GetMCPPrompt fetches and expands the prompt promptName from the MCP server
// serverName, substituting args, by delegating to the tool manager. Nothing
// is cached here: every call is forwarded to the tool manager. It returns an
// error when the agent has no tool manager.
func (a *Agent) GetMCPPrompt(ctx context.Context, serverName, promptName string, args map[string]string) (*tools.MCPPromptResult, error) {
	mgr := a.toolManager
	if mgr == nil {
		return nil, fmt.Errorf("no MCP servers configured")
	}
	return mgr.GetPrompt(ctx, serverName, promptName, args)
}
// GetMCPResources returns the resources the tool manager has discovered from
// the connected MCP servers. It returns nil when the agent has no tool
// manager.
func (a *Agent) GetMCPResources() []tools.MCPResource {
	mgr := a.toolManager
	if mgr == nil {
		return nil
	}
	return mgr.GetResources()
}
// ReadMCPResource reads the resource identified by uri from the MCP server
// serverName by delegating to the tool manager. It returns an error when the
// agent has no tool manager.
func (a *Agent) ReadMCPResource(ctx context.Context, serverName, uri string) (*tools.MCPResourceContent, error) {
	mgr := a.toolManager
	if mgr == nil {
		return nil, fmt.Errorf("no MCP servers configured")
	}
	return mgr.ReadResource(ctx, serverName, uri)
}
// SubscribeMCPResource asks the tool manager to subscribe to change
// notifications for the resource uri on the MCP server serverName. It returns
// an error when the agent has no tool manager.
func (a *Agent) SubscribeMCPResource(ctx context.Context, serverName, uri string) error {
	mgr := a.toolManager
	if mgr == nil {
		return fmt.Errorf("no MCP servers configured")
	}
	return mgr.SubscribeResource(ctx, serverName, uri)
}
// UnsubscribeMCPResource asks the tool manager to cancel change notifications
// for the resource uri on the MCP server serverName. It returns an error when
// the agent has no tool manager.
func (a *Agent) UnsubscribeMCPResource(ctx context.Context, serverName, uri string) error {
	mgr := a.toolManager
	if mgr == nil {
		return fmt.Errorf("no MCP servers configured")
	}
	return mgr.UnsubscribeResource(ctx, serverName, uri)
}
// SetModel swaps the agent's LLM provider to the model described by config.
// The existing tools and configuration are preserved. When config carries a
// non-empty system prompt (from per-model settings or the global config), it
// replaces the agent's stored prompt so the rebuilt fantasy agent uses it.
// The previous provider is closed if it has a closer.
//
// If the new provider cannot be created, the agent is left untouched and the
// returned error wraps the underlying cause, so callers can inspect it with
// errors.Is / errors.As.
func (a *Agent) SetModel(ctx context.Context, config *models.ProviderConfig) error {
	// Ensure MCP tools are loaded before rebuilding (SetModel may be called
	// before the first LLM call).
	a.ensureMCPTools()

	providerResult, err := models.CreateProvider(ctx, config)
	if err != nil {
		// %w keeps the provider error in the chain; the message text is
		// the same as before.
		return fmt.Errorf("failed to create model provider: %w", err)
	}

	// Close the old provider. This is best effort: a failure to release the
	// previous provider must not block switching to the new one.
	if a.providerCloser != nil {
		_ = a.providerCloser.Close()
	}

	// Swap provider-related fields.
	a.model = providerResult.Model
	a.providerCloser = providerResult.Closer
	a.providerOptions = providerResult.ProviderOptions
	a.skipMaxOutputTokens = providerResult.SkipMaxOutputTokens
	a.modelConfig = config

	// Update the system prompt when the config carries one, so that
	// model-specific prompts take effect on model switch. The write is
	// guarded by promptMu, the same lock SetSystemPrompt and GetSystemPrompt
	// use, and released before the rebuild below.
	if config.SystemPrompt != "" {
		a.promptMu.Lock()
		a.systemPrompt = config.SystemPrompt
		a.promptMu.Unlock()
	}

	// Update the provider type; a model string that fails to parse leaves
	// the previous provider type in place.
	if config.ModelString != "" {
		if p, _, err := models.ParseModelString(config.ModelString); err == nil {
			a.providerType = p
		}
	}

	// Rebuild the fantasy agent with the new model and current tool set.
	a.rebuildFantasyAgent()
	return nil
}
// GetModel returns the LanguageModel the agent currently holds. SetModel
// replaces this value when the provider is swapped.
func (a *Agent) GetModel() fantasy.LanguageModel {
return a.model
}
// SetSystemPrompt updates the agent's system prompt and rebuilds the underlying
// fantasy agent so subsequent turns use the new prompt. Safe to call while the
// agent is idle; if invoked during an in-flight turn the new prompt takes
// effect on the next LLM call.
//
// promptMu is held for the whole call, including the rebuild, so a concurrent
// GetSystemPrompt blocks until the rebuild has finished.
func (a *Agent) SetSystemPrompt(prompt string) {
a.promptMu.Lock()
defer a.promptMu.Unlock()
a.systemPrompt = prompt
// Rebuild while still holding promptMu; the deferred Unlock runs only after
// rebuildFantasyAgent returns.
a.rebuildFantasyAgent()
}
// GetSystemPrompt returns the system prompt currently stored on the agent.
// The read is performed while holding promptMu, the same lock
// SetSystemPrompt takes when it writes the prompt.
func (a *Agent) GetSystemPrompt() string {
	a.promptMu.Lock()
	current := a.systemPrompt
	a.promptMu.Unlock()
	return current
}
// GetMaxTokens returns the effective max output tokens the agent sends to the
// LLM provider, as recorded in the attached ModelConfig (i.e. after per-model
// defaults, right-sizing, and any Anthropic thinking-budget adjustments have
// been applied to it). It returns 0 when the provider suppresses the
// parameter (e.g. Codex OAuth) or when no ModelConfig is attached (e.g. early
// init), which lets callers tell "default" apart from "explicitly capped".
func (a *Agent) GetMaxTokens() int {
	if a.skipMaxOutputTokens || a.modelConfig == nil {
		return 0
	}
	return a.modelConfig.MaxTokens
}
// Close releases the agent's resources. It first waits for any background
// MCP tool loading to finish (so connections are not closed underneath it),
// then closes the tool manager and the provider closer, each only if
// present. Both are always closed; the first error encountered is returned,
// and a provider error is reported only when the tool manager closed cleanly.
func (a *Agent) Close() error {
	// Block until background MCP loading has completed.
	if ready := a.mcpReady; ready != nil {
		<-ready
	}

	var firstErr error
	if mgr := a.toolManager; mgr != nil {
		firstErr = mgr.Close()
	}
	if closer := a.providerCloser; closer != nil {
		providerErr := closer.Close()
		// Keep the earlier tool-manager error if there was one.
		if firstErr == nil {
			firstErr = providerErr
		}
	}
	return firstErr
}