mirror of
https://github.com/mark3labs/kit.git
synced 2026-06-20 22:26:17 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4bdc4f75cc | |||
| bbd8975ca0 |
@@ -924,6 +924,9 @@ func runNormalMode(ctx context.Context) error {
|
||||
Index: resp.Index,
|
||||
}
|
||||
},
|
||||
SpawnSubagent: func(config extensions.SubagentConfig) (*extensions.SubagentHandle, *extensions.SubagentResult, error) {
|
||||
return extensions.SpawnSubagent(config)
|
||||
},
|
||||
})
|
||||
kitInstance.EmitSessionStart()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,164 @@
|
||||
//go:build ignore
|
||||
|
||||
// Subagent Test Extension — Tests the new first-class subagent API
|
||||
//
|
||||
// Commands:
|
||||
//
|
||||
// /subtest <task> — spawn a blocking subagent and print result
|
||||
// /subbg <task> — spawn a background subagent with live output
|
||||
//
|
||||
// Usage: kit -e examples/extensions/subagent-test.go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"kit/ext"
|
||||
)
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
latestCtx ext.Context
|
||||
hasCtx bool
|
||||
)
|
||||
|
||||
func Init(api ext.API) {
|
||||
// Keep context fresh
|
||||
api.OnSessionStart(func(_ ext.SessionStartEvent, ctx ext.Context) {
|
||||
mu.Lock()
|
||||
latestCtx = ctx
|
||||
hasCtx = true
|
||||
mu.Unlock()
|
||||
|
||||
ctx.PrintInfo(
|
||||
"Subagent Test Extension loaded\n\n" +
|
||||
"/subtest <task> Spawn blocking subagent\n" +
|
||||
"/subbg <task> Spawn background subagent\n\n" +
|
||||
"The LLM can also use the spawn_subagent tool.")
|
||||
})
|
||||
|
||||
api.OnAgentEnd(func(_ ext.AgentEndEvent, ctx ext.Context) {
|
||||
mu.Lock()
|
||||
latestCtx = ctx
|
||||
mu.Unlock()
|
||||
})
|
||||
|
||||
// Command: /subtest <task> — blocking subagent
|
||||
api.RegisterCommand(ext.CommandDef{
|
||||
Name: "subtest",
|
||||
Description: "Spawn a blocking subagent: /subtest <task>",
|
||||
Execute: func(args string, ctx ext.Context) (string, error) {
|
||||
mu.Lock()
|
||||
latestCtx = ctx
|
||||
hasCtx = true
|
||||
mu.Unlock()
|
||||
|
||||
task := strings.TrimSpace(args)
|
||||
if task == "" {
|
||||
return "Usage: /subtest <task>", nil
|
||||
}
|
||||
|
||||
ctx.PrintInfo(fmt.Sprintf("Spawning blocking subagent for: %s", task))
|
||||
|
||||
start := time.Now()
|
||||
_, result, err := ctx.SpawnSubagent(ext.SubagentConfig{
|
||||
Prompt: task,
|
||||
Timeout: 2 * time.Minute,
|
||||
Blocking: true,
|
||||
})
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Sprintf("Spawn error: %v", err), nil
|
||||
}
|
||||
|
||||
if result == nil {
|
||||
return "No result returned", nil
|
||||
}
|
||||
|
||||
if result.Error != nil {
|
||||
return fmt.Sprintf("Subagent failed (exit %d) after %ds: %v\n\nPartial output:\n%s",
|
||||
result.ExitCode, int(elapsed.Seconds()), result.Error, truncate(result.Response, 2000)), nil
|
||||
}
|
||||
|
||||
response := fmt.Sprintf("Subagent completed in %ds", int(elapsed.Seconds()))
|
||||
if result.Usage != nil {
|
||||
response += fmt.Sprintf(" (tokens: %d in / %d out)", result.Usage.InputTokens, result.Usage.OutputTokens)
|
||||
}
|
||||
response += fmt.Sprintf("\n\nResult:\n%s", truncate(result.Response, 4000))
|
||||
|
||||
return response, nil
|
||||
},
|
||||
})
|
||||
|
||||
// Command: /subbg <task> — background subagent with callbacks
|
||||
api.RegisterCommand(ext.CommandDef{
|
||||
Name: "subbg",
|
||||
Description: "Spawn a background subagent: /subbg <task>",
|
||||
Execute: func(args string, ctx ext.Context) (string, error) {
|
||||
mu.Lock()
|
||||
latestCtx = ctx
|
||||
hasCtx = true
|
||||
mu.Unlock()
|
||||
|
||||
task := strings.TrimSpace(args)
|
||||
if task == "" {
|
||||
return "Usage: /subbg <task>", nil
|
||||
}
|
||||
|
||||
ctx.PrintInfo(fmt.Sprintf("Spawning background subagent for: %s", task))
|
||||
|
||||
start := time.Now()
|
||||
handle, _, err := ctx.SpawnSubagent(ext.SubagentConfig{
|
||||
Prompt: task,
|
||||
Timeout: 2 * time.Minute,
|
||||
OnOutput: func(chunk string) {
|
||||
// Live output - could update a widget here
|
||||
fmt.Print(chunk)
|
||||
},
|
||||
OnComplete: func(result ext.SubagentResult) {
|
||||
elapsed := time.Since(start)
|
||||
|
||||
mu.Lock()
|
||||
c := latestCtx
|
||||
ok := hasCtx
|
||||
mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if result.Error != nil {
|
||||
c.SendMessage(fmt.Sprintf("Background subagent failed after %ds: %v",
|
||||
int(elapsed.Seconds()), result.Error))
|
||||
return
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("Background subagent completed in %ds", int(elapsed.Seconds()))
|
||||
if result.Usage != nil {
|
||||
msg += fmt.Sprintf(" (tokens: %d in / %d out)", result.Usage.InputTokens, result.Usage.OutputTokens)
|
||||
}
|
||||
msg += fmt.Sprintf("\n\nResult:\n%s", truncate(result.Response, 4000))
|
||||
|
||||
c.SendMessage(msg)
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Sprintf("Spawn error: %v", err), nil
|
||||
}
|
||||
|
||||
return fmt.Sprintf("Background subagent spawned (ID: %s). Results will be delivered when complete.", handle.ID), nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func truncate(s string, max int) string {
|
||||
if len(s) <= max {
|
||||
return s
|
||||
}
|
||||
return s[:max] + "\n\n... [truncated]"
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"charm.land/fantasy"
|
||||
"github.com/mark3labs/kit/internal/extensions"
|
||||
)
|
||||
|
||||
const defaultSubagentTimeout = 5 * time.Minute
|
||||
const maxSubagentTimeout = 30 * time.Minute
|
||||
|
||||
type subagentArgs struct {
|
||||
Task string `json:"task"`
|
||||
Model string `json:"model,omitempty"`
|
||||
SystemPrompt string `json:"system_prompt,omitempty"`
|
||||
TimeoutSeconds int `json:"timeout_seconds,omitempty"`
|
||||
}
|
||||
|
||||
// NewSubagentTool creates the spawn_subagent core tool.
|
||||
func NewSubagentTool(opts ...ToolOption) fantasy.AgentTool {
|
||||
return &coreTool{
|
||||
info: fantasy.ToolInfo{
|
||||
Name: "spawn_subagent",
|
||||
Description: `Spawn a background subagent to perform a task autonomously.
|
||||
|
||||
The subagent runs as a separate Kit instance with full tool access. Use this to:
|
||||
- Delegate independent subtasks that can run in parallel
|
||||
- Perform research or analysis without blocking your main work
|
||||
- Execute tasks that benefit from a fresh context window
|
||||
|
||||
The subagent result is returned when it completes. For long-running tasks,
|
||||
consider breaking them into smaller focused subtasks.
|
||||
|
||||
Example use cases:
|
||||
- "Research the authentication patterns in this codebase"
|
||||
- "Write unit tests for the UserService class"
|
||||
- "Analyze the performance bottlenecks in the database queries"`,
|
||||
Parameters: map[string]any{
|
||||
"task": map[string]any{
|
||||
"type": "string",
|
||||
"description": "The complete task description for the subagent to perform",
|
||||
},
|
||||
"model": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Optional model override (e.g. 'anthropic/claude-haiku-3-5-20241022' for faster/cheaper tasks)",
|
||||
},
|
||||
"system_prompt": map[string]any{
|
||||
"type": "string",
|
||||
"description": "Optional system prompt for domain-specific guidance",
|
||||
},
|
||||
"timeout_seconds": map[string]any{
|
||||
"type": "number",
|
||||
"description": "Maximum execution time in seconds (default: 300, max: 1800)",
|
||||
},
|
||||
},
|
||||
Required: []string{"task"},
|
||||
},
|
||||
handler: func(ctx context.Context, call fantasy.ToolCall) (fantasy.ToolResponse, error) {
|
||||
return executeSubagent(ctx, call)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func executeSubagent(ctx context.Context, call fantasy.ToolCall) (fantasy.ToolResponse, error) {
|
||||
var args subagentArgs
|
||||
if err := parseArgs(call.Input, &args); err != nil {
|
||||
return fantasy.NewTextErrorResponse("task parameter is required"), nil
|
||||
}
|
||||
if args.Task == "" {
|
||||
return fantasy.NewTextErrorResponse("task parameter is required"), nil
|
||||
}
|
||||
|
||||
// Determine timeout
|
||||
timeout := defaultSubagentTimeout
|
||||
if args.TimeoutSeconds > 0 {
|
||||
timeout = min(time.Duration(args.TimeoutSeconds)*time.Second, maxSubagentTimeout)
|
||||
}
|
||||
|
||||
// Spawn subagent in blocking mode
|
||||
_, result, err := extensions.SpawnSubagent(extensions.SubagentConfig{
|
||||
Prompt: args.Task,
|
||||
Model: args.Model,
|
||||
SystemPrompt: args.SystemPrompt,
|
||||
Timeout: timeout,
|
||||
Blocking: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fantasy.NewTextErrorResponse(fmt.Sprintf("Failed to spawn subagent: %v", err)), nil
|
||||
}
|
||||
|
||||
if result.Error != nil {
|
||||
// Subagent failed but we still have partial output
|
||||
response := fmt.Sprintf("Subagent failed (exit code %d) after %ds.\n\nError: %v",
|
||||
result.ExitCode, int(result.Elapsed.Seconds()), result.Error)
|
||||
if result.Response != "" {
|
||||
response += fmt.Sprintf("\n\nPartial output:\n%s", truncateResponse(result.Response, 8000))
|
||||
}
|
||||
return fantasy.NewTextErrorResponse(response), nil
|
||||
}
|
||||
|
||||
// Build successful response
|
||||
response := fmt.Sprintf("Subagent completed successfully in %ds.", int(result.Elapsed.Seconds()))
|
||||
if result.Usage != nil {
|
||||
response += fmt.Sprintf(" (tokens: %d in / %d out)", result.Usage.InputTokens, result.Usage.OutputTokens)
|
||||
}
|
||||
response += fmt.Sprintf("\n\nResult:\n%s", truncateResponse(result.Response, 12000))
|
||||
|
||||
return fantasy.NewTextResponse(response), nil
|
||||
}
|
||||
|
||||
// truncateResponse limits the response length to avoid overwhelming context windows.
|
||||
func truncateResponse(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
return s[:maxLen] + "\n\n... [truncated — " + fmt.Sprintf("%d", len(s)-maxLen) + " bytes omitted]"
|
||||
}
|
||||
@@ -96,5 +96,6 @@ func AllTools(opts ...ToolOption) []fantasy.AgentTool {
|
||||
NewGrepTool(opts...),
|
||||
NewFindTool(opts...),
|
||||
NewLsTool(opts...),
|
||||
NewSubagentTool(opts...),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -491,6 +491,41 @@ type Context struct {
|
||||
// },
|
||||
// })
|
||||
ReloadExtensions func() error
|
||||
|
||||
// SpawnSubagent spawns a child Kit instance to perform a task autonomously.
|
||||
// The subagent runs as a separate subprocess with full tool access but
|
||||
// isolated session and extensions (--no-session --no-extensions).
|
||||
//
|
||||
// When config.Blocking is true, blocks until completion and returns the
|
||||
// result directly (handle is nil). When false, returns immediately with
|
||||
// a handle for monitoring/cancellation.
|
||||
//
|
||||
// Example — blocking call:
|
||||
//
|
||||
// _, result, err := ctx.SpawnSubagent(ext.SubagentConfig{
|
||||
// Prompt: "Research authentication patterns in this codebase",
|
||||
// Blocking: true,
|
||||
// Timeout: 2 * time.Minute,
|
||||
// })
|
||||
// if err != nil {
|
||||
// ctx.PrintError("spawn failed: " + err.Error())
|
||||
// return
|
||||
// }
|
||||
// ctx.PrintInfo("Subagent result:\n" + result.Response)
|
||||
//
|
||||
// Example — background spawn with callbacks:
|
||||
//
|
||||
// handle, _, _ := ctx.SpawnSubagent(ext.SubagentConfig{
|
||||
// Prompt: "Write unit tests for UserService",
|
||||
// OnOutput: func(chunk string) {
|
||||
// // Live output streaming
|
||||
// },
|
||||
// OnComplete: func(result ext.SubagentResult) {
|
||||
// ctx.SendMessage("Subagent finished:\n" + result.Response)
|
||||
// },
|
||||
// })
|
||||
// // handle.Kill() to cancel, handle.Wait() to block
|
||||
SpawnSubagent func(SubagentConfig) (*SubagentHandle, *SubagentResult, error)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -0,0 +1,328 @@
|
||||
// Package extensions provides subagent spawning capabilities for Kit extensions.
|
||||
package extensions
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Subagent types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// SubagentConfig configures a subagent spawn.
|
||||
type SubagentConfig struct {
|
||||
// Prompt is the task/instruction for the subagent (required).
|
||||
Prompt string
|
||||
|
||||
// Model overrides the parent's model (e.g. "anthropic/claude-haiku-3-5-20241022").
|
||||
// Empty string uses the parent's current model.
|
||||
Model string
|
||||
|
||||
// SystemPrompt provides domain-specific instructions.
|
||||
// Empty string uses the default system prompt.
|
||||
SystemPrompt string
|
||||
|
||||
// Timeout limits execution time. Zero means 5 minute default.
|
||||
Timeout time.Duration
|
||||
|
||||
// OnOutput streams stderr output chunks as the subagent runs.
|
||||
// Called from a goroutine; must be safe for concurrent use.
|
||||
OnOutput func(chunk string)
|
||||
|
||||
// OnComplete is called when the subagent finishes (success or error).
|
||||
// Called from a goroutine; must be safe for concurrent use.
|
||||
OnComplete func(result SubagentResult)
|
||||
|
||||
// Blocking, when true, makes SpawnSubagent wait for completion and
|
||||
// return the result directly. When false (default), spawns in background
|
||||
// and returns immediately with a handle.
|
||||
Blocking bool
|
||||
|
||||
// ParentSessionID links the subagent's session to the parent (optional).
|
||||
// When set, the subagent's session is persisted with a parent reference.
|
||||
ParentSessionID string
|
||||
}
|
||||
|
||||
// SubagentResult contains the outcome of a subagent execution.
|
||||
type SubagentResult struct {
|
||||
// Response is the subagent's final text response.
|
||||
Response string
|
||||
|
||||
// Error is set if the subagent failed (nil on success).
|
||||
Error error
|
||||
|
||||
// ExitCode is the subprocess exit code (0 = success).
|
||||
ExitCode int
|
||||
|
||||
// Elapsed is the total execution time.
|
||||
Elapsed time.Duration
|
||||
|
||||
// Usage contains token usage if available.
|
||||
Usage *SubagentUsage
|
||||
}
|
||||
|
||||
// SubagentUsage contains token usage from the subagent's run.
|
||||
type SubagentUsage struct {
|
||||
InputTokens int64
|
||||
OutputTokens int64
|
||||
}
|
||||
|
||||
// SubagentHandle provides control over a running subagent.
|
||||
type SubagentHandle struct {
|
||||
// ID is a unique identifier for this subagent instance.
|
||||
ID string
|
||||
|
||||
proc *os.Process
|
||||
done chan struct{}
|
||||
result *SubagentResult
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// Kill terminates the subagent process.
|
||||
func (h *SubagentHandle) Kill() error {
|
||||
h.mu.Lock()
|
||||
proc := h.proc
|
||||
h.mu.Unlock()
|
||||
if proc != nil {
|
||||
return proc.Kill()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wait blocks until the subagent completes and returns the result.
|
||||
func (h *SubagentHandle) Wait() SubagentResult {
|
||||
<-h.done
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if h.result != nil {
|
||||
return *h.result
|
||||
}
|
||||
return SubagentResult{Error: fmt.Errorf("subagent completed without result")}
|
||||
}
|
||||
|
||||
// Done returns a channel that closes when the subagent completes.
|
||||
func (h *SubagentHandle) Done() <-chan struct{} {
|
||||
return h.done
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Internal helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// subagentJSONOutput matches the JSON envelope produced by `kit --json`.
|
||||
type subagentJSONOutput struct {
|
||||
Response string `json:"response"`
|
||||
Usage *struct {
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
} `json:"usage,omitempty"`
|
||||
}
|
||||
|
||||
var subagentCounter uint64
|
||||
|
||||
func generateSubagentID() string {
|
||||
n := atomic.AddUint64(&subagentCounter, 1)
|
||||
return fmt.Sprintf("sub-%d-%d", time.Now().UnixNano(), n)
|
||||
}
|
||||
|
||||
func findKitBinary() string {
|
||||
// Try the current process executable first.
|
||||
if exe, err := os.Executable(); err == nil {
|
||||
if _, err := os.Stat(exe); err == nil {
|
||||
return exe
|
||||
}
|
||||
}
|
||||
// Fall back to PATH lookup.
|
||||
if p, err := exec.LookPath("kit"); err == nil {
|
||||
return p
|
||||
}
|
||||
return "kit"
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SpawnSubagent implementation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// SpawnSubagent spawns a child Kit instance to perform a task.
|
||||
//
|
||||
// When config.Blocking is true, blocks until completion and returns the result
|
||||
// directly (handle is nil). When false, returns immediately with a handle for
|
||||
// monitoring/cancellation.
|
||||
//
|
||||
// The subagent runs with --json --no-session --no-extensions flags by default,
|
||||
// ensuring isolation from the parent's extensions and session state.
|
||||
func SpawnSubagent(cfg SubagentConfig) (*SubagentHandle, *SubagentResult, error) {
|
||||
if cfg.Prompt == "" {
|
||||
return nil, nil, fmt.Errorf("prompt is required")
|
||||
}
|
||||
|
||||
timeout := cfg.Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 5 * time.Minute
|
||||
}
|
||||
|
||||
kitBinary := findKitBinary()
|
||||
|
||||
// Build subprocess arguments.
|
||||
args := []string{
|
||||
"--json",
|
||||
"--no-session",
|
||||
"--no-extensions",
|
||||
}
|
||||
if cfg.Model != "" {
|
||||
args = append(args, "--model", cfg.Model)
|
||||
}
|
||||
|
||||
// Handle system prompt - write to temp file if provided.
|
||||
var tmpFile *os.File
|
||||
if cfg.SystemPrompt != "" {
|
||||
var err error
|
||||
tmpFile, err = os.CreateTemp("", "kit-subagent-*.txt")
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("create temp file: %w", err)
|
||||
}
|
||||
if _, err := tmpFile.WriteString(cfg.SystemPrompt); err != nil {
|
||||
_ = tmpFile.Close()
|
||||
_ = os.Remove(tmpFile.Name())
|
||||
return nil, nil, fmt.Errorf("write system prompt: %w", err)
|
||||
}
|
||||
_ = tmpFile.Close()
|
||||
args = append(args, "--system-prompt", tmpFile.Name())
|
||||
}
|
||||
|
||||
// Add the prompt as a positional argument.
|
||||
args = append(args, cfg.Prompt)
|
||||
|
||||
// Create command with timeout context.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
|
||||
cmd := exec.CommandContext(ctx, kitBinary, args...)
|
||||
cmd.Env = os.Environ()
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
cancel()
|
||||
if tmpFile != nil {
|
||||
_ = os.Remove(tmpFile.Name())
|
||||
}
|
||||
return nil, nil, fmt.Errorf("stdout pipe: %w", err)
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
cancel()
|
||||
if tmpFile != nil {
|
||||
_ = os.Remove(tmpFile.Name())
|
||||
}
|
||||
return nil, nil, fmt.Errorf("stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
handle := &SubagentHandle{
|
||||
ID: generateSubagentID(),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the subprocess.
|
||||
start := time.Now()
|
||||
if err := cmd.Start(); err != nil {
|
||||
cancel()
|
||||
if tmpFile != nil {
|
||||
_ = os.Remove(tmpFile.Name())
|
||||
}
|
||||
return nil, nil, fmt.Errorf("start subprocess: %w", err)
|
||||
}
|
||||
|
||||
handle.mu.Lock()
|
||||
handle.proc = cmd.Process
|
||||
handle.mu.Unlock()
|
||||
|
||||
// Run the subprocess monitoring in a goroutine.
|
||||
go func() {
|
||||
defer close(handle.done)
|
||||
defer cancel()
|
||||
if tmpFile != nil {
|
||||
defer func() { _ = os.Remove(tmpFile.Name()) }()
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var stdoutBuf strings.Builder
|
||||
|
||||
// Read stderr (live output).
|
||||
wg.Go(func() {
|
||||
scanner := bufio.NewScanner(stderr)
|
||||
scanner.Buffer(make([]byte, 256*1024), 256*1024)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if cfg.OnOutput != nil && strings.TrimSpace(line) != "" {
|
||||
cfg.OnOutput(line + "\n")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Read stdout (JSON output).
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
scanner.Buffer(make([]byte, 256*1024), 256*1024)
|
||||
for scanner.Scan() {
|
||||
stdoutBuf.WriteString(scanner.Text() + "\n")
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
waitErr := cmd.Wait()
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Build result.
|
||||
result := SubagentResult{Elapsed: elapsed}
|
||||
if waitErr != nil {
|
||||
result.Error = waitErr
|
||||
if exitErr, ok := waitErr.(*exec.ExitError); ok {
|
||||
result.ExitCode = exitErr.ExitCode()
|
||||
} else {
|
||||
result.ExitCode = 1
|
||||
}
|
||||
}
|
||||
|
||||
// Parse JSON output.
|
||||
raw := strings.TrimSpace(stdoutBuf.String())
|
||||
var parsed subagentJSONOutput
|
||||
if raw != "" && json.Unmarshal([]byte(raw), &parsed) == nil {
|
||||
result.Response = parsed.Response
|
||||
if parsed.Usage != nil {
|
||||
result.Usage = &SubagentUsage{
|
||||
InputTokens: parsed.Usage.InputTokens,
|
||||
OutputTokens: parsed.Usage.OutputTokens,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Fallback: use raw stdout.
|
||||
result.Response = raw
|
||||
}
|
||||
|
||||
handle.mu.Lock()
|
||||
handle.result = &result
|
||||
handle.proc = nil
|
||||
handle.mu.Unlock()
|
||||
|
||||
if cfg.OnComplete != nil {
|
||||
cfg.OnComplete(result)
|
||||
}
|
||||
}()
|
||||
|
||||
if cfg.Blocking {
|
||||
// Wait for completion and return result directly.
|
||||
<-handle.done
|
||||
handle.mu.Lock()
|
||||
r := handle.result
|
||||
handle.mu.Unlock()
|
||||
return nil, r, nil
|
||||
}
|
||||
|
||||
return handle, nil, nil
|
||||
}
|
||||
@@ -110,6 +110,12 @@ func Symbols() interp.Exports {
|
||||
"BeforeCompactEvent": reflect.ValueOf((*BeforeCompactEvent)(nil)),
|
||||
"BeforeCompactResult": reflect.ValueOf((*BeforeCompactResult)(nil)),
|
||||
|
||||
// Subagent types
|
||||
"SubagentConfig": reflect.ValueOf((*SubagentConfig)(nil)),
|
||||
"SubagentResult": reflect.ValueOf((*SubagentResult)(nil)),
|
||||
"SubagentUsage": reflect.ValueOf((*SubagentUsage)(nil)),
|
||||
"SubagentHandle": reflect.ValueOf((*SubagentHandle)(nil)),
|
||||
|
||||
// Event structs
|
||||
"ToolCallEvent": reflect.ValueOf((*ToolCallEvent)(nil)),
|
||||
"ToolCallResult": reflect.ValueOf((*ToolCallResult)(nil)),
|
||||
|
||||
@@ -0,0 +1,193 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"charm.land/fantasy"
|
||||
)
|
||||
|
||||
// ProviderPool manages reusable LLM provider instances to reduce overhead
|
||||
// when spawning multiple subagents or making repeated completion calls.
|
||||
type ProviderPool struct {
|
||||
mu sync.RWMutex
|
||||
providers map[string]*pooledProvider
|
||||
ttl time.Duration
|
||||
closed bool
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
type pooledProvider struct {
|
||||
model fantasy.LanguageModel
|
||||
closer func() error
|
||||
providerOpts fantasy.ProviderOptions
|
||||
created time.Time
|
||||
lastUsed time.Time
|
||||
refs int32
|
||||
}
|
||||
|
||||
// DefaultPoolTTL is the default time-to-live for idle pooled providers.
|
||||
const DefaultPoolTTL = 5 * time.Minute
|
||||
|
||||
// globalPool is the singleton provider pool instance.
|
||||
var globalPool *ProviderPool
|
||||
var poolOnce sync.Once
|
||||
|
||||
// GetGlobalPool returns the singleton provider pool instance.
|
||||
func GetGlobalPool() *ProviderPool {
|
||||
poolOnce.Do(func() {
|
||||
globalPool = NewProviderPool(DefaultPoolTTL)
|
||||
})
|
||||
return globalPool
|
||||
}
|
||||
|
||||
// NewProviderPool creates a provider pool with the given TTL for idle providers.
|
||||
func NewProviderPool(ttl time.Duration) *ProviderPool {
|
||||
p := &ProviderPool{
|
||||
providers: make(map[string]*pooledProvider),
|
||||
ttl: ttl,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
go p.cleanupLoop()
|
||||
return p
|
||||
}
|
||||
|
||||
// Get returns a provider for the model string, creating one if needed.
|
||||
// The returned release function must be called when the provider is no longer
|
||||
// needed. The provider may be reused by subsequent Get calls.
|
||||
func (p *ProviderPool) Get(ctx context.Context, modelString string) (fantasy.LanguageModel, fantasy.ProviderOptions, func(), error) {
|
||||
p.mu.Lock()
|
||||
|
||||
// Check if we have an existing provider.
|
||||
if pp, ok := p.providers[modelString]; ok {
|
||||
pp.refs++
|
||||
pp.lastUsed = time.Now()
|
||||
p.mu.Unlock()
|
||||
return pp.model, pp.providerOpts, func() { p.release(modelString) }, nil
|
||||
}
|
||||
|
||||
p.mu.Unlock()
|
||||
|
||||
// Create a new provider outside the lock.
|
||||
config := &ProviderConfig{ModelString: modelString}
|
||||
result, err := CreateProvider(ctx, config)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
// Double-check: another goroutine may have created one while we were unlocked.
|
||||
if pp, ok := p.providers[modelString]; ok {
|
||||
// Close the one we just created and use the existing one.
|
||||
if result.Closer != nil {
|
||||
_ = result.Closer.Close()
|
||||
}
|
||||
pp.refs++
|
||||
pp.lastUsed = time.Now()
|
||||
return pp.model, pp.providerOpts, func() { p.release(modelString) }, nil
|
||||
}
|
||||
|
||||
var closerFn func() error
|
||||
if result.Closer != nil {
|
||||
closerFn = result.Closer.Close
|
||||
}
|
||||
|
||||
pp := &pooledProvider{
|
||||
model: result.Model,
|
||||
closer: closerFn,
|
||||
providerOpts: result.ProviderOptions,
|
||||
created: time.Now(),
|
||||
lastUsed: time.Now(),
|
||||
refs: 1,
|
||||
}
|
||||
p.providers[modelString] = pp
|
||||
|
||||
return pp.model, pp.providerOpts, func() { p.release(modelString) }, nil
|
||||
}
|
||||
|
||||
func (p *ProviderPool) release(modelString string) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if pp, ok := p.providers[modelString]; ok {
|
||||
pp.refs--
|
||||
pp.lastUsed = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ProviderPool) cleanupLoop() {
|
||||
ticker := time.NewTicker(p.ttl / 2)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.closeCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
p.cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ProviderPool) cleanup() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for key, pp := range p.providers {
|
||||
// Only clean up providers with no active references and past TTL.
|
||||
if pp.refs <= 0 && now.Sub(pp.lastUsed) > p.ttl {
|
||||
if pp.closer != nil {
|
||||
_ = pp.closer()
|
||||
}
|
||||
delete(p.providers, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the pool and releases all providers.
|
||||
func (p *ProviderPool) Close() {
|
||||
p.mu.Lock()
|
||||
if p.closed {
|
||||
p.mu.Unlock()
|
||||
return
|
||||
}
|
||||
p.closed = true
|
||||
close(p.closeCh)
|
||||
|
||||
for key, pp := range p.providers {
|
||||
if pp.closer != nil {
|
||||
_ = pp.closer()
|
||||
}
|
||||
delete(p.providers, key)
|
||||
}
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// Stats returns current pool statistics.
|
||||
func (p *ProviderPool) Stats() PoolStats {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
stats := PoolStats{
|
||||
TotalProviders: len(p.providers),
|
||||
}
|
||||
for _, pp := range p.providers {
|
||||
if pp.refs > 0 {
|
||||
stats.ActiveProviders++
|
||||
} else {
|
||||
stats.IdleProviders++
|
||||
}
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
// PoolStats contains provider pool statistics.
|
||||
type PoolStats struct {
|
||||
TotalProviders int
|
||||
ActiveProviders int
|
||||
IdleProviders int
|
||||
}
|
||||
@@ -38,6 +38,10 @@ type SessionHeader struct {
|
||||
Timestamp time.Time `json:"timestamp"` // creation time
|
||||
Cwd string `json:"cwd"` // working directory
|
||||
ParentSession string `json:"parent_session,omitempty"` // path to parent if forked
|
||||
|
||||
// Subagent fields (set when session is created by a subagent)
|
||||
ParentSessionID string `json:"parent_session_id,omitempty"` // UUID of parent session
|
||||
SubagentTask string `json:"subagent_task,omitempty"` // original task prompt
|
||||
}
|
||||
|
||||
// Entry is the common structure shared by all tree entries (everything except
|
||||
|
||||
@@ -29,6 +29,12 @@ type SessionInfo struct {
|
||||
// ParentSessionPath is the parent session path if this session was forked.
|
||||
ParentSessionPath string
|
||||
|
||||
// ParentSessionID is the UUID of the parent session (for subagent sessions).
|
||||
ParentSessionID string
|
||||
|
||||
// SubagentTask is the original task prompt (for subagent sessions).
|
||||
SubagentTask string
|
||||
|
||||
// Created is when the session was first created.
|
||||
Created time.Time
|
||||
|
||||
@@ -162,6 +168,8 @@ func extractSessionInfo(path string) (*SessionInfo, error) {
|
||||
info.Created = h.Timestamp
|
||||
info.Modified = h.Timestamp
|
||||
info.ParentSessionPath = h.ParentSession
|
||||
info.ParentSessionID = h.ParentSessionID
|
||||
info.SubagentTask = h.SubagentTask
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -245,3 +253,27 @@ func extractTextPreview(partsJSON json.RawMessage) string {
|
||||
func DeleteSession(path string) error {
|
||||
return os.Remove(path)
|
||||
}
|
||||
|
||||
// ListChildSessions returns all sessions that have the given session ID as
|
||||
// their parent. This is useful for finding subagent sessions spawned from
|
||||
// a parent session. Results are sorted by creation time (newest first).
|
||||
func ListChildSessions(parentID string) ([]SessionInfo, error) {
|
||||
if parentID == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
allSessions, err := ListAllSessions()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var children []SessionInfo
|
||||
for _, s := range allSessions {
|
||||
if s.ParentSessionID == parentID {
|
||||
children = append(children, s)
|
||||
}
|
||||
}
|
||||
|
||||
// Already sorted by modification time from ListAllSessions
|
||||
return children, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user