Merge pull request #28 from mark3labs/fix/27-queued-messages-after-compact

fix(app): flush queued messages after /compact completes (#27)
This commit is contained in:
Ed Zynda
2026-05-08 12:16:28 +03:00
committed by GitHub
2 changed files with 196 additions and 10 deletions
+79 -10
View File
@@ -356,6 +356,10 @@ func (a *App) AddContextMessage(text string) {
// tea.Program. customInstructions is optional text appended to the summary
// prompt (e.g. "Focus on the API design decisions").
//
// Any prompts queued via Run/RunWithFiles or steering messages injected via
// Steer/SteerWithFiles while compaction is running are flushed automatically
// once compaction completes (see releaseBusyAfterCompact).
//
// Satisfies ui.AppController.
func (a *App) CompactConversation(customInstructions string) error {
a.mu.Lock()
@@ -377,11 +381,7 @@ func (a *App) CompactConversation(customInstructions string) error {
go func() {
defer a.wg.Done()
defer func() {
a.mu.Lock()
a.busy = false
a.mu.Unlock()
}()
defer a.releaseBusyAfterCompact()
// Subscribe to SDK events for streaming compaction summary to the TUI.
sendFn := func(msg tea.Msg) {
@@ -420,6 +420,9 @@ func (a *App) CompactConversation(customInstructions string) error {
// CompactAsync is like CompactConversation but calls onComplete/onError
// callbacks instead of sending TUI events. Used by the extension API's
// ctx.Compact() which needs callback-based notification.
//
// Like CompactConversation, any prompts/steer messages received during
// compaction are flushed automatically once compaction finishes.
func (a *App) CompactAsync(customInstructions string, onComplete func(), onError func(string)) error {
a.mu.Lock()
if a.closed {
@@ -440,11 +443,7 @@ func (a *App) CompactAsync(customInstructions string, onComplete func(), onError
go func() {
defer a.wg.Done()
defer func() {
a.mu.Lock()
a.busy = false
a.mu.Unlock()
}()
defer a.releaseBusyAfterCompact()
// Subscribe to SDK events for streaming compaction summary to the TUI.
sendFn := func(msg tea.Msg) {
@@ -489,6 +488,76 @@ func (a *App) CompactAsync(customInstructions string, onComplete func(), onError
return nil
}
// releaseBusyAfterCompact is the deferred tail that runs at the end of every
// compaction goroutine (success, error, or panic-after-recover paths). It
// flips a.busy back to false, but before doing so it checks whether any
// prompts piled up while compaction was running:
//
// - Run/RunWithFiles append to a.queue when a.busy is set.
// - Steer/SteerWithFiles deposit messages into the SDK steer channel via
// Kit.InjectSteerWithFiles when a.busy is set.
//
// Without this hand-off the queue would sit idle until the user submits
// another prompt — see issue #27. If we find anything pending we keep busy
// set, splice the steer messages to the front of the queue, and start a
// fresh drainQueue goroutine to deliver them as a single batched turn.
func (a *App) releaseBusyAfterCompact() {
// Pull steer messages outside the app mutex; DrainSteer takes its own
// internal lock and we don't want to nest the two.
var steerItems []queueItem
if a.opts.Kit != nil {
if leftover := a.opts.Kit.DrainSteer(); len(leftover) > 0 {
steerItems = make([]queueItem, len(leftover))
for i, sm := range leftover {
steerItems[i] = queueItem{Prompt: sm.Text, Files: sm.Files}
}
}
}
a.mu.Lock()
// If the app was closed while compaction was running, drop everything
// and just clear busy. Run/Steer would have rejected new items already
// after Close(), but this guards against in-flight items that slipped
// in just before closed was set.
if a.closed {
a.queue = a.queue[:0]
a.busy = false
a.mu.Unlock()
return
}
// Combine steer-channel items (front) with the in-memory queue (back).
// Steer messages are placed first so they retain their "act now"
// semantics relative to ordinary queued prompts that arrived later.
pending := append(steerItems, a.queue...)
a.queue = a.queue[:0]
if len(pending) == 0 {
a.busy = false
a.mu.Unlock()
return
}
// Hand off to drainQueue: it will pick up the first item directly and
// scoop the rest from a.queue on its first iteration.
first := pending[0]
if len(pending) > 1 {
a.queue = append(a.queue, pending[1:]...)
}
// Stay busy across the goroutine swap.
a.wg.Add(1)
a.mu.Unlock()
// Notify the UI that steer-channel messages were consumed so the
// steering badge can clear; ordinary queued prompts will be reflected
// by the QueueUpdatedEvent that drainQueue emits as it picks them up.
if len(steerItems) > 0 {
a.sendEvent(SteerConsumedEvent{})
}
go a.drainQueue(first)
}
// --------------------------------------------------------------------------
// Non-interactive execution
// --------------------------------------------------------------------------
+117
View File
@@ -763,3 +763,120 @@ func TestFormatMaxTokensTruncatedMessage_NoKit(t *testing.T) {
}
}
}
// --------------------------------------------------------------------------
// releaseBusyAfterCompact (issue #27)
// --------------------------------------------------------------------------
// TestReleaseBusyAfterCompact_flushesQueuedMessages is a regression test for
// issue #27: messages queued via Run() while /compact is running used to sit
// in a.queue indefinitely until the user typed another prompt. After the fix
// the deferred releaseBusyAfterCompact tail picks up any pending items and
// dispatches drainQueue automatically.
//
// We simulate the compaction completion path directly (bypassing the SDK)
// by toggling busy=true, populating the queue exactly as Run() would have
// during compaction, and then invoking releaseBusyAfterCompact.
func TestReleaseBusyAfterCompact_flushesQueuedMessages(t *testing.T) {
stub := newStubWithFuncs(
func(ctx context.Context) (*kit.TurnResult, error) {
return turnResult("compacted then drained"), nil
},
)
app := newTestApp(stub)
defer app.Close()
// Simulate the state at the start of the compaction tail: busy is set
// and a couple of prompts have piled up in the queue while we were
// summarising. (Run() would have appended them and returned a queue
// length > 0 to the caller.)
app.mu.Lock()
app.busy = true
app.queue = append(app.queue,
queueItem{Prompt: "queued during compact #1"},
queueItem{Prompt: "queued during compact #2"},
)
app.mu.Unlock()
// Invoke the deferred tail directly. It should kick off drainQueue.
app.releaseBusyAfterCompact()
// drainQueue runs in a goroutine. Wait for the app to come back to idle.
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle after releaseBusyAfterCompact: queue not drained")
}
// Wait for any in-flight goroutine to finish before reading state.
app.wg.Wait()
if got := app.QueueLength(); got != 0 {
t.Fatalf("expected empty queue after drain, got %d", got)
}
if n := stub.callCount(); n == 0 {
t.Fatalf("expected stub PromptFunc to fire at least once after compact, got %d calls", n)
}
}
// TestReleaseBusyAfterCompact_idleWhenQueueEmpty verifies that with no
// pending messages the helper just clears busy and does NOT spawn a
// drainQueue goroutine (no spurious agent turn).
func TestReleaseBusyAfterCompact_idleWhenQueueEmpty(t *testing.T) {
stub := newStub()
app := newTestApp(stub)
defer app.Close()
app.mu.Lock()
app.busy = true
app.mu.Unlock()
app.releaseBusyAfterCompact()
app.mu.Lock()
busy := app.busy
app.mu.Unlock()
if busy {
t.Fatal("expected busy=false after releaseBusyAfterCompact with empty queue")
}
// Give any rogue goroutine a moment to (incorrectly) call PromptFunc.
time.Sleep(50 * time.Millisecond)
if n := stub.callCount(); n != 0 {
t.Fatalf("expected 0 PromptFunc calls when queue empty, got %d", n)
}
}
// TestReleaseBusyAfterCompact_dropsQueueWhenClosed verifies that if the app
// was closed during compaction the helper discards any pending items rather
// than spawning drainQueue against a torn-down App.
func TestReleaseBusyAfterCompact_dropsQueueWhenClosed(t *testing.T) {
stub := newStub()
app := newTestApp(stub)
app.mu.Lock()
app.busy = true
app.queue = append(app.queue, queueItem{Prompt: "would have run"})
app.closed = true
app.mu.Unlock()
app.releaseBusyAfterCompact()
app.mu.Lock()
busy := app.busy
qLen := len(app.queue)
app.mu.Unlock()
if busy {
t.Fatal("expected busy=false even when closed")
}
if qLen != 0 {
t.Fatalf("expected queue cleared on closed app, got %d entries", qLen)
}
time.Sleep(20 * time.Millisecond)
if n := stub.callCount(); n != 0 {
t.Fatalf("expected 0 PromptFunc calls on closed app, got %d", n)
}
}