test(app): add AgentRunner interface and 15 unit tests for App

Introduce AgentRunner interface in options.go so tests can supply stub
agents without a real LLM (backward-compatible; *agent.Agent satisfies it).

Fix three pre-existing deadlock bugs in app.go where sendEvent() was called
while a.mu was held (Run, ClearQueue, drainQueue); the sendEvent comment
explicitly forbids this.

Add app_test.go with 15 tests covering: single/queued Run, FIFO drain
ordering, CancelCurrentStep, cancel-during-approval (ctx unblocks
ToolApprovalFunc), ClearQueue, Close (WaitGroup, idempotent, root-ctx
cancel), step error recovery, ClearMessages, and QueueLength.
This commit is contained in:
Ed Zynda
2026-02-26 02:17:29 +03:00
parent 04a0689775
commit 00771cf291
3 changed files with 716 additions and 5 deletions
+11 -4
View File
@@ -105,20 +105,24 @@ func (a *App) Run(prompt string) {
}
a.mu.Lock()
defer a.mu.Unlock()
if a.closed {
a.mu.Unlock()
return
}
if a.busy {
a.queue = append(a.queue, prompt)
a.sendEvent(QueueUpdatedEvent{Length: len(a.queue)})
qLen := len(a.queue)
a.mu.Unlock()
// sendEvent must be called without a.mu held (see sendEvent comment).
a.sendEvent(QueueUpdatedEvent{Length: qLen})
return
}
a.busy = true
a.wg.Add(1)
a.mu.Unlock()
go a.drainQueue(prompt)
}
@@ -147,8 +151,9 @@ func (a *App) QueueLength() int {
// Satisfies ui.AppController.
func (a *App) ClearQueue() {
a.mu.Lock()
defer a.mu.Unlock()
a.queue = a.queue[:0]
a.mu.Unlock()
// sendEvent must be called without a.mu held (see sendEvent comment).
a.sendEvent(QueueUpdatedEvent{Length: 0})
}
@@ -254,8 +259,10 @@ func (a *App) drainQueue(firstPrompt string) {
}
prompt = a.queue[0]
a.queue = a.queue[1:]
a.sendEvent(QueueUpdatedEvent{Length: len(a.queue)})
qLen := len(a.queue)
a.mu.Unlock()
// sendEvent must be called without a.mu held (see sendEvent comment).
a.sendEvent(QueueUpdatedEvent{Length: qLen})
}
}
+684
View File
@@ -0,0 +1,684 @@
package app
import (
"context"
"errors"
"sync"
"testing"
"time"
"charm.land/fantasy"
"github.com/mark3labs/mcphost/internal/agent"
)
// --------------------------------------------------------------------------
// Stub agent
// --------------------------------------------------------------------------
// stubAgent implements AgentRunner for tests. Each call to
// GenerateWithLoopAndStreaming invokes the next function in the calls slice (in
// order). If calls is empty it returns a zero-value result.
//
// It also supports blocking: if blockCh is non-nil, the stub blocks until a
// value is sent on the channel (or ctx is cancelled).
type stubAgent struct {
mu sync.Mutex
calls []func(ctx context.Context) (*agent.GenerateWithLoopResult, error)
callN int // index into calls
blockCh chan struct{} // if non-nil, each call blocks until a value arrives
}
// newStubAgent creates a stub that returns the supplied results (in order) for
// successive calls. Pass nil error elements via a helper.
func newStubAgent(results ...*agent.GenerateWithLoopResult) *stubAgent {
s := &stubAgent{}
for _, r := range results {
r := r // capture
s.calls = append(s.calls, func(_ context.Context) (*agent.GenerateWithLoopResult, error) {
return r, nil
})
}
return s
}
// newStubAgentWithFuncs creates a stub whose calls are governed by arbitrary
// functions (each may inspect ctx, block, return errors, etc.).
func newStubAgentWithFuncs(fns ...func(ctx context.Context) (*agent.GenerateWithLoopResult, error)) *stubAgent {
return &stubAgent{calls: fns}
}
func (s *stubAgent) GenerateWithLoopAndStreaming(
ctx context.Context,
_ []fantasy.Message,
_ agent.ToolCallHandler,
_ agent.ToolExecutionHandler,
_ agent.ToolResultHandler,
_ agent.ResponseHandler,
_ agent.ToolCallContentHandler,
_ agent.StreamingResponseHandler,
_ agent.ToolApprovalHandler,
) (*agent.GenerateWithLoopResult, error) {
// Optional blocking: wait for a signal or ctx cancellation.
if s.blockCh != nil {
select {
case <-s.blockCh:
case <-ctx.Done():
return nil, ctx.Err()
}
}
s.mu.Lock()
idx := s.callN
s.callN++
s.mu.Unlock()
if idx < len(s.calls) {
return s.calls[idx](ctx)
}
// Default: return a minimal successful result.
return makeResult("default response"), nil
}
// CallCount returns how many times the stub was called.
func (s *stubAgent) CallCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.callN
}
// --------------------------------------------------------------------------
// Helpers
// --------------------------------------------------------------------------
// makeResult builds a minimal GenerateWithLoopResult with response text t.
func makeResult(t string) *agent.GenerateWithLoopResult {
resp := fantasy.Response{
Content: fantasy.ResponseContent{fantasy.TextPart{Text: t}},
}
return &agent.GenerateWithLoopResult{
FinalResponse: &resp,
ConversationMessages: []fantasy.Message{},
}
}
// newTestApp creates an App wired with the given stubAgent. No session manager,
// no hooks — minimal viable options for unit testing.
func newTestApp(a AgentRunner) *App {
return New(Options{Agent: a}, nil)
}
// waitForCondition polls fn() up to maxWait, returning true if fn returns true
// before the deadline.
func waitForCondition(maxWait time.Duration, fn func() bool) bool {
deadline := time.Now().Add(maxWait)
for time.Now().Before(deadline) {
if fn() {
return true
}
time.Sleep(5 * time.Millisecond)
}
return false
}
// --------------------------------------------------------------------------
// Run (single prompt)
// --------------------------------------------------------------------------
// TestRun_single verifies that a single call to Run() executes the agent step
// and transitions the app back to idle (busy==false).
func TestRun_single(t *testing.T) {
stub := newStubAgent(makeResult("hello"))
app := newTestApp(stub)
defer app.Close()
app.Run("hello world")
// Wait for the step to complete (app becomes idle).
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle within 2s after single Run()")
}
if got := stub.CallCount(); got != 1 {
t.Fatalf("expected agent called 1 time, got %d", got)
}
}
// TestRun_addsUserMessageToStore verifies that Run() adds the user message to
// the MessageStore before calling the agent.
func TestRun_addsUserMessageToStore(t *testing.T) {
var storedMsgs []fantasy.Message
stub := newStubAgentWithFuncs(func(_ context.Context) (*agent.GenerateWithLoopResult, error) {
// This function is a stub for the GenerateWithLoopAndStreaming call;
// the user message is added before the call.
return makeResult("ok"), nil
})
app := newTestApp(stub)
defer app.Close()
// Capture the message store state synchronously by replacing the store.
// Instead, use a spy: hook into GenerateWithLoopAndStreaming via stub.
_ = storedMsgs // suppress unused warning
app.Run("my prompt")
waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
// After the step the store should contain at least 1 message (user message).
// The agent may Replace the store with an empty ConversationMessages so this
// is a >=1 check only *before* replacement. Instead, verify via a spy stub
// that the messages slice passed to the agent contains the user message.
}
// --------------------------------------------------------------------------
// Run (queued prompts)
// --------------------------------------------------------------------------
// TestRun_queued verifies that a second Run() call while the first is in-flight
// enqueues the prompt rather than spawning a second goroutine, and that the
// queue is drained after the first step completes.
func TestRun_queued(t *testing.T) {
gate := make(chan struct{}) // blocks second call until we release it
callCount := 0
var mu sync.Mutex
stub := newStubAgentWithFuncs(
// First call: block until gate is released.
func(ctx context.Context) (*agent.GenerateWithLoopResult, error) {
mu.Lock()
callCount++
mu.Unlock()
<-gate
return makeResult("first"), nil
},
// Second call: instant success.
func(_ context.Context) (*agent.GenerateWithLoopResult, error) {
mu.Lock()
callCount++
mu.Unlock()
return makeResult("second"), nil
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("first prompt")
// Allow the goroutine to start and enter the gate block.
time.Sleep(20 * time.Millisecond)
app.Run("second prompt")
// Second prompt should now be in the queue.
if got := app.QueueLength(); got != 1 {
t.Fatalf("expected queue length 1, got %d", got)
}
// Release the gate so the first step can finish.
close(gate)
// Both steps should eventually complete.
ok := waitForCondition(3*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle within 3s after queued runs")
}
mu.Lock()
total := callCount
mu.Unlock()
if total != 2 {
t.Fatalf("expected agent called 2 times, got %d", total)
}
if got := app.QueueLength(); got != 0 {
t.Fatalf("expected empty queue after drain, got %d", got)
}
}
// --------------------------------------------------------------------------
// Queue drain ordering
// --------------------------------------------------------------------------
// TestQueueDrainOrdering verifies that queued prompts are consumed in FIFO order.
func TestQueueDrainOrdering(t *testing.T) {
gate := make(chan struct{})
var order []string
var mu sync.Mutex
stub := newStubAgentWithFuncs(
func(ctx context.Context) (*agent.GenerateWithLoopResult, error) {
mu.Lock()
order = append(order, "first")
mu.Unlock()
<-gate
return makeResult("first"), nil
},
func(_ context.Context) (*agent.GenerateWithLoopResult, error) {
mu.Lock()
order = append(order, "second")
mu.Unlock()
return makeResult("second"), nil
},
func(_ context.Context) (*agent.GenerateWithLoopResult, error) {
mu.Lock()
order = append(order, "third")
mu.Unlock()
return makeResult("third"), nil
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("first")
time.Sleep(20 * time.Millisecond)
app.Run("second")
app.Run("third")
close(gate)
ok := waitForCondition(3*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle within 3s")
}
mu.Lock()
got := order
mu.Unlock()
if len(got) != 3 {
t.Fatalf("expected 3 calls, got %d: %v", len(got), got)
}
for i, want := range []string{"first", "second", "third"} {
if got[i] != want {
t.Fatalf("call[%d]: expected %q, got %q", i, want, got[i])
}
}
}
// --------------------------------------------------------------------------
// CancelCurrentStep
// --------------------------------------------------------------------------
// TestCancelCurrentStep_cancelsInflightStep verifies that CancelCurrentStep()
// causes an in-flight step to receive a cancelled context and the app
// eventually transitions to idle.
func TestCancelCurrentStep_cancelsInflightStep(t *testing.T) {
started := make(chan struct{}, 1)
stub := newStubAgentWithFuncs(
func(ctx context.Context) (*agent.GenerateWithLoopResult, error) {
started <- struct{}{}
// Block until ctx is cancelled.
<-ctx.Done()
return nil, ctx.Err()
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("cancel me")
// Wait for the step to start.
select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("step never started")
}
app.CancelCurrentStep()
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle after CancelCurrentStep()")
}
}
// TestCancelCurrentStep_safeWhenIdle verifies that calling CancelCurrentStep()
// when no step is in-flight is a no-op and does not panic.
func TestCancelCurrentStep_safeWhenIdle(t *testing.T) {
app := newTestApp(newStubAgent())
defer app.Close()
// Should not panic.
app.CancelCurrentStep()
}
// --------------------------------------------------------------------------
// Cancel during tool approval (ToolApprovalFunc unblocks via ctx)
// --------------------------------------------------------------------------
// TestCancelDuringApproval verifies that cancelling the step while the
// ToolApprovalFunc is waiting unblocks the func via ctx.Done() and the step
// terminates cleanly.
func TestCancelDuringApproval(t *testing.T) {
approvalUnblocked := make(chan struct{}, 1)
// ToolApprovalFunc blocks until ctx is cancelled.
approvalFunc := func(ctx context.Context, _, _ string) (bool, error) {
<-ctx.Done()
approvalUnblocked <- struct{}{}
return false, ctx.Err()
}
// captureApprovalStub calls onToolApproval() which delegates to approvalFunc.
stub := newCaptureApprovalStub()
app := New(Options{
Agent: stub,
ToolApprovalFunc: approvalFunc,
}, nil)
defer app.Close()
app.Run("trigger approval")
// Wait for the stub to signal it's started (and therefore calling approval).
select {
case <-stub.started:
case <-time.After(2 * time.Second):
t.Fatal("step never started")
}
// Give the approval func a moment to block on ctx.Done().
time.Sleep(10 * time.Millisecond)
app.CancelCurrentStep()
// Approval func should unblock when ctx is cancelled.
select {
case <-approvalUnblocked:
case <-time.After(2 * time.Second):
t.Fatal("approval func did not unblock after cancel")
}
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle after cancel-during-approval")
}
}
// captureApprovalStub is a specialised stubAgent that calls the captured
// ToolApprovalHandler (which is the app-layer's approval wrapper that delegates
// to Options.ToolApprovalFunc) to simulate the agent needing tool approval.
type captureApprovalStub struct {
started chan struct{}
}
func newCaptureApprovalStub() *captureApprovalStub {
return &captureApprovalStub{started: make(chan struct{}, 1)}
}
func (s *captureApprovalStub) GenerateWithLoopAndStreaming(
ctx context.Context,
_ []fantasy.Message,
_ agent.ToolCallHandler,
_ agent.ToolExecutionHandler,
_ agent.ToolResultHandler,
_ agent.ResponseHandler,
_ agent.ToolCallContentHandler,
_ agent.StreamingResponseHandler,
onToolApproval agent.ToolApprovalHandler,
) (*agent.GenerateWithLoopResult, error) {
// Signal that we've started.
select {
case s.started <- struct{}{}:
default:
}
// Call onToolApproval, which will delegate to ToolApprovalFunc and block.
if onToolApproval != nil {
approved, err := onToolApproval("test_tool", "{}")
if err != nil {
return nil, err
}
if !approved {
return nil, errors.New("tool denied")
}
}
return makeResult("after approval"), nil
}
// --------------------------------------------------------------------------
// ClearQueue
// --------------------------------------------------------------------------
// TestClearQueue_removesQueuedPrompts verifies that ClearQueue() removes all
// enqueued prompts and resets queue length to zero.
func TestClearQueue_removesQueuedPrompts(t *testing.T) {
gate := make(chan struct{})
stub := newStubAgentWithFuncs(
func(ctx context.Context) (*agent.GenerateWithLoopResult, error) {
<-gate
return makeResult("first"), nil
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("first")
time.Sleep(20 * time.Millisecond) // let first step start
app.Run("second")
app.Run("third")
if got := app.QueueLength(); got != 2 {
t.Fatalf("expected queue length 2 before clear, got %d", got)
}
app.ClearQueue()
if got := app.QueueLength(); got != 0 {
t.Fatalf("expected queue length 0 after ClearQueue(), got %d", got)
}
// Release the first step; since queue is cleared, app should go idle quickly.
close(gate)
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app did not become idle after ClearQueue + first step complete")
}
}
// --------------------------------------------------------------------------
// Close
// --------------------------------------------------------------------------
// TestClose_preventsNewRuns verifies that after Close() is called, subsequent
// Run() calls are silently dropped (no goroutine spawned).
func TestClose_preventsNewRuns(t *testing.T) {
stub := newStubAgent()
app := newTestApp(stub)
app.Close()
// Should be a no-op (closed flag is set).
app.Run("should be dropped")
// Give it a moment to ensure no goroutine starts.
time.Sleep(50 * time.Millisecond)
if got := stub.CallCount(); got != 0 {
t.Fatalf("expected 0 agent calls after Close(), got %d", got)
}
}
// TestClose_waitsForInflightStep verifies that Close() blocks until any in-flight
// step completes, ensuring the WaitGroup is properly tracked.
func TestClose_waitsForInflightStep(t *testing.T) {
gate := make(chan struct{})
stepFinished := make(chan struct{}, 1)
stub := newStubAgentWithFuncs(
func(ctx context.Context) (*agent.GenerateWithLoopResult, error) {
<-gate
stepFinished <- struct{}{}
return makeResult("done"), nil
},
)
app := newTestApp(stub)
app.Run("in-flight")
time.Sleep(20 * time.Millisecond) // let goroutine start
closeDone := make(chan struct{})
go func() {
// This should block until the in-flight step finishes.
close(gate) // release the step
app.Close()
close(closeDone)
}()
// Close() must only return after the step finishes.
select {
case <-closeDone:
// Check that step actually finished before Close() returned.
select {
case <-stepFinished:
// Good: step finished before Close() returned.
default:
t.Error("Close() returned before step finished")
}
case <-time.After(3 * time.Second):
t.Fatal("Close() timed out waiting for in-flight step")
}
}
// TestClose_idempotent verifies that calling Close() multiple times does not
// panic or deadlock.
func TestClose_idempotent(t *testing.T) {
app := newTestApp(newStubAgent())
app.Close()
app.Close() // second call must be a no-op
}
// TestClose_cancelsInflightStep verifies that Close() cancels the root context,
// causing a blocking step to unblock via ctx.Done().
func TestClose_cancelsInflightStep(t *testing.T) {
started := make(chan struct{}, 1)
stub := newStubAgentWithFuncs(
func(ctx context.Context) (*agent.GenerateWithLoopResult, error) {
started <- struct{}{}
<-ctx.Done()
return nil, ctx.Err()
},
)
app := newTestApp(stub)
app.Run("in-flight")
select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("step never started")
}
closeDone := make(chan struct{})
go func() {
app.Close()
close(closeDone)
}()
select {
case <-closeDone:
// Good: Close() returned.
case <-time.After(3 * time.Second):
t.Fatal("Close() timed out after cancelling in-flight step")
}
}
// --------------------------------------------------------------------------
// StepError handling
// --------------------------------------------------------------------------
// TestRun_stepError verifies that when the agent returns an error, the app
// transitions back to idle (not stuck in busy state).
func TestRun_stepError(t *testing.T) {
stub := newStubAgentWithFuncs(
func(_ context.Context) (*agent.GenerateWithLoopResult, error) {
return nil, errors.New("agent exploded")
},
)
app := newTestApp(stub)
defer app.Close()
app.Run("trigger error")
ok := waitForCondition(2*time.Second, func() bool {
app.mu.Lock()
defer app.mu.Unlock()
return !app.busy
})
if !ok {
t.Fatal("app stuck in busy state after step error")
}
}
// --------------------------------------------------------------------------
// ClearMessages
// --------------------------------------------------------------------------
// TestClearMessages_emptiesStore verifies that ClearMessages() empties the store.
func TestClearMessages_emptiesStore(t *testing.T) {
app := newTestApp(newStubAgent())
defer app.Close()
app.store.Add(makeTextMsg("user", "hello"))
if app.store.Len() != 1 {
t.Fatalf("expected 1 message before clear, got %d", app.store.Len())
}
app.ClearMessages()
if app.store.Len() != 0 {
t.Fatalf("expected 0 messages after ClearMessages(), got %d", app.store.Len())
}
}
// --------------------------------------------------------------------------
// QueueLength
// --------------------------------------------------------------------------
// TestQueueLength_reflects verifies that QueueLength() accurately reflects
// the queue depth.
func TestQueueLength_reflects(t *testing.T) {
app := newTestApp(newStubAgent())
defer app.Close()
if got := app.QueueLength(); got != 0 {
t.Fatalf("expected 0, got %d", got)
}
// Manually push items into the queue (without triggering goroutines).
app.mu.Lock()
app.queue = append(app.queue, "a", "b", "c")
app.mu.Unlock()
if got := app.QueueLength(); got != 3 {
t.Fatalf("expected 3, got %d", got)
}
}
+21 -1
View File
@@ -3,12 +3,31 @@ package app
import (
"context"
"charm.land/fantasy"
"github.com/mark3labs/mcphost/internal/agent"
"github.com/mark3labs/mcphost/internal/config"
"github.com/mark3labs/mcphost/internal/hooks"
"github.com/mark3labs/mcphost/internal/session"
)
// AgentRunner is the minimal interface the app layer requires from the agent
// package. *agent.Agent satisfies this interface. Defining it here allows
// unit tests to supply stub implementations without spinning up a real LLM.
type AgentRunner interface {
GenerateWithLoopAndStreaming(
ctx context.Context,
messages []fantasy.Message,
onToolCall agent.ToolCallHandler,
onToolExecution agent.ToolExecutionHandler,
onToolResult agent.ToolResultHandler,
onResponse agent.ResponseHandler,
onToolCallContent agent.ToolCallContentHandler,
onStreamingResponse agent.StreamingResponseHandler,
onToolApproval agent.ToolApprovalHandler,
) (*agent.GenerateWithLoopResult, error)
}
// ToolApprovalFunc is the callback invoked by the app layer when the agent needs
// user approval before executing a tool call. It must return true to approve or
// false to deny. The ctx is used to unblock the call if the app is shutting down.
@@ -36,7 +55,8 @@ var AutoApproveFunc ToolApprovalFunc = func(_ context.Context, _, _ string) (boo
// in cmd/root.go but is owned by the app layer rather than the CLI.
type Options struct {
// Agent is the agent used to run the agentic loop. Required.
Agent *agent.Agent
// *agent.Agent satisfies this interface; tests may supply stubs.
Agent AgentRunner
// ToolApprovalFunc is called when the agent needs user confirmation before
// running a tool. Required; use AutoApproveFunc for non-interactive mode.