mirror of
https://github.com/chenhg5/cc-connect.git
synced 2026-07-03 12:28:10 +08:00
fix(slack): stop chat.update once payload exceeds size limit; deliver full reply via fresh postMessage
Surfaced during v1.4.0-beta.1 QA: when an agent produced a ~7.3 KB Markdown reply in Slack, the streaming card (#1333) silently failed mid-stream with `msg_too_long`, then Finalize tried to chat.update the full reply and also failed, forcing the engine onto its emergency fallback path. The user perceived the turn as a non-streaming "one-shot output preceded by ~30s of silence". Root cause: Slack's `chat.update` text parameter is capped at ~4000 bytes (server-side, despite docs saying chars). The streaming card had no size guard and Update() also swallowed errors with no log, so the half-broken state was invisible from logs until Finalize blew up. Fix: - Introduce `slackUpdateMaxText = 3500` (conservative; leaves headroom for multi-byte CJK content where Go's len() overshoots Slack's limit). - `Update()`: once rendered exceeds the limit, silently skip the chat.update tick (leave the streaming card at its last fitting snapshot) and emit a Debug log instead of staying invisible. - `Update()`: also Debug-log transient render errors (previously swallowed entirely). - `Finalize()`: when the final payload exceeds the limit AND a card has already been posted, deliver the full reply via a *fresh* chat.postMessage (which has a 40 KB ceiling), so the engine sees success and skips its duplicate-fallback emission. The partial streaming card stays visible above the new full-content message. - Extract `postFresh()` helper to share the postMessage logic between the lazy first-post path and the new overflow path. Tests: - `TestStreamingCard_FinalizeFallsBackToFreshPostOnOversize`: oversized Finalize must NOT touch chat.update and must postMessage the full payload (and not mark the card failed). - `TestStreamingCard_UpdateSkipsOversizedPayload`: oversized Update tick must silently no-op (no chat.update call). Net behaviour: - Short replies (< ~3500 B mrkdwn): unchanged — chat.update streaming keeps working as #1333 intended. - Long replies: no more `msg_too_long` errors, no more duplicate fallback messages from the engine. UX is "partial streaming card + one final full-content message" instead of "broken card + duplicate full-content message". Closes the third user-visible bug surfaced during v1.4.0-beta.1 QA pairing (after #1395 image batch window default and the nav.cron i18n miss). Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -3,6 +3,7 @@ package slack
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -16,6 +17,16 @@ import (
|
||||
// risks chat.update rate limits.
|
||||
const cardUpdateMinInterval = 3 * time.Second
|
||||
|
||||
// slackUpdateMaxText is the conservative payload-size cap (in bytes of the
|
||||
// post-conversion mrkdwn text) for `chat.update`. Slack documents the text
|
||||
// parameter as ~4000 chars but enforces it server-side as a byte count and
|
||||
// occasionally tightens it; once exceeded the API returns `msg_too_long`.
|
||||
// We stop attempting in-place edits past this threshold and let Finalize
|
||||
// deliver the full reply as a fresh message instead. 3500 leaves headroom
|
||||
// for multi-byte CJK content where Go's `len()` (bytes) overshoots Slack's
|
||||
// effective limit.
|
||||
const slackUpdateMaxText = 3500
|
||||
|
||||
// slackStreamingCard aggregates one agent turn (thinking + tool steps + answer)
|
||||
// into a single Slack message that updates in place — the cc-connect equivalent
|
||||
// of DingTalk's AI Card. The message is posted LAZILY on the first non-empty
|
||||
@@ -46,15 +57,23 @@ func (p *Platform) CreateStreamingCard(ctx context.Context, rctx any) (core.Stre
|
||||
return &slackStreamingCard{client: p.client, channel: rc.channel, threadTS: rc.timestamp}, nil
|
||||
}
|
||||
|
||||
// postFresh posts a brand-new message — the lazy first post for an unseen
|
||||
// card, or the "too long for chat.update" overflow path used by Finalize.
|
||||
// Caller must hold c.mu.
|
||||
func (c *slackStreamingCard) postFresh(ctx context.Context, rendered string) (string, error) {
|
||||
opts := []slack.MsgOption{slack.MsgOptionText(rendered, false)}
|
||||
if c.threadTS != "" {
|
||||
opts = append(opts, slack.MsgOptionPostMessageParameters(slack.PostMessageParameters{ThreadTimestamp: c.threadTS}))
|
||||
}
|
||||
_, ts, err := c.client.PostMessageContext(ctx, c.channel, opts...)
|
||||
return ts, err
|
||||
}
|
||||
|
||||
// render posts the card on first use, then edits it in place thereafter.
|
||||
// Caller must hold c.mu.
|
||||
func (c *slackStreamingCard) render(ctx context.Context, rendered string) error {
|
||||
if c.ts == "" {
|
||||
opts := []slack.MsgOption{slack.MsgOptionText(rendered, false)}
|
||||
if c.threadTS != "" {
|
||||
opts = append(opts, slack.MsgOptionPostMessageParameters(slack.PostMessageParameters{ThreadTimestamp: c.threadTS}))
|
||||
}
|
||||
_, ts, err := c.client.PostMessageContext(ctx, c.channel, opts...)
|
||||
ts, err := c.postFresh(ctx, rendered)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -68,6 +87,11 @@ func (c *slackStreamingCard) render(ctx context.Context, rendered string) error
|
||||
// Update renders the latest aggregated content. The first post is immediate;
|
||||
// subsequent edits are coalesced to ~cardUpdateMinInterval. Transient errors are
|
||||
// swallowed (Finalize retries) so a blip doesn't abort the turn.
|
||||
//
|
||||
// Once the rendered payload exceeds slackUpdateMaxText we stop attempting
|
||||
// chat.update edits (which would fail with `msg_too_long`); the existing
|
||||
// streaming card stays at the last fitting snapshot and Finalize delivers the
|
||||
// full reply via a fresh postMessage.
|
||||
func (c *slackStreamingCard) Update(ctx context.Context, content string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@@ -81,7 +105,13 @@ func (c *slackStreamingCard) Update(ctx context.Context, content string) error {
|
||||
if rendered == "" || rendered == c.lastSent {
|
||||
return nil
|
||||
}
|
||||
if c.ts != "" && len(rendered) > slackUpdateMaxText {
|
||||
slog.Debug("slack: streaming card update skipped: payload exceeds chat.update limit",
|
||||
"size", len(rendered), "limit", slackUpdateMaxText)
|
||||
return nil
|
||||
}
|
||||
if err := c.render(ctx, rendered); err != nil {
|
||||
slog.Debug("slack: streaming card update failed (will retry on next tick / finalize)", "error", err)
|
||||
return nil
|
||||
}
|
||||
c.lastUpdate = time.Now()
|
||||
@@ -90,8 +120,11 @@ func (c *slackStreamingCard) Update(ctx context.Context, content string) error {
|
||||
}
|
||||
|
||||
// Finalize writes the final content unconditionally (no throttle); it posts the
|
||||
// card if it was never posted. On error it marks the card failed and returns the
|
||||
// error so the engine falls back to a normal message.
|
||||
// card if it was never posted. When the final payload exceeds the chat.update
|
||||
// size limit AND a card has already been posted, we deliver the full reply as
|
||||
// a fresh postMessage (the streaming card stays at its last fitting snapshot)
|
||||
// instead of failing with `msg_too_long`. The engine sees success, so no
|
||||
// fallback duplicate is sent.
|
||||
func (c *slackStreamingCard) Finalize(ctx context.Context, content string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@@ -102,6 +135,20 @@ func (c *slackStreamingCard) Finalize(ctx context.Context, content string) error
|
||||
if rendered == "" || rendered == c.lastSent {
|
||||
return nil
|
||||
}
|
||||
// Long reply + card already posted: chat.update would 413 with msg_too_long;
|
||||
// post a fresh message with the full content instead. This is the in-house
|
||||
// equivalent of "see full reply below" — the partial streaming card stays
|
||||
// visible above the new full-content message.
|
||||
if c.ts != "" && len(rendered) > slackUpdateMaxText {
|
||||
slog.Debug("slack: streaming card finalize switching to fresh postMessage (payload exceeds chat.update limit)",
|
||||
"size", len(rendered), "limit", slackUpdateMaxText)
|
||||
if _, err := c.postFresh(ctx, rendered); err != nil {
|
||||
c.failed = true
|
||||
return fmt.Errorf("slack: finalize streaming card: %w", err)
|
||||
}
|
||||
c.lastSent = rendered
|
||||
return nil
|
||||
}
|
||||
if err := c.render(ctx, rendered); err != nil {
|
||||
c.failed = true
|
||||
return fmt.Errorf("slack: finalize streaming card: %w", err)
|
||||
|
||||
140
platform/slack/streaming_card_test.go
Normal file
140
platform/slack/streaming_card_test.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package slack
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/slack-go/slack"
|
||||
)
|
||||
|
||||
// TestStreamingCard_FinalizeFallsBackToFreshPostOnOversize covers the
|
||||
// `msg_too_long` regression observed during v1.4.0-beta.1 QA: when an agent
|
||||
// reply grew past Slack's chat.update text limit (~4000 bytes), Finalize
|
||||
// failed with msg_too_long and the engine had to send a duplicate fallback
|
||||
// message. After the fix, Finalize detects the overflow up-front and posts
|
||||
// the full reply as a fresh message via chat.postMessage (which has a much
|
||||
// larger 40k limit), so the engine sees success and no duplicate is sent.
|
||||
func TestStreamingCard_FinalizeFallsBackToFreshPostOnOversize(t *testing.T) {
|
||||
var (
|
||||
postCalls int32
|
||||
updateCalls int32
|
||||
lastPosted string
|
||||
)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/chat.postMessage":
|
||||
atomic.AddInt32(&postCalls, 1)
|
||||
if err := r.ParseForm(); err != nil {
|
||||
t.Fatalf("ParseForm: %v", err)
|
||||
}
|
||||
lastPosted = r.FormValue("text")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"ok": true,
|
||||
"channel": "C1",
|
||||
"ts": "1700000000.0001",
|
||||
})
|
||||
case "/chat.update":
|
||||
atomic.AddInt32(&updateCalls, 1)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"ok": true,
|
||||
"channel": "C1",
|
||||
"ts": "1700000000.0001",
|
||||
"text": "edited",
|
||||
})
|
||||
default:
|
||||
t.Fatalf("unexpected slack API path %q", r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
client := slack.New("xoxb-test", slack.OptionAPIURL(srv.URL+"/"))
|
||||
card := &slackStreamingCard{client: client, channel: "C1"}
|
||||
|
||||
// First (short) update posts the card via chat.postMessage.
|
||||
if err := card.Update(context.Background(), "hello"); err != nil {
|
||||
t.Fatalf("first Update: %v", err)
|
||||
}
|
||||
if got := atomic.LoadInt32(&postCalls); got != 1 {
|
||||
t.Fatalf("postMessage calls after first update = %d, want 1", got)
|
||||
}
|
||||
if card.ts == "" {
|
||||
t.Fatal("card.ts not set after first post")
|
||||
}
|
||||
|
||||
// Oversized reply: bigger than slackUpdateMaxText. Finalize must NOT
|
||||
// hit chat.update (would 413 with msg_too_long); it should fall back
|
||||
// to a fresh postMessage with the full content.
|
||||
huge := strings.Repeat("a", slackUpdateMaxText+1024)
|
||||
if err := card.Finalize(context.Background(), huge); err != nil {
|
||||
t.Fatalf("Finalize (oversized): %v", err)
|
||||
}
|
||||
if got := atomic.LoadInt32(&updateCalls); got != 0 {
|
||||
t.Fatalf("chat.update calls = %d, want 0 (oversized payload should NOT touch chat.update)", got)
|
||||
}
|
||||
if got := atomic.LoadInt32(&postCalls); got != 2 {
|
||||
t.Fatalf("postMessage calls after oversized finalize = %d, want 2 (initial + fallback)", got)
|
||||
}
|
||||
if !strings.HasPrefix(lastPosted, "aaaa") || len(lastPosted) < len(huge) {
|
||||
t.Fatalf("oversized fallback postMessage text len=%d, want >= %d and prefixed with payload", len(lastPosted), len(huge))
|
||||
}
|
||||
if card.failed {
|
||||
t.Fatal("card marked failed despite successful oversized fallback")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStreamingCard_UpdateSkipsOversizedPayload ensures that once the rendered
|
||||
// content crosses the chat.update size cap, subsequent Update() ticks become
|
||||
// silent no-ops (the engine sees success, the partial card stays on the prior
|
||||
// snapshot, and Finalize handles the full payload via postFresh).
|
||||
func TestStreamingCard_UpdateSkipsOversizedPayload(t *testing.T) {
|
||||
var (
|
||||
postCalls int32
|
||||
updateCalls int32
|
||||
)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/chat.postMessage":
|
||||
atomic.AddInt32(&postCalls, 1)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "channel": "C1", "ts": "1700000000.0001"})
|
||||
case "/chat.update":
|
||||
atomic.AddInt32(&updateCalls, 1)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"ok": true, "channel": "C1", "ts": "1700000000.0001", "text": "edited"})
|
||||
default:
|
||||
t.Fatalf("unexpected path %q", r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
client := slack.New("xoxb-test", slack.OptionAPIURL(srv.URL+"/"))
|
||||
card := &slackStreamingCard{client: client, channel: "C1"}
|
||||
|
||||
// First post — sets card.ts and bypasses the throttle.
|
||||
if err := card.Update(context.Background(), "hi"); err != nil {
|
||||
t.Fatalf("first Update: %v", err)
|
||||
}
|
||||
if got := atomic.LoadInt32(&postCalls); got != 1 {
|
||||
t.Fatalf("postMessage calls = %d, want 1", got)
|
||||
}
|
||||
|
||||
// Drop the throttle so the next Update() is admissible, then feed it an
|
||||
// oversized payload — Update must silently skip without touching
|
||||
// chat.update.
|
||||
card.lastUpdate = card.lastUpdate.Add(-2 * cardUpdateMinInterval)
|
||||
huge := strings.Repeat("b", slackUpdateMaxText+100)
|
||||
if err := card.Update(context.Background(), huge); err != nil {
|
||||
t.Fatalf("oversized Update: %v", err)
|
||||
}
|
||||
if got := atomic.LoadInt32(&updateCalls); got != 0 {
|
||||
t.Fatalf("chat.update calls after oversized Update = %d, want 0 (silent skip)", got)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user