microsoft-SkillOpt/skillopt/model/claude_backend.py
Yifan Yang 14c045f04f Windows robustness for claude/codex backends (+ hardened JSON fallback) (#79)
* Robustness for the claude/codex backends on Windows: argv overflow, subprocess encoding, tolerant JSON, test-eval dirs

Fixes surfaced running SkillOpt end-to-end on the bundled `claude` backend
(local Claude CLI) on Windows. None changes the OpenAI/GPT happy path.

1. skillopt/engine/trainer.py — the final test-eval directory
   (test_eval_final/) is written to before being created; add
   os.makedirs(..., exist_ok=True), matching the two sibling test-eval dirs.
   Without it, summary.json raises FileNotFoundError when a rollout yields
   zero predictions.

2. skillopt/model/claude_backend.py
   a. Pass the prompt via stdin (not argv): on Windows the whole command line
      is capped at ~32 KB and a large optimizer prompt (the success-analyst
      minibatch carrying several report trajectories) overflows it with
      [WinError 206], killing the run after retries.
   b. Pass the system prompt via --append-system-prompt-file (a temp file),
      not argv. The system prompt here is the skill being optimized, which
      SkillOpt grows over training; since the ~32 KB cap applies to the SUM of
      all argv, a grown skill would re-hit [WinError 206] even with the prompt
      on stdin.
   c. Pin the subprocess encoding to utf-8 (errors="replace"). With text=True
      and no encoding=, stdin is encoded with the system codepage; on a zh-CN
      box (cp936/GBK) a prompt containing an emoji or some Latin-1 characters
      raises UnicodeEncodeError before the CLI even starts, failing every retry.

3. skillopt/model/codex_backend.py — the same utf-8 encoding pin on its
   subprocess.run(input=...) call (identical unpinned-encoding pattern).

4. skillopt/utils/json_utils.py — extract_json() returned None for valid-
   looking JSON that strict json.loads rejects (unescaped ASCII quotes inside
   CJK string values, trailing commas), silently dropping the analyst's edits
   on non-schema backends (Claude/Qwen): reflect produces N edits, 0 applied.
   Add a json_repair fallback, but only on a single unambiguous object — a
   balanced-brace extractor plus a refuse-on-multiple-objects guard — so a
   chain-of-thought "scratch + final" response can't make repair silently
   return the wrong (discarded) object, which would be worse than None (None is
   detectable and retryable; a wrong-but-valid edit is applied blind). Declare
   json_repair in requirements.txt and the claude/qwen optional extras so the
   fallback is actually present (it otherwise no-ops, dropping edits silently).
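
The argv-related changes in item 2 can be illustrated with a minimal sketch. It is not the backend's actual code: `run_cli_with_large_inputs` and `CHILD` are hypothetical names, and a tiny Python child process stands in for the `claude` CLI so the example runs without it. Under those assumptions it shows the three ideas together: the prompt travels over stdin, the system prompt travels in a temp file whose path is the only thing added to argv, and the pipe encoding is pinned to UTF-8.

```python
import os
import subprocess
import sys
import tempfile

# Stand-in for the real CLI (an assumption for this sketch): it reports how
# many bytes arrived on stdin and how many bytes are in the file named by argv[1].
CHILD = (
    "import sys; "
    "prompt_bytes = sys.stdin.buffer.read(); "
    "system_bytes = open(sys.argv[1], 'rb').read(); "
    "sys.stdout.write(f'{len(prompt_bytes)} {len(system_bytes)}')"
)


def run_cli_with_large_inputs(base_cmd, prompt, system="", timeout=300):
    """Run base_cmd with the prompt on stdin and the system prompt in a temp file."""
    with tempfile.TemporaryDirectory(prefix="sketch_") as temp_dir:
        cmd = list(base_cmd)
        if system:
            # Only the short file path reaches argv, however large the text grows.
            system_path = os.path.join(temp_dir, "system_prompt.txt")
            with open(system_path, "w", encoding="utf-8") as fh:
                fh.write(system)
            cmd.append(system_path)
        # encoding="utf-8" overrides the locale codepage that text=True would
        # otherwise use when encoding the stdin payload.
        proc = subprocess.run(
            cmd,
            input=prompt,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
        return proc.returncode, proc.stdout, proc.stderr


returncode, out, err = run_cli_with_large_inputs(
    [sys.executable, "-c", CHILD],
    "h\u00e9llo \U0001F642",   # non-ASCII prompt: 11 bytes once encoded as UTF-8
    system="x" * 40000,         # larger than the ~32 KB command-line cap
)
print(out)  # 11 40000
```

In the backend itself the file path follows `--append-system-prompt-file`; here it is a bare positional argument only because the stand-in child reads `sys.argv[1]`.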

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
(cherry picked from commit dca74a683e)

* fix(json_utils): harden tolerant JSON fallback from PR #77

Follow-up fixes on top of the cherry-picked Windows-robustness change:

1. Make _top_level_brace_objects() fully string-aware in its OUTER scan, not
   just inside an object. A '{' inside quoted prose (e.g. '"set it to {x}"')
   no longer starts a candidate object, so extract_json() returns None for
   prose pseudo-JSON instead of repairing it into a bogus dict — which would
   be strictly worse than dropping the edit, since extract_json feeds the
   optimizer's skill edits.

2. Pick the repair candidate BEFORE importing json_repair, so the missing-
   dependency RuntimeWarning only fires when there is genuinely a single
   malformed object that could have been repaired. Ordinary no-JSON / prose
   replies (the common case) now return None silently instead of warning on
   every call.

3. Resolve dependency-metadata inconsistency: json_repair is optional, so add
   it to the `all` extra (it was already in `claude`/`qwen`) and demote it
   from a hard requirement to an optional/commented entry in requirements.txt,
   matching the project's convention for backend-specific deps.
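
Follow-up items 1 and 2 can be sketched roughly as below. This is an illustrative reimplementation, not the code in skillopt/utils/json_utils.py: all three function names are hypothetical, and the scanner only tracks well-formed double-quoted strings, so it makes no attempt to cope with stray unescaped quotes inside string values. What it does show is the ordering: braces inside quoted prose never start a candidate, the single-candidate check runs first, and the optional `json_repair` import (and its warning) is reached only when one malformed object is actually waiting to be repaired.

```python
from __future__ import annotations

import json
import warnings
from typing import Any


def top_level_brace_objects(text: str) -> list[str]:
    """Return every balanced top-level {...} span, ignoring braces inside strings."""
    objects: list[str] = []
    depth = 0
    start = -1
    in_string = False
    escaped = False
    for i, ch in enumerate(text):
        if in_string:
            # Inside a quoted string (at any depth, including the outer scan):
            # braces are plain text, and a backslash escapes the next character.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0:
                objects.append(text[start:i + 1])
    return objects


def pick_repair_candidate(text: str) -> str | None:
    """Return the object only when exactly one exists; refuse on zero or several."""
    objects = top_level_brace_objects(text)
    return objects[0] if len(objects) == 1 else None


def extract_json_sketch(text: str) -> Any:
    candidate = pick_repair_candidate(text)
    if candidate is None:
        return None  # plain prose or ambiguous output: stay silent
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass
    try:
        import json_repair  # optional dependency, imported only when needed
    except ImportError:
        warnings.warn("json_repair is not installed; dropping malformed JSON", RuntimeWarning)
        return None
    return json_repair.loads(candidate)


print(top_level_brace_objects('He said "set it to {x}" and left.'))  # []
print(pick_repair_candidate('{"scratch": 1} then {"final": 2}'))     # None
print(extract_json_sketch('Result: {"a": "b}", "c": 1}'))            # {'a': 'b}', 'c': 1}
```

Returning None for the two-object case keeps the failure detectable and retryable, which is the trade-off the commit message argues for.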

Adds regression tests for prose-with-braces (-> None), no-warning-on-plain-
text, single-object repair, and multi-object ambiguity. Existing 22 json
tests still pass with and without json_repair installed.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: samuelgoofus-boop <260247789+samuelgoofus-boop@users.noreply.github.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-23 19:00:23 +08:00

372 lines
18 KiB
Python

"""Claude CLI chat backend for ReflACT."""
from __future__ import annotations
import base64
import json
import mimetypes
import os
import shutil
import subprocess
import tempfile
import time
from typing import Any
from urllib.parse import unquote, urlparse
from skillopt.model.common import CompatAssistantMessage, CompatToolCall, CompatToolFunction, default_model_for_backend, tracker
CLAUDE_BIN = os.environ.get("CLAUDE_CLI_BIN", "claude")
CLAUDE_PERMISSION_MODE = os.environ.get("CLAUDE_PERMISSION_MODE", "dontAsk")
CLAUDE_SETTING_SOURCES = os.environ.get("CLAUDE_SETTING_SOURCES", "user,project")
CLAUDE_ALLOW_ATTACHMENT_READ = os.environ.get("CLAUDE_ALLOW_ATTACHMENT_READ", "1").strip().lower() not in {"0", "false", "no"}
OPTIMIZER_DEPLOYMENT = os.environ.get("OPTIMIZER_DEPLOYMENT", "claude-sonnet-4-6")
TARGET_DEPLOYMENT = os.environ.get("TARGET_DEPLOYMENT", "claude-sonnet-4-6")
REASONING_EFFORT: str | None = None
_VALID_EFFORTS = {"low", "medium", "high", "xhigh", "max"}
def _parse_data_uri(url: str) -> tuple[bytes, str]:
    header, data = url.split(",", 1)
    mime = header[5:].split(";", 1)[0] or "image/png"
    return base64.b64decode(data), mime


def _content_to_text(content: Any, attachments: list[dict[str, Any]], *, image_counter: int) -> tuple[str, int]:
    if isinstance(content, str):
        return content, image_counter
    if not isinstance(content, list):
        return str(content), image_counter
    parts: list[str] = []
    for item in content:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type")
        if item_type == "text":
            parts.append(str(item.get("text", "")))
            continue
        if item_type != "image_url":
            continue
        image_counter += 1
        label = f"[Attached image {image_counter}]"
        parts.append(label)
        image_url = item.get("image_url", {}) or {}
        url = str(image_url.get("url", "") or "")
        if not url:
            continue
        if url.startswith("data:") and ";base64," in url:
            data, mime = _parse_data_uri(url)
            attachments.append({"bytes": data, "mime": mime, "label": label})
            continue
        if url.startswith("file://"):
            parsed = urlparse(url)
            path = unquote(parsed.path)
            if path:
                attachments.append({"path": path, "label": label})
            continue
        if os.path.exists(url):
            attachments.append({"path": url, "label": label})
    return "".join(parts), image_counter


def _simplify_tool_schemas(tools: list[dict[str, Any]] | None) -> list[dict[str, Any]]:
    simplified: list[dict[str, Any]] = []
    for tool in tools or []:
        function = tool.get("function", tool)
        simplified.append({
            "name": function.get("name", ""),
            "description": function.get("description", ""),
            "parameters": function.get("parameters", {}),
        })
    return simplified
def _build_prompt_from_messages(messages: list[dict[str, Any]], *, tools: list[dict[str, Any]] | None = None, tool_choice: str | dict[str, Any] | None = None, structured_output: bool = False) -> tuple[str, str, list[dict[str, Any]]]:
    system_parts: list[str] = []
    history_parts: list[str] = []
    attachments: list[dict[str, Any]] = []
    image_counter = 0

    def _history_line(label: str, body: str) -> str:
        stripped = body.strip()
        if not stripped:
            return f"- {label}:"
        indented = stripped.replace("\n", "\n ")
        return f"- {label}: {indented}"

    for message in messages:
        role = str(message.get("role", "user"))
        text, image_counter = _content_to_text(message.get("content", ""), attachments, image_counter=image_counter)
        if role == "system":
            if text.strip():
                system_parts.append(text.strip())
            continue
        if role == "assistant":
            block = _history_line("Assistant", text)
            tool_calls = message.get("tool_calls") or []
            if tool_calls:
                simplified_calls = []
                for tool_call in tool_calls:
                    function = tool_call.get("function", {}) or {}
                    simplified_calls.append({
                        "name": function.get("name", ""),
                        "arguments": function.get("arguments", "{}"),
                    })
                block += "\n Compatibility tool requests:\n" + json.dumps(simplified_calls, ensure_ascii=False, indent=2)
            history_parts.append(block)
            continue
        if role == "tool":
            tool_call_id = str(message.get("tool_call_id", "") or "")
            history_parts.append(_history_line(f"Tool result (tool_call_id={tool_call_id})", text))
            continue
        history_parts.append(_history_line(role.capitalize(), text))
    prompt_parts: list[str] = []
    if tools:
        simplified_tools = _simplify_tool_schemas(tools)
        prompt_parts.append("Available compatibility tools:\n" + json.dumps(simplified_tools, ensure_ascii=False, indent=2))
        prompt_parts.append("Do not execute these compatibility tools yourself. If you need one, request it in `tool_calls`. Each `arguments` field must be a JSON string.")
        if tool_choice == "required":
            prompt_parts.append("Tool choice policy: you must request at least one compatibility tool.")
        elif isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
            function = tool_choice.get("function", {}) or {}
            prompt_parts.append(f"Tool choice policy: you must request the compatibility tool `{function.get('name', '')}`.")
    history_text = "\n".join(part for part in history_parts if part).strip()
    if history_text:
        prompt_parts.append("History:\n" + history_text)
    if structured_output:
        prompt_parts.append("Return only JSON matching the provided schema.")
        if tools:
            prompt_parts.append("Set `content` to the assistant-visible reply. Set `tool_calls` to an empty array when no compatibility tool is needed.")
    else:
        prompt_parts.append("Answer the latest user request.")
    return "\n\n".join(part for part in system_parts if part).strip(), "\n\n".join(prompt_parts), attachments
def _copy_attachments_to_temp(attachments: list[dict[str, Any]], temp_dir: str) -> list[dict[str, str]]:
    copied: list[dict[str, str]] = []
    for index, attachment in enumerate(attachments, 1):
        source_path = attachment.get("path")
        if source_path:
            source_path = str(source_path)
            source_suffix = os.path.splitext(source_path)[1]
            target_path = os.path.join(temp_dir, f"image_{index}{source_suffix or '.bin'}")
            shutil.copyfile(source_path, target_path)
            copied.append({"path": target_path, "label": str(attachment.get("label", ""))})
            continue
        mime = str(attachment.get("mime", "image/png"))
        suffix = mimetypes.guess_extension(mime) or ".png"
        target_path = os.path.join(temp_dir, f"image_{index}{suffix}")
        with open(target_path, "wb") as f:
            f.write(attachment.get("bytes", b"") or b"")
        copied.append({"path": target_path, "label": str(attachment.get("label", ""))})
    return copied


def _append_attachment_instructions(prompt: str, copied_attachments: list[dict[str, str]]) -> str:
    if not copied_attachments or not CLAUDE_ALLOW_ATTACHMENT_READ:
        return prompt
    lines = [
        "Attached image files:",
        *[f"- {item['label'] or f'Attached image {index}'}: {item['path']}" for index, item in enumerate(copied_attachments, 1)],
        "If you need to inspect an attached image, you may use the built-in `Read` tool on those listed paths only. Do not use built-in tools for any other purpose.",
    ]
    return prompt.rstrip() + "\n\n" + "\n".join(lines)


def _usage_from_result(result_event: dict[str, Any] | None) -> dict[str, int]:
    usage = (result_event or {}).get("usage", {}) or {}
    input_tokens = int(usage.get("input_tokens", 0) or 0)
    output_tokens = int(usage.get("output_tokens", 0) or 0)
    return {
        "prompt_tokens": input_tokens,
        "completion_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
    }


def _extract_result(event_stream: list[dict[str, Any]]) -> tuple[str, dict[str, Any] | None]:
    result_event = None
    for event in reversed(event_stream):
        if event.get("type") == "result":
            result_event = event
            break
    if result_event is None:
        raise RuntimeError("Claude backend did not return a result event.")
    content = result_event.get("result") or result_event.get("content") or ""
    return str(content), result_event
def _check_claude_error(stderr_text: str, model: str) -> None:
    lowered = stderr_text.lower()
    if "invalid api key" in lowered or "authentication" in lowered or "login" in lowered:
        raise RuntimeError("Claude CLI is not logged in. Run `claude auth login` (or start `claude` and use `/login`) first.")
    if "unknown model" in lowered or "not available" in lowered or "invalid model" in lowered:
        default_model = default_model_for_backend("claude")
        raise RuntimeError(f"Claude backend tried to use model {model!r}, but your current Claude CLI/account rejected it. Try an available Claude model such as {default_model!r}.")


def _normalize_reasoning_effort(effort: str | None) -> str | None:
    normalized = str(effort or "").strip().lower()
    if not normalized or normalized == "off":
        return None
    if normalized in _VALID_EFFORTS:
        return normalized
    return None


def _assistant_message_schema() -> dict[str, Any]:
    return {
        "type": "object",
        "properties": {
            "content": {"type": "string"},
            "tool_calls": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "arguments": {"type": "string"},
                    },
                    "required": ["name", "arguments"],
                    "additionalProperties": False,
                },
            },
        },
        "required": ["content", "tool_calls"],
        "additionalProperties": False,
    }


def _assistant_message_schema_wrapper() -> str:
    return json.dumps(_assistant_message_schema(), ensure_ascii=False)
def _run_claude_print(*, system: str, prompt: str, model: str, tools: list[dict[str, Any]] | None, tool_choice: str | dict[str, Any] | None, return_message: bool, timeout: int | None, attachments: list[dict[str, Any]] | None = None) -> tuple[str, dict[str, Any], dict[str, int]]:
    effort = _normalize_reasoning_effort(REASONING_EFFORT)
    with tempfile.TemporaryDirectory(prefix="skillopt_claude_") as temp_dir:
        copied_attachments = _copy_attachments_to_temp(attachments or [], temp_dir)
        prompt_for_cli = _append_attachment_instructions(prompt, copied_attachments)
        cmd = [CLAUDE_BIN, "-p", "--output-format", "json", "--permission-mode", CLAUDE_PERMISSION_MODE, "--add-dir", temp_dir]
        if model:
            cmd.extend(["--model", model])
        if CLAUDE_SETTING_SOURCES:
            cmd.extend(["--setting-sources", CLAUDE_SETTING_SOURCES])
        if system:
            # Write the system prompt to a file, not argv: here the skill being
            # optimized IS the system prompt, and SkillOpt grows it over training,
            # so past ~30 KB it would re-hit the Windows argv cap (WinError 206).
            # The CLI reads it via --append-system-prompt-file.
            system_path = os.path.join(temp_dir, "system_prompt.txt")
            with open(system_path, "w", encoding="utf-8") as system_fh:
                system_fh.write(system)
            cmd.extend(["--append-system-prompt-file", system_path])
        if effort:
            cmd.extend(["--effort", effort])
        structured_output = bool(return_message)
        if structured_output:
            cmd.extend(["--schema", _assistant_message_schema_wrapper()])
        # Feed the prompt via stdin (and the system prompt via a file, above), not
        # argv: on Windows the whole command line is capped at ~32 KB and large
        # optimizer prompts / grown skills overflow it → [WinError 206]. Pin UTF-8
        # so a zh-CN default codepage (cp936) can't raise UnicodeEncodeError on
        # emoji / non-GBK glyphs before the CLI even starts.
        proc = subprocess.run(cmd, input=prompt_for_cli, capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=timeout or 300, cwd=temp_dir)
    stderr_text = (proc.stderr or "").strip()
    if proc.returncode != 0:
        _check_claude_error(stderr_text, model)
        raise RuntimeError(stderr_text or f"Claude CLI exited with code {proc.returncode}")
    stream = []
    for raw_line in (proc.stdout or "").splitlines():
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            stream.append(json.loads(raw_line))
        except json.JSONDecodeError:
            continue
    raw_text, result_event = _extract_result(stream)
    usage_info = _usage_from_result(result_event)
    return raw_text, result_event or {}, usage_info
def _compat_message_from_payload(payload: Any) -> CompatAssistantMessage:
    if not isinstance(payload, dict):
        return CompatAssistantMessage(content=str(payload or ""), tool_calls=[])
    content = str(payload.get("content", "") or "")
    tool_calls: list[CompatToolCall] = []
    for index, tool_call in enumerate(payload.get("tool_calls", []) or [], start=1):
        name = str(tool_call.get("name", "") or "")
        arguments = str(tool_call.get("arguments", "{}") or "{}")
        tool_calls.append(CompatToolCall(id=f"claude_tool_{index}", function=CompatToolFunction(name=name, arguments=arguments)))
    return CompatAssistantMessage(content=content, tool_calls=tool_calls)


def _call_messages(messages: list[dict[str, Any]], max_completion_tokens: int, retries: int, stage: str, *, tools: list[dict[str, Any]] | None = None, tool_choice: str | dict[str, Any] | None = None, return_message: bool = False, deployment: str | None = None, timeout: int | None = None) -> tuple[Any, dict[str, int]]:
    del max_completion_tokens
    system, prompt, attachments = _build_prompt_from_messages(messages, tools=tools, tool_choice=tool_choice, structured_output=return_message)
    model = deployment or TARGET_DEPLOYMENT
    last_err = None
    for attempt in range(retries):
        try:
            raw_text, payload, usage_info = _run_claude_print(system=system, prompt=prompt, model=model, tools=tools, tool_choice=tool_choice, return_message=return_message, timeout=timeout, attachments=attachments)
            tracker.record(stage, usage_info["prompt_tokens"], usage_info["completion_tokens"])
            if return_message:
                return _compat_message_from_payload(payload.get("result", payload)), usage_info
            return raw_text, usage_info
        except Exception as e:  # noqa: BLE001
            last_err = e
            time.sleep(min(2 ** attempt, 15))
    raise RuntimeError(f"Claude backend failed after {retries} retries: {last_err}")
def chat_optimizer(system: str, user: str, max_completion_tokens: int = 16384, retries: int = 5, stage: str = "optimizer", timeout: int | None = None) -> tuple[str, dict[str, int]]:
    messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
    return _call_messages(messages, max_completion_tokens, retries, stage, deployment=OPTIMIZER_DEPLOYMENT, timeout=timeout)


def chat_target(system: str, user: str, max_completion_tokens: int = 16384, retries: int = 5, stage: str = "target", timeout: int | None = None) -> tuple[str, dict[str, int]]:
    messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
    return _call_messages(messages, max_completion_tokens, retries, stage, deployment=TARGET_DEPLOYMENT, timeout=timeout)


def chat_with_deployment(deployment: str, system: str, user: str, max_completion_tokens: int = 16384, retries: int = 5, stage: str = "custom", timeout: int | None = None) -> tuple[str, dict[str, int]]:
    messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
    return _call_messages(messages, max_completion_tokens, retries, stage, deployment=deployment, timeout=timeout)


def chat_optimizer_messages(messages: list[dict[str, Any]], max_completion_tokens: int = 16384, retries: int = 5, stage: str = "optimizer", *, tools: list[dict[str, Any]] | None = None, tool_choice: str | dict[str, Any] | None = None, return_message: bool = False, timeout: int | None = None) -> tuple[Any, dict[str, int]]:
    return _call_messages(messages, max_completion_tokens, retries, stage, tools=tools, tool_choice=tool_choice, return_message=return_message, deployment=OPTIMIZER_DEPLOYMENT, timeout=timeout)


def chat_target_messages(messages: list[dict[str, Any]], max_completion_tokens: int = 16384, retries: int = 5, stage: str = "target", *, tools: list[dict[str, Any]] | None = None, tool_choice: str | dict[str, Any] | None = None, return_message: bool = False, timeout: int | None = None) -> tuple[Any, dict[str, int]]:
    return _call_messages(messages, max_completion_tokens, retries, stage, tools=tools, tool_choice=tool_choice, return_message=return_message, deployment=TARGET_DEPLOYMENT, timeout=timeout)


def chat_messages_with_deployment(deployment: str, messages: list[dict[str, Any]], max_completion_tokens: int = 16384, retries: int = 5, stage: str = "custom", *, tools: list[dict[str, Any]] | None = None, tool_choice: str | dict[str, Any] | None = None, return_message: bool = False, timeout: int | None = None) -> tuple[Any, dict[str, int]]:
    return _call_messages(messages, max_completion_tokens, retries, stage, tools=tools, tool_choice=tool_choice, return_message=return_message, deployment=deployment, timeout=timeout)


def get_token_summary() -> dict[str, dict[str, int]]:
    return tracker.summary()


def reset_token_tracker() -> None:
    tracker.reset()


def set_reasoning_effort(effort: str | None) -> None:
    global REASONING_EFFORT
    REASONING_EFFORT = effort if effort else None


def set_target_deployment(deployment: str) -> None:
    global TARGET_DEPLOYMENT
    TARGET_DEPLOYMENT = deployment or default_model_for_backend("claude")
    os.environ["TARGET_DEPLOYMENT"] = TARGET_DEPLOYMENT


def set_optimizer_deployment(deployment: str) -> None:
    global OPTIMIZER_DEPLOYMENT
    OPTIMIZER_DEPLOYMENT = deployment or default_model_for_backend("claude")
    os.environ["OPTIMIZER_DEPLOYMENT"] = OPTIMIZER_DEPLOYMENT