diff --git a/cmd/root.go b/cmd/root.go index c3ac0b90..86759d08 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -811,7 +811,7 @@ func runNormalMode(ctx context.Context) error { PrintError: func(text string) { appInstance.PrintFromExtension("error", text) }, PrintBlock: appInstance.PrintBlockFromExtension, SendMessage: func(text string) { appInstance.Run(text) }, - CancelAndSend: func(text string) { appInstance.Steer(text) }, + CancelAndSend: func(text string) { appInstance.InterruptAndSend(text) }, Exit: func() { appInstance.QuitFromExtension() }, SetWidget: func(config extensions.WidgetConfig) { kitInstance.SetExtensionWidget(config) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index dee73e9f..8dc503fc 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -275,7 +275,7 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan var completedStepMessages []fantasy.Message // Use fantasy's streaming agent - result, err := a.fantasyAgent.Stream(ctx, fantasy.AgentStreamCall{ + streamCall := fantasy.AgentStreamCall{ Prompt: prompt, Files: files, Messages: history, @@ -364,7 +364,51 @@ func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []fan } return nil }, - }) + } + + // If a steer channel is attached to the context, wire up a + // PrepareStep function that drains the channel between steps + // and injects pending steer messages as user messages before + // the next LLM call. This enables graceful mid-turn steering + // without cancelling in-progress tool execution. + if steerCh := steerChFromContext(ctx); steerCh != nil { + onConsumed := steerConsumedFromContext(ctx) + streamCall.PrepareStep = func( + stepCtx context.Context, + opts fantasy.PrepareStepFunctionOptions, + ) (context.Context, fantasy.PrepareStepResult, error) { + // Drain all pending steer messages (non-blocking). 
+ var steered []string + for { + select { + case msg := <-steerCh: + steered = append(steered, msg) + default: + goto done + } + } + done: + result := fantasy.PrepareStepResult{ + Model: opts.Model, + Messages: opts.Messages, + } + if len(steered) > 0 { + // Inject each steer message as a user message so the + // LLM sees the redirection on the next step. + for _, text := range steered { + result.Messages = append(result.Messages, + fantasy.NewUserMessage(text)) + } + // Notify that steer messages were consumed. + if onConsumed != nil { + onConsumed(len(steered)) + } + } + return stepCtx, result, nil + } + } + + result, err := a.fantasyAgent.Stream(ctx, streamCall) if err != nil { // On cancellation (or any error), return a partial result // containing messages from completed steps so the caller can diff --git a/internal/agent/steer.go b/internal/agent/steer.go new file mode 100644 index 00000000..3a318c47 --- /dev/null +++ b/internal/agent/steer.go @@ -0,0 +1,35 @@ +package agent + +import "context" + +// steerChKey is the context key for the steer channel. +type steerChKey struct{} + +// steerConsumedKey is the context key for the steer-consumed callback. +type steerConsumedKey struct{} + +// ContextWithSteerCh returns a new context with the steer channel attached. +// The agent's PrepareStep function checks this channel between steps and +// injects any pending steer messages as user messages before the next LLM call. +func ContextWithSteerCh(ctx context.Context, ch <-chan string) context.Context { + return context.WithValue(ctx, steerChKey{}, ch) +} + +// ContextWithSteerConsumed returns a new context with a callback that fires +// when steer messages are consumed by PrepareStep. The count argument is the +// number of messages injected in this batch. 
+func ContextWithSteerConsumed(ctx context.Context, fn func(count int)) context.Context { + return context.WithValue(ctx, steerConsumedKey{}, fn) +} + +// steerChFromContext extracts the steer channel from the context, or nil. +func steerChFromContext(ctx context.Context) <-chan string { + ch, _ := ctx.Value(steerChKey{}).(<-chan string) + return ch +} + +// steerConsumedFromContext extracts the steer-consumed callback, or nil. +func steerConsumedFromContext(ctx context.Context) func(int) { + fn, _ := ctx.Value(steerConsumedKey{}).(func(int)) + return fn +} diff --git a/internal/app/app.go b/internal/app/app.go index 650e6040..f963edb4 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -159,11 +159,57 @@ func (a *App) QueueLength() int { return len(a.queue) } -// Steer cancels the current agent step (if running), clears the queue, and -// sends a new message that will execute as soon as the current step finishes -// cancelling. If the agent is idle, the message executes immediately. -// This is the "steer" delivery mode for SendMessage. -func (a *App) Steer(prompt string) { +// Steer injects a steering message into the currently running agent turn. +// If the agent is in a multi-step tool loop, the message is delivered after +// the current tool execution finishes but before the next LLM call (graceful +// mid-turn injection via Fantasy's PrepareStep). If the agent is streaming +// a text-only response (no pending tool calls), the message waits until the +// response completes and then executes as the next turn. +// +// If the agent is idle, the message starts executing immediately (same as Run). +// +// Returns 0 if the message started immediately (or the app is closed) and 1 +// if the agent was busy; the value is a flag, not a count of pending items. +// The caller must update UI state based on the return value — Steer does NOT +// send events to the program, to avoid deadlocking when called within Update(). +// +// Satisfies ui.AppController. 
+func (a *App) Steer(prompt string) int { + a.mu.Lock() + + if a.closed { + a.mu.Unlock() + return 0 + } + + if !a.busy { + // Not busy — start immediately, same as Run(). + item := queueItem{Prompt: prompt} + a.busy = true + a.wg.Add(1) + a.mu.Unlock() + go a.drainQueue(item) + return 0 + } + + a.mu.Unlock() + + // Agent is busy — inject via the SDK's steer channel. The message + // will be picked up by PrepareStep between agent steps (after tool + // execution, before next LLM call). If PrepareStep doesn't fire + // (text-only response), drainQueue will pick it up after the turn. + if a.opts.Kit != nil { + a.opts.Kit.InjectSteer(prompt) + } + return 1 +} + +// InterruptAndSend cancels the current agent step (if running), clears the +// queue, and sends a new message that will execute as soon as the current +// step finishes cancelling. If the agent is idle, the message executes +// immediately. This is the hard-cancel delivery mode used by extensions' +// CancelAndSend. +func (a *App) InterruptAndSend(prompt string) { a.mu.Lock() if a.closed { @@ -434,6 +480,24 @@ func (a *App) drainQueue(first queueItem) { // Process all collected items as a single batch a.runQueueBatch(items) + // Drain any unconsumed steer messages from the SDK channel. + // These arrive when the user steered during a text-only response + // (no tool calls, so PrepareStep didn't fire for a second step). + // They go to the front of the queue so they run next. + if a.opts.Kit != nil { + if leftover := a.opts.Kit.DrainSteer(); len(leftover) > 0 { + a.mu.Lock() + steerItems := make([]queueItem, len(leftover)) + for i, text := range leftover { + steerItems[i] = queueItem{Prompt: text} + } + a.queue = append(steerItems, a.queue...) + a.mu.Unlock() + // Notify UI about the consumed steer messages. 
+ a.sendEvent(SteerConsumedEvent{}) + } + } + // Check if more items were queued while we were processing a.mu.Lock() hasMore := len(a.queue) > 0 @@ -687,6 +751,8 @@ func (a *App) subscribeSDKEvents(sendFn func(tea.Msg)) func() { int(ev.CacheWriteTokens), ) } + case kit.SteerConsumedEvent: + sendFn(SteerConsumedEvent{}) } })) diff --git a/internal/app/events.go b/internal/app/events.go index c724cc82..3838a5a5 100644 --- a/internal/app/events.go +++ b/internal/app/events.go @@ -141,6 +141,12 @@ type CompactErrorEvent struct { Err error } +// SteerConsumedEvent is sent when one or more steering messages have been +// consumed — either injected mid-turn via PrepareStep, or drained into the +// queue after a turn completes. The TUI uses this to clear the steering +// badge from the display. +type SteerConsumedEvent struct{} + // ModelChangedEvent is sent when an extension changes the active model via // ctx.SetModel. The TUI updates the model name shown in the status bar and // message attribution. diff --git a/internal/ui/input.go b/internal/ui/input.go index fd55057b..656c35d4 100644 --- a/internal/ui/input.go +++ b/internal/ui/input.go @@ -65,6 +65,10 @@ type InputComponent struct { // hideHint suppresses the "enter submit · ctrl+j..." hint text. hideHint bool + // agentBusy indicates the agent is currently working. When true, the + // hint text shows steering shortcut (Ctrl+S) instead of submit. + agentBusy bool + // pendingImages holds clipboard images attached to the next submission. // Images are added via Ctrl+V and cleared on submit or Ctrl+U. pendingImages []ImageAttachment @@ -514,7 +518,16 @@ func (s *InputComponent) View() tea.View { // Adapt hint text to available width (accounting for left padding of 3). var hint string availableHintWidth := s.width - 3 - if availableHintWidth >= 67 { + if s.agentBusy { + // When the agent is working, show steering shortcut. 
+ if availableHintWidth >= 55 { + hint = "enter queue • ctrl+s steer • esc esc cancel" + } else if availableHintWidth >= 35 { + hint = "↵ queue • ^S steer • esc×2 cancel" + } else { + hint = "^S steer" + } + } else if availableHintWidth >= 67 { hint = "enter submit • ctrl+j / shift+enter new line • ctrl+v paste image" } else if availableHintWidth >= 40 { hint = "↵ submit • ctrl+j newline • ctrl+v image" diff --git a/internal/ui/model.go b/internal/ui/model.go index 4ffe0419..2e77d05a 100644 --- a/internal/ui/model.go +++ b/internal/ui/model.go @@ -98,6 +98,12 @@ type AppController interface { // alongside the text. Returns the current queue depth (0 = started // immediately, >0 = queued). RunWithFiles(prompt string, files []fantasy.FilePart) int + // Steer injects a steering message into the currently running agent + // turn. If the agent is busy, the message is delivered between steps + // (after current tool finishes, before next LLM call). If idle, the + // message starts executing immediately. Returns 0 if started + // immediately, >0 if injected/pending. + Steer(prompt string) int } // SkillItem holds display metadata about a loaded skill for the startup @@ -415,6 +421,11 @@ type AppModel struct { // the input and move to scrollback when the agent picks them up. queuedMessages []string + // steeringMessages stores the text of prompts that were sent as steer + // messages (injected mid-turn via Ctrl+S). Rendered with a "STEERING" + // badge above the input. Cleared when the steer is consumed. + steeringMessages []string + // pendingUserPrints holds user messages that have been consumed from the // queue but not yet printed to scrollback. They are deferred until // SpinnerEvent{Show: true} so the previous assistant response can be @@ -1070,6 +1081,45 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return m, tea.Batch(cmds...) } // In other states pass ESC through to children below. 
+ + case "ctrl+s": + // Steer: inject the current input as a steering message into the + // running agent turn. Only active during stateWorking — in input + // state, Ctrl+S is passed through to children (no-op by default). + if m.state == stateWorking && m.appCtrl != nil { + var text string + if ic, ok := m.input.(*InputComponent); ok { + text = strings.TrimSpace(ic.textarea.Value()) + } + if text != "" { + // Clear the input and push to history. + if ic, ok := m.input.(*InputComponent); ok { + ic.pushHistory(text) + ic.textarea.SetValue("") + } + + // Preprocess @file references. + processedText := text + if m.cwd != "" { + processedText = ProcessFileAttachments(text, m.cwd) + } + + // Inject the steer message. + sLen := m.appCtrl.Steer(processedText) + if sLen > 0 { + m.steeringMessages = append(m.steeringMessages, text) + m.distributeHeight() + } else { + // Started immediately (agent was idle). + m.pendingUserPrints = append(m.pendingUserPrints, text) + m.flushStreamAndPendingUserMessages() + if m.state != stateWorking { + m.state = stateWorking + } + } + } + return m, tea.Batch(cmds...) + } } // Route key events to the focused child. Check for editor @@ -1389,6 +1439,14 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } m.distributeHeight() + case app.SteerConsumedEvent: + // Steering messages were consumed — either injected mid-turn + // via PrepareStep or drained into the queue after a turn. + // Move them to pendingUserPrints for scrollback rendering. + m.pendingUserPrints = append(m.pendingUserPrints, m.steeringMessages...) + m.steeringMessages = m.steeringMessages[:0] + m.distributeHeight() + case app.StepCompleteEvent: // Keep stream content visible in the view — don't flush to scrollback // yet. Flushing + resetting in the same frame would shrink the view @@ -1641,6 +1699,7 @@ func (m *AppModel) View() tea.View { // Propagate hint visibility to the input component before rendering. 
if ic, ok := m.input.(*InputComponent); ok { ic.hideHint = vis.HideInputHint + ic.agentBusy = m.state == stateWorking } // When a prompt is active, it replaces the input area for consistency @@ -1935,16 +1994,26 @@ func (m *AppModel) cycleThinkingLevel() { go func() { _ = SaveThinkingLevelPreference(next) }() } -// renderSeparator renders the separator line with an optional queue count badge. +// renderSeparator renders the separator line with an optional queue/steer count badge. func (m *AppModel) renderSeparator() string { theme := GetTheme() lineStyle := lipgloss.NewStyle().Foreground(theme.Muted) queueLen := len(m.queuedMessages) + steerLen := len(m.steeringMessages) - if queueLen > 0 { - badge := lipgloss.NewStyle(). - Foreground(theme.Secondary). - Render(fmt.Sprintf("%d queued", queueLen)) + if steerLen > 0 || queueLen > 0 { + var parts []string + if steerLen > 0 { + parts = append(parts, lipgloss.NewStyle(). + Foreground(theme.Warning). + Render(fmt.Sprintf("%d steering", steerLen))) + } + if queueLen > 0 { + parts = append(parts, lipgloss.NewStyle(). + Foreground(theme.Secondary). + Render(fmt.Sprintf("%d queued", queueLen))) + } + badge := strings.Join(parts, " ") // Fill the separator with dashes up to the badge. dashWidth := max(m.width-lipgloss.Width(badge)-1, 0) @@ -2043,27 +2112,47 @@ func (m *AppModel) renderHeaderFooter(getter func() *WidgetData) string { return renderContentBlock(data.Text, m.width, opts...) } -// renderQueuedMessages renders queued prompts as styled content blocks with a -// "QUEUED" badge, anchored between the separator and input. Each message is -// displayed in a bordered block matching the overall message styling. +// renderQueuedMessages renders queued and steering prompts as styled content +// blocks with badges, anchored between the separator and input. Steering +// messages use a distinct "STEERING" badge to differentiate from queued ones. 
func (m *AppModel) renderQueuedMessages() string { - if len(m.queuedMessages) == 0 { + if len(m.queuedMessages) == 0 && len(m.steeringMessages) == 0 { return "" } theme := GetTheme() - badge := CreateBadge("QUEUED", theme.Accent) var blocks []string - for _, msg := range m.queuedMessages { - content := msg + "\n" + badge - rendered := renderContentBlock( - content, - m.width, - WithAlign(lipgloss.Left), - WithBorderColor(theme.Muted), - ) - blocks = append(blocks, rendered) + + // Render steering messages first (higher priority). + if len(m.steeringMessages) > 0 { + badge := CreateBadge("STEERING", theme.Warning) + for _, msg := range m.steeringMessages { + content := msg + "\n" + badge + rendered := renderContentBlock( + content, + m.width, + WithAlign(lipgloss.Left), + WithBorderColor(theme.Warning), + ) + blocks = append(blocks, rendered) + } } + + // Render queued messages. + if len(m.queuedMessages) > 0 { + badge := CreateBadge("QUEUED", theme.Accent) + for _, msg := range m.queuedMessages { + content := msg + "\n" + badge + rendered := renderContentBlock( + content, + m.width, + WithAlign(lipgloss.Left), + WithBorderColor(theme.Muted), + ) + blocks = append(blocks, rendered) + } + } + return strings.Join(blocks, "\n") } @@ -2134,6 +2223,7 @@ func (m *AppModel) handleSlashCommand(sc *SlashCommand) tea.Cmd { m.appCtrl.ClearQueue() } m.queuedMessages = m.queuedMessages[:0] + m.steeringMessages = m.steeringMessages[:0] m.distributeHeight() case "/tree": @@ -2321,7 +2411,9 @@ func (m *AppModel) printHelpMessage() { "- `!!command`: Run shell command, output excluded from LLM context\n\n" + "**Keys:**\n" + "- `Ctrl+C`: Exit at any time\n" + - "- `ESC` (x2): Cancel ongoing LLM generation\n\n" + + "- `ESC` (x2): Cancel ongoing LLM generation\n" + + "- `Ctrl+S`: Steer — redirect the agent mid-turn (injected between tool calls)\n" + + "- `Enter` (while working): Queue message for after the agent finishes\n\n" + "You can also just type your message to chat with the AI 
assistant." m.printSystemMessage(help) } diff --git a/internal/ui/model_test.go b/internal/ui/model_test.go index 5c2827b6..00727da8 100644 --- a/internal/ui/model_test.go +++ b/internal/ui/model_test.go @@ -67,6 +67,11 @@ func (s *stubAppController) RunWithFiles(prompt string, _ []fantasy.FilePart) in return s.queueLen } +func (s *stubAppController) Steer(prompt string) int { + s.runCalls = append(s.runCalls, prompt) + return s.queueLen +} + // -------------------------------------------------------------------------- // Stub child components // -------------------------------------------------------------------------- diff --git a/pkg/kit/events.go b/pkg/kit/events.go index 25dc8794..fd5f1f8b 100644 --- a/pkg/kit/events.go +++ b/pkg/kit/events.go @@ -42,6 +42,9 @@ const ( // EventToolOutput fires when a tool produces streaming output chunks. EventToolOutput EventType = "tool_output" EventStepUsage EventType = "step_usage" + // EventSteerConsumed fires when one or more steering messages have been + // injected into the agent turn via PrepareStep. + EventSteerConsumed EventType = "steer_consumed" ) // --------------------------------------------------------------------------- @@ -276,6 +279,16 @@ type CompactionEvent struct { // EventType implements Event. func (e CompactionEvent) EventType() EventType { return EventCompaction } +// SteerConsumedEvent fires when one or more steering messages have been +// injected into the agent turn via PrepareStep. The Count indicates how +// many messages were consumed in this batch. +type SteerConsumedEvent struct { + Count int +} + +// EventType implements Event. 
+func (e SteerConsumedEvent) EventType() EventType { return EventSteerConsumed } + // --------------------------------------------------------------------------- // EventBus // --------------------------------------------------------------------------- diff --git a/pkg/kit/kit.go b/pkg/kit/kit.go index 757a355a..f5f41d58 100644 --- a/pkg/kit/kit.go +++ b/pkg/kit/kit.go @@ -66,6 +66,13 @@ type Kit struct { // subagentListeners holds per-tool-call event listeners registered via // SubscribeSubagent(). Keyed by toolCallID → *subagentListenerSet. subagentListeners sync.Map + + // steerCh is a buffered channel used to inject steering messages into + // the running agent turn via Fantasy's PrepareStep. Created fresh for + // each generate() call and set to nil when idle. Protected by steerMu. + steerMu sync.Mutex + steerCh chan string + leftoverSteer []string // unconsumed steer messages from the last turn } // Subscribe registers an EventListener that will be called for every lifecycle @@ -1405,6 +1412,35 @@ func (m *Kit) Subagent(ctx context.Context, cfg SubagentConfig) (*SubagentResult // All prompt modes (Prompt, Steer, FollowUp, PromptWithOptions) share this // single code path so callback wiring is never duplicated. func (m *Kit) generate(ctx context.Context, messages []fantasy.Message) (*agent.GenerateWithLoopResult, error) { + // Create a per-turn steer channel and attach it to the context so the + // agent's PrepareStep can inject steering messages between steps. + steerCh := make(chan string, 16) + m.steerMu.Lock() + m.steerCh = steerCh + m.steerMu.Unlock() + defer func() { + // Drain any unconsumed steer messages before nilling the channel. + // These are stored in leftoverSteer so DrainSteer() can return them. 
+ var leftover []string + for { + select { + case msg := <-steerCh: + leftover = append(leftover, msg) + default: + goto drained + } + } + drained: + m.steerMu.Lock() + m.steerCh = nil + m.leftoverSteer = leftover + m.steerMu.Unlock() + }() + ctx = agent.ContextWithSteerCh(ctx, steerCh) + ctx = agent.ContextWithSteerConsumed(ctx, func(count int) { + m.events.emit(SteerConsumedEvent{Count: count}) + }) + // Inject the in-process subagent spawner into the context so the // spawn_subagent core tool can create child Kit instances without // importing pkg/kit (which would create an import cycle). @@ -1714,6 +1750,71 @@ func (m *Kit) FollowUp(ctx context.Context, text string) (string, error) { return result.Response, nil } +// InjectSteer sends a steering message into the currently active agent turn. +// The message will be injected as a user message between steps (after the +// current tool execution finishes, before the next LLM call). If no turn is +// active the message is silently dropped — callers should check IsGenerating() +// or use Prompt()/Steer() for idle-state messaging. +// +// InjectSteer is safe to call from any goroutine. Calls queue messages in +// order; if the 16-slot buffer is full the message is dropped rather than +// blocking. Pending messages are injected together at the next step boundary. +// +// This is the preferred way to redirect an agent mid-turn without cancelling +// in-progress tool execution. +func (m *Kit) InjectSteer(message string) { + m.steerMu.Lock() + ch := m.steerCh + m.steerMu.Unlock() + if ch == nil { + return + } + select { + case ch <- message: + default: + // Channel full — extremely unlikely with buffer of 16, but don't block. + } +} + +// IsGenerating returns true if an agent turn is currently in progress. +// Use this to decide between InjectSteer (mid-turn) and Prompt (new turn). +func (m *Kit) IsGenerating() bool { + m.steerMu.Lock() + defer m.steerMu.Unlock() + return m.steerCh != nil +} + +// DrainSteer removes and returns all unconsumed steer messages. 
Called after +// a turn completes so the app layer can process any steer messages that +// arrived after the last PrepareStep fired (e.g. during a text-only response +// with no tool calls, or after the agent finished its last step). +func (m *Kit) DrainSteer() []string { + m.steerMu.Lock() + defer m.steerMu.Unlock() + + // First check leftover messages saved when generate() returned. + if len(m.leftoverSteer) > 0 { + msgs := m.leftoverSteer + m.leftoverSteer = nil + return msgs + } + + // If a turn is still active, drain from the live channel. + if m.steerCh != nil { + var msgs []string + for { + select { + case msg := <-m.steerCh: + msgs = append(msgs, msg) + default: + return msgs + } + } + } + + return nil +} + // PromptOptions configures a single PromptWithOptions call. type PromptOptions struct { // SystemMessage is prepended as a system message before the user prompt.