diff --git a/README.md b/README.md index cce98d7f..2747f4d4 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ A powerful, extensible AI coding agent CLI with multi-provider support, built-in - **Session Management**: Tree-based conversation history with branching support - **Non-Interactive Mode**: Script-friendly positional args with JSON output - **ACP Server**: Run Kit as an [Agent Client Protocol](https://agentclientprotocol.com) agent over stdio -- **Go SDK**: Embed Kit in your own applications +- **Go SDK**: Embed Kit in your own applications with full agent lifecycle events (30+ event types) and behavior-modifying hooks ## Installation @@ -684,7 +684,7 @@ unsub2 := host.OnToolResult(func(e kit.ToolResultEvent) { }) defer unsub2() -unsub3 := host.OnStreaming(func(e kit.MessageUpdateEvent) { +unsub3 := host.OnMessageUpdate(func(e kit.MessageUpdateEvent) { print(e.Chunk) }) defer unsub3() diff --git a/examples/sdk/basic/main.go b/examples/sdk/basic/main.go index 7b70d25f..48da2583 100644 --- a/examples/sdk/basic/main.go +++ b/examples/sdk/basic/main.go @@ -62,7 +62,7 @@ func main() { } }) // Subscribe to streaming chunks. - host3.OnStreaming(func(e kit.MessageUpdateEvent) { + host3.OnMessageUpdate(func(e kit.MessageUpdateEvent) { fmt.Print(e.Chunk) }) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 1da108d5..c059d5b1 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "strings" + "time" "charm.land/fantasy" @@ -126,6 +127,76 @@ type StepMessagesHandler func(stepMessages []fantasy.Message) // tracking during long-running tool-calling conversations. type StepUsageHandler func(inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens int64) +// StepStartHandler is called when a new LLM step begins within a turn. +type StepStartHandler func(stepNumber int) + +// StepFinishHandler is called when a step completes with full context. 
+type StepFinishHandler func(stepNumber int, hasToolCalls bool, finishReason string, usage fantasy.Usage) + +// TextStartHandler is called when the LLM begins generating text content. +type TextStartHandler func(id string) + +// TextEndHandler is called when the LLM finishes generating text content. +type TextEndHandler func(id string) + +// ReasoningStartHandler is called when the LLM begins reasoning/thinking. +type ReasoningStartHandler func(id string) + +// WarningsHandler is called when the LLM provider returns warnings. +type WarningsHandler func(warnings []string) + +// SourceHandler is called when the LLM references a source. +type SourceHandler func(sourceType, id, url, title string) + +// StreamFinishHandler is called when a per-step LLM stream completes. +type StreamFinishHandler func(usage fantasy.Usage, finishReason string) + +// ErrorHandler is called when an agent-level error occurs. +type ErrorHandler func(err error) + +// RetryHandler is called when the LLM request is retried. +type RetryHandler func(attempt int, err error) + +// PrepareStepHandler is called between steps to allow message modification. +// It receives the step number and current messages, and returns replacement +// messages (or nil to keep unchanged). +type PrepareStepHandler func(stepNumber int, messages []fantasy.Message) []fantasy.Message + +// GenerateCallbacks consolidates all callback functions for +// GenerateWithLoopAndStreaming into a single struct. This replaces the previous +// 16+ positional callback parameters, making it easier to add new callbacks +// without breaking existing callers (new fields default to nil). 
+type GenerateCallbacks struct { + OnToolCall ToolCallHandler + OnToolExecution ToolExecutionHandler + OnToolResult ToolResultHandler + OnResponse ResponseHandler + OnToolCallContent ToolCallContentHandler + OnStreamingResponse StreamingResponseHandler + OnReasoningDelta ReasoningDeltaHandler + OnReasoningComplete ReasoningCompleteHandler + OnToolOutput ToolOutputHandler + OnStepMessages StepMessagesHandler + OnStepUsage StepUsageHandler + OnPasswordPrompt PasswordPromptHandler + OnToolCallStart ToolCallStartHandler + OnToolCallDelta ToolCallDeltaHandler + OnToolCallEnd ToolCallEndHandler + + // New callbacks for previously unwired Fantasy lifecycle events. + OnStepStart StepStartHandler + OnStepFinish StepFinishHandler + OnTextStart TextStartHandler + OnTextEnd TextEndHandler + OnReasoningStart ReasoningStartHandler + OnWarnings WarningsHandler + OnSource SourceHandler + OnStreamFinish StreamFinishHandler + OnError ErrorHandler + OnRetry RetryHandler + OnPrepareStep PrepareStepHandler +} + // Agent represents an AI agent with core tool integration using the LLM library. // Core tools (bash, read, write, edit, grep, find, ls) are registered as direct // AgentTool implementations — no MCP layer, no serialization overhead. 
@@ -423,13 +494,20 @@ func (a *Agent) GenerateWithLoop(ctx context.Context, messages []fantasy.Message onToolCall ToolCallHandler, onToolExecution ToolExecutionHandler, onToolResult ToolResultHandler, onResponse ResponseHandler, onToolCallContent ToolCallContentHandler, ) (*GenerateWithLoopResult, error) { - return a.GenerateWithLoopAndStreaming(ctx, messages, onToolCall, onToolExecution, onToolResult, - onResponse, onToolCallContent, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + return a.GenerateWithCallbacks(ctx, messages, GenerateCallbacks{ + OnToolCall: onToolCall, + OnToolExecution: onToolExecution, + OnToolResult: onToolResult, + OnResponse: onResponse, + OnToolCallContent: onToolCallContent, + }) } // GenerateWithLoopAndStreaming processes messages using the agent with streaming and callbacks. -// The agent handles the tool call loop internally. We map the rich callback system -// to kit's existing callback interface for UI integration. +// The agent handles the tool call loop internally. +// +// Deprecated: Use GenerateWithCallbacks instead, which takes a GenerateCallbacks +// struct and is easier to extend with new callbacks. 
func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fantasy.Message, onToolCall ToolCallHandler, onToolExecution ToolExecutionHandler, onToolResult ToolResultHandler, onResponse ResponseHandler, onToolCallContent ToolCallContentHandler, @@ -444,6 +522,31 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan onToolCallDelta ToolCallDeltaHandler, onToolCallEnd ToolCallEndHandler, ) (*GenerateWithLoopResult, error) { + return a.GenerateWithCallbacks(ctx, messages, GenerateCallbacks{ + OnToolCall: onToolCall, + OnToolExecution: onToolExecution, + OnToolResult: onToolResult, + OnResponse: onResponse, + OnToolCallContent: onToolCallContent, + OnStreamingResponse: onStreamingResponse, + OnReasoningDelta: onReasoningDelta, + OnReasoningComplete: onReasoningComplete, + OnToolOutput: onToolOutput, + OnStepMessages: onStepMessages, + OnStepUsage: onStepUsage, + OnPasswordPrompt: onPasswordPrompt, + OnToolCallStart: onToolCallStart, + OnToolCallDelta: onToolCallDelta, + OnToolCallEnd: onToolCallEnd, + }) +} + +// GenerateWithCallbacks processes messages using the agent with streaming and callbacks. +// The agent handles the tool call loop internally. We map the rich callback system +// to kit's existing callback interface for UI integration. +func (a *Agent) GenerateWithCallbacks(ctx context.Context, messages []fantasy.Message, + cb GenerateCallbacks, +) (*GenerateWithLoopResult, error) { // Wait for background MCP tool loading to complete and rebuild the // fantasy agent with the full tool set. This is a no-op when no MCP @@ -451,13 +554,13 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan a.ensureMCPTools() // Inject tool output handler into context for use by core tools (e.g., bash). 
- if onToolOutput != nil { - ctx = core.ContextWithToolOutputCallback(ctx, onToolOutput) + if cb.OnToolOutput != nil { + ctx = core.ContextWithToolOutputCallback(ctx, cb.OnToolOutput) } // Inject password prompt handler into context for use by bash tool. - if onPasswordPrompt != nil { - ctx = core.ContextWithPasswordPrompt(ctx, onPasswordPrompt) + if cb.OnPasswordPrompt != nil { + ctx = core.ContextWithPasswordPrompt(ctx, cb.OnPasswordPrompt) } // The agent requires the current user input as Prompt, with prior messages as history. @@ -477,9 +580,13 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan // provided. The agent only exposes tool/step callbacks on AgentStreamCall, so // Stream is required to observe tool execution in real time. The non-streaming // Generate path is reserved for the simple case with no callbacks at all. - hasCallbacks := onToolCall != nil || onToolExecution != nil || onToolResult != nil || - onToolCallContent != nil || onStreamingResponse != nil || onReasoningDelta != nil || - onToolCallStart != nil || onToolCallDelta != nil || onToolCallEnd != nil + hasCallbacks := cb.OnToolCall != nil || cb.OnToolExecution != nil || cb.OnToolResult != nil || + cb.OnToolCallContent != nil || cb.OnStreamingResponse != nil || cb.OnReasoningDelta != nil || + cb.OnToolCallStart != nil || cb.OnToolCallDelta != nil || cb.OnToolCallEnd != nil || + cb.OnStepStart != nil || cb.OnStepFinish != nil || cb.OnTextStart != nil || + cb.OnTextEnd != nil || cb.OnReasoningStart != nil || cb.OnWarnings != nil || + cb.OnSource != nil || cb.OnStreamFinish != nil || cb.OnError != nil || + cb.OnRetry != nil || cb.OnPrepareStep != nil if a.streamingEnabled || hasCallbacks { // Track completed step messages so we can return partial results @@ -488,9 +595,11 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan // for every step that completed before the error occurred. 
var completedStepMessages []fantasy.Message // persistedCount tracks how many new messages (beyond the original - // input) were persisted incrementally via onStepMessages, so the + // input) were persisted incrementally via cb.OnStepMessages, so the // caller can skip them during post-generation persistence. var persistedCount int + // stepCounter tracks the current step number for StepStart/StepFinish events. + var stepCounter int // Use the streaming agent streamCall := fantasy.AgentStreamCall{ @@ -503,8 +612,8 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan if ctx.Err() != nil { return ctx.Err() } - if onToolCallStart != nil { - onToolCallStart(id, toolName) + if cb.OnToolCallStart != nil { + cb.OnToolCallStart(id, toolName) } return nil }, @@ -512,8 +621,8 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan if ctx.Err() != nil { return ctx.Err() } - if onToolCallDelta != nil { - onToolCallDelta(id, delta) + if cb.OnToolCallDelta != nil { + cb.OnToolCallDelta(id, delta) } return nil }, @@ -521,8 +630,39 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan if ctx.Err() != nil { return ctx.Err() } - if onToolCallEnd != nil { - onToolCallEnd(id) + if cb.OnToolCallEnd != nil { + cb.OnToolCallEnd(id) + } + return nil + }, + + // Text start/end callbacks + OnTextStart: func(id string) error { + if ctx.Err() != nil { + return ctx.Err() + } + if cb.OnTextStart != nil { + cb.OnTextStart(id) + } + return nil + }, + OnTextEnd: func(id string) error { + if ctx.Err() != nil { + return ctx.Err() + } + if cb.OnTextEnd != nil { + cb.OnTextEnd(id) + } + return nil + }, + + // Reasoning start callback + OnReasoningStart: func(id string, _ fantasy.ReasoningContent) error { + if ctx.Err() != nil { + return ctx.Err() + } + if cb.OnReasoningStart != nil { + cb.OnReasoningStart(id) } return nil }, @@ -532,8 +672,8 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, 
messages []fan if ctx.Err() != nil { return ctx.Err() } - if onReasoningDelta != nil { - onReasoningDelta(delta) + if cb.OnReasoningDelta != nil { + cb.OnReasoningDelta(delta) } return nil }, @@ -543,8 +683,8 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan if ctx.Err() != nil { return ctx.Err() } - if onReasoningComplete != nil { - onReasoningComplete() + if cb.OnReasoningComplete != nil { + cb.OnReasoningComplete() } return nil }, @@ -554,8 +694,64 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan if ctx.Err() != nil { return ctx.Err() } - if onStreamingResponse != nil { - onStreamingResponse(text) + if cb.OnStreamingResponse != nil { + cb.OnStreamingResponse(text) + } + return nil + }, + + // Warnings callback + OnWarnings: func(warnings []fantasy.CallWarning) error { + if ctx.Err() != nil { + return ctx.Err() + } + if cb.OnWarnings != nil { + strs := make([]string, len(warnings)) + for i, w := range warnings { + strs[i] = w.Message + } + cb.OnWarnings(strs) + } + return nil + }, + + // Source callback + OnSource: func(source fantasy.SourceContent) error { + if ctx.Err() != nil { + return ctx.Err() + } + if cb.OnSource != nil { + cb.OnSource(string(source.SourceType), source.ID, source.URL, source.Title) + } + return nil + }, + + // Stream finish callback (per-step stream completion) + OnStreamFinish: func(usage fantasy.Usage, finishReason fantasy.FinishReason, _ fantasy.ProviderMetadata) error { + if ctx.Err() != nil { + return ctx.Err() + } + if cb.OnStreamFinish != nil { + cb.OnStreamFinish(usage, string(finishReason)) + } + return nil + }, + + // Error callback + OnError: func(err error) { + if cb.OnError != nil { + cb.OnError(err) + } + }, + + // Step start callback + OnStepStart: func(stepNumber int) error { + if ctx.Err() != nil { + return ctx.Err() + } + stepCounter = stepNumber + if cb.OnStepStart != nil { + cb.OnStepStart(stepNumber) } return nil }, @@ -568,13 +764,13 @@ func (a 
*Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan currentToolArgs = tc.Input // Notify about the tool call - if onToolCall != nil { - onToolCall(tc.ToolCallID, tc.ToolName, tc.Input) + if cb.OnToolCall != nil { + cb.OnToolCall(tc.ToolCallID, tc.ToolName, tc.Input) } // Notify tool execution starting - if onToolExecution != nil { - onToolExecution(tc.ToolCallID, tc.ToolName, tc.Input, true) + if cb.OnToolExecution != nil { + cb.OnToolExecution(tc.ToolCallID, tc.ToolName, tc.Input, true) } return nil @@ -586,14 +782,14 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan return ctx.Err() } // Notify tool execution finished - if onToolExecution != nil { - onToolExecution(tr.ToolCallID, tr.ToolName, currentToolArgs, false) + if cb.OnToolExecution != nil { + cb.OnToolExecution(tr.ToolCallID, tr.ToolName, currentToolArgs, false) } - if onToolResult != nil { + if cb.OnToolResult != nil { // Extract result text and error status resultText, isError := extractToolResultText(tr) - onToolResult(tr.ToolCallID, tr.ToolName, currentToolArgs, resultText, tr.ClientMetadata, isError) + cb.OnToolResult(tr.ToolCallID, tr.ToolName, currentToolArgs, resultText, tr.ClientMetadata, isError) } return nil @@ -607,8 +803,8 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan // Persist step messages incrementally so progress is saved // as it happens rather than only at the end of the turn. 
- if onStepMessages != nil && len(step.Messages) > 0 { - onStepMessages(step.Messages) + if cb.OnStepMessages != nil && len(step.Messages) > 0 { + cb.OnStepMessages(step.Messages) persistedCount += len(step.Messages) } @@ -618,65 +814,88 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan // Check if step has text content alongside tool calls text := step.Content.Text() toolCalls := step.Content.ToolCalls() - if text != "" && len(toolCalls) > 0 && onToolCallContent != nil { - onToolCallContent(text) + if text != "" && len(toolCalls) > 0 && cb.OnToolCallContent != nil { + cb.OnToolCallContent(text) } // Emit step usage for real-time cost tracking - if onStepUsage != nil { - onStepUsage(step.Usage.InputTokens, step.Usage.OutputTokens, + if cb.OnStepUsage != nil { + cb.OnStepUsage(step.Usage.InputTokens, step.Usage.OutputTokens, step.Usage.CacheReadTokens, step.Usage.CacheCreationTokens) } + // Emit unified step finish event + if cb.OnStepFinish != nil { + cb.OnStepFinish(stepCounter, len(toolCalls) > 0, string(step.FinishReason), step.Usage) + } return nil }, } - // If a steer channel is attached to the context, wire up a - // PrepareStep function that drains the channel between steps - // and injects pending steer messages as user messages before - // the next LLM call. This enables graceful mid-turn steering - // without cancelling in-progress tool execution. - if steerCh := steerChFromContext(ctx); steerCh != nil { - onConsumed := steerConsumedFromContext(ctx) + // Always wire up PrepareStep to handle both steering and the + // OnPrepareStep hook. Steering drains its channel first, then + // OnPrepareStep hooks run against the (possibly already steered) + // messages. 
+ steerCh := steerChFromContext(ctx) + onConsumed := steerConsumedFromContext(ctx) + hasSteering := steerCh != nil + hasPrepareStepHook := cb.OnPrepareStep != nil + + if hasSteering || hasPrepareStepHook { streamCall.PrepareStep = func( stepCtx context.Context, opts fantasy.PrepareStepFunctionOptions, ) (context.Context, fantasy.PrepareStepResult, error) { - // Drain all pending steer messages (non-blocking). - var steered []SteerMessage - for { - select { - case msg := <-steerCh: - steered = append(steered, msg) - default: - goto done - } - } - done: result := fantasy.PrepareStepResult{ Model: opts.Model, Messages: opts.Messages, } - if len(steered) > 0 { - // Inject each steer message as a user message so the - // LLM sees the redirection on the next step. - for _, sm := range steered { - result.Messages = append(result.Messages, - fantasy.NewUserMessage(sm.Text, sm.Files...)) + + // Phase 1: Drain steering channel (if present). + if hasSteering { + var steered []SteerMessage + for { + select { + case msg := <-steerCh: + steered = append(steered, msg) + default: + goto done + } } - // Notify that steer messages were consumed. - if onConsumed != nil { - onConsumed(len(steered)) + done: + if len(steered) > 0 { + for _, sm := range steered { + result.Messages = append(result.Messages, + fantasy.NewUserMessage(sm.Text, sm.Files...)) + } + if onConsumed != nil { + onConsumed(len(steered)) + } + } + } + + // Phase 2: Run OnPrepareStep hook (if registered). + if hasPrepareStepHook { + if replacement := cb.OnPrepareStep(opts.StepNumber, result.Messages); replacement != nil { + result.Messages = replacement } } // Apply message-level cache control for Anthropic models. - // This avoids type conflicts with provider-level options. result.Messages = applyCacheControlToMessages(result.Messages) return stepCtx, result, nil } } + // Wire OnRetry callback if provided. 
+ if cb.OnRetry != nil { + // Fantasy passes no attempt number; count retries for this call (1-based). + var retryAttempt int + streamCall.OnRetry = func(err *fantasy.ProviderError, _ time.Duration) { + retryAttempt++ + cb.OnRetry(retryAttempt, err) + } + } + result, err := a.fantasyAgent.Stream(ctx, streamCall) if err != nil { // On cancellation (or any error), return a partial result @@ -702,8 +921,8 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan // empty (e.g. reasoning-only responses) so the UI properly resets // the stream component and avoids duplicate content on the next // flush. - if onResponse != nil { - onResponse(result.Response.Content.Text()) + if cb.OnResponse != nil { + cb.OnResponse(result.Response.Content.Text()) } r := convertAgentResult(result, messages) @@ -723,8 +942,8 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan // For non-streaming, fire the response callback so callers can reset // streaming state (see streaming path comment above). 
- if onResponse != nil { - onResponse(result.Response.Content.Text()) + if cb.OnResponse != nil { + cb.OnResponse(result.Response.Content.Text()) } return convertAgentResult(result, messages), nil diff --git a/internal/extensions/api.go b/internal/extensions/api.go index 95583db1..16d3e377 100644 --- a/internal/extensions/api.go +++ b/internal/extensions/api.go @@ -1094,6 +1094,14 @@ type API struct { onSubagentStart func(func(SubagentStartEvent, Context)) onSubagentChunk func(func(SubagentChunkEvent, Context)) onSubagentEnd func(func(SubagentEndEvent, Context)) + onStepStart func(func(StepStartEvent, Context)) + onStepFinish func(func(StepFinishEvent, Context)) + onReasoningStart func(func(ReasoningStartEvent, Context)) + onWarnings func(func(WarningsEvent, Context)) + onSource func(func(SourceEvent, Context)) + onError func(func(ErrorEvent, Context)) + onRetry func(func(RetryEvent, Context)) + onPrepareStep func(func(PrepareStepEvent, Context) *PrepareStepResult) } // OnToolCall registers a handler that fires before a tool executes. @@ -1301,6 +1309,56 @@ func (a *API) OnBeforeCompact(handler func(BeforeCompactEvent, Context) *BeforeC a.onBeforeCompact(handler) } +// OnStepStart registers a handler that fires when a new LLM call begins +// within a multi-step agent turn. +func (a *API) OnStepStart(handler func(StepStartEvent, Context)) { + a.onStepStart(handler) +} + +// OnStepFinish registers a handler that fires when a step completes, +// providing step number, finish reason, and decomposed token usage. +func (a *API) OnStepFinish(handler func(StepFinishEvent, Context)) { + a.onStepFinish(handler) +} + +// OnReasoningStart registers a handler that fires when the LLM begins +// reasoning/thinking. +func (a *API) OnReasoningStart(handler func(ReasoningStartEvent, Context)) { + a.onReasoningStart(handler) +} + +// OnWarnings registers a handler that fires when the LLM provider returns +// warnings about the request. 
+func (a *API) OnWarnings(handler func(WarningsEvent, Context)) { + a.onWarnings(handler) +} + +// OnSource registers a handler that fires when the LLM references a source +// (e.g. from web search tools). +func (a *API) OnSource(handler func(SourceEvent, Context)) { + a.onSource(handler) +} + +// OnError registers a handler that fires when an agent-level error occurs +// during streaming. +func (a *API) OnError(handler func(ErrorEvent, Context)) { + a.onError(handler) +} + +// OnRetry registers a handler that fires when the LLM provider request is +// retried after a transient error. +func (a *API) OnRetry(handler func(RetryEvent, Context)) { + a.onRetry(handler) +} + +// OnPrepareStep registers a handler that fires between steps within a +// multi-step agent turn, after steering messages are injected and before +// messages are sent to the LLM. Return a non-nil PrepareStepResult with +// Messages to replace the context window for this step. +func (a *API) OnPrepareStep(handler func(PrepareStepEvent, Context) *PrepareStepResult) { + a.onPrepareStep(handler) +} + // RegisterToolRenderer registers a custom renderer for a specific tool's // display in the TUI. The renderer controls the header (parameter summary) // and/or body (result display) of the tool's output block. If multiple @@ -2253,6 +2311,98 @@ type SubagentEndEvent struct { func (e SubagentEndEvent) Type() EventType { return SubagentEnd } +// --------------------------------------------------------------------------- +// Step lifecycle events (exposed to Yaegi — concrete structs) +// --------------------------------------------------------------------------- + +// StepStartEvent fires when a new LLM call begins within a multi-step agent turn. +type StepStartEvent struct { + StepNumber int +} + +func (e StepStartEvent) Type() EventType { return StepStart } + +// StepFinishEvent fires when a step completes, providing step metadata and +// token usage. 
Usage fields are plain int64 (not LLMUsage) because Yaegi +// cannot handle fantasy types across the interpreter boundary. +type StepFinishEvent struct { + StepNumber int + HasToolCalls bool + FinishReason string + InputTokens int64 + OutputTokens int64 + CacheReadTokens int64 + CacheWriteTokens int64 +} + +func (e StepFinishEvent) Type() EventType { return StepFinish } + +// ReasoningStartEvent fires when the LLM begins reasoning/thinking. +type ReasoningStartEvent struct { + ID string +} + +func (e ReasoningStartEvent) Type() EventType { return ReasoningStart } + +// WarningsEvent fires when the LLM provider returns warnings about the request. +type WarningsEvent struct { + Warnings []string +} + +func (e WarningsEvent) Type() EventType { return Warnings } + +// SourceEvent fires when the LLM references a source (e.g. from web search). +type SourceEvent struct { + SourceType string + ID string + URL string + Title string +} + +func (e SourceEvent) Type() EventType { return Source } + +// ErrorEvent fires when an agent-level error occurs during streaming. +// Uses string instead of error because Yaegi cannot handle the error +// interface reliably across the interpreter boundary. +type ErrorEvent struct { + Error string +} + +func (e ErrorEvent) Type() EventType { return Error } + +// RetryEvent fires when the LLM provider request is retried after a +// transient error. +type RetryEvent struct { + Attempt int + Error string +} + +func (e RetryEvent) Type() EventType { return Retry } + +// PrepareStepEvent fires between steps within a multi-step agent turn, +// after steering messages are injected and before messages are sent to +// the LLM. Handlers can inspect and replace the context window. +type PrepareStepEvent struct { + // StepNumber is the zero-based step index within the current turn. + StepNumber int + // Messages is the current context window that will be sent to the LLM. 
+ Messages []ContextMessage +} + +func (e PrepareStepEvent) Type() EventType { return PrepareStep } + +// PrepareStepResult allows extensions to replace the context window between +// steps. Return nil Messages to leave the context unchanged. +type PrepareStepResult struct { + // Messages replaces the entire context window for this step. If nil, + // the original messages are used unchanged. Messages with a non-negative + // Index reuse the original message at that position; messages with + // Index < 0 are created fresh from Role + Content. + Messages []ContextMessage +} + +func (PrepareStepResult) isResult() {} + // ThemeColor is an adaptive color pair with light and dark hex values. // Either field may be empty to inherit from the default theme. type ThemeColor struct { diff --git a/internal/extensions/events.go b/internal/extensions/events.go index dde92158..c88a26a7 100644 --- a/internal/extensions/events.go +++ b/internal/extensions/events.go @@ -96,6 +96,35 @@ const ( // SubagentEnd fires when a subagent tool call completes (success // or error). Carries the final response and any error message. SubagentEnd EventType = "subagent_end" + + // StepStart fires when a new LLM call begins within a multi-step + // agent turn. + StepStart EventType = "step_start" + + // StepFinish fires when a step completes, providing step number, + // finish reason, and token usage. + StepFinish EventType = "step_finish" + + // ReasoningStart fires when the LLM begins reasoning/thinking. + ReasoningStart EventType = "reasoning_start" + + // Warnings fires when the LLM provider returns warnings. + Warnings EventType = "warnings" + + // Source fires when the LLM references a source (e.g. web search). + Source EventType = "source" + + // Error fires when an agent-level error occurs during streaming. + Error EventType = "error" + + // Retry fires when the LLM provider request is retried after a + // transient error. 
+ Retry EventType = "retry" + + // PrepareStep fires between steps within a multi-step agent turn, + // after steering messages are injected and before messages are sent + // to the LLM. Handlers can replace the context window for this step. + PrepareStep EventType = "prepare_step" ) // AllEventTypes returns every supported event type. @@ -109,6 +138,8 @@ func AllEventTypes() []EventType { ModelChange, ContextPrepare, BeforeFork, BeforeSessionSwitch, BeforeCompact, SubagentStart, SubagentChunk, SubagentEnd, + StepStart, StepFinish, ReasoningStart, Warnings, Source, Error, Retry, + PrepareStep, } } diff --git a/internal/extensions/events_test.go b/internal/extensions/events_test.go index 0432bf4f..8ef9e791 100644 --- a/internal/extensions/events_test.go +++ b/internal/extensions/events_test.go @@ -4,8 +4,8 @@ import "testing" func TestAllEventTypes_Count(t *testing.T) { all := AllEventTypes() - if len(all) != 24 { - t.Fatalf("expected 24 event types, got %d", len(all)) + if len(all) != 32 { + t.Fatalf("expected 32 event types, got %d", len(all)) } } diff --git a/internal/extensions/loader.go b/internal/extensions/loader.go index 91bae920..ab68e86f 100644 --- a/internal/extensions/loader.go +++ b/internal/extensions/loader.go @@ -618,6 +618,57 @@ func loadSingleExtension(path string) (*LoadedExtension, error) { return nil }) }, + onStepStart: func(h func(StepStartEvent, Context)) { + reg(StepStart, func(e Event, c Context) Result { + h(e.(StepStartEvent), c) + return nil + }) + }, + onStepFinish: func(h func(StepFinishEvent, Context)) { + reg(StepFinish, func(e Event, c Context) Result { + h(e.(StepFinishEvent), c) + return nil + }) + }, + onReasoningStart: func(h func(ReasoningStartEvent, Context)) { + reg(ReasoningStart, func(e Event, c Context) Result { + h(e.(ReasoningStartEvent), c) + return nil + }) + }, + onWarnings: func(h func(WarningsEvent, Context)) { + reg(Warnings, func(e Event, c Context) Result { + h(e.(WarningsEvent), c) + return nil + }) + }, + 
onSource: func(h func(SourceEvent, Context)) { + reg(Source, func(e Event, c Context) Result { + h(e.(SourceEvent), c) + return nil + }) + }, + onError: func(h func(ErrorEvent, Context)) { + reg(Error, func(e Event, c Context) Result { + h(e.(ErrorEvent), c) + return nil + }) + }, + onRetry: func(h func(RetryEvent, Context)) { + reg(Retry, func(e Event, c Context) Result { + h(e.(RetryEvent), c) + return nil + }) + }, + onPrepareStep: func(h func(PrepareStepEvent, Context) *PrepareStepResult) { + reg(PrepareStep, func(e Event, c Context) Result { + r := h(e.(PrepareStepEvent), c) + if r == nil { + return nil + } + return *r + }) + }, } // Call Init — the extension registers its handlers, tools, commands. diff --git a/internal/extensions/symbols.go b/internal/extensions/symbols.go index 51fdf855..0092f61e 100644 --- a/internal/extensions/symbols.go +++ b/internal/extensions/symbols.go @@ -172,6 +172,17 @@ func Symbols() interp.Exports { "SessionStartEvent": reflect.ValueOf((*SessionStartEvent)(nil)), "SessionShutdownEvent": reflect.ValueOf((*SessionShutdownEvent)(nil)), "ModelChangeEvent": reflect.ValueOf((*ModelChangeEvent)(nil)), + + // Step lifecycle events + "StepStartEvent": reflect.ValueOf((*StepStartEvent)(nil)), + "StepFinishEvent": reflect.ValueOf((*StepFinishEvent)(nil)), + "ReasoningStartEvent": reflect.ValueOf((*ReasoningStartEvent)(nil)), + "WarningsEvent": reflect.ValueOf((*WarningsEvent)(nil)), + "SourceEvent": reflect.ValueOf((*SourceEvent)(nil)), + "ErrorEvent": reflect.ValueOf((*ErrorEvent)(nil)), + "RetryEvent": reflect.ValueOf((*RetryEvent)(nil)), + "PrepareStepEvent": reflect.ValueOf((*PrepareStepEvent)(nil)), + "PrepareStepResult": reflect.ValueOf((*PrepareStepResult)(nil)), }, } } diff --git a/pkg/kit/README.md b/pkg/kit/README.md index f7f451df..24e2d978 100644 --- a/pkg/kit/README.md +++ b/pkg/kit/README.md @@ -106,7 +106,7 @@ unsub2 := host.OnToolResult(func(e kit.ToolResultEvent) { }) defer unsub2() -unsub3 := host.OnStreaming(func(e 
kit.MessageUpdateEvent) { +unsub3 := host.OnMessageUpdate(func(e kit.MessageUpdateEvent) { fmt.Print(e.Chunk) }) defer unsub3() diff --git a/pkg/kit/events.go b/pkg/kit/events.go index dd955163..2129f322 100644 --- a/pkg/kit/events.go +++ b/pkg/kit/events.go @@ -58,6 +58,31 @@ const ( // EventSteerConsumed fires when one or more steering messages have been // injected into the agent turn via PrepareStep. EventSteerConsumed EventType = "steer_consumed" + // EventStepStart fires when a new LLM call begins within a turn. + EventStepStart EventType = "step_start" + // EventStepFinish fires when a step completes, providing full step context + // including whether tool calls were made, the finish reason, and usage stats. + EventStepFinish EventType = "step_finish" + // EventTextStart fires when the LLM begins generating text content. + EventTextStart EventType = "text_start" + // EventTextEnd fires when the LLM finishes generating text content. + EventTextEnd EventType = "text_end" + // EventReasoningStart fires when the LLM begins reasoning/thinking. + EventReasoningStart EventType = "reasoning_start" + // EventWarnings fires when the LLM provider returns warnings. + EventWarnings EventType = "warnings" + // EventSource fires when the LLM references a source (e.g. from web search). + EventSource EventType = "source" + // EventStreamFinish fires when a per-step LLM stream completes with + // usage stats and a finish reason. + EventStreamFinish EventType = "stream_finish" + // EventError fires when an agent-level error occurs during streaming. + // This is distinct from TurnEndEvent.Error — it fires at the point of + // failure, before the turn ends. + EventError EventType = "error" + // EventRetry fires when the LLM provider request is retried after a + // transient error. + EventRetry EventType = "retry" ) // --------------------------------------------------------------------------- @@ -379,6 +404,100 @@ type SteerConsumedEvent struct { // EventType implements Event. 
func (e SteerConsumedEvent) EventType() EventType { return EventSteerConsumed } +// StepStartEvent fires when a new LLM call begins within a multi-step agent turn. +type StepStartEvent struct { + StepNumber int +} + +// EventType implements Event. +func (e StepStartEvent) EventType() EventType { return EventStepStart } + +// StepFinishEvent fires when a step completes, providing full step context. +// This is a unified event that carries the same data as the existing +// ToolCallContentEvent and StepUsageEvent, plus additional step metadata. +type StepFinishEvent struct { + StepNumber int + HasToolCalls bool + FinishReason string + Usage LLMUsage +} + +// EventType implements Event. +func (e StepFinishEvent) EventType() EventType { return EventStepFinish } + +// TextStartEvent fires when the LLM begins generating text content. +// Paired with MessageUpdateEvent (deltas) and TextEndEvent. +type TextStartEvent struct { + ID string +} + +// EventType implements Event. +func (e TextStartEvent) EventType() EventType { return EventTextStart } + +// TextEndEvent fires when the LLM finishes generating text content. +type TextEndEvent struct { + ID string +} + +// EventType implements Event. +func (e TextEndEvent) EventType() EventType { return EventTextEnd } + +// ReasoningStartEvent fires when the LLM begins reasoning/thinking. +// Paired with ReasoningDeltaEvent (deltas) and ReasoningCompleteEvent. +type ReasoningStartEvent struct { + ID string +} + +// EventType implements Event. +func (e ReasoningStartEvent) EventType() EventType { return EventReasoningStart } + +// WarningsEvent fires when the LLM provider returns warnings about the request. +type WarningsEvent struct { + Warnings []string +} + +// EventType implements Event. +func (e WarningsEvent) EventType() EventType { return EventWarnings } + +// SourceEvent fires when the LLM references a source (e.g. from web search tools). 
+type SourceEvent struct { + SourceType string + ID string + URL string + Title string +} + +// EventType implements Event. +func (e SourceEvent) EventType() EventType { return EventSource } + +// StreamFinishEvent fires when a per-step LLM stream completes. +// Provides per-stream usage stats and finish reason. +type StreamFinishEvent struct { + Usage LLMUsage + FinishReason string +} + +// EventType implements Event. +func (e StreamFinishEvent) EventType() EventType { return EventStreamFinish } + +// ErrorEvent fires when an agent-level error occurs during streaming. +// This is distinct from TurnEndEvent.Error — it fires at the point of failure. +type ErrorEvent struct { + Error error +} + +// EventType implements Event. +func (e ErrorEvent) EventType() EventType { return EventError } + +// RetryEvent fires when the LLM provider request is retried after a transient error. +type RetryEvent struct { + Attempt int + Error error +} + +// EventType implements Event. +func (e RetryEvent) EventType() EventType { return EventRetry } + // PasswordPromptEvent fires when a sudo command needs a password. // The TUI should display a password prompt and send the result back via ResponseCh. type PasswordPromptEvent struct { @@ -517,7 +636,16 @@ func (m *Kit) OnToolOutput(handler func(ToolOutputEvent)) func() { // OnStreaming registers a handler that fires only for MessageUpdateEvent // (streaming text chunks). Returns an unsubscribe function. +// +// Deprecated: Use OnMessageUpdate instead. OnStreaming will be removed in a +// future release. func (m *Kit) OnStreaming(handler func(MessageUpdateEvent)) func() { + return m.OnMessageUpdate(handler) +} + +// OnMessageUpdate registers a handler that fires only for MessageUpdateEvent +// (streaming text chunks). Returns an unsubscribe function. 
+func (m *Kit) OnMessageUpdate(handler func(MessageUpdateEvent)) func() { return m.Subscribe(func(e Event) { if mu, ok := e.(MessageUpdateEvent); ok { handler(mu) @@ -555,6 +683,214 @@ func (m *Kit) OnTurnEnd(handler func(TurnEndEvent)) func() { }) } +// --------------------------------------------------------------------------- +// Typed subscribers for previously unsubscribed event types +// --------------------------------------------------------------------------- + +// OnMessageStart registers a handler that fires only for MessageStartEvent. +// Returns an unsubscribe function. +func (m *Kit) OnMessageStart(handler func(MessageStartEvent)) func() { + return m.Subscribe(func(e Event) { + if ms, ok := e.(MessageStartEvent); ok { + handler(ms) + } + }) +} + +// OnMessageEnd registers a handler that fires only for MessageEndEvent. +// Returns an unsubscribe function. +func (m *Kit) OnMessageEnd(handler func(MessageEndEvent)) func() { + return m.Subscribe(func(e Event) { + if me, ok := e.(MessageEndEvent); ok { + handler(me) + } + }) +} + +// OnReasoningDelta registers a handler that fires only for ReasoningDeltaEvent. +// Returns an unsubscribe function. +func (m *Kit) OnReasoningDelta(handler func(ReasoningDeltaEvent)) func() { + return m.Subscribe(func(e Event) { + if rd, ok := e.(ReasoningDeltaEvent); ok { + handler(rd) + } + }) +} + +// OnReasoningComplete registers a handler that fires only for ReasoningCompleteEvent. +// Returns an unsubscribe function. +func (m *Kit) OnReasoningComplete(handler func(ReasoningCompleteEvent)) func() { + return m.Subscribe(func(e Event) { + if rc, ok := e.(ReasoningCompleteEvent); ok { + handler(rc) + } + }) +} + +// OnToolExecutionStart registers a handler that fires only for ToolExecutionStartEvent. +// Returns an unsubscribe function. 
+func (m *Kit) OnToolExecutionStart(handler func(ToolExecutionStartEvent)) func() { + return m.Subscribe(func(e Event) { + if tes, ok := e.(ToolExecutionStartEvent); ok { + handler(tes) + } + }) +} + +// OnToolExecutionEnd registers a handler that fires only for ToolExecutionEndEvent. +// Returns an unsubscribe function. +func (m *Kit) OnToolExecutionEnd(handler func(ToolExecutionEndEvent)) func() { + return m.Subscribe(func(e Event) { + if tee, ok := e.(ToolExecutionEndEvent); ok { + handler(tee) + } + }) +} + +// OnToolCallContent registers a handler that fires only for ToolCallContentEvent. +// Returns an unsubscribe function. +func (m *Kit) OnToolCallContent(handler func(ToolCallContentEvent)) func() { + return m.Subscribe(func(e Event) { + if tcc, ok := e.(ToolCallContentEvent); ok { + handler(tcc) + } + }) +} + +// OnStepUsage registers a handler that fires only for StepUsageEvent. +// Returns an unsubscribe function. +func (m *Kit) OnStepUsage(handler func(StepUsageEvent)) func() { + return m.Subscribe(func(e Event) { + if su, ok := e.(StepUsageEvent); ok { + handler(su) + } + }) +} + +// OnCompaction registers a handler that fires only for CompactionEvent. +// Returns an unsubscribe function. +func (m *Kit) OnCompaction(handler func(CompactionEvent)) func() { + return m.Subscribe(func(e Event) { + if ce, ok := e.(CompactionEvent); ok { + handler(ce) + } + }) +} + +// OnSteerConsumed registers a handler that fires only for SteerConsumedEvent. +// Returns an unsubscribe function. +func (m *Kit) OnSteerConsumed(handler func(SteerConsumedEvent)) func() { + return m.Subscribe(func(e Event) { + if sc, ok := e.(SteerConsumedEvent); ok { + handler(sc) + } + }) +} + +// --------------------------------------------------------------------------- +// Typed subscribers for new event types +// --------------------------------------------------------------------------- + +// OnStepStart registers a handler that fires only for StepStartEvent. 
+// Returns an unsubscribe function. +func (m *Kit) OnStepStart(handler func(StepStartEvent)) func() { + return m.Subscribe(func(e Event) { + if ss, ok := e.(StepStartEvent); ok { + handler(ss) + } + }) +} + +// OnStepFinish registers a handler that fires only for StepFinishEvent. +// Returns an unsubscribe function. +func (m *Kit) OnStepFinish(handler func(StepFinishEvent)) func() { + return m.Subscribe(func(e Event) { + if sf, ok := e.(StepFinishEvent); ok { + handler(sf) + } + }) +} + +// OnTextStart registers a handler that fires only for TextStartEvent. +// Returns an unsubscribe function. +func (m *Kit) OnTextStart(handler func(TextStartEvent)) func() { + return m.Subscribe(func(e Event) { + if ts, ok := e.(TextStartEvent); ok { + handler(ts) + } + }) +} + +// OnTextEnd registers a handler that fires only for TextEndEvent. +// Returns an unsubscribe function. +func (m *Kit) OnTextEnd(handler func(TextEndEvent)) func() { + return m.Subscribe(func(e Event) { + if te, ok := e.(TextEndEvent); ok { + handler(te) + } + }) +} + +// OnReasoningStart registers a handler that fires only for ReasoningStartEvent. +// Returns an unsubscribe function. +func (m *Kit) OnReasoningStart(handler func(ReasoningStartEvent)) func() { + return m.Subscribe(func(e Event) { + if rs, ok := e.(ReasoningStartEvent); ok { + handler(rs) + } + }) +} + +// OnWarnings registers a handler that fires only for WarningsEvent. +// Returns an unsubscribe function. +func (m *Kit) OnWarnings(handler func(WarningsEvent)) func() { + return m.Subscribe(func(e Event) { + if w, ok := e.(WarningsEvent); ok { + handler(w) + } + }) +} + +// OnSource registers a handler that fires only for SourceEvent. +// Returns an unsubscribe function. +func (m *Kit) OnSource(handler func(SourceEvent)) func() { + return m.Subscribe(func(e Event) { + if s, ok := e.(SourceEvent); ok { + handler(s) + } + }) +} + +// OnStreamFinish registers a handler that fires only for StreamFinishEvent. +// Returns an unsubscribe function. 
+func (m *Kit) OnStreamFinish(handler func(StreamFinishEvent)) func() { + return m.Subscribe(func(e Event) { + if sf, ok := e.(StreamFinishEvent); ok { + handler(sf) + } + }) +} + +// OnError registers a handler that fires only for ErrorEvent. +// Returns an unsubscribe function. +func (m *Kit) OnError(handler func(ErrorEvent)) func() { + return m.Subscribe(func(e Event) { + if ee, ok := e.(ErrorEvent); ok { + handler(ee) + } + }) +} + +// OnRetry registers a handler that fires only for RetryEvent. +// Returns an unsubscribe function. +func (m *Kit) OnRetry(handler func(RetryEvent)) func() { + return m.Subscribe(func(e Event) { + if r, ok := e.(RetryEvent); ok { + handler(r) + } + }) +} + // --------------------------------------------------------------------------- // Subagent event subscriptions // --------------------------------------------------------------------------- diff --git a/pkg/kit/events_test.go b/pkg/kit/events_test.go index d7ac7314..bfea33fd 100644 --- a/pkg/kit/events_test.go +++ b/pkg/kit/events_test.go @@ -1,6 +1,7 @@ package kit import ( + "fmt" "sync" "sync/atomic" "testing" @@ -190,6 +191,74 @@ func TestEventTypes(t *testing.T) { } } +// TestNewEventTypes verifies that each new event struct returns the correct EventType. 
+func TestNewEventTypes(t *testing.T) { + tests := []struct { + event Event + expected EventType + }{ + {StepStartEvent{StepNumber: 0}, EventStepStart}, + {StepFinishEvent{StepNumber: 1, HasToolCalls: true}, EventStepFinish}, + {TextStartEvent{ID: "text-1"}, EventTextStart}, + {TextEndEvent{ID: "text-1"}, EventTextEnd}, + {ReasoningStartEvent{ID: "reason-1"}, EventReasoningStart}, + {WarningsEvent{Warnings: []string{"test"}}, EventWarnings}, + {SourceEvent{URL: "https://example.com", Title: "Example"}, EventSource}, + {StreamFinishEvent{FinishReason: "stop"}, EventStreamFinish}, + {ErrorEvent{Error: fmt.Errorf("test error")}, EventError}, + {RetryEvent{Attempt: 1, Error: fmt.Errorf("retry error")}, EventRetry}, + {ToolCallStartEvent{}, EventToolCallStart}, + {ToolCallDeltaEvent{}, EventToolCallDelta}, + {ToolCallEndEvent{}, EventToolCallEnd}, + {PasswordPromptEvent{}, EventPasswordPrompt}, + } + + for _, tt := range tests { + if got := tt.event.EventType(); got != tt.expected { + t.Errorf("%T.EventType() = %q, want %q", tt.event, got, tt.expected) + } + } +} + +// TestNewEventEmission verifies that new event types are properly emitted and received. 
+func TestNewEventEmission(t *testing.T) { + bus := newEventBus() + var received []Event + + bus.subscribe(func(e Event) { + received = append(received, e) + }) + + bus.emit(StepStartEvent{StepNumber: 0}) + bus.emit(TextStartEvent{ID: "text-1"}) + bus.emit(TextEndEvent{ID: "text-1"}) + bus.emit(ReasoningStartEvent{ID: "reason-1"}) + bus.emit(WarningsEvent{Warnings: []string{"low confidence"}}) + bus.emit(SourceEvent{URL: "https://example.com", Title: "Example"}) + bus.emit(StreamFinishEvent{FinishReason: "stop"}) + bus.emit(StepFinishEvent{StepNumber: 0, HasToolCalls: false, FinishReason: "stop"}) + bus.emit(ErrorEvent{Error: fmt.Errorf("test error")}) + bus.emit(RetryEvent{Attempt: 1, Error: fmt.Errorf("retry")}) + + if len(received) != 10 { + t.Fatalf("expected 10 events, got %d", len(received)) + } + + // Verify specific event fields + if ss, ok := received[0].(StepStartEvent); !ok || ss.StepNumber != 0 { + t.Errorf("event 0: expected StepStartEvent{StepNumber:0}, got %T %+v", received[0], received[0]) + } + if ts, ok := received[1].(TextStartEvent); !ok || ts.ID != "text-1" { + t.Errorf("event 1: expected TextStartEvent{ID:text-1}, got %T %+v", received[1], received[1]) + } + if w, ok := received[4].(WarningsEvent); !ok || len(w.Warnings) != 1 || w.Warnings[0] != "low confidence" { + t.Errorf("event 4: expected WarningsEvent with 1 warning, got %T %+v", received[4], received[4]) + } + if sf, ok := received[7].(StepFinishEvent); !ok || sf.StepNumber != 0 || sf.HasToolCalls { + t.Errorf("event 7: expected StepFinishEvent{StepNumber:0, HasToolCalls:false}, got %T %+v", received[7], received[7]) + } +} + // TestEventBusListenerCanUnsubscribeInCallback verifies that a listener can // safely call its own unsubscribe function from within the callback. 
func TestEventBusListenerCanUnsubscribeInCallback(t *testing.T) { diff --git a/pkg/kit/extensions_bridge.go b/pkg/kit/extensions_bridge.go index 7825d888..af589786 100644 --- a/pkg/kit/extensions_bridge.go +++ b/pkg/kit/extensions_bridge.go @@ -356,4 +356,134 @@ func (m *Kit) bridgeExtensions(runner *extensions.Runner) { return nil }) } + + // --- Step lifecycle observation events --- + + if runner.HasHandlers(extensions.StepStart) { + m.Subscribe(func(e Event) { + if ev, ok := e.(StepStartEvent); ok { + _, _ = runner.Emit(extensions.StepStartEvent{StepNumber: ev.StepNumber}) + } + }) + } + + if runner.HasHandlers(extensions.StepFinish) { + m.Subscribe(func(e Event) { + if ev, ok := e.(StepFinishEvent); ok { + _, _ = runner.Emit(extensions.StepFinishEvent{ + StepNumber: ev.StepNumber, + HasToolCalls: ev.HasToolCalls, + FinishReason: ev.FinishReason, + InputTokens: ev.Usage.InputTokens, + OutputTokens: ev.Usage.OutputTokens, + CacheReadTokens: ev.Usage.CacheReadTokens, + CacheWriteTokens: ev.Usage.CacheCreationTokens, + }) + } + }) + } + + if runner.HasHandlers(extensions.ReasoningStart) { + m.Subscribe(func(e Event) { + if ev, ok := e.(ReasoningStartEvent); ok { + _, _ = runner.Emit(extensions.ReasoningStartEvent{ID: ev.ID}) + } + }) + } + + if runner.HasHandlers(extensions.Warnings) { + m.Subscribe(func(e Event) { + if ev, ok := e.(WarningsEvent); ok { + _, _ = runner.Emit(extensions.WarningsEvent{Warnings: ev.Warnings}) + } + }) + } + + if runner.HasHandlers(extensions.Source) { + m.Subscribe(func(e Event) { + if ev, ok := e.(SourceEvent); ok { + _, _ = runner.Emit(extensions.SourceEvent{ + SourceType: ev.SourceType, + ID: ev.ID, + URL: ev.URL, + Title: ev.Title, + }) + } + }) + } + + if runner.HasHandlers(extensions.Error) { + m.Subscribe(func(e Event) { + if ev, ok := e.(ErrorEvent); ok { + _, _ = runner.Emit(extensions.ErrorEvent{Error: ev.Error.Error()}) + } + }) + } + + if runner.HasHandlers(extensions.Retry) { + m.Subscribe(func(e Event) { + if ev, ok := 
e.(RetryEvent); ok { + _, _ = runner.Emit(extensions.RetryEvent{ + Attempt: ev.Attempt, + Error: ev.Error.Error(), + }) + } + }) + } + + // --- PrepareStep hook --- + // Extension PrepareStep → SDK PrepareStep hook. + // Same pattern as ContextPrepare: convert LLMMessage ↔ ContextMessage. + if runner.HasHandlers(extensions.PrepareStep) { + m.OnPrepareStep(HookPriorityNormal, func(h PrepareStepHook) *PrepareStepResult { + // Convert LLM message slice to extension ContextMessage slice. + extMsgs := make([]extensions.ContextMessage, len(h.Messages)) + for i, msg := range h.Messages { + var sb strings.Builder + for _, part := range msg.Content { + if tp, ok := part.(LLMTextPart); ok { + sb.WriteString(tp.Text) + } + } + extMsgs[i] = extensions.ContextMessage{ + Index: i, + Role: string(msg.Role), + Content: sb.String(), + } + } + + result, _ := runner.Emit(extensions.PrepareStepEvent{ + StepNumber: h.StepNumber, + Messages: extMsgs, + }) + r, ok := result.(extensions.PrepareStepResult) + if !ok || r.Messages == nil { + return nil + } + + // Rebuild LLM message slice from extension result. + rebuilt := make([]LLMMessage, 0, len(r.Messages)) + for _, cm := range r.Messages { + if cm.Index >= 0 && cm.Index < len(h.Messages) { + rebuilt = append(rebuilt, h.Messages[cm.Index]) + } else { + role := LLMRoleUser + switch cm.Role { + case "assistant": + role = LLMRoleAssistant + case "system": + role = LLMRoleSystem + case "tool": + role = LLMRoleTool + } + rebuilt = append(rebuilt, LLMMessage{ + Role: role, + Content: []LLMMessagePart{LLMTextPart{Text: cm.Content}}, + }) + } + } + + return &PrepareStepResult{Messages: rebuilt} + }) + } } diff --git a/pkg/kit/hooks.go b/pkg/kit/hooks.go index e122f269..a22af13b 100644 --- a/pkg/kit/hooks.go +++ b/pkg/kit/hooks.go @@ -121,6 +121,32 @@ type BeforeCompactResult struct { Summary string } +// PrepareStepHook is the input for hooks that fire between steps within a +// multi-step agent turn, with full message replacement capability. 
This is +// the most powerful interception point — it fires after the existing steering +// logic (if any) and before the messages are sent to the LLM. +// +// Use cases: +// - Transforming tool results (e.g. converting image tool results to FilePart +// user messages for vision models that don't support media in tool results) +// - Dynamic tool filtering per step +// - Mid-turn context injection beyond simple steering +// - Custom stop conditions that inspect message history +type PrepareStepHook struct { + // StepNumber is the zero-based step index within the current turn. + StepNumber int + // Messages is the current context window that will be sent to the LLM. + // This includes any steering messages already injected in this step. + Messages []LLMMessage +} + +// PrepareStepResult can replace the context window between steps. +type PrepareStepResult struct { + // Messages replaces the entire context window for this step. If nil, + // the original messages (including any steering) are used unchanged. + Messages []LLMMessage +} + // --------------------------------------------------------------------------- // Generic hook registry with priority ordering // --------------------------------------------------------------------------- @@ -248,6 +274,19 @@ func (m *Kit) OnBeforeCompact(p HookPriority, h func(BeforeCompactHook) *BeforeC return m.beforeCompact.register(p, h) } +// OnPrepareStep registers a hook that fires between steps within a multi-step +// agent turn, after steering messages are injected and before the messages are +// sent to the LLM. Return a non-nil PrepareStepResult with Messages to replace +// the entire context window for this step. Hooks execute in priority order; +// the first non-nil result wins. Returns an unregister function. +// +// This is the most powerful interception point in the agent lifecycle. It +// enables patterns like transforming tool results, dynamic tool filtering, +// and mid-turn context injection. 
+func (m *Kit) OnPrepareStep(p HookPriority, h func(PrepareStepHook) *PrepareStepResult) func() { + return m.prepareStep.register(p, h) +} + // --------------------------------------------------------------------------- // Tool wrapping via hooks // --------------------------------------------------------------------------- diff --git a/pkg/kit/hooks_test.go b/pkg/kit/hooks_test.go index 4c318ce9..26250e63 100644 --- a/pkg/kit/hooks_test.go +++ b/pkg/kit/hooks_test.go @@ -538,3 +538,75 @@ func TestKit_HookMethodsExist(t *testing.T) { u3() u4() } + +// TestPrepareStepHookRegistry verifies registration and execution of PrepareStep hooks. +func TestPrepareStepHookRegistry(t *testing.T) { + hr := newHookRegistry[PrepareStepHook, PrepareStepResult]() + + // Register a hook that prepends a system message on step 0 only. + hr.register(HookPriorityNormal, func(h PrepareStepHook) *PrepareStepResult { + if h.StepNumber == 0 { + // On step 0, prepend a system message. + newMsgs := make([]LLMMessage, 0, len(h.Messages)+1) + newMsgs = append(newMsgs, fantasy.NewSystemMessage("injected")) + newMsgs = append(newMsgs, h.Messages...) + return &PrepareStepResult{Messages: newMsgs} + } + return nil // No modification for other steps. + }) + + // Test step 0 — should modify messages. + input := PrepareStepHook{ + StepNumber: 0, + Messages: []LLMMessage{fantasy.NewUserMessage("hello")}, + } + result := hr.run(input) + if result == nil { + t.Fatal("expected non-nil result for step 0") + } + if len(result.Messages) != 2 { + t.Fatalf("expected 2 messages, got %d", len(result.Messages)) + } + if result.Messages[0].Role != fantasy.MessageRoleSystem { + t.Errorf("expected system message first, got role %q", result.Messages[0].Role) + } + + // Test step 1 — should return nil (no modification).
+ input.StepNumber = 1 + result = hr.run(input) + if result != nil { + t.Errorf("expected nil result for step 1, got %+v", result) + } +} + +// TestPrepareStepHookPriority verifies that PrepareStep hooks respect priority ordering. +func TestPrepareStepHookPriority(t *testing.T) { + hr := newHookRegistry[PrepareStepHook, PrepareStepResult]() + + var order []string + + // Low priority — ordered after the high-priority hook; it must not run here because that hook returns a non-nil result. + hr.register(HookPriorityLow, func(_ PrepareStepHook) *PrepareStepResult { + order = append(order, "low") + return nil + }) + + // High priority — should run first and win. + hr.register(HookPriorityHigh, func(h PrepareStepHook) *PrepareStepResult { + order = append(order, "high") + return &PrepareStepResult{Messages: h.Messages} + }) + + input := PrepareStepHook{ + StepNumber: 0, + Messages: []LLMMessage{fantasy.NewUserMessage("test")}, + } + result := hr.run(input) + + if result == nil { + t.Fatal("expected non-nil result") + } + if len(order) != 1 || order[0] != "high" { + t.Errorf("expected [high] (first non-nil wins), got %v", order) + } +} diff --git a/pkg/kit/kit.go b/pkg/kit/kit.go index ca72f768..2ced985d 100644 --- a/pkg/kit/kit.go +++ b/pkg/kit/kit.go @@ -66,6 +66,7 @@ type Kit struct { afterTurn *hookRegistry[AfterTurnHook, AfterTurnResult] contextPrepare *hookRegistry[ContextPrepareHook, ContextPrepareResult] beforeCompact *hookRegistry[BeforeCompactHook, BeforeCompactResult] + prepareStep *hookRegistry[PrepareStepHook, PrepareStepResult] // lastInputTokens stores the API-reported input token count from the // most recent turn.
Used by GetContextStats() to return accurate usage @@ -1368,6 +1369,7 @@ func New(ctx context.Context, opts *Options) (*Kit, error) { afterTurn := newHookRegistry[AfterTurnHook, AfterTurnResult]() contextPrepare := newHookRegistry[ContextPrepareHook, ContextPrepareResult]() beforeCompact := newHookRegistry[BeforeCompactHook, BeforeCompactResult]() + prepareStep := newHookRegistry[PrepareStepHook, PrepareStepResult]() // Build agent setup options, pulling CLI-specific fields when available. // Pass the pre-built ProviderConfig and scalar viper snapshots so @@ -1461,6 +1463,7 @@ func New(ctx context.Context, opts *Options) (*Kit, error) { afterTurn: afterTurn, contextPrepare: contextPrepare, beforeCompact: beforeCompact, + prepareStep: prepareStep, } // Bridge extension events to SDK hooks. @@ -1901,21 +1904,21 @@ func (m *Kit) generate(ctx context.Context, messages []fantasy.Message) (*agent. return sr, err }) - return m.agent.GenerateWithLoopAndStreaming(ctx, messages, - func(toolCallID, toolName, toolArgs string) { + return m.agent.GenerateWithCallbacks(ctx, messages, agent.GenerateCallbacks{ + OnToolCall: func(toolCallID, toolName, toolArgs string) { m.events.emit(ToolCallEvent{ ToolCallID: toolCallID, ToolName: toolName, ToolKind: toolKindFor(toolName), ToolArgs: toolArgs, ParsedArgs: parseToolArgs(toolArgs), }) }, - func(toolCallID, toolName, toolArgs string, isStarting bool) { + OnToolExecution: func(toolCallID, toolName, toolArgs string, isStarting bool) { if isStarting { m.events.emit(ToolExecutionStartEvent{ToolCallID: toolCallID, ToolName: toolName, ToolKind: toolKindFor(toolName), ToolArgs: toolArgs}) } else { m.events.emit(ToolExecutionEndEvent{ToolCallID: toolCallID, ToolName: toolName, ToolKind: toolKindFor(toolName)}) } }, - func(toolCallID, toolName, toolArgs, resultText, metadata string, isError bool) { + OnToolResult: func(toolCallID, toolName, toolArgs, resultText, metadata string, isError bool) { evt := ToolResultEvent{ ToolCallID: toolCallID, 
ToolName: toolName, ToolKind: toolKindFor(toolName), ToolArgs: toolArgs, ParsedArgs: parseToolArgs(toolArgs), @@ -1929,17 +1932,17 @@ func (m *Kit) generate(ctx context.Context, messages []fantasy.Message) (*agent. } m.events.emit(evt) }, - func(content string) { + OnResponse: func(content string) { m.events.emit(ResponseEvent{Content: content}) }, - func(content string) { + OnToolCallContent: func(content string) { m.events.emit(ToolCallContentEvent{Content: content}) }, // tag filtering: models like Qwen/DeepSeek wrap reasoning inside // ... tags in the regular text stream. We intercept those // spans here and re-route them as ReasoningDeltaEvent/ReasoningCompleteEvent // so callers always receive clean, tag-free text and structured reasoning. - func() func(chunk string) { + OnStreamingResponse: func() func(chunk string) { const ( thinkOpen = "" thinkClose = "" @@ -1975,14 +1978,13 @@ func (m *Kit) generate(ctx context.Context, messages []fantasy.Message) (*agent. } } }(), - func(delta string) { + OnReasoningDelta: func(delta string) { m.events.emit(ReasoningDeltaEvent{Delta: delta}) }, - func() { + OnReasoningComplete: func() { m.events.emit(ReasoningCompleteEvent{}) }, - func(toolCallID, toolName, chunk string, isStderr bool) { - // Emit tool output chunk event for streaming bash output + OnToolOutput: func(toolCallID, toolName, chunk string, isStderr bool) { m.events.emit(ToolOutputEvent{ ToolCallID: toolCallID, ToolName: toolName, @@ -1991,18 +1993,13 @@ func (m *Kit) generate(ctx context.Context, messages []fantasy.Message) (*agent. }) }, // Persist step messages incrementally so that progress survives - // crashes and long-running turns don't lose work. Each step's - // messages are persisted as a unit: for tool-calling steps this is - // the assistant message (with tool_use parts) + tool-role message - // (with tool_result parts) as a pair; for the final step it's the - // assistant text/reasoning message alone. 
- func(stepMessages []fantasy.Message) { + // crashes and long-running turns don't lose work. + OnStepMessages: func(stepMessages []fantasy.Message) { for _, msg := range stepMessages { _, _ = m.session.AppendMessage(msg) } }, - func(inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens int64) { - // Emit step usage event for real-time cost tracking + OnStepUsage: func(inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens int64) { if viper.GetBool("debug") { log.Printf("DEBUG Kit.generate emitting StepUsageEvent: input=%d output=%d cacheRead=%d cacheCreate=%d", inputTokens, outputTokens, cacheReadTokens, cacheCreationTokens, @@ -2016,37 +2013,97 @@ func (m *Kit) generate(ctx context.Context, messages []fantasy.Message) (*agent. }) }, // Password prompt handler for sudo commands - func(prompt string) (string, bool) { - // Emit event to TUI and wait for response via channel + OnPasswordPrompt: func(prompt string) (string, bool) { responseCh := make(chan PasswordPromptResponse, 1) m.events.emit(PasswordPromptEvent{ Prompt: prompt, ResponseCh: responseCh, }) - // Wait for response (TUI will send password or cancel) resp := <-responseCh return resp.Password, resp.Cancelled }, - // Tool call argument streaming — fire as the LLM generates tool arguments - func(toolCallID, toolName string) { + // Tool call argument streaming + OnToolCallStart: func(toolCallID, toolName string) { m.events.emit(ToolCallStartEvent{ ToolCallID: toolCallID, ToolName: toolName, ToolKind: toolKindFor(toolName), }) }, - func(toolCallID, delta string) { + OnToolCallDelta: func(toolCallID, delta string) { m.events.emit(ToolCallDeltaEvent{ ToolCallID: toolCallID, Delta: delta, }) }, - func(toolCallID string) { + OnToolCallEnd: func(toolCallID string) { m.events.emit(ToolCallEndEvent{ ToolCallID: toolCallID, }) }, - ) + + // New callbacks for previously unwired Fantasy lifecycle events. 
+ OnStepStart: func(stepNumber int) { + m.events.emit(StepStartEvent{StepNumber: stepNumber}) + }, + OnStepFinish: func(stepNumber int, hasToolCalls bool, finishReason string, usage fantasy.Usage) { + m.events.emit(StepFinishEvent{ + StepNumber: stepNumber, + HasToolCalls: hasToolCalls, + FinishReason: finishReason, + Usage: usage, + }) + }, + OnTextStart: func(id string) { + m.events.emit(TextStartEvent{ID: id}) + }, + OnTextEnd: func(id string) { + m.events.emit(TextEndEvent{ID: id}) + }, + OnReasoningStart: func(id string) { + m.events.emit(ReasoningStartEvent{ID: id}) + }, + OnWarnings: func(warnings []string) { + m.events.emit(WarningsEvent{Warnings: warnings}) + }, + OnSource: func(sourceType, id, url, title string) { + m.events.emit(SourceEvent{ + SourceType: sourceType, + ID: id, + URL: url, + Title: title, + }) + }, + OnStreamFinish: func(usage fantasy.Usage, finishReason string) { + m.events.emit(StreamFinishEvent{ + Usage: usage, + FinishReason: finishReason, + }) + }, + OnError: func(err error) { + m.events.emit(ErrorEvent{Error: err}) + }, + OnRetry: func(attempt int, err error) { + m.events.emit(RetryEvent{Attempt: attempt, Error: err}) + }, + // PrepareStep hook — compose with steering (handled in agent layer) + // and then run SDK consumer hooks. 
+ OnPrepareStep: func() agent.PrepareStepHandler { + if !m.prepareStep.hasHooks() { + return nil + } + return func(stepNumber int, messages []fantasy.Message) []fantasy.Message { + hookResult := m.prepareStep.run(PrepareStepHook{ + StepNumber: stepNumber, + Messages: messages, + }) + if hookResult != nil && hookResult.Messages != nil { + return hookResult.Messages + } + return nil + } + }(), + }) } // runTurn is the shared lifecycle for every prompt mode: diff --git a/skills/kit-sdk/SKILL.md b/skills/kit-sdk/SKILL.md index 47297d91..a136b3d6 100644 --- a/skills/kit-sdk/SKILL.md +++ b/skills/kit-sdk/SKILL.md @@ -281,7 +281,7 @@ host.OnToolOutput(func(e kit.ToolOutputEvent) { // Streaming bash output chunks }) -host.OnStreaming(func(e kit.MessageUpdateEvent) { +host.OnMessageUpdate(func(e kit.MessageUpdateEvent) { fmt.Print(e.Chunk) // real-time text streaming }) @@ -296,8 +296,64 @@ host.OnTurnStart(func(e kit.TurnStartEvent) { host.OnTurnEnd(func(e kit.TurnEndEvent) { // e.Response, e.Error, e.StopReason }) + +host.OnStepStart(func(e kit.StepStartEvent) { + // e.StepNumber — which LLM call step (1-based) +}) + +host.OnStepFinish(func(e kit.StepFinishEvent) { + // e.StepNumber, e.HasToolCalls, e.FinishReason, e.Usage (LLMUsage) +}) + +host.OnWarnings(func(e kit.WarningsEvent) { + for _, w := range e.Warnings { + log.Printf("warning: %s", w) + } +}) + +host.OnError(func(e kit.ErrorEvent) { + log.Printf("agent error: %v", e.Error) +}) + +host.OnRetry(func(e kit.RetryEvent) { + log.Printf("retrying (attempt %d): %v", e.Attempt, e.Error) +}) + +host.OnTextStart(func(e kit.TextStartEvent) { + // e.ID — content block ID +}) + +host.OnTextEnd(func(e kit.TextEndEvent) { + // e.ID — content block ID +}) + +host.OnReasoningStart(func(e kit.ReasoningStartEvent) { + // e.ID — reasoning block ID +}) + +host.OnSource(func(e kit.SourceEvent) { + // e.SourceType, e.ID, e.URL, e.Title +}) + +host.OnStreamFinish(func(e kit.StreamFinishEvent) { + // e.Usage (LLMUsage), e.FinishReason 
+}) + +// Additional typed subscribers for previously generic-only events: +host.OnMessageStart(func(e kit.MessageStartEvent) {}) +host.OnMessageEnd(func(e kit.MessageEndEvent) { /* e.Content */ }) +host.OnReasoningDelta(func(e kit.ReasoningDeltaEvent) { /* e.Delta */ }) +host.OnReasoningComplete(func(e kit.ReasoningCompleteEvent) {}) +host.OnToolExecutionStart(func(e kit.ToolExecutionStartEvent) { /* e.ToolCallID, e.ToolName, e.ToolKind, e.ToolArgs */ }) +host.OnToolExecutionEnd(func(e kit.ToolExecutionEndEvent) { /* e.ToolCallID, e.ToolName, e.ToolKind */ }) +host.OnToolCallContent(func(e kit.ToolCallContentEvent) { /* e.Content */ }) +host.OnStepUsage(func(e kit.StepUsageEvent) { /* e.InputTokens, e.OutputTokens, e.CacheReadTokens, e.CacheWriteTokens */ }) +host.OnCompaction(func(e kit.CompactionEvent) { /* e.Summary, e.OriginalTokens, e.CompactedTokens, ... */ }) +host.OnSteerConsumed(func(e kit.SteerConsumedEvent) { /* e.Count */ }) ``` +> **Rename note:** `OnStreaming` has been renamed to `OnMessageUpdate`. The old `OnStreaming` name is kept as a deprecated alias for one release cycle. 
+ ### Generic subscriber (receives all events) ```go @@ -336,6 +392,16 @@ unsub := host.Subscribe(func(e kit.Event) { | `reasoning_delta` | `ReasoningDeltaEvent` | `Delta` | | `step_usage` | `StepUsageEvent` | `InputTokens`, `OutputTokens`, `CacheReadTokens`, `CacheWriteTokens` | | `steer_consumed` | `SteerConsumedEvent` | `Count` | +| `step_start` | `StepStartEvent` | `StepNumber` | +| `step_finish` | `StepFinishEvent` | `StepNumber`, `HasToolCalls`, `FinishReason`, `Usage` | +| `text_start` | `TextStartEvent` | `ID` | +| `text_end` | `TextEndEvent` | `ID` | +| `reasoning_start` | `ReasoningStartEvent` | `ID` | +| `warnings` | `WarningsEvent` | `Warnings` | +| `source` | `SourceEvent` | `SourceType`, `ID`, `URL`, `Title` | +| `stream_finish` | `StreamFinishEvent` | `Usage`, `FinishReason` | +| `error` | `ErrorEvent` | `Error` | +| `retry` | `RetryEvent` | `Attempt`, `Error` | | `password_prompt` | `PasswordPromptEvent` | `Prompt`, `ResponseCh` | **Tool call streaming lifecycle**: `ToolCallStartEvent` → `ToolCallDeltaEvent` (repeated) → `ToolCallEndEvent` → `ToolCallEvent` → `ToolExecutionStartEvent` → `ToolOutputEvent` (optional, repeated) → `ToolExecutionEndEvent` → `ToolResultEvent` @@ -421,6 +487,20 @@ host.OnAfterTurn(kit.HookPriorityNormal, func(h kit.AfterTurnHook) { }) ``` +### PrepareStep — intercept/replace messages before each LLM call + +```go +host.OnPrepareStep(kit.HookPriorityNormal, func(h kit.PrepareStepHook) *kit.PrepareStepResult { + // h.StepNumber — which step in the current turn (1-based) + // h.Messages — []kit.LLMMessage being sent to the LLM + // Return nil to pass through unchanged, or replace messages: + modified := filterSensitiveMessages(h.Messages) + return &kit.PrepareStepResult{Messages: modified} +}) +``` + +`PrepareStep` fires before every LLM API call within a turn (including tool-call loop iterations). 
Unlike `ContextPrepare` (which operates on the full context window once per turn), `PrepareStep` runs per-step and sees the messages that include the latest tool results. + ### ContextPrepare — filter/inject context window ```go @@ -1172,7 +1252,7 @@ for { ### Pattern: Streaming output to terminal ```go -host.OnStreaming(func(e kit.MessageUpdateEvent) { +host.OnMessageUpdate(func(e kit.MessageUpdateEvent) { fmt.Print(e.Chunk) }) response, _ := host.Prompt(ctx, "Write a poem") diff --git a/www/pages/sdk/callbacks.md b/www/pages/sdk/callbacks.md index 87d34815..c3c50400 100644 --- a/www/pages/sdk/callbacks.md +++ b/www/pages/sdk/callbacks.md @@ -20,7 +20,7 @@ unsub2 := host.OnToolResult(func(event kit.ToolResultEvent) { }) defer unsub2() -unsub3 := host.OnStreaming(func(event kit.MessageUpdateEvent) { +unsub3 := host.OnMessageUpdate(func(event kit.MessageUpdateEvent) { fmt.Print(event.Chunk) }) defer unsub3() @@ -116,6 +116,24 @@ host.OnAfterTurn(kit.HookPriorityNormal, func(h kit.AfterTurnHook) { }) ``` +### PrepareStep — intercept messages between steps + +The most powerful hook — fires between steps within a multi-step agent turn, after any steering messages are injected and before messages are sent to the LLM. Can replace the entire context window. + +```go +host.OnPrepareStep(kit.HookPriorityNormal, func(h kit.PrepareStepHook) *kit.PrepareStepResult { + // h.StepNumber — zero-based step index within the turn + // h.Messages — current context window (includes any steering) + + // Example: transform tool results with images into user messages. + // Return nil instead to pass the messages through unchanged. + modified := transformImageToolResults(h.Messages) + return &kit.PrepareStepResult{Messages: modified} +}) +``` + +Use cases: transforming tool results (e.g., image data for vision models), dynamic tool filtering per step, mid-turn context injection, custom stop conditions. + ### Hook priorities ```go @@ -128,19 +146,41 @@ Lower values run first. First non-nil result wins. 
## All event types -| Event | Description | -|-------|-------------| -| `ToolCallStartEvent` | LLM began generating tool call arguments (tool name known, args streaming) | -| `ToolCallDeltaEvent` | Streamed JSON fragment of tool call arguments | -| `ToolCallEndEvent` | Tool argument streaming complete, before execution begins | -| `ToolCallEvent` | Tool call fully parsed and about to execute | -| `ToolResultEvent` | Tool execution completed with result | -| `ToolOutputEvent` | Streaming output chunk from tool (e.g., bash stdout/stderr) | -| `MessageUpdateEvent` | Streaming text chunk from LLM | -| `ResponseEvent` | Final response received | -| `TurnStartEvent` | Agent turn started | -| `TurnEndEvent` | Agent turn completed | -| `PasswordPromptEvent` | Sudo command needs password (respond via `ResponseCh`) | +| Event | Typed Subscriber | Description | +|-------|-----------------|-------------| +| `TurnStartEvent` | `OnTurnStart` | Agent turn started | +| `TurnEndEvent` | `OnTurnEnd` | Agent turn completed | +| `MessageStartEvent` | `OnMessageStart` | New assistant message begins | +| `MessageUpdateEvent` | `OnMessageUpdate` | Streaming text chunk from LLM | +| `MessageEndEvent` | `OnMessageEnd` | Assistant message complete | +| `ToolCallStartEvent` | `OnToolCallStart` | LLM began generating tool call arguments | +| `ToolCallDeltaEvent` | `OnToolCallDelta` | Streamed JSON fragment of tool call arguments | +| `ToolCallEndEvent` | `OnToolCallEnd` | Tool argument streaming complete | +| `ToolCallEvent` | `OnToolCall` | Tool call fully parsed, about to execute | +| `ToolExecutionStartEvent` | `OnToolExecutionStart` | Tool begins executing | +| `ToolExecutionEndEvent` | `OnToolExecutionEnd` | Tool finishes executing | +| `ToolResultEvent` | `OnToolResult` | Tool execution completed with result | +| `ToolCallContentEvent` | `OnToolCallContent` | Text content alongside tool calls | +| `ToolOutputEvent` | `OnToolOutput` | Streaming output chunk from tool (e.g., bash) | +| 
`ResponseEvent` | `OnResponse` | Final response received | +| `ReasoningStartEvent` | `OnReasoningStart` | LLM begins reasoning/thinking | +| `ReasoningDeltaEvent` | `OnReasoningDelta` | Streaming reasoning/thinking chunk | +| `ReasoningCompleteEvent` | `OnReasoningComplete` | Reasoning/thinking finished | +| `StepStartEvent` | `OnStepStart` | New LLM call begins within a turn | +| `StepFinishEvent` | `OnStepFinish` | Step completes (with usage, finish reason, tool call info) | +| `StepUsageEvent` | `OnStepUsage` | Per-step token usage | +| `StreamFinishEvent` | `OnStreamFinish` | Per-step stream completes (with usage + finish reason) | +| `TextStartEvent` | `OnTextStart` | LLM begins text content generation | +| `TextEndEvent` | `OnTextEnd` | LLM finishes text content generation | +| `WarningsEvent` | `OnWarnings` | LLM provider returned warnings | +| `SourceEvent` | `OnSource` | LLM referenced a source (e.g., web search) | +| `ErrorEvent` | `OnError` | Agent-level error during streaming | +| `RetryEvent` | `OnRetry` | LLM request retried after transient error | +| `CompactionEvent` | `OnCompaction` | Conversation compacted | +| `SteerConsumedEvent` | `OnSteerConsumed` | Steering messages injected into turn | +| `PasswordPromptEvent` | — | Sudo command needs password (respond via `ResponseCh`) | + +> **Note:** `OnStreaming` is a deprecated alias for `OnMessageUpdate` and will be removed in a future release. ## Subagent event monitoring diff --git a/www/pages/sdk/overview.md b/www/pages/sdk/overview.md index d3cee101..7a22c93c 100644 --- a/www/pages/sdk/overview.md +++ b/www/pages/sdk/overview.md @@ -143,7 +143,7 @@ host.OnToolResult(func(event kit.ToolResultEvent) { fmt.Println("Tool result:", event.Name) }) -host.OnStreaming(func(event kit.MessageUpdateEvent) { +host.OnMessageUpdate(func(event kit.MessageUpdateEvent) { fmt.Print(event.Chunk) }) ```