Files
chenhg5-cc-connect/core/observer.go
Claude d21cee5eb7 fix: pre-release fixes for high/medium priority issues from PR review
High priority:
- PR #420: fix Telegram reply context lost by alias resolution —
  resolveAlias now runs on user text before ExtraContent merge
- PR #567: fix message queue race — adoptPendingFromPlaceholder
  copies queued messages when placeholder state is replaced
- PR #437: harden /diff command — reject targets starting with '-',
  use '--' separator to prevent git option injection
- PR #560: add run_as_env dangerous variable blacklist (LD_PRELOAD,
  PATH, HOME, etc.); fix doc claiming PATH is in default allowlist

Medium priority:
- PR #497: ACP permission params log level Info→Debug to prevent
  sensitive data leaking; add toolInputByID cache size cap (1000)
- PR #559: observe new JSONL files start from EOF instead of offset 0,
  preventing full session history replay to Slack
- PR #510: add idempotent Close() test for claude session
- PR #514: bridge ReconstructReplyCtx already has structured JSON +
  backward compat; added decode helper + tests
- PR #440: add instance lock acquire/release unit tests
- PR #569: document ContinueSession behavioral change in code comments

All changes backward-compatible. Tests added for all fixes.

Made-with: Cursor
2026-04-12 20:40:02 +08:00

265 lines
6.5 KiB
Go

package core
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
"unicode/utf8"
)
// ObserverTarget is an optional interface that platforms can implement to receive
// terminal observation messages. Currently only Slack implements this.
// Other platforms can implement it in the future without changes to core.
type ObserverTarget interface {
SendObservation(ctx context.Context, channelID, text string) error
}
// observation represents a parsed user or assistant message from a JSONL session log.
type observation struct {
role string // "user" or "assistant"
text string
sessionID string
}
// parseObservationLine parses a single JSONL line from a Claude Code session log.
// Returns nil if the line should be skipped (non-message type, sdk-cli entrypoint, etc).
func parseObservationLine(line []byte) *observation {
var raw map[string]any
if err := json.Unmarshal(line, &raw); err != nil {
return nil
}
eventType, _ := raw["type"].(string)
if eventType != "user" && eventType != "assistant" {
return nil
}
// Skip cc-connect's own sessions
if ep, _ := raw["entrypoint"].(string); ep == "sdk-cli" {
return nil
}
sessionID, _ := raw["sessionId"].(string)
msg, _ := raw["message"].(map[string]any)
if msg == nil {
return nil
}
var text string
content := msg["content"]
switch c := content.(type) {
case string:
text = c
case []any:
// Extract text blocks, skip tool_use/thinking
var parts []string
for _, block := range c {
b, ok := block.(map[string]any)
if !ok {
continue
}
if bt, _ := b["type"].(string); bt == "text" {
if t, _ := b["text"].(string); t != "" {
parts = append(parts, t)
}
}
}
text = strings.Join(parts, "\n")
}
return &observation{
role: eventType,
text: text,
sessionID: sessionID,
}
}
// startObserver launches the terminal session observer if configured.
// Called from Engine.Start() after platforms are ready.
func (e *Engine) startObserver() {
if !e.observeEnabled || e.observeProjectDir == "" {
return
}
target := e.findObserverTarget()
if target == nil {
slog.Warn("observe: no platform supports observation; --observe ignored")
return
}
channelID := extractChannelID(e.observeSessionKey)
if channelID == "" {
slog.Warn("observe: could not extract channel ID from session key", "key", e.observeSessionKey)
return
}
ctx, cancel := context.WithCancel(e.ctx)
e.observeCancel = cancel
obs := newSessionObserver(e.observeProjectDir, target, channelID)
go obs.run(ctx)
slog.Info("observe: watching terminal sessions", "dir", e.observeProjectDir, "channel", channelID)
}
// sessionObserver watches Claude Code JSONL session logs and forwards
// user/assistant messages to an ObserverTarget (e.g. Slack).
type sessionObserver struct {
projectDir string
target ObserverTarget
channelID string
offsets map[string]int64 // file path -> last read offset
mu sync.Mutex
}
func newSessionObserver(projectDir string, target ObserverTarget, channelID string) *sessionObserver {
return &sessionObserver{
projectDir: projectDir,
target: target,
channelID: channelID,
offsets: make(map[string]int64),
}
}
// run starts the observation loop. It scans for JSONL files, seeks to end
// (for existing files), then polls for new content every 2 seconds.
// Blocks until ctx is cancelled.
func (o *sessionObserver) run(ctx context.Context) {
// Initial scan: record current end-of-file offsets so we only see NEW content
o.initOffsets()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
o.poll(ctx)
}
}
}
// initOffsets scans existing JSONL files and records their current size
// so we don't replay historical content on startup.
func (o *sessionObserver) initOffsets() {
files, err := filepath.Glob(filepath.Join(o.projectDir, "*.jsonl"))
if err != nil {
slog.Warn("observe: glob failed", "dir", o.projectDir, "error", err)
}
o.mu.Lock()
defer o.mu.Unlock()
for _, f := range files {
info, err := os.Stat(f)
if err != nil {
continue
}
o.offsets[f] = info.Size()
}
}
// poll checks all JSONL files for new content since last read.
func (o *sessionObserver) poll(ctx context.Context) {
files, err := filepath.Glob(filepath.Join(o.projectDir, "*.jsonl"))
if err != nil {
slog.Warn("observe: glob failed", "dir", o.projectDir, "error", err)
return
}
for _, f := range files {
o.mu.Lock()
offset, known := o.offsets[f]
if !known {
// New file appeared — start at EOF so we do not replay pre-existing
// session history that was already on disk when we first saw the file.
offset = 0
if info, err := os.Stat(f); err == nil {
offset = info.Size()
}
o.offsets[f] = offset
}
o.mu.Unlock()
newOffset := o.tailFile(ctx, f, offset)
o.mu.Lock()
o.offsets[f] = newOffset
o.mu.Unlock()
}
}
// tailFile reads new lines from a JSONL file starting at offset.
// Returns the new offset after reading.
func (o *sessionObserver) tailFile(ctx context.Context, path string, offset int64) int64 {
f, err := os.Open(path)
if err != nil {
return offset
}
defer f.Close()
info, err := f.Stat()
if err != nil || info.Size() <= offset {
return offset
}
if _, err := f.Seek(offset, io.SeekStart); err != nil {
return offset
}
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 64*1024), 1*1024*1024)
for scanner.Scan() {
line := scanner.Bytes()
obs := parseObservationLine(line)
if obs == nil || obs.text == "" {
continue
}
o.forward(ctx, obs)
}
// Use file size as new offset when we've read to EOF cleanly.
// This avoids offset drift from line-length calculation (e.g. \r\n vs \n).
if scanner.Err() == nil {
return info.Size()
}
return offset
}
// forward sends a parsed observation to the target platform.
func (o *sessionObserver) forward(ctx context.Context, obs *observation) {
var msg string
switch obs.role {
case "user":
msg = fmt.Sprintf("user: %s", obs.text)
case "assistant":
msg = fmt.Sprintf("\nClaude: %s", obs.text)
default:
return
}
// Slack has a 4000 char limit per message; truncate if needed
const maxLen = 3900
if len(msg) > maxLen {
truncated := msg[:maxLen]
// Ensure we don't cut mid-rune
for len(truncated) > 0 && !utf8.ValidString(truncated) {
truncated = truncated[:len(truncated)-1]
}
msg = truncated + "\n... (truncated)"
}
if err := o.target.SendObservation(ctx, o.channelID, msg); err != nil {
slog.Warn("observe: forward failed", "error", err)
}
}