Files
chenhg5-cc-connect/core/streaming.go
Claude 410d46e5bf feat(feishu): refresh rich card rendering and panel handling (#1204)
Squash merge of PR #1204 (rebased onto main, minor conflicts resolved).
Adds structured per-turn reply footer, cardkit-v1 streaming, status footer
interface, claude context usage tracking, and rich card body improvements.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-04 10:05:17 +08:00

527 lines
18 KiB
Go

package core
import (
"context"
"log/slog"
"strings"
"sync"
"time"
)
// StreamPreviewCfg controls the streaming preview behavior.
type StreamPreviewCfg struct {
Enabled bool // global toggle
DisabledPlatforms []string // platforms where streaming preview is disabled (e.g. "feishu")
IntervalMs int // minimum ms between updates (default 1500)
MinDeltaChars int // minimum new chars before sending an update (default 30)
MaxChars int // max preview length (default 2000)
}
// DefaultStreamPreviewCfg returns sensible defaults.
func DefaultStreamPreviewCfg() StreamPreviewCfg {
return StreamPreviewCfg{
Enabled: true,
DisabledPlatforms: nil,
IntervalMs: 1500,
MinDeltaChars: 30,
MaxChars: 2000,
}
}
// streamPreview manages the state and throttling of a single streaming preview.
// It accumulates text from EventText events and periodically pushes
// updates to the platform via MessageUpdater.UpdateMessage.
type streamPreview struct {
mu sync.Mutex
cfg StreamPreviewCfg
platform Platform
replyCtx any
ctx context.Context
transform func(string) string
fullText string // accumulated full text so far
lastSentText string // what was last successfully sent to the platform
lastSentAt time.Time
lastSentViaUpdate bool // true if lastSentText was delivered via UpdateMessage (not SendPreviewStart)
previewMsgID any // platform-specific ID for the preview message (returned by SendPreviewStart)
degraded bool // if true, stop trying (platform doesn't support it or permanent error)
timer *time.Timer
timerStop chan struct{} // closed when preview ends
pendingStatus CardStatus // last status set via setStatus(); applied on recovery
}
// ToolStepKind identifies the kind of progress row shown in rich cards.
type ToolStepKind string
const (
ToolStepKindTool ToolStepKind = "tool"
ToolStepKindThinking ToolStepKind = "thinking"
)
// ToolStep is one summarized progress row shown in rich progress cards.
type ToolStep struct {
Kind ToolStepKind // progress row kind; empty means tool for backward compatibility
Name string // tool name (e.g. "Bash", "Edit")
Summary string // human-readable summary shown in the card
Result string // optional tool output/result summary
Status string // optional tool status (e.g. completed/failed)
ExitCode *int // optional process exit code
Success *bool // optional success flag
Done bool // true once a tool result has been observed
}
// RichCardSupporter is an optional interface for platforms that can build
// native rich cards combining tool steps, markdown content, and a multi-line
// status footer.
//
// statusFooter is a pre-composed multi-line string assembled by the engine
// (typically one line per: elapsed time, model · effort · ctx, workdir).
// Pass empty string to hide the footer entirely. Lines are separated by '\n';
// the platform implementation is expected to render each line as its own
// dim-styled element so they don't visually merge with the body markdown.
//
// (Phase B refactor: previously took elapsed time.Duration; now the engine
// owns elapsed-time formatting so it can apply i18n + project-level toggles
// uniformly with the rest of the footer.)
type RichCardSupporter interface {
BuildRichCard(status CardStatus, title string, steps []ToolStep, markdown string, streaming bool, statusFooter string) string
}
// RichCardMarkdownResolver is an optional interface for platforms that need to
// pre-process rich-card markdown before it is rendered or streamed.
//
// Feishu uses this to turn markdown image URLs into real uploaded image keys:
// intermediate streaming frames may return quickly without waiting for uploads,
// while final frames may wait briefly so the completed card can embed images.
type RichCardMarkdownResolver interface {
ResolveRichCardMarkdown(ctx context.Context, markdown string, final bool) string
}
// MarkdownTableSplitter is an optional interface for platforms that need
// platform-specific markdown table chunking before final send.
type MarkdownTableSplitter interface {
SplitMarkdownByTables(md string, maxTables int) []string
}
// RichCardTextStreamer is an optional interface for platforms that support
// per-element streaming text updates on rich cards (e.g. Lark/Feishu's
// cardkit-v1 streaming text update API). When implemented, the engine routes
// EventText growth through StreamRichCardText instead of full-card updates,
// giving the client a native typewriter rendering effect.
//
// Returns ErrNotSupported when the specific preview handle was created
// without a streamable card entity (fallback path); the engine then falls
// back to the standard MessageUpdater full-card update.
type RichCardTextStreamer interface {
// StreamRichCardText pushes the latest fullText to the streaming-text
// element of the rich card identified by previewHandle. The platform
// implementation is responsible for serializing concurrent calls and
// maintaining a monotonic sequence counter per handle.
StreamRichCardText(ctx context.Context, previewHandle any, fullText string) error
}
// PreviewStarter is an optional interface for platforms that can initiate a
// streaming preview message and return a handle for subsequent updates.
type PreviewStarter interface {
// SendPreviewStart sends the initial preview message and returns a handle
// that can be passed to UpdateMessage for edits. Returns nil handle if
// preview is not supported for this context.
SendPreviewStart(ctx context.Context, replyCtx any, content string) (previewHandle any, err error)
}
// PreviewCleaner is an optional interface for platforms that need to clean up
// the preview message after the final response is sent (e.g. Discord deletes
// the preview and sends a fresh message).
type PreviewCleaner interface {
DeletePreviewMessage(ctx context.Context, previewHandle any) error
}
// PreviewFinishPreference is an optional interface for platforms that want to
// keep the preview message as the final delivered message on normal completion.
type PreviewFinishPreference interface {
KeepPreviewOnFinish() bool
}
func newStreamPreview(cfg StreamPreviewCfg, p Platform, replyCtx any, ctx context.Context, transform func(string) string) *streamPreview {
return &streamPreview{
cfg: cfg,
platform: p,
replyCtx: replyCtx,
ctx: ctx,
transform: transform,
timerStop: make(chan struct{}),
}
}
// canPreview returns true if the platform supports message updating and is not disabled.
func (sp *streamPreview) canPreview() bool {
sp.mu.Lock()
degraded := sp.degraded
sp.mu.Unlock()
if degraded || !sp.cfg.Enabled {
return false
}
// Check if platform is in disabled list
platformName := sp.platform.Name()
for _, disabled := range sp.cfg.DisabledPlatforms {
if strings.EqualFold(disabled, platformName) {
return false
}
}
_, ok := sp.platform.(MessageUpdater)
return ok
}
// appendText adds new text content and triggers a throttled flush if needed.
func (sp *streamPreview) appendText(text string) {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.degraded || !sp.cfg.Enabled {
return
}
sp.fullText += text
displayText := sp.fullText
maxChars := sp.cfg.MaxChars
if maxChars > 0 && len([]rune(displayText)) > maxChars {
displayText = string([]rune(displayText)[:maxChars]) + "…"
}
delta := len([]rune(displayText)) - len([]rune(sp.lastSentText))
elapsed := time.Since(sp.lastSentAt)
interval := time.Duration(sp.cfg.IntervalMs) * time.Millisecond
if delta < sp.cfg.MinDeltaChars && !sp.lastSentAt.IsZero() {
sp.scheduleFlushLocked(interval)
return
}
if elapsed < interval && !sp.lastSentAt.IsZero() {
remaining := interval - elapsed
sp.scheduleFlushLocked(remaining)
return
}
sp.cancelTimerLocked()
sp.flushLocked(displayText)
}
func (sp *streamPreview) scheduleFlushLocked(delay time.Duration) {
if sp.timer != nil {
return // already scheduled
}
sp.timer = time.AfterFunc(delay, func() {
sp.mu.Lock()
defer sp.mu.Unlock()
sp.timer = nil
if sp.degraded {
return
}
displayText := sp.fullText
maxChars := sp.cfg.MaxChars
if maxChars > 0 && len([]rune(displayText)) > maxChars {
displayText = string([]rune(displayText)[:maxChars]) + "…"
}
sp.flushLocked(displayText)
})
}
func (sp *streamPreview) cancelTimerLocked() {
if sp.timer != nil {
sp.timer.Stop()
sp.timer = nil
}
}
// flushLocked sends the current preview text to the platform. Must hold sp.mu.
func (sp *streamPreview) flushLocked(text string) {
if sp.transform != nil {
text = sp.transform(text)
}
if text == sp.lastSentText || text == "" {
return
}
updater, ok := sp.platform.(MessageUpdater)
if !ok {
slog.Debug("stream preview: platform does not support UpdateMessage, degrading")
sp.degraded = true
return
}
if sp.previewMsgID == nil {
// First preview: try to send a new preview message
if starter, ok := sp.platform.(PreviewStarter); ok {
slog.Debug("stream preview: sending first preview via SendPreviewStart", "text_len", len(text))
handle, err := starter.SendPreviewStart(sp.ctx, sp.replyCtx, text)
if err != nil {
slog.Debug("stream preview: start failed, degrading", "error", err)
sp.degraded = true
return
}
sp.previewMsgID = handle
} else {
if err := sp.platform.Send(sp.ctx, sp.replyCtx, text); err != nil {
slog.Debug("stream preview: initial send failed", "error", err)
sp.degraded = true
return
}
sp.previewMsgID = sp.replyCtx
}
sp.lastSentText = text
sp.lastSentViaUpdate = false
sp.lastSentAt = time.Now()
return
}
// Update existing preview message
slog.Debug("stream preview: updating via UpdateMessage", "text_len", len(text))
if err := updater.UpdateMessage(sp.ctx, sp.previewMsgID, text); err != nil {
slog.Debug("stream preview: update failed, degrading", "error", err)
sp.degraded = true
return
}
sp.lastSentText = text
sp.lastSentViaUpdate = true
sp.lastSentAt = time.Now()
}
// freeze stops the streaming preview permanently: cancels pending timers,
// updates the preview message in-place with the accumulated text, and marks
// the preview as degraded so no further updates are sent.
// Call this when a permission prompt or other interruption occurs.
func (sp *streamPreview) freeze() {
sp.mu.Lock()
defer sp.mu.Unlock()
sp.cancelTimerLocked()
if sp.previewMsgID != nil && !sp.degraded {
if updater, ok := sp.platform.(MessageUpdater); ok {
text := sp.fullText
maxChars := sp.cfg.MaxChars
if maxChars > 0 && len([]rune(text)) > maxChars {
text = string([]rune(text)[:maxChars]) + "…"
}
if text != "" {
if sp.transform != nil {
text = sp.transform(text)
}
_ = updater.UpdateMessage(sp.ctx, sp.previewMsgID, text)
}
}
}
sp.degraded = true
}
// discard removes the preview message when possible and disables further
// preview updates. Call this when the caller intends to send a separate
// non-preview message (for example after tool use or on terminal errors).
func (sp *streamPreview) discard() {
sp.mu.Lock()
defer sp.mu.Unlock()
sp.cancelTimerLocked()
select {
case <-sp.timerStop:
default:
close(sp.timerStop)
}
if sp.previewMsgID != nil {
if cleaner, ok := sp.platform.(PreviewCleaner); ok {
slog.Debug("stream preview discard: deleting preview")
_ = cleaner.DeletePreviewMessage(sp.ctx, sp.previewMsgID)
}
}
sp.previewMsgID = nil
sp.degraded = true
}
// finish is called when the agent response is complete. It cancels any pending
// timer and optionally cleans up the preview message.
// Returns true if a preview was active and the final message was sent via preview
// (so the caller should skip sending the full response separately).
//
// `statusFooter` is an optional structured footer string (one or more lines)
// that platforms implementing StatusFooterUpdater render with small/dim
// styling separate from the body. When the platform does not implement that
// interface and statusFooter is non-empty, finish falls back to appending the
// footer inline to finalText before the regular UpdateMessage call.
func (sp *streamPreview) finish(finalText, statusFooter string) bool {
sp.mu.Lock()
defer sp.mu.Unlock()
sp.cancelTimerLocked()
select {
case <-sp.timerStop:
default:
close(sp.timerStop)
}
if sp.transform != nil {
finalText = sp.transform(finalText)
}
if sp.previewMsgID == nil || sp.degraded {
if sp.previewMsgID != nil && sp.degraded {
// Try to recover degraded preview via UpdateMessage before falling back to delete
if finalText != "" {
if updater, ok := sp.platform.(MessageUpdater); ok {
if err := updater.UpdateMessage(sp.ctx, sp.previewMsgID, finalText); err == nil {
if sp.pendingStatus != "" {
if statusUpdater, ok := sp.platform.(PreviewStatusUpdater); ok {
statusUpdater.SetPreviewStatus(sp.previewMsgID, sp.pendingStatus)
}
}
return true
} else {
slog.Debug("stream preview finish: degraded UpdateMessage failed, cleaning up", "error", err)
}
}
}
if cleaner, ok := sp.platform.(PreviewCleaner); ok {
slog.Debug("stream preview finish: deleting stale preview (degraded)")
_ = cleaner.DeletePreviewMessage(sp.ctx, sp.previewMsgID)
}
}
slog.Debug("stream preview finish: no active preview", "hasHandle", sp.previewMsgID != nil, "degraded", sp.degraded)
return false
}
keepPreview := false
if pref, ok := sp.platform.(PreviewFinishPreference); ok {
keepPreview = pref.KeepPreviewOnFinish()
}
// If platform wants to delete the preview and send fresh, let it.
if cleaner, ok := sp.platform.(PreviewCleaner); ok && !keepPreview {
slog.Debug("stream preview finish: deleting preview (PreviewCleaner)")
_ = cleaner.DeletePreviewMessage(sp.ctx, sp.previewMsgID)
return false
}
updater, ok := sp.platform.(MessageUpdater)
if !ok {
slog.Debug("stream preview finish: no MessageUpdater")
return false
}
if finalText == "" {
slog.Debug("stream preview finish: empty final text")
return false
}
// If the final text is identical to what was last sent via UpdateMessage
// AND no status footer needs to be applied, skip the redundant API call.
// This prevents duplicate messages on platforms (e.g. Feishu) where
// patching with identical content may fail. We must NOT skip when a
// statusFooter is pending — the body may match but the footer hasn't
// been rendered yet, and dropping the call would silently lose it.
// Only skip when lastSentViaUpdate is true — if the text was only sent
// via SendPreviewStart (first flush), we must still call UpdateMessage
// because it may apply different formatting (e.g. Markdown→HTML for
// Telegram).
if finalText == sp.lastSentText && sp.lastSentViaUpdate && statusFooter == "" {
slog.Debug("stream preview finish: text unchanged and no footer, skipping",
"text_len", len(finalText))
return true
}
// Try to update the preview in-place with the full final text.
// maxChars only throttles intermediate streaming updates; at finish time
// we always attempt a single final update regardless of length.
slog.Debug("stream preview finish: sending final UpdateMessage",
"text_len", len(finalText), "lastSent_len", len(sp.lastSentText),
"same", finalText == sp.lastSentText, "viaUpdate", sp.lastSentViaUpdate,
"footer_len", len(statusFooter))
// Prefer the structured-footer path when the platform supports it, so the
// footer renders with small/dim styling separate from the response body.
if statusFooter != "" {
if sfu, ok := sp.platform.(StatusFooterUpdater); ok {
if err := sfu.UpdateMessageWithStatusFooter(sp.ctx, sp.previewMsgID, finalText, statusFooter); err == nil {
slog.Debug("stream preview finish: success via UpdateMessageWithStatusFooter")
return true
} else {
slog.Debug("stream preview finish: UpdateMessageWithStatusFooter failed, falling back", "error", err)
}
}
// Fallback: append inline so the footer is at least visible.
finalText = appendReplyFooter(finalText, statusFooter)
}
if err := updater.UpdateMessage(sp.ctx, sp.previewMsgID, finalText); err != nil {
slog.Debug("stream preview finish: final update FAILED, cleaning up preview", "error", err)
// Update failed (e.g. text too long for platform edit API).
// Try to delete the stale preview so caller can send a fresh message.
if cleaner, ok := sp.platform.(PreviewCleaner); ok {
_ = cleaner.DeletePreviewMessage(sp.ctx, sp.previewMsgID)
}
return false
}
if sp.pendingStatus != "" {
if statusUpdater, ok := sp.platform.(PreviewStatusUpdater); ok {
statusUpdater.SetPreviewStatus(sp.previewMsgID, sp.pendingStatus)
}
}
slog.Debug("stream preview finish: success via UpdateMessage")
return true
}
// setStatus updates the card header status of the active preview message.
// If the preview is not yet active or is degraded, the status is saved and
// applied when the preview recovers (at finish time).
func (sp *streamPreview) setStatus(status CardStatus) {
sp.mu.Lock()
defer sp.mu.Unlock()
sp.pendingStatus = status
if sp.previewMsgID == nil || sp.degraded {
return
}
if updater, ok := sp.platform.(PreviewStatusUpdater); ok {
updater.SetPreviewStatus(sp.previewMsgID, status)
}
}
// detachPreview clears the preview message handle so that finish() won't
// delete it. Call this after freeze() when the frozen preview should remain
// visible as a permanent message (e.g. text before the first tool call).
func (sp *streamPreview) detachPreview() {
sp.mu.Lock()
defer sp.mu.Unlock()
sp.previewMsgID = nil
}
// appendSeparator inserts a paragraph break into the accumulated text without
// triggering a flush. Used in quiet mode to visually separate text segments
// that span thinking/tool boundaries without creating separate messages.
// Returns true if the separator was actually added.
func (sp *streamPreview) appendSeparator(sep string) bool {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.degraded || !sp.cfg.Enabled || sp.fullText == "" {
return false
}
sp.fullText += sep
return true
}
// needsDoneReaction returns true if the preview was delivered via in-place
// UpdateMessage at least once, meaning the user only received a push for the
// initial SendPreviewStart and subsequent updates were silent. In this case a
// "done" reaction can notify the user that processing has completed.
func (sp *streamPreview) needsDoneReaction() bool {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.previewMsgID != nil && sp.lastSentViaUpdate
}