Files
microsoft-SkillOpt/skillopt/model/codex_backend.py
Yifan Yang 14c045f04f Windows robustness for claude/codex backends (+ hardened JSON fallback) (#79)
* Robustness for the claude/codex backends on Windows: argv overflow, subprocess encoding, tolerant JSON, test-eval dirs

Fixes surfaced running SkillOpt end-to-end on the bundled `claude` backend
(local Claude CLI) on Windows. None changes the OpenAI/GPT happy path.

1. skillopt/engine/trainer.py — the final test-eval directory
   (test_eval_final/) is written to before being created; add
   os.makedirs(..., exist_ok=True), matching the two sibling test-eval dirs.
   Without it, summary.json raises FileNotFoundError when a rollout yields
   zero predictions.

2. skillopt/model/claude_backend.py
   a. Pass the prompt via stdin (not argv): on Windows the whole command line
      is capped at ~32 KB and a large optimizer prompt (the success-analyst
      minibatch carrying several report trajectories) overflows it with
      [WinError 206], killing the run after retries.
   b. Pass the system prompt via --append-system-prompt-file (a temp file),
      not argv. The system prompt here is the skill being optimized, which
      SkillOpt grows over training; since the ~32 KB cap applies to the SUM of
      all argv, a grown skill would re-hit [WinError 206] even with the prompt
      on stdin.
   c. Pin the subprocess encoding to utf-8 (errors="replace"). With text=True
      and no encoding=, stdin is encoded with the system codepage; on a zh-CN
      box (cp936/GBK) a prompt containing an emoji or some Latin-1 characters
      raises UnicodeEncodeError before the CLI even starts, failing every retry.

3. skillopt/model/codex_backend.py — the same utf-8 encoding pin on its
   subprocess.run(input=...) call (identical unpinned-encoding pattern).

4. skillopt/utils/json_utils.py — extract_json() returned None for valid-
   looking JSON that strict json.loads rejects (unescaped ASCII quotes inside
   CJK string values, trailing commas), silently dropping the analyst's edits
   on non-schema backends (Claude/Qwen): reflect produces N edits, 0 applied.
   Add a json_repair fallback, but only on a single unambiguous object — a
   balanced-brace extractor plus a refuse-on-multiple-objects guard — so a
   chain-of-thought "scratch + final" response can't make repair silently
   return the wrong (discarded) object, which would be worse than None (None is
   detectable and retryable; a wrong-but-valid edit is applied blind). Declare
   json_repair in requirements.txt and the claude/qwen optional extras so the
   fallback is actually present (it otherwise no-ops, dropping edits silently).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
(cherry picked from commit dca74a683e)

* fix(json_utils): harden tolerant JSON fallback from PR #77

Follow-up fixes on top of the cherry-picked Windows-robustness change:

1. Make _top_level_brace_objects() fully string-aware in its OUTER scan, not
   just inside an object. A '{' inside quoted prose (e.g. '"set it to {x}"')
   no longer starts a candidate object, so extract_json() returns None for
   prose pseudo-JSON instead of repairing it into a bogus dict — which would
   be strictly worse than dropping the edit, since extract_json feeds the
   optimizer's skill edits.

2. Pick the repair candidate BEFORE importing json_repair, so the missing-
   dependency RuntimeWarning only fires when there is genuinely a single
   malformed object that could have been repaired. Ordinary no-JSON / prose
   replies (the common case) now return None silently instead of warning on
   every call.

3. Resolve dependency-metadata inconsistency: json_repair is optional, so add
   it to the `all` extra (it was already in `claude`/`qwen`) and demote it
   from a hard requirement to an optional/commented entry in requirements.txt,
   matching the project's convention for backend-specific deps.

Adds regression tests for prose-with-braces (-> None), no-warning-on-plain-
text, single-object repair, and multi-object ambiguity. Existing 22 json
tests still pass with and without json_repair installed.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: samuelgoofus-boop <260247789+samuelgoofus-boop@users.noreply.github.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-23 19:00:23 +08:00

667 lines
20 KiB
Python

"""Codex CLI backend for ReflACT."""
from __future__ import annotations
import base64
import json
import mimetypes
import os
import subprocess
import tempfile
import time
import uuid
from typing import Any
from urllib.parse import unquote, urlparse
from skillopt.model.common import (
CompatAssistantMessage,
CompatToolCall,
CompatToolFunction,
tracker,
)
CODEX_BIN = os.environ.get("CODEX_CLI_BIN", "codex")
CODEX_PROFILE = os.environ.get("CODEX_PROFILE", "review")
CODEX_SANDBOX_MODE = os.environ.get("CODEX_SANDBOX_MODE", "read-only")
OPTIMIZER_DEPLOYMENT = os.environ.get("OPTIMIZER_DEPLOYMENT", "gpt-4o")
TARGET_DEPLOYMENT = os.environ.get("TARGET_DEPLOYMENT", "gpt-4o")
REASONING_EFFORT: str | None = None
def _default_working_directory() -> str:
return os.environ.get("CODEX_WORKING_DIRECTORY", os.getcwd())
def _parse_data_uri(url: str) -> tuple[bytes, str]:
header, data = url.split(",", 1)
mime = header[5:].split(";", 1)[0] or "image/png"
return base64.b64decode(data), mime
def _content_to_text(
content: Any,
attachments: list[dict[str, Any]],
*,
image_counter: int,
) -> tuple[str, int]:
if isinstance(content, str):
return content, image_counter
if not isinstance(content, list):
return str(content), image_counter
parts: list[str] = []
for item in content:
if not isinstance(item, dict):
continue
item_type = item.get("type")
if item_type == "text":
parts.append(str(item.get("text", "")))
continue
if item_type != "image_url":
continue
image_counter += 1
label = f"[Attached image {image_counter}]"
parts.append(label)
image_url = item.get("image_url", {}) or {}
url = str(image_url.get("url", "") or "")
if not url:
continue
if url.startswith("data:") and ";base64," in url:
data, mime = _parse_data_uri(url)
attachments.append({"bytes": data, "mime": mime})
continue
if url.startswith("file://"):
parsed = urlparse(url)
path = unquote(parsed.path)
if path:
attachments.append({"path": path})
continue
if os.path.exists(url):
attachments.append({"path": url})
return "".join(parts), image_counter
def _simplify_tool_schemas(tools: list[dict[str, Any]] | None) -> list[dict[str, Any]]:
simplified: list[dict[str, Any]] = []
for tool in tools or []:
function = tool.get("function", tool)
simplified.append(
{
"name": function.get("name", ""),
"description": function.get("description", ""),
"parameters": function.get("parameters", {}),
}
)
return simplified
def _build_prompt_from_messages(
messages: list[dict[str, Any]],
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
structured_output: bool = False,
) -> tuple[str, list[dict[str, Any]]]:
system_parts: list[str] = []
history_parts: list[str] = []
attachments: list[dict[str, Any]] = []
image_counter = 0
def _history_line(label: str, body: str) -> str:
stripped = body.strip()
if not stripped:
return f"- {label}:"
indented = stripped.replace("\n", "\n ")
return f"- {label}: {indented}"
for message in messages:
role = str(message.get("role", "user"))
text, image_counter = _content_to_text(
message.get("content", ""),
attachments,
image_counter=image_counter,
)
if role == "system":
if text.strip():
system_parts.append(text.strip())
continue
if role == "assistant":
block = _history_line("Assistant", text)
tool_calls = message.get("tool_calls") or []
if tool_calls:
simplified_calls = []
for tool_call in tool_calls:
function = tool_call.get("function", {}) or {}
simplified_calls.append(
{
"name": function.get("name", ""),
"arguments": function.get("arguments", "{}"),
}
)
block += (
"\n Compatibility tool requests:\n"
+ json.dumps(simplified_calls, ensure_ascii=False, indent=2)
)
history_parts.append(block)
continue
if role == "tool":
tool_call_id = str(message.get("tool_call_id", "") or "")
label = f"Tool result (tool_call_id={tool_call_id})"
history_parts.append(_history_line(label, text))
continue
history_parts.append(_history_line(role.capitalize(), text))
prompt_parts: list[str] = []
system_text = "\n\n".join(part for part in system_parts if part).strip()
if system_text:
prompt_parts.append(system_text)
if tools:
simplified_tools = _simplify_tool_schemas(tools)
prompt_parts.append(
"Available compatibility tools:\n"
+ json.dumps(simplified_tools, ensure_ascii=False, indent=2)
)
prompt_parts.append(
"Do not execute these tools yourself. If you need one, request it in "
"`tool_calls`. Each `arguments` field must be a JSON string."
)
if tool_choice == "required":
prompt_parts.append(
"Tool choice policy: you must request at least one compatibility tool."
)
elif isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
function = tool_choice.get("function", {}) or {}
prompt_parts.append(
"Tool choice policy: you must request the compatibility tool "
f"`{function.get('name', '')}`."
)
history_text = "\n".join(part for part in history_parts if part).strip()
if history_text:
prompt_parts.append("History:\n" + history_text)
if structured_output:
prompt_parts.append("Return only JSON matching the provided schema.")
if tools:
prompt_parts.append(
"Set `content` to the assistant-visible reply. Set `tool_calls` to "
"an empty array when no tool is needed."
)
else:
prompt_parts.append("Answer the latest user request.")
return "\n\n".join(prompt_parts), attachments
def _assistant_message_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {
"content": {"type": "string"},
"tool_calls": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"arguments": {"type": "string"},
},
"required": ["name", "arguments"],
"additionalProperties": False,
},
},
},
"required": ["content", "tool_calls"],
"additionalProperties": False,
}
def _materialize_attachments(
attachments: list[dict[str, Any]],
temp_dir: str,
) -> list[str]:
image_paths: list[str] = []
for index, attachment in enumerate(attachments, 1):
path = attachment.get("path")
if path:
image_paths.append(str(path))
continue
mime = str(attachment.get("mime", "image/png"))
suffix = mimetypes.guess_extension(mime) or ".png"
image_path = os.path.join(temp_dir, f"image_{index}{suffix}")
with open(image_path, "wb") as f:
f.write(attachment.get("bytes", b""))
image_paths.append(image_path)
return image_paths
def _usage_from_event(usage: dict[str, Any] | None) -> dict[str, int]:
usage = usage or {}
prompt_tokens = int(usage.get("input_tokens", 0) or 0)
completion_tokens = int(usage.get("output_tokens", 0) or 0)
return {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
}
def _extract_error(stdout: str, stderr: str) -> str:
for raw_line in reversed(stdout.splitlines()):
line = raw_line.strip()
if not line:
continue
try:
payload = json.loads(line)
except json.JSONDecodeError:
continue
if payload.get("type") == "turn.failed":
error = payload.get("error", {}) or {}
return str(error.get("message", "") or "Codex turn failed")
if payload.get("type") == "error":
return str(payload.get("message", "") or "Codex execution failed")
return stderr.strip() or stdout.strip() or "Codex execution failed"
def _run_codex_exec(
*,
model: str,
prompt: str,
attachments: list[dict[str, Any]],
output_schema: dict[str, Any] | None,
timeout: int | None,
) -> tuple[str, dict[str, int]]:
with tempfile.TemporaryDirectory(prefix="skillopt_codex_") as temp_dir:
output_path = os.path.join(temp_dir, "last_message.txt")
image_paths = _materialize_attachments(attachments, temp_dir)
command = [
CODEX_BIN,
"exec",
"--json",
"--ephemeral",
"--profile",
CODEX_PROFILE,
"-c",
"approval_policy=\"never\"",
"--sandbox",
CODEX_SANDBOX_MODE,
"--skip-git-repo-check",
"--cd",
_default_working_directory(),
"--model",
model,
"--output-last-message",
output_path,
]
if REASONING_EFFORT:
command.extend(["-c", f"model_reasoning_effort={json.dumps(REASONING_EFFORT)}"])
schema_path = None
if output_schema is not None:
schema_path = os.path.join(temp_dir, "schema.json")
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(output_schema, f, ensure_ascii=False)
command.extend(["--output-schema", schema_path])
for image_path in image_paths:
command.extend(["--image", image_path])
command.append("-")
proc = subprocess.run(
command,
input=prompt,
text=True,
encoding="utf-8",
errors="replace",
capture_output=True,
timeout=timeout,
check=False,
)
usage_info = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
fallback_text = ""
for raw_line in proc.stdout.splitlines():
line = raw_line.strip()
if not line:
continue
try:
payload = json.loads(line)
except json.JSONDecodeError:
continue
if payload.get("type") == "item.completed":
item = payload.get("item", {}) or {}
if item.get("type") == "agent_message":
fallback_text = str(item.get("text", "") or fallback_text)
if payload.get("type") == "turn.completed":
usage_info = _usage_from_event(payload.get("usage"))
last_message = ""
if os.path.exists(output_path):
with open(output_path, encoding="utf-8") as f:
last_message = f.read().strip()
if not last_message:
last_message = fallback_text.strip()
if proc.returncode != 0:
raise RuntimeError(_extract_error(proc.stdout, proc.stderr))
if not last_message:
raise RuntimeError("Codex returned an empty final message")
return last_message, usage_info
def _tool_name_from_choice(tool_choice: str | dict[str, Any] | None) -> str | None:
if not isinstance(tool_choice, dict):
return None
if tool_choice.get("type") != "function":
return None
function = tool_choice.get("function", {}) or {}
return str(function.get("name", "") or "") or None
def _compat_message_from_payload(
payload: dict[str, Any],
*,
tool_choice: str | dict[str, Any] | None = None,
) -> CompatAssistantMessage:
content = str(payload.get("content", "") or "")
tool_calls: list[CompatToolCall] = []
for index, raw_tool_call in enumerate(payload.get("tool_calls", []) or [], 1):
if not isinstance(raw_tool_call, dict):
continue
name = str(raw_tool_call.get("name", "") or "")
arguments = raw_tool_call.get("arguments", "{}")
if not isinstance(arguments, str):
arguments = json.dumps(arguments, ensure_ascii=False)
tool_calls.append(
CompatToolCall(
id=f"tool_{index}_{uuid.uuid4().hex[:12]}",
function=CompatToolFunction(name=name, arguments=arguments),
)
)
if tool_choice == "required" and not tool_calls:
raise RuntimeError("Codex response did not request a tool under tool_choice='required'")
required_name = _tool_name_from_choice(tool_choice)
if required_name and all(
tool_call.function.name != required_name for tool_call in tool_calls
):
raise RuntimeError(
f"Codex response did not request the required tool {required_name!r}"
)
return CompatAssistantMessage(content=content, tool_calls=tool_calls)
def _chat_messages_impl(
model: str,
messages: list[dict[str, Any]],
max_completion_tokens: int,
retries: int,
stage: str,
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
timeout: int | None = None,
) -> tuple[Any, dict[str, int]]:
del max_completion_tokens
last_err = None
structured_output = bool(tools) or return_message
for attempt in range(retries):
try:
prompt, attachments = _build_prompt_from_messages(
messages,
tools=tools,
tool_choice=tool_choice,
structured_output=structured_output,
)
raw_text, usage_info = _run_codex_exec(
model=model,
prompt=prompt,
attachments=attachments,
output_schema=_assistant_message_schema() if structured_output else None,
timeout=timeout,
)
tracker.record(
stage,
usage_info["prompt_tokens"],
usage_info["completion_tokens"],
)
if not structured_output:
return raw_text, usage_info
payload = json.loads(raw_text)
compat = _compat_message_from_payload(payload, tool_choice=tool_choice)
return (compat if return_message else compat.content), usage_info
except subprocess.TimeoutExpired as exc:
last_err = RuntimeError(f"Codex CLI timed out after {timeout}s") if timeout else exc
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(min(2 ** attempt, 30))
raise RuntimeError(f"Codex call failed after {retries} retries: {last_err}")
def chat_with_model(
model: str,
system: str,
user: str,
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "custom",
timeout: int | None = None,
) -> tuple[str, dict[str, int]]:
messages = [
{"role": "system", "content": system},
{"role": "user", "content": user},
]
return _chat_messages_impl(
model,
messages,
max_completion_tokens,
retries,
stage,
timeout=timeout,
)
def chat_messages_with_model(
model: str,
messages: list[dict[str, Any]],
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "custom",
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
timeout: int | None = None,
) -> tuple[Any, dict[str, int]]:
return _chat_messages_impl(
model,
messages,
max_completion_tokens,
retries,
stage,
tools=tools,
tool_choice=tool_choice,
return_message=return_message,
timeout=timeout,
)
def chat_optimizer(
system: str,
user: str,
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "optimizer",
timeout: int | None = None,
) -> tuple[str, dict[str, int]]:
return chat_with_model(
model=OPTIMIZER_DEPLOYMENT,
system=system,
user=user,
max_completion_tokens=max_completion_tokens,
retries=retries,
stage=stage,
timeout=timeout,
)
def chat_with_deployment(
deployment: str,
system: str,
user: str,
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "custom",
timeout: int | None = None,
) -> tuple[str, dict[str, int]]:
return chat_with_model(
model=deployment,
system=system,
user=user,
max_completion_tokens=max_completion_tokens,
retries=retries,
stage=stage,
timeout=timeout,
)
def chat_target(
system: str,
user: str,
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "target",
timeout: int | None = None,
) -> tuple[str, dict[str, int]]:
return chat_with_model(
model=TARGET_DEPLOYMENT,
system=system,
user=user,
max_completion_tokens=max_completion_tokens,
retries=retries,
stage=stage,
timeout=timeout,
)
def chat_optimizer_messages(
messages: list[dict[str, Any]],
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "optimizer",
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
timeout: int | None = None,
) -> tuple[Any, dict[str, int]]:
return _chat_messages_impl(
OPTIMIZER_DEPLOYMENT,
messages,
max_completion_tokens,
retries,
stage,
tools=tools,
tool_choice=tool_choice,
return_message=return_message,
timeout=timeout,
)
def chat_messages_with_deployment(
deployment: str,
messages: list[dict[str, Any]],
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "custom",
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
timeout: int | None = None,
) -> tuple[Any, dict[str, int]]:
return _chat_messages_impl(
deployment,
messages,
max_completion_tokens,
retries,
stage,
tools=tools,
tool_choice=tool_choice,
return_message=return_message,
timeout=timeout,
)
def chat_target_messages(
messages: list[dict[str, Any]],
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "target",
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
timeout: int | None = None,
) -> tuple[Any, dict[str, int]]:
return _chat_messages_impl(
TARGET_DEPLOYMENT,
messages,
max_completion_tokens,
retries,
stage,
tools=tools,
tool_choice=tool_choice,
return_message=return_message,
timeout=timeout,
)
def get_token_summary() -> dict[str, dict[str, int]]:
return tracker.summary()
def reset_token_tracker() -> None:
tracker.reset()
def set_target_deployment(deployment: str) -> None:
global TARGET_DEPLOYMENT
TARGET_DEPLOYMENT = deployment
os.environ["TARGET_DEPLOYMENT"] = deployment
def set_reasoning_effort(effort: str | None) -> None:
global REASONING_EFFORT
REASONING_EFFORT = effort if effort else None
def set_optimizer_deployment(deployment: str) -> None:
global OPTIMIZER_DEPLOYMENT
OPTIMIZER_DEPLOYMENT = deployment
os.environ["OPTIMIZER_DEPLOYMENT"] = deployment