mirror of
https://github.com/microsoft/SkillOpt.git
synced 2026-07-03 14:02:58 +08:00
Split failure reflections into SKILL_DEFECT (body edit) vs EXECUTION_LAPSE (protected appendix note that re-emphasizes an existing rule, never edited by step-level analysts). Toggle: optimizer.use_skill_aware_reflection (default false; baseline byte-identical when off). - optimizer/appendix.py: protected APPENDIX region (inject/extract/append with dedup), mirrors the slow_update protected-field pattern - optimizer/skill_aware.py: analyst prompt augmentation, appendix_notes parsing, threshold-gated LLM consolidation, and a process-wide runtime switch (configure_skill_aware_reflection) set once by the trainer - gradient/reflect.py: augment error/success analyst prompts at runtime; None-sentinel kwargs resolve from the global switch, so env adapters need no per-benchmark wiring (works for all envs, present and future) - optimizer/skill.py: generalize the protected-region check to (slow_update, appendix); edits inside any protected region are skipped - engine/trainer.py: inject appendix at init, flush per-step EXECUTION_LAPSE notes after the gate settles, optional consolidation - tests: regression suite incl. toggle-off byte-identical guarantee and env-independent global-switch resolution (6/6 passing + live smoke) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
157 lines
6.1 KiB
Python
157 lines
6.1 KiB
Python
"""Skill-Aware Reflection — protected appendix field (EmbodiSkill S_app).
|
|
|
|
EmbodiSkill (paper 2605.10332v1) splits a skill into ``S = (S_body, S_app)``:
|
|
the body holds the main prescriptive rules; the appendix only *emphasizes*
|
|
existing valid rules that the executor failed to follow (EXECUTION_LAPSE), and
|
|
**never introduces new rules**.
|
|
|
|
This module owns the appendix region of the skill document. It mirrors the
|
|
protected-field pattern of :mod:`skillopt.optimizer.slow_update`, with two
|
|
differences:
|
|
|
|
1. **Append semantics** (not replace): execution-lapse reminders accumulate
|
|
across steps within a run, so new notes are merged into the existing
|
|
appendix rather than overwriting it.
|
|
2. **Lightweight dedup**: near-duplicate reminders are collapsed (inspired by
|
|
GMemory's ``_dedupe_preserve_order``) so the appendix stays compact.
|
|
|
|
The appendix lives **inside** the skill markdown, between dedicated markers, so
|
|
it is persisted by the normal ``_save_skill`` path and is resume-safe. Step-level
|
|
analyst edits cannot modify it (enforced by the shared protected-region check in
|
|
:mod:`skillopt.optimizer.skill`).
|
|
|
|
Public API
|
|
----------
|
|
- :func:`has_appendix_field` — check if markers are present
|
|
- :func:`inject_empty_appendix_field` — add empty placeholder (skill init)
|
|
- :func:`extract_appendix_notes` — read current notes as a list
|
|
- :func:`append_to_appendix_field` — merge new notes (dedup) into the region
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
# ── Protected field markers ─────────────────────────────────────────────────
|
|
|
|
# HTML-comment markers that delimit the appendix region inside the skill
# markdown; everything between them is treated as appendix content.
APPENDIX_START = "<!-- APPENDIX_START -->"
|
|
APPENDIX_END = "<!-- APPENDIX_END -->"
|
|
|
|
# Heading line written directly after the start marker when the block is
# rendered. It is for human readers; extract_appendix_notes skips it so it is
# never returned as a note.
|
|
APPENDIX_HEADING = "## Execution Notes Appendix"
|
|
|
|
# Each note is rendered as a markdown bullet so the target model reads it as
|
|
# ordinary guidance. The same prefix is stripped again when notes are parsed.
|
|
_NOTE_BULLET_PREFIX = "- "
|
|
|
|
|
|
# ── Dedup helpers ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def _canonicalize(text: str) -> str:
|
|
"""Normalize a note for duplicate detection (whitespace/punct/case-insensitive)."""
|
|
normalized = re.sub(r"\s+", " ", str(text or "").strip())
|
|
normalized = normalized.rstrip(" .;:,_-")
|
|
return normalized.casefold()
|
|
|
|
|
|
def _dedupe_preserve_order(notes: list[str]) -> list[str]:
    """Drop blanks and near-duplicates, preserving first-seen order.

    Args:
        notes: Candidate notes. ``None`` entries are skipped; other values
            are converted with ``str``.

    Returns:
        The surviving notes with internal whitespace collapsed to single
        spaces. Two notes are duplicates when ``_canonicalize`` gives them
        the same key; the first occurrence wins.
    """
    seen: set[str] = set()
    deduped: list[str] = []
    for note in notes:
        # ``str(None)`` is the literal text "None", which would otherwise be
        # kept as a real note. Treat a missing entry as blank instead.
        if note is None:
            continue
        text = re.sub(r"\s+", " ", str(note).strip())
        key = _canonicalize(text)
        # An empty key covers both blank notes and notes made only of the
        # trailing characters that canonicalization trims away.
        if not key or key in seen:
            continue
        seen.add(key)
        deduped.append(text)
    return deduped
|
|
|
|
|
|
# ── Field manipulation ──────────────────────────────────────────────────────
|
|
|
|
|
|
def has_appendix_field(skill: str) -> bool:
    """Report whether both appendix markers occur somewhere in *skill*.

    Only presence is tested; the relative order of the two markers is not
    checked.
    """
    return all(marker in skill for marker in (APPENDIX_START, APPENDIX_END))
|
|
|
|
|
|
def _render_block(notes: list[str]) -> str:
    """Build the marker-delimited appendix text for *notes*.

    The layout is: start marker, heading, one bullet line per note (in the
    given order), end marker — joined by newlines with no trailing newline.
    """
    bullet_lines = [f"{_NOTE_BULLET_PREFIX}{note}" for note in notes]
    return "\n".join(
        [APPENDIX_START, APPENDIX_HEADING, *bullet_lines, APPENDIX_END]
    )
|
|
|
|
|
|
def inject_empty_appendix_field(skill: str) -> str:
    """Ensure *skill* ends with an (empty) appendix region.

    When both markers are already present the input is returned unchanged,
    which makes repeated calls a no-op. Otherwise trailing whitespace is
    trimmed from *skill* and a block containing only the markers and the
    heading is appended after a blank line.

    Mirrors ``inject_empty_slow_update_field``: intended to run once at skill
    init so the protected region exists before any note is written.
    """
    if not has_appendix_field(skill):
        placeholder = f"\n\n{APPENDIX_START}\n{APPENDIX_HEADING}\n{APPENDIX_END}\n"
        skill = skill.rstrip() + placeholder
    return skill
|
|
|
|
|
|
def extract_appendix_notes(skill: str) -> list[str]:
    """Return the current appendix notes as a list of strings (no markers/heading).

    Every complete ``APPENDIX_START`` ... ``APPENDIX_END`` region is read, in
    document order, so that notes living in a second (e.g. accidentally
    duplicated) region are not lost when the caller strips all regions and
    re-renders a single one. A start marker with no end marker after it
    contributes nothing.

    Args:
        skill: Full skill markdown.

    Returns:
        One string per non-empty line inside the region(s), with the heading
        line skipped and a single leading bullet (``- ``, ``-`` or ``*``)
        removed. Empty when no complete region exists.
    """
    notes: list[str] = []
    heading_text = APPENDIX_HEADING.lstrip("#").strip()
    cursor = 0
    while True:
        start = skill.find(APPENDIX_START, cursor)
        if start == -1:
            break
        inner_start = start + len(APPENDIX_START)
        # Search for the end marker *after* this start marker; a stray end
        # marker earlier in the document must not hide the real region.
        end = skill.find(APPENDIX_END, inner_start)
        if end == -1:
            break
        for raw_line in skill[inner_start:end].splitlines():
            line = raw_line.strip()
            # Skip blanks and a nested/duplicated start marker line so a
            # marker is never returned as a note.
            if not line or line == APPENDIX_START:
                continue
            # Heading comparison ignores the number of leading '#'.
            if line.lstrip("#").strip() == heading_text:
                continue
            if line.startswith(_NOTE_BULLET_PREFIX):
                line = line[len(_NOTE_BULLET_PREFIX):].strip()
            elif line.startswith(("-", "*")):
                # Bullet written without the space after the marker.
                line = line[1:].strip()
            if line:
                notes.append(line)
        cursor = end + len(APPENDIX_END)
    return notes
|
|
|
|
|
|
def _strip_all_appendix_fields(skill: str) -> str:
    """Remove every appendix marker pair (and content between) from *skill*.

    Processing repeats until no start marker is left:

    - For a start marker followed by an end marker, the text from the *last*
      start marker preceding that end marker through the end marker is cut.
    - When no end marker follows, every remaining start marker is dangling;
      the markers are removed but the text after them is kept.

    Afterwards any leftover end markers are deleted, runs of three or more
    newlines anywhere in the document are collapsed to one blank line, and
    trailing whitespace is trimmed.

    Args:
        skill: Full skill markdown.

    Returns:
        The skill text with no appendix markers in it.
    """
    while True:
        start = skill.find(APPENDIX_START)
        if start == -1:
            break
        end = skill.find(APPENDIX_END, start)
        if end == -1:
            # ``start`` is the first start marker, so none of the remaining
            # ones can be closed either. Drop them all; removing only the
            # first would leave stray markers that end up nested inside the
            # next rendered block.
            skill = skill.replace(APPENDIX_START, "")
            break
        region_stop = end + len(APPENDIX_END)
        # Cut from the last start marker before this end marker, so text
        # that follows an earlier unmatched start marker is kept.
        region_start = skill.rfind(APPENDIX_START, 0, region_stop)
        skill = skill[:region_start] + skill[region_stop:]
    skill = skill.replace(APPENDIX_END, "")
    skill = re.sub(r"\n{3,}", "\n\n", skill)
    return skill.rstrip()
|
|
|
|
|
|
def append_to_appendix_field(skill: str, new_notes: list[str]) -> str:
    """Return *skill* with *new_notes* merged into its appendix region.

    - Notes already in the appendix come first, followed by the incoming
      ones; the combined list is de-duplicated so the first occurrence of
      each note is the one that survives.
    - A falsy *new_notes* is treated as an empty list, and blank notes are
      dropped by the dedup step.
    - All existing appendix regions are removed from the document and a
      single freshly rendered block is appended after a blank line, followed
      by a trailing newline. This happens even when there are no notes, so
      the result always contains an appendix region.
    """
    current_notes = extract_appendix_notes(skill)
    fresh_notes = _dedupe_preserve_order(list(new_notes or []))
    combined = _dedupe_preserve_order(current_notes + fresh_notes)

    body_without_appendix = _strip_all_appendix_fields(skill)
    rendered = _render_block(combined)
    return body_without_appendix + "\n\n" + rendered + "\n"
|