From 6d4e8bcec59c2772c6e4fb07865d198c40a9faea Mon Sep 17 00:00:00 2001 From: Ed Zynda Date: Tue, 31 Mar 2026 17:33:51 +0300 Subject: [PATCH] feat: add streaming support for compaction summaries - Add StreamCallback parameter to compaction.Compact() for streaming text deltas - Update generateSummary() to use fantasy.Agent.Stream() when callback provided - Fix compactSplitTurn() to stream both history and turn prefix summaries - Add SDK event subscription in CompactConversation() goroutine - Update UI to handle streaming compaction like regular assistant messages - Compaction summaries now stream word-by-word instead of appearing all at once Fixes issue where compaction would show incomplete context (e.g. only 'nce') by ensuring both history summary and turn prefix are streamed to the UI. --- internal/app/app.go | 9 ++++ internal/compaction/compaction.go | 61 +++++++++++++++++++++----- internal/compaction/compaction_test.go | 4 +- internal/ui/input.go | 6 +-- internal/ui/message_items.go | 26 +++++------ internal/ui/model.go | 32 +++++++++++--- pkg/kit/compaction.go | 10 ++++- test_messages.txt | 9 ++++ 8 files changed, 119 insertions(+), 38 deletions(-) create mode 100644 test_messages.txt diff --git a/internal/app/app.go b/internal/app/app.go index 218ba8af..80a8bf22 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -356,6 +356,15 @@ func (a *App) CompactConversation(customInstructions string) error { a.mu.Unlock() }() + // Subscribe to SDK events for streaming compaction summary to the TUI. + sendFn := func(msg tea.Msg) { + if a.program != nil { + a.program.Send(msg) + } + } + unsub := a.subscribeSDKEvents(sendFn, nil) + defer unsub() + result, err := a.opts.Kit.Compact(a.rootCtx, nil, customInstructions) if err != nil { a.sendEvent(CompactErrorEvent{Err: err}) diff --git a/internal/compaction/compaction.go b/internal/compaction/compaction.go index 8d73283d..71d15005 100644 --- a/internal/compaction/compaction.go +++ b/internal/compaction/compaction.go @@ -428,6 +428,10 @@ type PreviousCompaction struct { ModifiedFiles []string } +// StreamCallback is called for each chunk of text during streaming compaction. +// Return a non-nil error to cancel the stream. +type StreamCallback func(delta string) error + // Compact summarises older messages using the LLM, returning the compaction // result and a new message slice (summary message + preserved recent // messages). @@ -442,6 +446,8 @@ type PreviousCompaction struct { // // prev carries file tracking from a previous compaction for cumulative // tracking. Pass nil if there is no prior compaction. +// onChunk is an optional callback for streaming summary text. Pass nil for +// non-streaming compaction. func Compact( ctx context.Context, model fantasy.LanguageModel, @@ -449,6 +455,7 @@ func Compact( opts CompactionOptions, customInstructions string, prev *PreviousCompaction, + onChunk StreamCallback, ) (*CompactionResult, []fantasy.Message, error) { opts.defaults() @@ -487,9 +494,9 @@ func Compact( var err error if IsSplitTurn(messages, cutPoint) { - summaryText, err = compactSplitTurn(ctx, model, oldMessages, messages, cutPoint, opts, customInstructions) + summaryText, err = compactSplitTurn(ctx, model, oldMessages, messages, cutPoint, opts, customInstructions, onChunk) } else { - summaryText, err = compactNormal(ctx, model, oldMessages, opts, customInstructions) + summaryText, err = compactNormal(ctx, model, oldMessages, opts, customInstructions, onChunk) } if err != nil { return nil, nil, err @@ -527,15 +534,17 @@ func Compact( } // compactNormal generates a summary for a clean turn-boundary cut. +// If onChunk is provided, text deltas are streamed to it. func compactNormal( ctx context.Context, model fantasy.LanguageModel, oldMessages []fantasy.Message, opts CompactionOptions, customInstructions string, + onChunk StreamCallback, ) (string, error) { conversationText := serializeMessages(oldMessages) - return generateSummary(ctx, model, conversationText, opts, customInstructions) + return generateSummary(ctx, model, conversationText, opts, customInstructions, onChunk) } // compactSplitTurn handles the case where the cut point lands mid-turn. @@ -546,6 +555,7 @@ func compactNormal( // // The merged result preserves context from both the older history and the // beginning of the current long turn. +// If onChunk is provided, both summaries and the separator are streamed. func compactSplitTurn( ctx context.Context, model fantasy.LanguageModel, @@ -554,6 +564,7 @@ func compactSplitTurn( cutPoint int, opts CompactionOptions, customInstructions string, + onChunk StreamCallback, ) (string, error) { // Find where the split turn starts. turnStart := findTurnStart(allMessages, cutPoint) @@ -573,12 +584,19 @@ func compactSplitTurn( // Generate history summary if there are complete turns before the split. if len(historyMessages) >= 2 { historySummary, err = generateSummary(ctx, model, - serializeMessages(historyMessages), opts, "") + serializeMessages(historyMessages), opts, "", onChunk) if err != nil { return "", fmt.Errorf("split turn history summary failed: %w", err) } } + // Stream the separator between history and turn prefix summaries. + if onChunk != nil && historySummary != "" { + if err := onChunk("\n\n---\n\n## Current Turn (in progress)\n\n"); err != nil { + return "", fmt.Errorf("streaming separator failed: %w", err) + } + } + // Generate turn prefix summary. turnPrefixText := serializeMessages(turnPrefixMessages) turnPrefixPrompt := "The messages above are the BEGINNING of a long turn that was split. " + @@ -588,16 +606,10 @@ func compactSplitTurn( turnPrefixPrompt += "\n\nAdditional instructions: " + customInstructions } - summaryAgent := fantasy.NewAgent(model, - fantasy.WithSystemPrompt(defaultSystemPrompt), - ) - result, err := summaryAgent.Generate(ctx, fantasy.AgentCall{ - Prompt: turnPrefixText + "\n\n" + turnPrefixPrompt, - }) + turnPrefixSummary, err := generateSummary(ctx, model, turnPrefixText, opts, turnPrefixPrompt, onChunk) if err != nil { return "", fmt.Errorf("split turn prefix summary failed: %w", err) } - turnPrefixSummary := result.Response.Content.Text() // Merge the two summaries. if historySummary != "" && turnPrefixSummary != "" { @@ -610,12 +622,14 @@ func compactSplitTurn( } // generateSummary calls the LLM to produce a structured summary. +// If onChunk is provided, the summary is streamed using Agent.Stream(). func generateSummary( ctx context.Context, model fantasy.LanguageModel, conversationText string, opts CompactionOptions, customInstructions string, + onChunk StreamCallback, ) (string, error) { userPrompt := opts.SummaryPrompt if userPrompt == "" { @@ -628,8 +642,31 @@ func generateSummary( summaryAgent := fantasy.NewAgent(model, fantasy.WithSystemPrompt(defaultSystemPrompt), ) + + prompt := conversationText + "\n\n" + userPrompt + + // Use streaming if onChunk is provided. + if onChunk != nil { + var fullText strings.Builder + _, err := summaryAgent.Stream(ctx, fantasy.AgentStreamCall{ + Prompt: prompt, + OnTextDelta: func(_, delta string) error { + if delta != "" { + fullText.WriteString(delta) + return onChunk(delta) + } + return nil + }, + }) + if err != nil { + return "", fmt.Errorf("compaction summarisation (streaming) failed: %w", err) + } + return fullText.String(), nil + } + + // Non-streaming path. result, err := summaryAgent.Generate(ctx, fantasy.AgentCall{ - Prompt: conversationText + "\n\n" + userPrompt, + Prompt: prompt, }) if err != nil { return "", fmt.Errorf("compaction summarisation failed: %w", err) diff --git a/internal/compaction/compaction_test.go b/internal/compaction/compaction_test.go index bf238f39..2ca7c075 100644 --- a/internal/compaction/compaction_test.go +++ b/internal/compaction/compaction_test.go @@ -243,7 +243,7 @@ func TestCompact_TooFewMessages(t *testing.T) { makeTextMessageN(fantasy.MessageRoleUser, 400), } - result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil) + result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -262,7 +262,7 @@ func TestCompact_WithinBudget(t *testing.T) { makeTextMessageN(fantasy.MessageRoleAssistant, 400), } - result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil) + result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/internal/ui/input.go b/internal/ui/input.go index 41a53d9e..205ed9c1 100644 --- a/internal/ui/input.go +++ b/internal/ui/input.go @@ -552,9 +552,9 @@ func (s *InputComponent) RenderPopupCentered(termWidth, termHeight int) string { if !s.showPopup || len(s.filtered) == 0 { return "" } - + popupContent := s.renderPopupWithOptions(true) - + // Center popup using lipgloss.Place positioned := lipgloss.Place( termWidth, @@ -563,7 +563,7 @@ func (s *InputComponent) RenderPopupCentered(termWidth, termHeight int) string { lipgloss.Center, popupContent, ) - + return positioned } diff --git a/internal/ui/message_items.go b/internal/ui/message_items.go index df3932f5..4e8cdc3a 100644 --- a/internal/ui/message_items.go +++ b/internal/ui/message_items.go @@ -109,16 +109,16 @@ func (m *TextMessageItem) renderContent(width int) string { // StreamingMessageItem represents actively streaming assistant or reasoning text. // It accumulates content chunks and re-renders on each update for live display. type StreamingMessageItem struct { - id string - role string // "assistant" or "reasoning" - content string // Accumulated streaming content - timestamp time.Time - startTime time.Time // When streaming started (for live duration counter) - modelName string - streaming bool // true while actively streaming + id string + role string // "assistant" or "reasoning" + content string // Accumulated streaming content + timestamp time.Time + startTime time.Time // When streaming started (for live duration counter) + modelName string + streaming bool // true while actively streaming finalDuration time.Duration // Frozen duration when complete - cachedRender string - cachedWidth int + cachedRender string + cachedWidth int } // NewStreamingMessageItem creates a new streaming message item. @@ -157,10 +157,10 @@ func (s *StreamingMessageItem) Render(width int) string { mutedStyle := lipgloss.NewStyle().Foreground(theme.Muted) ty := createTypography(theme) content := strings.TrimLeft(s.content, " \t\n") - + var parts []string parts = append(parts, mutedStyle.Render(ty.Italic(content))) - + // Add live duration counter (updates on each render) var duration time.Duration if s.finalDuration > 0 { @@ -170,7 +170,7 @@ func (s *StreamingMessageItem) Render(width int) string { // Still streaming, show live duration duration = time.Since(s.startTime) } - + if duration > 0 { var durationStr string if duration < time.Second { @@ -182,7 +182,7 @@ func (s *StreamingMessageItem) Render(width int) string { durationStyled := lipgloss.NewStyle().Foreground(theme.Accent).Render(durationStr) parts = append(parts, label+durationStyled) } - + rendered = styleMarginBottom1.Render(strings.Join(parts, "\n")) } else { // Render as assistant message diff --git a/internal/ui/model.go b/internal/ui/model.go index e3f242b3..65c58b97 100644 --- a/internal/ui/model.go +++ b/internal/ui/model.go @@ -1530,7 +1530,7 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // This event fires for both streaming and non-streaming paths. // In streaming mode, mark the StreamingMessageItem as complete. // In non-streaming mode (no stream content accumulated), print the text. - + // Check if we have an active StreamingMessageItem hasStreamingItem := false if len(m.messages) > 0 { @@ -1539,12 +1539,12 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { hasStreamingItem = true } } - + // Reset stream component if m.stream != nil { m.stream.Reset() } - + // If no streaming item exists and we have content, print it as a regular message if !hasStreamingItem && strings.TrimSpace(msg.Content) != "" { m.printAssistantMessage(msg.Content) @@ -1639,11 +1639,29 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.canceling = false case app.CompactCompleteEvent: + // Finalize any streaming compaction content. if m.stream != nil { m.stream.Reset() } m.state = stateInput - m.printCompactResult(msg) + + // Mark the last streaming message as complete in ScrollList. + if len(m.messages) > 0 { + if streamMsg, ok := m.messages[len(m.messages)-1].(*StreamingMessageItem); ok { + streamMsg.MarkComplete() + } + } + + // Refresh content to show the finalized message. + m.refreshContent() + + // Print stats as a separate system message. + saved := msg.OriginalTokens - msg.CompactedTokens + statsMsg := fmt.Sprintf( + "Compaction complete: %d messages summarised, ~%dk tokens freed (%dk -> %dk)", + msg.MessagesRemoved, saved/1000, msg.OriginalTokens/1000, msg.CompactedTokens/1000, + ) + m.printSystemMessage(statsMsg) case app.CompactErrorEvent: if m.stream != nil { @@ -2003,7 +2021,7 @@ func (m *AppModel) View() tea.View { func overlayContent(base, overlay string, width, height int) string { baseLines := strings.Split(base, "\n") overlayLines := strings.Split(overlay, "\n") - + // Ensure we have exactly height lines for len(baseLines) < height { baseLines = append(baseLines, strings.Repeat(" ", width)) @@ -2011,7 +2029,7 @@ func overlayContent(base, overlay string, width, height int) string { for len(overlayLines) < height { overlayLines = append(overlayLines, strings.Repeat(" ", width)) } - + // Merge lines - overlay takes precedence where non-empty result := make([]string, height) for i := 0; i < height; i++ { @@ -2023,7 +2041,7 @@ func overlayContent(base, overlay string, width, height int) string { result[i] = strings.Repeat(" ", width) } } - + return strings.Join(result, "\n") } diff --git a/pkg/kit/compaction.go b/pkg/kit/compaction.go index 6a6ed540..602549ba 100644 --- a/pkg/kit/compaction.go +++ b/pkg/kit/compaction.go @@ -153,7 +153,15 @@ func (m *Kit) compactInternal(ctx context.Context, opts *CompactionOptions, cust } model := m.agent.GetModel() - result, _, err := compaction.Compact(ctx, model, messages, *opts, customInstructions, prev) + + // Create a streaming callback to emit chunks as events. + streamCallback := func(delta string) error { + // Emit MessageUpdateEvent to the UI for streaming display. + m.events.emit(MessageUpdateEvent{Chunk: delta}) + return nil + } + + result, _, err := compaction.Compact(ctx, model, messages, *opts, customInstructions, prev, streamCallback) if err != nil { return nil, err } diff --git a/test_messages.txt b/test_messages.txt new file mode 100644 index 00000000..21036f7a --- /dev/null +++ b/test_messages.txt @@ -0,0 +1,9 @@ +1. Hello, world! + +2. Testing one, two, three. + +3. This is a quick test message. + +4. Sample text for verification. + +5. All systems operational.