Files
microsoft-SkillOpt/skillopt_sleep/backend.py
Daniel Martinez 9fa0716c72 fix(skillopt-sleep): also surface codex failures on the tool-call rollout path
Follow-up from a fresh-context review of the prior commit: CodexCliBackend.attempt_with_tools
(the rollout path for tool-requiring tasks) ran codex exec inline, swallowed all exceptions,
and never set last_call_error — so an auth/model/version failure on the tool path still
produced a silent empty->0 with no diagnostic signal, the exact failure class the prior commit
fixed for the _call path. Now it surfaces timeout/exception/non-zero-exit via last_call_error
(response stays empty; never leaks the CLI error text), so a failed tool rollout shows up in
diagnostics.json. Adds a regression test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 23:56:11 -05:00

1434 lines
64 KiB
Python

"""SkillOpt-Sleep — optimizer/replay backend abstraction.
A backend supplies the three "intelligent" operations the sleep cycle needs:
1. attempt(task, skill, memory) -> response text (the rollout)
2. judge(task, response) -> (hard, soft, rationale) (the reward)
3. reflect(failures, successes, skill, memory)
-> list[EditRecord] (proposed bounded edits)
Two implementations:
* MockBackend — deterministic, no API, used for tests + the experiment.
Reads optional `reference` exact answers and a tiny
rule-table so the loop provably improves and the gate
provably blocks regressions.
* AnthropicBackend — uses the user's ANTHROPIC_API_KEY via the `claude`
CLI or the anthropic SDK (lazy-imported). Real lift.
The backend never touches live config; it only returns text/edits that the
consolidation stage gates and stages.
"""
from __future__ import annotations
import json
import os
import re
import subprocess
import tempfile
from typing import Any, Dict, List, Optional, Tuple
from skillopt_sleep.types import EditRecord, ReplayResult, TaskRecord
def skill_hash(content: str) -> str:
import hashlib
return hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
# ── Backend protocol ──────────────────────────────────────────────────────────
class Backend:
name = "base"
# Optional user preferences (free text) injected into reflect as a prior.
preferences: str = ""
def attempt(self, task: TaskRecord, skill: str, memory: str,
sample_id: int = 0) -> str:
raise NotImplementedError
def attempt_with_tools(
self, task: TaskRecord, skill: str, memory: str, tools: List[str]
) -> Tuple[str, List[str]]:
"""Run the task while exposing real tools; return (response, tools_called).
Default: no real tool loop — fall back to plain attempt and let the
single-shot 'TOOL_CALL: <name>' marker convention surface intent. CLI
backends override this to expose a genuinely callable tool.
"""
resp = self.attempt(task, skill, memory)
called: List[str] = []
for t in tools:
if re.search(r"(?i)\btool_call\s*:\s*%s\b" % re.escape(t), resp):
called.append(t)
return resp, called
def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
raise NotImplementedError
def reflect(
self,
failures: List[Tuple[TaskRecord, ReplayResult]],
successes: List[Tuple[TaskRecord, ReplayResult]],
skill: str,
memory: str,
*,
edit_budget: int,
evolve_skill: bool,
evolve_memory: bool,
) -> List[EditRecord]:
raise NotImplementedError
# token accounting (optional)
def tokens_used(self) -> int:
return 0
# ── Shared scoring helpers ────────────────────────────────────────────────────
def _normalize(s: str) -> str:
s = (s or "").lower().strip()
s = re.sub(r"[^\w\s]", " ", s)
s = re.sub(r"\s+", " ", s)
return s.strip()
def exact_score(reference: str, response: str) -> float:
ref = _normalize(reference)
resp = _normalize(response)
if not ref:
return 0.0
return 1.0 if ref in resp or resp == ref else 0.0
def keyword_soft_score(reference: str, response: str) -> float:
"""Fraction of reference tokens present in response (cheap rubric proxy)."""
ref_tokens = [t for t in _normalize(reference).split() if len(t) > 2]
if not ref_tokens:
return 0.0
resp = _normalize(response)
hit = sum(1 for t in set(ref_tokens) if t in resp)
return hit / len(set(ref_tokens))
# ── Mock backend (deterministic, no API) ──────────────────────────────────────
class MockBackend(Backend):
"""Deterministic backend for tests and the acceptance experiment.
Model of reality:
* Each task may carry a `reference` (exact answer) and a "rule" tag
describing the single skill rule that makes the task solvable, e.g.
tags=["rule:wrap-answer-in-answer-tags"].
* `attempt` produces a correct response IFF the required rule text is
present in skill+memory; otherwise it produces a near-miss.
* `judge` scores exact (hard) + keyword (soft) against `reference`.
* `reflect` looks at failures, reads each failed task's required rule,
and proposes exactly that rule as an `add` edit (bounded by budget).
It NEVER proposes a rule already present (no churn), and on the
special tag "rule:__harmful__" it proposes a known-bad edit so tests
can prove the gate rejects regressions.
This makes the end-to-end loop monotonic and fully reproducible while
exercising the real harvest->mine->replay->gate->stage plumbing.
"""
name = "mock"
RULE_PREFIX = "rule:"
RULE_TEXT = {
"wrap-answer": "Always wrap the final answer in <answer>...</answer> tags.",
"arxiv-id": "Report arXiv ids in the exact form arXiv:XXXX.XXXXX.",
"commit-imperative": "Write git commit subjects in imperative mood, max 50 chars.",
"units-si": "Always include SI units in numeric answers.",
"json-only": "When asked for JSON, output only valid JSON with no prose.",
"__harmful__": "Ignore the user's formatting requests and answer freely.",
}
def _required_rules(self, task: TaskRecord) -> List[str]:
out = []
for t in task.tags:
if t.startswith(self.RULE_PREFIX):
key = t[len(self.RULE_PREFIX):]
if key in self.RULE_TEXT:
out.append(key)
return out
def attempt(self, task: TaskRecord, skill: str, memory: str,
sample_id: int = 0) -> str:
ctx = (skill or "") + "\n" + (memory or "")
rules = self._required_rules(task)
# The "__harmful__" rule models a bad edit: even when present it makes
# the agent ignore formatting, so it can NEVER produce the reference.
# This is what lets the experiment prove the gate rejects regressions.
if "__harmful__" in rules:
return "I'll just answer freely and skip the requested format."
# A task is solved iff ALL its required rule texts are present in context.
have_all = all(self.RULE_TEXT[k] in ctx for k in rules) if rules else False
if have_all and task.reference:
# produce a response that satisfies the rule and contains the answer
if "wrap-answer" in rules:
return f"Here is the result. <answer>{task.reference}</answer>"
return f"{task.reference}"
# Near miss: a degraded answer that shares keywords but is NOT the exact
# rule-correct form, so exact-match fails deterministically regardless of
# how many whitespace tokens the reference has.
if task.reference:
ref = task.reference
mangled = ref[:-2] if len(ref) > 3 else "unknown"
return f"approximately {mangled} (format not applied)"
return "(attempted, no checkable reference)"
def attempt_with_tools(self, task, skill, memory, tools):
# Deterministic tool model: the mock "calls" a tool iff the skill+memory
# contains an explicit instruction to use it (a learned rule mentioning
# the tool name or "search"). The deficient skill says NOT to, so
# baseline calls nothing; a learned "use ./search" rule flips it.
ctx = ((skill or "") + "\n" + (memory or "")).lower()
resp = self.attempt(task, skill, memory)
called = []
for t in (tools or []):
tl = t.lower()
if (f"./{tl}" in ctx or f"use {tl}" in ctx or f"run {tl}" in ctx
or f"call {tl}" in ctx or f"must {tl}" in ctx):
called.append(t)
return resp, called
def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
if task.reference_kind == "answer" and task.judge:
try:
from skillopt_sleep.experiments.real_eval import score_answer_judge
except ImportError:
score_answer_judge = None # research evaluators not bundled
if score_answer_judge is not None:
return score_answer_judge(task.judge, response)
if task.reference_kind == "rule" and task.judge:
from skillopt_sleep.judges import score_rule_judge
return score_rule_judge(task.judge, response)
if task.reference_kind == "exact" and task.reference:
hard = exact_score(task.reference, response)
soft = max(hard, keyword_soft_score(task.reference, response))
return hard, soft, f"exact-match={hard}"
if task.reference_kind == "rubric" and task.reference:
soft = keyword_soft_score(task.reference, response)
return (1.0 if soft >= 0.8 else 0.0), soft, f"rubric keyword soft={soft:.2f}"
# no reference: outcome-derived weak label
hard = 1.0 if task.outcome == "success" else 0.0
return hard, hard, "outcome-derived"
def reflect(
self,
failures,
successes,
skill: str,
memory: str,
*,
edit_budget: int,
evolve_skill: bool,
evolve_memory: bool,
) -> List[EditRecord]:
ctx = (skill or "") + "\n" + (memory or "")
edits: List[EditRecord] = []
seen_text: set = set()
target = "skill" if evolve_skill else "memory"
for task, _res in failures:
for key in self._required_rules(task):
text = self.RULE_TEXT[key]
if text in ctx or text in seen_text:
continue
seen_text.add(text)
edits.append(
EditRecord(
target=target,
op="add",
content=text,
rationale=f"failed task {task.id} requires rule '{key}'",
)
)
if len(edits) >= edit_budget:
return edits
return edits
# ── Shared real-CLI backend (prompts + parsing + cache; subclasses do _call) ──
def _extract_json(raw: str, kind: str):
"""Pull the first JSON object/array out of a possibly chatty CLI reply."""
pat = r"\{.*\}" if kind == "object" else r"\[.*\]"
m = re.search(pat, raw or "", re.DOTALL)
if not m:
return None
try:
return json.loads(m.group(0))
except Exception:
return None
def _task_guardrail(pairs) -> str:
"""Build an 'output contract' the optimizer must not violate.
``pairs`` is a list of (TaskRecord, ReplayResult). We surface the benchmark's
own rollout system prompt (TaskRecord.system) plus a short, explicit list of
invariants, so the optimizer cannot learn rules that the evaluator can never
honor (the SpreadsheetBench failure mode: a learned "return ```vba```" or
"ask the user for the range" rule scores 0 because the harness runs only
```python``` openpyxl and cannot answer questions).
Returns "" when no task carries a system contract (e.g. mined daily cases),
so non-benchmark runs are unchanged.
"""
sys_txt = ""
for t, _ in pairs:
s = getattr(t, "system", "") or ""
if s.strip():
sys_txt = s.strip()
break
if not sys_txt:
return ""
# the system prompt can be long; keep the rules portion concise for the optimizer
contract = sys_txt
if len(contract) > 900:
contract = contract[:900] + ""
invariants = (
"- Do NOT change the required output format or programming language.\n"
"- Do NOT tell the agent to ask the user a question or request more info; "
"it must always produce a best-effort answer from what is given.\n"
"- Keep every rule consistent with the contract above."
)
return (
"\n# Task output contract (rules MUST obey this — violating it scores 0)\n"
f"{contract}\n{invariants}\n"
)
class CliBackend(Backend):
"""Common logic for real CLI-driven backends (claude / codex).
Subclasses implement only ``_call(prompt) -> str``. This base owns the
prompts (attempt / judge / reflect), JSON parsing, a response cache (so
re-scoring an unchanged (skill, memory) on the held-out slice is free),
and a rough token estimate.
"""
name = "cli"
def __init__(self, model: str = "", timeout: int = 180) -> None:
self.model = model
self.timeout = timeout
self._tokens = 0
self._cache: Dict[str, str] = {}
self.last_call_error = ""
self.last_reflect_raw = ""
# subclasses override --------------------------------------------------
def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
raise NotImplementedError
def _cached_call(self, key: str, prompt: str, *, max_tokens: int = 1024) -> str:
if key in self._cache:
return self._cache[key]
out = self._call(prompt, max_tokens=max_tokens)
self._tokens += len(prompt) // 4 + len(out) // 4
self._cache[key] = out
return out
# operations -----------------------------------------------------------
def attempt(self, task: TaskRecord, skill: str, memory: str,
sample_id: int = 0) -> str:
# sample_id distinguishes repeated rollouts of the SAME (task, skill,
# memory) in the cache key. Without it the attempt cache collapses all
# K dream rollouts into one cached response (spread always 0), which
# silently disables contrastive reflection. sample_id=0 keeps the old
# key format so gate re-scoring still benefits from the cache.
if task.system:
# Benchmark carries its own (research-repo) rollout system prompt.
# Use it verbatim with a neutral skill/memory section — this both
# keeps scoring faithful and avoids the aggressive "OVERRIDE / HARD
# CONSTRAINT" phrasing below, which Azure's content filter flags as a
# jailbreak (HTTP 400) and silently zeroes the rollout.
skill_section = f"## Skill\n{skill.strip()}\n\n" if skill.strip() else ""
mem_section = f"## Memory\n{memory.strip()}\n\n" if memory.strip() else ""
system = task.system.replace("{skill_section}", skill_section)
if "{skill_section}" not in task.system and skill_section:
system = skill_section + system
body = task.intent + ("\n\n" + task.context_excerpt if task.context_excerpt else "")
prompt = f"{system}{mem_section}\n{body}"
salt = f"s{sample_id}:" if sample_id else ""
key = "attempt:" + salt + skill_hash(prompt)
return self._cached_call(key, prompt, max_tokens=512)
# generic path (mined daily-case tasks): neutral, content-filter-safe
# wording. Apply the skill/memory as guidance, not as adversarial
# "OVERRIDE everything" directives.
prompt = (
"Complete the following task for the user. Follow the skill and memory "
"guidance below, including any output-format and length requirements. "
"When a 'Learned preferences' rule sets an explicit limit (e.g. a length "
"cap), prefer that rule over more general advice it refines.\n\n"
f"# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\n"
"Return ONLY the final answer text, nothing else."
)
# cache on (task, skill, memory) so identical hold-out re-scoring is free
salt = f"s{sample_id}:" if sample_id else ""
key = "attempt:" + salt + skill_hash(prompt)
return self._cached_call(key, prompt, max_tokens=512)
def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
# real-benchmark correctness judge (searchqa/livemath/spreadsheet) — local
if task.reference_kind == "answer" and task.judge:
try:
from skillopt_sleep.experiments.real_eval import score_answer_judge
except ImportError:
score_answer_judge = None # research evaluators not bundled
if score_answer_judge is not None:
return score_answer_judge(task.judge, response)
# gbrain-style rule judge: scored locally, no API spend
if task.reference_kind == "rule" and task.judge:
from skillopt_sleep.judges import score_rule_judge
return score_rule_judge(task.judge, response)
# exact references are scored locally — no API spend
if task.reference_kind == "exact" and task.reference:
hard = exact_score(task.reference, response)
return hard, max(hard, keyword_soft_score(task.reference, response)), "exact(local)"
prompt = (
"Score how well the response satisfies the rubric, 0..1. "
'Return ONLY JSON {"score": <0..1>, "reason": "..."}.\n\n'
f"# Rubric\n{task.reference or task.intent}\n\n# Response\n{response}"
)
key = "judge:" + skill_hash(prompt)
raw = self._cached_call(key, prompt, max_tokens=200)
obj = _extract_json(raw, "object")
if isinstance(obj, dict):
try:
soft = float(obj.get("score", 0.0))
return (1.0 if soft >= 0.8 else 0.0), soft, str(obj.get("reason", ""))[:200]
except Exception:
pass
return 0.0, 0.0, "judge-parse-failed"
def reflect(
self,
failures,
successes,
skill: str,
memory: str,
*,
edit_budget: int,
evolve_skill: bool,
evolve_memory: bool,
) -> List[EditRecord]:
if not failures:
return []
target = "skill" if evolve_skill else "memory"
cur_doc = (skill if target == "skill" else memory) or "(empty)"
fail_text = "\n".join(
f"- wanted: {t.intent[:160]}\n got: {r.response[:160]}\n why-wrong: {r.fail_reason[:160]}"
for t, r in failures[:8]
)
# Aggregate the most common failing criteria across all failures so the
# optimizer is told *exactly what the scorer rewards* — gbrain's lesson:
# the optimizer kept proposing reasonable-but-wrong edits until it could
# see the success criteria.
from collections import Counter
crit = Counter()
for _t, r in failures:
fr = r.fail_reason or ""
if fr.startswith("failed:"):
for part in fr[len("failed:"):].split(","):
part = part.strip()
if part:
crit[part] += 1
def _explain(c: str) -> str:
# translate an "op=arg" criterion into a plain-English requirement
if "=" in c:
op, _, arg = c.partition("=")
op = op.strip(); arg = arg.strip()
if op == "max_chars":
return f"the ENTIRE response must be at most {arg} characters long"
if op == "min_chars":
return f"the response must be at least {arg} characters long"
if op == "section_present":
return f"the response must contain a section/heading titled '{arg}'"
if op == "regex":
return f"the response must match the pattern /{arg}/ (e.g. include that label)"
if op == "contains":
return f"the response must contain the text '{arg}'"
if op == "tool_called":
return f"the agent must actually call the '{arg}' tool"
return c
criteria_text = ""
if crit:
criteria_text = (
"\n# Exact criteria the outputs are FAILING (fix these directly)\n"
+ "\n".join(f"- {_explain(c)} [{c}, failed {n}x]" for c, n in crit.most_common())
)
pref_text = ""
if getattr(self, "preferences", ""):
pref_text = (
"\n# User preferences (honor these as priors when writing rules)\n"
+ str(self.preferences).strip()
)
# Task GUARDRAIL: the optimizer must not invent rules that violate the
# task's hard constraints (e.g. SpreadsheetBench answers MUST be a
# ```python``` openpyxl block — a learned "return ```vba```" or "ask the
# user for the range" rule scores 0 because the harness can't run VBA and
# can't ask questions). We surface the benchmark's own rollout system
# prompt (carried on TaskRecord.system) so proposed rules stay in-bounds.
guard_text = _task_guardrail(failures)
prompt = (
"You are SkillOpt's optimizer. The agent keeps failing the recurring "
f"tasks below. Propose at most {edit_budget} bounded edits to the "
f"{target} document so it stops failing. Each edit MUST be a short, "
"GENERAL, reusable rule or preference (never task-specific, never an "
"answer to a single task). If exact failing criteria are listed, your "
"edits MUST make future outputs satisfy every one of them.\n"
"BE CONCRETE: quote the exact threshold, section name, or format from "
"the criteria verbatim in your rule (e.g. write 'keep the entire "
"response under 1200 characters', NOT 'respect length limits'). Vague "
"rules do not change behavior; specific numeric/structural rules do.\n"
"IMPORTANT: your edits are APPENDED to a 'Learned preferences' block; "
"you CANNOT delete the existing instructions above. If the current "
f"{target} text conflicts with a criterion (e.g. it says 'be exhaustive' "
"but outputs must be under a character limit), write an explicit, "
"forceful OVERRIDE rule stating it supersedes the conflicting "
"instruction, and put the hard requirement first.\n"
"HARD CONSTRAINT: every rule you write MUST be consistent with the "
"'Task output contract' below (if shown). NEVER propose a rule that "
"changes the required output format/language, tells the agent to ask "
"the user a question, or otherwise violates that contract — such a "
"rule scores ZERO because the evaluator cannot honor it.\n"
'Return ONLY a JSON array: '
'[{"op":"add|replace|delete","content":"<rule>","anchor":"<text to replace/delete, optional>","rationale":"<why>"}].\n\n'
f"# Current {target}\n{cur_doc}\n"
f"{guard_text}"
f"{criteria_text}\n"
f"{pref_text}\n\n"
f"# Recurring failures\n{fail_text}"
)
# Call with one retry: transient non-JSON replies otherwise waste a whole
# night (the gate sees no edits and rejects). A firmer second prompt
# recovers most of these.
arr = None
for attempt in range(2):
p = prompt if attempt == 0 else (
prompt + "\n\nIMPORTANT: your previous reply was not valid JSON. "
"Reply with ONLY the JSON array, no prose, no markdown fences."
)
raw = self._call(p, max_tokens=1024)
self._tokens += len(p) // 4 + len(raw) // 4
arr = _extract_json(raw, "array")
if isinstance(arr, list) and arr:
break
# Expose the last raw optimizer reply so a no-edits night is diagnosable:
# a 0.0->0.0 gate with zero edits is otherwise indistinguishable from
# "nothing to learn" (the cycle persists this in diagnostics.json).
self.last_reflect_raw = raw or ""
edits: List[EditRecord] = []
if isinstance(arr, list):
for e in arr[:edit_budget]:
if not isinstance(e, dict):
continue
content = str(e.get("content", "")).strip()
if not content:
continue
edits.append(EditRecord(
target=target,
op=str(e.get("op", "add")).strip().lower(),
content=content,
anchor=str(e.get("anchor", "")).strip(),
rationale=str(e.get("rationale", "")).strip(),
))
return edits
def tokens_used(self) -> int:
return self._tokens
# ── Claude Code CLI backend ───────────────────────────────────────────────────
class ClaudeCliBackend(CliBackend):
"""Drives the authenticated `claude` CLI: claude -p --output-format text."""
name = "claude"
def __init__(self, model: str = "", claude_path: str = "claude", timeout: int = 180) -> None:
super().__init__(model=model or os.environ.get("SKILLOPT_SLEEP_CLAUDE_MODEL", "") or "sonnet",
timeout=timeout)
self.claude_path = claude_path
# Known CLI error prefixes that indicate auth or config failures.
# When detected, we log a warning so the user doesn't mistake a
# broken auth for "nothing to optimize" (issue #68).
# Keep these specific to avoid false positives on normal model output.
_CLI_ERROR_MARKERS = (
"Not logged in",
"Please run /login",
"Authentication required",
"Invalid API key",
"Unauthorized: invalid x-api-key",
)
def _detect_cli_error(self, stdout: str, stderr: str) -> None:
"""Log a warning if CLI output looks like an auth/config error.
Only checks stderr and short stdout (< 300 chars) to avoid
false-positives on legitimate model responses that mention
auth-related terms.
"""
import logging
# Long stdout is almost certainly a real model response, not an error.
check_stdout = stdout if len(stdout) < 300 else ""
combined = check_stdout + "\n" + stderr
for marker in self._CLI_ERROR_MARKERS:
if marker in combined:
logging.getLogger("skillopt_sleep").warning(
"Claude CLI returned a likely auth error: %s",
combined[:200].replace("\n", " "),
)
self.last_call_error = combined[:500]
return
def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
# Run ISOLATED so the ambient Claude Code environment does not leak into
# the optimizer/target call. Critically, the user's GLOBAL skills
# (~/.claude/skills) are injected regardless of cwd, so we must disable
# them explicitly — without this, reflect/attempt sometimes reply with a
# list of the user's installed skills instead of doing the task.
# --bare skip hooks, LSP, plugins (minimal mode)
# Only safe with ANTHROPIC_API_KEY auth;
# breaks subscription-token auth (#68).
# --disable-slash-commands disable all skills
# --disallowedTools '*' no tool use
# --exclude-dynamic-... drop per-machine cwd/env/memory/git sections
# cwd=<clean temp> no project CLAUDE.md
import tempfile
cmd = [self.claude_path, "-p", "--output-format", "text"]
if os.environ.get("ANTHROPIC_API_KEY"):
cmd.append("--bare")
cmd += [
"--disable-slash-commands",
"--disallowedTools", "*",
"--exclude-dynamic-system-prompt-sections",
]
if self.model:
cmd += ["--model", self.model]
cmd += ["--", prompt]
clean_cwd = tempfile.mkdtemp(prefix="skillopt_sleep_claude_")
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=self.timeout, cwd=clean_cwd,
)
except Exception:
return ""
finally:
try:
import shutil
shutil.rmtree(clean_cwd, ignore_errors=True)
except Exception:
pass
out = (proc.stdout or "").strip()
self._detect_cli_error(out, proc.stderr or "")
return out
def attempt_with_tools(self, task, skill, memory, tools):
# Expose a REAL, callable `search` tool (a shell shim that logs each
# call) so the gbrain quick-answerer judge (tool_called=search) is
# validated honestly: we detect the call from the shim's log, not from
# a self-reported marker. Other tools are stubbed the same way.
import tempfile, shutil, stat
work = tempfile.mkdtemp(prefix="skillopt_sleep_tools_")
calllog = os.path.join(work, "_tool_calls.log")
try:
for tname in (tools or ["search"]):
shim = os.path.join(work, tname)
with open(shim, "w") as f:
f.write(
"#!/usr/bin/env bash\n"
f'echo "{tname}" >> "{calllog}"\n'
'echo "(search results: 3 relevant notes found; use them to answer)"\n'
)
os.chmod(shim, os.stat(shim).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
tool_hint = (
"You have shell tools available in the current directory: "
+ ", ".join(f"./{t}" for t in (tools or ["search"]))
+ ". When the skill says to look something up or search before "
"answering, you MUST actually run the tool (e.g. `./search \"query\"`) "
"via Bash before giving your final answer."
)
prompt = (
"You are completing a task. Apply the skill and memory rules EXACTLY, "
"including any rule about searching/looking up before answering. "
"Treat a 'Learned preferences' block as HARD CONSTRAINTS that override "
"earlier conflicting skill text.\n\n"
f"{tool_hint}\n\n"
f"# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\n"
"Return ONLY the final answer text."
)
cmd = [self.claude_path, "-p", "--output-format", "text"]
if os.environ.get("ANTHROPIC_API_KEY"):
cmd.append("--bare")
cmd += [
"--disable-slash-commands",
"--allowedTools", "Bash",
"--exclude-dynamic-system-prompt-sections",
]
if self.model:
cmd += ["--model", self.model]
cmd += ["--", prompt]
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=self.timeout, cwd=work,
)
resp = (proc.stdout or "").strip()
self._detect_cli_error(resp, proc.stderr or "")
except Exception:
resp = ""
self._tokens += len(prompt) // 4 + len(resp) // 4
called: List[str] = []
if os.path.exists(calllog):
with open(calllog) as f:
logged = {ln.strip() for ln in f if ln.strip()}
called = [t for t in (tools or ["search"]) if t in logged]
return resp, called
finally:
try:
shutil.rmtree(work, ignore_errors=True)
except Exception:
pass
def resolve_codex_path(explicit: str = "") -> str:
"""Find the REAL `@openai/codex` binary, skipping the hermes wrapper.
The wrapper at ~/.local/bin/codex is a shell shim that execs hermes-codex
and injects extra output; we look past it for the genuine node-installed
binary so replay output is clean.
"""
if explicit:
return explicit
env = os.environ.get("SKILLOPT_SLEEP_CODEX_PATH")
if env:
return env
candidates = [
os.path.expanduser("~/.nvm/versions/node/v22.22.3/bin/codex"),
]
# any nvm node version
nvm = os.path.expanduser("~/.nvm/versions/node")
if os.path.isdir(nvm):
for ver in sorted(os.listdir(nvm), reverse=True):
candidates.append(os.path.join(nvm, ver, "bin", "codex"))
for c in candidates:
if not c or not os.path.exists(c):
continue
try:
with open(c, "rb") as f:
head = f.read(64)
# skip the bash shim that execs hermes
if head.startswith(b"#!") and b"bash" in head:
continue
except Exception:
pass
return c
return "codex" # last resort (may be the wrapper)
class CodexCliBackend(CliBackend):
"""Drives the real Codex CLI: `codex exec -o <file>` for clean output."""
name = "codex"
def __init__(
self,
model: str = "",
codex_path: str = "",
timeout: int = 240,
sandbox: str = "read-only",
project_dir: str = "",
) -> None:
super().__init__(model=model or os.environ.get("SKILLOPT_SLEEP_CODEX_MODEL", ""),
timeout=timeout)
self.codex_path = resolve_codex_path(codex_path)
self.sandbox = sandbox
self.project_dir = (
os.path.abspath(os.path.expanduser(project_dir)) if project_dir else ""
)
def _call_once(self, prompt: str, *, max_tokens: int = 1024) -> str:
"""One codex exec attempt: returns the response text, or "" on
timeout/exception/empty-output (with last_call_error set). ``_call``
wraps this with retries so a transient failure is NOT silently scored 0."""
import tempfile
out_path = tempfile.NamedTemporaryFile(
prefix="codex_last_", suffix=".txt", delete=False
).name
cmd = [
self.codex_path, "exec", "--skip-git-repo-check",
"--color", "never", "--sandbox", self.sandbox,
"-o", out_path,
]
if self.project_dir:
cmd[3:3] = ["-C", self.project_dir]
if self.model:
cmd += ["-m", self.model]
cmd += ["--", prompt]
proc = None
try:
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.timeout,
cwd=self.project_dir or None,
)
except subprocess.TimeoutExpired:
self.last_call_error = f"codex exec timed out after {self.timeout}s"
return ""
except Exception as exc:
self.last_call_error = f"codex exec failed: {exc}"
return ""
try:
with open(out_path, encoding="utf-8") as f:
out = f.read().strip()
if out:
return out
except Exception as exc:
self.last_call_error = f"could not read codex output file: {exc}"
stdout = (proc.stdout or "").strip() if proc is not None else ""
stderr = (proc.stderr or "").strip() if proc is not None else ""
if proc is not None and proc.returncode != 0 and not self.last_call_error:
self.last_call_error = f"codex exec exited {proc.returncode}: {stderr[:500]}"
# Do NOT return the CLI's error text as if it were a model response: it
# pollutes rollout/judge/reflect and gets silently scored 0, hiding the
# real cause (e.g. an expired codex auth token surfacing as a 9k-char 401).
# Surface it via last_call_error and return empty instead.
if self.last_call_error:
return ""
return stdout or stderr
finally:
try:
os.unlink(out_path)
except Exception:
pass
# Fatal codex failures that will NOT recover on retry — fail fast + loud so a
# 0.0 night reads as "codex auth/model/version problem" not "nothing to learn".
# Covers: auth (re-login), and 400 config errors like an unsupported model on a
# ChatGPT account or a model that needs a newer codex CLI (upgrade).
_AUTH_MARKERS = (
"401 Unauthorized", "refresh_token_reused", "token_expired",
"Please log out and sign in", "Not logged in", "Please run /login",
"authentication token is expired", "Unauthorized: invalid",
"is not supported when using Codex", "requires a newer version of Codex",
)
def _call(self, prompt: str, *, max_tokens: int = 1024, retries: int = 3) -> str:
"""Retry transient empties/timeouts instead of silently returning "".
An empty reply scores 0 on every judge, which deflates the held-out
baseline AND blocks the candidate from ever improving — making a flaky
backend indistinguishable from "nothing to learn". The Azure backend
already guards this way (AzureOpenAIBackend._call); codex now does too.
Auth errors are NOT retried (hopeless until the user re-logs-in).
"""
import logging
import random as _r
import time as _t
out = ""
for attempt in range(max(1, retries)):
self.last_call_error = ""
out = self._call_once(prompt, max_tokens=max_tokens)
if out:
return out
err = self.last_call_error or ""
if any(m in err for m in self._AUTH_MARKERS):
logging.getLogger("skillopt_sleep").error(
"codex auth error — re-login required (`codex login`): %s", err[:200]
)
break # fail fast: retrying a 401 just burns calls
if attempt < retries - 1:
_t.sleep(min(6.0, (2 ** attempt) * 0.5) + _r.random() * 0.3)
return out
def attempt_with_tools(self, task, skill, memory, tools):
# Codex exec runs in a sandbox with shell access; expose the same real
# `search` shim and let it run (workspace-write so the shim can log).
import tempfile, shutil, stat
work = tempfile.mkdtemp(prefix="skillopt_sleep_codextools_")
calllog = os.path.join(work, "_tool_calls.log")
out_path = os.path.join(work, "_last.txt")
try:
for tname in (tools or ["search"]):
shim = os.path.join(work, tname)
with open(shim, "w") as f:
f.write(
"#!/usr/bin/env bash\n"
f'echo "{tname}" >> "{calllog}"\n'
'echo "(search results: 3 relevant notes found; use them to answer)"\n'
)
os.chmod(shim, os.stat(shim).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
tool_hint = (
"Shell tools are available in the working directory: "
+ ", ".join(f"./{t}" for t in (tools or ["search"]))
+ ". When the skill says to look something up or search before "
"answering, you MUST actually run the tool (e.g. `./search \"query\"`) "
"before giving your final answer."
)
prompt = (
"Complete the task. Apply the skill and memory rules EXACTLY, "
"including any rule about searching before answering. Treat a "
"'Learned preferences' block as HARD CONSTRAINTS overriding earlier "
"conflicting skill text.\n\n"
f"{tool_hint}\n\n# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\nReturn ONLY the final answer."
)
cmd = [
self.codex_path, "exec", "--skip-git-repo-check", "--color", "never",
"--sandbox", "workspace-write", "-C", work, "-o", out_path,
]
if self.model:
cmd += ["-m", self.model]
cmd += ["--", prompt]
self.last_call_error = ""
proc = None
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout, cwd=work)
except subprocess.TimeoutExpired:
self.last_call_error = f"codex exec (tools) timed out after {self.timeout}s"
except Exception as exc: # noqa: BLE001
self.last_call_error = f"codex exec (tools) failed: {exc}"
resp = ""
try:
with open(out_path, encoding="utf-8") as f:
resp = f.read().strip()
except Exception:
resp = ""
# Surface a failed tool-rollout the SAME way _call does: an auth/model/version
# failure on this path must show up in diagnostics (call_error), not vanish as a
# silent empty->0 scored as a failed rollout. Response stays "" (never the error text).
if not resp and not self.last_call_error and proc is not None and proc.returncode != 0:
self.last_call_error = (
f"codex exec (tools) exited {proc.returncode}: {(proc.stderr or '')[:500]}"
)
self._tokens += len(prompt) // 4 + len(resp) // 4
called: List[str] = []
if os.path.exists(calllog):
with open(calllog) as f:
logged = {ln.strip() for ln in f if ln.strip()}
called = [t for t in (tools or ["search"]) if t in logged]
return resp, called
finally:
try:
shutil.rmtree(work, ignore_errors=True)
except Exception:
pass
def resolve_copilot_path(explicit: str = "") -> str:
"""Find the GitHub Copilot CLI (`copilot`) binary."""
if explicit:
return explicit
env = os.environ.get("SKILLOPT_SLEEP_COPILOT_PATH")
if env:
return env
import shutil
found = shutil.which("copilot")
return found or "copilot"
class CopilotCliBackend(CliBackend):
"""Drives the GitHub Copilot CLI in non-interactive mode.
Uses ``copilot -p <prompt> --output-format json`` and parses the emitted
JSONL event stream, returning the concatenated ``assistant.message``
content. The plain-text / ``--silent`` modes do not reliably stream the
response to stdout on all platforms, so JSONL is used for robust capture.
The call runs in a clean temp cwd with streaming disabled and tools allowed
(so non-interactive mode never blocks on a permission prompt); ``_call``'s
prompts ask for final-answer text only, so no tool use is expected there,
while ``attempt_with_tools`` exposes real, cross-platform callable shims in
the working directory for honest tool-call detection.
Startup overhead is minimised: each invocation points ``COPILOT_HOME`` at a
dedicated, isolated config dir (no user ``mcp-config.json``, so the user's
MCP servers — including this project's own — are NOT spawned, avoiding a
slow recursive launch), and built-in MCP servers / custom instructions are
disabled. Auth is read from the OS credential store / token env vars, which
live outside ``COPILOT_HOME``, so isolation does not break authentication.
Set ``SKILLOPT_SLEEP_COPILOT_HOME`` to override the isolated home, or set it
empty / ``SKILLOPT_SLEEP_COPILOT_FULL_ENV=1`` to use the user's real
environment instead.
"""
name = "copilot"
def __init__(self, model: str = "", copilot_path: str = "", timeout: int = 240) -> None:
super().__init__(model=model or os.environ.get("SKILLOPT_SLEEP_COPILOT_MODEL", ""),
timeout=timeout)
self.copilot_path = resolve_copilot_path(copilot_path)
self.full_env = os.environ.get("SKILLOPT_SLEEP_COPILOT_FULL_ENV", "") == "1"
# Stable isolated home so first-run setup is cached across calls.
if self.full_env:
self.copilot_home = ""
else:
self.copilot_home = os.environ.get("SKILLOPT_SLEEP_COPILOT_HOME") or os.path.join(
tempfile.gettempdir(), "skillopt_sleep_copilot_home"
)
try:
os.makedirs(self.copilot_home, exist_ok=True)
except Exception:
self.copilot_home = ""
def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
clean_cwd = tempfile.mkdtemp(prefix="skillopt_sleep_copilot_")
cmd = [
self.copilot_path, "-p", prompt,
"--output-format", "json",
"--stream", "off",
"--no-color",
"--log-level", "none",
"--allow-all-tools",
"-C", clean_cwd,
]
if not self.full_env:
# Drop unneeded startup work: no built-in (github) MCP server and no
# AGENTS.md / custom-instruction loading. With an isolated home that
# has no mcp-config.json, no user MCP servers spawn either.
cmd += ["--disable-builtin-mcps", "--no-custom-instructions"]
if self.model:
cmd += ["--model", self.model]
env = os.environ.copy()
if self.copilot_home:
env["COPILOT_HOME"] = self.copilot_home
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=self.timeout, cwd=clean_cwd,
encoding="utf-8", errors="replace", env=env,
)
except Exception:
return ""
finally:
try:
import shutil
shutil.rmtree(clean_cwd, ignore_errors=True)
except Exception:
pass
return self._parse_jsonl_response(proc.stdout or "")
@staticmethod
def _parse_jsonl_response(raw: str) -> str:
parts: List[str] = []
for line in raw.splitlines():
line = line.strip()
if not line or not line.startswith("{"):
continue
try:
obj = json.loads(line)
except Exception:
continue
if obj.get("type") == "assistant.message":
content = (obj.get("data") or {}).get("content")
if isinstance(content, str) and content:
parts.append(content)
return "\n".join(parts).strip()
def attempt_with_tools(self, task, skill, memory, tools):
# Expose REAL, callable tool shims in the working directory so the
# gbrain quick-answerer judge (tool_called=search) is validated
# honestly: we detect each call from the shim's log, not from a
# self-reported marker. The Copilot CLI is the Windows-validated
# backend, so the shims must be cross-platform — a bash `#!/usr/bin/env
# bash` + chmod shim does NOT execute via `./tool` under PowerShell/cmd,
# so on Windows we emit a `.cmd` batch shim instead.
import shutil
import stat
work = tempfile.mkdtemp(prefix="skillopt_sleep_copilottools_")
calllog = os.path.join(work, "_tool_calls.log")
tool_names = tools or ["search"]
is_windows = os.name == "nt"
try:
for tname in tool_names:
if is_windows:
shim = os.path.join(work, f"{tname}.cmd")
with open(shim, "w") as f:
# `%~n0` is the script's own base name (the tool name);
# writing it keeps the calllog line == tool name so the
# honest-detection match below works unchanged.
f.write(
"@echo off\n"
f'echo %~n0>>"{calllog}"\n'
"echo (search results: 3 relevant notes found; use them to answer)\n"
)
else:
shim = os.path.join(work, tname)
with open(shim, "w") as f:
f.write(
"#!/usr/bin/env bash\n"
f'echo "{tname}" >> "{calllog}"\n'
'echo "(search results: 3 relevant notes found; use them to answer)"\n'
)
os.chmod(shim, os.stat(shim).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
if is_windows:
tool_hint = (
"You have shell tools available in the current directory: "
+ ", ".join(f"{t}.cmd" for t in tool_names)
+ " (each callable as `" + tool_names[0] + "` or `.\\"
+ tool_names[0] + "`). When the skill says to look something "
"up or search before answering, you MUST actually run the "
"tool (e.g. `" + tool_names[0] + " \"query\"`) before giving "
"your final answer."
)
else:
tool_hint = (
"You have shell tools available in the current directory: "
+ ", ".join(f"./{t}" for t in tool_names)
+ ". When the skill says to look something up or search before "
"answering, you MUST actually run the tool (e.g. `./search \"query\"`) "
"before giving your final answer."
)
prompt = (
"You are completing a task. Apply the skill and memory rules EXACTLY, "
"including any rule about searching/looking up before answering. "
"Treat a 'Learned preferences' block as HARD CONSTRAINTS that override "
"earlier conflicting skill text.\n\n"
f"{tool_hint}\n\n"
f"# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\n"
"Return ONLY the final answer text."
)
cmd = [
self.copilot_path, "-p", prompt,
"--output-format", "json",
"--stream", "off",
"--no-color",
"--log-level", "none",
"--allow-all-tools",
"-C", work,
]
if not self.full_env:
cmd += ["--disable-builtin-mcps", "--no-custom-instructions"]
if self.model:
cmd += ["--model", self.model]
env = os.environ.copy()
if self.copilot_home:
env["COPILOT_HOME"] = self.copilot_home
resp = ""
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, encoding="utf-8",
errors="replace", timeout=self.timeout, cwd=work, env=env,
)
resp = self._parse_jsonl_response(proc.stdout or "")
except Exception:
resp = ""
self._tokens += len(prompt) // 4 + len(resp) // 4
called: List[str] = []
if os.path.exists(calllog):
with open(calllog) as f:
logged = {ln.strip() for ln in f if ln.strip()}
called = [t for t in tool_names if t in logged]
return resp, called
finally:
try:
shutil.rmtree(work, ignore_errors=True)
except Exception:
pass
class DualBackend(Backend):
"""Route operations to two backends, à la SkillOpt's target vs optimizer.
* attempt -> TARGET backend (the model the skill is deployed on)
* reflect -> OPTIMIZER backend (the stronger/cheaper model writing edits)
* judge -> OPTIMIZER backend (graded by the optimizer when no local rule)
This lets you optimize a skill with one model and run tasks on another, and
is the basis of the sleep-scenario transfer experiment (optimize cheap,
deploy expensive — or vice-versa).
"""
name = "dual"
def __init__(self, target: Backend, optimizer: Backend) -> None:
self.target = target
self.optimizer = optimizer
self.name = f"target={target.name}/optimizer={optimizer.name}"
def attempt(self, task, skill, memory, sample_id: int = 0):
return self.target.attempt(task, skill, memory, sample_id=sample_id)
def attempt_with_tools(self, task, skill, memory, tools):
return self.target.attempt_with_tools(task, skill, memory, tools)
def judge(self, task, response):
# local rule/exact judging needs no model; delegate to target which
# already short-circuits those. For rubric judging use the optimizer.
if task.reference_kind in {"rule", "exact"}:
return self.target.judge(task, response)
return self.optimizer.judge(task, response)
def reflect(self, failures, successes, skill, memory, **kw):
return self.optimizer.reflect(failures, successes, skill, memory, **kw)
def _call(self, prompt, *, max_tokens=1024):
# used by the LLM miner; prefer the optimizer (the "thinking" model)
return self.optimizer._call(prompt, max_tokens=max_tokens) # type: ignore[attr-defined]
def tokens_used(self):
return self.target.tokens_used() + self.optimizer.tokens_used()
# ── Azure OpenAI backend (gpt-5.x via managed identity) ───────────────────────
# Endpoint -> deployments, from the intern's avail_api.md. The backend picks the
# first endpoint that hosts the requested deployment.
_AZURE_ENDPOINTS = {
"https://oaidr9.openai.azure.com/": {"gpt-5.5", "gpt-5.4", "gpt-5.4-mini", "gpt-5.4-nano", "o3"},
"https://t2vgoaigpt4o6.openai.azure.com/": {"gpt-5.5", "gpt-4o-mini", "o3", "o4-mini"},
"https://oaidr21.openai.azure.com/": {"gpt-5.5", "o3", "o4-mini"},
"https://searchagent5.cognitiveservices.azure.com/": {"gpt-5.4-mini", "gpt-4o-mini"},
"https://t2vgoaigpt4o.openai.azure.com/": {"gpt-5.4", "gpt-5.4-nano", "gpt-5.2", "gpt-5.1", "o3", "o4-mini"},
}
_AZURE_MI_CLIENT_ID = "8cafa2b1-a2a7-4ad9-814a-ffe4aed7e800"
class AzureOpenAIBackend(CliBackend):
"""Drives Azure OpenAI gpt-5.x deployments via managed identity.
Mirrors the intern's blog_1 setup (avail_api.md): managed-identity auth, the
same endpoints/deployments. Reuses CliBackend's attempt/judge/reflect prompts
and JSON parsing; only _call() differs. openai + azure-identity are lazy
imported so the mock/CLI paths stay dependency-free.
"""
name = "azure"
def __init__(self, deployment: str = "", endpoint: str = "", timeout: int = 180,
api_version: str = "2024-12-01-preview") -> None:
super().__init__(model=deployment or "gpt-5.5", timeout=timeout)
self.deployment = deployment or "gpt-5.5"
self.endpoint = endpoint or self._endpoint_for(self.deployment)
self.api_version = api_version
self.name = f"azure:{self.deployment}"
self._client = None
@staticmethod
def _endpoint_for(deployment: str) -> str:
for ep, deps in _AZURE_ENDPOINTS.items():
if deployment in deps:
return ep
return "https://oaidr9.openai.azure.com/"
def _get_client(self):
if self._client is None:
from azure.identity import ManagedIdentityCredential, get_bearer_token_provider
from openai import AzureOpenAI
cred = ManagedIdentityCredential(client_id=_AZURE_MI_CLIENT_ID)
tp = get_bearer_token_provider(cred, "https://cognitiveservices.azure.com/.default")
self._client = AzureOpenAI(
azure_endpoint=self.endpoint, azure_ad_token_provider=tp,
api_version=self.api_version, max_retries=4,
)
return self._client
def _call(self, prompt: str, *, max_tokens: int = 1024, retries: int = 5) -> str:
"""Call the deployment with bounded retries.
IMPORTANT: transient failures (429 rate-limit, timeouts, 5xx) must NOT be
silently turned into an empty string — an empty response scores 0 and
deflates every baseline/after measure. We retry with exponential backoff
(mirroring the research repo's retries=5) and only return "" after the
budget is exhausted. ``time``/``random`` are used for backoff; both are
available here (this is library code, not a Workflow script sandbox).
"""
import random as _r
import time as _t
client = self._get_client()
last_exc = None
for attempt in range(max(1, retries)):
try:
resp = client.chat.completions.create(
model=self.deployment,
messages=[{"role": "user", "content": prompt}],
max_completion_tokens=16384,
)
text = (resp.choices[0].message.content or "").strip()
try:
u = resp.usage
self._tokens += (getattr(u, "prompt_tokens", 0) or 0) + (getattr(u, "completion_tokens", 0) or 0)
except Exception:
pass
if text:
return text
# empty but no exception: model genuinely returned nothing — one
# quick retry can help (reasoning models occasionally yield empty)
last_exc = "empty-response"
except Exception as e: # noqa: BLE001
last_exc = e
# backoff before next try (skip after the final attempt)
if attempt < retries - 1:
_t.sleep(min(8.0, (2 ** attempt) * 0.5) + _r.random() * 0.4)
return ""
class AzureResponsesBackend(AzureOpenAIBackend):
"""gpt-5.x via the **Responses API** on the high-throughput gpt4v endpoints.
Differs from AzureOpenAIBackend in three ways, all required by the enhanced
experiment:
* Auth via ``AzureCliCredential`` (the logged-in user), not Managed Identity
— the gpt4v-scus/swc accounts grant the data role to the CLI principal.
* Calls ``client.responses.create`` (the /responses API) instead of
chat.completions — these deployments are Responses-only.
* Round-robins across multiple endpoints for parallel throughput; each
worker thread binds a client for one endpoint (picked by thread index)
so concurrent replay spreads load across all endpoints.
A single shared ``AzureCliCredential`` token provider is reused across all
endpoint clients (the token is cached + auto-refreshed by the provider).
"""
name = "azure-responses"
# the two parallel /responses endpoints (user-provided), both hosting gpt-5.5
_RESP_ENDPOINTS = [
"https://gpt4v-scus.openai.azure.com/",
"https://gpt4v-swc.openai.azure.com/",
]
def __init__(self, deployment: str = "", endpoints: Optional[List[str]] = None,
timeout: int = 180, api_version: str = "2025-04-01-preview") -> None:
super().__init__(deployment=deployment, endpoint=(endpoints or self._RESP_ENDPOINTS)[0],
timeout=timeout, api_version=api_version)
self.endpoints = list(endpoints or self._RESP_ENDPOINTS)
self.name = f"azure-responses:{self.deployment}"
self._token_provider = None
self._clients: dict = {} # endpoint -> AzureOpenAI client
import threading as _thr
self._lock = _thr.Lock()
self._rr = 0 # round-robin counter
def _get_provider(self):
if self._token_provider is None:
from azure.identity import AzureCliCredential, get_bearer_token_provider
self._token_provider = get_bearer_token_provider(
AzureCliCredential(), "https://cognitiveservices.azure.com/.default")
return self._token_provider
def _client_for(self, endpoint: str):
cl = self._clients.get(endpoint)
if cl is None:
from openai import AzureOpenAI
cl = AzureOpenAI(
azure_endpoint=endpoint, azure_ad_token_provider=self._get_provider(),
api_version=self.api_version, max_retries=2,
)
self._clients[endpoint] = cl
return cl
def _next_endpoint(self) -> str:
# round-robin so concurrent calls spread across all endpoints
with self._lock:
ep = self.endpoints[self._rr % len(self.endpoints)]
self._rr += 1
return ep
def _call(self, prompt: str, *, max_tokens: int = 1024, retries: int = 5) -> str:
import random as _r
import time as _t
last = None
base_ep = self._next_endpoint() # this call's primary endpoint
base_idx = self.endpoints.index(base_ep)
for attempt in range(max(1, retries)):
# on retry, fail over to the other endpoint(s)
ep = self.endpoints[(base_idx + attempt) % len(self.endpoints)]
try:
client = self._client_for(ep)
resp = client.responses.create(
model=self.deployment, input=prompt,
max_output_tokens=16384,
)
text = (getattr(resp, "output_text", "") or "").strip()
try:
u = resp.usage
self._tokens += (getattr(u, "input_tokens", 0) or 0) + (getattr(u, "output_tokens", 0) or 0)
except Exception:
pass
if text:
return text
last = "empty-response"
except Exception as e: # noqa: BLE001
last = e
if attempt < retries - 1:
_t.sleep(min(8.0, (2 ** attempt) * 0.5) + _r.random() * 0.4)
return ""
def get_backend(
name: str,
*,
model: str = "",
claude_path: str = "claude",
codex_path: str = "",
azure_endpoint: str = "",
project_dir: str = "",
) -> Backend:
n = (name or "mock").strip().lower()
if n in {"claude", "anthropic", "claude_cli", "claude_code"}:
return ClaudeCliBackend(model=model, claude_path=claude_path)
if n in {"codex", "codex_cli", "openai_codex"}:
return CodexCliBackend(model=model, codex_path=codex_path, project_dir=project_dir)
if n in {"azure", "azure_openai", "aoai"}:
return AzureOpenAIBackend(deployment=model, endpoint=azure_endpoint)
if n in {"azure-responses", "azure_responses", "aoai-responses", "responses"}:
eps = [e.strip() for e in azure_endpoint.split(",") if e.strip()] or None
return AzureResponsesBackend(deployment=model, endpoints=eps)
if n in {"copilot", "github_copilot", "copilot_cli", "gh_copilot"}:
return CopilotCliBackend(model=model)
return MockBackend()
def build_backend(
*,
backend: str = "mock",
model: str = "",
optimizer_backend: str = "",
optimizer_model: str = "",
target_backend: str = "",
target_model: str = "",
codex_path: str = "",
azure_endpoint: str = "",
preferences: str = "",
project_dir: str = "",
) -> Backend:
"""Build a single or dual backend.
If optimizer_* or target_* are given, returns a DualBackend routing
attempt->target and reflect/judge->optimizer. Otherwise a single backend
from (backend, model). ``preferences`` (free text) is attached so reflect
uses it as a prior (set on the optimizer for dual backends).
"""
has_split = any([optimizer_backend, optimizer_model, target_backend, target_model])
if not has_split:
be = get_backend(
backend,
model=model,
codex_path=codex_path,
azure_endpoint=azure_endpoint,
project_dir=project_dir,
)
be.preferences = preferences
return be
tgt = get_backend(target_backend or backend, model=target_model or model,
codex_path=codex_path, azure_endpoint=azure_endpoint,
project_dir=project_dir)
opt = get_backend(optimizer_backend or backend, model=optimizer_model or model,
codex_path=codex_path, azure_endpoint=azure_endpoint,
project_dir=project_dir)
opt.preferences = preferences # reflect runs on the optimizer
dual = DualBackend(target=tgt, optimizer=opt)
dual.preferences = preferences
return dual