From 31ea80ec4f5e2c376897b6beb7ab4901e87c2115 Mon Sep 17 00:00:00 2001 From: Ed Zynda Date: Fri, 8 May 2026 11:30:26 +0300 Subject: [PATCH] fix(app): flush queued messages after /compact completes (#27) - Add releaseBusyAfterCompact() shared deferred tail used by both CompactConversation and CompactAsync. It drains the SDK steer channel, splices steer items in front of any queued prompts, and hands off to drainQueue so messages received during compaction are dispatched automatically once compaction finishes. - Previously, busy was simply cleared on completion and the queue sat idle until the user submitted another prompt, which then flushed everything together. - Honor the closed flag so a teardown during compaction discards pending items instead of spawning drainQueue against a torn-down App. - Add regression tests covering the queued-flush, idle-empty, and closed-during-compact paths. Fixes #27 --- internal/app/app.go | 89 +++++++++++++++++++++++++---- internal/app/app_test.go | 117 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 196 insertions(+), 10 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index bc4b3912..c15d2433 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -356,6 +356,10 @@ func (a *App) AddContextMessage(text string) { // tea.Program. customInstructions is optional text appended to the summary // prompt (e.g. "Focus on the API design decisions"). // +// Any prompts queued via Run/RunWithFiles or steering messages injected via +// Steer/SteerWithFiles while compaction is running are flushed automatically +// once compaction completes (see releaseBusyAfterCompact). +// // Satisfies ui.AppController. func (a *App) CompactConversation(customInstructions string) error { a.mu.Lock() @@ -377,11 +381,7 @@ func (a *App) CompactConversation(customInstructions string) error { go func() { defer a.wg.Done() - defer func() { - a.mu.Lock() - a.busy = false - a.mu.Unlock() - }() + defer a.releaseBusyAfterCompact() // Subscribe to SDK events for streaming compaction summary to the TUI. sendFn := func(msg tea.Msg) { @@ -420,6 +420,9 @@ func (a *App) CompactConversation(customInstructions string) error { // CompactAsync is like CompactConversation but calls onComplete/onError // callbacks instead of sending TUI events. Used by the extension API's // ctx.Compact() which needs callback-based notification. +// +// Like CompactConversation, any prompts/steer messages received during +// compaction are flushed automatically once compaction finishes. func (a *App) CompactAsync(customInstructions string, onComplete func(), onError func(string)) error { a.mu.Lock() if a.closed { @@ -440,11 +443,7 @@ func (a *App) CompactAsync(customInstructions string, onComplete func(), onError go func() { defer a.wg.Done() - defer func() { - a.mu.Lock() - a.busy = false - a.mu.Unlock() - }() + defer a.releaseBusyAfterCompact() // Subscribe to SDK events for streaming compaction summary to the TUI. sendFn := func(msg tea.Msg) { @@ -489,6 +488,76 @@ func (a *App) CompactAsync(customInstructions string, onComplete func(), onError return nil } +// releaseBusyAfterCompact is the deferred tail that runs at the end of every +// compaction goroutine (success, error, or panic-after-recover paths). It +// flips a.busy back to false, but before doing so it checks whether any +// prompts piled up while compaction was running: +// +// - Run/RunWithFiles append to a.queue when a.busy is set. +// - Steer/SteerWithFiles deposit messages into the SDK steer channel via +// Kit.InjectSteerWithFiles when a.busy is set. +// +// Without this hand-off the queue would sit idle until the user submits +// another prompt — see issue #27. If we find anything pending we keep busy +// set, splice the steer messages to the front of the queue, and start a +// fresh drainQueue goroutine to deliver them as a single batched turn. +func (a *App) releaseBusyAfterCompact() { + // Pull steer messages outside the app mutex; DrainSteer takes its own + // internal lock and we don't want to nest the two. + var steerItems []queueItem + if a.opts.Kit != nil { + if leftover := a.opts.Kit.DrainSteer(); len(leftover) > 0 { + steerItems = make([]queueItem, len(leftover)) + for i, sm := range leftover { + steerItems[i] = queueItem{Prompt: sm.Text, Files: sm.Files} + } + } + } + + a.mu.Lock() + // If the app was closed while compaction was running, drop everything + // and just clear busy. Run/Steer would have rejected new items already + // after Close(), but this guards against in-flight items that slipped + // in just before closed was set. + if a.closed { + a.queue = a.queue[:0] + a.busy = false + a.mu.Unlock() + return + } + + // Combine steer-channel items (front) with the in-memory queue (back). + // Steer messages are placed first so they retain their "act now" + // semantics relative to ordinary queued prompts that arrived later. + pending := append(steerItems, a.queue...) + a.queue = a.queue[:0] + + if len(pending) == 0 { + a.busy = false + a.mu.Unlock() + return + } + + // Hand off to drainQueue: it will pick up the first item directly and + // scoop the rest from a.queue on its first iteration. + first := pending[0] + if len(pending) > 1 { + a.queue = append(a.queue, pending[1:]...) + } + // Stay busy across the goroutine swap. + a.wg.Add(1) + a.mu.Unlock() + + // Notify the UI that steer-channel messages were consumed so the + // steering badge can clear; ordinary queued prompts will be reflected + // by the QueueUpdatedEvent that drainQueue emits as it picks them up. + if len(steerItems) > 0 { + a.sendEvent(SteerConsumedEvent{}) + } + + go a.drainQueue(first) +} + // -------------------------------------------------------------------------- // Non-interactive execution // -------------------------------------------------------------------------- diff --git a/internal/app/app_test.go b/internal/app/app_test.go index e0f2217a..f6582e95 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -763,3 +763,120 @@ func TestFormatMaxTokensTruncatedMessage_NoKit(t *testing.T) { } } } + +// -------------------------------------------------------------------------- +// releaseBusyAfterCompact (issue #27) +// -------------------------------------------------------------------------- + +// TestReleaseBusyAfterCompact_flushesQueuedMessages is a regression test for +// issue #27: messages queued via Run() while /compact is running used to sit +// in a.queue indefinitely until the user typed another prompt. After the fix +// the deferred releaseBusyAfterCompact tail picks up any pending items and +// dispatches drainQueue automatically. +// +// We simulate the compaction completion path directly (bypassing the SDK) +// by toggling busy=true, populating the queue exactly as Run() would have +// during compaction, and then invoking releaseBusyAfterCompact. +func TestReleaseBusyAfterCompact_flushesQueuedMessages(t *testing.T) { + stub := newStubWithFuncs( + func(ctx context.Context) (*kit.TurnResult, error) { + return turnResult("compacted then drained"), nil + }, + ) + app := newTestApp(stub) + defer app.Close() + + // Simulate the state at the start of the compaction tail: busy is set + // and a couple of prompts have piled up in the queue while we were + // summarising. (Run() would have appended them and returned a queue + // length > 0 to the caller.) + app.mu.Lock() + app.busy = true + app.queue = append(app.queue, + queueItem{Prompt: "queued during compact #1"}, + queueItem{Prompt: "queued during compact #2"}, + ) + app.mu.Unlock() + + // Invoke the deferred tail directly. It should kick off drainQueue. + app.releaseBusyAfterCompact() + + // drainQueue runs in a goroutine. Wait for the app to come back to idle. + ok := waitForCondition(2*time.Second, func() bool { + app.mu.Lock() + defer app.mu.Unlock() + return !app.busy + }) + if !ok { + t.Fatal("app did not become idle after releaseBusyAfterCompact: queue not drained") + } + + // Wait for any in-flight goroutine to finish before reading state. + app.wg.Wait() + + if got := app.QueueLength(); got != 0 { + t.Fatalf("expected empty queue after drain, got %d", got) + } + if n := stub.callCount(); n == 0 { + t.Fatalf("expected stub PromptFunc to fire at least once after compact, got %d calls", n) + } +} + +// TestReleaseBusyAfterCompact_idleWhenQueueEmpty verifies that with no +// pending messages the helper just clears busy and does NOT spawn a +// drainQueue goroutine (no spurious agent turn). +func TestReleaseBusyAfterCompact_idleWhenQueueEmpty(t *testing.T) { + stub := newStub() + app := newTestApp(stub) + defer app.Close() + + app.mu.Lock() + app.busy = true + app.mu.Unlock() + + app.releaseBusyAfterCompact() + + app.mu.Lock() + busy := app.busy + app.mu.Unlock() + if busy { + t.Fatal("expected busy=false after releaseBusyAfterCompact with empty queue") + } + + // Give any rogue goroutine a moment to (incorrectly) call PromptFunc. + time.Sleep(50 * time.Millisecond) + if n := stub.callCount(); n != 0 { + t.Fatalf("expected 0 PromptFunc calls when queue empty, got %d", n) + } +} + +// TestReleaseBusyAfterCompact_dropsQueueWhenClosed verifies that if the app +// was closed during compaction the helper discards any pending items rather +// than spawning drainQueue against a torn-down App. +func TestReleaseBusyAfterCompact_dropsQueueWhenClosed(t *testing.T) { + stub := newStub() + app := newTestApp(stub) + + app.mu.Lock() + app.busy = true + app.queue = append(app.queue, queueItem{Prompt: "would have run"}) + app.closed = true + app.mu.Unlock() + + app.releaseBusyAfterCompact() + + app.mu.Lock() + busy := app.busy + qLen := len(app.queue) + app.mu.Unlock() + if busy { + t.Fatal("expected busy=false even when closed") + } + if qLen != 0 { + t.Fatalf("expected queue cleared on closed app, got %d entries", qLen) + } + time.Sleep(20 * time.Millisecond) + if n := stub.callCount(); n != 0 { + t.Fatalf("expected 0 PromptFunc calls on closed app, got %d", n) + } +}