mirror of
https://github.com/chenhg5/cc-connect.git
synced 2026-07-03 12:28:10 +08:00
* feat(codex): support custom system_prompt / append_system_prompt config Codex has no native system-prompt CLI flag, so these project options are synthesized into a preamble and prepended to the first message of each new session. Resumed sessions are not re-injected (preambleSent is preset for resume IDs). Covers both the exec and app_server backends. Adds config.example.toml documentation for the two new options. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * test(codex): check cs.Close return value to satisfy errcheck CI lints new lines with golangci-lint --new-from-rev; the freshly added TestSend_PrependsProjectPromptOnFreshSession used an unchecked defer cs.Close(). Wrap it in a deferred closure that explicitly ignores the error, matching the errcheck-clean pattern. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1657 lines
42 KiB
Go
1657 lines
42 KiB
Go
package codex
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/chenhg5/cc-connect/core"
|
|
)
|
|
|
|
type rpcResponseEnvelope struct {
|
|
ID any `json:"id"`
|
|
Result json.RawMessage `json:"result"`
|
|
Error *rpcError `json:"error"`
|
|
}
|
|
|
|
type rpcNotificationEnvelope struct {
|
|
Method string `json:"method"`
|
|
Params json.RawMessage `json:"params"`
|
|
}
|
|
|
|
type rpcError struct {
|
|
Code int `json:"code"`
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
type initResponse struct {
|
|
ProtocolVersion string `json:"protocolVersion"`
|
|
}
|
|
|
|
type threadStartResponse struct {
|
|
Cwd string `json:"cwd"`
|
|
Model string `json:"model"`
|
|
ReasoningEffort *string `json:"reasoningEffort"`
|
|
Thread struct {
|
|
ID string `json:"id"`
|
|
} `json:"thread"`
|
|
}
|
|
|
|
type threadResumeResponse struct {
|
|
Cwd string `json:"cwd"`
|
|
Model string `json:"model"`
|
|
ReasoningEffort *string `json:"reasoningEffort"`
|
|
Thread struct {
|
|
ID string `json:"id"`
|
|
} `json:"thread"`
|
|
}
|
|
|
|
type turnStartResponse struct {
|
|
Turn struct {
|
|
ID string `json:"id"`
|
|
} `json:"turn"`
|
|
}
|
|
|
|
type turnNotification struct {
|
|
ThreadID string `json:"threadId"`
|
|
Turn struct {
|
|
ID string `json:"id"`
|
|
Status string `json:"status"`
|
|
Error *struct {
|
|
Message string `json:"message"`
|
|
} `json:"error"`
|
|
} `json:"turn"`
|
|
}
|
|
|
|
type itemNotification struct {
|
|
ThreadID string `json:"threadId"`
|
|
TurnID string `json:"turnId"`
|
|
Item map[string]any `json:"item"`
|
|
}
|
|
|
|
type errorNotification struct {
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
type appServerRateLimitsResponse struct {
|
|
RateLimits appServerRateLimitSnapshot `json:"rateLimits"`
|
|
RateLimitsByLimitID map[string]appServerRateLimitSnapshot `json:"rateLimitsByLimitId"`
|
|
}
|
|
|
|
type appServerRateLimitSnapshot struct {
|
|
LimitID string `json:"limitId"`
|
|
LimitName string `json:"limitName"`
|
|
PlanType string `json:"planType"`
|
|
Primary *appServerRateLimitWindow `json:"primary"`
|
|
Secondary *appServerRateLimitWindow `json:"secondary"`
|
|
Credits *appServerCreditsSnapshot `json:"credits"`
|
|
}
|
|
|
|
type appServerRateLimitWindow struct {
|
|
UsedPercent int `json:"usedPercent"`
|
|
WindowDurationMins int `json:"windowDurationMins"`
|
|
ResetsAt int64 `json:"resetsAt"`
|
|
}
|
|
|
|
type appServerCreditsSnapshot struct {
|
|
Balance *string `json:"balance"`
|
|
HasCredits bool `json:"hasCredits"`
|
|
Unlimited bool `json:"unlimited"`
|
|
}
|
|
|
|
type appServerRequestUserInputParams struct {
|
|
ThreadID string `json:"threadId"`
|
|
TurnID string `json:"turnId"`
|
|
ItemID string `json:"itemId"`
|
|
Questions []appServerRequestUserInputQuestion `json:"questions"`
|
|
}
|
|
|
|
type appServerRequestUserInputQuestion struct {
|
|
ID string `json:"id"`
|
|
Header string `json:"header"`
|
|
Question string `json:"question"`
|
|
IsOther bool `json:"isOther"`
|
|
IsSecret bool `json:"isSecret"`
|
|
Options []appServerRequestUserInputOption `json:"options"`
|
|
}
|
|
|
|
type appServerRequestUserInputOption struct {
|
|
Label string `json:"label"`
|
|
Description string `json:"description"`
|
|
}
|
|
|
|
type appServerRequestUserInputResponse struct {
|
|
Answers map[string]appServerRequestUserInputAnswer `json:"answers"`
|
|
}
|
|
|
|
type appServerRequestUserInputAnswer struct {
|
|
Answers []string `json:"answers"`
|
|
}
|
|
|
|
type appServerSession struct {
|
|
url string
|
|
workDir string
|
|
model string
|
|
effort string
|
|
mode string
|
|
baseURL string
|
|
modelProvider string
|
|
extraEnv []string
|
|
codexHome string
|
|
promptPreamble string
|
|
|
|
events chan core.Event
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
cmd *exec.Cmd
|
|
stdin io.WriteCloser
|
|
procMu sync.Mutex
|
|
writeMu sync.Mutex
|
|
|
|
nextID atomic.Int64
|
|
|
|
pendingMu sync.Mutex
|
|
pending map[int64]chan rpcResponseEnvelope
|
|
|
|
approvalsMu sync.Mutex
|
|
pendingApprovals map[string]chan core.PermissionResult
|
|
|
|
threadID atomic.Value
|
|
alive atomic.Bool
|
|
|
|
closeOnce sync.Once
|
|
wg sync.WaitGroup
|
|
|
|
stateMu sync.Mutex
|
|
pendingMsgs []string
|
|
currentTurn string
|
|
preambleSent bool
|
|
|
|
runtimeMu sync.RWMutex
|
|
usage *core.UsageReport
|
|
context *core.ContextUsage
|
|
}
|
|
|
|
const (
|
|
appServerRequestTimeout = 120 * time.Second
|
|
appServerUsageRefreshTimeout = 1500 * time.Millisecond
|
|
)
|
|
|
|
func newAppServerSession(ctx context.Context, url, workDir, model, effort, mode, resumeID, baseURL, modelProvider string, extraEnv []string, codexHome string, systemPrompt string, appendPrompt string) (*appServerSession, error) {
|
|
sessionCtx, cancel := context.WithCancel(ctx)
|
|
s := &appServerSession{
|
|
url: url,
|
|
workDir: workDir,
|
|
model: model,
|
|
effort: effort,
|
|
mode: mode,
|
|
baseURL: baseURL,
|
|
modelProvider: modelProvider,
|
|
extraEnv: append([]string(nil), extraEnv...),
|
|
codexHome: strings.TrimSpace(codexHome),
|
|
promptPreamble: buildCodexPromptPreamble(systemPrompt, appendPrompt),
|
|
events: make(chan core.Event, 128),
|
|
ctx: sessionCtx,
|
|
cancel: cancel,
|
|
pending: make(map[int64]chan rpcResponseEnvelope),
|
|
pendingApprovals: make(map[string]chan core.PermissionResult),
|
|
preambleSent: resumeID != "" && resumeID != core.ContinueSession,
|
|
}
|
|
s.alive.Store(true)
|
|
|
|
if err := s.connect(); err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.initialize(); err != nil {
|
|
_ = s.Close()
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.ensureThread(resumeID); err != nil {
|
|
_ = s.Close()
|
|
return nil, err
|
|
}
|
|
if err := s.refreshUsage(context.Background()); err != nil {
|
|
slog.Debug("codex app-server: initial rate limit fetch failed", "error", err)
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func (s *appServerSession) connect() error {
|
|
args := []string{"app-server"}
|
|
if strings.TrimSpace(s.url) != "" {
|
|
args = append(args, "--listen", strings.TrimSpace(s.url))
|
|
}
|
|
if model := strings.TrimSpace(s.model); model != "" {
|
|
args = append(args, "-c", fmt.Sprintf("model=%q", model))
|
|
}
|
|
if effort := strings.TrimSpace(s.effort); effort != "" {
|
|
args = append(args, "-c", fmt.Sprintf("model_reasoning_effort=%q", effort))
|
|
}
|
|
if provider := strings.TrimSpace(s.modelProvider); provider != "" {
|
|
args = append(args, "-c", fmt.Sprintf("model_provider=%q", provider))
|
|
}
|
|
if baseURL := strings.TrimSpace(s.baseURL); baseURL != "" {
|
|
args = append(args, "-c", fmt.Sprintf("openai_base_url=%q", baseURL))
|
|
}
|
|
cmd := exec.CommandContext(s.ctx, "codex", args...)
|
|
cmd.Dir = s.workDir
|
|
env := append([]string(nil), s.extraEnv...)
|
|
if s.codexHome != "" {
|
|
env = append(env, "CODEX_HOME="+s.codexHome)
|
|
}
|
|
if len(env) > 0 {
|
|
cmd.Env = core.MergeEnv(os.Environ(), env)
|
|
}
|
|
|
|
stdin, err := cmd.StdinPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("codex app-server stdin pipe: %w", err)
|
|
}
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("codex app-server stdout pipe: %w", err)
|
|
}
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("codex app-server stderr pipe: %w", err)
|
|
}
|
|
if err := cmd.Start(); err != nil {
|
|
return fmt.Errorf("codex app-server start: %w", err)
|
|
}
|
|
|
|
s.procMu.Lock()
|
|
s.cmd = cmd
|
|
s.stdin = stdin
|
|
s.procMu.Unlock()
|
|
|
|
slog.Info("codex app-server session started", "transport", "stdio", "pid", cmd.Process.Pid, "work_dir", s.workDir)
|
|
|
|
s.wg.Add(3)
|
|
go s.readLoop(stdout)
|
|
go s.stderrLoop(stderr)
|
|
go s.waitLoop()
|
|
return nil
|
|
}
|
|
|
|
func (s *appServerSession) initialize() error {
|
|
params := map[string]any{
|
|
"clientInfo": map[string]any{
|
|
"name": "cc-connect-codex-agent",
|
|
"title": "CC Connect Codex Agent",
|
|
"version": "0.1.0",
|
|
},
|
|
"capabilities": map[string]any{
|
|
"experimentalApi": true,
|
|
"optOutNotificationMethods": []string{
|
|
"command/exec/outputDelta",
|
|
"item/agentMessage/delta",
|
|
"item/plan/delta",
|
|
"item/fileChange/outputDelta",
|
|
"item/reasoning/summaryTextDelta",
|
|
"item/reasoning/textDelta",
|
|
},
|
|
},
|
|
}
|
|
|
|
var resp initResponse
|
|
if err := s.request("initialize", params, &resp); err != nil {
|
|
return fmt.Errorf("codex app-server initialize: %w", err)
|
|
}
|
|
if err := s.notify("initialized", nil); err != nil {
|
|
return fmt.Errorf("codex app-server initialized notify: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *appServerSession) ensureThread(resumeID string) error {
|
|
if resumeID != "" && resumeID != core.ContinueSession {
|
|
params := s.threadRequestParams()
|
|
params["threadId"] = resumeID
|
|
params["persistExtendedHistory"] = true
|
|
|
|
var resp threadResumeResponse
|
|
if err := s.request("thread/resume", params, &resp); err != nil {
|
|
return err
|
|
}
|
|
if resp.Thread.ID == "" {
|
|
return fmt.Errorf("codex app-server resume returned empty thread id")
|
|
}
|
|
s.applyThreadRuntimeState(resp.Cwd, resp.Model, resp.ReasoningEffort)
|
|
s.threadID.Store(resp.Thread.ID)
|
|
slog.Info("codex app-server thread resumed", "thread_id", resp.Thread.ID)
|
|
return nil
|
|
}
|
|
|
|
var resp threadStartResponse
|
|
if err := s.request("thread/start", s.threadRequestParams(), &resp); err != nil {
|
|
return err
|
|
}
|
|
if resp.Thread.ID == "" {
|
|
return fmt.Errorf("codex app-server start returned empty thread id")
|
|
}
|
|
s.applyThreadRuntimeState(resp.Cwd, resp.Model, resp.ReasoningEffort)
|
|
s.threadID.Store(resp.Thread.ID)
|
|
slog.Info("codex app-server thread started", "thread_id", resp.Thread.ID)
|
|
return nil
|
|
}
|
|
|
|
func (s *appServerSession) threadRequestParams() map[string]any {
|
|
params := map[string]any{
|
|
"experimentalRawEvents": false,
|
|
"persistExtendedHistory": false,
|
|
}
|
|
if model := s.GetModel(); model != "" {
|
|
params["model"] = model
|
|
}
|
|
if approval, sandbox := appServerModeSettings(s.mode); approval != "" {
|
|
params["approvalPolicy"] = approval
|
|
if sandbox != "" {
|
|
params["sandbox"] = sandbox
|
|
}
|
|
}
|
|
return params
|
|
}
|
|
|
|
func appServerModeSettings(mode string) (approval string, sandbox string) {
|
|
switch normalizeMode(mode) {
|
|
case "auto-edit", "full-auto":
|
|
return "never", "workspace-write"
|
|
case "yolo":
|
|
return "never", "danger-full-access"
|
|
default:
|
|
return "on-request", "read-only"
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) applyThreadRuntimeState(workDir, model string, effort *string) {
|
|
s.runtimeMu.Lock()
|
|
defer s.runtimeMu.Unlock()
|
|
if dir := strings.TrimSpace(workDir); dir != "" {
|
|
s.workDir = dir
|
|
}
|
|
if m := strings.TrimSpace(model); m != "" {
|
|
s.model = m
|
|
}
|
|
s.effort = normalizeRuntimeReasoningEffort(stringValue(effort))
|
|
}
|
|
|
|
func (s *appServerSession) refreshUsage(ctx context.Context) error {
|
|
timeout := appServerUsageRefreshTimeout
|
|
if ctx != nil {
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
if until := time.Until(deadline); until > 0 && until < timeout {
|
|
timeout = until
|
|
}
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if timeout <= 0 {
|
|
return context.DeadlineExceeded
|
|
}
|
|
|
|
var resp appServerRateLimitsResponse
|
|
if err := s.requestWithTimeout("account/rateLimits/read", map[string]any{}, &resp, timeout); err != nil {
|
|
return err
|
|
}
|
|
s.storeUsage(mapAppServerRateLimits(resp))
|
|
return nil
|
|
}
|
|
|
|
func (s *appServerSession) cachedUsage() *core.UsageReport {
|
|
s.runtimeMu.RLock()
|
|
defer s.runtimeMu.RUnlock()
|
|
return cloneUsageReport(s.usage)
|
|
}
|
|
|
|
func (s *appServerSession) cachedContextUsage() *core.ContextUsage {
|
|
s.runtimeMu.RLock()
|
|
defer s.runtimeMu.RUnlock()
|
|
return cloneContextUsage(s.context)
|
|
}
|
|
|
|
func (s *appServerSession) storeUsage(report *core.UsageReport) {
|
|
s.runtimeMu.Lock()
|
|
defer s.runtimeMu.Unlock()
|
|
s.usage = cloneUsageReport(report)
|
|
}
|
|
|
|
func (s *appServerSession) storeContextUsage(usage *core.ContextUsage) {
|
|
s.runtimeMu.Lock()
|
|
defer s.runtimeMu.Unlock()
|
|
s.context = cloneContextUsage(usage)
|
|
}
|
|
|
|
func (s *appServerSession) Send(prompt string, images []core.ImageAttachment, files []core.FileAttachment) error {
|
|
if !s.alive.Load() {
|
|
return fmt.Errorf("session is closed")
|
|
}
|
|
|
|
if len(files) > 0 {
|
|
filePaths := core.SaveFilesToDisk(s.workDir, files)
|
|
prompt = core.AppendFileRefs(prompt, filePaths)
|
|
}
|
|
|
|
prompt, imagePaths, err := s.stageImages(prompt, images)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.stateMu.Lock()
|
|
if !s.preambleSent {
|
|
prompt = prependCodexPromptPreamble(prompt, s.promptPreamble)
|
|
s.preambleSent = true
|
|
}
|
|
s.stateMu.Unlock()
|
|
|
|
threadID := s.CurrentSessionID()
|
|
if threadID == "" {
|
|
return fmt.Errorf("codex app-server thread id is empty")
|
|
}
|
|
|
|
input := make([]map[string]any, 0, 1+len(imagePaths))
|
|
input = append(input, map[string]any{
|
|
"type": "text",
|
|
"text": prompt,
|
|
"text_elements": []any{},
|
|
})
|
|
for _, path := range imagePaths {
|
|
input = append(input, map[string]any{
|
|
"type": "localImage",
|
|
"path": path,
|
|
})
|
|
}
|
|
|
|
params := map[string]any{
|
|
"threadId": threadID,
|
|
"input": input,
|
|
}
|
|
if model := s.GetModel(); model != "" {
|
|
params["model"] = model
|
|
}
|
|
if effort := s.GetReasoningEffort(); effort != "" {
|
|
params["effort"] = effort
|
|
}
|
|
if approval, _ := appServerModeSettings(s.mode); approval != "" {
|
|
params["approvalPolicy"] = approval
|
|
}
|
|
|
|
var resp turnStartResponse
|
|
if err := s.request("turn/start", params, &resp); err != nil {
|
|
return fmt.Errorf("codex app-server turn/start: %w", err)
|
|
}
|
|
if resp.Turn.ID == "" {
|
|
return fmt.Errorf("codex app-server turn/start returned empty turn id")
|
|
}
|
|
|
|
s.stateMu.Lock()
|
|
s.currentTurn = resp.Turn.ID
|
|
s.pendingMsgs = s.pendingMsgs[:0]
|
|
s.stateMu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *appServerSession) stageImages(prompt string, images []core.ImageAttachment) (string, []string, error) {
|
|
if len(images) == 0 {
|
|
return prompt, nil, nil
|
|
}
|
|
|
|
imgDir := filepath.Join(s.workDir, ".cc-connect", "images")
|
|
if err := os.MkdirAll(imgDir, 0o755); err != nil {
|
|
return "", nil, fmt.Errorf("codex app-server: create image dir: %w", err)
|
|
}
|
|
|
|
imagePaths := make([]string, 0, len(images))
|
|
for i, img := range images {
|
|
ext := codexImageExt(img.MimeType)
|
|
fname := fmt.Sprintf("img_%d_%d%s", time.Now().UnixMilli(), i, ext)
|
|
fpath := filepath.Join(imgDir, fname)
|
|
if err := os.WriteFile(fpath, img.Data, 0o644); err != nil {
|
|
return "", nil, fmt.Errorf("codex app-server: save image: %w", err)
|
|
}
|
|
imagePaths = append(imagePaths, fpath)
|
|
}
|
|
|
|
if strings.TrimSpace(prompt) == "" {
|
|
prompt = "Please analyze the attached image(s)."
|
|
}
|
|
|
|
return prompt, imagePaths, nil
|
|
}
|
|
|
|
func (s *appServerSession) RespondPermission(requestID string, result core.PermissionResult) error {
|
|
s.approvalsMu.Lock()
|
|
ch := s.pendingApprovals[requestID]
|
|
s.approvalsMu.Unlock()
|
|
if ch == nil {
|
|
return fmt.Errorf("codex app-server: no pending approval for request %s", requestID)
|
|
}
|
|
select {
|
|
case ch <- result:
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *appServerSession) handleServerRequest(probe map[string]json.RawMessage) {
|
|
rawID := probe["id"]
|
|
var method string
|
|
if err := json.Unmarshal(probe["method"], &method); err != nil {
|
|
return
|
|
}
|
|
params := probe["params"]
|
|
|
|
switch method {
|
|
case "item/commandExecution/requestApproval", "item/fileChange/requestApproval":
|
|
s.handleApprovalRequest(rawID, method, params)
|
|
case "item/permissions/requestApproval":
|
|
s.handlePermissionsApproval(rawID, params)
|
|
case "item/tool/requestUserInput":
|
|
s.handleRequestUserInput(rawID, params)
|
|
case "item/tool/call":
|
|
s.handleDynamicToolCall(rawID, params)
|
|
default:
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"error": map[string]any{"code": -32601, "message": "method not found"},
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) handleApprovalRequest(rawID json.RawMessage, method string, paramsRaw json.RawMessage) {
|
|
requestID := string(rawID)
|
|
var params map[string]any
|
|
if err := json.Unmarshal(paramsRaw, ¶ms); err != nil {
|
|
return
|
|
}
|
|
|
|
toolName, toolInput := method, appServerJSON(params)
|
|
switch method {
|
|
case "item/commandExecution/requestApproval":
|
|
toolName = "Bash"
|
|
if cmd, _ := params["command"].(string); cmd != "" {
|
|
toolInput = cmd
|
|
if cwd, _ := params["cwd"].(string); cwd != "" {
|
|
toolInput += "\n(in " + cwd + ")"
|
|
}
|
|
}
|
|
case "item/fileChange/requestApproval":
|
|
toolName = "Patch"
|
|
if reason, _ := params["reason"].(string); reason != "" {
|
|
toolInput = reason
|
|
}
|
|
}
|
|
|
|
ch := make(chan core.PermissionResult, 1)
|
|
s.approvalsMu.Lock()
|
|
s.pendingApprovals[requestID] = ch
|
|
s.approvalsMu.Unlock()
|
|
|
|
s.flushPendingAsThinking()
|
|
s.emit(core.Event{
|
|
Type: core.EventPermissionRequest,
|
|
RequestID: requestID,
|
|
ToolName: toolName,
|
|
ToolInput: toolInput,
|
|
ToolInputRaw: params,
|
|
})
|
|
|
|
go func() {
|
|
timer := time.NewTimer(5 * time.Minute)
|
|
defer timer.Stop()
|
|
var result core.PermissionResult
|
|
select {
|
|
case result = <-ch:
|
|
case <-s.ctx.Done():
|
|
result = core.PermissionResult{Behavior: "deny"}
|
|
case <-timer.C:
|
|
result = core.PermissionResult{Behavior: "deny"}
|
|
}
|
|
s.approvalsMu.Lock()
|
|
delete(s.pendingApprovals, requestID)
|
|
s.approvalsMu.Unlock()
|
|
|
|
decision := "decline"
|
|
if strings.EqualFold(result.Behavior, "allow") {
|
|
decision = "accept"
|
|
}
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"result": map[string]any{"decision": decision},
|
|
})
|
|
}()
|
|
}
|
|
|
|
func (s *appServerSession) handlePermissionsApproval(rawID json.RawMessage, paramsRaw json.RawMessage) {
|
|
requestID := string(rawID)
|
|
var params map[string]any
|
|
if err := json.Unmarshal(paramsRaw, ¶ms); err != nil {
|
|
return
|
|
}
|
|
|
|
ch := make(chan core.PermissionResult, 1)
|
|
s.approvalsMu.Lock()
|
|
s.pendingApprovals[requestID] = ch
|
|
s.approvalsMu.Unlock()
|
|
|
|
s.flushPendingAsThinking()
|
|
s.emit(core.Event{
|
|
Type: core.EventPermissionRequest,
|
|
RequestID: requestID,
|
|
ToolName: "Permissions",
|
|
ToolInput: appServerJSON(params),
|
|
ToolInputRaw: params,
|
|
})
|
|
|
|
go func() {
|
|
timer := time.NewTimer(5 * time.Minute)
|
|
defer timer.Stop()
|
|
var result core.PermissionResult
|
|
select {
|
|
case result = <-ch:
|
|
case <-s.ctx.Done():
|
|
result = core.PermissionResult{Behavior: "deny"}
|
|
case <-timer.C:
|
|
result = core.PermissionResult{Behavior: "deny"}
|
|
}
|
|
s.approvalsMu.Lock()
|
|
delete(s.pendingApprovals, requestID)
|
|
s.approvalsMu.Unlock()
|
|
|
|
if strings.EqualFold(result.Behavior, "allow") {
|
|
perms := params["permissions"]
|
|
if perms == nil {
|
|
perms = map[string]any{}
|
|
}
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"result": map[string]any{"permissions": perms, "scope": "turn"},
|
|
})
|
|
} else {
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"result": map[string]any{"permissions": map[string]any{}},
|
|
})
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (s *appServerSession) handleRequestUserInput(rawID json.RawMessage, paramsRaw json.RawMessage) {
|
|
requestID := string(rawID)
|
|
var params appServerRequestUserInputParams
|
|
if err := json.Unmarshal(paramsRaw, ¶ms); err != nil {
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"error": map[string]any{"code": -32602, "message": "invalid params"},
|
|
})
|
|
return
|
|
}
|
|
|
|
questions := appServerRequestUserInputQuestions(params.Questions)
|
|
if len(questions) == 0 {
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"result": appServerRequestUserInputResponse{Answers: map[string]appServerRequestUserInputAnswer{}},
|
|
})
|
|
return
|
|
}
|
|
|
|
rawInput := appServerRequestUserInputRawInput(params)
|
|
ch := make(chan core.PermissionResult, 1)
|
|
s.approvalsMu.Lock()
|
|
s.pendingApprovals[requestID] = ch
|
|
s.approvalsMu.Unlock()
|
|
|
|
s.flushPendingAsThinking()
|
|
s.emit(core.Event{
|
|
Type: core.EventPermissionRequest,
|
|
RequestID: requestID,
|
|
ToolName: "AskUserQuestion",
|
|
ToolInput: appServerJSON(rawInput),
|
|
ToolInputRaw: rawInput,
|
|
Questions: questions,
|
|
})
|
|
|
|
go func() {
|
|
timer := time.NewTimer(5 * time.Minute)
|
|
defer timer.Stop()
|
|
var result core.PermissionResult
|
|
select {
|
|
case result = <-ch:
|
|
case <-s.ctx.Done():
|
|
result = core.PermissionResult{Behavior: "deny"}
|
|
case <-timer.C:
|
|
result = core.PermissionResult{Behavior: "deny"}
|
|
}
|
|
s.approvalsMu.Lock()
|
|
delete(s.pendingApprovals, requestID)
|
|
s.approvalsMu.Unlock()
|
|
|
|
response := appServerRequestUserInputResponseFromResult(params.Questions, result)
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"result": response,
|
|
})
|
|
}()
|
|
}
|
|
|
|
func (s *appServerSession) handleDynamicToolCall(rawID json.RawMessage, paramsRaw json.RawMessage) {
|
|
_ = s.writeJSON(map[string]any{
|
|
"jsonrpc": "2.0", "id": rawID,
|
|
"result": map[string]any{
|
|
"success": false,
|
|
"contentItems": []map[string]any{{"type": "inputText", "text": "tool not available on this client"}},
|
|
},
|
|
})
|
|
}
|
|
|
|
func appServerRequestUserInputQuestions(input []appServerRequestUserInputQuestion) []core.UserQuestion {
|
|
questions := make([]core.UserQuestion, 0, len(input))
|
|
for _, in := range input {
|
|
questionText := strings.TrimSpace(in.Question)
|
|
if questionText == "" {
|
|
continue
|
|
}
|
|
q := core.UserQuestion{
|
|
Question: questionText,
|
|
Header: strings.TrimSpace(in.Header),
|
|
}
|
|
for _, opt := range in.Options {
|
|
q.Options = append(q.Options, core.UserQuestionOption{
|
|
Label: strings.TrimSpace(opt.Label),
|
|
Description: strings.TrimSpace(opt.Description),
|
|
})
|
|
}
|
|
questions = append(questions, q)
|
|
}
|
|
return questions
|
|
}
|
|
|
|
func appServerRequestUserInputRawInput(params appServerRequestUserInputParams) map[string]any {
|
|
questions := make([]any, 0, len(params.Questions))
|
|
for _, in := range params.Questions {
|
|
q := map[string]any{
|
|
"id": in.ID,
|
|
"header": in.Header,
|
|
"question": in.Question,
|
|
"isOther": in.IsOther,
|
|
"isSecret": in.IsSecret,
|
|
"options": appServerRequestUserInputRawOptions(in.Options),
|
|
}
|
|
questions = append(questions, q)
|
|
}
|
|
return map[string]any{
|
|
"threadId": params.ThreadID,
|
|
"turnId": params.TurnID,
|
|
"itemId": params.ItemID,
|
|
"questions": questions,
|
|
}
|
|
}
|
|
|
|
func appServerRequestUserInputRawOptions(options []appServerRequestUserInputOption) []any {
|
|
out := make([]any, 0, len(options))
|
|
for _, opt := range options {
|
|
out = append(out, map[string]any{
|
|
"label": opt.Label,
|
|
"description": opt.Description,
|
|
})
|
|
}
|
|
return out
|
|
}
|
|
|
|
func appServerRequestUserInputResponseFromResult(questions []appServerRequestUserInputQuestion, result core.PermissionResult) appServerRequestUserInputResponse {
|
|
response := appServerRequestUserInputResponse{Answers: map[string]appServerRequestUserInputAnswer{}}
|
|
if !strings.EqualFold(result.Behavior, "allow") {
|
|
return response
|
|
}
|
|
|
|
answersRaw, _ := result.UpdatedInput["answers"].(map[string]any)
|
|
if len(answersRaw) == 0 {
|
|
return response
|
|
}
|
|
|
|
for _, q := range questions {
|
|
id := strings.TrimSpace(q.ID)
|
|
text := strings.TrimSpace(q.Question)
|
|
if id == "" || text == "" {
|
|
continue
|
|
}
|
|
values := appServerRequestUserInputAnswerValues(answersRaw[text])
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
response.Answers[id] = appServerRequestUserInputAnswer{Answers: values}
|
|
}
|
|
return response
|
|
}
|
|
|
|
func appServerRequestUserInputAnswerValues(raw any) []string {
|
|
switch v := raw.(type) {
|
|
case string:
|
|
if strings.TrimSpace(v) == "" {
|
|
return nil
|
|
}
|
|
return []string{v}
|
|
case []string:
|
|
values := make([]string, 0, len(v))
|
|
for _, s := range v {
|
|
if strings.TrimSpace(s) != "" {
|
|
values = append(values, s)
|
|
}
|
|
}
|
|
return values
|
|
case []any:
|
|
values := make([]string, 0, len(v))
|
|
for _, item := range v {
|
|
if s, ok := item.(string); ok && strings.TrimSpace(s) != "" {
|
|
values = append(values, s)
|
|
}
|
|
}
|
|
return values
|
|
case map[string]any:
|
|
return appServerRequestUserInputAnswerValues(v["answers"])
|
|
case appServerRequestUserInputAnswer:
|
|
return appServerRequestUserInputAnswerValues(v.Answers)
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) rejectPendingApprovals(err error) {
|
|
s.approvalsMu.Lock()
|
|
defer s.approvalsMu.Unlock()
|
|
for id, ch := range s.pendingApprovals {
|
|
delete(s.pendingApprovals, id)
|
|
select {
|
|
case ch <- core.PermissionResult{Behavior: "deny"}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) Events() <-chan core.Event {
|
|
return s.events
|
|
}
|
|
|
|
func (s *appServerSession) CurrentSessionID() string {
|
|
v, _ := s.threadID.Load().(string)
|
|
return v
|
|
}
|
|
|
|
func (s *appServerSession) GetWorkDir() string {
|
|
s.runtimeMu.RLock()
|
|
defer s.runtimeMu.RUnlock()
|
|
return s.workDir
|
|
}
|
|
|
|
func (s *appServerSession) GetModel() string {
|
|
s.runtimeMu.RLock()
|
|
defer s.runtimeMu.RUnlock()
|
|
return strings.TrimSpace(s.model)
|
|
}
|
|
|
|
func (s *appServerSession) GetReasoningEffort() string {
|
|
s.runtimeMu.RLock()
|
|
defer s.runtimeMu.RUnlock()
|
|
return strings.TrimSpace(s.effort)
|
|
}
|
|
|
|
func (s *appServerSession) GetUsage(ctx context.Context) (*core.UsageReport, error) {
|
|
if err := s.refreshUsage(ctx); err != nil {
|
|
if cached := s.cachedUsage(); cached != nil {
|
|
return cached, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
if cached := s.cachedUsage(); cached != nil {
|
|
return cached, nil
|
|
}
|
|
return nil, fmt.Errorf("codex app-server usage unavailable")
|
|
}
|
|
|
|
func (s *appServerSession) GetContextUsage() *core.ContextUsage {
|
|
return s.cachedContextUsage()
|
|
}
|
|
|
|
func (s *appServerSession) Alive() bool {
|
|
return s.alive.Load()
|
|
}
|
|
|
|
func (s *appServerSession) Close() error {
|
|
s.alive.Store(false)
|
|
s.cancel()
|
|
|
|
s.procMu.Lock()
|
|
if s.stdin != nil {
|
|
_ = s.stdin.Close()
|
|
s.stdin = nil
|
|
}
|
|
if s.cmd != nil && s.cmd.Process != nil {
|
|
_ = s.cmd.Process.Kill()
|
|
}
|
|
s.procMu.Unlock()
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
}
|
|
|
|
s.closeOnce.Do(func() {
|
|
close(s.events)
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (s *appServerSession) readLoop(r io.Reader) {
|
|
defer s.wg.Done()
|
|
scanner := bufio.NewScanner(r)
|
|
scanBuf := make([]byte, 0, 64*1024)
|
|
const maxLineSize = 10 * 1024 * 1024 // 10MB
|
|
scanner.Buffer(scanBuf, maxLineSize)
|
|
|
|
for scanner.Scan() {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
data := scanner.Bytes()
|
|
|
|
var probe map[string]json.RawMessage
|
|
if err := json.Unmarshal(data, &probe); err != nil {
|
|
slog.Debug("codex app-server: invalid JSON", "error", err)
|
|
continue
|
|
}
|
|
|
|
_, hasID := probe["id"]
|
|
_, hasMethod := probe["method"]
|
|
|
|
switch {
|
|
case hasID && !hasMethod:
|
|
// Response to one of our requests.
|
|
var resp rpcResponseEnvelope
|
|
if err := json.Unmarshal(data, &resp); err != nil {
|
|
slog.Debug("codex app-server: bad response envelope", "error", err)
|
|
continue
|
|
}
|
|
s.handleResponse(resp)
|
|
|
|
case hasID && hasMethod:
|
|
// Server-initiated request that requires a response (e.g. approval).
|
|
s.handleServerRequest(probe)
|
|
|
|
default:
|
|
// Notification (no id).
|
|
var notif rpcNotificationEnvelope
|
|
if err := json.Unmarshal(data, ¬if); err != nil {
|
|
slog.Debug("codex app-server: bad notification envelope", "error", err)
|
|
continue
|
|
}
|
|
s.handleNotification(notif.Method, notif.Params)
|
|
}
|
|
}
|
|
|
|
err := scanner.Err()
|
|
if err != nil {
|
|
if s.ctx.Err() == nil && !errors.Is(err, io.EOF) {
|
|
slog.Warn("codex app-server read failed", "error", err)
|
|
if errors.Is(err, bufio.ErrTooLong) {
|
|
s.emitError(fmt.Errorf("codex app-server line exceeds max size (%d bytes): %w", maxLineSize, err))
|
|
} else {
|
|
s.emitError(fmt.Errorf("codex app-server connection closed: %w", err))
|
|
}
|
|
}
|
|
s.alive.Store(false)
|
|
s.rejectPending(err)
|
|
s.rejectPendingApprovals(err)
|
|
return
|
|
}
|
|
|
|
s.alive.Store(false)
|
|
s.rejectPending(io.EOF)
|
|
s.rejectPendingApprovals(io.EOF)
|
|
}
|
|
|
|
func (s *appServerSession) stderrLoop(r io.Reader) {
|
|
defer s.wg.Done()
|
|
scanner := bufio.NewScanner(r)
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 1024*1024)
|
|
for scanner.Scan() {
|
|
line := strings.TrimSpace(scanner.Text())
|
|
if line == "" {
|
|
continue
|
|
}
|
|
slog.Debug("codex app-server stderr", "line", line)
|
|
}
|
|
if err := scanner.Err(); err != nil && s.ctx.Err() == nil {
|
|
slog.Debug("codex app-server stderr read failed", "error", err)
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) waitLoop() {
|
|
defer s.wg.Done()
|
|
|
|
s.procMu.Lock()
|
|
cmd := s.cmd
|
|
s.procMu.Unlock()
|
|
if cmd == nil {
|
|
return
|
|
}
|
|
|
|
err := cmd.Wait()
|
|
if s.ctx.Err() == nil && err != nil {
|
|
slog.Warn("codex app-server exited unexpectedly", "error", err)
|
|
s.emitError(fmt.Errorf("codex app-server exited: %w", err))
|
|
}
|
|
s.alive.Store(false)
|
|
if err == nil {
|
|
err = io.EOF
|
|
}
|
|
s.rejectPending(err)
|
|
}
|
|
|
|
func (s *appServerSession) handleResponse(resp rpcResponseEnvelope) {
|
|
id, ok := rpcIDToInt64(resp.ID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
s.pendingMu.Lock()
|
|
ch := s.pending[id]
|
|
delete(s.pending, id)
|
|
s.pendingMu.Unlock()
|
|
|
|
if ch == nil {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case ch <- resp:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) handleNotification(method string, paramsRaw json.RawMessage) {
|
|
switch method {
|
|
case "turn/started":
|
|
var notif turnNotification
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil {
|
|
s.stateMu.Lock()
|
|
s.currentTurn = notif.Turn.ID
|
|
s.pendingMsgs = s.pendingMsgs[:0]
|
|
s.stateMu.Unlock()
|
|
s.storeContextUsage(nil)
|
|
}
|
|
|
|
case "item/started":
|
|
var notif itemNotification
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil {
|
|
s.handleItemStarted(notif.Item)
|
|
}
|
|
|
|
case "item/completed":
|
|
var notif itemNotification
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil {
|
|
s.handleItemCompleted(notif.Item)
|
|
}
|
|
|
|
case "turn/completed":
|
|
var notif turnNotification
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil {
|
|
s.completeTurn()
|
|
}
|
|
|
|
case "thread/status/changed":
|
|
var notif struct {
|
|
ThreadID string `json:"threadId"`
|
|
Status struct {
|
|
Type string `json:"type"`
|
|
} `json:"status"`
|
|
}
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil && notif.Status.Type == "idle" {
|
|
// In codex 0.125+, thread going idle signals turn completion.
|
|
s.completeTurn()
|
|
}
|
|
|
|
case "account/rateLimits/updated":
|
|
var notif appServerRateLimitsResponse
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil {
|
|
s.storeUsage(mapAppServerRateLimits(notif))
|
|
}
|
|
|
|
case "thread/tokenUsage/updated":
|
|
var notif appServerThreadTokenUsageNotification
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil {
|
|
s.storeContextUsage(mapAppServerTokenUsage(notif))
|
|
}
|
|
|
|
case "error":
|
|
var notif errorNotification
|
|
if err := json.Unmarshal(paramsRaw, ¬if); err == nil && strings.TrimSpace(notif.Message) != "" {
|
|
s.emitError(fmt.Errorf("%s", notif.Message))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) handleItemStarted(item map[string]any) {
|
|
itemType, _ := item["type"].(string)
|
|
if itemType == "" {
|
|
return
|
|
}
|
|
|
|
switch itemType {
|
|
case "agentMessage", "reasoning", "userMessage", "plan", "hookPrompt", "contextCompaction":
|
|
return
|
|
}
|
|
|
|
s.flushPendingAsThinking()
|
|
|
|
switch itemType {
|
|
case "commandExecution":
|
|
command, _ := item["command"].(string)
|
|
s.emit(core.Event{Type: core.EventToolUse, ToolName: "Bash", ToolInput: command})
|
|
|
|
case "mcpToolCall":
|
|
server, _ := item["server"].(string)
|
|
tool, _ := item["tool"].(string)
|
|
name := strings.Trim(strings.Join([]string{server, tool}, ":"), ":")
|
|
s.emit(core.Event{Type: core.EventToolUse, ToolName: "MCP", ToolInput: name + "\n" + appServerJSON(item["arguments"])})
|
|
|
|
case "webSearch":
|
|
query, _ := item["query"].(string)
|
|
s.emit(core.Event{Type: core.EventToolUse, ToolName: "WebSearch", ToolInput: query})
|
|
|
|
case "dynamicToolCall":
|
|
tool, _ := item["tool"].(string)
|
|
s.emit(core.Event{Type: core.EventToolUse, ToolName: tool, ToolInput: appServerJSON(item["arguments"])})
|
|
|
|
case "fileChange":
|
|
s.emit(core.Event{Type: core.EventToolUse, ToolName: "Patch", ToolInput: appServerJSON(item["changes"])})
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) handleItemCompleted(item map[string]any) {
|
|
itemType, _ := item["type"].(string)
|
|
if itemType == "" {
|
|
return
|
|
}
|
|
|
|
switch itemType {
|
|
case "reasoning":
|
|
text := appServerReasoningText(item)
|
|
if text != "" {
|
|
s.emit(core.Event{Type: core.EventThinking, Content: text})
|
|
}
|
|
|
|
case "agentMessage":
|
|
text, _ := item["text"].(string)
|
|
if strings.TrimSpace(text) != "" {
|
|
s.stateMu.Lock()
|
|
s.pendingMsgs = append(s.pendingMsgs, text)
|
|
s.stateMu.Unlock()
|
|
}
|
|
|
|
case "commandExecution":
|
|
command, _ := item["command"].(string)
|
|
status, _ := item["status"].(string)
|
|
output, _ := item["aggregatedOutput"].(string)
|
|
exitCode, hasExitCode := toInt(item["exitCode"])
|
|
var exitCodePtr *int
|
|
if hasExitCode {
|
|
exitCodePtr = &exitCode
|
|
}
|
|
success := appServerToolSuccess(status, exitCodePtr)
|
|
s.emit(core.Event{
|
|
Type: core.EventToolResult,
|
|
ToolName: "Bash",
|
|
ToolInput: command,
|
|
ToolResult: truncate(strings.TrimSpace(output), 500),
|
|
ToolStatus: strings.TrimSpace(status),
|
|
ToolExitCode: exitCodePtr,
|
|
ToolSuccess: &success,
|
|
})
|
|
|
|
case "mcpToolCall":
|
|
tool, _ := item["tool"].(string)
|
|
status, _ := item["status"].(string)
|
|
result := appServerJSON(item["result"])
|
|
if errText := appServerJSON(item["error"]); strings.TrimSpace(errText) != "" && result == "" {
|
|
result = errText
|
|
}
|
|
success := appServerToolSuccess(status, nil)
|
|
s.emit(core.Event{
|
|
Type: core.EventToolResult,
|
|
ToolName: tool,
|
|
ToolResult: truncate(strings.TrimSpace(result), 500),
|
|
ToolStatus: strings.TrimSpace(status),
|
|
ToolSuccess: &success,
|
|
})
|
|
|
|
case "webSearch":
|
|
query, _ := item["query"].(string)
|
|
s.emit(core.Event{
|
|
Type: core.EventToolResult,
|
|
ToolName: "WebSearch",
|
|
ToolResult: truncate(strings.TrimSpace(query), 500),
|
|
})
|
|
|
|
case "dynamicToolCall":
|
|
tool, _ := item["tool"].(string)
|
|
status, _ := item["status"].(string)
|
|
result := appServerDynamicToolText(item["contentItems"])
|
|
success := appServerToolSuccess(status, nil)
|
|
s.emit(core.Event{
|
|
Type: core.EventToolResult,
|
|
ToolName: tool,
|
|
ToolResult: truncate(strings.TrimSpace(result), 500),
|
|
ToolStatus: strings.TrimSpace(status),
|
|
ToolSuccess: &success,
|
|
})
|
|
}
|
|
}
|
|
|
|
func appServerReasoningText(item map[string]any) string {
|
|
var parts []string
|
|
if summary, ok := item["summary"].([]any); ok {
|
|
for _, entry := range summary {
|
|
if text, ok := entry.(string); ok && strings.TrimSpace(text) != "" {
|
|
parts = append(parts, text)
|
|
}
|
|
}
|
|
}
|
|
if len(parts) == 0 {
|
|
if content, ok := item["content"].([]any); ok {
|
|
for _, entry := range content {
|
|
if text, ok := entry.(string); ok && strings.TrimSpace(text) != "" {
|
|
parts = append(parts, text)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return strings.Join(parts, "\n")
|
|
}
|
|
|
|
func appServerDynamicToolText(raw any) string {
|
|
items, ok := raw.([]any)
|
|
if !ok {
|
|
return appServerJSON(raw)
|
|
}
|
|
var parts []string
|
|
for _, entry := range items {
|
|
m, ok := entry.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if text, _ := m["text"].(string); strings.TrimSpace(text) != "" {
|
|
parts = append(parts, text)
|
|
}
|
|
}
|
|
if len(parts) == 0 {
|
|
return appServerJSON(raw)
|
|
}
|
|
return strings.Join(parts, "\n")
|
|
}
|
|
|
|
func appServerToolSuccess(status string, exitCode *int) bool {
|
|
s := strings.ToLower(strings.TrimSpace(status))
|
|
if exitCode != nil {
|
|
return *exitCode == 0
|
|
}
|
|
return s == "completed" || s == "success" || s == "succeeded" || s == "ok"
|
|
}
|
|
|
|
func mapAppServerRateLimits(payload appServerRateLimitsResponse) *core.UsageReport {
|
|
report := &core.UsageReport{Provider: "codex"}
|
|
|
|
var snapshots []appServerRateLimitSnapshot
|
|
if len(payload.RateLimitsByLimitID) > 0 {
|
|
keys := make([]string, 0, len(payload.RateLimitsByLimitID))
|
|
for key := range payload.RateLimitsByLimitID {
|
|
keys = append(keys, key)
|
|
}
|
|
sort.Strings(keys)
|
|
for _, key := range keys {
|
|
snapshots = append(snapshots, payload.RateLimitsByLimitID[key])
|
|
}
|
|
} else if payload.RateLimits.LimitID != "" || payload.RateLimits.Primary != nil || payload.RateLimits.Secondary != nil || payload.RateLimits.Credits != nil {
|
|
snapshots = append(snapshots, payload.RateLimits)
|
|
}
|
|
|
|
for _, snapshot := range snapshots {
|
|
if report.Plan == "" && strings.TrimSpace(snapshot.PlanType) != "" {
|
|
report.Plan = strings.TrimSpace(snapshot.PlanType)
|
|
}
|
|
if report.Credits == nil && snapshot.Credits != nil {
|
|
report.Credits = &core.UsageCredits{
|
|
HasCredits: snapshot.Credits.HasCredits,
|
|
Unlimited: snapshot.Credits.Unlimited,
|
|
}
|
|
if snapshot.Credits.Balance != nil {
|
|
report.Credits.Balance = strings.TrimSpace(*snapshot.Credits.Balance)
|
|
}
|
|
}
|
|
|
|
windows := appServerUsageWindows(snapshot)
|
|
if len(windows) == 0 {
|
|
continue
|
|
}
|
|
limitReached := false
|
|
for _, window := range windows {
|
|
if window.UsedPercent >= 100 {
|
|
limitReached = true
|
|
break
|
|
}
|
|
}
|
|
|
|
report.Buckets = append(report.Buckets, core.UsageBucket{
|
|
Name: appServerBucketName(snapshot),
|
|
Allowed: !limitReached,
|
|
LimitReached: limitReached,
|
|
Windows: windows,
|
|
})
|
|
}
|
|
|
|
return report
|
|
}
|
|
|
|
func appServerBucketName(snapshot appServerRateLimitSnapshot) string {
|
|
if name := strings.TrimSpace(snapshot.LimitName); name != "" {
|
|
return name
|
|
}
|
|
if id := strings.TrimSpace(snapshot.LimitID); id != "" {
|
|
return id
|
|
}
|
|
return "Rate limit"
|
|
}
|
|
|
|
func appServerUsageWindows(snapshot appServerRateLimitSnapshot) []core.UsageWindow {
|
|
var windows []core.UsageWindow
|
|
if snapshot.Primary != nil {
|
|
windows = append(windows, appServerUsageWindow("Primary", snapshot.Primary))
|
|
}
|
|
if snapshot.Secondary != nil {
|
|
windows = append(windows, appServerUsageWindow("Secondary", snapshot.Secondary))
|
|
}
|
|
return windows
|
|
}
|
|
|
|
func appServerUsageWindow(name string, window *appServerRateLimitWindow) core.UsageWindow {
|
|
resetAfter := 0
|
|
if window != nil && window.ResetsAt > 0 {
|
|
resetAfter = int(time.Until(time.Unix(window.ResetsAt, 0)).Seconds())
|
|
if resetAfter < 0 {
|
|
resetAfter = 0
|
|
}
|
|
}
|
|
return core.UsageWindow{
|
|
Name: name,
|
|
UsedPercent: window.UsedPercent,
|
|
WindowSeconds: window.WindowDurationMins * 60,
|
|
ResetAfterSeconds: resetAfter,
|
|
ResetAtUnix: window.ResetsAt,
|
|
}
|
|
}
|
|
|
|
func cloneUsageReport(report *core.UsageReport) *core.UsageReport {
|
|
if report == nil {
|
|
return nil
|
|
}
|
|
cloned := *report
|
|
if len(report.Buckets) > 0 {
|
|
cloned.Buckets = make([]core.UsageBucket, len(report.Buckets))
|
|
for i, bucket := range report.Buckets {
|
|
cloned.Buckets[i] = bucket
|
|
if len(bucket.Windows) > 0 {
|
|
cloned.Buckets[i].Windows = append([]core.UsageWindow(nil), bucket.Windows...)
|
|
}
|
|
}
|
|
}
|
|
if report.Credits != nil {
|
|
credits := *report.Credits
|
|
cloned.Credits = &credits
|
|
}
|
|
return &cloned
|
|
}
|
|
|
|
func normalizeRuntimeReasoningEffort(raw string) string {
|
|
switch strings.ToLower(strings.TrimSpace(raw)) {
|
|
case "":
|
|
return ""
|
|
case "med":
|
|
return "medium"
|
|
case "x-high", "very-high":
|
|
return "xhigh"
|
|
default:
|
|
return strings.ToLower(strings.TrimSpace(raw))
|
|
}
|
|
}
|
|
|
|
func stringValue(v *string) string {
|
|
if v == nil {
|
|
return ""
|
|
}
|
|
return strings.TrimSpace(*v)
|
|
}
|
|
|
|
func appServerJSON(v any) string {
|
|
if v == nil {
|
|
return ""
|
|
}
|
|
b, err := json.Marshal(v)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
s := strings.TrimSpace(string(b))
|
|
if s == "null" || s == "{}" || s == "[]" || s == `""` {
|
|
return ""
|
|
}
|
|
return s
|
|
}
|
|
|
|
func toInt(v any) (int, bool) {
|
|
switch n := v.(type) {
|
|
case float64:
|
|
return int(n), true
|
|
case int:
|
|
return n, true
|
|
case int64:
|
|
return int(n), true
|
|
case json.Number:
|
|
i, err := n.Int64()
|
|
if err == nil {
|
|
return int(i), true
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
func rpcIDToInt64(v any) (int64, bool) {
|
|
switch id := v.(type) {
|
|
case float64:
|
|
return int64(id), true
|
|
case int64:
|
|
return id, true
|
|
case int:
|
|
return int64(id), true
|
|
case json.Number:
|
|
i, err := id.Int64()
|
|
return i, err == nil
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
func (s *appServerSession) completeTurn() {
|
|
s.stateMu.Lock()
|
|
if s.currentTurn == "" {
|
|
s.stateMu.Unlock()
|
|
return
|
|
}
|
|
s.currentTurn = ""
|
|
s.stateMu.Unlock()
|
|
s.flushPendingAsText()
|
|
s.emit(core.Event{Type: core.EventResult, SessionID: s.CurrentSessionID(), Done: true})
|
|
}
|
|
|
|
func (s *appServerSession) flushPendingAsThinking() {
|
|
s.stateMu.Lock()
|
|
msgs := append([]string(nil), s.pendingMsgs...)
|
|
s.pendingMsgs = s.pendingMsgs[:0]
|
|
s.stateMu.Unlock()
|
|
|
|
for _, text := range msgs {
|
|
if strings.TrimSpace(text) != "" {
|
|
s.emit(core.Event{Type: core.EventThinking, Content: text})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) flushPendingAsText() {
|
|
s.stateMu.Lock()
|
|
msgs := append([]string(nil), s.pendingMsgs...)
|
|
s.pendingMsgs = s.pendingMsgs[:0]
|
|
s.stateMu.Unlock()
|
|
|
|
for _, text := range msgs {
|
|
if strings.TrimSpace(text) != "" {
|
|
s.emit(core.Event{Type: core.EventText, Content: text})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) emit(event core.Event) {
|
|
select {
|
|
case s.events <- event:
|
|
default:
|
|
slog.Warn("codex appserver: event channel full, dropping event", "type", event.Type)
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) emitError(err error) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
s.emit(core.Event{Type: core.EventError, Error: err})
|
|
}
|
|
|
|
func (s *appServerSession) rejectPending(err error) {
|
|
s.pendingMu.Lock()
|
|
defer s.pendingMu.Unlock()
|
|
for id, ch := range s.pending {
|
|
delete(s.pending, id)
|
|
select {
|
|
case ch <- rpcResponseEnvelope{ID: id, Error: &rpcError{Message: err.Error()}}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) request(method string, params any, out any) error {
|
|
return s.requestWithTimeout(method, params, out, appServerRequestTimeout)
|
|
}
|
|
|
|
func (s *appServerSession) requestWithTimeout(method string, params any, out any, timeout time.Duration) error {
|
|
id := s.nextID.Add(1)
|
|
ch := make(chan rpcResponseEnvelope, 1)
|
|
|
|
s.pendingMu.Lock()
|
|
if s.pending == nil {
|
|
s.pending = make(map[int64]chan rpcResponseEnvelope)
|
|
}
|
|
s.pending[id] = ch
|
|
s.pendingMu.Unlock()
|
|
|
|
payload := map[string]any{
|
|
"jsonrpc": "2.0",
|
|
"id": id,
|
|
"method": method,
|
|
"params": params,
|
|
}
|
|
|
|
if err := s.writeJSON(payload); err != nil {
|
|
s.pendingMu.Lock()
|
|
delete(s.pending, id)
|
|
s.pendingMu.Unlock()
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case resp := <-ch:
|
|
if resp.Error != nil {
|
|
return fmt.Errorf("%s", strings.TrimSpace(resp.Error.Message))
|
|
}
|
|
if out != nil {
|
|
if err := json.Unmarshal(resp.Result, out); err != nil {
|
|
return fmt.Errorf("decode %s response: %w", method, err)
|
|
}
|
|
}
|
|
return nil
|
|
case <-s.ctx.Done():
|
|
return s.ctx.Err()
|
|
case <-time.After(timeout):
|
|
s.pendingMu.Lock()
|
|
delete(s.pending, id)
|
|
s.pendingMu.Unlock()
|
|
return fmt.Errorf("%s timed out", method)
|
|
}
|
|
}
|
|
|
|
func (s *appServerSession) notify(method string, params any) error {
|
|
payload := map[string]any{
|
|
"jsonrpc": "2.0",
|
|
"method": method,
|
|
}
|
|
if params != nil {
|
|
payload["params"] = params
|
|
}
|
|
return s.writeJSON(payload)
|
|
}
|
|
|
|
func (s *appServerSession) writeJSON(v any) error {
|
|
b, err := json.Marshal(v)
|
|
if err != nil {
|
|
return fmt.Errorf("codex app-server encode: %w", err)
|
|
}
|
|
|
|
s.procMu.Lock()
|
|
stdin := s.stdin
|
|
s.procMu.Unlock()
|
|
if stdin == nil {
|
|
return fmt.Errorf("codex app-server connection is closed")
|
|
}
|
|
|
|
s.writeMu.Lock()
|
|
defer s.writeMu.Unlock()
|
|
if _, err := stdin.Write(append(b, '\n')); err != nil {
|
|
return fmt.Errorf("codex app-server write: %w", err)
|
|
}
|
|
return nil
|
|
}
|