Files
Cuzyoung 372fd56c1e fix(spreadsheetbench)+optimizer: fix verify-feedback bloat, drop optimizer-side truncation, soft-disable gate
A. SpreadsheetBench verification-feedback bloat
   - rollout.py _auto_verify_output: use official _compare_cell_value (was
     repr() equality, which falsely flagged 5 vs 5.0 / None vs ""); collapse
     correct-and-empty cells into a count so large sparse answer ranges no
     longer flood feedback with MBs of None=None noise.
   - codegen_agent.py _build_eval_feedback: only list WRONG cells, collapse
     correct ones into a count.
   Scoring is unaffected (evaluate() is independent); this only fixes the
   target model's multi-turn solving feedback.

B. Remove optimizer-side truncation (bloat source now fixed)
   - reflect.py: drop _MAX_TRAJ_CHARS cap and all per-field clips.
   - update_modes.py / clip.py / lr_autonomous.py: describe_item /
     short_item_summary no longer truncate; raise ranking/lr token budget.
   - trainer.py _format_step_buffer: full task_ids / target.
   - slow_update.py: full comparison samples.

C. Soft-disable gate
   - config.py / trainer.py: use_gate=false no longer raises; validation still
     runs but candidates are force-accepted (new force_accept branch + log).

Misc: aggregate.py merge token budget 4096 -> 16384.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-10 13:03:17 +00:00

732 lines
27 KiB
Python

"""Codegen agent for SpreadsheetBench — no tool-call, pure code generation.
Two modes:
- **single**: One LLM call → extract ```python``` block → done.
- **multi**: Up to max_turns LLM calls; after each, execute code and
feed errors back for correction.
This matches the official SpreadsheetBench evaluation setting (LLM generates
a Python code block, no function-calling / tool-use).
"""
from __future__ import annotations
import json
import os
import random
import signal
import time
import openpyxl
# ── Timeout helper ──────────────────────────────────────────────────────────
class TaskTimeout(Exception):
"""Raised when a task exceeds its time budget."""
def _timeout_handler(signum, frame):
raise TaskTimeout("Task timed out")
from skillopt.model.azure_openai import (
get_reasoning_effort,
get_target_client,
_needs_responses_api,
tracker,
)
from skillopt.model import get_codex_exec_config, get_target_backend, is_target_exec_backend
from skillopt.model.codex_harness import prepare_workspace, render_skill_md, run_target_exec
from skillopt.prompts import load_prompt
from skillopt.envs.spreadsheetbench.executor import run_generated_code
from skillopt.envs.spreadsheetbench.evaluator import evaluate
# ── Eval feedback helper (no golden value leakage) ─────────────────────────
def _build_eval_feedback(verify_report: str) -> str:
"""Build Target feedback from a verify report, hiding expected values.
The verify report contains lines like:
Sheet1!D2: got=None, expected=0 ✗
Sheet1!D10: got=None, expected=None ✓
We strip the ``expected=...`` part so the Target sees only its own
output and whether each cell is correct or wrong.
"""
import re
wrong_lines = []
n_correct = 0
for raw_line in verify_report.splitlines():
raw_line = raw_line.strip()
if not raw_line:
continue
# Match enrichment lines like " Sheet1!D2: got=None, expected=0 ✗"
m = re.match(
r"(\S+!?\w+):\s*got=(.+?),\s*expected=.+?\s*(✓|✗)$",
raw_line,
)
if m:
cell, got_val, mark = m.groups()
if mark == "":
wrong_lines.append(f" {cell}: your output = {got_val} (WRONG)")
else:
n_correct += 1
lines = ["Your code executed successfully but produced incorrect results.",
"The following cells have wrong values:"]
lines.extend(wrong_lines)
if n_correct:
lines.append(f" ({n_correct} other cells are correct.)")
lines.append(
"\nPlease analyze the spreadsheet data more carefully and fix the code. "
"Return a complete corrected Python script inside a ```python``` block."
)
return "\n".join(lines)
# ── Workbook preview (same as official prompt.py) ────────────────────────────
def _preview_workbook(path: str, max_rows: int = 5, max_cols: int = 20) -> str:
"""Generate a text preview of the first few rows of each sheet."""
wb = openpyxl.load_workbook(path, data_only=False)
chunks: list[str] = []
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
chunks.append(
f"## Sheet: {sheet_name} "
f"(dim={ws.dimensions}, max_row={ws.max_row}, max_col={ws.max_column})"
)
for row in ws.iter_rows(
min_row=1,
max_row=min(ws.max_row, max_rows),
max_col=min(ws.max_column, max_cols),
values_only=False,
):
cells = []
for cell in row:
v = cell.value
if v is None:
cells.append(f"{cell.coordinate}=")
else:
s = str(v)
if len(s) > 40:
s = s[:37] + "..."
cells.append(f"{cell.coordinate}={s}")
chunks.append(" | ".join(cells))
if ws.max_row > max_rows:
chunks.append(f"... ({ws.max_row - max_rows} more rows)")
chunks.append("")
wb.close()
return "\n".join(chunks)
# ── Code extraction (same as official prompt.py) ────────────────────────────
def extract_code(text: str) -> str:
"""Extract the first ```python``` fenced code block from LLM output."""
if "```" not in text:
return text.strip()
start = text.find("```")
nl = text.find("\n", start)
end = text.find("```", nl + 1)
if nl == -1 or end == -1:
return text.strip()
return text[nl + 1 : end].strip()
# ── Prompt construction (official SpreadsheetBench prompts) ─────────────────
def _build_system(skill_content: str) -> str:
base = load_prompt("codegen_system", env="spreadsheetbench")
if skill_content.strip():
base += f"\n\n## Skill\n{skill_content.strip()}"
return base
def _build_user(
instruction: str,
input_xlsx: str,
instruction_type: str = "",
answer_position: str = "",
diagnostic_mode: bool = False,
diagnostic_instruction: str = "",
diagnostic_trace_context: str = "",
) -> str:
try:
preview = _preview_workbook(input_xlsx)
except Exception as e: # noqa: BLE001
preview = f"(failed to preview workbook: {e})"
extra = ""
if instruction_type:
extra += f"\nInstruction type: {instruction_type}"
if answer_position:
extra += f"\nExpected answer position: {answer_position}"
task_suffix = "Return only a ```python``` code block."
diagnostic = ""
if diagnostic_mode and diagnostic_instruction.strip():
task_suffix = (
"First provide a short diagnostic readout that follows the training "
"instruction below, then return a single complete ```python``` code block."
)
diagnostic = f"\n\n# Training readout\n{diagnostic_instruction.strip()}"
prefix = ""
if diagnostic_trace_context.strip():
prefix = (
"# Previous Codex Trace Snapshot\n"
"This is a partial transcript from an earlier attempt. Use it as your current reasoning context.\n\n"
f"{diagnostic_trace_context.strip()}\n\n"
)
return (
f"{prefix}"
f"# Instruction\n{instruction}\n{extra}\n\n"
f"# Input spreadsheet preview\n{preview}\n\n"
"# Task\n"
"Write a Python script that reads the workbook from the variable `INPUT_PATH`, "
"applies the instruction, and writes the modified workbook to `OUTPUT_PATH`. "
"Preserve all other cells unchanged. "
"The preview may be truncated — do not hardcode row counts or assume the data ends at the last previewed row; "
"iterate over all actual rows in the workbook instead. "
f"{task_suffix}"
f"{diagnostic}"
)
# ── LLM call with retry ────────────────────────────────────────────────────
def _llm_call_with_retry(call_fn, *, retries: int = 5, timeout: int | None = 120):
"""Wrap an LLM API call with retry and per-call timeout."""
last_err = None
for attempt in range(retries):
try:
return call_fn(timeout=timeout)
except Exception as e: # noqa: BLE001
last_err = e
sleep = min(2 ** attempt + random.random(), 60)
time.sleep(sleep)
raise RuntimeError(f"LLM call failed after {retries} retries: {last_err}")
def _get_deployment() -> str:
from skillopt.model import azure_openai as _llm
return _llm.TARGET_DEPLOYMENT
def _build_codex_skill(skill_content: str) -> str:
return render_skill_md(
skill_content,
description="Dynamic ReflACT skill for solving the current SpreadsheetBench task.",
preamble=(
"Use this skill when solving the current SpreadsheetBench task in this workspace.\n"
"Write a single self-contained Python solution to `solution.py`.\n"
"The solution must operate on the provided `INPUT_PATH` and `OUTPUT_PATH` variables.\n"
"You may inspect `input.xlsx` and run `python run_solution.py` to validate locally,\n"
"but do not hardcode values from the preview or from one specific workbook."
),
)
def _build_codex_task(
instruction: str,
input_xlsx: str,
instruction_type: str,
answer_position: str,
*,
diagnostic_mode: bool,
diagnostic_instruction: str,
diagnostic_trace_context: str,
) -> str:
prompt = _build_user(
instruction,
input_xlsx,
instruction_type,
answer_position,
diagnostic_mode=diagnostic_mode,
diagnostic_instruction=diagnostic_instruction,
diagnostic_trace_context=diagnostic_trace_context,
)
return (
f"{prompt}\n\n"
"## Codex Harness Task\n"
"- Read `.agents/skills/skillopt-target/SKILL.md` before writing code; do not call a Skill tool.\n"
"- Read and optionally inspect `input.xlsx` in this workspace.\n"
"- Write the final Python solution to `solution.py`.\n"
"- The script should use the provided `INPUT_PATH` and `OUTPUT_PATH` variables.\n"
"- If you want to validate locally, run `python run_solution.py`.\n"
"- Do not return a code fence as the primary artifact; the source of truth is `solution.py`.\n"
)
def _build_codex_driver() -> str:
return (
"import pathlib\n"
"import re\n"
"import sys\n"
"import traceback\n\n"
'INPUT_PATH = "input.xlsx"\n'
'OUTPUT_PATH = "output.xlsx"\n'
"code = pathlib.Path('solution.py').read_text(encoding='utf-8')\n"
"code = re.sub(r'^\\s*(INPUT_PATH|OUTPUT_PATH)\\s*=\\s*.+$', '', code, flags=re.MULTILINE)\n"
"globals_dict = {'__name__': '__main__', 'INPUT_PATH': INPUT_PATH, 'OUTPUT_PATH': OUTPUT_PATH}\n"
"try:\n"
" exec(compile(code, 'solution.py', 'exec'), globals_dict, globals_dict)\n"
"except Exception:\n"
" traceback.print_exc()\n"
" sys.exit(2)\n"
)
def _prepare_codex_workspace(
*,
instruction: str,
input_xlsx: str,
output_path: str,
instruction_type: str,
answer_position: str,
skill_content: str,
diagnostic_mode: bool,
diagnostic_instruction: str,
diagnostic_trace_context: str,
workspace_name: str = "codex_single",
) -> tuple[str, str, str, str]:
task_out_dir = os.path.dirname(output_path)
work_dir = os.path.join(task_out_dir, workspace_name)
skill_md = _build_codex_skill(skill_content)
task_md = _build_codex_task(
instruction,
input_xlsx,
instruction_type,
answer_position,
diagnostic_mode=diagnostic_mode,
diagnostic_instruction=diagnostic_instruction,
diagnostic_trace_context=diagnostic_trace_context,
)
prompt = (
"Read `.agents/skills/skillopt-target/SKILL.md` directly; do not call a Skill tool.\n"
"Read `task.md`, inspect `input.xlsx` if useful, and write the final solution to `solution.py`.\n"
"You may run `python run_solution.py` to validate the script locally.\n"
"In your final response, briefly confirm whether `solution.py` was written and summarize the approach."
)
prepare_workspace(
work_dir=work_dir,
skill_md=skill_md,
task_text=task_md,
extra_files={"run_solution.py": _build_codex_driver()},
copy_files=[(input_xlsx, "input.xlsx")],
)
return work_dir, skill_md, task_md, prompt
def _run_exec_backend(
*,
work_dir: str,
prompt: str,
model: str,
timeout: int,
) -> tuple[str, str]:
return run_target_exec(
work_dir=work_dir,
prompt=prompt,
model=model,
timeout=timeout,
allow_file_edits=True,
)
# ── Chat (no tools) ────────────────────────────────────────────────────────
def _chat_call(
client,
deployment: str,
messages: list[dict],
max_output_tokens: int,
llm_timeout: int | None = 120,
) -> str:
"""Single LLM call, no tools. Returns raw text."""
reasoning_effort = get_reasoning_effort()
if _needs_responses_api(deployment):
# Responses API
system = ""
api_input = []
for m in messages:
if m["role"] == "system":
system = m["content"]
else:
api_input.append({"role": m["role"], "content": m["content"]})
resp = _llm_call_with_retry(lambda timeout: client.responses.create(
model=deployment,
instructions=system,
input=api_input,
max_output_tokens=max_output_tokens,
**({"reasoning": {"effort": reasoning_effort}} if reasoning_effort else {}),
timeout=timeout,
), timeout=llm_timeout)
if hasattr(resp, "usage") and resp.usage:
tracker.record(
"rollout",
getattr(resp.usage, "input_tokens", 0) or 0,
getattr(resp.usage, "output_tokens", 0) or 0,
)
text = getattr(resp, "output_text", None) or ""
if text:
return text
for item in getattr(resp, "output", None) or []:
for part in getattr(item, "content", []):
if getattr(part, "type", "") == "output_text":
return part.text or ""
return ""
else:
# Chat Completions API — no tools
kwargs = {
"model": deployment,
"messages": messages,
"max_completion_tokens": max_output_tokens,
}
if reasoning_effort is not None:
kwargs["reasoning_effort"] = reasoning_effort
resp = _llm_call_with_retry(lambda timeout: client.chat.completions.create(
**kwargs,
timeout=timeout,
), timeout=llm_timeout)
if resp.usage:
tracker.record(
"rollout",
resp.usage.prompt_tokens or 0,
resp.usage.completion_tokens or 0,
)
return resp.choices[0].message.content or ""
# ── Public API ──────────────────────────────────────────────────────────────
def run_single(
instruction: str,
input_xlsx: str,
output_path: str,
instruction_type: str = "",
answer_position: str = "",
skill_content: str = "",
max_output_tokens: int = 16384,
llm_timeout: int | None = 120,
task_timeout: int | None = 300,
diagnostic_mode: bool = False,
diagnostic_instruction: str = "",
diagnostic_trace_context: str = "",
) -> dict:
"""Single-round code generation. One LLM call, no tools.
Args:
llm_timeout: Per-LLM-call timeout in seconds (default 120).
task_timeout: Total task timeout in seconds (default 300).
Returns ``{"code": str, "raw": str, "n_turns": 1}``.
"""
no_task_timeout = task_timeout is None or task_timeout <= 0
if is_target_exec_backend():
deadline = None if no_task_timeout else time.time() + task_timeout
deployment = _get_deployment()
work_dir, skill_md, task_md, prompt = _prepare_codex_workspace(
instruction=instruction,
input_xlsx=input_xlsx,
output_path=output_path,
instruction_type=instruction_type,
answer_position=answer_position,
skill_content=skill_content,
diagnostic_mode=diagnostic_mode,
diagnostic_instruction=diagnostic_instruction,
diagnostic_trace_context=diagnostic_trace_context,
)
if deadline is None:
effective_timeout = 10**9
else:
remaining = max(10, int(deadline - time.time()))
effective_timeout = min(task_timeout, remaining)
final_message, raw = _run_exec_backend(
work_dir=work_dir,
prompt=prompt,
model=deployment,
timeout=effective_timeout,
)
solution_path = os.path.join(work_dir, "solution.py")
if os.path.exists(solution_path):
with open(solution_path, encoding="utf-8") as f:
code = f.read()
else:
code = extract_code(final_message or raw)
return {
"code": code,
"raw": raw or final_message,
"n_turns": 1,
"conversation": [{"role": "assistant", "content": final_message or raw}],
"target_system_prompt": skill_md,
"target_user_prompt": f"{prompt}\n\n## Task File\n\n{task_md}",
}
deadline = None if no_task_timeout else time.time() + task_timeout
client = get_target_client()
deployment = _get_deployment()
system = _build_system(skill_content)
user = _build_user(
instruction,
input_xlsx,
instruction_type,
answer_position,
diagnostic_mode=diagnostic_mode,
diagnostic_instruction=diagnostic_instruction,
diagnostic_trace_context=diagnostic_trace_context,
)
messages = [
{"role": "system", "content": system},
{"role": "user", "content": user},
]
if deadline is None:
effective_timeout = None
else:
remaining = max(10, int(deadline - time.time()))
effective_timeout = min(llm_timeout or remaining, remaining)
raw = _chat_call(client, deployment, messages, max_output_tokens, llm_timeout=effective_timeout)
time.sleep(3) # Rate-limit cooldown after successful LLM call
code = extract_code(raw)
return {
"code": code,
"raw": raw,
"n_turns": 1,
"conversation": [{"role": "assistant", "content": raw}],
"target_system_prompt": system,
"target_user_prompt": user,
}
def run_multi(
instruction: str,
input_xlsx: str,
output_path: str,
instruction_type: str = "",
answer_position: str = "",
skill_content: str = "",
max_turns: int = 5,
max_output_tokens: int = 16384,
llm_timeout: int | None = 120,
task_timeout: int | None = 600,
gold_path: str = "",
diagnostic_mode: bool = False,
diagnostic_instruction: str = "",
diagnostic_trace_context: str = "",
) -> dict:
"""Multi-round code generation with execution feedback. No tools.
Each round: LLM generates code → execute → if error, feed back and retry.
Args:
llm_timeout: Per-LLM-call timeout in seconds (default 120).
task_timeout: Total task timeout in seconds (default 600).
gold_path: Path to golden answer xlsx for eval feedback during
training. When non-empty, a successful execution is followed
by an eval check; if the output is wrong the agent receives
cell-level feedback (without revealing expected values) and
gets another turn. Leave empty for eval/test to avoid
data leakage.
Returns ``{"code": str, "raw": str, "n_turns": int, "conversation": [...]}``.
"""
no_task_timeout = task_timeout is None or task_timeout <= 0
if is_target_exec_backend():
deadline = None if no_task_timeout else time.time() + task_timeout
deployment = _get_deployment()
work_dir, skill_md, task_md, initial_prompt = _prepare_codex_workspace(
instruction=instruction,
input_xlsx=input_xlsx,
output_path=output_path,
instruction_type=instruction_type,
answer_position=answer_position,
skill_content=skill_content,
diagnostic_mode=diagnostic_mode,
diagnostic_instruction=diagnostic_instruction,
diagnostic_trace_context=diagnostic_trace_context,
workspace_name="codex_multi",
)
prompt = (
f"{initial_prompt}\n\n"
"## Multi-Turn Repair Mode\n"
"- This is turn 1. Write or overwrite `solution.py`.\n"
"- After each turn, the harness will execute your `solution.py`; if it fails, you will receive feedback and may revise it.\n"
"- Keep the script general: use `INPUT_PATH` and `OUTPUT_PATH`, and do not hardcode one workbook's values."
)
conversation: list[dict] = []
code = ""
raw = ""
final_message = ""
solution_path = os.path.join(work_dir, "solution.py")
for turn in range(max_turns):
if deadline is None:
effective_timeout = 10**9
else:
remaining = deadline - time.time()
if remaining <= 10:
break
effective_timeout = max(10, int(remaining))
final_message, raw = _run_exec_backend(
work_dir=work_dir,
prompt=prompt,
model=deployment,
timeout=effective_timeout,
)
conversation.append({"role": "assistant", "content": final_message or raw})
if os.path.exists(solution_path):
with open(solution_path, encoding="utf-8") as f:
code = f.read()
else:
code = extract_code(final_message or raw)
if code.strip():
with open(solution_path, "w", encoding="utf-8") as f:
f.write(code)
if not code.strip():
feedback = (
"No usable `solution.py` or Python code block was produced. "
"Write a complete `solution.py` that reads `INPUT_PATH` and saves `OUTPUT_PATH`."
)
else:
ok, err = run_generated_code(
code,
input_xlsx,
output_path,
timeout=None if no_task_timeout else 120,
)
if ok:
if gold_path and answer_position:
from skillopt.envs.spreadsheetbench.rollout import _auto_verify_output
eval_result = evaluate(
output_path, gold_path, instruction_type, answer_position,
)
if eval_result["ok"]:
break
verify = _auto_verify_output(output_path, gold_path, answer_position)
feedback = _build_eval_feedback(verify)
else:
break
else:
feedback = (
"The current `solution.py` raised an error during harness execution:\n\n"
f"```\n{err[:3000]}\n```\n\n"
"Revise `solution.py` to fix the error. Keep using `INPUT_PATH` and `OUTPUT_PATH`."
)
feedback_path = os.path.join(work_dir, f"feedback_turn_{turn + 1:02d}.md")
with open(feedback_path, "w", encoding="utf-8") as f:
f.write(feedback)
conversation.append({"role": "user", "content": feedback})
prompt = (
f"The previous `solution.py` was evaluated and needs another revision.\n"
f"Read `{os.path.basename(feedback_path)}` and update `solution.py` accordingly.\n"
"You may run `python run_solution.py` for a local syntax/runtime check, but the harness will run the final code separately.\n"
"Do not hardcode workbook-specific answers; preserve unrelated cells."
)
return {
"code": code,
"raw": raw or final_message,
"n_turns": len([m for m in conversation if m["role"] == "assistant"]),
"conversation": conversation,
"target_system_prompt": skill_md,
"target_user_prompt": f"{initial_prompt}\n\n## Task File\n\n{task_md}",
}
deadline = None if no_task_timeout else time.time() + task_timeout
client = get_target_client()
deployment = _get_deployment()
system = _build_system(skill_content)
user = _build_user(
instruction,
input_xlsx,
instruction_type,
answer_position,
diagnostic_mode=diagnostic_mode,
diagnostic_instruction=diagnostic_instruction,
diagnostic_trace_context=diagnostic_trace_context,
)
messages: list[dict] = [
{"role": "system", "content": system},
{"role": "user", "content": user},
]
conversation: list[dict] = []
code = ""
raw = ""
for turn in range(max_turns):
if deadline is None:
effective_timeout = None
else:
remaining = deadline - time.time()
if remaining <= 10:
# Not enough time for another round
break
effective_timeout = min(llm_timeout or int(remaining), int(remaining))
raw = _chat_call(client, deployment, messages, max_output_tokens, llm_timeout=effective_timeout)
time.sleep(3) # Rate-limit cooldown after successful LLM call
code = extract_code(raw)
conversation.append({"role": "assistant", "content": raw})
messages.append({"role": "assistant", "content": raw})
if not code.strip():
# No code extracted — ask again
feedback = (
"No Python code block was found in your response. "
"Please return a complete Python script inside a ```python``` block."
)
messages.append({"role": "user", "content": feedback})
conversation.append({"role": "user", "content": feedback})
continue
# Execute the code
ok, err = run_generated_code(
code,
input_xlsx,
output_path,
timeout=None if no_task_timeout else 120,
)
if ok:
# Execution succeeded — check correctness if gold_path available
if gold_path and answer_position:
from skillopt.envs.spreadsheetbench.rollout import _auto_verify_output
eval_result = evaluate(
output_path, gold_path, instruction_type, answer_position,
)
if eval_result["ok"]:
break # Genuinely correct — stop
# Output is wrong — build feedback without leaking golden values
verify = _auto_verify_output(output_path, gold_path, answer_position)
feedback = _build_eval_feedback(verify)
messages.append({"role": "user", "content": feedback})
conversation.append({"role": "user", "content": feedback})
continue
else:
# No gold path (eval/test) — accept execution success
break
# Execution failed — feed error back
feedback = (
f"The code raised an error during execution:\n\n"
f"```\n{err[:3000]}\n```\n\n"
f"Please fix the code and return a complete corrected Python script "
f"inside a ```python``` block."
)
messages.append({"role": "user", "content": feedback})
conversation.append({"role": "user", "content": feedback})
return {
"code": code,
"raw": raw,
"n_turns": turn + 1,
"conversation": conversation,
"target_system_prompt": system,
"target_user_prompt": user,
}