fix(qq): route long Send() through HTTP to dodge 15s WS timeout (#1474)

Long replies (~2.5KB+) always tripped the hardcoded 15s timeout in
callAPI because the OneBot readLoop serves both inbound events and
outbound replies — long sends queue behind inbound traffic and stall.

New `send_via_http` config knob (default "auto"):

  - "auto"   → WS for <=512 byte messages, HTTP for longer ones.
               Falls back to WS when http_url is unset.
  - "always" → always HTTP (requires http_url); falls back to WS when
               absent so unconfigured deployments still deliver.
  - "never"  → always WebSocket, even for long messages.

SendFile() and SendImage() are unaffected — separate code paths.

Tests cover: short-msg WS path, long-msg HTTP fallback (2500B),
private-vs-group, threshold boundary, mode=always/never, auto with
no http_url, invalid config value (falls back to auto), and a
table-driven shouldUseHTTP test.
This commit is contained in:
dev-claudecode
2026-07-01 17:18:12 +00:00
parent 7915635f10
commit 0ebdcf5916
4 changed files with 363 additions and 4 deletions

View File

@@ -17,6 +17,7 @@
- **feishu**: coalesce consecutive image messages from the same session into a single multi-image dispatch to fix first-image drop on batch sends (#1395). When the Feishu mobile client sends N images in quick succession, each image arrives as a separate `image` event with very close `create_time` values. Dispatching each immediately caused core/engine's `create_time` watermark (PR #1168) to drop the oldest image, so the agent only saw N-1 images. A per-session image buffer with a 150ms quiet window now merges the burst into one `core.Message` carrying all images, in send order. Single-image sends and quoted-image replies are unaffected.
- **claudecode**: fix per-spawn system-prompt temp file EACCES under `run_as_user` (#1429). The per-spawn temp file written by `writeTempAppendPromptFile` (the 1% edge-case path used when the prompt has session-specific platform formatting or user `append_system_prompt`) inherited `os.CreateTemp`'s 0600 mode and was owned by the cc-connect process user (often root under systemd). When the agent was spawned under a different `run_as_user`, it could not read the file and exited before any prompt was loaded. The file is now `chmod 0o644` immediately after write, matching the shared `ensureSharedSystemPromptFile` path. Prompt content is non-secret (a superset of the already-shared base prompt), so 0644 is consistent with the shared file. Does not affect the shared-file path (already 0644 since #1376) or the daemon-mode path resolution (#1419).
- **core**: queue post-restart notification and dispatch on platform ready (#1383). Previously `/restart` sent the success notification immediately after engine startup, racing the platform's async connect window (Telegram: ~2.6s). On a not-yet-ready platform the send was silently dropped at debug log level. The notify is now queued on the engine and dispatched when the target platform reaches `OnPlatformReady`, with bounded retry (3 attempts, 0/500/1500 ms backoff) on transient send failure. Failed sends log at warn level. A 10s safety timeout drops the notify with a warning if the target platform never reaches ready, so startup is never blocked indefinitely. Also covers Discord / Weixin / Matrix (other AsyncRecoverablePlatform implementations) for free.
- **qq**: route long `Send()` messages through the OneBot HTTP API to avoid the 15s WebSocket timeout (#1474). Long replies (~2.5KB+) always tripped the hardcoded WS timeout in `callAPI` because the WS readLoop is shared between inbound events and outbound replies; long sends queue behind inbound traffic and stall. New `send_via_http` config knob (default `"auto"`): `"auto"` switches to HTTP when message exceeds 512 bytes and `http_url` is configured, `"always"` forces HTTP, `"never"` keeps WebSocket. Falls back to WS when `http_url` is empty so unconfigured deployments still deliver. `SendFile()` and `SendImage()` are unaffected (separate code paths).
## v1.3.3 (2026-06-15)

View File

@@ -1570,6 +1570,20 @@ app_secret = "your-feishu-app-secret"
# http_url = "" # optional: OneBot HTTP API URL for file operations, e.g. "http://127.0.0.1:3000"
# # Required for SendFile. Useful when OneBot runs in WSL/Docker.
# # 可选OneBot HTTP API 地址用于文件发送。OneBot 在 WSL/Docker 中时必须配置。
# send_via_http = "auto" # optional: how Send() dispatches text replies (default "auto")
# # "auto" → WS for short messages (<=512 bytes), HTTP for longer ones.
# # Long replies via WS can hit the 15s OneBot WS timeout (#1474).
# # HTTP has a 120s timeout and avoids WS readLoop queueing.
# # Falls back to WS if http_url is not configured.
# # "always" → always use HTTP (requires http_url); falls back to WS if absent.
# # "never" → always use WebSocket, even for long messages.
# # 可选Send() 的派发方式(默认 "auto"
# # "auto" → 短消息(<=512 字节)走 WebSocket长消息走 HTTP。
# # 长回复走 WS 可能命中 OneBot 15 秒 WS 超时(#1474
# # HTTP 有 120 秒超时,并避免阻塞 WS readLoop。
# # 未配置 http_url 时退回 WS。
# # "always" → 始终走 HTTP需要 http_url缺失时退回 WS。
# # "never" → 始终走 WebSocket即使长消息。
# allow_from = "*" # allowed QQ user IDs, e.g. "12345,67890" or "*" for all
# # 允许的 QQ 号,如 "12345,67890""*" 表示所有
# share_session_in_channel = false # If true, all users in a group share one agent session / 群聊共享会话

View File

@@ -23,6 +23,14 @@ func init() {
core.RegisterPlatform("qq", New)
}
// defaultHTTPFallbackThreshold is the message size (in bytes) above which Send()
// routes through the OneBot HTTP API instead of the WebSocket. The OneBot WS
// implementation round-trips every reply through the same readLoop that
// delivers events, so a long reply can race or stall and trip the 15s WS
// timeout (see #1474). HTTP send has a dedicated 120s timeout and avoids
// queueing on the WS readLoop.
const defaultHTTPFallbackThreshold = 512
// Platform connects to a OneBot v11 implementation (NapCat, LLOneBot, etc.)
// via forward WebSocket. It receives message events and sends messages back
// through the same WS connection.
@@ -40,7 +48,8 @@ type Platform struct {
selfID int64
dedup core.MessageDedup
groupNameCache sync.Map // groupID -> group name
httpURL string // OneBot HTTP API URL, e.g. "http://127.0.0.1:3000"
httpURL string // OneBot HTTP API URL, e.g. "http://127.0.0.1:3000"
sendViaHTTP string // "auto" (default) | "always" | "never"
}
func New(opts map[string]any) (core.Platform, error) {
@@ -57,12 +66,26 @@ func New(opts map[string]any) (core.Platform, error) {
httpURL, _ := opts["http_url"].(string)
httpURL = strings.TrimRight(httpURL, "/")
sendViaHTTP, _ := opts["send_via_http"].(string)
switch sendViaHTTP {
case "", "auto", "always", "never":
// valid
default:
slog.Warn("qq: invalid send_via_http value, falling back to auto",
"got", sendViaHTTP, "valid", []string{"auto", "always", "never"})
sendViaHTTP = "auto"
}
if sendViaHTTP == "" {
sendViaHTTP = "auto"
}
return &Platform{
wsURL: wsURL,
token: token,
allowFrom: allowFrom,
shareSessionInChannel: shareSessionInChannel,
httpURL: httpURL,
httpURL: httpURL,
sendViaHTTP: sendViaHTTP,
}, nil
}
@@ -443,6 +466,13 @@ func (p *Platform) Reply(ctx context.Context, replyCtx any, content string) erro
}
// Send sends a message to the conversation identified by replyCtx.
//
// Long messages route through the OneBot HTTP API instead of WebSocket. The
// OneBot WS implementation round-trips every reply through the same readLoop
// that delivers inbound events, so a long reply can race or stall and trip
// the 15s WS timeout (#1474). HTTP send has a dedicated 120s timeout and
// avoids queueing on the WS readLoop. Threshold + mode controlled by
// `send_via_http` config (default "auto" → switch at 512 bytes).
func (p *Platform) Send(ctx context.Context, replyCtx any, content string) error {
rctx, ok := replyCtx.(*replyContext)
if !ok {
@@ -453,17 +483,42 @@ func (p *Platform) Send(ctx context.Context, replyCtx any, content string) error
"message": content,
}
useHTTP := p.shouldUseHTTP(content)
call := p.callAPI
if useHTTP {
call = p.callHTTPAPI
}
if rctx.messageType == "group" {
params["group_id"] = rctx.groupID
_, err := p.callAPI("send_group_msg", params)
_, err := call("send_group_msg", params)
return err
}
params["user_id"] = rctx.userID
_, err := p.callAPI("send_private_msg", params)
_, err := call("send_private_msg", params)
return err
}
// shouldUseHTTP returns true when Send() should dispatch through the OneBot
// HTTP API for this message. Honors the `send_via_http` config:
// - "always" → true (skip WS entirely for sends)
// - "never" → false (always use WS, even for long messages)
// - "auto" → true when len(content) > 512 bytes AND http_url is configured
//
// "auto" falls back to WS when http_url is empty — there is no HTTP path to
// use. SendFile() and SendImage() are unaffected (separate code paths).
func (p *Platform) shouldUseHTTP(content string) bool {
switch p.sendViaHTTP {
case "always":
return p.httpURL != ""
case "never":
return false
default: // "auto" or unset
return p.httpURL != "" && len(content) > defaultHTTPFallbackThreshold
}
}
// SendImage sends an image to the conversation.
// Implements core.ImageSender.
func (p *Platform) SendImage(ctx context.Context, replyCtx any, img core.ImageAttachment) error {

View File

@@ -1,10 +1,13 @@
package qq
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
@@ -149,3 +152,289 @@ func TestStart_FetchesSelfIDWithoutTimeout(t *testing.T) {
t.Errorf("selfID = %d, want %d (self-message filter would be disabled)", p.selfID, botUserID)
}
}
// napcatServer builds a mock NapCat exposing both WS (for receive + default
// send) and HTTP (for HTTP send fallback). It records how many times each
// surface was called so tests can assert which path Send() took.
type napcatServer struct {
ts *httptest.Server
wsSends atomic.Int32 // send_group_msg / send_private_msg over WS
httpSends atomic.Int32 // send_group_msg / send_private_msg over HTTP
}
func newNapcatServer(t *testing.T) *napcatServer {
t.Helper()
ns := &napcatServer{}
upgrader := websocket.Upgrader{}
ns.ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if websocket.IsWebSocketUpgrade(r) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer c.Close()
for {
_, msg, err := c.ReadMessage()
if err != nil {
return
}
var req map[string]any
if err := json.Unmarshal(msg, &req); err != nil {
continue
}
action, _ := req["action"].(string)
switch action {
case "get_login_info":
echo, _ := req["echo"].(string)
resp := map[string]any{
"status": "ok",
"retcode": 0,
"echo": echo,
"data": map[string]any{"user_id": 999999, "nickname": "TestBot"},
}
raw, _ := json.Marshal(resp)
_ = c.WriteMessage(websocket.TextMessage, raw)
case "send_group_msg", "send_private_msg":
ns.wsSends.Add(1)
echo, _ := req["echo"].(string)
resp := map[string]any{
"status": "ok",
"retcode": 0,
"echo": echo,
"data": map[string]any{"message_id": 12345},
}
raw, _ := json.Marshal(resp)
_ = c.WriteMessage(websocket.TextMessage, raw)
}
}
}
// HTTP API surface for Send() HTTP fallback.
body, _ := io.ReadAll(r.Body)
var params map[string]any
_ = json.Unmarshal(body, &params)
// Echo path on HTTP root for testing error formatting.
_ = params
ns.httpSends.Add(1)
resp := map[string]any{
"status": "ok",
"retcode": 0,
"data": map[string]any{"message_id": 67890},
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
t.Cleanup(ns.ts.Close)
return ns
}
func startPlatform(t *testing.T, ns *napcatServer, opts map[string]any) *Platform {
t.Helper()
if opts == nil {
opts = map[string]any{}
}
opts["ws_url"] = "ws" + strings.TrimPrefix(ns.ts.URL, "http")
if _, ok := opts["http_url"]; !ok {
opts["http_url"] = ns.ts.URL
}
p, err := New(opts)
if err != nil {
t.Fatalf("New: %v", err)
}
platform := p.(*Platform)
if err := platform.Start(func(core.Platform, *core.Message) {}); err != nil {
t.Fatalf("Start: %v", err)
}
t.Cleanup(func() { _ = platform.Stop() })
return platform
}
func TestSend_ShortMessageUsesWebSocket(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, nil)
rctx := &replyContext{messageType: "group", groupID: 12345}
if err := p.Send(context.Background(), rctx, "short message"); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.wsSends.Load(); got != 1 {
t.Errorf("wsSends = %d, want 1 (short message must go via WS)", got)
}
if got := ns.httpSends.Load(); got != 0 {
t.Errorf("httpSends = %d, want 0 (short message must NOT hit HTTP)", got)
}
}
func TestSend_LongMessageFallbackHTTP(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, nil)
rctx := &replyContext{messageType: "group", groupID: 12345}
// Reporter's reproducer: ~2500 byte message would always hit the 15s WS
// timeout; must now dispatch through HTTP.
long := strings.Repeat("a", 2500)
if err := p.Send(context.Background(), rctx, long); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.httpSends.Load(); got != 1 {
t.Errorf("httpSends = %d, want 1 (long message must go via HTTP)", got)
}
if got := ns.wsSends.Load(); got != 0 {
t.Errorf("wsSends = %d, want 0 (long message must NOT hit WS)", got)
}
}
func TestSend_LongPrivateMessageFallbackHTTP(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, nil)
rctx := &replyContext{messageType: "private", userID: 99999}
long := strings.Repeat("b", 3000)
if err := p.Send(context.Background(), rctx, long); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.httpSends.Load(); got != 1 {
t.Errorf("httpSends = %d, want 1", got)
}
if got := ns.wsSends.Load(); got != 0 {
t.Errorf("wsSends = %d, want 0", got)
}
}
func TestSend_AtThresholdUsesWebSocket(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, nil)
rctx := &replyContext{messageType: "group", groupID: 12345}
// Boundary: exactly at threshold → still WS (> not >=).
if err := p.Send(context.Background(), rctx, strings.Repeat("c", 512)); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.wsSends.Load(); got != 1 {
t.Errorf("wsSends = %d, want 1 at threshold", got)
}
if got := ns.httpSends.Load(); got != 0 {
t.Errorf("httpSends = %d, want 0 at threshold", got)
}
}
func TestSend_ModeAlwaysUsesHTTP(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, map[string]any{"send_via_http": "always"})
rctx := &replyContext{messageType: "group", groupID: 12345}
if err := p.Send(context.Background(), rctx, "x"); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.httpSends.Load(); got != 1 {
t.Errorf("httpSends = %d, want 1", got)
}
if got := ns.wsSends.Load(); got != 0 {
t.Errorf("wsSends = %d, want 0", got)
}
}
func TestSend_ModeNeverUsesWebSocket(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, map[string]any{"send_via_http": "never"})
rctx := &replyContext{messageType: "group", groupID: 12345}
long := strings.Repeat("d", 2500)
if err := p.Send(context.Background(), rctx, long); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.wsSends.Load(); got != 1 {
t.Errorf("wsSends = %d, want 1 (never mode must use WS even for long)", got)
}
if got := ns.httpSends.Load(); got != 0 {
t.Errorf("httpSends = %d, want 0", got)
}
}
func TestSend_AutoNoHTTPURLFallsBackToWebSocket(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, map[string]any{"http_url": ""})
rctx := &replyContext{messageType: "group", groupID: 12345}
long := strings.Repeat("e", 2500)
if err := p.Send(context.Background(), rctx, long); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.wsSends.Load(); got != 1 {
t.Errorf("wsSends = %d, want 1 (no http_url → must use WS)", got)
}
if got := ns.httpSends.Load(); got != 0 {
t.Errorf("httpSends = %d, want 0", got)
}
}
func TestSend_ModeAlwaysNoHTTPURLGracefullyFallsBackToWS(t *testing.T) {
ns := newNapcatServer(t)
p := startPlatform(t, ns, map[string]any{"send_via_http": "always", "http_url": ""})
rctx := &replyContext{messageType: "group", groupID: 12345}
// With send_via_http=always but no http_url configured, must fall back to
// WebSocket instead of failing — better to deliver via the available path
// than surface a config error every send. Matches SendFile() behavior.
if err := p.Send(context.Background(), rctx, "x"); err != nil {
t.Fatalf("Send: %v", err)
}
if got := ns.wsSends.Load(); got != 1 {
t.Errorf("wsSends = %d, want 1 (no http_url → must fall back to WS)", got)
}
if got := ns.httpSends.Load(); got != 0 {
t.Errorf("httpSends = %d, want 0", got)
}
}
func TestNew_InvalidSendViaHTTPFallsBackToAuto(t *testing.T) {
p, err := New(map[string]any{"send_via_http": "bogus"})
if err != nil {
t.Fatalf("New: %v", err)
}
platform := p.(*Platform)
if platform.sendViaHTTP != "auto" {
t.Errorf("sendViaHTTP = %q, want auto", platform.sendViaHTTP)
}
}
func TestShouldUseHTTP(t *testing.T) {
cases := []struct {
name string
sendMode string
httpURL string
len int
wantHTTP bool
}{
{"auto_short_under_threshold", "auto", "http://x", 100, false},
{"auto_long_over_threshold", "auto", "http://x", 513, true},
{"auto_long_no_http_url", "auto", "", 5000, false},
{"always_short_with_http", "always", "http://x", 10, true},
{"always_short_no_http", "always", "", 10, false},
{"never_long", "never", "http://x", 5000, false},
{"default_empty_mode_long", "", "http://x", 1000, true},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
p := &Platform{sendViaHTTP: c.sendMode, httpURL: c.httpURL}
if c.sendMode == "" {
// New() normalizes "" → "auto"; mimic that here.
p.sendViaHTTP = "auto"
}
got := p.shouldUseHTTP(strings.Repeat("x", c.len))
if got != c.wantHTTP {
t.Errorf("shouldUseHTTP(%d) = %v, want %v", c.len, got, c.wantHTTP)
}
})
}
}