Files
chenhg5-cc-connect/core/api.go
Claude aeaeecdac6 feat(api): support combining --as-prompt with --new-thread in send command (#590)
Adds three new engine methods:
- InjectPrompt: inject a message into the agent's stdin as a prompt
- PostToNewThread: post a message to the platform as a new top-level message
- InjectPromptToNewThread: combine both for completion-watcher flows

Adds CLI flags --as-prompt and --new-thread to the send subcommand, both
usable alone or together. The previous handleSend dispatched on AsPrompt
first and never reached the NewThread branch, so the combined mode was
silently broken.

Fresh implementation supersedes the stale #593 branch. Fixes #590.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-16 17:05:46 +00:00

806 lines
22 KiB
Go

package core
import (
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// APIServer exposes a local Unix socket API for external tools (e.g. cron jobs)
// to send messages to active sessions.
type APIServer struct {
socketPath string
listener net.Listener
server *http.Server
mux *http.ServeMux
engines map[string]*Engine // project name → engine
cron *CronScheduler
timer *TimerScheduler
relay *RelayManager
mu sync.RWMutex
}
// SendRequest is the JSON body for POST /send.
//
// Audios and Videos are kept separate from Files so the engine can
// dispatch them to AudioSender / VideoSender (native voice / video
// bubble) instead of FileSender (generic file download). The fields
// reuse FileAttachment as the wire format because audio/video clips
// are byte blobs with a name + mime — the dedicated typing happens at
// the dispatch layer in engine.go. See cc-connect internal task
// t-20260615-cqjbk1.
type SendRequest struct {
Project string `json:"project"`
SessionKey string `json:"session_key"`
Message string `json:"message"`
TTSText string `json:"tts_text,omitempty"`
Images []ImageAttachment `json:"images,omitempty"`
Files []FileAttachment `json:"files,omitempty"`
Audios []FileAttachment `json:"audios,omitempty"`
Videos []FileAttachment `json:"videos,omitempty"`
AtUsers []string `json:"at_users,omitempty"`
AtAll bool `json:"at_all,omitempty"`
// AsPrompt, when true, injects the message into the running agent session
// as a user prompt instead of posting to the platform. Combined with
// NewThread, it posts to a new top-level thread AND injects as a prompt
// (see issue #590).
AsPrompt bool `json:"as_prompt,omitempty"`
// NewThread, when true, posts the message as a new top-level message
// rather than replying in the existing thread. Only effective when the
// underlying platform supports top-level posting (Send).
NewThread bool `json:"new_thread,omitempty"`
}
// NewAPIServer creates an API server on a Unix socket.
func NewAPIServer(dataDir string) (*APIServer, error) {
sockDir := filepath.Join(dataDir, "run")
if err := os.MkdirAll(sockDir, 0o755); err != nil {
return nil, fmt.Errorf("create run dir: %w", err)
}
sockPath := filepath.Join(sockDir, "api.sock")
// Remove stale socket
os.Remove(sockPath)
listener, err := net.Listen("unix", sockPath)
if err != nil {
return nil, fmt.Errorf("listen unix socket: %w", err)
}
if err := os.Chmod(sockPath, 0o600); err != nil {
_ = listener.Close()
return nil, fmt.Errorf("chmod socket: %w", err)
}
s := &APIServer{
socketPath: sockPath,
listener: listener,
mux: http.NewServeMux(),
engines: make(map[string]*Engine),
}
s.mux.HandleFunc("/send", s.handleSend)
s.mux.HandleFunc("/sessions", s.handleSessions)
s.mux.HandleFunc("/cron/add", s.handleCronAdd)
s.mux.HandleFunc("/cron/list", s.handleCronList)
s.mux.HandleFunc("/cron/info", s.handleCronInfo)
s.mux.HandleFunc("/cron/edit", s.handleCronEdit)
s.mux.HandleFunc("/cron/del", s.handleCronDel)
s.mux.HandleFunc("/timer/add", s.handleTimerAdd)
s.mux.HandleFunc("/timer/list", s.handleTimerList)
s.mux.HandleFunc("/timer/info", s.handleTimerInfo)
s.mux.HandleFunc("/timer/del", s.handleTimerDel)
s.mux.HandleFunc("/cron/exec", s.handleCronExec)
s.mux.HandleFunc("/cron/run", s.handleCronExec)
s.mux.HandleFunc("/relay/send", s.handleRelaySend)
s.mux.HandleFunc("/relay/bind", s.handleRelayBind)
s.mux.HandleFunc("/relay/binding", s.handleRelayBinding)
return s, nil
}
func (s *APIServer) SocketPath() string {
return s.socketPath
}
func (s *APIServer) RegisterEngine(name string, e *Engine) {
s.mu.Lock()
defer s.mu.Unlock()
s.engines[name] = e
if s.relay != nil {
s.relay.RegisterEngine(name, e)
}
}
func (s *APIServer) SetRelayManager(rm *RelayManager) {
s.relay = rm
}
func (s *APIServer) RelayManager() *RelayManager {
return s.relay
}
func (s *APIServer) SetCronScheduler(cs *CronScheduler) {
s.cron = cs
}
func (s *APIServer) SetTimerScheduler(ts *TimerScheduler) {
s.timer = ts
}
func (s *APIServer) Start() {
s.server = &http.Server{Handler: s.mux}
go func() {
if err := s.server.Serve(s.listener); err != nil && err != http.ErrServerClosed {
slog.Error("api server error", "error", err)
}
}()
slog.Info("api server started", "socket", s.socketPath)
}
func (s *APIServer) Stop() {
if s.server != nil {
if err := s.server.Close(); err != nil && err != http.ErrServerClosed {
slog.Debug("api server close failed", "error", err)
}
}
if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) {
slog.Debug("api server remove socket failed", "error", err)
}
}
func apiJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(v); err != nil {
slog.Error("api server: write JSON failed", "error", err)
}
}
func (s *APIServer) handleSend(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
const maxSendBody = 52 << 20 // 52 MB (slightly above max attachment to account for base64 overhead)
var req SendRequest
if err := json.NewDecoder(io.LimitReader(r.Body, maxSendBody)).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.Message == "" && strings.TrimSpace(req.TTSText) == "" && len(req.Images) == 0 && len(req.Files) == 0 && len(req.Audios) == 0 && len(req.Videos) == 0 {
http.Error(w, "message, tts_text, or attachment is required", http.StatusBadRequest)
return
}
s.mu.RLock()
var engine *Engine
var ok bool
if req.Project != "" {
engine, ok = s.engines[req.Project]
} else if len(s.engines) == 1 {
// No project specified and only one engine: use it by default.
// Do NOT silently fall back when a non-empty project name is unknown —
// that misroutes the message to the wrong engine. Mirrors the resolve
// pattern in webhook.go and handleCronAdd.
for _, e := range s.engines {
engine = e
ok = true
}
}
s.mu.RUnlock()
if !ok {
if req.Project == "" {
http.Error(w, "project is required (multiple projects configured)", http.StatusBadRequest)
return
}
http.Error(w, fmt.Sprintf("project %q not found", req.Project), http.StatusNotFound)
return
}
// Dispatch based on AsPrompt / NewThread combination. When both are
// false, fall through to the legacy SendToSessionWithAttachments path
// so the AtUsers / AtAll behavior is preserved exactly.
var sendErr error
switch {
case req.AsPrompt && req.NewThread:
// Post to platform as a new thread AND inject as a prompt. AtUsers
// and attachments are ignored here — they are not meaningful in
// the "programmatic prompt" path.
sendErr = engine.InjectPromptToNewThread(req.SessionKey, req.Message)
case req.AsPrompt:
sendErr = engine.InjectPrompt(req.SessionKey, req.Message, req.Images, req.Files)
case req.NewThread:
sendErr = engine.PostToNewThread(req.SessionKey, req.Message)
default:
if req.Message != "" || len(req.Images) > 0 || len(req.Files) > 0 {
sendErr = engine.SendToSessionWithAttachments(req.SessionKey, req.Message, req.Images, req.Files, req.AtUsers, req.AtAll)
}
}
if sendErr != nil {
http.Error(w, sendErr.Error(), http.StatusInternalServerError)
return
}
if len(req.Audios) > 0 {
if err := engine.SendAudiosToSession(req.SessionKey, req.Audios); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
if len(req.Videos) > 0 {
if err := engine.SendVideosToSession(req.SessionKey, req.Videos); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
if strings.TrimSpace(req.TTSText) != "" {
if err := engine.SendTTSToSession(req.SessionKey, req.TTSText); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
apiJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
func (s *APIServer) handleSessions(w http.ResponseWriter, r *http.Request) {
s.mu.RLock()
defer s.mu.RUnlock()
type sessionInfo struct {
Project string `json:"project"`
SessionKey string `json:"session_key"`
Platform string `json:"platform"`
}
var result []sessionInfo
for name, e := range s.engines {
e.interactiveMu.Lock()
for key, state := range e.interactiveStates {
if state.platform != nil {
result = append(result, sessionInfo{
Project: name,
SessionKey: key,
Platform: state.platform.Name(),
})
}
}
e.interactiveMu.Unlock()
}
apiJSON(w, http.StatusOK, result)
}
// ── Cron API ───────────────────────────────────────────────────
// CronAddRequest is the JSON body for POST /cron/add.
type CronAddRequest struct {
Project string `json:"project"`
SessionKey string `json:"session_key"`
CronExpr string `json:"cron_expr"`
Prompt string `json:"prompt"`
Exec string `json:"exec"`
WorkDir string `json:"work_dir"`
Description string `json:"description"`
Silent *bool `json:"silent,omitempty"`
SessionMode string `json:"session_mode,omitempty"`
Mode string `json:"mode,omitempty"`
TimeoutMins *int `json:"timeout_mins,omitempty"`
}
func (s *APIServer) handleCronAdd(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.cron == nil {
http.Error(w, "cron scheduler not available", http.StatusServiceUnavailable)
return
}
var req CronAddRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.CronExpr == "" {
http.Error(w, "cron_expr is required", http.StatusBadRequest)
return
}
if req.Prompt == "" && req.Exec == "" {
http.Error(w, "either prompt or exec is required", http.StatusBadRequest)
return
}
if req.Prompt != "" && req.Exec != "" {
http.Error(w, "prompt and exec are mutually exclusive", http.StatusBadRequest)
return
}
// Resolve project: use provided, or pick single engine
project := req.Project
if project == "" {
s.mu.RLock()
if len(s.engines) == 1 {
for name := range s.engines {
project = name
}
}
s.mu.RUnlock()
}
if project == "" {
http.Error(w, "project is required (multiple projects configured)", http.StatusBadRequest)
return
}
// Resolve session_key: use provided, or auto-detect from active sessions
sessionKey := req.SessionKey
if sessionKey == "" {
s.mu.RLock()
engine := s.engines[project]
s.mu.RUnlock()
if engine != nil {
keys := engine.ActiveSessionKeys()
if len(keys) == 1 {
sessionKey = keys[0]
slog.Debug("auto-detected session_key for cron job", "session_key", sessionKey)
}
}
}
if sessionKey == "" {
http.Error(w, "session_key is required: set CC_SESSION_KEY env, pass --session-key, or ensure exactly one active session exists", http.StatusBadRequest)
return
}
job := &CronJob{
ID: GenerateCronID(),
Project: project,
SessionKey: sessionKey,
CronExpr: req.CronExpr,
Prompt: req.Prompt,
Exec: req.Exec,
WorkDir: req.WorkDir,
Description: req.Description,
Enabled: true,
Silent: req.Silent,
SessionMode: NormalizeCronSessionMode(req.SessionMode),
Mode: req.Mode,
TimeoutMins: req.TimeoutMins,
}
job.CreatedAt = time.Now()
if err := s.cron.AddJob(job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
apiJSON(w, http.StatusOK, job)
}
func (s *APIServer) handleCronList(w http.ResponseWriter, r *http.Request) {
if s.cron == nil {
http.Error(w, "cron scheduler not available", http.StatusServiceUnavailable)
return
}
project := r.URL.Query().Get("project")
var jobs []*CronJob
if project != "" {
jobs = s.cron.Store().ListByProject(project)
} else {
jobs = s.cron.Store().List()
}
apiJSON(w, http.StatusOK, jobs)
}
func (s *APIServer) handleCronDel(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.cron == nil {
http.Error(w, "cron scheduler not available", http.StatusServiceUnavailable)
return
}
var req struct {
ID string `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
if s.cron.RemoveJob(req.ID) {
apiJSON(w, http.StatusOK, map[string]string{"status": "ok"})
} else {
http.Error(w, fmt.Sprintf("job %q not found", req.ID), http.StatusNotFound)
}
}
func (s *APIServer) handleCronExec(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.cron == nil {
http.Error(w, "cron scheduler not available", http.StatusServiceUnavailable)
return
}
var req struct {
ID string `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
if err := s.cron.RunJobNow(req.ID); err != nil {
if errors.Is(err, ErrCronJobNotFound) {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
apiJSON(w, http.StatusAccepted, map[string]string{
"id": req.ID,
"status": "triggered",
})
}
func (s *APIServer) handleCronInfo(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "GET only", http.StatusMethodNotAllowed)
return
}
if s.cron == nil {
http.Error(w, "cron scheduler not available", http.StatusServiceUnavailable)
return
}
id := r.URL.Query().Get("id")
if id == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
job := s.cron.store.Get(id)
if job == nil {
http.Error(w, fmt.Sprintf("job %q not found", id), http.StatusNotFound)
return
}
apiJSON(w, http.StatusOK, job)
}
func (s *APIServer) handleCronEdit(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.cron == nil {
http.Error(w, "cron scheduler not available", http.StatusServiceUnavailable)
return
}
var req struct {
ID string `json:"id"`
Field string `json:"field"`
Value any `json:"value"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
if req.Field == "" {
http.Error(w, "field is required", http.StatusBadRequest)
return
}
if req.Value == nil {
http.Error(w, "value is required", http.StatusBadRequest)
return
}
if err := s.cron.UpdateJob(req.ID, req.Field, req.Value); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Return updated job
job := s.cron.Store().Get(req.ID)
apiJSON(w, http.StatusOK, job)
}
// ── Timer API ─────────────────────────────────────────────────
// TimerAddRequest is the JSON body for POST /timer/add.
type TimerAddRequest struct {
Project string `json:"project"`
SessionKey string `json:"session_key"`
Delay string `json:"delay"` // relative ("2h") or absolute ISO time
Prompt string `json:"prompt"`
Exec string `json:"exec"`
WorkDir string `json:"work_dir"`
Description string `json:"description"`
Silent *bool `json:"silent,omitempty"`
Mute bool `json:"mute,omitempty"`
SessionMode string `json:"session_mode,omitempty"`
Mode string `json:"mode,omitempty"`
TimeoutMins *int `json:"timeout_mins,omitempty"`
}
func (s *APIServer) handleTimerAdd(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
var req TimerAddRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.Delay == "" {
http.Error(w, "delay is required (e.g. 2h, 30m, or ISO time)", http.StatusBadRequest)
return
}
if req.Prompt == "" && req.Exec == "" {
http.Error(w, "either prompt or exec is required", http.StatusBadRequest)
return
}
if req.Prompt != "" && req.Exec != "" {
http.Error(w, "prompt and exec are mutually exclusive", http.StatusBadRequest)
return
}
fireAt, err := ParseDelayOrTime(req.Delay)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
project := req.Project
if project == "" {
s.mu.RLock()
if len(s.engines) == 1 {
for name := range s.engines {
project = name
}
}
s.mu.RUnlock()
}
if project == "" {
http.Error(w, "project is required (multiple projects configured)", http.StatusBadRequest)
return
}
sessionKey := req.SessionKey
if sessionKey == "" {
s.mu.RLock()
engine := s.engines[project]
s.mu.RUnlock()
if engine != nil {
keys := engine.ActiveSessionKeys()
if len(keys) == 1 {
sessionKey = keys[0]
}
}
}
if sessionKey == "" {
http.Error(w, "session_key is required", http.StatusBadRequest)
return
}
job := &TimerJob{
ID: GenerateTimerID(),
Project: project,
SessionKey: sessionKey,
ScheduledAt: fireAt,
Prompt: req.Prompt,
Exec: req.Exec,
WorkDir: req.WorkDir,
Description: req.Description,
Silent: req.Silent,
Mute: req.Mute,
SessionMode: req.SessionMode,
Mode: req.Mode,
TimeoutMins: req.TimeoutMins,
CreatedAt: time.Now(),
}
if err := s.timer.AddJob(job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
apiJSON(w, http.StatusOK, job)
}
func (s *APIServer) handleTimerList(w http.ResponseWriter, r *http.Request) {
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
project := r.URL.Query().Get("project")
var jobs []*TimerJob
if project != "" {
jobs = s.timer.Store().ListByProject(project)
} else {
jobs = s.timer.Store().List()
}
// Filter to pending only
var pending []*TimerJob
for _, j := range jobs {
if !j.Fired {
pending = append(pending, j)
}
}
apiJSON(w, http.StatusOK, pending)
}
func (s *APIServer) handleTimerInfo(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "GET only", http.StatusMethodNotAllowed)
return
}
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
id := r.URL.Query().Get("id")
if id == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
job := s.timer.Store().Get(id)
if job == nil {
http.Error(w, "timer not found", http.StatusNotFound)
return
}
apiJSON(w, http.StatusOK, job)
}
func (s *APIServer) handleTimerDel(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
var req struct {
ID string `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
if !s.timer.RemoveJob(req.ID) {
http.Error(w, "timer not found", http.StatusNotFound)
return
}
apiJSON(w, http.StatusOK, map[string]string{"status": "ok", "id": req.ID})
}
// ── Relay API ──────────────────────────────────────────────────
func (s *APIServer) handleRelaySend(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.relay == nil {
http.Error(w, "relay not available", http.StatusServiceUnavailable)
return
}
var req RelayRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.To == "" || req.Message == "" || req.SessionKey == "" {
http.Error(w, "to, session_key, and message are required", http.StatusBadRequest)
return
}
resp, err := s.relay.Send(r.Context(), req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
apiJSON(w, http.StatusOK, resp)
}
func (s *APIServer) handleRelayBind(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.relay == nil {
http.Error(w, "relay not available", http.StatusServiceUnavailable)
return
}
var req struct {
Platform string `json:"platform"`
ChatID string `json:"chat_id"`
Bots map[string]string `json:"bots"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.ChatID == "" || len(req.Bots) < 2 {
http.Error(w, "chat_id and at least 2 bots are required", http.StatusBadRequest)
return
}
s.relay.Bind(req.Platform, req.ChatID, req.Bots)
apiJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
func (s *APIServer) handleRelayBinding(w http.ResponseWriter, r *http.Request) {
if s.relay == nil {
http.Error(w, "relay not available", http.StatusServiceUnavailable)
return
}
chatID := r.URL.Query().Get("chat_id")
if chatID == "" {
http.Error(w, "chat_id is required", http.StatusBadRequest)
return
}
binding := s.relay.GetBinding(chatID)
if binding == nil {
http.Error(w, "no binding found", http.StatusNotFound)
return
}
apiJSON(w, http.StatusOK, binding)
}