mirror of
https://github.com/mark3labs/kit.git
synced 2026-06-14 03:30:26 +00:00
8823977612
- Add unexported steerDrainFn test seam on App so unit tests can inject fake steer items without standing up a full *kit.Kit (Options.Kit is a concrete struct, not an interface). - releaseBusyAfterCompact now prefers the seam over Kit.DrainSteer via a small switch; production behaviour is unchanged when the field is nil. - Add TestReleaseBusyAfterCompact_splicesSteerAheadOfQueue, which pre-populates both fake steer items and ordinary queue prompts, invokes releaseBusyAfterCompact, and asserts the first dispatched prompt is the steer item — proving steer messages retain 'act now' priority and that drainQueue is actually launched (the bug from #27).
1460 lines
47 KiB
Go
1460 lines
47 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
tea "charm.land/bubbletea/v2"
|
|
"charm.land/fantasy"
|
|
|
|
"github.com/mark3labs/kit/internal/extensions"
|
|
"github.com/mark3labs/kit/internal/session"
|
|
kit "github.com/mark3labs/kit/pkg/kit"
|
|
)
|
|
|
|
// queueItem holds a prompt and optional image attachments for the execution queue.
|
|
type queueItem struct {
|
|
Prompt string
|
|
Files []kit.LLMFilePart
|
|
}
|
|
|
|
// App is the application-layer orchestrator. It owns the agentic loop,
|
|
// conversation history (via MessageStore), and queue management. It is
|
|
// designed to be created once per session and reused across multiple prompts.
|
|
//
|
|
// In interactive mode the caller creates a tea.Program and registers it via
|
|
// SetProgram; App then sends events to it as agent work progresses.
|
|
//
|
|
// In non-interactive mode the caller uses RunOnce, which writes the response
|
|
// directly to an io.Writer.
|
|
//
|
|
// App satisfies the ui.AppController interface defined in internal/ui/model.go:
|
|
//
|
|
// Run(prompt string) int
|
|
// CancelCurrentStep()
|
|
// QueueLength() int
|
|
// ClearQueue()
|
|
// ClearMessages()
|
|
type App struct {
|
|
opts Options
|
|
|
|
// store holds the conversation history.
|
|
store *MessageStore
|
|
|
|
// program is the Bubble Tea program used to send events in interactive mode.
|
|
// Nil in non-interactive mode.
|
|
program *tea.Program
|
|
|
|
// cancelStep cancels the current in-flight agent step. It is replaced on
|
|
// each new step and called by CancelCurrentStep().
|
|
cancelStep context.CancelFunc
|
|
|
|
// mu protects busy, queue, and cancelStep.
|
|
mu sync.Mutex
|
|
busy bool
|
|
queue []queueItem
|
|
|
|
// wg tracks in-flight goroutines; Close() waits on it.
|
|
wg sync.WaitGroup
|
|
|
|
// closed is set to true after Close() is called; new Run() calls are
|
|
// silently dropped.
|
|
closed bool
|
|
|
|
// rootCtx/rootCancel are used to signal shutdown to all goroutines.
|
|
rootCtx context.Context
|
|
rootCancel context.CancelFunc
|
|
|
|
// widgetUpdatePending is set to true when a WidgetUpdateEvent has been
|
|
// sent to the TUI but not yet consumed by its event loop. While the flag
|
|
// is set, subsequent NotifyWidgetUpdate calls are coalesced (dropped) to
|
|
// prevent fast extension tickers from flooding the BubbleTea mailbox with
|
|
// redundant re-render triggers. The flag is cleared after a short debounce
|
|
// (~1 frame) so new updates are always let through once the TUI has had a
|
|
// chance to process the pending event.
|
|
widgetUpdatePending atomic.Bool
|
|
|
|
// steerDrainFn is the test seam used by releaseBusyAfterCompact to pull
|
|
// any steer messages that arrived during compaction. In production it is
|
|
// nil and the helper falls back to a.opts.Kit.DrainSteer(); tests that
|
|
// need to exercise the steer-drain path without standing up a full
|
|
// *kit.Kit can set this field directly to inject fake items.
|
|
steerDrainFn func() []queueItem
|
|
}
|
|
|
|
// New creates a new App with the provided options and pre-loaded messages.
|
|
// initialMessages may be nil or empty for a fresh session.
|
|
func New(opts Options, initialMessages []kit.LLMMessage) *App {
|
|
rootCtx, rootCancel := context.WithCancel(context.Background())
|
|
return &App{
|
|
opts: opts,
|
|
store: NewMessageStoreWithMessages(initialMessages),
|
|
rootCtx: rootCtx,
|
|
rootCancel: rootCancel,
|
|
// cancelStep starts as a no-op so CancelCurrentStep() is always safe.
|
|
cancelStep: func() {},
|
|
}
|
|
}
|
|
|
|
// SetProgram registers the Bubble Tea program used to send events in
|
|
// interactive mode. Must be called before Run() in interactive mode.
|
|
func (a *App) SetProgram(p *tea.Program) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
a.program = p
|
|
}
|
|
|
|
// --------------------------------------------------------------------------
|
|
// AppController interface
|
|
// --------------------------------------------------------------------------
|
|
|
|
// Run queues a prompt for execution. If the app is idle the prompt is
|
|
// executed immediately in a background goroutine; otherwise it is appended
|
|
// to the queue.
|
|
//
|
|
// Returns the current queue depth after the operation: 0 means the prompt
|
|
// was started immediately (or the app is closed), >0 means it was queued.
|
|
// The caller is responsible for updating any UI state (e.g. queue badge)
|
|
// based on the returned value — Run does NOT send events to the program,
|
|
// because it may be called synchronously from within Bubble Tea's Update
|
|
// loop where prog.Send would deadlock.
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) Run(prompt string) int {
|
|
return a.RunWithFiles(prompt, nil)
|
|
}
|
|
|
|
// RunWithFiles queues a multimodal prompt (text + image files) for execution.
|
|
// If the app is idle the prompt executes immediately; otherwise it is queued.
|
|
// Returns the current queue depth (0 = started immediately, >0 = queued).
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) RunWithFiles(prompt string, files []kit.LLMFilePart) int {
|
|
a.mu.Lock()
|
|
|
|
if a.closed {
|
|
a.mu.Unlock()
|
|
return 0
|
|
}
|
|
|
|
item := queueItem{Prompt: prompt, Files: files}
|
|
|
|
if a.busy {
|
|
a.queue = append(a.queue, item)
|
|
qLen := len(a.queue)
|
|
a.mu.Unlock()
|
|
return qLen
|
|
}
|
|
|
|
a.busy = true
|
|
a.wg.Add(1)
|
|
a.mu.Unlock()
|
|
go a.drainQueue(item)
|
|
return 0
|
|
}
|
|
|
|
// CancelCurrentStep cancels the currently executing agent step. Safe to call
|
|
// even when no step is running (it is a no-op in that case).
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) CancelCurrentStep() {
|
|
a.mu.Lock()
|
|
cancel := a.cancelStep
|
|
a.mu.Unlock()
|
|
cancel()
|
|
}
|
|
|
|
// IsBusy returns true when the agent is currently processing a turn.
|
|
func (a *App) IsBusy() bool {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
return a.busy
|
|
}
|
|
|
|
// Abort cancels the current agent step (if running) and clears the queue.
|
|
// Unlike InterruptAndSend, no new message is injected — the agent simply
|
|
// stops. Safe to call when idle (no-op).
|
|
func (a *App) Abort() {
|
|
a.mu.Lock()
|
|
a.queue = a.queue[:0]
|
|
cancel := a.cancelStep
|
|
a.mu.Unlock()
|
|
cancel()
|
|
}
|
|
|
|
// QueueLength returns the number of prompts currently waiting in the queue.
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) QueueLength() int {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
return len(a.queue)
|
|
}
|
|
|
|
// Steer injects a steering message into the currently running agent turn.
|
|
// If the agent is in a multi-step tool loop, the message is delivered after
|
|
// the current tool execution finishes but before the next LLM call (graceful
|
|
// mid-turn injection via Fantasy's PrepareStep). If the agent is streaming
|
|
// a text-only response (no pending tool calls), the message waits until the
|
|
// response completes and then executes as the next turn.
|
|
//
|
|
// If the agent is idle, the message starts executing immediately (same as Run).
|
|
//
|
|
// Returns the number of pending steer/queue items (0 = started immediately,
|
|
// >0 = injected/queued). The caller must update UI state based on the return
|
|
// value — Steer does NOT send events to the program to avoid deadlocking
|
|
// when called from within Update().
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) Steer(prompt string) int {
|
|
return a.SteerWithFiles(prompt, nil)
|
|
}
|
|
|
|
// SteerWithFiles injects a steering message with optional file attachments
|
|
// (e.g. pasted images) into the currently running agent turn. Behaves like
|
|
// Steer but includes file parts alongside the text.
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) SteerWithFiles(prompt string, files []kit.LLMFilePart) int {
|
|
a.mu.Lock()
|
|
|
|
if a.closed {
|
|
a.mu.Unlock()
|
|
return 0
|
|
}
|
|
|
|
if !a.busy {
|
|
// Not busy — start immediately, same as RunWithFiles().
|
|
item := queueItem{Prompt: prompt, Files: files}
|
|
a.busy = true
|
|
a.wg.Add(1)
|
|
a.mu.Unlock()
|
|
go a.drainQueue(item)
|
|
return 0
|
|
}
|
|
|
|
a.mu.Unlock()
|
|
|
|
// Agent is busy — inject via the SDK's steer channel. The message
|
|
// will be picked up by PrepareStep between agent steps (after tool
|
|
// execution, before next LLM call). If PrepareStep doesn't fire
|
|
// (text-only response), drainQueue will pick it up after the turn.
|
|
if a.opts.Kit != nil {
|
|
a.opts.Kit.InjectSteerWithFiles(prompt, files)
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// InterruptAndSend cancels the current agent step (if running), clears the
|
|
// queue, and sends a new message that will execute as soon as the current
|
|
// step finishes cancelling. If the agent is idle, the message executes
|
|
// immediately. This is the hard-cancel delivery mode used by extensions'
|
|
// CancelAndSend.
|
|
func (a *App) InterruptAndSend(prompt string) {
|
|
a.mu.Lock()
|
|
|
|
if a.closed {
|
|
a.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
item := queueItem{Prompt: prompt}
|
|
|
|
if !a.busy {
|
|
// Not busy — start immediately, same as Run().
|
|
a.busy = true
|
|
a.wg.Add(1)
|
|
a.mu.Unlock()
|
|
go a.drainQueue(item)
|
|
return
|
|
}
|
|
|
|
// Agent is busy: clear queue, insert steer message, then cancel.
|
|
a.queue = []queueItem{item}
|
|
cancel := a.cancelStep
|
|
a.mu.Unlock()
|
|
cancel()
|
|
}
|
|
|
|
// ClearQueue discards all queued prompts. The caller is responsible for
|
|
// updating any UI state (e.g. queue badge) — ClearQueue does NOT send
|
|
// events to the program, because it may be called synchronously from
|
|
// within Bubble Tea's Update loop where prog.Send would deadlock.
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) ClearQueue() {
|
|
a.mu.Lock()
|
|
a.queue = a.queue[:0]
|
|
a.mu.Unlock()
|
|
}
|
|
|
|
// ClearMessages empties the conversation history. When a tree session is
|
|
// active the leaf pointer is reset to the root, creating an implicit branch.
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) ClearMessages() {
|
|
a.store.Clear()
|
|
if a.opts.TreeSession != nil {
|
|
a.opts.TreeSession.ResetLeaf()
|
|
}
|
|
}
|
|
|
|
// ReloadMessagesFromTree clears the in-memory message store and reloads it
|
|
// from the tree session's current branch. Unlike ClearMessages, this does NOT
|
|
// reset the tree session's leaf pointer. Used after Branch() to sync the
|
|
// store with the new branch position.
|
|
func (a *App) ReloadMessagesFromTree() {
|
|
a.store.Clear()
|
|
if a.opts.TreeSession != nil {
|
|
a.store.Replace(a.opts.TreeSession.GetLLMMessages())
|
|
}
|
|
}
|
|
|
|
// GetTreeSession returns the tree session manager, or nil if not configured.
|
|
func (a *App) GetTreeSession() *session.TreeManager {
|
|
return a.opts.TreeSession
|
|
}
|
|
|
|
// SwitchTreeSession replaces the active tree session with a new one and
|
|
// reloads the in-memory message store from the new session's messages.
|
|
// The old tree session is closed. Used by /resume to switch sessions.
|
|
func (a *App) SwitchTreeSession(ts *session.TreeManager) {
|
|
// Close old session.
|
|
if old := a.opts.TreeSession; old != nil {
|
|
_ = old.Close()
|
|
}
|
|
a.opts.TreeSession = ts
|
|
// Also update the kit SDK's tree session so messages are persisted correctly.
|
|
if a.opts.Kit != nil {
|
|
a.opts.Kit.SetTreeSession(ts)
|
|
}
|
|
// Reload messages from new session.
|
|
a.store.Clear()
|
|
if ts != nil {
|
|
a.store.Replace(ts.GetLLMMessages())
|
|
}
|
|
}
|
|
|
|
// AddContextMessage adds a user-role message to the conversation history
|
|
// without triggering an LLM response. Used by the ! shell command prefix
|
|
// to inject command output into context so the LLM can reference it in
|
|
// subsequent turns.
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) AddContextMessage(text string) {
|
|
kitMsg := fantasy.NewUserMessage(text)
|
|
a.store.Add(kitMsg)
|
|
|
|
// Persist to tree session if active.
|
|
if ts := a.opts.TreeSession; ts != nil {
|
|
_, _ = ts.AppendLLMMessage(fantasy.NewUserMessage(text))
|
|
}
|
|
}
|
|
|
|
// CompactConversation summarises older messages to free context space. It
|
|
// returns an error synchronously if compaction cannot start (agent busy or
|
|
// app closed). The actual compaction runs in a background goroutine and
|
|
// delivers CompactCompleteEvent or CompactErrorEvent through the registered
|
|
// tea.Program. customInstructions is optional text appended to the summary
|
|
// prompt (e.g. "Focus on the API design decisions").
|
|
//
|
|
// Any prompts queued via Run/RunWithFiles or steering messages injected via
|
|
// Steer/SteerWithFiles while compaction is running are flushed automatically
|
|
// once compaction completes (see releaseBusyAfterCompact).
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) CompactConversation(customInstructions string) error {
|
|
a.mu.Lock()
|
|
if a.closed {
|
|
a.mu.Unlock()
|
|
return fmt.Errorf("app is closed")
|
|
}
|
|
if a.busy {
|
|
a.mu.Unlock()
|
|
return fmt.Errorf("cannot compact while the agent is working")
|
|
}
|
|
if a.opts.Kit == nil {
|
|
a.mu.Unlock()
|
|
return fmt.Errorf("SDK instance not available")
|
|
}
|
|
a.busy = true
|
|
a.wg.Add(1)
|
|
a.mu.Unlock()
|
|
|
|
go func() {
|
|
defer a.wg.Done()
|
|
defer a.releaseBusyAfterCompact()
|
|
|
|
// Subscribe to SDK events for streaming compaction summary to the TUI.
|
|
sendFn := func(msg tea.Msg) {
|
|
if a.program != nil {
|
|
a.program.Send(msg)
|
|
}
|
|
}
|
|
unsub := a.subscribeSDKEvents(sendFn, nil)
|
|
defer unsub()
|
|
|
|
result, err := a.opts.Kit.Compact(a.rootCtx, nil, customInstructions)
|
|
if err != nil {
|
|
a.sendEvent(CompactErrorEvent{Err: err})
|
|
return
|
|
}
|
|
if result == nil {
|
|
a.sendEvent(CompactErrorEvent{Err: fmt.Errorf("nothing to compact")})
|
|
return
|
|
}
|
|
|
|
// Sync in-memory store with the compacted session.
|
|
if a.opts.TreeSession != nil {
|
|
a.store.Replace(a.opts.TreeSession.GetLLMMessages())
|
|
}
|
|
|
|
a.sendEvent(CompactCompleteEvent{
|
|
Summary: result.Summary,
|
|
OriginalTokens: result.OriginalTokens,
|
|
CompactedTokens: result.CompactedTokens,
|
|
MessagesRemoved: result.MessagesRemoved,
|
|
})
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// CompactAsync is like CompactConversation but calls onComplete/onError
|
|
// callbacks instead of sending TUI events. Used by the extension API's
|
|
// ctx.Compact() which needs callback-based notification.
|
|
//
|
|
// Like CompactConversation, any prompts/steer messages received during
|
|
// compaction are flushed automatically once compaction finishes.
|
|
func (a *App) CompactAsync(customInstructions string, onComplete func(), onError func(string)) error {
|
|
a.mu.Lock()
|
|
if a.closed {
|
|
a.mu.Unlock()
|
|
return fmt.Errorf("app is closed")
|
|
}
|
|
if a.busy {
|
|
a.mu.Unlock()
|
|
return fmt.Errorf("cannot compact while the agent is working")
|
|
}
|
|
if a.opts.Kit == nil {
|
|
a.mu.Unlock()
|
|
return fmt.Errorf("SDK instance not available")
|
|
}
|
|
a.busy = true
|
|
a.wg.Add(1)
|
|
a.mu.Unlock()
|
|
|
|
go func() {
|
|
defer a.wg.Done()
|
|
defer a.releaseBusyAfterCompact()
|
|
|
|
// Subscribe to SDK events for streaming compaction summary to the TUI.
|
|
sendFn := func(msg tea.Msg) {
|
|
if a.program != nil {
|
|
a.program.Send(msg)
|
|
}
|
|
}
|
|
unsub := a.subscribeSDKEvents(sendFn, nil)
|
|
defer unsub()
|
|
|
|
result, err := a.opts.Kit.Compact(a.rootCtx, nil, customInstructions)
|
|
if err != nil {
|
|
a.sendEvent(CompactErrorEvent{Err: err})
|
|
if onError != nil {
|
|
onError(err.Error())
|
|
}
|
|
return
|
|
}
|
|
if result == nil {
|
|
a.sendEvent(CompactErrorEvent{Err: fmt.Errorf("nothing to compact")})
|
|
if onError != nil {
|
|
onError("nothing to compact")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Sync in-memory store with the compacted session.
|
|
if a.opts.TreeSession != nil {
|
|
a.store.Replace(a.opts.TreeSession.GetLLMMessages())
|
|
}
|
|
|
|
a.sendEvent(CompactCompleteEvent{
|
|
Summary: result.Summary,
|
|
OriginalTokens: result.OriginalTokens,
|
|
CompactedTokens: result.CompactedTokens,
|
|
MessagesRemoved: result.MessagesRemoved,
|
|
})
|
|
if onComplete != nil {
|
|
onComplete()
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// releaseBusyAfterCompact is the deferred tail that runs at the end of every
|
|
// compaction goroutine (success, error, or panic-after-recover paths). It
|
|
// flips a.busy back to false, but before doing so it checks whether any
|
|
// prompts piled up while compaction was running:
|
|
//
|
|
// - Run/RunWithFiles append to a.queue when a.busy is set.
|
|
// - Steer/SteerWithFiles deposit messages into the SDK steer channel via
|
|
// Kit.InjectSteerWithFiles when a.busy is set.
|
|
//
|
|
// Without this hand-off the queue would sit idle until the user submits
|
|
// another prompt — see issue #27. If we find anything pending we keep busy
|
|
// set, splice the steer messages to the front of the queue, and start a
|
|
// fresh drainQueue goroutine to deliver them as a single batched turn.
|
|
func (a *App) releaseBusyAfterCompact() {
|
|
// Pull steer messages outside the app mutex; DrainSteer takes its own
|
|
// internal lock and we don't want to nest the two. The test seam
|
|
// (a.steerDrainFn) takes precedence so unit tests can inject fake
|
|
// steer items without a real *kit.Kit.
|
|
var steerItems []queueItem
|
|
switch {
|
|
case a.steerDrainFn != nil:
|
|
steerItems = a.steerDrainFn()
|
|
case a.opts.Kit != nil:
|
|
if leftover := a.opts.Kit.DrainSteer(); len(leftover) > 0 {
|
|
steerItems = make([]queueItem, len(leftover))
|
|
for i, sm := range leftover {
|
|
steerItems[i] = queueItem{Prompt: sm.Text, Files: sm.Files}
|
|
}
|
|
}
|
|
}
|
|
|
|
a.mu.Lock()
|
|
// If the app was closed while compaction was running, drop everything
|
|
// and just clear busy. Run/Steer would have rejected new items already
|
|
// after Close(), but this guards against in-flight items that slipped
|
|
// in just before closed was set.
|
|
if a.closed {
|
|
a.queue = a.queue[:0]
|
|
a.busy = false
|
|
a.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Combine steer-channel items (front) with the in-memory queue (back).
|
|
// Steer messages are placed first so they retain their "act now"
|
|
// semantics relative to ordinary queued prompts that arrived later.
|
|
pending := append(steerItems, a.queue...)
|
|
a.queue = a.queue[:0]
|
|
|
|
if len(pending) == 0 {
|
|
a.busy = false
|
|
a.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Hand off to drainQueue: it will pick up the first item directly and
|
|
// scoop the rest from a.queue on its first iteration.
|
|
first := pending[0]
|
|
if len(pending) > 1 {
|
|
a.queue = append(a.queue, pending[1:]...)
|
|
}
|
|
// Stay busy across the goroutine swap.
|
|
a.wg.Add(1)
|
|
a.mu.Unlock()
|
|
|
|
// Notify the UI that steer-channel messages were consumed so the
|
|
// steering badge can clear; ordinary queued prompts will be reflected
|
|
// by the QueueUpdatedEvent that drainQueue emits as it picks them up.
|
|
if len(steerItems) > 0 {
|
|
a.sendEvent(SteerConsumedEvent{})
|
|
}
|
|
|
|
go a.drainQueue(first)
|
|
}
|
|
|
|
// --------------------------------------------------------------------------
|
|
// Non-interactive execution
|
|
// --------------------------------------------------------------------------
|
|
|
|
// RunOnce executes a single agent step synchronously and prints the final
|
|
// response text to stdout. No intermediate events are emitted. Blocks until
|
|
// the step completes or ctx is cancelled.
|
|
func (a *App) RunOnce(ctx context.Context, prompt string) error {
|
|
return a.RunOnceWithFiles(ctx, prompt, nil)
|
|
}
|
|
|
|
// RunOnceWithFiles executes a single agent step synchronously with optional
|
|
// multimodal file attachments. Prints the response to stdout and returns.
|
|
func (a *App) RunOnceWithFiles(ctx context.Context, prompt string, files []kit.LLMFilePart) error {
|
|
stepCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
a.mu.Lock()
|
|
a.cancelStep = cancel
|
|
a.mu.Unlock()
|
|
|
|
result, err := a.executeStep(stepCtx, prompt, nil, files)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if result.Response != "" {
|
|
fmt.Println(result.Response)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RunOnceResult executes a single agent step synchronously and returns the
|
|
// full TurnResult without printing anything. This is used by --json mode to
|
|
// capture structured output for serialization.
|
|
func (a *App) RunOnceResult(ctx context.Context, prompt string) (*kit.TurnResult, error) {
|
|
return a.RunOnceResultWithFiles(ctx, prompt, nil)
|
|
}
|
|
|
|
// RunOnceResultWithFiles executes a single agent step synchronously with
|
|
// optional multimodal file attachments and returns the full TurnResult.
|
|
func (a *App) RunOnceResultWithFiles(ctx context.Context, prompt string, files []kit.LLMFilePart) (*kit.TurnResult, error) {
|
|
stepCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
a.mu.Lock()
|
|
a.cancelStep = cancel
|
|
a.mu.Unlock()
|
|
|
|
return a.executeStep(stepCtx, prompt, nil, files)
|
|
}
|
|
|
|
// RunOnceWithDisplay executes a single agent step synchronously, sending
|
|
// intermediate display events (spinner, tool calls, streaming chunks, etc.)
|
|
// to eventFn. This is the non-TUI equivalent of the interactive Run() path —
|
|
// used by non-interactive --prompt mode when output is needed.
|
|
//
|
|
// The eventFn receives the same event types as the Bubble Tea TUI
|
|
// (SpinnerEvent, ToolCallStartedEvent, StreamChunkEvent, StepCompleteEvent,
|
|
// etc.) and is responsible for rendering them.
|
|
//
|
|
// Blocks until the step completes or ctx is cancelled.
|
|
func (a *App) RunOnceWithDisplay(ctx context.Context, prompt string, eventFn func(tea.Msg)) error {
|
|
return a.RunOnceWithDisplayAndFiles(ctx, prompt, eventFn, nil)
|
|
}
|
|
|
|
// RunOnceWithDisplayAndFiles executes a single agent step synchronously with
|
|
// optional multimodal file attachments, sending intermediate display events.
|
|
func (a *App) RunOnceWithDisplayAndFiles(ctx context.Context, prompt string, eventFn func(tea.Msg), files []kit.LLMFilePart) error {
|
|
stepCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
a.mu.Lock()
|
|
a.cancelStep = cancel
|
|
a.mu.Unlock()
|
|
|
|
result, err := a.executeStep(stepCtx, prompt, eventFn, files)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Send step complete so the display handler can render the final response.
|
|
if eventFn != nil {
|
|
eventFn(StepCompleteEvent{ResponseText: result.Response})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// --------------------------------------------------------------------------
|
|
// Close
|
|
// --------------------------------------------------------------------------
|
|
|
|
// Close signals all background goroutines to stop and waits for them to finish.
|
|
// After Close() returns it is safe to call Kit.Close() / agent.Close().
|
|
func (a *App) Close() {
|
|
a.mu.Lock()
|
|
if a.closed {
|
|
a.mu.Unlock()
|
|
return
|
|
}
|
|
a.closed = true
|
|
cancel := a.cancelStep
|
|
a.mu.Unlock()
|
|
|
|
// Cancel any in-flight step and the root context.
|
|
cancel()
|
|
a.rootCancel()
|
|
|
|
// Wait for background goroutines.
|
|
a.wg.Wait()
|
|
|
|
// Clean up empty session file on shutdown.
|
|
if ts := a.opts.TreeSession; ts != nil && ts.IsEmpty() {
|
|
if path := ts.GetFilePath(); path != "" {
|
|
_ = os.Remove(path)
|
|
}
|
|
}
|
|
}
|
|
|
|
// --------------------------------------------------------------------------
|
|
// Internal: queue drain loop
|
|
// --------------------------------------------------------------------------
|
|
|
|
// drainQueue runs in a goroutine. It collects all queued items (including the
|
|
// first one) and submits them together as a single batch. This ensures that
|
|
// when multiple messages are queued while the agent is working, they are all
|
|
// submitted together in one turn rather than sequentially.
|
|
// Must be called with a.busy == true and a.wg incremented.
|
|
func (a *App) drainQueue(first queueItem) {
|
|
defer a.wg.Done()
|
|
|
|
// Collect all items to process in this batch
|
|
var items []queueItem
|
|
items = append(items, first)
|
|
|
|
// Process batches until no more items are queued
|
|
for {
|
|
// Drain the queue to collect any pending items
|
|
a.mu.Lock()
|
|
items = append(items, a.queue...)
|
|
a.queue = a.queue[:0] // Clear the queue
|
|
a.mu.Unlock()
|
|
|
|
// Notify UI: all queued messages have been consumed into this batch.
|
|
a.sendEvent(QueueUpdatedEvent{Length: 0})
|
|
|
|
// Process all collected items as a single batch
|
|
a.runQueueBatch(items)
|
|
|
|
// Drain any unconsumed steer messages from the SDK channel.
|
|
// These arrive when the user steered during a text-only response
|
|
// (no tool calls, so PrepareStep didn't fire for a second step).
|
|
// They go to the front of the queue so they run next.
|
|
if a.opts.Kit != nil {
|
|
if leftover := a.opts.Kit.DrainSteer(); len(leftover) > 0 {
|
|
a.mu.Lock()
|
|
steerItems := make([]queueItem, len(leftover))
|
|
for i, sm := range leftover {
|
|
steerItems[i] = queueItem{Prompt: sm.Text, Files: sm.Files}
|
|
}
|
|
a.queue = append(steerItems, a.queue...)
|
|
a.mu.Unlock()
|
|
// Notify UI about the consumed steer messages.
|
|
a.sendEvent(SteerConsumedEvent{})
|
|
}
|
|
}
|
|
|
|
// Check if more items were queued while we were processing
|
|
a.mu.Lock()
|
|
hasMore := len(a.queue) > 0
|
|
if hasMore {
|
|
// Start a new batch with the newly queued items
|
|
items = a.queue
|
|
a.queue = a.queue[:0]
|
|
}
|
|
a.mu.Unlock()
|
|
|
|
if hasMore {
|
|
// Notify UI: these newly queued messages have been consumed into the next batch.
|
|
a.sendEvent(QueueUpdatedEvent{Length: 0})
|
|
}
|
|
|
|
if !hasMore {
|
|
// No more items, we're done
|
|
break
|
|
}
|
|
// Process the new batch
|
|
}
|
|
|
|
// Mark as no longer busy
|
|
a.mu.Lock()
|
|
a.busy = false
|
|
a.mu.Unlock()
|
|
}
|
|
|
|
// runQueueBatch executes multiple queue items as a single agent turn.
|
|
// All items are submitted together, and the agent responds once to the combined context.
|
|
func (a *App) runQueueBatch(items []queueItem) {
|
|
if len(items) == 0 {
|
|
return
|
|
}
|
|
|
|
// Create a per-step cancellable context.
|
|
stepCtx, cancel := context.WithCancel(a.rootCtx)
|
|
a.mu.Lock()
|
|
a.cancelStep = cancel
|
|
a.mu.Unlock()
|
|
defer cancel()
|
|
|
|
// Build event function that sends to the registered tea.Program (if any).
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
|
|
eventFn := func(msg tea.Msg) {
|
|
if prog != nil {
|
|
prog.Send(msg)
|
|
}
|
|
}
|
|
|
|
// Execute the batch
|
|
result, err := a.executeBatch(stepCtx, items, eventFn)
|
|
if err != nil {
|
|
if stepCtx.Err() != nil {
|
|
// Step was cancelled by the user (double-ESC). The SDK
|
|
// preserves the user message and any completed tool
|
|
// call/result pairs; only the in-progress message or tool
|
|
// call is discarded. Sync the in-memory store to match.
|
|
if ts := a.opts.TreeSession; ts != nil {
|
|
a.store.Replace(ts.GetLLMMessages())
|
|
}
|
|
a.sendEvent(StepCancelledEvent{})
|
|
return
|
|
}
|
|
a.sendEvent(StepErrorEvent{Err: err})
|
|
return
|
|
}
|
|
|
|
a.sendEvent(StepCompleteEvent{ResponseText: result.Response})
|
|
}
|
|
|
|
// --------------------------------------------------------------------------
|
|
// Internal: single agent step
|
|
// --------------------------------------------------------------------------
|
|
|
|
// executeStep runs a single agentic step by delegating to the SDK's
|
|
// PromptResult() (or PromptResultWithFiles for multimodal), which handles
|
|
// session persistence, hooks, extension events, and the generation loop.
|
|
func (a *App) executeStep(ctx context.Context, prompt string, eventFn func(tea.Msg), files []kit.LLMFilePart) (*kit.TurnResult, error) {
|
|
// Test hook: bypass SDK entirely.
|
|
if a.opts.PromptFunc != nil {
|
|
return a.opts.PromptFunc(ctx, prompt)
|
|
}
|
|
|
|
sendFn := func(msg tea.Msg) {
|
|
if eventFn != nil {
|
|
eventFn(msg)
|
|
}
|
|
}
|
|
|
|
// Subscribe to SDK events for TUI rendering and per-step usage updates.
|
|
// The subscription is temporary — it lives only for the duration of this step.
|
|
var sawStepUsage atomic.Bool
|
|
unsub := a.subscribeSDKEvents(sendFn, &sawStepUsage)
|
|
defer unsub()
|
|
|
|
// Show spinner while the agent works.
|
|
sendFn(SpinnerEvent{Show: true})
|
|
|
|
var result *kit.TurnResult
|
|
var err error
|
|
if len(files) > 0 {
|
|
result, err = a.opts.Kit.PromptResultWithFiles(ctx, prompt, files)
|
|
} else {
|
|
result, err = a.opts.Kit.PromptResult(ctx, prompt)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Sync in-memory store with the SDK's authoritative conversation.
|
|
a.store.Replace(result.Messages)
|
|
|
|
// Update usage tracker. If per-step usage was already recorded from
|
|
// StepUsageEvent callbacks, avoid double-counting totals.
|
|
a.updateUsageFromTurnResult(result, prompt, sawStepUsage.Load())
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// executeBatch runs a batch of queue items as a single agent step by delegating
|
|
// to the SDK's PromptResultWithMessages(), which handles session persistence,
|
|
// hooks, extension events, and the generation loop.
|
|
func (a *App) executeBatch(ctx context.Context, items []queueItem, eventFn func(tea.Msg)) (*kit.TurnResult, error) {
|
|
// Test hook: bypass SDK entirely (single item only for test compatibility).
|
|
if a.opts.PromptFunc != nil {
|
|
if len(items) == 1 {
|
|
return a.opts.PromptFunc(ctx, items[0].Prompt)
|
|
}
|
|
// For batch mode with PromptFunc, just use the first item
|
|
return a.opts.PromptFunc(ctx, items[0].Prompt)
|
|
}
|
|
|
|
sendFn := func(msg tea.Msg) {
|
|
if eventFn != nil {
|
|
eventFn(msg)
|
|
}
|
|
}
|
|
|
|
// Subscribe to SDK events for TUI rendering and per-step usage updates.
|
|
// The subscription is temporary — it lives only for the duration of this step.
|
|
var sawStepUsage atomic.Bool
|
|
unsub := a.subscribeSDKEvents(sendFn, &sawStepUsage)
|
|
defer unsub()
|
|
|
|
// Show spinner while the agent works.
|
|
sendFn(SpinnerEvent{Show: true})
|
|
|
|
// Check if any items have file attachments
|
|
hasFiles := false
|
|
for _, item := range items {
|
|
if len(item.Files) > 0 {
|
|
hasFiles = true
|
|
break
|
|
}
|
|
}
|
|
|
|
var result *kit.TurnResult
|
|
var err error
|
|
|
|
if len(items) == 1 {
|
|
// Single item: use the original path for compatibility
|
|
item := items[0]
|
|
if len(item.Files) > 0 || hasFiles {
|
|
result, err = a.opts.Kit.PromptResultWithFiles(ctx, item.Prompt, item.Files)
|
|
} else {
|
|
result, err = a.opts.Kit.PromptResult(ctx, item.Prompt)
|
|
}
|
|
} else {
|
|
// Multiple items: batch them together
|
|
var messages []string
|
|
for _, item := range items {
|
|
messages = append(messages, item.Prompt)
|
|
}
|
|
|
|
// File attachments are not supported in batch mode; fall back to
|
|
// processing only the first item that carries files.
|
|
if hasFiles {
|
|
// If files exist, fall back to processing just the first item with files
|
|
for _, item := range items {
|
|
if len(item.Files) > 0 {
|
|
result, err = a.opts.Kit.PromptResultWithFiles(ctx, item.Prompt, item.Files)
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
result, err = a.opts.Kit.PromptResultWithMessages(ctx, messages)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Sync in-memory store with the SDK's authoritative conversation.
|
|
a.store.Replace(result.Messages)
|
|
|
|
// Update usage tracker (using last item's prompt for fallback estimation).
|
|
// If per-step usage was already recorded from StepUsageEvent callbacks,
|
|
// avoid double-counting totals.
|
|
a.updateUsageFromTurnResult(result, items[len(items)-1].Prompt, sawStepUsage.Load())
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// sendEvent sends a tea.Msg to the registered program if one is set.
|
|
// Must NOT be called with a.mu held (to avoid deadlock with the program).
|
|
func (a *App) sendEvent(msg tea.Msg) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(msg)
|
|
}
|
|
}
|
|
|
|
// subscribeSDKEvents registers temporary SDK event subscribers that convert
|
|
// SDK events to tea.Msg events and dispatch them via sendFn. When stepUsageSeen
|
|
// is provided, it is set to true after any non-zero StepUsageEvent is observed.
|
|
// Returns an unsubscribe function that removes all listeners.
|
|
func (a *App) subscribeSDKEvents(sendFn func(tea.Msg), stepUsageSeen *atomic.Bool) func() {
|
|
k := a.opts.Kit
|
|
var unsubs []func()
|
|
|
|
unsubs = append(unsubs, k.Subscribe(func(e kit.Event) {
|
|
switch ev := e.(type) {
|
|
case kit.ToolCallEvent:
|
|
sendFn(ToolCallStartedEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolArgs: ev.ToolArgs})
|
|
case kit.ToolCallStartEvent:
|
|
sendFn(ToolCallInputStartEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolKind: ev.ToolKind})
|
|
case kit.ToolCallDeltaEvent:
|
|
sendFn(ToolCallInputDeltaEvent{ToolCallID: ev.ToolCallID, Delta: ev.Delta})
|
|
case kit.ToolCallEndEvent:
|
|
sendFn(ToolCallInputEndEvent{ToolCallID: ev.ToolCallID})
|
|
case kit.ToolExecutionStartEvent:
|
|
sendFn(ToolExecutionEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolArgs: ev.ToolArgs, IsStarting: true})
|
|
case kit.ToolExecutionEndEvent:
|
|
sendFn(ToolExecutionEvent{ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, IsStarting: false})
|
|
case kit.ToolResultEvent:
|
|
sendFn(ToolResultEvent{
|
|
ToolCallID: ev.ToolCallID, ToolName: ev.ToolName, ToolArgs: ev.ToolArgs,
|
|
Result: ev.Result, IsError: ev.IsError,
|
|
})
|
|
case kit.ToolCallContentEvent:
|
|
sendFn(ToolCallContentEvent{Content: ev.Content})
|
|
case kit.ResponseEvent:
|
|
sendFn(ResponseCompleteEvent{Content: ev.Content})
|
|
case kit.MessageUpdateEvent:
|
|
sendFn(StreamChunkEvent{Content: ev.Chunk})
|
|
case kit.ReasoningDeltaEvent:
|
|
sendFn(ReasoningChunkEvent{Delta: ev.Delta})
|
|
case kit.ReasoningCompleteEvent:
|
|
sendFn(ReasoningCompleteEvent{})
|
|
case kit.ToolOutputEvent:
|
|
sendFn(ToolOutputEvent{
|
|
ToolCallID: ev.ToolCallID,
|
|
ToolName: ev.ToolName,
|
|
Chunk: ev.Chunk,
|
|
IsStderr: ev.IsStderr,
|
|
})
|
|
case kit.SteerConsumedEvent:
|
|
sendFn(SteerConsumedEvent{})
|
|
case kit.StepUsageEvent:
|
|
a.recordStepUsage(ev, stepUsageSeen, sendFn)
|
|
case kit.PasswordPromptEvent:
|
|
// Convert SDK PasswordPromptEvent to app PasswordPromptEvent
|
|
// The TUI will handle this and send the response back
|
|
responseCh := make(chan PasswordPromptResponse, 1)
|
|
sendFn(PasswordPromptEvent{
|
|
Prompt: ev.Prompt,
|
|
ResponseCh: responseCh,
|
|
})
|
|
// Wait for TUI response and forward to SDK
|
|
resp := <-responseCh
|
|
ev.ResponseCh <- kit.PasswordPromptResponse{
|
|
Password: resp.Password,
|
|
Cancelled: resp.Cancelled,
|
|
}
|
|
case kit.TurnEndEvent:
|
|
a.handleTurnEnd(ev, sendFn)
|
|
}
|
|
}))
|
|
|
|
return func() {
|
|
for _, unsub := range unsubs {
|
|
unsub()
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleTurnEnd inspects a turn's final StopReason and surfaces actionable
|
|
// feedback to the user when the turn ended in a state they can act on.
|
|
//
|
|
// Today the only surfaced case is FinishReasonLength — the model hit its
|
|
// configured max_output_tokens budget and the reply was truncated. Without
|
|
// this banner the TUI used to swallow the truncation silently, leading to
|
|
// "ghost" cut-offs with no indication of why.
|
|
//
|
|
// Separated from subscribeSDKEvents so tests can exercise it directly via a
|
|
// stubbed sendFn without standing up a full Kit.
|
|
func (a *App) handleTurnEnd(ev kit.TurnEndEvent, sendFn func(tea.Msg)) {
|
|
if sendFn == nil {
|
|
return
|
|
}
|
|
if ev.StopReason != kit.FinishReasonLength {
|
|
return
|
|
}
|
|
sendFn(ExtensionPrintEvent{
|
|
Level: "info",
|
|
Text: a.formatMaxTokensTruncatedMessage(),
|
|
})
|
|
}
|
|
|
|
// formatMaxTokensTruncatedMessage builds the user-facing explanation for a
|
|
// truncated turn. It reports the active max_output_tokens budget and, when
|
|
// known, the model's catalog output ceiling so the user can judge how much
|
|
// headroom is available.
|
|
func (a *App) formatMaxTokensTruncatedMessage() string {
|
|
k := a.opts.Kit
|
|
if k == nil {
|
|
// Extremely early / test-stub case: still emit a useful generic hint.
|
|
return "⚠ Response truncated: the model hit the configured max_output_tokens limit. " +
|
|
"Raise it with --max-tokens N, KIT_MAX_TOKENS=N, or per-model " +
|
|
"modelSettings[provider/model].maxTokens in config."
|
|
}
|
|
current := k.MaxTokens()
|
|
ceiling := k.MaxOutputLimit()
|
|
model := k.GetModelString()
|
|
|
|
msg := "⚠ Response truncated: "
|
|
if model != "" {
|
|
msg += fmt.Sprintf("%s hit the configured max_output_tokens limit", model)
|
|
} else {
|
|
msg += "the model hit the configured max_output_tokens limit"
|
|
}
|
|
if current > 0 {
|
|
msg += fmt.Sprintf(" (%d)", current)
|
|
}
|
|
msg += "."
|
|
if ceiling > 0 && current > 0 && ceiling > current {
|
|
msg += fmt.Sprintf(" This model supports up to %d output tokens.", ceiling)
|
|
}
|
|
msg += "\n\nRaise it with --max-tokens N, KIT_MAX_TOKENS=N, " +
|
|
"or per-model modelSettings[provider/model].maxTokens in your config. " +
|
|
"Re-run the last prompt after raising it to get the full response."
|
|
return msg
|
|
}
|
|
|
|
// QuitFromExtension triggers a graceful shutdown. In interactive mode it
|
|
// sends a tea.QuitMsg to the program so the TUI exits cleanly. In
|
|
// non-interactive mode it cancels the root context, stopping any in-flight
|
|
// step. Safe to call from any goroutine; idempotent.
|
|
func (a *App) QuitFromExtension() {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(tea.QuitMsg{})
|
|
return
|
|
}
|
|
// Non-interactive: cancel the root context.
|
|
a.rootCancel()
|
|
}
|
|
|
|
// PrintFromExtension outputs text from an extension to the user. The level
|
|
// controls styling: "" for plain text, "info" for a system message block,
|
|
// "error" for an error block. In interactive mode it sends an
|
|
// ExtensionPrintEvent through the program so the TUI can render it with the
|
|
// appropriate renderer. In non-interactive mode it falls back to stderr with
|
|
// a level prefix so errors are distinguishable from plain output.
|
|
func (a *App) PrintFromExtension(level, text string) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(ExtensionPrintEvent{Text: text, Level: level})
|
|
return
|
|
}
|
|
// Non-interactive fallback: write to stderr with a level prefix so that
|
|
// errors and info messages are distinguishable from plain output.
|
|
switch level {
|
|
case "error":
|
|
fmt.Fprintf(os.Stderr, "[ERROR] %s\n", text)
|
|
case "info":
|
|
fmt.Fprintf(os.Stderr, "[INFO] %s\n", text)
|
|
default:
|
|
fmt.Println(text)
|
|
}
|
|
}
|
|
|
|
// SetEditorTextFromExtension sends an EditorTextSetEvent to the TUI to
|
|
// pre-fill the input editor. In non-interactive mode this is a no-op.
|
|
func (a *App) SetEditorTextFromExtension(text string) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(EditorTextSetEvent{Text: text})
|
|
}
|
|
}
|
|
|
|
// NotifyModelChanged sends a ModelChangedEvent to the TUI so it updates
|
|
// the model name in the status bar and message attribution.
|
|
func (a *App) NotifyModelChanged(provider, model string) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(ModelChangedEvent{ProviderName: provider, ModelName: model})
|
|
}
|
|
}
|
|
|
|
// NotifyWidgetUpdate sends a WidgetUpdateEvent to the TUI so it re-renders
|
|
// extension widgets. Called from the extension context's SetWidget/RemoveWidget
|
|
// closures. In non-interactive mode this is a no-op (widgets are TUI-only).
|
|
//
|
|
// Coalescing: if a WidgetUpdateEvent is already queued and not yet consumed
|
|
// by the TUI event loop, additional calls within the same ~16 ms window are
|
|
// dropped. This prevents fast extension tickers from flooding BubbleTea's
|
|
// mailbox with redundant re-render triggers.
|
|
func (a *App) NotifyWidgetUpdate() {
|
|
// Coalesce: only one pending update at a time.
|
|
if !a.widgetUpdatePending.CompareAndSwap(false, true) {
|
|
return
|
|
}
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(WidgetUpdateEvent{})
|
|
// Reset the pending flag after a short debounce so subsequent calls
|
|
// within the same render cycle are also coalesced, but new updates
|
|
// after the cycle are allowed through.
|
|
go func() {
|
|
time.Sleep(16 * time.Millisecond) // ~1 frame at 60 fps
|
|
a.widgetUpdatePending.Store(false)
|
|
}()
|
|
} else {
|
|
// No program registered (non-interactive mode); clear the flag so
|
|
// future calls are never permanently blocked.
|
|
a.widgetUpdatePending.Store(false)
|
|
}
|
|
}
|
|
|
|
// NotifyContentReload sends a ContentReloadEvent to the TUI so it refreshes
|
|
// prompt templates and skills from their provider callbacks. Called by file
|
|
// watchers when .md/.txt files change in prompt or skill directories.
|
|
// In non-interactive mode this is a no-op.
|
|
func (a *App) NotifyContentReload() {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(ContentReloadEvent{})
|
|
}
|
|
}
|
|
|
|
// NotifyMCPToolsReady sends an MCPToolsReadyEvent to the TUI so it refreshes
|
|
// tool names and MCP tool count from provider callbacks. Called when background
|
|
// MCP tool loading completes. In non-interactive mode this is a no-op.
|
|
func (a *App) NotifyMCPToolsReady() {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(MCPToolsReadyEvent{})
|
|
}
|
|
}
|
|
|
|
// NotifyMCPServerLoaded sends an MCPServerLoadedEvent to the TUI so it can
|
|
// display a system message when a single MCP server finishes loading. Called
|
|
// per server as background MCP tool loading progresses.
|
|
func (a *App) NotifyMCPServerLoaded(serverName string, toolCount int, err error) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(MCPServerLoadedEvent{
|
|
ServerName: serverName,
|
|
ToolCount: toolCount,
|
|
Error: err,
|
|
})
|
|
}
|
|
}
|
|
|
|
// SendEvent sends a tea.Msg to the registered program. Safe to call from
|
|
// any goroutine. No-op when no program is registered.
|
|
//
|
|
// Satisfies ui.AppController.
|
|
func (a *App) SendEvent(msg tea.Msg) {
|
|
a.sendEvent(msg)
|
|
}
|
|
|
|
// SendPromptRequest sends a PromptRequestEvent to the TUI so the user can
|
|
// respond interactively. In non-interactive mode (no program registered) it
|
|
// immediately responds with a cancelled result via the channel, ensuring the
|
|
// calling extension goroutine never blocks indefinitely.
|
|
func (a *App) SendPromptRequest(evt PromptRequestEvent) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(evt)
|
|
return
|
|
}
|
|
// Non-interactive fallback: immediately cancel.
|
|
if evt.ResponseCh != nil {
|
|
evt.ResponseCh <- PromptResponse{Cancelled: true}
|
|
}
|
|
}
|
|
|
|
// SendOverlayRequest sends an OverlayRequestEvent to the TUI so the user
|
|
// can interact with a modal overlay dialog. In non-interactive mode (no
|
|
// program registered) it immediately responds with a cancelled result via the
|
|
// channel, ensuring the calling extension goroutine never blocks indefinitely.
|
|
func (a *App) SendOverlayRequest(evt OverlayRequestEvent) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(evt)
|
|
return
|
|
}
|
|
// Non-interactive fallback: immediately cancel.
|
|
if evt.ResponseCh != nil {
|
|
evt.ResponseCh <- OverlayResponse{Cancelled: true}
|
|
}
|
|
}
|
|
|
|
// SuspendTUI temporarily releases the terminal from the TUI, runs the
|
|
// callback (which may spawn interactive subprocesses), and then restores
|
|
// the TUI. In non-interactive mode (no program registered) the callback
|
|
// runs directly with no terminal state changes.
|
|
//
|
|
// Safe to call from any goroutine (extension command handlers run in
|
|
// goroutines). Blocks until the callback returns.
|
|
func (a *App) SuspendTUI(callback func()) error {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog == nil {
|
|
// Non-interactive: just run the callback directly.
|
|
callback()
|
|
return nil
|
|
}
|
|
if err := prog.ReleaseTerminal(); err != nil {
|
|
return fmt.Errorf("release terminal: %w", err)
|
|
}
|
|
callback()
|
|
if err := prog.RestoreTerminal(); err != nil {
|
|
return fmt.Errorf("restore terminal: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PrintBlockFromExtension outputs a custom styled block from an extension.
|
|
func (a *App) PrintBlockFromExtension(opts extensions.PrintBlockOpts) {
|
|
a.mu.Lock()
|
|
prog := a.program
|
|
a.mu.Unlock()
|
|
if prog != nil {
|
|
prog.Send(ExtensionPrintEvent{
|
|
Text: opts.Text,
|
|
Level: "block",
|
|
BorderColor: opts.BorderColor,
|
|
Subtitle: opts.Subtitle,
|
|
})
|
|
return
|
|
}
|
|
// Non-interactive fallback: render a simple framed block to stderr so
|
|
// it is visually distinct from plain stdout output.
|
|
if opts.Subtitle != "" {
|
|
fmt.Fprintf(os.Stderr, "--- %s ---\n%s\n", opts.Subtitle, opts.Text)
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "---\n%s\n---\n", opts.Text)
|
|
}
|
|
}
|
|
|
|
// recordStepUsage applies token/cost usage reported for a completed step.
|
|
// Step usage events arrive even when a turn is later cancelled, so this keeps
|
|
// the usage widget accurate on all stop paths.
|
|
//
|
|
// Both session totals (cost, token counts) and the context window fill level
|
|
// are updated here so the status bar reflects progress after every LLM call,
|
|
// not just at the end of the full turn. Context fill monotonically increases
|
|
// across steps because each step re-sends the entire conversation plus any
|
|
// new tool results, so the numbers only go up.
|
|
//
|
|
// sendFn is called with a UsageUpdatedEvent to trigger a TUI re-render so
|
|
// the updated values are visible immediately.
|
|
func (a *App) recordStepUsage(ev kit.StepUsageEvent, stepUsageSeen *atomic.Bool, sendFn func(tea.Msg)) {
|
|
hasUsage := ev.InputTokens > 0 || ev.OutputTokens > 0 || ev.CacheReadTokens > 0 || ev.CacheWriteTokens > 0
|
|
if a.opts.Debug {
|
|
log.Printf("[DEBUG] recordStepUsage: hasUsage=%v input=%d output=%d cacheRead=%d cacheWrite=%d",
|
|
hasUsage, ev.InputTokens, ev.OutputTokens, ev.CacheReadTokens, ev.CacheWriteTokens)
|
|
}
|
|
if !hasUsage {
|
|
return
|
|
}
|
|
if stepUsageSeen != nil {
|
|
stepUsageSeen.Store(true)
|
|
}
|
|
if a.opts.UsageTracker == nil {
|
|
return
|
|
}
|
|
a.opts.UsageTracker.UpdateUsage(
|
|
int(ev.InputTokens),
|
|
int(ev.OutputTokens),
|
|
int(ev.CacheReadTokens),
|
|
int(ev.CacheWriteTokens),
|
|
)
|
|
// Update context window fill from this step's usage. Each step sends
|
|
// the full conversation to the LLM, so the reported token counts
|
|
// represent the actual context utilization at that point.
|
|
contextFill := int(ev.InputTokens) + int(ev.CacheReadTokens) + int(ev.CacheWriteTokens) + int(ev.OutputTokens)
|
|
if contextFill > 0 {
|
|
if a.opts.Debug {
|
|
log.Printf("[DEBUG] recordStepUsage: SetContextTokens=%d (Input=%d + CacheRead=%d + CacheWrite=%d + Output=%d)",
|
|
contextFill, ev.InputTokens, ev.CacheReadTokens, ev.CacheWriteTokens, ev.OutputTokens)
|
|
}
|
|
a.opts.UsageTracker.SetContextTokens(contextFill)
|
|
}
|
|
// Notify the TUI so it re-renders the status bar with updated values.
|
|
if sendFn != nil {
|
|
sendFn(UsageUpdatedEvent{})
|
|
}
|
|
}
|
|
|
|
// updateUsageFromTurnResult records token usage from an SDK TurnResult into the
|
|
// configured UsageTracker. Called once per turn after the turn completes.
|
|
//
|
|
// When sawStepUsage is true, totals were already accumulated incrementally via
|
|
// StepUsageEvent callbacks; in that case this method only updates context fill.
|
|
// Otherwise it falls back to TotalUsage from the API response.
|
|
//
|
|
// NOTE: We only use ACTUAL token counts from API responses for cost tracking.
|
|
// Estimation is never used for costs - only API-reported tokens are accurate.
|
|
func (a *App) updateUsageFromTurnResult(result *kit.TurnResult, userPrompt string, sawStepUsage bool) {
|
|
if a.opts.UsageTracker == nil || result == nil {
|
|
return
|
|
}
|
|
|
|
// Debug logging for token tracking
|
|
if a.opts.Debug {
|
|
if result.TotalUsage != nil {
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult TotalUsage: input=%d output=%d cacheRead=%d cacheCreate=%d",
|
|
result.TotalUsage.InputTokens, result.TotalUsage.OutputTokens,
|
|
result.TotalUsage.CacheReadTokens, result.TotalUsage.CacheCreationTokens)
|
|
} else {
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult: TotalUsage=nil")
|
|
}
|
|
if result.FinalUsage != nil {
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult FinalUsage: input=%d output=%d cacheRead=%d cacheCreate=%d",
|
|
result.FinalUsage.InputTokens, result.FinalUsage.OutputTokens,
|
|
result.FinalUsage.CacheReadTokens, result.FinalUsage.CacheCreationTokens)
|
|
} else {
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult: FinalUsage=nil")
|
|
}
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult: sawStepUsage=%v", sawStepUsage)
|
|
}
|
|
|
|
// --- Accumulate cost/token totals for the session ---
|
|
// Only use actual API-reported tokens for cost tracking.
|
|
// If sawStepUsage is true, totals were already updated via StepUsageEvent.
|
|
// Check any token field > 0 (not just InputTokens) because cached prompts
|
|
// can result in InputTokens=0 while OutputTokens>0 (OpenAI-compatible behavior).
|
|
hasTotalUsage := result.TotalUsage != nil &&
|
|
(result.TotalUsage.InputTokens > 0 ||
|
|
result.TotalUsage.OutputTokens > 0 ||
|
|
result.TotalUsage.CacheReadTokens > 0 ||
|
|
result.TotalUsage.CacheCreationTokens > 0)
|
|
if a.opts.Debug {
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult: hasTotalUsage=%v", hasTotalUsage)
|
|
}
|
|
if !sawStepUsage && hasTotalUsage {
|
|
if a.opts.Debug {
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult: calling UpdateUsage input=%d output=%d cacheRead=%d cacheCreate=%d",
|
|
result.TotalUsage.InputTokens, result.TotalUsage.OutputTokens,
|
|
result.TotalUsage.CacheReadTokens, result.TotalUsage.CacheCreationTokens)
|
|
}
|
|
a.opts.UsageTracker.UpdateUsage(
|
|
int(result.TotalUsage.InputTokens),
|
|
int(result.TotalUsage.OutputTokens),
|
|
int(result.TotalUsage.CacheReadTokens),
|
|
int(result.TotalUsage.CacheCreationTokens),
|
|
)
|
|
}
|
|
|
|
// --- Context window fill (drives the % bar) ---
|
|
// Calculate context fill from the LAST API call's usage. The context
|
|
// window is filled by everything sent to and received from the model:
|
|
//
|
|
// InputTokens — non-cached input (may be small with prompt caching)
|
|
// CacheReadTokens — input tokens served from cache
|
|
// CacheCreationTokens — input tokens written to cache this call
|
|
// OutputTokens — assistant output (becomes input next turn)
|
|
//
|
|
// With Anthropic prompt caching, InputTokens can drop to near-zero while
|
|
// CacheReadTokens holds the bulk of the context. We must sum all four to
|
|
// get the true context window utilization.
|
|
//
|
|
// We use FinalUsage (last step only), NOT TotalUsage, because TotalUsage
|
|
// sums across all tool-calling steps — and each step re-sends the full
|
|
// conversation, so TotalUsage massively overstates the actual window fill.
|
|
if result.FinalUsage != nil {
|
|
u := result.FinalUsage
|
|
contextFill := int(u.InputTokens) + int(u.CacheReadTokens) + int(u.CacheCreationTokens) + int(u.OutputTokens)
|
|
if contextFill > 0 {
|
|
if a.opts.Debug {
|
|
log.Printf("[DEBUG] updateUsageFromTurnResult: SetContextTokens=%d (Input=%d + CacheRead=%d + CacheCreate=%d + Output=%d)",
|
|
contextFill, u.InputTokens, u.CacheReadTokens, u.CacheCreationTokens, u.OutputTokens)
|
|
}
|
|
a.opts.UsageTracker.SetContextTokens(contextFill)
|
|
}
|
|
}
|
|
}
|