Files
chenhg5-cc-connect/core/relay_test.go
cg33 23ebad3b02 fix(claudecode): keep turn running on mid-turn compaction event (fixes #481) (#1272)
* fix(claudecode): keep turn running on mid-turn compaction event (fixes #481)

Claude Code's stream-json protocol emits a `type:"result"` event with
`subtype:"compact"` (newer CLI) or `subtype:"compaction"` (older
CLI) when it performs automatic context compaction mid-turn. The
existing handleResult treated every result event as turn completion
(Done=true), so the engine's processInteractiveEvents loop would
return early and drop any subsequent tool calls and assistant
messages for the same turn.

Recognize the compaction subtypes and emit EventResult with
Done=false so the event loop keeps reading from the CLI process. Add
a default case to the readLoop event switch that logs unrecognized
event types at debug level (with the full payload) so future new
event shapes are diagnosable from the log stream.

Rebuild of #483. The original PR also referenced adding diagnostic
logging for unknown event types; that change is included as the new
default case.

- Add isCompactionResult / resultSubtype helpers in session.go.
- Set Done=!isCompaction in handleResult's emitted EventResult.
- Add default case to readLoop's event switch with slog.Debug.
- Add TestHandleResultCompactionSubtypeIsNotTerminal and
  TestIsCompactionResult regression tests; tighten existing
  TestHandleResultParsesUsage with a Done=true assertion.

* fix(core): gate EventResult case body on event.Done (fixes #481)

The previous PR #1272 fix changed the agent-side handleResult to set
Done=!isCompaction, but processInteractiveEvents in core/engine.go never
read event.Done — every EventResult unconditionally ran
cp.Finalize(Completed), AddHistory, noteUserTurnCompleted, and a final
return. So a mid-turn compaction result still caused the engine loop to
exit early, dropping subsequent tool calls and assistant messages.

Gate the entire EventResult case body on event.Done: when an agent
emits a non-terminal result (Done=false) — e.g. mid-turn auto-compact —
slog.Debug it and continue the outer loop to read the next event. All
existing turn-completion side effects and the trailing return only run
when Done=true. The agent-side Done=!isCompaction logic from #1272 is
now effective end-to-end.

Test:
- New TestProcessInteractiveEvents_NonTerminalResultContinuesTurn
  pins issue #481: emits EventResult{Done:false} → EventText →
  EventResult{Done:true} and asserts noteUserTurnCompleted runs exactly
  once (on the terminal result) and the final reply reaches the
  platform. Verified manually that without the engine fix the test
  fails because the compaction event produces an empty '(empty
  response)' message and the loop returns before the final result.
- Existing tests in core/engine_test.go (line 12741, 13026) and
  core/relay_test.go (line 102, 184, 223, 284) that previously sent
  EventResult without setting Done now set Done:true — they were
  relying on the implicit-on EventResult semantics that this change
  inverts.

Verified:
- go test -count=1 -tags no_web -short ./core/...    ok 42s
- go test -count=1 -tags no_web ./agent/claudecode/...  ok
- golangci-lint run --new-from-rev origin/main ./core/...   0 issues

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-06-16 09:57:21 +08:00

388 lines
13 KiB
Go

package core
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
func TestRelayManager_DefaultTimeout(t *testing.T) {
rm := NewRelayManager("")
if rm.timeout != relayTimeout {
t.Fatalf("rm.timeout = %v, want %v", rm.timeout, relayTimeout)
}
}
func TestRelayManager_RelayContextHonorsConfiguredTimeout(t *testing.T) {
rm := NewRelayManager("")
rm.SetTimeout(3 * time.Second)
ctx, cancel := rm.relayContext(context.Background())
defer cancel()
deadline, ok := ctx.Deadline()
if !ok {
t.Fatal("expected relay context deadline")
}
if remaining := time.Until(deadline); remaining <= 0 || remaining > 3*time.Second {
t.Fatalf("time until deadline = %v, want within (0, 3s]", remaining)
}
}
func TestRelayManager_RelayContextDisablesTimeoutAtZero(t *testing.T) {
rm := NewRelayManager("")
rm.SetTimeout(0)
baseCtx := context.Background()
ctx, cancel := rm.relayContext(baseCtx)
defer cancel()
if ctx != baseCtx {
t.Fatal("expected relayContext to return the original context when timeout is disabled")
}
if _, ok := ctx.Deadline(); ok {
t.Fatal("expected no deadline when timeout is disabled")
}
}
type relayVisibilityPlatform struct {
stubPlatformEngine
reconstructed []string
}
func (p *relayVisibilityPlatform) ReconstructReplyCtx(sessionKey string) (any, error) {
p.mu.Lock()
p.reconstructed = append(p.reconstructed, sessionKey)
p.mu.Unlock()
return sessionKey, nil
}
func runRelayVisibilityScenario(t *testing.T, visibility string) (resp string, sourceSent []string, targetSent []string) {
t.Helper()
sourcePlatform := &relayVisibilityPlatform{stubPlatformEngine: stubPlatformEngine{n: "feishu"}}
targetPlatform := &relayVisibilityPlatform{stubPlatformEngine: stubPlatformEngine{n: "feishu"}}
sourceEngine := NewEngine("source", &stubAgent{}, []Platform{sourcePlatform}, "", LangEnglish)
targetSession := newControllableSession("target-session")
targetEngine := NewEngine("target", &controllableAgent{nextSession: targetSession}, []Platform{targetPlatform}, "", LangEnglish)
rm := NewRelayManager("")
rm.Bind("feishu", "chat-1", map[string]string{
"source": "source-bot",
"target": "target-bot",
})
rm.RegisterEngine("source", sourceEngine)
rm.RegisterEngine("target", targetEngine)
if visibility != "" {
rm.SetVisibility(visibility)
}
type relayResult struct {
resp string
err error
}
done := make(chan relayResult, 1)
go func() {
result, err := rm.Send(context.Background(), RelayRequest{
From: "source",
To: "target",
SessionKey: "feishu:chat-1:user-1",
Message: "please ask target",
})
if result != nil {
done <- relayResult{resp: result.Response, err: err}
return
}
done <- relayResult{err: err}
}()
targetSession.events <- Event{Type: EventResult, Content: "target says long answer", Done: true}
select {
case got := <-done:
if got.err != nil {
t.Fatalf("RelayManager.Send() error = %v", got.err)
}
resp = got.resp
case <-time.After(2 * time.Second):
t.Fatal("RelayManager.Send() did not return")
}
return resp, sourcePlatform.getSent(), targetPlatform.getSent()
}
func TestRelayManager_DefaultVisibilityEchoesFullMessages(t *testing.T) {
resp, sourceSent, targetSent := runRelayVisibilityScenario(t, "")
if resp != "target says long answer" {
t.Fatalf("response = %q, want target response", resp)
}
if len(sourceSent) != 1 || sourceSent[0] != "[source-bot → target-bot] please ask target" {
t.Fatalf("source sent = %#v, want full relay request", sourceSent)
}
if len(targetSent) != 1 || targetSent[0] != "[target-bot] target says long answer" {
t.Fatalf("target sent = %#v, want full relay response", targetSent)
}
}
func TestRelayManager_VisibilitySummarySuppressesBodies(t *testing.T) {
resp, sourceSent, targetSent := runRelayVisibilityScenario(t, RelayVisibilitySummary)
if resp != "target says long answer" {
t.Fatalf("response = %q, want target response", resp)
}
if len(sourceSent) != 1 || sourceSent[0] != "[source-bot → target-bot] relay request sent" {
t.Fatalf("source sent = %#v, want summary relay request", sourceSent)
}
if len(targetSent) != 1 || targetSent[0] != "[target-bot] relay response ready (23 chars)" {
t.Fatalf("target sent = %#v, want summary relay response", targetSent)
}
}
func TestRelayManager_VisibilityNoneSuppressesGroupEcho(t *testing.T) {
resp, sourceSent, targetSent := runRelayVisibilityScenario(t, RelayVisibilityNone)
if resp != "target says long answer" {
t.Fatalf("response = %q, want target response", resp)
}
if len(sourceSent) != 0 {
t.Fatalf("source sent = %#v, want no relay request echo", sourceSent)
}
if len(targetSent) != 0 {
t.Fatalf("target sent = %#v, want no relay response echo", targetSent)
}
}
func TestHandleRelay_ReturnsPartialOnTimeout(t *testing.T) {
e := newTestEngine()
session := newControllableSession("relay-session")
e.agent = &controllableAgent{nextSession: session}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
type relayResult struct {
resp string
err error
}
done := make(chan relayResult, 1)
go func() {
resp, err := e.HandleRelay(ctx, "source", "test:chat-1:user", "hello")
done <- relayResult{resp: resp, err: err}
}()
session.events <- Event{Type: EventText, Content: "partial response", SessionID: "relay-session"}
time.Sleep(40 * time.Millisecond)
// After timeout, HandleRelay consumes the next event from the channel to
// unblock the for-range loop, then checks ctx.Err() and spawns the drain
// goroutine. We need two events: one to unblock HandleRelay, and one
// EventResult for the drain goroutine to close the session cleanly.
session.events <- Event{Type: EventThinking, Content: "still working"}
session.events <- Event{Type: EventResult, Content: "done", Done: true}
got := <-done
if got.err != nil {
t.Fatalf("HandleRelay() error = %v, want nil", got.err)
}
if got.resp != "partial response" {
t.Fatalf("HandleRelay() response = %q, want %q", got.resp, "partial response")
}
// Wait for the background drain goroutine to close the session.
select {
case <-session.closed:
case <-time.After(2 * time.Second):
t.Fatal("background drain goroutine did not close the session")
}
}
func TestHandleRelay_TimeoutWithoutTextReturnsContextError(t *testing.T) {
e := newTestEngine()
session := newControllableSession("relay-session")
e.agent = &controllableAgent{nextSession: session}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
type relayResult struct {
resp string
err error
}
done := make(chan relayResult, 1)
go func() {
resp, err := e.HandleRelay(ctx, "source", "test:chat-1:user", "hello")
done <- relayResult{resp: resp, err: err}
}()
time.Sleep(40 * time.Millisecond)
// One event to unblock HandleRelay's for-range, one for the drain goroutine.
session.events <- Event{Type: EventThinking, Content: "still working"}
session.events <- Event{Type: EventResult, Content: "done", Done: true}
got := <-done
if got.resp != "" {
t.Fatalf("HandleRelay() response = %q, want empty", got.resp)
}
if !errors.Is(got.err, context.DeadlineExceeded) {
t.Fatalf("HandleRelay() error = %v, want context deadline exceeded", got.err)
}
select {
case <-session.closed:
case <-time.After(2 * time.Second):
t.Fatal("background drain goroutine did not close the session")
}
}
// relayFallbackAgent fails the first StartSession call (simulating a corrupt
// resume) and returns freshSession on the second call (fresh start).
type relayFallbackAgent struct {
callCount int
freshSession AgentSession
}
func (a *relayFallbackAgent) Name() string { return "fallback" }
func (a *relayFallbackAgent) StartSession(_ context.Context, sessionID string) (AgentSession, error) {
a.callCount++
if a.callCount == 1 && sessionID != "" {
return nil, fmt.Errorf("simulated resume failure")
}
return a.freshSession, nil
}
func (a *relayFallbackAgent) ListSessions(_ context.Context) ([]AgentSessionInfo, error) {
return nil, nil
}
func (a *relayFallbackAgent) Stop() error { return nil }
func TestHandleRelay_ResumeFailureFallsBackToFreshSession(t *testing.T) {
e := newTestEngine()
freshSession := newControllableSession("fresh-session")
e.agent = &relayFallbackAgent{freshSession: freshSession}
// Pre-set a stale session ID so that the first StartSession tries to resume.
sourceSessionKey := "test:chat-1:user"
relaySessionKey := "relay:source:test:chat-1"
sess := e.sessions.GetOrCreateActive(relaySessionKey)
sess.SetAgentSessionID("stale-id", "fallback")
e.sessions.Save()
ctx := context.Background()
done := make(chan string, 1)
go func() {
resp, err := e.HandleRelay(ctx, "source", sourceSessionKey, "hello")
if err != nil {
done <- "error: " + err.Error()
return
}
done <- resp
}()
// The fresh session should receive the message and respond.
freshSession.events <- Event{Type: EventResult, Content: "recovered", SessionID: "fresh-session", Done: true}
got := <-done
if got != "recovered" {
t.Fatalf("HandleRelay() = %q, want %q", got, "recovered")
}
// Session should be closed after EventResult.
select {
case <-freshSession.closed:
case <-time.After(2 * time.Second):
t.Fatal("session was not closed after EventResult")
}
}
func TestHandleRelay_SingleWorkspaceUsesGlobalAgentAndSourceSessionKey(t *testing.T) {
e := newTestEngine()
agent := &sessionEnvRecordingAgent{session: newResultAgentSession("global")}
e.agent = agent
sourceSessionKey := "discord:C1:U1"
resp, err := e.HandleRelay(context.Background(), "source", sourceSessionKey, "hello")
if err != nil {
t.Fatalf("HandleRelay() error = %v", err)
}
if resp != "global" {
t.Fatalf("HandleRelay() response = %q, want %q", resp, "global")
}
if got := agent.EnvValue("CC_SESSION_KEY"); got != sourceSessionKey {
t.Fatalf("CC_SESSION_KEY = %q, want %q", got, sourceSessionKey)
}
if got := e.sessions.ActiveSessionID("relay:source:discord:C1"); got == "" {
t.Fatal("expected relay session to be stored under platform-qualified relay key")
}
}
func TestHandleRelay_MultiWorkspaceRoutesBySourceSessionKey(t *testing.T) {
baseDir := t.TempDir()
channelID := "C42"
wsDir := filepath.Join(baseDir, "relay-ws")
if err := os.MkdirAll(wsDir, 0o755); err != nil {
t.Fatal(err)
}
p := &mockChannelResolver{name: "mock", names: map[string]string{channelID: "relay-ws"}}
globalAgent := &sessionEnvRecordingAgent{session: newResultAgentSession("global")}
e := NewEngine("test", globalAgent, []Platform{p}, "", LangEnglish)
e.SetMultiWorkspace(baseDir, filepath.Join(t.TempDir(), "bindings.json"))
normalizedWsDir := normalizeWorkspacePath(wsDir)
workspaceAgent := &sessionEnvRecordingAgent{session: newResultAgentSession("workspace")}
ws := e.workspacePool.GetOrCreate(normalizedWsDir)
ws.agent = workspaceAgent
ws.sessions = NewSessionManager("")
sourceSessionKey := "mock:" + channelID + ":U1"
resp, err := e.HandleRelay(context.Background(), "source", sourceSessionKey, "hello")
if err != nil {
t.Fatalf("HandleRelay() error = %v", err)
}
if resp != "workspace" {
t.Fatalf("HandleRelay() response = %q, want %q", resp, "workspace")
}
if got := workspaceAgent.EnvValue("CC_SESSION_KEY"); got != sourceSessionKey {
t.Fatalf("workspace CC_SESSION_KEY = %q, want %q", got, sourceSessionKey)
}
if got := globalAgent.EnvValue("CC_SESSION_KEY"); got != "" {
t.Fatalf("global agent should not receive relay env, got %q", got)
}
if got := e.sessions.ActiveSessionID("relay:source:mock:" + channelID); got != "" {
t.Fatalf("expected no global relay session, got %q", got)
}
if got := ws.sessions.ActiveSessionID("relay:source:mock:" + channelID); got == "" {
t.Fatal("expected relay session in workspace session manager")
}
if b := e.workspaceBindings.Lookup("project:test", workspaceChannelKey("mock", channelID)); b == nil || b.Workspace != normalizedWsDir {
t.Fatalf("expected convention binding to be created for %q", normalizedWsDir)
}
}
func TestHandleRelay_MultiWorkspaceRequiresWorkspaceBinding(t *testing.T) {
baseDir := t.TempDir()
globalAgent := &sessionEnvRecordingAgent{session: newResultAgentSession("global")}
e := NewEngine("test", globalAgent, nil, "", LangEnglish)
e.SetMultiWorkspace(baseDir, filepath.Join(t.TempDir(), "bindings.json"))
resp, err := e.HandleRelay(context.Background(), "source", "mock:C404:U1", "hello")
if err == nil {
t.Fatal("expected error for unbound relay workspace")
}
if resp != "" {
t.Fatalf("HandleRelay() response = %q, want empty", resp)
}
if !strings.Contains(err.Error(), "no workspace binding") {
t.Fatalf("HandleRelay() error = %v, want missing workspace binding", err)
}
if got := e.sessions.ActiveSessionID("relay:source:mock:C404"); got != "" {
t.Fatalf("expected no global relay session, got %q", got)
}
}