Files
kit/internal/app/app_test.go
Ed Zynda 8823977612 test(app): cover steer-drain branch of releaseBusyAfterCompact
- Add unexported steerDrainFn test seam on App so unit tests can
  inject fake steer items without standing up a full *kit.Kit
  (Options.Kit is a concrete struct, not an interface).
- releaseBusyAfterCompact now prefers the seam over Kit.DrainSteer
  via a small switch; production behaviour is unchanged when the
  field is nil.
- Add TestReleaseBusyAfterCompact_splicesSteerAheadOfQueue, which
  pre-populates both fake steer items and ordinary queue prompts,
  invokes releaseBusyAfterCompact, and asserts the first dispatched
  prompt is the steer item — proving steer messages retain 'act now'
  priority and that drainQueue is actually launched (the bug from
  #27).
2026-05-08 12:18:52 +03:00

972 lines
28 KiB
Go

package app
import (
"context"
"errors"
"strings"
"sync"
"testing"
"time"
tea "charm.land/bubbletea/v2"
kit "github.com/mark3labs/kit/pkg/kit"
)
// --------------------------------------------------------------------------
// Helpers
// --------------------------------------------------------------------------
// usageUpdaterStub is a mutex-guarded test double that records how many times
// each usage-tracking method was invoked and the arguments of the latest call.
type usageUpdaterStub struct {
	mu sync.Mutex

	// Per-method invocation counters.
	updateCalls   int
	estimateCalls int
	contextCalls  int

	// Arguments of the most recent UpdateUsage call.
	lastUpdateInput      int
	lastUpdateOutput     int
	lastUpdateCacheRead  int
	lastUpdateCacheWrite int

	// Argument of the most recent SetContextTokens call.
	lastContextTokens int

	// Arguments of the most recent EstimateAndUpdateUsage call.
	lastEstimateInput  string
	lastEstimateOutput string
}

// UpdateUsage bumps the update counter and remembers the four token counts.
func (u *usageUpdaterStub) UpdateUsage(inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int) {
	u.mu.Lock()
	u.lastUpdateInput, u.lastUpdateOutput = inputTokens, outputTokens
	u.lastUpdateCacheRead, u.lastUpdateCacheWrite = cacheReadTokens, cacheWriteTokens
	u.updateCalls++
	u.mu.Unlock()
}

// EstimateAndUpdateUsage bumps the estimate counter and remembers both texts.
func (u *usageUpdaterStub) EstimateAndUpdateUsage(inputText, outputText string) {
	u.mu.Lock()
	u.lastEstimateInput, u.lastEstimateOutput = inputText, outputText
	u.estimateCalls++
	u.mu.Unlock()
}

// SetContextTokens bumps the context counter and remembers the token count.
func (u *usageUpdaterStub) SetContextTokens(tokens int) {
	u.mu.Lock()
	u.lastContextTokens = tokens
	u.contextCalls++
	u.mu.Unlock()
}
// turnResult returns a TurnResult whose only populated field is Response,
// set to t.
func turnResult(t string) *kit.TurnResult {
	res := new(kit.TurnResult)
	res.Response = t
	return res
}
// stubPrompt is a scriptable stand-in for the app's prompt function. Its fn
// method invokes successive functions from fns, one per call. Each function
// can block, return errors, etc. If fns is exhausted, a default success result
// is returned.
type stubPrompt struct {
// mu guards callN.
mu sync.Mutex
// fns holds the scripted behaviour for call 0, 1, 2, ... in order.
fns []func(ctx context.Context) (*kit.TurnResult, error)
// callN is the number of calls that have been assigned an index so far.
callN int
// A receive from a closed blockCh, or a done ctx, also unblocks the call.
blockCh chan struct{} // if non-nil, each call blocks until a value arrives
}
// fn has the shape of a prompt function and ignores the prompt text. When
// blockCh is set it first waits for a receive on blockCh, returning ctx.Err()
// if ctx finishes first. It then claims the next call index and runs the
// matching scripted function, or returns the default response when the script
// has run out.
func (s *stubPrompt) fn(ctx context.Context, _ string) (*kit.TurnResult, error) {
	if gate := s.blockCh; gate != nil {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-gate:
		}
	}

	s.mu.Lock()
	call := s.callN
	s.callN = call + 1
	s.mu.Unlock()

	if call >= len(s.fns) {
		return turnResult("default response"), nil
	}
	return s.fns[call](ctx)
}
// callCount reports how many calls to fn have been assigned an index so far.
func (s *stubPrompt) callCount() int {
	s.mu.Lock()
	n := s.callN
	s.mu.Unlock()
	return n
}
// newTestApp builds an App whose PromptFunc is the stub's fn method; the
// second argument to New is nil.
func newTestApp(s *stubPrompt) *App {
	opts := Options{PromptFunc: s.fn}
	return New(opts, nil)
}
// newStub creates a stubPrompt whose successive calls succeed with the given
// response texts, in order. Calls beyond len(results) receive the stub's
// default response.
func newStub(results ...string) *stubPrompt {
	s := &stubPrompt{
		fns: make([]func(ctx context.Context) (*kit.TurnResult, error), 0, len(results)),
	}
	for i := range results {
		// Bind a fresh variable per iteration so every closure returns its own
		// text regardless of the loop-variable semantics of the module's Go
		// version (pre-1.22 range variables are shared across iterations).
		text := results[i]
		s.fns = append(s.fns, func(context.Context) (*kit.TurnResult, error) {
			return turnResult(text), nil
		})
	}
	return s
}
// newStubWithFuncs builds a stubPrompt scripted by caller-supplied functions:
// the i-th call runs fns[i], which is free to inspect ctx, block, or fail.
func newStubWithFuncs(fns ...func(ctx context.Context) (*kit.TurnResult, error)) *stubPrompt {
	s := new(stubPrompt)
	s.fns = fns
	return s
}
// waitForCondition polls fn about every 5ms until it returns true or maxWait
// has elapsed, and reports whether fn returned true.
//
// fn is always evaluated at least once, and is evaluated again after the last
// sleep before giving up. A condition that already holds is therefore reported
// even when maxWait is zero or negative, and one that becomes true during the
// final poll interval is not mistaken for a timeout.
func waitForCondition(maxWait time.Duration, fn func() bool) bool {
	const pollInterval = 5 * time.Millisecond

	deadline := time.Now().Add(maxWait)
	for {
		if fn() {
			return true
		}
		// Check the deadline only after fn so the final state is always observed.
		if !time.Now().Before(deadline) {
			return false
		}
		time.Sleep(pollInterval)
	}
}
// --------------------------------------------------------------------------
// Run (single prompt)
// --------------------------------------------------------------------------
// TestRun_single checks that one Run() call results in exactly one prompt
// invocation and that the app ends up idle (busy==false).
func TestRun_single(t *testing.T) {
	prompts := newStub("hello")
	a := newTestApp(prompts)
	defer a.Close()

	a.Run("hello world")

	isIdle := func() bool {
		a.mu.Lock()
		idle := !a.busy
		a.mu.Unlock()
		return idle
	}
	if !waitForCondition(2*time.Second, isIdle) {
		t.Fatal("app did not become idle within 2s after single Run()")
	}

	if calls := prompts.callCount(); calls != 1 {
		t.Fatalf("expected 1 call, got %d", calls)
	}
}
// --------------------------------------------------------------------------
// Run (queued prompts)
// --------------------------------------------------------------------------
// TestRun_queued verifies that queued prompts are batched together and submitted
// as a single agent turn rather than individually. It also checks that a prompt
// submitted while the first is still running is reported by QueueLength(), and
// that the queue is empty once the app is idle again.
//
// NOTE(review): callCount is incremented only inside the single scripted stub
// function, which stubPrompt.fn runs for the first PromptFunc call only; any
// later call falls through to the stub's default response and is not counted
// here. The "1 batched call" assertion therefore cannot observe a second
// call — confirm whether stub.callCount() was intended.
func TestRun_queued(t *testing.T) {
// gate keeps the first step running until the test closes it.
gate := make(chan struct{})
callCount := 0
var mu sync.Mutex
stub := newStubWithFuncs(
func(ctx context.Context) (*kit.TurnResult, error) {
mu.Lock()
callCount++
mu.Unlock()
<-gate
return turnResult("batch result"), nil
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("first prompt")
// Presumably gives the first step time to start before the second prompt
// is submitted — TODO confirm whether Run() marks the app busy synchronously.
time.Sleep(20 * time.Millisecond)
app.Run("second prompt")
if got := app.QueueLength(); got != 1 {
t.Fatalf("expected queue length 1, got %d", got)
}
// Release the first step so processing can continue.
close(gate)
ok := waitForCondition(3*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle within 3s after queued runs")
}
// Wait for the goroutine to fully finish (avoid race with queue check)
app.wg.Wait()
mu.Lock()
total := callCount
mu.Unlock()
// With batching, both prompts should be processed in a single call
if total != 1 {
t.Fatalf("expected 1 batched call, got %d", total)
}
if got := app.QueueLength(); got != 0 {
t.Fatalf("expected empty queue after drain, got %d", got)
}
}
// --------------------------------------------------------------------------
// Queue drain ordering
// --------------------------------------------------------------------------
// TestQueueDrainOrdering verifies that queued prompts are batched together and
// processed in a single agent turn. It submits one prompt, queues two more
// while the first is held open, then waits for the app to become idle.
//
// NOTE(review): receivedPrompt is set to the constant "batched" by the stub
// function itself and the prompt text is never inspected, so the final
// assertion only proves that the scripted function ran at least once. It does
// not observe ordering or batching — confirm whether a prompt-capturing
// PromptFunc was intended.
func TestQueueDrainOrdering(t *testing.T) {
// gate keeps the first step running until the test closes it.
gate := make(chan struct{})
var receivedPrompt string
var mu sync.Mutex
stub := newStubWithFuncs(
func(ctx context.Context) (*kit.TurnResult, error) {
mu.Lock()
// In test mode with PromptFunc, we receive the first prompt
// but all messages are batched together
receivedPrompt = "batched"
mu.Unlock()
<-gate
return turnResult("batch result"), nil
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("first")
// Presumably gives the first step time to start before more prompts are
// submitted — TODO confirm.
time.Sleep(20 * time.Millisecond)
app.Run("second")
app.Run("third")
// Release the first step so processing can continue.
close(gate)
ok := waitForCondition(3*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle within 3s")
}
mu.Lock()
got := receivedPrompt
mu.Unlock()
// With batching, all 3 prompts should be processed in a single call
if got != "batched" {
t.Fatalf("expected batched processing, got %q", got)
}
}
// --------------------------------------------------------------------------
// CancelCurrentStep
// --------------------------------------------------------------------------
// TestCancelCurrentStep_cancelsInflightStep verifies that CancelCurrentStep()
// causes an in-flight step to receive a cancelled context and the app
// eventually transitions to idle.
func TestCancelCurrentStep_cancelsInflightStep(t *testing.T) {
started := make(chan struct{}, 1)
stub := newStubWithFuncs(
func(ctx context.Context) (*kit.TurnResult, error) {
started <- struct{}{}
<-ctx.Done()
return nil, ctx.Err()
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("cancel me")
select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("step never started")
}
app.CancelCurrentStep()
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle after CancelCurrentStep()")
}
}
// TestCancelCurrentStep_safeWhenIdle calls CancelCurrentStep() on an app that
// has never run a step; the test passes as long as nothing panics.
func TestCancelCurrentStep_safeWhenIdle(t *testing.T) {
	idleApp := newTestApp(newStub())
	defer idleApp.Close()

	idleApp.CancelCurrentStep()
}
// --------------------------------------------------------------------------
// ClearQueue
// --------------------------------------------------------------------------
// TestClearQueue_removesQueuedPrompts verifies that ClearQueue() removes all
// enqueued prompts and resets queue length to zero.
func TestClearQueue_removesQueuedPrompts(t *testing.T) {
gate := make(chan struct{})
stub := newStubWithFuncs(
func(ctx context.Context) (*kit.TurnResult, error) {
<-gate
return turnResult("first"), nil
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("first")
time.Sleep(20 * time.Millisecond)
app.Run("second")
app.Run("third")
if got := app.QueueLength(); got != 2 {
t.Fatalf("expected queue length 2 before clear, got %d", got)
}
app.ClearQueue()
if got := app.QueueLength(); got != 0 {
t.Fatalf("expected queue length 0 after ClearQueue(), got %d", got)
}
close(gate)
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle after ClearQueue + first step complete")
}
}
// --------------------------------------------------------------------------
// Close
// --------------------------------------------------------------------------
// TestClose_preventsNewRuns checks that a Run() issued after Close() never
// reaches the prompt function.
func TestClose_preventsNewRuns(t *testing.T) {
	prompts := newStub()
	a := newTestApp(prompts)
	a.Close()

	a.Run("should be dropped")
	// Leave time for a goroutine to call the stub, had one been started.
	time.Sleep(50 * time.Millisecond)

	if calls := prompts.callCount(); calls != 0 {
		t.Fatalf("expected 0 calls after Close(), got %d", calls)
	}
}
// TestClose_waitsForInflightStep verifies that Close() blocks until any in-flight
// step completes, ensuring the WaitGroup is properly tracked.
//
// The step blocks on gate and then signals stepFinished just before returning.
// A helper goroutine releases the gate and immediately calls Close(); once
// Close() has returned, stepFinished must already hold a value.
func TestClose_waitsForInflightStep(t *testing.T) {
gate := make(chan struct{})
// Buffered so the step can record completion without waiting for a reader.
stepFinished := make(chan struct{}, 1)
stub := newStubWithFuncs(
func(_ context.Context) (*kit.TurnResult, error) {
<-gate
stepFinished <- struct{}{}
return turnResult("done"), nil
},
)
app := newTestApp(stub)
app.Run("in-flight")
// Presumably gives the step time to start before Close() is called — TODO confirm.
time.Sleep(20 * time.Millisecond)
closeDone := make(chan struct{})
go func() {
// Release the step and close right away, so Close() overlaps with the
// step's final instructions.
close(gate)
app.Close()
close(closeDone)
}()
select {
case <-closeDone:
// Non-blocking check: the completion signal must already be buffered.
select {
case <-stepFinished:
default:
t.Error("Close() returned before step finished")
}
case <-time.After(3 * time.Second):
t.Fatal("Close() timed out waiting for in-flight step")
}
}
// TestClose_idempotent calls Close() twice in a row; the test passes as long
// as the second call neither panics nor blocks forever.
func TestClose_idempotent(t *testing.T) {
	a := newTestApp(newStub())
	for i := 0; i < 2; i++ {
		a.Close()
	}
}
// TestClose_cancelsInflightStep verifies that Close() cancels the root context,
// causing a blocking step to unblock via ctx.Done().
func TestClose_cancelsInflightStep(t *testing.T) {
started := make(chan struct{}, 1)
stub := newStubWithFuncs(
func(ctx context.Context) (*kit.TurnResult, error) {
started <- struct{}{}
<-ctx.Done()
return nil, ctx.Err()
},
)
app := newTestApp(stub)
app.Run("in-flight")
select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("step never started")
}
closeDone := make(chan struct{})
go func() {
app.Close()
close(closeDone)
}()
select {
case <-closeDone:
case <-time.After(3 * time.Second):
t.Fatal("Close() timed out after cancelling in-flight step")
}
}
// --------------------------------------------------------------------------
// StepError handling
// --------------------------------------------------------------------------
// TestRun_stepError verifies that when the prompt returns an error, the app
// transitions back to idle (not stuck in busy state).
func TestRun_stepError(t *testing.T) {
stub := newStubWithFuncs(
func(_ context.Context) (*kit.TurnResult, error) {
return nil, errors.New("agent exploded")
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("trigger error")
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app stuck in busy state after step error")
}
}
// --------------------------------------------------------------------------
// ClearMessages
// --------------------------------------------------------------------------
// TestClearMessages_emptiesStore adds one message to the store and expects
// ClearMessages() to bring the store length back to zero.
func TestClearMessages_emptiesStore(t *testing.T) {
	a := newTestApp(newStub())
	defer a.Close()

	a.store.Add(makeTextMsg("user", "hello"))
	if before := a.store.Len(); before != 1 {
		t.Fatalf("expected 1 message before clear, got %d", before)
	}

	a.ClearMessages()
	if after := a.store.Len(); after != 0 {
		t.Fatalf("expected 0 messages after ClearMessages(), got %d", after)
	}
}
// --------------------------------------------------------------------------
// QueueLength
// --------------------------------------------------------------------------
// TestQueueLength_reflects checks that QueueLength() reports zero for a fresh
// app and three after three items are appended directly to the queue.
func TestQueueLength_reflects(t *testing.T) {
	a := newTestApp(newStub())
	defer a.Close()

	if depth := a.QueueLength(); depth != 0 {
		t.Fatalf("expected 0, got %d", depth)
	}

	pending := []queueItem{
		{Prompt: "a"},
		{Prompt: "b"},
		{Prompt: "c"},
	}
	a.mu.Lock()
	a.queue = append(a.queue, pending...)
	a.mu.Unlock()

	if depth := a.QueueLength(); depth != 3 {
		t.Fatalf("expected 3, got %d", depth)
	}
}
// TestRecordStepUsage_updatesTracker feeds one StepUsageEvent to
// recordStepUsage and expects the tracker to receive exactly one UpdateUsage
// call carrying the event's four token counts, plus one SetContextTokens call
// whose value is the sum of all four (so context fill is refreshed after every
// step of a multi-step turn, not only at the end).
func TestRecordStepUsage_updatesTracker(t *testing.T) {
	tracker := &usageUpdaterStub{}
	a := New(Options{UsageTracker: tracker}, nil)
	defer a.Close()

	step := kit.StepUsageEvent{
		InputTokens:      120,
		OutputTokens:     45,
		CacheReadTokens:  5,
		CacheWriteTokens: 2,
	}
	a.recordStepUsage(step, nil, nil)

	tracker.mu.Lock()
	defer tracker.mu.Unlock()

	if tracker.updateCalls != 1 {
		t.Fatalf("expected 1 update call, got %d", tracker.updateCalls)
	}
	payloadMatches := tracker.lastUpdateInput == 120 &&
		tracker.lastUpdateOutput == 45 &&
		tracker.lastUpdateCacheRead == 5 &&
		tracker.lastUpdateCacheWrite == 2
	if !payloadMatches {
		t.Fatalf("unexpected usage update payload: in=%d out=%d cache_read=%d cache_write=%d",
			tracker.lastUpdateInput, tracker.lastUpdateOutput, tracker.lastUpdateCacheRead, tracker.lastUpdateCacheWrite)
	}

	// Per-step context fill: Input + Output + CacheRead + CacheWrite.
	if tracker.contextCalls != 1 {
		t.Fatalf("expected 1 context token update from recordStepUsage, got %d", tracker.contextCalls)
	}
	expectedContext := 120 + 45 + 5 + 2
	if tracker.lastContextTokens != expectedContext {
		t.Fatalf("expected context tokens %d, got %d", expectedContext, tracker.lastContextTokens)
	}
}
// TestUpdateUsageFromTurnResult_skipsTotalsWhenStepUsageSeen ensures we avoid
// double-counting totals once StepUsageEvent-based updates were already applied.
//
// With the final argument (sawStepUsage, per the assertion messages) set to
// true, neither UpdateUsage nor EstimateAndUpdateUsage may be called, while
// SetContextTokens must still be called exactly once.
func TestUpdateUsageFromTurnResult_skipsTotalsWhenStepUsageSeen(t *testing.T) {
usage := &usageUpdaterStub{}
app := New(Options{UsageTracker: usage}, nil)
defer app.Close()
// TotalUsage is populated with distinctive values so that any accidental
// use of it would show up in the tracker.
app.updateUsageFromTurnResult(&kit.TurnResult{
Response: "ok",
TotalUsage: &kit.LLMUsage{
InputTokens: 999,
OutputTokens: 111,
CacheReadTokens: 7,
CacheCreationTokens: 3,
},
FinalUsage: &kit.LLMUsage{InputTokens: 456},
}, "prompt", true)
usage.mu.Lock()
defer usage.mu.Unlock()
if usage.updateCalls != 0 {
t.Fatalf("expected no total usage update when sawStepUsage=true, got %d", usage.updateCalls)
}
if usage.estimateCalls != 0 {
t.Fatalf("expected no estimate update when sawStepUsage=true, got %d", usage.estimateCalls)
}
// FinalUsage sets only InputTokens (456) and leaves every other field unset,
// so 456 is the expected value here. This test does not distinguish an
// "InputTokens only" computation from a sum over several token categories.
if usage.contextCalls != 1 || usage.lastContextTokens != 456 {
t.Fatalf("expected final context tokens=456 (InputTokens only), got calls=%d tokens=%d", usage.contextCalls, usage.lastContextTokens)
}
}
// TestUpdateUsageFromTurnResult_recordsWhenInputTokensZero checks that a turn
// reporting InputTokens=0 alongside OutputTokens>0 (the OpenAI-compatible
// case where all prompt tokens were served from cache) still produces exactly
// one UpdateUsage call carrying the output and cache-read counts.
func TestUpdateUsageFromTurnResult_recordsWhenInputTokensZero(t *testing.T) {
	tracker := &usageUpdaterStub{}
	a := New(Options{UsageTracker: tracker}, nil)
	defer a.Close()

	// All prompt tokens were cache hits, so InputTokens is zero while the
	// generated output and the cache read are non-zero.
	result := &kit.TurnResult{
		Response: "ok",
		TotalUsage: &kit.LLMUsage{
			InputTokens:         0,
			OutputTokens:        150,
			CacheReadTokens:     500,
			CacheCreationTokens: 0,
		},
		FinalUsage: &kit.LLMUsage{InputTokens: 0, OutputTokens: 150},
	}
	a.updateUsageFromTurnResult(result, "prompt", false)

	tracker.mu.Lock()
	defer tracker.mu.Unlock()

	if tracker.updateCalls != 1 {
		t.Fatalf("expected 1 update call when InputTokens=0 but OutputTokens>0, got %d", tracker.updateCalls)
	}
	if !(tracker.lastUpdateInput == 0 && tracker.lastUpdateOutput == 150) {
		t.Fatalf("expected input=0 output=150, got input=%d output=%d",
			tracker.lastUpdateInput, tracker.lastUpdateOutput)
	}
	if tracker.lastUpdateCacheRead != 500 {
		t.Fatalf("expected cache_read=500, got %d", tracker.lastUpdateCacheRead)
	}
}
// TestUpdateUsageFromTurnResult_contextTokensUsesAllCategories checks that the
// context-window fill reported to the tracker is the sum of every token
// category of the final API call: InputTokens + CacheReadTokens +
// CacheCreationTokens + OutputTokens. Under Anthropic prompt caching
// InputTokens may be tiny while the cache categories carry most of the
// context, so counting InputTokens alone would under-report.
func TestUpdateUsageFromTurnResult_contextTokensUsesAllCategories(t *testing.T) {
	tracker := &usageUpdaterStub{}
	a := New(Options{UsageTracker: tracker}, nil)
	defer a.Close()

	result := &kit.TurnResult{
		Response: "ok",
		TotalUsage: &kit.LLMUsage{
			InputTokens:         3,
			OutputTokens:        5,
			CacheReadTokens:     0,
			CacheCreationTokens: 4317,
		},
		// First call of a session: a small non-cached input, a short reply,
		// nothing read from cache, and the system prompt + tools written to it.
		FinalUsage: &kit.LLMUsage{
			InputTokens:         3,
			OutputTokens:        5,
			CacheReadTokens:     0,
			CacheCreationTokens: 4317,
		},
	}
	a.updateUsageFromTurnResult(result, "prompt", false)

	tracker.mu.Lock()
	defer tracker.mu.Unlock()

	// Input + CacheRead + CacheCreate + Output = 4325.
	expected := 3 + 0 + 4317 + 5
	if !(tracker.contextCalls == 1 && tracker.lastContextTokens == expected) {
		t.Fatalf("expected context tokens=%d (all categories), got calls=%d tokens=%d",
			expected, tracker.contextCalls, tracker.lastContextTokens)
	}
}
// TestHandleTurnEnd_LengthEmitsWarning checks that a turn ending with
// FinishReasonLength (max_output_tokens reached) makes handleTurnEnd send
// exactly one ExtensionPrintEvent with Level "info" and a non-empty text that
// mentions max_output_tokens, so the TUI can show a banner rather than a
// silently truncated reply.
func TestHandleTurnEnd_LengthEmitsWarning(t *testing.T) {
	a := New(Options{}, nil)
	defer a.Close()

	var (
		mu   sync.Mutex
		msgs []tea.Msg
	)
	collect := func(m tea.Msg) {
		mu.Lock()
		msgs = append(msgs, m)
		mu.Unlock()
	}

	a.handleTurnEnd(kit.TurnEndEvent{StopReason: kit.FinishReasonLength}, collect)

	mu.Lock()
	defer mu.Unlock()

	if n := len(msgs); n != 1 {
		t.Fatalf("expected 1 event on length stop, got %d", n)
	}
	printEv, isPrint := msgs[0].(ExtensionPrintEvent)
	if !isPrint {
		t.Fatalf("expected ExtensionPrintEvent, got %T", msgs[0])
	}
	if printEv.Level != "info" {
		t.Errorf("expected Level=info, got %q", printEv.Level)
	}
	if printEv.Text == "" {
		t.Error("expected non-empty warning text")
	}
	if !strings.Contains(printEv.Text, "max_output_tokens") {
		t.Errorf("warning text should mention max_output_tokens, got: %s", printEv.Text)
	}
}
// TestHandleTurnEnd_NonLengthIgnored checks that handleTurnEnd never invokes
// the send function for any stop reason other than length: stop, tool-calls,
// error, content-filter, other, unknown, and the empty string.
func TestHandleTurnEnd_NonLengthIgnored(t *testing.T) {
	a := New(Options{}, nil)
	defer a.Close()

	for _, reason := range []string{
		kit.FinishReasonStop,
		kit.FinishReasonToolCalls,
		kit.FinishReasonError,
		kit.FinishReasonContentFilter,
		kit.FinishReasonOther,
		kit.FinishReasonUnknown,
		"",
	} {
		emitted := false
		a.handleTurnEnd(kit.TurnEndEvent{StopReason: reason}, func(tea.Msg) {
			emitted = true
		})
		if emitted {
			t.Errorf("stop reason %q unexpectedly emitted a warning", reason)
		}
	}
}
// TestHandleTurnEnd_NilSendFn passes a nil send function together with a
// length stop reason; the test passes as long as handleTurnEnd does not panic
// (the situation when no TUI listener is attached, e.g. early init or
// headless teardown).
func TestHandleTurnEnd_NilSendFn(t *testing.T) {
	a := New(Options{}, nil)
	defer a.Close()

	lengthStop := kit.TurnEndEvent{StopReason: kit.FinishReasonLength}
	a.handleTurnEnd(lengthStop, nil)
}
// TestFormatMaxTokensTruncatedMessage_NoKit checks the message produced when
// the app is built without Options.Kit (the test/stub path): it must be
// non-empty and mention each of the expected configuration hints.
func TestFormatMaxTokensTruncatedMessage_NoKit(t *testing.T) {
	a := New(Options{}, nil)
	defer a.Close()

	msg := a.formatMaxTokensTruncatedMessage()
	if msg == "" {
		t.Fatal("expected non-empty fallback message")
	}

	required := []string{"max_output_tokens", "--max-tokens", "KIT_MAX_TOKENS", "modelSettings"}
	for i := range required {
		if strings.Contains(msg, required[i]) {
			continue
		}
		t.Errorf("fallback message missing %q:\n%s", required[i], msg)
	}
}
// --------------------------------------------------------------------------
// releaseBusyAfterCompact (issue #27)
// --------------------------------------------------------------------------
// TestReleaseBusyAfterCompact_flushesQueuedMessages is the regression test for
// issue #27. Prompts queued through Run() while /compact was in progress used
// to stay in a.queue until the user typed something else; with the fix, the
// deferred releaseBusyAfterCompact tail notices pending items and starts
// drainQueue by itself.
//
// The compaction itself is not run. The test reproduces the state at its end
// (busy set, prompts already queued as Run() would have left them) and calls
// releaseBusyAfterCompact directly.
func TestReleaseBusyAfterCompact_flushesQueuedMessages(t *testing.T) {
	prompts := newStubWithFuncs(
		func(context.Context) (*kit.TurnResult, error) {
			return turnResult("compacted then drained"), nil
		},
	)
	a := newTestApp(prompts)
	defer a.Close()

	// State at the start of the compaction tail: the app is busy and two
	// prompts accumulated while the summary was being produced.
	backlog := []queueItem{
		{Prompt: "queued during compact #1"},
		{Prompt: "queued during compact #2"},
	}
	a.mu.Lock()
	a.busy = true
	a.queue = append(a.queue, backlog...)
	a.mu.Unlock()

	// Calling the tail directly should start drainQueue.
	a.releaseBusyAfterCompact()

	// The drain happens on another goroutine; poll until the app is idle.
	isIdle := func() bool {
		a.mu.Lock()
		idle := !a.busy
		a.mu.Unlock()
		return idle
	}
	if !waitForCondition(2*time.Second, isIdle) {
		t.Fatal("app did not become idle after releaseBusyAfterCompact: queue not drained")
	}

	// Let any goroutine still in flight finish before inspecting state.
	a.wg.Wait()

	if depth := a.QueueLength(); depth != 0 {
		t.Fatalf("expected empty queue after drain, got %d", depth)
	}
	if calls := prompts.callCount(); calls == 0 {
		t.Fatalf("expected stub PromptFunc to fire at least once after compact, got %d calls", calls)
	}
}
// TestReleaseBusyAfterCompact_idleWhenQueueEmpty checks that with nothing
// queued releaseBusyAfterCompact simply clears busy and never reaches the
// prompt function (no spurious agent turn).
func TestReleaseBusyAfterCompact_idleWhenQueueEmpty(t *testing.T) {
	prompts := newStub()
	a := newTestApp(prompts)
	defer a.Close()

	a.mu.Lock()
	a.busy = true
	a.mu.Unlock()

	a.releaseBusyAfterCompact()

	a.mu.Lock()
	stillBusy := a.busy
	a.mu.Unlock()
	if stillBusy {
		t.Fatal("expected busy=false after releaseBusyAfterCompact with empty queue")
	}

	// Leave time for a stray goroutine to (wrongly) invoke PromptFunc.
	time.Sleep(50 * time.Millisecond)
	if calls := prompts.callCount(); calls != 0 {
		t.Fatalf("expected 0 PromptFunc calls when queue empty, got %d", calls)
	}
}
// TestReleaseBusyAfterCompact_splicesSteerAheadOfQueue covers the steer-drain
// branch of releaseBusyAfterCompact (follow-up to issue #27).
//
// In production the steer messages that arrived during compaction are pulled
// via a.opts.Kit.DrainSteer(). Options.Kit is the concrete *kit.Kit, which a
// unit test cannot build without a full LLM backend, so the test supplies the
// steer items through the unexported steerDrainFn seam instead. It then
// asserts that:
//
//   - the first prompt dispatched is a steer item, i.e. steer messages keep
//     their "act now" priority over prompts that were already in a.queue, and
//   - drainQueue is actually started, so the steer item fires rather than
//     being left stranded (the behaviour reported in #27).
func TestReleaseBusyAfterCompact_splicesSteerAheadOfQueue(t *testing.T) {
	const wantFirst = "steer-1"

	var (
		seenMu    sync.Mutex
		firstSeen string
	)
	inner := newStubWithFuncs(
		func(context.Context) (*kit.TurnResult, error) {
			return turnResult("steer dispatched"), nil
		},
	)
	// The scripted functions never see the prompt text, so wrap the stub in a
	// PromptFunc that remembers the first prompt it is handed.
	recordFirst := func(ctx context.Context, prompt string) (*kit.TurnResult, error) {
		seenMu.Lock()
		if firstSeen == "" {
			firstSeen = prompt
		}
		seenMu.Unlock()
		return inner.fn(ctx, prompt)
	}

	a := New(Options{PromptFunc: recordFirst}, nil)
	defer a.Close()

	// Supply fake steer items through the test seam, standing in for what
	// DrainSteer would return in production after a /compact.
	a.steerDrainFn = func() []queueItem {
		return []queueItem{
			{Prompt: "steer-1"},
			{Prompt: "steer-2"},
		}
	}

	// State at the end of compaction: busy is set and two ordinary Run()
	// prompts are already waiting in the queue.
	waiting := []queueItem{
		{Prompt: "queued-1"},
		{Prompt: "queued-2"},
	}
	a.mu.Lock()
	a.busy = true
	a.queue = append(a.queue, waiting...)
	a.mu.Unlock()

	a.releaseBusyAfterCompact()

	// Poll until the dispatched batch has finished.
	isIdle := func() bool {
		a.mu.Lock()
		idle := !a.busy
		a.mu.Unlock()
		return idle
	}
	if !waitForCondition(2*time.Second, isIdle) {
		t.Fatal("app did not become idle after steer-spliced releaseBusyAfterCompact")
	}
	a.wg.Wait()

	// With PromptFunc set, the batch is delivered to us as its first item
	// only. That item has to be the first steer message, which shows the steer
	// items were placed ahead of the previously queued prompts.
	seenMu.Lock()
	got := firstSeen
	seenMu.Unlock()
	if got != wantFirst {
		t.Fatalf("expected first dispatched prompt to be steer item %q (steer items must come before queued prompts), got %q",
			wantFirst, got)
	}

	// Nothing may be left in the queue, and PromptFunc must really have run.
	if depth := a.QueueLength(); depth != 0 {
		t.Fatalf("expected empty queue after drain, got %d entries", depth)
	}
	if calls := inner.callCount(); calls == 0 {
		t.Fatal("expected stub PromptFunc to fire at least once after splice")
	}
}
// TestReleaseBusyAfterCompact_dropsQueueWhenClosed checks that when the app is
// already marked closed, releaseBusyAfterCompact clears busy, discards the
// pending queue items, and never reaches the prompt function.
func TestReleaseBusyAfterCompact_dropsQueueWhenClosed(t *testing.T) {
	prompts := newStub()
	a := newTestApp(prompts)

	// State of an app that was closed while compaction was still running.
	a.mu.Lock()
	a.busy = true
	a.queue = append(a.queue, queueItem{Prompt: "would have run"})
	a.closed = true
	a.mu.Unlock()

	a.releaseBusyAfterCompact()

	a.mu.Lock()
	stillBusy, remaining := a.busy, len(a.queue)
	a.mu.Unlock()

	if stillBusy {
		t.Fatal("expected busy=false even when closed")
	}
	if remaining != 0 {
		t.Fatalf("expected queue cleared on closed app, got %d entries", remaining)
	}

	// Leave time for a stray goroutine to (wrongly) invoke PromptFunc.
	time.Sleep(20 * time.Millisecond)
	if calls := prompts.callCount(); calls != 0 {
		t.Fatalf("expected 0 PromptFunc calls on closed app, got %d", calls)
	}
}