feat: add streaming support for compaction summaries

- Add StreamCallback parameter to compaction.Compact() for streaming text deltas
- Update generateSummary() to use fantasy.Agent.Stream() when callback provided
- Fix compactSplitTurn() to stream both history and turn prefix summaries
- Add SDK event subscription in CompactConversation() goroutine
- Update UI to handle streaming compaction like regular assistant messages
- Compaction summaries now stream word-by-word instead of appearing all at once

Fixes issue where compaction would show incomplete context (e.g. only 'nce')
by ensuring both history summary and turn prefix are streamed to the UI.
This commit is contained in:
Ed Zynda
2026-03-31 17:33:51 +03:00
parent e2ed345280
commit 6d4e8bcec5
8 changed files with 119 additions and 38 deletions
+9
View File
@@ -356,6 +356,15 @@ func (a *App) CompactConversation(customInstructions string) error {
a.mu.Unlock()
}()
// Subscribe to SDK events for streaming compaction summary to the TUI.
sendFn := func(msg tea.Msg) {
if a.program != nil {
a.program.Send(msg)
}
}
unsub := a.subscribeSDKEvents(sendFn, nil)
defer unsub()
result, err := a.opts.Kit.Compact(a.rootCtx, nil, customInstructions)
if err != nil {
a.sendEvent(CompactErrorEvent{Err: err})
+49 -12
View File
@@ -428,6 +428,10 @@ type PreviousCompaction struct {
ModifiedFiles []string
}
// StreamCallback is called for each chunk of text during streaming compaction.
// Return a non-nil error to cancel the stream.
type StreamCallback func(delta string) error
// Compact summarises older messages using the LLM, returning the compaction
// result and a new message slice (summary message + preserved recent
// messages).
@@ -442,6 +446,8 @@ type PreviousCompaction struct {
//
// prev carries file tracking from a previous compaction for cumulative
// tracking. Pass nil if there is no prior compaction.
// onChunk is an optional callback for streaming summary text. Pass nil for
// non-streaming compaction.
func Compact(
ctx context.Context,
model fantasy.LanguageModel,
@@ -449,6 +455,7 @@ func Compact(
opts CompactionOptions,
customInstructions string,
prev *PreviousCompaction,
onChunk StreamCallback,
) (*CompactionResult, []fantasy.Message, error) {
opts.defaults()
@@ -487,9 +494,9 @@ func Compact(
var err error
if IsSplitTurn(messages, cutPoint) {
summaryText, err = compactSplitTurn(ctx, model, oldMessages, messages, cutPoint, opts, customInstructions)
summaryText, err = compactSplitTurn(ctx, model, oldMessages, messages, cutPoint, opts, customInstructions, onChunk)
} else {
summaryText, err = compactNormal(ctx, model, oldMessages, opts, customInstructions)
summaryText, err = compactNormal(ctx, model, oldMessages, opts, customInstructions, onChunk)
}
if err != nil {
return nil, nil, err
@@ -527,15 +534,17 @@ func Compact(
}
// compactNormal generates a summary for a clean turn-boundary cut.
// If onChunk is provided, text deltas are streamed to it.
func compactNormal(
ctx context.Context,
model fantasy.LanguageModel,
oldMessages []fantasy.Message,
opts CompactionOptions,
customInstructions string,
onChunk StreamCallback,
) (string, error) {
conversationText := serializeMessages(oldMessages)
return generateSummary(ctx, model, conversationText, opts, customInstructions)
return generateSummary(ctx, model, conversationText, opts, customInstructions, onChunk)
}
// compactSplitTurn handles the case where the cut point lands mid-turn.
@@ -546,6 +555,7 @@ func compactNormal(
//
// The merged result preserves context from both the older history and the
// beginning of the current long turn.
// If onChunk is provided, both summaries and the separator are streamed.
func compactSplitTurn(
ctx context.Context,
model fantasy.LanguageModel,
@@ -554,6 +564,7 @@ func compactSplitTurn(
cutPoint int,
opts CompactionOptions,
customInstructions string,
onChunk StreamCallback,
) (string, error) {
// Find where the split turn starts.
turnStart := findTurnStart(allMessages, cutPoint)
@@ -573,12 +584,19 @@ func compactSplitTurn(
// Generate history summary if there are complete turns before the split.
if len(historyMessages) >= 2 {
historySummary, err = generateSummary(ctx, model,
serializeMessages(historyMessages), opts, "")
serializeMessages(historyMessages), opts, "", onChunk)
if err != nil {
return "", fmt.Errorf("split turn history summary failed: %w", err)
}
}
// Stream the separator between history and turn prefix summaries.
if onChunk != nil && historySummary != "" {
if err := onChunk("\n\n---\n\n## Current Turn (in progress)\n\n"); err != nil {
return "", fmt.Errorf("streaming separator failed: %w", err)
}
}
// Generate turn prefix summary.
turnPrefixText := serializeMessages(turnPrefixMessages)
turnPrefixPrompt := "The messages above are the BEGINNING of a long turn that was split. " +
@@ -588,16 +606,10 @@ func compactSplitTurn(
turnPrefixPrompt += "\n\nAdditional instructions: " + customInstructions
}
summaryAgent := fantasy.NewAgent(model,
fantasy.WithSystemPrompt(defaultSystemPrompt),
)
result, err := summaryAgent.Generate(ctx, fantasy.AgentCall{
Prompt: turnPrefixText + "\n\n" + turnPrefixPrompt,
})
turnPrefixSummary, err := generateSummary(ctx, model, turnPrefixText, opts, turnPrefixPrompt, onChunk)
if err != nil {
return "", fmt.Errorf("split turn prefix summary failed: %w", err)
}
turnPrefixSummary := result.Response.Content.Text()
// Merge the two summaries.
if historySummary != "" && turnPrefixSummary != "" {
@@ -610,12 +622,14 @@ func compactSplitTurn(
}
// generateSummary calls the LLM to produce a structured summary.
// If onChunk is provided, the summary is streamed using Agent.Stream().
func generateSummary(
ctx context.Context,
model fantasy.LanguageModel,
conversationText string,
opts CompactionOptions,
customInstructions string,
onChunk StreamCallback,
) (string, error) {
userPrompt := opts.SummaryPrompt
if userPrompt == "" {
@@ -628,8 +642,31 @@ func generateSummary(
summaryAgent := fantasy.NewAgent(model,
fantasy.WithSystemPrompt(defaultSystemPrompt),
)
prompt := conversationText + "\n\n" + userPrompt
// Use streaming if onChunk is provided.
if onChunk != nil {
var fullText strings.Builder
_, err := summaryAgent.Stream(ctx, fantasy.AgentStreamCall{
Prompt: prompt,
OnTextDelta: func(_, delta string) error {
if delta != "" {
fullText.WriteString(delta)
return onChunk(delta)
}
return nil
},
})
if err != nil {
return "", fmt.Errorf("compaction summarisation (streaming) failed: %w", err)
}
return fullText.String(), nil
}
// Non-streaming path.
result, err := summaryAgent.Generate(ctx, fantasy.AgentCall{
Prompt: conversationText + "\n\n" + userPrompt,
Prompt: prompt,
})
if err != nil {
return "", fmt.Errorf("compaction summarisation failed: %w", err)
+2 -2
View File
@@ -243,7 +243,7 @@ func TestCompact_TooFewMessages(t *testing.T) {
makeTextMessageN(fantasy.MessageRoleUser, 400),
}
result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil)
result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -262,7 +262,7 @@ func TestCompact_WithinBudget(t *testing.T) {
makeTextMessageN(fantasy.MessageRoleAssistant, 400),
}
result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil)
result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
+3 -3
View File
@@ -552,9 +552,9 @@ func (s *InputComponent) RenderPopupCentered(termWidth, termHeight int) string {
if !s.showPopup || len(s.filtered) == 0 {
return ""
}
popupContent := s.renderPopupWithOptions(true)
// Center popup using lipgloss.Place
positioned := lipgloss.Place(
termWidth,
@@ -563,7 +563,7 @@ func (s *InputComponent) RenderPopupCentered(termWidth, termHeight int) string {
lipgloss.Center,
popupContent,
)
return positioned
}
+13 -13
View File
@@ -109,16 +109,16 @@ func (m *TextMessageItem) renderContent(width int) string {
// StreamingMessageItem represents actively streaming assistant or reasoning text.
// It accumulates content chunks and re-renders on each update for live display.
type StreamingMessageItem struct {
id string
role string // "assistant" or "reasoning"
content string // Accumulated streaming content
timestamp time.Time
startTime time.Time // When streaming started (for live duration counter)
modelName string
streaming bool // true while actively streaming
id string
role string // "assistant" or "reasoning"
content string // Accumulated streaming content
timestamp time.Time
startTime time.Time // When streaming started (for live duration counter)
modelName string
streaming bool // true while actively streaming
finalDuration time.Duration // Frozen duration when complete
cachedRender string
cachedWidth int
cachedRender string
cachedWidth int
}
// NewStreamingMessageItem creates a new streaming message item.
@@ -157,10 +157,10 @@ func (s *StreamingMessageItem) Render(width int) string {
mutedStyle := lipgloss.NewStyle().Foreground(theme.Muted)
ty := createTypography(theme)
content := strings.TrimLeft(s.content, " \t\n")
var parts []string
parts = append(parts, mutedStyle.Render(ty.Italic(content)))
// Add live duration counter (updates on each render)
var duration time.Duration
if s.finalDuration > 0 {
@@ -170,7 +170,7 @@ func (s *StreamingMessageItem) Render(width int) string {
// Still streaming, show live duration
duration = time.Since(s.startTime)
}
if duration > 0 {
var durationStr string
if duration < time.Second {
@@ -182,7 +182,7 @@ func (s *StreamingMessageItem) Render(width int) string {
durationStyled := lipgloss.NewStyle().Foreground(theme.Accent).Render(durationStr)
parts = append(parts, label+durationStyled)
}
rendered = styleMarginBottom1.Render(strings.Join(parts, "\n"))
} else {
// Render as assistant message
+25 -7
View File
@@ -1530,7 +1530,7 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
// This event fires for both streaming and non-streaming paths.
// In streaming mode, mark the StreamingMessageItem as complete.
// In non-streaming mode (no stream content accumulated), print the text.
// Check if we have an active StreamingMessageItem
hasStreamingItem := false
if len(m.messages) > 0 {
@@ -1539,12 +1539,12 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
hasStreamingItem = true
}
}
// Reset stream component
if m.stream != nil {
m.stream.Reset()
}
// If no streaming item exists and we have content, print it as a regular message
if !hasStreamingItem && strings.TrimSpace(msg.Content) != "" {
m.printAssistantMessage(msg.Content)
@@ -1639,11 +1639,29 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.canceling = false
case app.CompactCompleteEvent:
// Finalize any streaming compaction content.
if m.stream != nil {
m.stream.Reset()
}
m.state = stateInput
m.printCompactResult(msg)
// Mark the last streaming message as complete in ScrollList.
if len(m.messages) > 0 {
if streamMsg, ok := m.messages[len(m.messages)-1].(*StreamingMessageItem); ok {
streamMsg.MarkComplete()
}
}
// Refresh content to show the finalized message.
m.refreshContent()
// Print stats as a separate system message.
saved := msg.OriginalTokens - msg.CompactedTokens
statsMsg := fmt.Sprintf(
"Compaction complete: %d messages summarised, ~%dk tokens freed (%dk -> %dk)",
msg.MessagesRemoved, saved/1000, msg.OriginalTokens/1000, msg.CompactedTokens/1000,
)
m.printSystemMessage(statsMsg)
case app.CompactErrorEvent:
if m.stream != nil {
@@ -2003,7 +2021,7 @@ func (m *AppModel) View() tea.View {
func overlayContent(base, overlay string, width, height int) string {
baseLines := strings.Split(base, "\n")
overlayLines := strings.Split(overlay, "\n")
// Ensure we have exactly height lines
for len(baseLines) < height {
baseLines = append(baseLines, strings.Repeat(" ", width))
@@ -2011,7 +2029,7 @@ func overlayContent(base, overlay string, width, height int) string {
for len(overlayLines) < height {
overlayLines = append(overlayLines, strings.Repeat(" ", width))
}
// Merge lines - overlay takes precedence where non-empty
result := make([]string, height)
for i := 0; i < height; i++ {
@@ -2023,7 +2041,7 @@ func overlayContent(base, overlay string, width, height int) string {
result[i] = strings.Repeat(" ", width)
}
}
return strings.Join(result, "\n")
}
+9 -1
View File
@@ -153,7 +153,15 @@ func (m *Kit) compactInternal(ctx context.Context, opts *CompactionOptions, cust
}
model := m.agent.GetModel()
result, _, err := compaction.Compact(ctx, model, messages, *opts, customInstructions, prev)
// Create a streaming callback to emit chunks as events.
streamCallback := func(delta string) error {
// Emit MessageUpdateEvent to the UI for streaming display.
m.events.emit(MessageUpdateEvent{Chunk: delta})
return nil
}
result, _, err := compaction.Compact(ctx, model, messages, *opts, customInstructions, prev, streamCallback)
if err != nil {
return nil, err
}
+9
View File
@@ -0,0 +1,9 @@
1. Hello, world!
2. Testing one, two, three.
3. This is a quick test message.
4. Sample text for verification.
5. All systems operational.