diff --git a/internal/app/app.go b/internal/app/app.go index 218ba8af..80a8bf22 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -356,6 +356,15 @@ func (a *App) CompactConversation(customInstructions string) error { a.mu.Unlock() }() + // Subscribe to SDK events for streaming compaction summary to the TUI. + sendFn := func(msg tea.Msg) { + if a.program != nil { + a.program.Send(msg) + } + } + unsub := a.subscribeSDKEvents(sendFn, nil) + defer unsub() + result, err := a.opts.Kit.Compact(a.rootCtx, nil, customInstructions) if err != nil { a.sendEvent(CompactErrorEvent{Err: err}) diff --git a/internal/compaction/compaction.go b/internal/compaction/compaction.go index 8d73283d..71d15005 100644 --- a/internal/compaction/compaction.go +++ b/internal/compaction/compaction.go @@ -428,6 +428,10 @@ type PreviousCompaction struct { ModifiedFiles []string } +// StreamCallback is called for each chunk of text during streaming compaction. +// Return a non-nil error to cancel the stream. +type StreamCallback func(delta string) error + // Compact summarises older messages using the LLM, returning the compaction // result and a new message slice (summary message + preserved recent // messages). @@ -442,6 +446,8 @@ type PreviousCompaction struct { // // prev carries file tracking from a previous compaction for cumulative // tracking. Pass nil if there is no prior compaction. +// onChunk is an optional callback for streaming summary text. Pass nil for +// non-streaming compaction. func Compact( ctx context.Context, model fantasy.LanguageModel, @@ -449,6 +455,7 @@ func Compact( opts CompactionOptions, customInstructions string, prev *PreviousCompaction, + onChunk StreamCallback, ) (*CompactionResult, []fantasy.Message, error) { opts.defaults() @@ -487,9 +494,9 @@ func Compact( var err error if IsSplitTurn(messages, cutPoint) { - summaryText, err = compactSplitTurn(ctx, model, oldMessages, messages, cutPoint, opts, customInstructions) + summaryText, err = compactSplitTurn(ctx, model, oldMessages, messages, cutPoint, opts, customInstructions, onChunk) } else { - summaryText, err = compactNormal(ctx, model, oldMessages, opts, customInstructions) + summaryText, err = compactNormal(ctx, model, oldMessages, opts, customInstructions, onChunk) } if err != nil { return nil, nil, err @@ -527,15 +534,17 @@ func Compact( } // compactNormal generates a summary for a clean turn-boundary cut. +// If onChunk is provided, text deltas are streamed to it. func compactNormal( ctx context.Context, model fantasy.LanguageModel, oldMessages []fantasy.Message, opts CompactionOptions, customInstructions string, + onChunk StreamCallback, ) (string, error) { conversationText := serializeMessages(oldMessages) - return generateSummary(ctx, model, conversationText, opts, customInstructions) + return generateSummary(ctx, model, conversationText, opts, customInstructions, onChunk) } // compactSplitTurn handles the case where the cut point lands mid-turn. @@ -546,6 +555,7 @@ func compactNormal( // // The merged result preserves context from both the older history and the // beginning of the current long turn. +// If onChunk is provided, both summaries and the separator are streamed. func compactSplitTurn( ctx context.Context, model fantasy.LanguageModel, @@ -554,6 +564,7 @@ func compactSplitTurn( cutPoint int, opts CompactionOptions, customInstructions string, + onChunk StreamCallback, ) (string, error) { // Find where the split turn starts. turnStart := findTurnStart(allMessages, cutPoint) @@ -573,12 +584,19 @@ func compactSplitTurn( // Generate history summary if there are complete turns before the split. if len(historyMessages) >= 2 { historySummary, err = generateSummary(ctx, model, - serializeMessages(historyMessages), opts, "") + serializeMessages(historyMessages), opts, "", onChunk) if err != nil { return "", fmt.Errorf("split turn history summary failed: %w", err) } } + // Stream the separator between history and turn prefix summaries. + if onChunk != nil && historySummary != "" { + if err := onChunk("\n\n---\n\n## Current Turn (in progress)\n\n"); err != nil { + return "", fmt.Errorf("streaming separator failed: %w", err) + } + } + // Generate turn prefix summary. turnPrefixText := serializeMessages(turnPrefixMessages) turnPrefixPrompt := "The messages above are the BEGINNING of a long turn that was split. " + @@ -588,16 +606,10 @@ func compactSplitTurn( turnPrefixPrompt += "\n\nAdditional instructions: " + customInstructions } - summaryAgent := fantasy.NewAgent(model, - fantasy.WithSystemPrompt(defaultSystemPrompt), - ) - result, err := summaryAgent.Generate(ctx, fantasy.AgentCall{ - Prompt: turnPrefixText + "\n\n" + turnPrefixPrompt, - }) + turnPrefixSummary, err := generateSummary(ctx, model, turnPrefixText, opts, turnPrefixPrompt, onChunk) if err != nil { return "", fmt.Errorf("split turn prefix summary failed: %w", err) } - turnPrefixSummary := result.Response.Content.Text() // Merge the two summaries. if historySummary != "" && turnPrefixSummary != "" { @@ -610,12 +622,14 @@ func compactSplitTurn( } // generateSummary calls the LLM to produce a structured summary. +// If onChunk is provided, the summary is streamed using Agent.Stream(). func generateSummary( ctx context.Context, model fantasy.LanguageModel, conversationText string, opts CompactionOptions, customInstructions string, + onChunk StreamCallback, ) (string, error) { userPrompt := opts.SummaryPrompt if userPrompt == "" { @@ -628,8 +642,31 @@ func generateSummary( summaryAgent := fantasy.NewAgent(model, fantasy.WithSystemPrompt(defaultSystemPrompt), ) + + prompt := conversationText + "\n\n" + userPrompt + + // Use streaming if onChunk is provided. + if onChunk != nil { + var fullText strings.Builder + _, err := summaryAgent.Stream(ctx, fantasy.AgentStreamCall{ + Prompt: prompt, + OnTextDelta: func(_, delta string) error { + if delta != "" { + fullText.WriteString(delta) + return onChunk(delta) + } + return nil + }, + }) + if err != nil { + return "", fmt.Errorf("compaction summarisation (streaming) failed: %w", err) + } + return fullText.String(), nil + } + + // Non-streaming path. result, err := summaryAgent.Generate(ctx, fantasy.AgentCall{ - Prompt: conversationText + "\n\n" + userPrompt, + Prompt: prompt, }) if err != nil { return "", fmt.Errorf("compaction summarisation failed: %w", err) diff --git a/internal/compaction/compaction_test.go b/internal/compaction/compaction_test.go index bf238f39..2ca7c075 100644 --- a/internal/compaction/compaction_test.go +++ b/internal/compaction/compaction_test.go @@ -243,7 +243,7 @@ func TestCompact_TooFewMessages(t *testing.T) { makeTextMessageN(fantasy.MessageRoleUser, 400), } - result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil) + result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -262,7 +262,7 @@ func TestCompact_WithinBudget(t *testing.T) { makeTextMessageN(fantasy.MessageRoleAssistant, 400), } - result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil) + result, newMsgs, err := Compact(context.TODO(), nil, msgs, CompactionOptions{}, "", nil, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/internal/ui/input.go b/internal/ui/input.go index 41a53d9e..205ed9c1 100644 --- a/internal/ui/input.go +++ b/internal/ui/input.go @@ -552,9 +552,9 @@ func (s *InputComponent) RenderPopupCentered(termWidth, termHeight int) string { if !s.showPopup || len(s.filtered) == 0 { return "" } - + popupContent := s.renderPopupWithOptions(true) - + // Center popup using lipgloss.Place positioned := lipgloss.Place( termWidth, @@ -563,7 +563,7 @@ func (s *InputComponent) RenderPopupCentered(termWidth, termHeight int) string { lipgloss.Center, popupContent, ) - + return positioned } diff --git a/internal/ui/message_items.go b/internal/ui/message_items.go index df3932f5..4e8cdc3a 100644 --- a/internal/ui/message_items.go +++ b/internal/ui/message_items.go @@ -109,16 +109,16 @@ func (m *TextMessageItem) renderContent(width int) string { // StreamingMessageItem represents actively streaming assistant or reasoning text. // It accumulates content chunks and re-renders on each update for live display. type StreamingMessageItem struct { - id string - role string // "assistant" or "reasoning" - content string // Accumulated streaming content - timestamp time.Time - startTime time.Time // When streaming started (for live duration counter) - modelName string - streaming bool // true while actively streaming + id string + role string // "assistant" or "reasoning" + content string // Accumulated streaming content + timestamp time.Time + startTime time.Time // When streaming started (for live duration counter) + modelName string + streaming bool // true while actively streaming finalDuration time.Duration // Frozen duration when complete - cachedRender string - cachedWidth int + cachedRender string + cachedWidth int } // NewStreamingMessageItem creates a new streaming message item. @@ -157,10 +157,10 @@ func (s *StreamingMessageItem) Render(width int) string { mutedStyle := lipgloss.NewStyle().Foreground(theme.Muted) ty := createTypography(theme) content := strings.TrimLeft(s.content, " \t\n") - + var parts []string parts = append(parts, mutedStyle.Render(ty.Italic(content))) - + // Add live duration counter (updates on each render) var duration time.Duration if s.finalDuration > 0 { @@ -170,7 +170,7 @@ func (s *StreamingMessageItem) Render(width int) string { // Still streaming, show live duration duration = time.Since(s.startTime) } - + if duration > 0 { var durationStr string if duration < time.Second { @@ -182,7 +182,7 @@ func (s *StreamingMessageItem) Render(width int) string { durationStyled := lipgloss.NewStyle().Foreground(theme.Accent).Render(durationStr) parts = append(parts, label+durationStyled) } - + rendered = styleMarginBottom1.Render(strings.Join(parts, "\n")) } else { // Render as assistant message diff --git a/internal/ui/model.go b/internal/ui/model.go index e3f242b3..65c58b97 100644 --- a/internal/ui/model.go +++ b/internal/ui/model.go @@ -1530,7 +1530,7 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // This event fires for both streaming and non-streaming paths. // In streaming mode, mark the StreamingMessageItem as complete. // In non-streaming mode (no stream content accumulated), print the text. - + // Check if we have an active StreamingMessageItem hasStreamingItem := false if len(m.messages) > 0 { @@ -1539,12 +1539,12 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { hasStreamingItem = true } } - + // Reset stream component if m.stream != nil { m.stream.Reset() } - + // If no streaming item exists and we have content, print it as a regular message if !hasStreamingItem && strings.TrimSpace(msg.Content) != "" { m.printAssistantMessage(msg.Content) @@ -1639,11 +1639,29 @@ func (m *AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.canceling = false case app.CompactCompleteEvent: + // Finalize any streaming compaction content. if m.stream != nil { m.stream.Reset() } m.state = stateInput - m.printCompactResult(msg) + + // Mark the last streaming message as complete in ScrollList. + if len(m.messages) > 0 { + if streamMsg, ok := m.messages[len(m.messages)-1].(*StreamingMessageItem); ok { + streamMsg.MarkComplete() + } + } + + // Refresh content to show the finalized message. + m.refreshContent() + + // Print stats as a separate system message. + saved := msg.OriginalTokens - msg.CompactedTokens + statsMsg := fmt.Sprintf( + "Compaction complete: %d messages summarised, ~%dk tokens freed (%dk -> %dk)", + msg.MessagesRemoved, saved/1000, msg.OriginalTokens/1000, msg.CompactedTokens/1000, + ) + m.printSystemMessage(statsMsg) case app.CompactErrorEvent: if m.stream != nil { @@ -2003,7 +2021,7 @@ func (m *AppModel) View() tea.View { func overlayContent(base, overlay string, width, height int) string { baseLines := strings.Split(base, "\n") overlayLines := strings.Split(overlay, "\n") - + // Ensure we have exactly height lines for len(baseLines) < height { baseLines = append(baseLines, strings.Repeat(" ", width)) @@ -2011,7 +2029,7 @@ func overlayContent(base, overlay string, width, height int) string { for len(overlayLines) < height { overlayLines = append(overlayLines, strings.Repeat(" ", width)) } - + // Merge lines - overlay takes precedence where non-empty result := make([]string, height) for i := 0; i < height; i++ { @@ -2023,7 +2041,7 @@ func overlayContent(base, overlay string, width, height int) string { result[i] = strings.Repeat(" ", width) } } - + return strings.Join(result, "\n") } diff --git a/pkg/kit/compaction.go b/pkg/kit/compaction.go index 6a6ed540..602549ba 100644 --- a/pkg/kit/compaction.go +++ b/pkg/kit/compaction.go @@ -153,7 +153,15 @@ func (m *Kit) compactInternal(ctx context.Context, opts *CompactionOptions, cust } model := m.agent.GetModel() - result, _, err := compaction.Compact(ctx, model, messages, *opts, customInstructions, prev) + + // Create a streaming callback to emit chunks as events. + streamCallback := func(delta string) error { + // Emit MessageUpdateEvent to the UI for streaming display. + m.events.emit(MessageUpdateEvent{Chunk: delta}) + return nil + } + + result, _, err := compaction.Compact(ctx, model, messages, *opts, customInstructions, prev, streamCallback) if err != nil { return nil, err } diff --git a/test_messages.txt b/test_messages.txt new file mode 100644 index 00000000..21036f7a --- /dev/null +++ b/test_messages.txt @@ -0,0 +1,9 @@ +1. Hello, world! + +2. Testing one, two, three. + +3. This is a quick test message. + +4. Sample text for verification. + +5. All systems operational.