Files
kit/internal/app/app.go
Ed Zynda 8823977612 test(app): cover steer-drain branch of releaseBusyAfterCompact
- Add unexported steerDrainFn test seam on App so unit tests can
  inject fake steer items without standing up a full *kit.Kit
  (Options.Kit is a concrete struct, not an interface).
- releaseBusyAfterCompact now prefers the seam over Kit.DrainSteer
  via a small switch; production behaviour is unchanged when the
  field is nil.
- Add TestReleaseBusyAfterCompact_splicesSteerAheadOfQueue, which
  pre-populates both fake steer items and ordinary queue prompts,
  invokes releaseBusyAfterCompact, and asserts the first dispatched
  prompt is the steer item — proving steer messages retain 'act now'
  priority and that drainQueue is actually launched (the bug from
  #27).
2026-05-08 12:18:52 +03:00

1460 lines
47 KiB
Go

package app
import (
"context"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
tea "charm.land/bubbletea/v2"
"charm.land/fantasy"
"github.com/mark3labs/kit/internal/extensions"
"github.com/mark3labs/kit/internal/session"
kit "github.com/mark3labs/kit/pkg/kit"
)
// queueItem holds a prompt and optional image attachments for the execution queue.
type queueItem struct {
Prompt string
Files []kit.LLMFilePart
}
// App is the application-layer orchestrator. It owns the agentic loop,
// conversation history (via MessageStore), and queue management. It is
// designed to be created once per session and reused across multiple prompts.
//
// In interactive mode the caller creates a tea.Program and registers it via
// SetProgram; App then sends events to it as agent work progresses.
//
// In non-interactive mode the caller uses RunOnce, which writes the response
// directly to an io.Writer.
//
// App satisfies the ui.AppController interface defined in internal/ui/model.go:
//
// Run(prompt string) int
// CancelCurrentStep()
// QueueLength() int
// ClearQueue()
// ClearMessages()
type App struct {
opts Options
// store holds the conversation history.
store *MessageStore
// program is the Bubble Tea program used to send events in interactive mode.
// Nil in non-interactive mode.
program *tea.Program
// cancelStep cancels the current in-flight agent step. It is replaced on
// each new step and called by CancelCurrentStep().
cancelStep context.CancelFunc
// mu protects busy, queue, and cancelStep.
mu sync.Mutex
busy bool
queue []queueItem
// wg tracks in-flight goroutines; Close() waits on it.
wg sync.WaitGroup
// closed is set to true after Close() is called; new Run() calls are
// silently dropped.
closed bool
// rootCtx/rootCancel are used to signal shutdown to all goroutines.
rootCtx context.Context
rootCancel context.CancelFunc
// widgetUpdatePending is set to true when a WidgetUpdateEvent has been
// sent to the TUI but not yet consumed by its event loop. While the flag
// is set, subsequent NotifyWidgetUpdate calls are coalesced (dropped) to
// prevent fast extension tickers from flooding the BubbleTea mailbox with
// redundant re-render triggers. The flag is cleared after a short debounce
// (~1 frame) so new updates are always let through once the TUI has had a
// chance to process the pending event.
widgetUpdatePending atomic.Bool
// steerDrainFn is the test seam used by releaseBusyAfterCompact to pull
// any steer messages that arrived during compaction. In production it is
// nil and the helper falls back to a.opts.Kit.DrainSteer(); tests that
// need to exercise the steer-drain path without standing up a full
// *kit.Kit can set this field directly to inject fake items.
steerDrainFn func() []queueItem
}
// New creates a new App with the provided options and pre-loaded messages.
// initialMessages may be nil or empty for a fresh session.
func New(opts Options, initialMessages []kit.LLMMessage) *App {
rootCtx, rootCancel := context.WithCancel(context.Background())
return &App{
opts: opts,
store: NewMessageStoreWithMessages(initialMessages),
rootCtx: rootCtx,
rootCancel: rootCancel,
// cancelStep starts as a no-op so CancelCurrentStep() is always safe.
cancelStep: func() {},
}
}
// SetProgram registers the Bubble Tea program used to send events in
// interactive mode. Must be called before Run() in interactive mode.
func (a *App) SetProgram(p *tea.Program) {
a.mu.Lock()
defer a.mu.Unlock()
a.program = p
}
// --------------------------------------------------------------------------
// AppController interface
// --------------------------------------------------------------------------
// Run queues a prompt for execution. If the app is idle the prompt is
// executed immediately in a background goroutine; otherwise it is appended
// to the queue.
//
// Returns the current queue depth after the operation: 0 means the prompt
// was started immediately (or the app is closed), >0 means it was queued.
// The caller is responsible for updating any UI state (e.g. queue badge)
// based on the returned value — Run does NOT send events to the program,
// because it may be called synchronously from within Bubble Tea's Update
// loop where prog.Send would deadlock.
//
// Satisfies ui.AppController.
func (a *App) Run(prompt string) int {
return a.RunWithFiles(prompt, nil)
}
// RunWithFiles queues a multimodal prompt (text + image files) for execution.
// If the app is idle the prompt executes immediately; otherwise it is queued.
// Returns the current queue depth (0 = started immediately, >0 = queued).
//
// Satisfies ui.AppController.
func (a *App) RunWithFiles(prompt string, files []kit.LLMFilePart) int {
a.mu.Lock()
if a.closed {
a.mu.Unlock()
return 0
}
item := queueItem{Prompt: prompt, Files: files}
if a.busy {
a.queue = append(a.queue, item)
qLen := len(a.queue)
a.mu.Unlock()
return qLen
}
a.busy = true
a.wg.Add(1)
a.mu.Unlock()
go a.drainQueue(item)
return 0
}
// CancelCurrentStep cancels the currently executing agent step. Safe to call
// even when no step is running (it is a no-op in that case).
//
// Satisfies ui.AppController.
func (a *App) CancelCurrentStep() {
a.mu.Lock()
cancel := a.cancelStep
a.mu.Unlock()
cancel()
}
// IsBusy returns true when the agent is currently processing a turn.
func (a *App) IsBusy() bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.busy
}
// Abort cancels the current agent step (if running) and clears the queue.
// Unlike InterruptAndSend, no new message is injected — the agent simply
// stops. Safe to call when idle (no-op).
func (a *App) Abort() {
a.mu.Lock()
a.queue = a.queue[:0]
cancel := a.cancelStep
a.mu.Unlock()
cancel()
}
// QueueLength returns the number of prompts currently waiting in the queue.
//
// Satisfies ui.AppController.
func (a *App) QueueLength() int {
a.mu.Lock()
defer a.mu.Unlock()
return len(a.queue)
}
// Steer injects a steering message into the currently running agent turn.
// If the agent is in a multi-step tool loop, the message is delivered after
// the current tool execution finishes but before the next LLM call (graceful
// mid-turn injection via Fantasy's PrepareStep). If the agent is streaming
// a text-only response (no pending tool calls), the message waits until the
// response completes and then executes as the next turn.
//
// If the agent is idle, the message starts executing immediately (same as Run).
//
// Returns the number of pending steer/queue items (0 = started immediately,
// >0 = injected/queued). The caller must update UI state based on the return
// value — Steer does NOT send events to the program to avoid deadlocking
// when called from within Update().
//
// Satisfies ui.AppController.
func (a *App) Steer(prompt string) int {
return a.SteerWithFiles(prompt, nil)
}
// SteerWithFiles injects a steering message with optional file attachments
// (e.g. pasted images) into the currently running agent turn. Behaves like
// Steer but includes file parts alongside the text.
//
// Satisfies ui.AppController.
func (a *App) SteerWithFiles(prompt string, files []kit.LLMFilePart) int {
a.mu.Lock()
if a.closed {
a.mu.Unlock()
return 0
}
if !a.busy {
// Not busy — start immediately, same as RunWithFiles().
item := queueItem{Prompt: prompt, Files: files}
a.busy = true
a.wg.Add(1)
a.mu.Unlock()
go a.drainQueue(item)
return 0
}
a.mu.Unlock()
// Agent is busy — inject via the SDK's steer channel. The message
// will be picked up by PrepareStep between agent steps (after tool
// execution, before next LLM call). If PrepareStep doesn't fire
// (text-only response), drainQueue will pick it up after the turn.
if a.opts.Kit != nil {
a.opts.Kit.InjectSteerWithFiles(prompt, files)
}
return 1
}
// InterruptAndSend cancels the current agent step (if running), clears the
// queue, and sends a new message that will execute as soon as the current
// step finishes cancelling. If the agent is idle, the message executes
// immediately. This is the hard-cancel delivery mode used by extensions'
// CancelAndSend.
func (a *App) InterruptAndSend(prompt string) {
a.mu.Lock()
if a.closed {
a.mu.Unlock()
return
}
item := queueItem{Prompt: prompt}
if !a.busy {
// Not busy — start immediately, same as Run().
a.busy = true
a.wg.Add(1)
a.mu.Unlock()
go a.drainQueue(item)
return
}
// Agent is busy: clear queue, insert steer message, then cancel.
a.queue = []queueItem{item}
cancel := a.cancelStep
a.mu.Unlock()
cancel()
}
// ClearQueue discards all queued prompts. The caller is responsible for
// updating any UI state (e.g. queue badge) — ClearQueue does NOT send
// events to the program, because it may be called synchronously from
// within Bubble Tea's Update loop where prog.Send would deadlock.
//
// Satisfies ui.AppController.
func (a *App) ClearQueue() {
a.mu.Lock()
a.queue = a.queue[:0]
a.mu.Unlock()
}
// ClearMessages empties the conversation history. When a tree session is
// active the leaf pointer is reset to the root, creating an implicit branch.
//
// Satisfies ui.AppController.
func (a *App) ClearMessages() {
a.store.Clear()
if a.opts.TreeSession != nil {
a.opts.TreeSession.ResetLeaf()
}
}
// ReloadMessagesFromTree clears the in-memory message store and reloads it
// from the tree session's current branch. Unlike ClearMessages, this does NOT
// reset the tree session's leaf pointer. Used after Branch() to sync the
// store with the new branch position.
func (a *App) ReloadMessagesFromTree() {
a.store.Clear()
if a.opts.TreeSession != nil {
a.store.Replace(a.opts.TreeSession.GetLLMMessages())
}
}
// GetTreeSession returns the tree session manager, or nil if not configured.
func (a *App) GetTreeSession() *session.TreeManager {
return a.opts.TreeSession
}
// SwitchTreeSession replaces the active tree session with a new one and
// reloads the in-memory message store from the new session's messages.
// The old tree session is closed. Used by /resume to switch sessions.
func (a *App) SwitchTreeSession(ts *session.TreeManager) {
// Close old session.
if old := a.opts.TreeSession; old != nil {
_ = old.Close()
}
a.opts.TreeSession = ts
// Also update the kit SDK's tree session so messages are persisted correctly.
if a.opts.Kit != nil {
a.opts.Kit.SetTreeSession(ts)
}
// Reload messages from new session.
a.store.Clear()
if ts != nil {
a.store.Replace(ts.GetLLMMessages())
}
}
// AddContextMessage adds a user-role message to the conversation history
// without triggering an LLM response. Used by the ! shell command prefix
// to inject command output into context so the LLM can reference it in
// subsequent turns.
//
// Satisfies ui.AppController.
func (a *App) AddContextMessage(text string) {
kitMsg := fantasy.NewUserMessage(text)
a.store.Add(kitMsg)
// Persist to tree session if active.
if ts := a.opts.TreeSession; ts != nil {
_, _ = ts.AppendLLMMessage(fantasy.NewUserMessage(text))
}
}
// CompactConversation summarises older messages to free context space. It
// returns an error synchronously if compaction cannot start (agent busy or
// app closed). The actual compaction runs in a background goroutine and
// delivers CompactCompleteEvent or CompactErrorEvent through the registered
// tea.Program. customInstructions is optional text appended to the summary
// prompt (e.g. "Focus on the API design decisions").
//
// Any prompts queued via Run/RunWithFiles or steering messages injected via
// Steer/SteerWithFiles while compaction is running are flushed automatically
// once compaction completes (see releaseBusyAfterCompact).
//
// Satisfies ui.AppController.
func (a *App) CompactConversation(customInstructions string) error {
a.mu.Lock()
if a.closed {
a.mu.Unlock()
return fmt.Errorf("app is closed")
}
if a.busy {
a.mu.Unlock()
return fmt.Errorf("cannot compact while the agent is working")
}
if a.opts.Kit == nil {
a.mu.Unlock()
return fmt.Errorf("SDK instance not available")
}
a.busy = true
a.wg.Add(1)
a.mu.Unlock()
go func() {
defer a.wg.Done()
defer a.releaseBusyAfterCompact()
// Subscribe to SDK events for streaming compaction summary to the TUI.
sendFn := func(msg tea.Msg) {
if a.program != nil {
a.program.Send(msg)
}
}
unsub := a.subscribeSDKEvents(sendFn, nil)
defer unsub()
result, err := a.opts.Kit.Compact(a.rootCtx, nil, customInstructions)
if err != nil {
a.sendEvent(CompactErrorEvent{Err: err})
return
}
if result == nil {
a.sendEvent(CompactErrorEvent{Err: fmt.Errorf("nothing to compact")})
return
}
// Sync in-memory store with the compacted session.
if a.opts.TreeSession != nil {
a.store.Replace(a.opts.TreeSession.GetLLMMessages())
}
a.sendEvent(CompactCompleteEvent{
Summary: result.Summary,
OriginalTokens: result.OriginalTokens,
CompactedTokens: result.CompactedTokens,
MessagesRemoved: result.MessagesRemoved,
})
}()
return nil
}
// CompactAsync is like CompactConversation but calls onComplete/onError
// callbacks instead of sending TUI events. Used by the extension API's
// ctx.Compact() which needs callback-based notification.
//
// Like CompactConversation, any prompts/steer messages received during
// compaction are flushed automatically once compaction finishes.
func (a *App) CompactAsync(customInstructions string, onComplete func(), onError func(string)) error {
a.mu.Lock()
if a.closed {
a.mu.Unlock()
return fmt.Errorf("app is closed")
}
if a.busy {
a.mu.Unlock()
return fmt.Errorf("cannot compact while the agent is working")
}
if a.opts.Kit == nil {
a.mu.Unlock()
return fmt.Errorf("SDK instance not available")
}
a.busy = true
a.wg.Add(1)
a.mu.Unlock()
go func() {
defer a.wg.Done()
defer a.releaseBusyAfterCompact()
// Subscribe to SDK events for streaming compaction summary to the TUI.
sendFn := func(msg tea.Msg) {
if a.program != nil {
a.program.Send(msg)
}
}
unsub := a.subscribeSDKEvents(sendFn, nil)
defer unsub()
result, err := a.opts.Kit.Compact(a.rootCtx, nil, customInstructions)
if err != nil {
a.sendEvent(CompactErrorEvent{Err: err})
if onError != nil {
onError(err.Error())
}
return
}
if result == nil {
a.sendEvent(CompactErrorEvent{Err: fmt.Errorf("nothing to compact")})
if onError != nil {
onError("nothing to compact")
}
return
}
// Sync in-memory store with the compacted session.
if a.opts.TreeSession != nil {
a.store.Replace(a.opts.TreeSession.GetLLMMessages())
}
a.sendEvent(CompactCompleteEvent{
Summary: result.Summary,
OriginalTokens: result.OriginalTokens,
CompactedTokens: result.CompactedTokens,
MessagesRemoved: result.MessagesRemoved,
})
if onComplete != nil {
onComplete()
}
}()
return nil
}
// releaseBusyAfterCompact is the deferred tail that runs at the end of every
// compaction goroutine (success, error, or panic-after-recover paths). It
// flips a.busy back to false, but before doing so it checks whether any
// prompts piled up while compaction was running:
//
// - Run/RunWithFiles append to a.queue when a.busy is set.
// - Steer/SteerWithFiles deposit messages into the SDK steer channel via
// Kit.InjectSteerWithFiles when a.busy is set.
//
// Without this hand-off the queue would sit idle until the user submits
// another prompt — see issue #27. If we find anything pending we keep busy
// set, splice the steer messages to the front of the queue, and start a
// fresh drainQueue goroutine to deliver them as a single batched turn.
func (a *App) releaseBusyAfterCompact() {
// Pull steer messages outside the app mutex; DrainSteer takes its own
// internal lock and we don't want to nest the two. The test seam
// (a.steerDrainFn) takes precedence so unit tests can inject fake
// steer items without a real *kit.Kit.
var steerItems []queueItem
switch {
case a.steerDrainFn != nil:
steerItems = a.steerDrainFn()
case a.opts.Kit != nil:
if leftover := a.opts.Kit.DrainSteer(); len(leftover) > 0 {
steerItems = make([]queueItem, len(leftover))
for i, sm := range leftover {
steerItems[i] = queueItem{Prompt: sm.Text, Files: sm.Files}
}
}
}
a.mu.Lock()
// If the app was closed while compaction was running, drop everything
// and just clear busy. Run/Steer would have rejected new items already
// after Close(), but this guards against in-flight items that slipped
// in just before closed was set.
if a.closed {
a.queue = a.queue[:0]
a.busy = false
a.mu.Unlock()
return
}
// Combine steer-channel items (front) with the in-memory queue (back).
// Steer messages are placed first so they retain their "act now"
// semantics relative to ordinary queued prompts that arrived later.
pending := append(steerItems, a.queue...)
a.queue = a.queue[:0]
if len(pending) == 0 {
a.busy = false
a.mu.Unlock()
return
}
// Hand off to drainQueue: it will pick up the first item directly and
// scoop the rest from a.queue on its first iteration.
first := pending[0]
if len(pending) > 1 {
a.queue = append(a.queue, pending[1:]...)
}
// Stay busy across the goroutine swap.
a.wg.Add(1)
a.mu.Unlock()
// Notify the UI that steer-channel messages were consumed so the
// steering badge can clear; ordinary queued prompts will be reflected
// by the QueueUpdatedEvent that drainQueue emits as it picks them up.
if len(steerItems) > 0 {
a.sendEvent(SteerConsumedEvent{})
}
go a.drainQueue(first)
}
// --------------------------------------------------------------------------
// Non-interactive execution
// --------------------------------------------------------------------------
// RunOnce executes a single agent step synchronously and prints the final
// response text to stdout. No intermediate events are emitted. Blocks until
// the step completes or ctx is cancelled.
func (a *App) RunOnce(ctx context.Context, prompt string) error {
return a.RunOnceWithFiles(ctx, prompt, nil)
}
// RunOnceWithFiles executes a single agent step synchronously with optional
// multimodal file attachments. Prints the response to stdout and returns.
func (a *App) RunOnceWithFiles(ctx context.Context, prompt string, files []kit.LLMFilePart) error {
stepCtx, cancel := context.WithCancel(ctx)
defer cancel()
a.mu.Lock()
a.cancelStep = cancel
a.mu.Unlock()
result, err := a.executeStep(stepCtx, prompt, nil, files)
if err != nil {
return err
}
if result.Response != "" {
fmt.Println(result.Response)
}
return nil
}
// RunOnceResult executes a single agent step synchronously and returns the
// full TurnResult without printing anything. This is used by --json mode to
// capture structured output for serialization.
func (a *App) RunOnceResult(ctx context.Context, prompt string) (*kit.TurnResult, error) {
return a.RunOnceResultWithFiles(ctx, prompt, nil)
}
// RunOnceResultWithFiles executes a single agent step synchronously with
// optional multimodal file attachments and returns the full TurnResult.
func (a *App) RunOnceResultWithFiles(ctx context.Context, prompt string, files []kit.LLMFilePart) (*kit.TurnResult, error) {
stepCtx, cancel := context.WithCancel(ctx)
defer cancel()
a.mu.Lock()
a.cancelStep = cancel
a.mu.Unlock()
return a.executeStep(stepCtx, prompt, nil, files)
}
// RunOnceWithDisplay executes a single agent step synchronously, sending
// intermediate display events (spinner, tool calls, streaming chunks, etc.)
// to eventFn. This is the non-TUI equivalent of the interactive Run() path —
// used by non-interactive --prompt mode when output is needed.
//
// The eventFn receives the same event types as the Bubble Tea TUI
// (SpinnerEvent, ToolCallStartedEvent, StreamChunkEvent, StepCompleteEvent,
// etc.) and is responsible for rendering them.
//
// Blocks until the step completes or ctx is cancelled.
func (a *App) RunOnceWithDisplay(ctx context.Context, prompt string, eventFn func(tea.Msg)) error {
return a.RunOnceWithDisplayAndFiles(ctx, prompt, eventFn, nil)
}
// RunOnceWithDisplayAndFiles executes a single agent step synchronously with
// optional multimodal file attachments, sending intermediate display events.
func (a *App) RunOnceWithDisplayAndFiles(ctx context.Context, prompt string, eventFn func(tea.Msg), files []kit.LLMFilePart) error {
stepCtx, cancel := context.WithCancel(ctx)
defer cancel()
a.mu.Lock()
a.cancelStep = cancel
a.mu.Unlock()
result, err := a.executeStep(stepCtx, prompt, eventFn, files)
if err != nil {
return err
}
// Send step complete so the display handler can render the final response.
if eventFn != nil {
eventFn(StepCompleteEvent{ResponseText: result.Response})
}
return nil
}
// --------------------------------------------------------------------------
// Close
// --------------------------------------------------------------------------
// Close signals all background goroutines to stop and waits for them to finish.
// After Close() returns it is safe to call Kit.Close() / agent.Close().
func (a *App) Close() {
a.mu.Lock()
if a.closed {
a.mu.Unlock()
return
}
a.closed = true
cancel := a.cancelStep
a.mu.Unlock()
// Cancel any in-flight step and the root context.
cancel()
a.rootCancel()
// Wait for background goroutines.
a.wg.Wait()
// Clean up empty session file on shutdown.
if ts := a.opts.TreeSession; ts != nil && ts.IsEmpty() {
if path := ts.GetFilePath(); path != "" {
_ = os.Remove(path)
}
}
}
// --------------------------------------------------------------------------
// Internal: queue drain loop
// --------------------------------------------------------------------------
// drainQueue runs in a goroutine. It collects all queued items (including the
// first one) and submits them together as a single batch. This ensures that
// when multiple messages are queued while the agent is working, they are all
// submitted together in one turn rather than sequentially.
// Must be called with a.busy == true and a.wg incremented.
func (a *App) drainQueue(first queueItem) {
defer a.wg.Done()
// Collect all items to process in this batch
var items []queueItem
items = append(items, first)
// Process batches until no more items are queued
for {
// Drain the queue to collect any pending items
a.mu.Lock()
items = append(items, a.queue...)
a.queue = a.queue[:0] // Clear the queue
a.mu.Unlock()
// Notify UI: all queued messages have been consumed into this batch.
a.sendEvent(QueueUpdatedEvent{Length: 0})
// Process all collected items as a single batch
a.runQueueBatch(items)
// Drain any unconsumed steer messages from the SDK channel.
// These arrive when the user steered during a text-only response
// (no tool calls, so PrepareStep didn't fire for a second step).
// They go to the front of the queue so they run next.
if a.opts.Kit != nil {
if leftover := a.opts.Kit.DrainSteer(); len(leftover) > 0 {
a.mu.Lock()
steerItems := make([]queueItem, len(leftover))
for i, sm := range leftover {
steerItems[i] = queueItem{Prompt: sm.Text, Files: sm.Files}
}
a.queue = append(steerItems, a.queue...)
a.mu.Unlock()
// Notify UI about the consumed steer messages.
a.sendEvent(SteerConsumedEvent{})
}
}
// Check if more items were queued while we were processing
a.mu.Lock()
hasMore := len(a.queue) > 0
if hasMore {
// Start a new batch with the newly queued items
items = a.queue
a.queue = a.queue[:0]
}
a.mu.Unlock()
if hasMore {
// Notify UI: these newly queued messages have been consumed into the next batch.
a.sendEvent(QueueUpdatedEvent{Length: 0})
}
if !hasMore {
// No more items, we're done
break
}
// Process the new batch
}
// Mark as no longer busy
a.mu.Lock()
a.busy = false
a.mu.Unlock()
}
// runQueueBatch executes multiple queue items as a single agent turn.
// All items are submitted together, and the agent responds once to the combined context.
func (a *App) runQueueBatch(items []queueItem) {
if len(items) == 0 {
return
}
// Create a per-step cancellable context.
stepCtx, cancel := context.WithCancel(a.rootCtx)
a.mu.Lock()
a.cancelStep = cancel
a.mu.Unlock()
defer cancel()
// Build event function that sends to the registered tea.Program (if any).
a.mu.Lock()
prog := a.program
a.mu.Unlock()
eventFn := func(msg tea.Msg) {
if prog != nil {
prog.Send(msg)
}
}
// Execute the batch
result, err := a.executeBatch(stepCtx, items, eventFn)
if err != nil {
if stepCtx.Err() != nil {
// Step was cancelled by the user (double-ESC). The SDK
// preserves the user message and any completed tool
// call/result pairs; only the in-progress message or tool
// call is discarded. Sync the in-memory store to match.
if ts := a.opts.TreeSession; ts != nil {
a.store.Replace(ts.GetLLMMessages())
}
a.sendEvent(StepCancelledEvent{})
return
}
a.sendEvent(StepErrorEvent{Err: err})
return
}
a.sendEvent(StepCompleteEvent{ResponseText: result.Response})
}
// --------------------------------------------------------------------------
// Internal: single agent step
// --------------------------------------------------------------------------
// executeStep runs a single agentic step by delegating to the SDK's
// PromptResult() (or PromptResultWithFiles for multimodal), which handles
// session persistence, hooks, extension events, and the generation loop.
func (a *App) executeStep(ctx context.Context, prompt string, eventFn func(tea.Msg), files []kit.LLMFilePart) (*kit.TurnResult, error) {
// Test hook: bypass SDK entirely.
if a.opts.PromptFunc != nil {
return a.opts.PromptFunc(ctx, prompt)
}
sendFn := func(msg tea.Msg) {
if eventFn != nil {
eventFn(msg)
}
}
// Subscribe to SDK events for TUI rendering and per-step usage updates.
// The subscription is temporary — it lives only for the duration of this step.
var sawStepUsage atomic.Bool
unsub := a.subscribeSDKEvents(sendFn, &sawStepUsage)
defer unsub()
// Show spinner while the agent works.
sendFn(SpinnerEvent{Show: true})
var result *kit.TurnResult
var err error
if len(files) > 0 {
result, err = a.opts.Kit.PromptResultWithFiles(ctx, prompt, files)
} else {
result, err = a.opts.Kit.PromptResult(ctx, prompt)
}
if err != nil {
return nil, err
}
// Sync in-memory store with the SDK's authoritative conversation.
a.store.Replace(result.Messages)
// Update usage tracker. If per-step usage was already recorded from
// StepUsageEvent callbacks, avoid double-counting totals.
a.updateUsageFromTurnResult(result, prompt, sawStepUsage.Load())
return result, nil
}
// executeBatch runs a batch of queue items as a single agent step by delegating
// to the SDK's PromptResultWithMessages(), which handles session persistence,
// hooks, extension events, and the generation loop.
func (a *App) executeBatch(ctx context.Context, items []queueItem, eventFn func(tea.Msg)) (*kit.TurnResult, error) {
// Test hook: bypass SDK entirely (single item only for test compatibility).
if a.opts.PromptFunc != nil {
if len(items) == 1 {
return a.opts.PromptFunc(ctx, items[0].Prompt)
}
// For batch mode with PromptFunc, just use the first item
return a.opts.PromptFunc(ctx, items[0].Prompt)
}
sendFn := func(msg tea.Msg) {
if eventFn != nil {
eventFn(msg)
}
}
// Subscribe to SDK events for TUI rendering and per-step usage updates.
// The subscription is temporary — it lives only for the duration of this step.
var sawStepUsage atomic.Bool
unsub := a.subscribeSDKEvents(sendFn, &sawStepUsage)
defer unsub()
// Show spinner while the agent works.
sendFn(SpinnerEvent{Show: true})
// Check if any items have file attachments
hasFiles := false
for _, item := range items {
if len(item.Files) > 0 {
hasFiles = true
break
}
}
var result *kit.TurnResult
var err error
if len(items) == 1 {
// Single item: use the original path for compatibility
item := items[0]
if len(item.Files) > 0 || hasFiles {
result, err = a.opts.Kit.PromptResultWithFiles(ctx, item.Prompt, item.Files)
} else {
result, err = a.opts.Kit.PromptResult(ctx, item.Prompt)
}
} else {
// Multiple items: batch them together
var messages []string
for _, item := range items {
messages = append(messages, item.Prompt)
}
// File attachments are not supported in batch mode; fall back to
// processing only the first item that carries files.
if hasFiles {
// If files exist, fall back to processing just the first item with files
for _, item := range items {
if len(item.Files) > 0 {
result, err = a.opts.Kit.PromptResultWithFiles(ctx, item.Prompt, item.Files)
break
}
}
} else {
result, err = a.opts.Kit.PromptResultWithMessages(ctx, messages)
}
}
if err != nil {
return nil, err
}
// Sync in-memory store with the SDK's authoritative conversation.
a.store.Replace(result.Messages)
// Update usage tracker (using last item's prompt for fallback estimation).
// If per-step usage was already recorded from StepUsageEvent callbacks,
// avoid double-counting totals.
a.updateUsageFromTurnResult(result, items[len(items)-1].Prompt, sawStepUsage.Load())
return result, nil
}
// sendEvent sends a tea.Msg to the registered program if one is set.
// Must NOT be called with a.mu held (to avoid deadlock with the program).
func (a *App) sendEvent(msg tea.Msg) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(msg)
}
}
// subscribeSDKEvents registers temporary SDK event subscribers that convert
// SDK events to tea.Msg events and dispatch them via sendFn. When stepUsageSeen
// is provided, it is set to true after any non-zero StepUsageEvent is observed.
// Returns an unsubscribe function that removes all listeners.
func (a *App) subscribeSDKEvents(sendFn func(tea.Msg), stepUsageSeen *atomic.Bool) func() {
k := a.opts.Kit
var unsubs []func()
unsubs = append(unsubs, k.Subscribe(func(e kit.Event) {
switch ev := e.(type) {
case kit.ToolCallEvent:
sendFn(ToolCallStartedEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolArgs: ev.ToolArgs})
case kit.ToolCallStartEvent:
sendFn(ToolCallInputStartEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolKind: ev.ToolKind})
case kit.ToolCallDeltaEvent:
sendFn(ToolCallInputDeltaEvent{ToolCallID: ev.ToolCallID, Delta: ev.Delta})
case kit.ToolCallEndEvent:
sendFn(ToolCallInputEndEvent{ToolCallID: ev.ToolCallID})
case kit.ToolExecutionStartEvent:
sendFn(ToolExecutionEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolArgs: ev.ToolArgs, IsStarting: true})
case kit.ToolExecutionEndEvent:
sendFn(ToolExecutionEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, IsStarting: false})
case kit.ToolResultEvent:
sendFn(ToolResultEvent{
ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolArgs: ev.ToolArgs,
Result: ev.Result, IsError: ev.IsError,
})
case kit.ToolCallContentEvent:
sendFn(ToolCallContentEvent{Content: ev.Content})
case kit.ResponseEvent:
sendFn(ResponseCompleteEvent{Content: ev.Content})
case kit.MessageUpdateEvent:
sendFn(StreamChunkEvent{Content: ev.Chunk})
case kit.ReasoningDeltaEvent:
sendFn(ReasoningChunkEvent{Delta: ev.Delta})
case kit.ReasoningCompleteEvent:
sendFn(ReasoningCompleteEvent{})
case kit.ToolOutputEvent:
sendFn(ToolOutputEvent{
ToolCallID: ev.ToolCallID,
ToolName: ev.ToolName,
Chunk: ev.Chunk,
IsStderr: ev.IsStderr,
})
case kit.SteerConsumedEvent:
sendFn(SteerConsumedEvent{})
case kit.StepUsageEvent:
a.recordStepUsage(ev, stepUsageSeen, sendFn)
case kit.PasswordPromptEvent:
// Convert SDK PasswordPromptEvent to app PasswordPromptEvent
// The TUI will handle this and send the response back
responseCh := make(chan PasswordPromptResponse, 1)
sendFn(PasswordPromptEvent{
Prompt: ev.Prompt,
ResponseCh: responseCh,
})
// Wait for TUI response and forward to SDK
resp := <-responseCh
ev.ResponseCh <- kit.PasswordPromptResponse{
Password: resp.Password,
Cancelled: resp.Cancelled,
}
case kit.TurnEndEvent:
a.handleTurnEnd(ev, sendFn)
}
}))
return func() {
for _, unsub := range unsubs {
unsub()
}
}
}
// handleTurnEnd inspects a turn's final StopReason and surfaces actionable
// feedback to the user when the turn ended in a state they can act on.
//
// Today the only surfaced case is FinishReasonLength — the model hit its
// configured max_output_tokens budget and the reply was truncated. Without
// this banner the TUI used to swallow the truncation silently, leading to
// "ghost" cut-offs with no indication of why.
//
// Separated from subscribeSDKEvents so tests can exercise it directly via a
// stubbed sendFn without standing up a full Kit.
func (a *App) handleTurnEnd(ev kit.TurnEndEvent, sendFn func(tea.Msg)) {
if sendFn == nil {
return
}
if ev.StopReason != kit.FinishReasonLength {
return
}
sendFn(ExtensionPrintEvent{
Level: "info",
Text: a.formatMaxTokensTruncatedMessage(),
})
}
// formatMaxTokensTruncatedMessage builds the user-facing explanation for a
// truncated turn. It reports the active max_output_tokens budget and, when
// known, the model's catalog output ceiling so the user can judge how much
// headroom is available.
func (a *App) formatMaxTokensTruncatedMessage() string {
k := a.opts.Kit
if k == nil {
// Extremely early / test-stub case: still emit a useful generic hint.
return "⚠ Response truncated: the model hit the configured max_output_tokens limit. " +
"Raise it with --max-tokens N, KIT_MAX_TOKENS=N, or per-model " +
"modelSettings[provider/model].maxTokens in config."
}
current := k.MaxTokens()
ceiling := k.MaxOutputLimit()
model := k.GetModelString()
msg := "⚠ Response truncated: "
if model != "" {
msg += fmt.Sprintf("%s hit the configured max_output_tokens limit", model)
} else {
msg += "the model hit the configured max_output_tokens limit"
}
if current > 0 {
msg += fmt.Sprintf(" (%d)", current)
}
msg += "."
if ceiling > 0 && current > 0 && ceiling > current {
msg += fmt.Sprintf(" This model supports up to %d output tokens.", ceiling)
}
msg += "\n\nRaise it with --max-tokens N, KIT_MAX_TOKENS=N, " +
"or per-model modelSettings[provider/model].maxTokens in your config. " +
"Re-run the last prompt after raising it to get the full response."
return msg
}
// QuitFromExtension triggers a graceful shutdown. In interactive mode it
// sends a tea.QuitMsg to the program so the TUI exits cleanly. In
// non-interactive mode it cancels the root context, stopping any in-flight
// step. Safe to call from any goroutine; idempotent.
func (a *App) QuitFromExtension() {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(tea.QuitMsg{})
return
}
// Non-interactive: cancel the root context.
a.rootCancel()
}
// PrintFromExtension outputs text from an extension to the user. The level
// controls styling: "" for plain text, "info" for a system message block,
// "error" for an error block. In interactive mode it sends an
// ExtensionPrintEvent through the program so the TUI can render it with the
// appropriate renderer. In non-interactive mode it falls back to stderr with
// a level prefix so errors are distinguishable from plain output.
func (a *App) PrintFromExtension(level, text string) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(ExtensionPrintEvent{Text: text, Level: level})
return
}
// Non-interactive fallback: write to stderr with a level prefix so that
// errors and info messages are distinguishable from plain output.
switch level {
case "error":
fmt.Fprintf(os.Stderr, "[ERROR] %s\n", text)
case "info":
fmt.Fprintf(os.Stderr, "[INFO] %s\n", text)
default:
fmt.Println(text)
}
}
// SetEditorTextFromExtension sends an EditorTextSetEvent to the TUI to
// pre-fill the input editor. In non-interactive mode this is a no-op.
func (a *App) SetEditorTextFromExtension(text string) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(EditorTextSetEvent{Text: text})
}
}
// NotifyModelChanged sends a ModelChangedEvent to the TUI so it updates
// the model name in the status bar and message attribution.
func (a *App) NotifyModelChanged(provider, model string) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(ModelChangedEvent{ProviderName: provider, ModelName: model})
}
}
// NotifyWidgetUpdate sends a WidgetUpdateEvent to the TUI so it re-renders
// extension widgets. Called from the extension context's SetWidget/RemoveWidget
// closures. In non-interactive mode this is a no-op (widgets are TUI-only).
//
// Coalescing: if a WidgetUpdateEvent is already queued and not yet consumed
// by the TUI event loop, additional calls within the same ~16 ms window are
// dropped. This prevents fast extension tickers from flooding BubbleTea's
// mailbox with redundant re-render triggers.
func (a *App) NotifyWidgetUpdate() {
// Coalesce: only one pending update at a time.
if !a.widgetUpdatePending.CompareAndSwap(false, true) {
return
}
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(WidgetUpdateEvent{})
// Reset the pending flag after a short debounce so subsequent calls
// within the same render cycle are also coalesced, but new updates
// after the cycle are allowed through.
go func() {
time.Sleep(16 * time.Millisecond) // ~1 frame at 60 fps
a.widgetUpdatePending.Store(false)
}()
} else {
// No program registered (non-interactive mode); clear the flag so
// future calls are never permanently blocked.
a.widgetUpdatePending.Store(false)
}
}
// NotifyContentReload sends a ContentReloadEvent to the TUI so it refreshes
// prompt templates and skills from their provider callbacks. Called by file
// watchers when .md/.txt files change in prompt or skill directories.
// In non-interactive mode this is a no-op.
func (a *App) NotifyContentReload() {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(ContentReloadEvent{})
}
}
// NotifyMCPToolsReady sends an MCPToolsReadyEvent to the TUI so it refreshes
// tool names and MCP tool count from provider callbacks. Called when background
// MCP tool loading completes. In non-interactive mode this is a no-op.
func (a *App) NotifyMCPToolsReady() {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(MCPToolsReadyEvent{})
}
}
// NotifyMCPServerLoaded sends an MCPServerLoadedEvent to the TUI so it can
// display a system message when a single MCP server finishes loading. Called
// per server as background MCP tool loading progresses.
func (a *App) NotifyMCPServerLoaded(serverName string, toolCount int, err error) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(MCPServerLoadedEvent{
ServerName: serverName,
ToolCount: toolCount,
Error: err,
})
}
}
// SendEvent sends a tea.Msg to the registered program. Safe to call from
// any goroutine. No-op when no program is registered.
//
// Satisfies ui.AppController.
func (a *App) SendEvent(msg tea.Msg) {
a.sendEvent(msg)
}
// SendPromptRequest sends a PromptRequestEvent to the TUI so the user can
// respond interactively. In non-interactive mode (no program registered) it
// immediately responds with a cancelled result via the channel, ensuring the
// calling extension goroutine never blocks indefinitely.
func (a *App) SendPromptRequest(evt PromptRequestEvent) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(evt)
return
}
// Non-interactive fallback: immediately cancel.
if evt.ResponseCh != nil {
evt.ResponseCh <- PromptResponse{Cancelled: true}
}
}
// SendOverlayRequest sends an OverlayRequestEvent to the TUI so the user
// can interact with a modal overlay dialog. In non-interactive mode (no
// program registered) it immediately responds with a cancelled result via the
// channel, ensuring the calling extension goroutine never blocks indefinitely.
func (a *App) SendOverlayRequest(evt OverlayRequestEvent) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(evt)
return
}
// Non-interactive fallback: immediately cancel.
if evt.ResponseCh != nil {
evt.ResponseCh <- OverlayResponse{Cancelled: true}
}
}
// SuspendTUI temporarily releases the terminal from the TUI, runs the
// callback (which may spawn interactive subprocesses), and then restores
// the TUI. In non-interactive mode (no program registered) the callback
// runs directly with no terminal state changes.
//
// Safe to call from any goroutine (extension command handlers run in
// goroutines). Blocks until the callback returns.
func (a *App) SuspendTUI(callback func()) error {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog == nil {
// Non-interactive: just run the callback directly.
callback()
return nil
}
if err := prog.ReleaseTerminal(); err != nil {
return fmt.Errorf("release terminal: %w", err)
}
callback()
if err := prog.RestoreTerminal(); err != nil {
return fmt.Errorf("restore terminal: %w", err)
}
return nil
}
// PrintBlockFromExtension outputs a custom styled block from an extension.
func (a *App) PrintBlockFromExtension(opts extensions.PrintBlockOpts) {
a.mu.Lock()
prog := a.program
a.mu.Unlock()
if prog != nil {
prog.Send(ExtensionPrintEvent{
Text: opts.Text,
Level: "block",
BorderColor: opts.BorderColor,
Subtitle: opts.Subtitle,
})
return
}
// Non-interactive fallback: render a simple framed block to stderr so
// it is visually distinct from plain stdout output.
if opts.Subtitle != "" {
fmt.Fprintf(os.Stderr, "--- %s ---\n%s\n", opts.Subtitle, opts.Text)
} else {
fmt.Fprintf(os.Stderr, "---\n%s\n---\n", opts.Text)
}
}
// recordStepUsage applies token/cost usage reported for a completed step.
// Step usage events arrive even when a turn is later cancelled, so this keeps
// the usage widget accurate on all stop paths.
//
// Both session totals (cost, token counts) and the context window fill level
// are updated here so the status bar reflects progress after every LLM call,
// not just at the end of the full turn. Context fill monotonically increases
// across steps because each step re-sends the entire conversation plus any
// new tool results, so the numbers only go up.
//
// sendFn is called with a UsageUpdatedEvent to trigger a TUI re-render so
// the updated values are visible immediately.
func (a *App) recordStepUsage(ev kit.StepUsageEvent, stepUsageSeen *atomic.Bool, sendFn func(tea.Msg)) {
hasUsage := ev.InputTokens > 0 || ev.OutputTokens > 0 || ev.CacheReadTokens > 0 || ev.CacheWriteTokens > 0
if a.opts.Debug {
log.Printf("[DEBUG] recordStepUsage: hasUsage=%v input=%d output=%d cacheRead=%d cacheWrite=%d",
hasUsage, ev.InputTokens, ev.OutputTokens, ev.CacheReadTokens, ev.CacheWriteTokens)
}
if !hasUsage {
return
}
if stepUsageSeen != nil {
stepUsageSeen.Store(true)
}
if a.opts.UsageTracker == nil {
return
}
a.opts.UsageTracker.UpdateUsage(
int(ev.InputTokens),
int(ev.OutputTokens),
int(ev.CacheReadTokens),
int(ev.CacheWriteTokens),
)
// Update context window fill from this step's usage. Each step sends
// the full conversation to the LLM, so the reported token counts
// represent the actual context utilization at that point.
contextFill := int(ev.InputTokens) + int(ev.CacheReadTokens) + int(ev.CacheWriteTokens) + int(ev.OutputTokens)
if contextFill > 0 {
if a.opts.Debug {
log.Printf("[DEBUG] recordStepUsage: SetContextTokens=%d (Input=%d + CacheRead=%d + CacheWrite=%d + Output=%d)",
contextFill, ev.InputTokens, ev.CacheReadTokens, ev.CacheWriteTokens, ev.OutputTokens)
}
a.opts.UsageTracker.SetContextTokens(contextFill)
}
// Notify the TUI so it re-renders the status bar with updated values.
if sendFn != nil {
sendFn(UsageUpdatedEvent{})
}
}
// updateUsageFromTurnResult records token usage from an SDK TurnResult into the
// configured UsageTracker. Called once per turn after the turn completes.
//
// When sawStepUsage is true, totals were already accumulated incrementally via
// StepUsageEvent callbacks; in that case this method only updates context fill.
// Otherwise it falls back to TotalUsage from the API response.
//
// NOTE: We only use ACTUAL token counts from API responses for cost tracking.
// Estimation is never used for costs - only API-reported tokens are accurate.
func (a *App) updateUsageFromTurnResult(result *kit.TurnResult, userPrompt string, sawStepUsage bool) {
if a.opts.UsageTracker == nil || result == nil {
return
}
// Debug logging for token tracking
if a.opts.Debug {
if result.TotalUsage != nil {
log.Printf("[DEBUG] updateUsageFromTurnResult TotalUsage: input=%d output=%d cacheRead=%d cacheCreate=%d",
result.TotalUsage.InputTokens, result.TotalUsage.OutputTokens,
result.TotalUsage.CacheReadTokens, result.TotalUsage.CacheCreationTokens)
} else {
log.Printf("[DEBUG] updateUsageFromTurnResult: TotalUsage=nil")
}
if result.FinalUsage != nil {
log.Printf("[DEBUG] updateUsageFromTurnResult FinalUsage: input=%d output=%d cacheRead=%d cacheCreate=%d",
result.FinalUsage.InputTokens, result.FinalUsage.OutputTokens,
result.FinalUsage.CacheReadTokens, result.FinalUsage.CacheCreationTokens)
} else {
log.Printf("[DEBUG] updateUsageFromTurnResult: FinalUsage=nil")
}
log.Printf("[DEBUG] updateUsageFromTurnResult: sawStepUsage=%v", sawStepUsage)
}
// --- Accumulate cost/token totals for the session ---
// Only use actual API-reported tokens for cost tracking.
// If sawStepUsage is true, totals were already updated via StepUsageEvent.
// Check any token field > 0 (not just InputTokens) because cached prompts
// can result in InputTokens=0 while OutputTokens>0 (OpenAI-compatible behavior).
hasTotalUsage := result.TotalUsage != nil &&
(result.TotalUsage.InputTokens > 0 ||
result.TotalUsage.OutputTokens > 0 ||
result.TotalUsage.CacheReadTokens > 0 ||
result.TotalUsage.CacheCreationTokens > 0)
if a.opts.Debug {
log.Printf("[DEBUG] updateUsageFromTurnResult: hasTotalUsage=%v", hasTotalUsage)
}
if !sawStepUsage && hasTotalUsage {
if a.opts.Debug {
log.Printf("[DEBUG] updateUsageFromTurnResult: calling UpdateUsage input=%d output=%d cacheRead=%d cacheCreate=%d",
result.TotalUsage.InputTokens, result.TotalUsage.OutputTokens,
result.TotalUsage.CacheReadTokens, result.TotalUsage.CacheCreationTokens)
}
a.opts.UsageTracker.UpdateUsage(
int(result.TotalUsage.InputTokens),
int(result.TotalUsage.OutputTokens),
int(result.TotalUsage.CacheReadTokens),
int(result.TotalUsage.CacheCreationTokens),
)
}
// --- Context window fill (drives the % bar) ---
// Calculate context fill from the LAST API call's usage. The context
// window is filled by everything sent to and received from the model:
//
// InputTokens — non-cached input (may be small with prompt caching)
// CacheReadTokens — input tokens served from cache
// CacheCreationTokens — input tokens written to cache this call
// OutputTokens — assistant output (becomes input next turn)
//
// With Anthropic prompt caching, InputTokens can drop to near-zero while
// CacheReadTokens holds the bulk of the context. We must sum all four to
// get the true context window utilization.
//
// We use FinalUsage (last step only), NOT TotalUsage, because TotalUsage
// sums across all tool-calling steps — and each step re-sends the full
// conversation, so TotalUsage massively overstates the actual window fill.
if result.FinalUsage != nil {
u := result.FinalUsage
contextFill := int(u.InputTokens) + int(u.CacheReadTokens) + int(u.CacheCreationTokens) + int(u.OutputTokens)
if contextFill > 0 {
if a.opts.Debug {
log.Printf("[DEBUG] updateUsageFromTurnResult: SetContextTokens=%d (Input=%d + CacheRead=%d + CacheCreate=%d + Output=%d)",
contextFill, u.InputTokens, u.CacheReadTokens, u.CacheCreationTokens, u.OutputTokens)
}
a.opts.UsageTracker.SetContextTokens(contextFill)
}
}
}