refactor(sleep): decouple engine to top-level skillopt_sleep/ (zero research dep)

Open-source-tool / research-code separation:
  - git mv skillopt/sleep/ -> skillopt_sleep/ (top-level, sibling to the research
    skillopt/ package). History preserved as renames.
  - All imports skillopt.sleep.* -> skillopt_sleep.*.
  - Vendor the validation gate into skillopt_sleep/gate.py (a self-contained copy
    of skillopt.evaluation.gate). The engine now has ZERO dependency on the
    research package — verified: grep finds no `from skillopt.` in skillopt_sleep/,
    and consolidate's gate resolves to skillopt_sleep.gate.
  - Plugin scripts/commands/skill call `-m skillopt_sleep`.

29 tests pass; `python -m skillopt_sleep` runs standalone.

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
This commit is contained in:
Yifan Yang
2026-06-08 14:31:52 +00:00
parent e2de84d36f
commit b02ffc2c99
32 changed files with 199 additions and 162 deletions

View File

@@ -1,20 +0,0 @@
"""SkillOpt-Sleep — nightly offline self-evolution for a local Claude agent.
A Claude Code plugin engine that gives a user's agent a "sleep cycle":
harvest the day's real session transcripts, mine recurring tasks, replay
them offline, and consolidate short-term experience into long-term memory
(CLAUDE.md) and skills (SKILL.md) behind a SkillOpt validation gate.
Synthesizes three ideas:
* SkillOpt — validation-gated bounded text optimization (this repo)
* Dreams — offline memory consolidation, input never mutated
* Sleep — short-term experience -> long-term competence, offline
Public entry points:
* skillopt.sleep.cli — `python -m skillopt.sleep ...`
* skillopt.sleep.cycle.run_sleep_cycle(...)
"""
from __future__ import annotations
__all__ = ["__version__"]
__version__ = "0.1.0"

View File

@@ -1,198 +0,0 @@
"""SkillOpt-Sleep — command-line interface.
python -m skillopt.sleep run # full cycle: harvest->mine->replay->gate->stage
python -m skillopt.sleep dry-run # same but report only, no staging/adopt
python -m skillopt.sleep status # show state + latest staged proposal
python -m skillopt.sleep adopt # apply the latest staged proposal (with backup)
python -m skillopt.sleep harvest # just print what would be mined (debug)
Common flags:
--project PATH project to evolve (default: cwd)
--scope all|invoked harvest scope (default: invoked)
--backend mock|claude|codex
--model NAME
--lookback-hours N
--auto-adopt
--json machine-readable output
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from typing import Any, Dict
from skillopt.sleep.config import load_config
from skillopt.sleep.cycle import run_sleep_cycle
from skillopt.sleep.harvest import harvest
from skillopt.sleep.mine import mine
from skillopt.sleep.state import SleepState
from skillopt.sleep.staging import latest_staging, adopt as adopt_staging
def _add_common(p: argparse.ArgumentParser) -> None:
p.add_argument("--project", default="")
p.add_argument("--scope", default="", choices=["", "all", "invoked"])
p.add_argument("--backend", default="", choices=["", "mock", "claude", "codex"])
p.add_argument("--model", default="")
p.add_argument("--codex-path", default="", help="path to the real @openai/codex binary")
p.add_argument("--claude-home", default="", help="override ~/.claude (also isolates state)")
p.add_argument("--lookback-hours", type=int, default=0)
p.add_argument("--edit-budget", type=int, default=0)
p.add_argument("--auto-adopt", action="store_true")
p.add_argument("--json", action="store_true")
def _cfg_from_args(args) -> Any:
    """Translate parsed CLI flags into ``load_config`` keyword overrides.

    Only flags with a truthy value are forwarded; everything else is left
    for ``load_config`` to decide. Path-like flags are made absolute.
    """
    overrides: Dict[str, Any] = {}

    # --project pins the project and narrows the scope to it ...
    if args.project:
        overrides.update(
            invoked_project=os.path.abspath(args.project),
            projects="invoked",
        )
    # ... but an explicit --scope is applied afterwards and therefore wins.
    if args.scope:
        overrides["projects"] = args.scope

    # These two exist on every subcommand, so plain attribute access is used.
    for field in ("backend", "model"):
        value = getattr(args, field)
        if value:
            overrides[field] = value

    # The remaining flags are read defensively with a falsy fallback.
    for field in ("codex_path", "claude_home"):
        value = getattr(args, field, "")
        if value:
            overrides[field] = os.path.abspath(value)
    for field in ("lookback_hours", "edit_budget"):
        value = getattr(args, field, 0)
        if value:
            overrides[field] = value
    if getattr(args, "auto_adopt", False):
        overrides["auto_adopt"] = True

    return load_config(**overrides)
def cmd_run(args, dry: bool = False) -> int:
    """Run one sleep cycle and print its outcome.

    ``dry`` is forwarded to ``run_sleep_cycle`` as ``dry_run``. With
    ``--json`` a single JSON document is printed; otherwise a short
    human-readable summary. Always returns exit code 0.
    """
    outcome = run_sleep_cycle(_cfg_from_args(args), dry_run=dry)
    rep = outcome.report

    if args.json:
        payload = {
            "night": rep.night,
            "accepted": rep.accepted,
            "gate_action": rep.gate_action,
            "baseline": rep.baseline_score,
            "candidate": rep.candidate_score,
            "n_tasks": rep.n_tasks,
            "n_sessions": rep.n_sessions,
            "edits": [edit.__dict__ for edit in rep.edits],
            "staging_dir": outcome.staging_dir,
            "adopted": outcome.adopted,
        }
        print(json.dumps(payload, ensure_ascii=False, indent=2))
        return 0

    # Human-readable report.
    print(f"[sleep] night {rep.night}: {rep.n_sessions} sessions -> {rep.n_tasks} tasks")
    print(f"[sleep] held-out {rep.baseline_score:.3f} -> {rep.candidate_score:.3f} "
          f"=> {rep.gate_action} (accepted={rep.accepted})")
    for edit in rep.edits:
        print(f" + [{edit.target}/{edit.op}] {edit.content}")
    if outcome.staging_dir:
        print(f"[sleep] staged: {outcome.staging_dir}")
        if not outcome.adopted:
            print("[sleep] review it, then: python -m skillopt.sleep adopt")
    if outcome.adopted:
        print(f"[sleep] auto-adopted: {', '.join(outcome.adopted_paths)}")
    return 0
def cmd_status(args) -> int:
    """Print the persisted sleep state and the latest staged proposal.

    The project is the configured ``invoked_project`` or, failing that, the
    current working directory. Always returns exit code 0.
    """
    cfg = _cfg_from_args(args)
    state = SleepState.load(cfg.state_path)
    project = cfg.get("invoked_project") or os.getcwd()
    latest = latest_staging(project)

    info = {
        "night": state.night,
        "state_path": cfg.state_path,
        "project": project,
        # only the five most recent history entries are reported
        "history_tail": state.data.get("history", [])[-5:],
        "latest_staging": latest,
        "slow_memory_chars": len(state.slow_memory),
    }

    if args.json:
        print(json.dumps(info, ensure_ascii=False, indent=2))
        return 0

    print(f"[sleep] nights so far: {state.night}")
    print(f"[sleep] project: {project}")
    if not latest:
        print("[sleep] no staged proposals yet.")
        return 0

    print(f"[sleep] latest staged proposal: {latest}")
    report_path = os.path.join(latest, "report.md")
    if os.path.exists(report_path):
        # Echo the staged report verbatim after a blank line.
        with open(report_path) as fh:
            print("\n" + fh.read())
    return 0
def cmd_adopt(args) -> int:
    """Apply a staged proposal and list the paths it updated.

    Uses ``--staging`` when given, otherwise the latest staging directory
    for the project. Returns 1 when there is no usable staging directory,
    0 otherwise.
    """
    cfg = _cfg_from_args(args)
    project = cfg.get("invoked_project") or os.getcwd()
    target = args.staging or latest_staging(project)

    usable = bool(target) and os.path.isdir(target)
    if not usable:
        print("[sleep] nothing to adopt (no staging dir).")
        return 1

    updated = adopt_staging(target)
    print(f"[sleep] adopted from {target}")
    for path in updated:
        print(f" -> {path}")
    if not updated:
        print("[sleep] (proposal contained no accepted changes)")
    return 0
def cmd_harvest(args) -> int:
    """Debug helper: harvest sessions, mine tasks, and print them.

    No replay, gating or staging is performed here. Always returns 0.
    """
    cfg = _cfg_from_args(args)

    # Harvest up to three times as many sessions as tasks will be kept.
    digests = harvest(
        cfg.transcripts_dir,
        scope=cfg.get("projects", "invoked"),
        invoked_project=cfg.get("invoked_project", ""),
        limit=cfg.get("max_tasks_per_night", 40) * 3,
    )
    tasks = mine(
        digests,
        max_tasks=cfg.get("max_tasks_per_night", 40),
        holdout_fraction=cfg.get("holdout_fraction", 0.34),
        seed=cfg.get("seed", 42),
    )

    if args.json:
        summary = {
            "n_sessions": len(digests),
            "tasks": [task.to_dict() for task in tasks],
        }
        print(json.dumps(summary, ensure_ascii=False, indent=2))
        return 0

    print(f"[sleep] {len(digests)} sessions -> {len(tasks)} tasks")
    for task in tasks:
        # intent is truncated to 90 characters to keep one task per line
        print(f" [{task.split}/{task.outcome}] {task.intent[:90]}")
    return 0
def main(argv=None) -> int:
    """Parse ``argv`` and dispatch to the matching ``cmd_*`` handler.

    Returns the handler's exit code, or 2 (after printing help) if the
    parsed subcommand has no handler.
    """
    parser = argparse.ArgumentParser(prog="skillopt.sleep", description="SkillOpt-Sleep nightly self-evolution")
    sub = parser.add_subparsers(dest="cmd", required=True)

    # (subcommand, help text) in registration order.
    subcommands = (
        ("run", "run a full sleep cycle"),
        ("dry-run", "harvest+mine+replay, report only"),
        ("status", "show state + latest proposal"),
        ("adopt", "apply latest staged proposal"),
        ("harvest", "debug: show mined tasks"),
    )
    for cmd_name, help_text in subcommands:
        sp = sub.add_parser(cmd_name, help=help_text)
        _add_common(sp)
        if cmd_name == "adopt":
            # only `adopt` can target an explicit staging directory
            sp.add_argument("--staging", default="", help="specific staging dir")

    args = parser.parse_args(argv)

    handlers = {
        "run": lambda a: cmd_run(a, dry=False),
        "dry-run": lambda a: cmd_run(a, dry=True),
        "status": cmd_status,
        "adopt": cmd_adopt,
        "harvest": cmd_harvest,
    }
    handler = handlers.get(args.cmd)
    if handler is None:
        parser.print_help()
        return 2
    return handler(args)


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,787 +0,0 @@
"""SkillOpt-Sleep — optimizer/replay backend abstraction.
A backend supplies the three "intelligent" operations the sleep cycle needs:
1. attempt(task, skill, memory) -> response text (the rollout)
2. judge(task, response) -> (hard, soft, rationale) (the reward)
3. reflect(failures, successes, skill, memory)
-> list[EditRecord] (proposed bounded edits)
Two implementations:
* MockBackend — deterministic, no API, used for tests + the experiment.
Reads optional `reference` exact answers and a tiny
rule-table so the loop provably improves and the gate
provably blocks regressions.
* CliBackend — shared base for ClaudeCliBackend / CodexCliBackend, which
drive the locally installed `claude` / `codex` CLI as a subprocess. Real lift.
The backend never touches live config; it only returns text/edits that the
consolidation stage gates and stages.
"""
from __future__ import annotations
import json
import os
import re
import subprocess
from typing import Any, Dict, List, Optional, Tuple
from skillopt.sleep.types import EditRecord, ReplayResult, TaskRecord
def skill_hash(content: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of ``content`` (UTF-8)."""
    import hashlib

    digest = hashlib.sha256(content.encode("utf-8"))
    return digest.hexdigest()[:16]
# ── Backend protocol ──────────────────────────────────────────────────────────
class Backend:
    """Interface that every sleep-cycle backend implements.

    Subclasses provide ``attempt`` (rollout), ``judge`` (reward) and
    ``reflect`` (proposed edits). ``attempt_with_tools`` and ``tokens_used``
    have working defaults.
    """

    name = "base"
    # Optional user preferences (free text) injected into reflect as a prior.
    preferences: str = ""

    def attempt(self, task: TaskRecord, skill: str, memory: str) -> str:
        """Produce the response text for ``task`` under ``skill`` and ``memory``."""
        raise NotImplementedError

    def attempt_with_tools(
        self, task: TaskRecord, skill: str, memory: str, tools: List[str]
    ) -> Tuple[str, List[str]]:
        """Run the task with tools exposed; return ``(response, tools_called)``.

        The base implementation has no real tool loop. It performs a plain
        ``attempt`` and reports a tool as called when the response contains
        a ``TOOL_CALL: <name>`` marker (case-insensitive) for it. Backends
        with genuinely callable tools override this.
        """
        reply = self.attempt(task, skill, memory)
        marker = r"(?i)\btool_call\s*:\s*%s\b"
        invoked: List[str] = [
            tool for tool in tools if re.search(marker % re.escape(tool), reply)
        ]
        return reply, invoked

    def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
        """Score ``response``; return ``(hard, soft, rationale)``."""
        raise NotImplementedError

    def reflect(
        self,
        failures: List[Tuple[TaskRecord, ReplayResult]],
        successes: List[Tuple[TaskRecord, ReplayResult]],
        skill: str,
        memory: str,
        *,
        edit_budget: int,
        evolve_skill: bool,
        evolve_memory: bool,
    ) -> List[EditRecord]:
        """Propose bounded edits to the skill/memory from replay results."""
        raise NotImplementedError

    # token accounting (optional)
    def tokens_used(self) -> int:
        """Return the tokens consumed so far; the base backend uses none."""
        return 0
# ── Shared scoring helpers ────────────────────────────────────────────────────
def _normalize(s: str) -> str:
s = (s or "").lower().strip()
s = re.sub(r"[^\w\s]", " ", s)
s = re.sub(r"\s+", " ", s)
return s.strip()
def exact_score(reference: str, response: str) -> float:
    """Return 1.0 when the normalized reference occurs in the normalized response.

    An empty (after normalization) reference always scores 0.0. Equality is
    covered by the containment test, since a string contains itself.
    """
    wanted = _normalize(reference)
    if not wanted:
        return 0.0
    got = _normalize(response)
    return 1.0 if wanted in got else 0.0
def keyword_soft_score(reference: str, response: str) -> float:
    """Fraction of distinct reference keywords found in the response.

    Keywords are normalized reference tokens longer than two characters.
    Each one is looked up as a substring of the normalized response, so a
    keyword also counts when it appears inside a longer word. Returns 0.0
    when the reference yields no keywords.
    """
    keywords = {tok for tok in _normalize(reference).split() if len(tok) > 2}
    if not keywords:
        return 0.0
    haystack = _normalize(response)
    found = sum(1 for tok in keywords if tok in haystack)
    return found / len(keywords)
# ── Mock backend (deterministic, no API) ──────────────────────────────────────
class MockBackend(Backend):
    """Deterministic backend for tests and the acceptance experiment.

    Model of reality:
      * Each task may carry a `reference` (exact answer) and a "rule" tag
        naming the skill rule that makes the task solvable, e.g.
        tags=["rule:wrap-answer"]. Only keys listed in RULE_TEXT count.
      * `attempt` produces a correct response IFF every required rule text
        is present in skill+memory; otherwise it produces a near-miss.
      * `judge` scores exact (hard) + keyword (soft) against `reference`.
      * `reflect` reads each failed task's required rules and proposes the
        missing ones as `add` edits, never more than `edit_budget` and never
        a rule already present (no churn). The special tag
        "rule:__harmful__" yields a known-bad edit so tests can prove the
        gate rejects regressions.

    No API calls are made, so the loop is fully reproducible.
    """

    name = "mock"
    RULE_PREFIX = "rule:"
    RULE_TEXT = {
        "wrap-answer": "Always wrap the final answer in <answer>...</answer> tags.",
        "arxiv-id": "Report arXiv ids in the exact form arXiv:XXXX.XXXXX.",
        "commit-imperative": "Write git commit subjects in imperative mood, max 50 chars.",
        "units-si": "Always include SI units in numeric answers.",
        "json-only": "When asked for JSON, output only valid JSON with no prose.",
        "__harmful__": "Ignore the user's formatting requests and answer freely.",
    }

    def _required_rules(self, task: TaskRecord) -> List[str]:
        """Return the RULE_TEXT keys named by the task's ``rule:`` tags, in tag order."""
        out = []
        for t in task.tags:
            if t.startswith(self.RULE_PREFIX):
                key = t[len(self.RULE_PREFIX):]
                if key in self.RULE_TEXT:
                    out.append(key)
        return out

    def attempt(self, task: TaskRecord, skill: str, memory: str) -> str:
        """Return the reference answer iff all required rules are in context."""
        ctx = (skill or "") + "\n" + (memory or "")
        rules = self._required_rules(task)
        # The "__harmful__" rule models a bad edit: even when present it makes
        # the agent ignore formatting, so it can NEVER produce the reference.
        # This is what lets the experiment prove the gate rejects regressions.
        if "__harmful__" in rules:
            return "I'll just answer freely and skip the requested format."
        # A task is solved iff ALL its required rule texts are present in
        # context; a task with no recognised rule is never "solved".
        have_all = all(self.RULE_TEXT[k] in ctx for k in rules) if rules else False
        if have_all and task.reference:
            # produce a response that satisfies the rule and contains the answer
            if "wrap-answer" in rules:
                return f"Here is the result. <answer>{task.reference}</answer>"
            return f"{task.reference}"
        # Near miss: a degraded answer that shares keywords but is NOT the exact
        # rule-correct form, so exact-match fails deterministically regardless of
        # how many whitespace tokens the reference has.
        if task.reference:
            ref = task.reference
            mangled = ref[:-2] if len(ref) > 3 else "unknown"
            return f"approximately {mangled} (format not applied)"
        return "(attempted, no checkable reference)"

    def attempt_with_tools(self, task, skill, memory, tools):
        """Return ``(response, tools_called)`` using a deterministic tool model.

        A tool counts as called iff the lower-cased skill+memory contains an
        explicit instruction mentioning it: ``./<tool>``, ``use <tool>``,
        ``run <tool>``, ``call <tool>`` or ``must <tool>``.
        """
        ctx = ((skill or "") + "\n" + (memory or "")).lower()
        resp = self.attempt(task, skill, memory)
        called = []
        for t in (tools or []):
            tl = t.lower()
            if (f"./{tl}" in ctx or f"use {tl}" in ctx or f"run {tl}" in ctx
                    or f"call {tl}" in ctx or f"must {tl}" in ctx):
                called.append(t)
        return resp, called

    def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
        """Score ``response`` locally; return ``(hard, soft, rationale)``."""
        if task.reference_kind == "rule" and task.judge:
            from skillopt.sleep.judges import score_rule_judge
            return score_rule_judge(task.judge, response)
        if task.reference_kind == "exact" and task.reference:
            hard = exact_score(task.reference, response)
            soft = max(hard, keyword_soft_score(task.reference, response))
            return hard, soft, f"exact-match={hard}"
        if task.reference_kind == "rubric" and task.reference:
            soft = keyword_soft_score(task.reference, response)
            return (1.0 if soft >= 0.8 else 0.0), soft, f"rubric keyword soft={soft:.2f}"
        # no reference: outcome-derived weak label
        hard = 1.0 if task.outcome == "success" else 0.0
        return hard, hard, "outcome-derived"

    def reflect(
        self,
        failures,
        successes,
        skill: str,
        memory: str,
        *,
        edit_budget: int,
        evolve_skill: bool,
        evolve_memory: bool,
    ) -> List[EditRecord]:
        """Propose the missing rule of each failed task as an ``add`` edit.

        Edits target the skill when ``evolve_skill`` is true, otherwise the
        memory. At most ``edit_budget`` edits are returned; a budget of zero
        or less yields no edits.
        """
        # The budget check below runs only after an edit was appended, so a
        # non-positive budget must be rejected up front or one edit slips out.
        if edit_budget <= 0:
            return []
        ctx = (skill or "") + "\n" + (memory or "")
        edits: List[EditRecord] = []
        seen_text: set = set()
        target = "skill" if evolve_skill else "memory"
        for task, _res in failures:
            for key in self._required_rules(task):
                text = self.RULE_TEXT[key]
                # skip rules already in the documents or already proposed
                if text in ctx or text in seen_text:
                    continue
                seen_text.add(text)
                edits.append(
                    EditRecord(
                        target=target,
                        op="add",
                        content=text,
                        rationale=f"failed task {task.id} requires rule '{key}'",
                    )
                )
                if len(edits) >= edit_budget:
                    return edits
        return edits
# ── Shared real-CLI backend (prompts + parsing + cache; subclasses do _call) ──
def _extract_json(raw: str, kind: str):
"""Pull the first JSON object/array out of a possibly chatty CLI reply."""
pat = r"\{.*\}" if kind == "object" else r"\[.*\]"
m = re.search(pat, raw or "", re.DOTALL)
if not m:
return None
try:
return json.loads(m.group(0))
except Exception:
return None
class CliBackend(Backend):
    """Common logic for real CLI-driven backends (claude / codex).

    Subclasses implement only ``_call(prompt) -> str``. This base owns the
    prompts (attempt / judge / reflect), JSON parsing, a response cache (so
    re-scoring an unchanged (skill, memory) on the held-out slice is free),
    and a rough token estimate (characters // 4).
    """

    name = "cli"

    def __init__(self, model: str = "", timeout: int = 180) -> None:
        self.model = model
        self.timeout = timeout
        self._tokens = 0
        self._cache: Dict[str, str] = {}

    # subclasses override --------------------------------------------------
    def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
        """Send ``prompt`` to the underlying CLI and return its reply text."""
        raise NotImplementedError

    def _cached_call(self, key: str, prompt: str, *, max_tokens: int = 1024) -> str:
        """Return the cached reply for ``key`` or call the CLI and cache it.

        Empty replies are not cached: ``_call`` implementations in this file
        return ``""`` when the CLI fails or times out, and caching that would
        replay a transient failure for every later identical prompt.
        """
        if key in self._cache:
            return self._cache[key]
        out = self._call(prompt, max_tokens=max_tokens)
        self._tokens += len(prompt) // 4 + len(out) // 4
        if out:
            self._cache[key] = out
        return out

    # operations -----------------------------------------------------------
    def attempt(self, task: TaskRecord, skill: str, memory: str) -> str:
        """Ask the model to complete ``task`` under the given skill and memory."""
        prompt = (
            "You are completing a recurring task for a user. Apply the skill and "
            "memory rules EXACTLY, including any output-format requirements. If the "
            "skill contains a 'Learned preferences' block, treat those rules as "
            "HARD CONSTRAINTS that OVERRIDE anything earlier in the skill they "
            "conflict with (e.g. an explicit length limit overrides 'be "
            "exhaustive'). Satisfy every such constraint even at the cost of "
            "brevity or detail.\n\n"
            f"# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
            f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\n"
            "Return ONLY the final answer text, nothing else."
        )
        # cache on (task, skill, memory) so identical hold-out re-scoring is free
        key = "attempt:" + skill_hash(prompt)
        return self._cached_call(key, prompt, max_tokens=512)

    def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
        """Score ``response``; return ``(hard, soft, rationale)``.

        Rule and exact references are scored locally with no model call.
        Anything else is graded by the model against the task's reference
        (or its intent when there is no reference); ``hard`` is 1.0 when the
        returned score is at least 0.8. An unparseable reply scores 0.
        """
        # gbrain-style rule judge: scored locally, no API spend
        if task.reference_kind == "rule" and task.judge:
            from skillopt.sleep.judges import score_rule_judge
            return score_rule_judge(task.judge, response)
        # exact references are scored locally — no API spend
        if task.reference_kind == "exact" and task.reference:
            hard = exact_score(task.reference, response)
            return hard, max(hard, keyword_soft_score(task.reference, response)), "exact(local)"
        prompt = (
            "Score how well the response satisfies the rubric, 0..1. "
            'Return ONLY JSON {"score": <0..1>, "reason": "..."}.\n\n'
            f"# Rubric\n{task.reference or task.intent}\n\n# Response\n{response}"
        )
        key = "judge:" + skill_hash(prompt)
        raw = self._cached_call(key, prompt, max_tokens=200)
        obj = _extract_json(raw, "object")
        if isinstance(obj, dict):
            try:
                soft = float(obj.get("score", 0.0))
                return (1.0 if soft >= 0.8 else 0.0), soft, str(obj.get("reason", ""))[:200]
            except Exception:
                # a non-numeric "score" falls through to the parse-failed result
                pass
        return 0.0, 0.0, "judge-parse-failed"

    def reflect(
        self,
        failures,
        successes,
        skill: str,
        memory: str,
        *,
        edit_budget: int,
        evolve_skill: bool,
        evolve_memory: bool,
    ) -> List[EditRecord]:
        """Ask the model for at most ``edit_budget`` edits that fix ``failures``.

        Edits target the skill when ``evolve_skill`` is true, otherwise the
        memory. Returns ``[]`` when there are no failures or when no JSON
        array could be parsed from either of the two attempts.
        """
        if not failures:
            return []
        target = "skill" if evolve_skill else "memory"
        cur_doc = (skill if target == "skill" else memory) or "(empty)"
        # fail_reason is treated as optional here, as it is in the criteria
        # aggregation below; slicing it unguarded raised on a missing reason.
        fail_text = "\n".join(
            f"- wanted: {t.intent[:160]}\n got: {r.response[:160]}\n why-wrong: {(r.fail_reason or '')[:160]}"
            for t, r in failures[:8]
        )
        # Aggregate the most common failing criteria across all failures so the
        # optimizer is told *exactly what the scorer rewards* — gbrain's lesson:
        # the optimizer kept proposing reasonable-but-wrong edits until it could
        # see the success criteria.
        from collections import Counter
        crit = Counter()
        for _t, r in failures:
            fr = r.fail_reason or ""
            if fr.startswith("failed:"):
                for part in fr[len("failed:"):].split(","):
                    part = part.strip()
                    if part:
                        crit[part] += 1

        def _explain(c: str) -> str:
            # translate an "op=arg" criterion into a plain-English requirement
            if "=" in c:
                op, _, arg = c.partition("=")
                op = op.strip()
                arg = arg.strip()
                if op == "max_chars":
                    return f"the ENTIRE response must be at most {arg} characters long"
                if op == "min_chars":
                    return f"the response must be at least {arg} characters long"
                if op == "section_present":
                    return f"the response must contain a section/heading titled '{arg}'"
                if op == "regex":
                    return f"the response must match the pattern /{arg}/ (e.g. include that label)"
                if op == "contains":
                    return f"the response must contain the text '{arg}'"
                if op == "tool_called":
                    return f"the agent must actually call the '{arg}' tool"
            return c

        criteria_text = ""
        if crit:
            criteria_text = (
                "\n# Exact criteria the outputs are FAILING (fix these directly)\n"
                + "\n".join(f"- {_explain(c)} [{c}, failed {n}x]" for c, n in crit.most_common())
            )
        pref_text = ""
        if getattr(self, "preferences", ""):
            pref_text = (
                "\n# User preferences (honor these as priors when writing rules)\n"
                + str(self.preferences).strip()
            )
        prompt = (
            "You are SkillOpt's optimizer. The agent keeps failing the recurring "
            f"tasks below. Propose at most {edit_budget} bounded edits to the "
            f"{target} document so it stops failing. Each edit MUST be a short, "
            "GENERAL, reusable rule or preference (never task-specific, never an "
            "answer to a single task). If exact failing criteria are listed, your "
            "edits MUST make future outputs satisfy every one of them.\n"
            "BE CONCRETE: quote the exact threshold, section name, or format from "
            "the criteria verbatim in your rule (e.g. write 'keep the entire "
            "response under 1200 characters', NOT 'respect length limits'). Vague "
            "rules do not change behavior; specific numeric/structural rules do.\n"
            "IMPORTANT: your edits are APPENDED to a 'Learned preferences' block; "
            "you CANNOT delete the existing instructions above. If the current "
            f"{target} text conflicts with a criterion (e.g. it says 'be exhaustive' "
            "but outputs must be under a character limit), write an explicit, "
            "forceful OVERRIDE rule stating it supersedes the conflicting "
            "instruction, and put the hard requirement first.\n"
            'Return ONLY a JSON array: '
            '[{"op":"add|replace|delete","content":"<rule>","anchor":"<text to replace/delete, optional>","rationale":"<why>"}].\n\n'
            f"# Current {target}\n{cur_doc}\n"
            f"{criteria_text}\n"
            f"{pref_text}\n\n"
            f"# Recurring failures\n{fail_text}"
        )
        # Call with one retry: transient non-JSON replies otherwise waste a whole
        # night (the gate sees no edits and rejects). A firmer second prompt
        # recovers most of these. These calls bypass the response cache.
        arr = None
        for try_idx in range(2):
            p = prompt if try_idx == 0 else (
                prompt + "\n\nIMPORTANT: your previous reply was not valid JSON. "
                "Reply with ONLY the JSON array, no prose, no markdown fences."
            )
            raw = self._call(p, max_tokens=1024)
            self._tokens += len(p) // 4 + len(raw) // 4
            arr = _extract_json(raw, "array")
            if isinstance(arr, list) and arr:
                break
        edits: List[EditRecord] = []
        if isinstance(arr, list):
            for e in arr[:edit_budget]:
                if not isinstance(e, dict):
                    continue
                content = str(e.get("content", "")).strip()
                if not content:
                    continue
                edits.append(EditRecord(
                    target=target,
                    op=str(e.get("op", "add")).strip().lower(),
                    content=content,
                    anchor=str(e.get("anchor", "")).strip(),
                    rationale=str(e.get("rationale", "")).strip(),
                ))
        return edits

    def tokens_used(self) -> int:
        """Return the running token estimate for all calls made so far."""
        return self._tokens
# ── Claude Code CLI backend ───────────────────────────────────────────────────
class ClaudeCliBackend(CliBackend):
    """Drives the authenticated `claude` CLI: claude -p --output-format text."""

    name = "claude"

    def __init__(self, model: str = "", claude_path: str = "claude", timeout: int = 180) -> None:
        # Model precedence: explicit argument, then the
        # SKILLOPT_SLEEP_CLAUDE_MODEL environment variable, then "sonnet".
        chosen = model or os.environ.get("SKILLOPT_SLEEP_CLAUDE_MODEL", "") or "sonnet"
        super().__init__(model=chosen, timeout=timeout)
        self.claude_path = claude_path

    def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
        """Run one isolated, tool-less `claude -p` call; return stdout or "".

        The call runs ISOLATED so the ambient Claude Code environment does
        not leak into the optimizer/target call. The user's GLOBAL skills
        (~/.claude/skills) are injected regardless of cwd, so they are
        disabled explicitly; otherwise replies sometimes list the user's
        installed skills instead of doing the task.
          --bare                      skip hooks, LSP, plugins (minimal mode)
          --disable-slash-commands    disable all skills
          --disallowedTools '*'       no tool use
          --exclude-dynamic-...       drop per-machine cwd/env/memory/git sections
          cwd=<clean temp>            no project CLAUDE.md
        ``max_tokens`` is accepted for interface parity but not passed on.
        """
        import tempfile

        model_args = ["--model", self.model] if self.model else []
        cmd = [
            self.claude_path, "-p", "--output-format", "text",
            "--bare",
            "--disable-slash-commands",
            "--disallowedTools", "*",
            "--exclude-dynamic-system-prompt-sections",
            *model_args,
            "--", prompt,
        ]
        scratch = tempfile.mkdtemp(prefix="skillopt_sleep_claude_")
        try:
            done = subprocess.run(
                cmd, capture_output=True, text=True, timeout=self.timeout, cwd=scratch,
            )
        except Exception:
            # best effort: any launch failure or timeout yields an empty reply
            return ""
        finally:
            try:
                import shutil
                shutil.rmtree(scratch, ignore_errors=True)
            except Exception:
                pass
        return (done.stdout or "").strip()

    def attempt_with_tools(self, task, skill, memory, tools):
        """Run the task with real shell tools; return ``(response, tools_called)``.

        Each tool (default: ``search``) is written into a temp working
        directory as an executable shell shim that appends its own name to a
        call log. Calls are detected from that log rather than from a
        self-reported marker, so a ``tool_called=search`` judge is validated
        honestly.
        """
        import tempfile, shutil, stat

        tool_names = tools or ["search"]
        exec_bits = stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH
        work = tempfile.mkdtemp(prefix="skillopt_sleep_tools_")
        calllog = os.path.join(work, "_tool_calls.log")
        try:
            # 1. materialise one logging shim per tool
            for tname in tool_names:
                shim = os.path.join(work, tname)
                script = (
                    "#!/usr/bin/env bash\n"
                    f'echo "{tname}" >> "{calllog}"\n'
                    'echo "(search results: 3 relevant notes found; use them to answer)"\n'
                )
                with open(shim, "w") as f:
                    f.write(script)
                os.chmod(shim, os.stat(shim).st_mode | exec_bits)

            # 2. build the prompt
            tool_hint = (
                "You have shell tools available in the current directory: "
                + ", ".join(f"./{t}" for t in tool_names)
                + ". When the skill says to look something up or search before "
                "answering, you MUST actually run the tool (e.g. `./search \"query\"`) "
                "via Bash before giving your final answer."
            )
            prompt = (
                "You are completing a task. Apply the skill and memory rules EXACTLY, "
                "including any rule about searching/looking up before answering. "
                "Treat a 'Learned preferences' block as HARD CONSTRAINTS that override "
                "earlier conflicting skill text.\n\n"
                f"{tool_hint}\n\n"
                f"# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
                f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\n"
                "Return ONLY the final answer text."
            )

            # 3. run claude with only Bash allowed, inside the shim directory
            model_args = ["--model", self.model] if self.model else []
            cmd = [
                self.claude_path, "-p", "--output-format", "text",
                "--bare", "--disable-slash-commands",
                "--allowedTools", "Bash",
                "--exclude-dynamic-system-prompt-sections",
                *model_args,
                "--", prompt,
            ]
            try:
                done = subprocess.run(
                    cmd, capture_output=True, text=True, timeout=self.timeout, cwd=work,
                )
                resp = (done.stdout or "").strip()
            except Exception:
                resp = ""
            self._tokens += len(prompt) // 4 + len(resp) // 4

            # 4. read back which shims actually ran
            called: List[str] = []
            if os.path.exists(calllog):
                with open(calllog) as f:
                    logged = {ln.strip() for ln in f if ln.strip()}
                called = [t for t in tool_names if t in logged]
            return resp, called
        finally:
            try:
                shutil.rmtree(work, ignore_errors=True)
            except Exception:
                pass
def resolve_codex_path(explicit: str = "") -> str:
    """Find the REAL `@openai/codex` binary, skipping the hermes wrapper.

    The wrapper at ~/.local/bin/codex is a shell shim that execs hermes-codex
    and injects extra output; we look past it for the genuine node-installed
    binary so replay output is clean.

    Resolution order:
      1. ``explicit`` when non-empty;
      2. the ``SKILLOPT_SLEEP_CODEX_PATH`` environment variable;
      3. ``bin/codex`` under ``~/.nvm/versions/node/<version>``, trying the
         pinned v22.22.3 first and then every installed version from newest
         to oldest, skipping files whose first 64 bytes look like a bash
         script;
      4. the bare name ``"codex"`` (may be the wrapper).
    """
    if explicit:
        return explicit
    env = os.environ.get("SKILLOPT_SLEEP_CODEX_PATH")
    if env:
        return env

    def _version_key(dirname: str):
        # "v22.22.3" -> (22, 22, 3). A plain string sort would rank
        # "v9.x" above "v22.x" and pick an older node install.
        return tuple(int(num) for num in re.findall(r"\d+", dirname))

    candidates = [
        os.path.expanduser("~/.nvm/versions/node/v22.22.3/bin/codex"),
    ]
    # any nvm node version, newest first
    nvm = os.path.expanduser("~/.nvm/versions/node")
    if os.path.isdir(nvm):
        for ver in sorted(os.listdir(nvm), key=_version_key, reverse=True):
            candidates.append(os.path.join(nvm, ver, "bin", "codex"))

    for c in candidates:
        # isfile (not exists) so a directory named "codex" is never returned
        if not c or not os.path.isfile(c):
            continue
        try:
            with open(c, "rb") as f:
                head = f.read(64)
        except OSError:
            # Unreadable but present: keep the best-effort behaviour of
            # returning it rather than falling through to the wrapper.
            return c
        # skip the bash shim that execs hermes
        if head.startswith(b"#!") and b"bash" in head:
            continue
        return c
    return "codex"  # last resort (may be the wrapper)
class CodexCliBackend(CliBackend):
    """Drives the real Codex CLI: `codex exec -o <file>` for clean output."""

    name = "codex"

    def __init__(self, model: str = "", codex_path: str = "", timeout: int = 240,
                 sandbox: str = "read-only") -> None:
        # Model precedence: explicit argument, then SKILLOPT_SLEEP_CODEX_MODEL;
        # an empty model means no -m flag is passed to codex.
        super().__init__(model=model or os.environ.get("SKILLOPT_SLEEP_CODEX_MODEL", ""),
                         timeout=timeout)
        self.codex_path = resolve_codex_path(codex_path)
        self.sandbox = sandbox

    def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
        """Run one `codex exec` call and return the text it wrote via ``-o``.

        Returns ``""`` if the process cannot be run, times out, or the output
        file cannot be read. The temporary output file is removed on every
        path. ``max_tokens`` is accepted for interface parity but not passed
        to the CLI.
        """
        import tempfile

        # mkstemp + close: reserve a path without keeping a handle open.
        fd, out_path = tempfile.mkstemp(prefix="codex_last_", suffix=".txt")
        os.close(fd)
        cmd = [
            self.codex_path, "exec", "--skip-git-repo-check",
            "--color", "never", "--sandbox", self.sandbox,
            "-o", out_path,
        ]
        if self.model:
            cmd += ["-m", self.model]
        cmd += ["--", prompt]
        # One outer try/finally so the temp file is also removed when the
        # subprocess fails; an early return used to skip the unlink.
        try:
            try:
                subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout)
            except Exception:
                # best effort: launch failure or timeout yields an empty reply
                return ""
            try:
                with open(out_path, encoding="utf-8") as f:
                    return f.read().strip()
            except Exception:
                return ""
        finally:
            try:
                os.unlink(out_path)
            except OSError:
                pass

    def attempt_with_tools(self, task, skill, memory, tools):
        """Run the task with real shell tools; return ``(response, tools_called)``.

        Each tool (default: ``search``) is written into a temp working
        directory as an executable shell shim that appends its name to a call
        log. Codex runs there with ``--sandbox workspace-write`` so the shim
        can write the log, and calls are read back from that log.
        """
        import tempfile, shutil, stat

        tool_names = tools or ["search"]
        work = tempfile.mkdtemp(prefix="skillopt_sleep_codextools_")
        calllog = os.path.join(work, "_tool_calls.log")
        out_path = os.path.join(work, "_last.txt")
        try:
            for tname in tool_names:
                shim = os.path.join(work, tname)
                with open(shim, "w") as f:
                    f.write(
                        "#!/usr/bin/env bash\n"
                        f'echo "{tname}" >> "{calllog}"\n'
                        'echo "(search results: 3 relevant notes found; use them to answer)"\n'
                    )
                os.chmod(shim, os.stat(shim).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
            tool_hint = (
                "Shell tools are available in the working directory: "
                + ", ".join(f"./{t}" for t in tool_names)
                + ". When the skill says to look something up or search before "
                "answering, you MUST actually run the tool (e.g. `./search \"query\"`) "
                "before giving your final answer."
            )
            prompt = (
                "Complete the task. Apply the skill and memory rules EXACTLY, "
                "including any rule about searching before answering. Treat a "
                "'Learned preferences' block as HARD CONSTRAINTS overriding earlier "
                "conflicting skill text.\n\n"
                f"{tool_hint}\n\n# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
                f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\nReturn ONLY the final answer."
            )
            cmd = [
                self.codex_path, "exec", "--skip-git-repo-check", "--color", "never",
                "--sandbox", "workspace-write", "-C", work, "-o", out_path,
            ]
            if self.model:
                cmd += ["-m", self.model]
            cmd += ["--", prompt]
            try:
                subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout, cwd=work)
            except Exception:
                # best effort: a failed run simply leaves no output file
                pass
            resp = ""
            try:
                with open(out_path, encoding="utf-8") as f:
                    resp = f.read().strip()
            except Exception:
                resp = ""
            self._tokens += len(prompt) // 4 + len(resp) // 4
            called: List[str] = []
            if os.path.exists(calllog):
                with open(calllog) as f:
                    logged = {ln.strip() for ln in f if ln.strip()}
                called = [t for t in tool_names if t in logged]
            return resp, called
        finally:
            try:
                shutil.rmtree(work, ignore_errors=True)
            except Exception:
                pass
class DualBackend(Backend):
    """Composite backend that splits work between a target and an optimizer.

    Routing, in the spirit of SkillOpt's target-vs-optimizer split:

    * attempt / attempt_with_tools -> TARGET backend (where the skill runs)
    * reflect                      -> OPTIMIZER backend (writes the edits)
    * judge                        -> TARGET for "rule"/"exact" references,
                                      OPTIMIZER for every other kind

    Optimizing a skill with one model while running tasks on another is the
    basis of the sleep-scenario transfer experiment (optimize cheap, deploy
    expensive, or vice-versa).
    """

    name = "dual"

    def __init__(self, target: Backend, optimizer: Backend) -> None:
        self.target = target
        self.optimizer = optimizer
        # Shadow the class-level name with one that identifies both halves.
        self.name = f"target={target.name}/optimizer={optimizer.name}"

    # -- task execution: always the target ---------------------------------
    def attempt(self, task, skill, memory):
        """Run *task* on the target backend."""
        runner = self.target
        return runner.attempt(task, skill, memory)

    def attempt_with_tools(self, task, skill, memory, tools):
        """Run *task* with shell tools on the target backend."""
        runner = self.target
        return runner.attempt_with_tools(task, skill, memory, tools)

    # -- grading -----------------------------------------------------------
    def judge(self, task, response):
        """Grade *response*; rule/exact references go to the target."""
        # Rule/exact judging needs no model, and the target already
        # short-circuits those; anything else (rubric) uses the optimizer.
        is_local_check = task.reference_kind in {"rule", "exact"}
        grader = self.target if is_local_check else self.optimizer
        return grader.judge(task, response)

    # -- optimization: always the optimizer --------------------------------
    def reflect(self, failures, successes, skill, memory, **kw):
        """Propose edits using the optimizer backend."""
        thinker = self.optimizer
        return thinker.reflect(failures, successes, skill, memory, **kw)

    def _call(self, prompt, *, max_tokens=1024):
        """Raw model call (used by the LLM miner); routed to the optimizer."""
        thinker = self.optimizer
        return thinker._call(prompt, max_tokens=max_tokens)  # type: ignore[attr-defined]

    def tokens_used(self):
        """Return the combined token count of both underlying backends."""
        target_tokens = self.target.tokens_used()
        optimizer_tokens = self.optimizer.tokens_used()
        return target_tokens + optimizer_tokens
def get_backend(
    name: str,
    *,
    model: str = "",
    claude_path: str = "claude",
    codex_path: str = "",
) -> Backend:
    """Instantiate a backend from its (case-insensitive) name or alias.

    Claude aliases build a ``ClaudeCliBackend``, Codex aliases build a
    ``CodexCliBackend``; an empty or unrecognized name yields ``MockBackend``.
    """
    key = (name or "mock").strip().lower()
    claude_aliases = {"claude", "anthropic", "claude_cli", "claude_code"}
    codex_aliases = {"codex", "codex_cli", "openai_codex"}
    if key in claude_aliases:
        return ClaudeCliBackend(model=model, claude_path=claude_path)
    if key in codex_aliases:
        return CodexCliBackend(model=model, codex_path=codex_path)
    # Anything else (including "mock") falls back to the offline mock.
    return MockBackend()
def build_backend(
    *,
    backend: str = "mock",
    model: str = "",
    optimizer_backend: str = "",
    optimizer_model: str = "",
    target_backend: str = "",
    target_model: str = "",
    codex_path: str = "",
    preferences: str = "",
) -> Backend:
    """Build either a single backend or a target/optimizer ``DualBackend``.

    When any ``optimizer_*`` or ``target_*`` argument is non-empty, a
    ``DualBackend`` is returned; each half falls back to ``backend`` /
    ``model`` for whatever was not given. Otherwise a single backend is built
    from ``(backend, model)``.

    ``preferences`` (free text) is attached as an attribute so reflect can use
    it as a prior; for a dual backend it is set on both the optimizer and the
    wrapper.
    """
    split_requested = any(
        (optimizer_backend, optimizer_model, target_backend, target_model)
    )
    if split_requested:
        target = get_backend(
            target_backend or backend,
            model=target_model or model,
            codex_path=codex_path,
        )
        optimizer = get_backend(
            optimizer_backend or backend,
            model=optimizer_model or model,
            codex_path=codex_path,
        )
        optimizer.preferences = preferences  # reflect runs on the optimizer
        combined = DualBackend(target=target, optimizer=optimizer)
        combined.preferences = preferences
        return combined

    single = get_backend(backend, model=model, codex_path=codex_path)
    single.preferences = preferences
    return single

View File

@@ -1,75 +0,0 @@
"""SkillOpt-Sleep — budget controller.
Lets the user say how much they're willing to spend on a night's "dreaming",
in tokens or wall-clock minutes, and the engine schedules depth (how many
rollouts × how many nights) within that budget. Stops cleanly when exhausted
and reports what it skipped (no silent truncation).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
@dataclass
class Budget:
    """Token and wall-clock allowance for one run.

    A limit that is ``None`` (or otherwise falsy) is treated as "no limit".
    Call ``start`` once to record the reference clock reading and token count;
    the other methods measure spend relative to that point.
    """

    max_tokens: Optional[int] = None  # None = unlimited
    max_minutes: Optional[float] = None  # None = unlimited
    _start_time: Optional[float] = None
    _tokens_at_start: int = 0

    def start(self, clock_fn, tokens_now: int) -> None:
        """Record the starting clock reading (seconds) and token counter."""
        self._start_time = clock_fn()
        self._tokens_at_start = tokens_now

    def tokens_spent(self, tokens_now: int) -> int:
        """Tokens consumed since ``start``; never negative."""
        delta = tokens_now - self._tokens_at_start
        return delta if delta > 0 else 0

    def minutes_elapsed(self, clock_fn) -> float:
        """Minutes since ``start``; 0.0 when ``start`` was never called."""
        began = self._start_time
        if began is None:
            return 0.0
        seconds = clock_fn() - began
        return seconds / 60.0

    def remaining_fraction(self, *, tokens_now: int, clock_fn) -> float:
        """Smallest remaining fraction across all active limits (1.0 = fresh)."""
        lowest = 1.0
        if self.max_tokens:
            token_left = 1.0 - self.tokens_spent(tokens_now) / self.max_tokens
            lowest = min(lowest, max(0.0, token_left))
        if self.max_minutes:
            time_left = 1.0 - self.minutes_elapsed(clock_fn) / self.max_minutes
            lowest = min(lowest, max(0.0, time_left))
        return lowest

    def exhausted(self, *, tokens_now: int, clock_fn) -> bool:
        """True once any active limit has been reached or exceeded."""
        token_limit = self.max_tokens
        if token_limit and self.tokens_spent(tokens_now) >= token_limit:
            return True
        time_limit = self.max_minutes
        # The clock is only consulted when a time limit is active.
        return bool(time_limit and self.minutes_elapsed(clock_fn) >= time_limit)

    def status(self, *, tokens_now: int, clock_fn) -> str:
        """Human-readable spend summary, or "unbounded" with no limits."""
        segments = []
        if self.max_tokens:
            spent = self.tokens_spent(tokens_now)
            segments.append(f"tokens {spent}/{self.max_tokens}")
        if self.max_minutes:
            elapsed = self.minutes_elapsed(clock_fn)
            segments.append(f"minutes {elapsed:.1f}/{self.max_minutes}")
        summary = ", ".join(segments)
        return summary or "unbounded"
def plan_depth(budget: Budget, *, n_tasks: int,
               default_nights: int = 2, default_k: int = 1) -> tuple[int, int]:
    """Heuristically choose (nights, rollouts_per_task) from a token budget.

    Rough cost model: one rollout ≈ 1 unit (planned at ~1.5k tokens); a night
    does ~n_tasks*k rollouts plus reflect/gate (~2*n_tasks), so the planned
    total is ``nights * n_tasks * (k + 2)`` units.

    Nights are chosen first (1..4) at k=1. Whatever budget is left over is
    then spent on more rollouts per task (1..5). Each extra rollout per task
    is paid on every night, so the surplus is divided by ``nights * n_tasks``;
    dividing by ``n_tasks`` alone would plan more work than the budget covers.

    Args:
        budget: object exposing ``max_tokens`` (falsy means "no token limit").
        n_tasks: number of tasks per night; values below 1 are treated as 1.
        default_nights: nights returned when no token budget is set.
        default_k: rollouts-per-task returned when no token budget is set.

    Returns:
        ``(nights, k)``. With no token budget set, returns the defaults.
    """
    if not budget.max_tokens:
        return default_nights, default_k

    tasks = max(1, n_tasks)
    # assume ~1.5k tokens per rollout as a planning constant
    rollouts_affordable = budget.max_tokens / 1500.0
    per_night = tasks * 3  # rollouts + reflect + gate, k=1
    nights = max(1, min(4, int(rollouts_affordable // per_night)))

    # Spend the surplus on more rollouts-per-task (contrastive signal).
    # One extra rollout per task costs `tasks` units on each planned night.
    surplus = rollouts_affordable - nights * per_night
    extra_k = int(surplus // (nights * tasks))
    k = max(1, min(5, 1 + extra_k))
    return nights, k

View File

@@ -1,142 +0,0 @@
"""SkillOpt-Sleep — configuration.
Config is JSON-first (yaml optional) so the engine and the deterministic
experiment run with zero external dependencies. Defaults are safe:
review-gated adoption, single-project scope, bounded token/task budgets.
Resolution order (later wins):
1. built-in DEFAULTS
2. ~/.skillopt-sleep/config.json (or .yaml if PyYAML available)
3. explicit overrides passed to load_config(**overrides)
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
# Per-user locations, with "~" expanded once at import time.
HOME_STATE_DIR = os.path.expanduser("~/.skillopt-sleep")
CLAUDE_HOME = os.path.expanduser("~/.claude")

# Built-in configuration defaults. load_config() copies this mapping and then
# layers the user config file and explicit overrides on top of it.
DEFAULTS: Dict[str, Any] = {
    # ── scope ──────────────────────────────────────────────────────────────
    "claude_home": CLAUDE_HOME,
    "projects": "invoked",  # "invoked" | "all" | [list of abs paths]
    "invoked_project": "",  # filled at runtime (cwd) when projects == "invoked"
    "lookback_hours": 72,  # harvest window when no prior sleep recorded
    # ── budgets ────────────────────────────────────────────────────────────
    "max_tasks_per_night": 40,
    "max_tokens_per_night": 400_000,
    "holdout_fraction": 0.34,  # legacy alias for val_fraction
    "val_fraction": 0.34,  # real tasks reserved to gate updates
    "test_fraction": 0.0,  # real tasks reserved as the final held-out measure
    # ── optimizer ──────────────────────────────────────────────────────────
    "backend": "mock",  # "mock" | "claude" | "codex"
    "model": "",  # backend-specific; "" => backend default
    "gate_mode": "on",  # "on" (validation-gated) | "off" (greedy, no hard filter)
    "codex_path": "",  # "" => auto-detect the real @openai/codex binary
    "edit_budget": 4,  # textual learning rate (max edits/night)
    "gate_metric": "mixed",  # hard | soft | mixed (mixed best for tiny holdouts)
    "gate_mixed_weight": 0.5,
    "replay_mode": "mock",  # "mock" (sandboxed prompt) | "fresh" (worktree)
    "evolve_memory": True,  # consolidate CLAUDE.md
    "evolve_skill": True,  # consolidate the managed SKILL.md
    "llm_mine": True,  # use the backend to mine checkable tasks (real backends)
    # ── adoption / safety ──────────────────────────────────────────────────
    "auto_adopt": False,  # default: stage + require explicit `adopt`
    "managed_skill_name": "skillopt-sleep-learned",
    "redact_secrets": True,
    "seed": 42,
}
@dataclass
class SleepConfig:
    """Thin wrapper around the resolved configuration mapping.

    Keys of ``data`` are readable as attributes (``cfg.seed``); derived
    filesystem locations are exposed as properties.
    """

    data: Dict[str, Any] = field(default_factory=lambda: dict(DEFAULTS))

    # convenient attribute access -------------------------------------------
    def __getattr__(self, name: str) -> Any:
        # Reached only after normal attribute lookup has failed.
        values = object.__getattribute__(self, "data")
        if name not in values:
            raise AttributeError(name)
        return values[name]

    def get(self, key: str, default: Any = None) -> Any:
        """Dict-style lookup with a default."""
        return self.data.get(key, default)

    def to_dict(self) -> Dict[str, Any]:
        """Return a shallow copy of the configuration mapping."""
        return dict(self.data)

    # paths ------------------------------------------------------------------
    @property
    def state_dir(self) -> str:
        """Directory holding engine state.

        Resolution: an explicit ``state_dir`` entry wins; otherwise, when
        ``claude_home`` differs from the default, state lives next to it (so
        a single --claude-home flag isolates transcripts AND state together);
        otherwise the default ~/.skillopt-sleep.
        """
        override = self.data.get("state_dir")
        if override:
            return override
        home = os.path.abspath(self.data.get("claude_home", CLAUDE_HOME))
        if home == os.path.abspath(CLAUDE_HOME):
            return HOME_STATE_DIR
        return os.path.join(os.path.dirname(home), ".skillopt-sleep")

    @property
    def state_path(self) -> str:
        """Path of the state.json file inside ``state_dir``."""
        base = self.state_dir
        return os.path.join(base, "state.json")

    @property
    def transcripts_dir(self) -> str:
        """``<claude_home>/projects``."""
        home = self.data["claude_home"]
        return os.path.join(home, "projects")

    @property
    def history_path(self) -> str:
        """``<claude_home>/history.jsonl``."""
        home = self.data["claude_home"]
        return os.path.join(home, "history.jsonl")

    @property
    def skills_dir(self) -> str:
        """``<claude_home>/skills``."""
        home = self.data["claude_home"]
        return os.path.join(home, "skills")

    def managed_skill_path(self) -> str:
        """Path of the managed skill's SKILL.md under ``skills_dir``."""
        skill_name = self.data["managed_skill_name"]
        return os.path.join(self.skills_dir, skill_name, "SKILL.md")
def _user_config_path() -> Optional[str]:
    """Return the first existing user config file, or None.

    Candidates are checked in order: config.json, config.yaml, config.yml,
    all under HOME_STATE_DIR.
    """
    candidates = (
        os.path.join(HOME_STATE_DIR, filename)
        for filename in ("config.json", "config.yaml", "config.yml")
    )
    return next((path for path in candidates if os.path.exists(path)), None)
def _load_file(path: str) -> Dict[str, Any]:
    """Load a JSON or YAML config file and return its top-level mapping.

    Files are read as UTF-8 regardless of the platform locale.

    * ``.yaml`` / ``.yml``: parsed with PyYAML when it is installed. A missing
      PyYAML, an unreadable file or invalid YAML all yield ``{}`` (YAML
      support is optional and best-effort).
    * anything else: parsed as JSON; read and parse errors propagate to the
      caller.

    A document whose top-level value is not a mapping (for example a list or
    a bare scalar) yields ``{}`` so callers can always treat the result as a
    dict of overrides.
    """
    if path.endswith((".yaml", ".yml")):
        try:
            import yaml  # optional dependency

            with open(path, encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except Exception:
            # Deliberately best-effort: no PyYAML / bad YAML => no overrides.
            return {}
    else:
        with open(path, encoding="utf-8") as f:
            loaded = json.load(f)
    return loaded if isinstance(loaded, dict) else {}
def load_config(**overrides: Any) -> SleepConfig:
    """Resolve the effective configuration.

    Layers, later wins: built-in DEFAULTS, then the user config file (if one
    exists and loads), then every keyword override whose value is not None.
    When ``projects`` is "invoked" and no ``invoked_project`` is set, the
    current working directory is filled in.
    """
    merged = dict(DEFAULTS)

    user_path = _user_config_path()
    if user_path:
        try:
            merged.update(_load_file(user_path) or {})
        except Exception:
            # An unreadable user config is ignored; defaults stay in effect.
            pass

    for key, value in overrides.items():
        if value is not None:
            merged[key] = value

    wants_invoked = merged.get("projects") == "invoked"
    if wants_invoked and not merged.get("invoked_project"):
        merged["invoked_project"] = os.getcwd()
    return SleepConfig(data=merged)

View File

@@ -1,220 +0,0 @@
"""SkillOpt-Sleep — Stage 4: consolidate (one SkillOpt epoch).
This is the core that makes nightly evolution *safe*: it proposes bounded
edits from replayed failures, applies them to a candidate skill/memory, then
**gates** the candidate on a held-out slice of the user's own tasks. Only a
candidate that strictly improves the held-out score is accepted — exactly the
SkillOpt validation gate, reused verbatim from ``skillopt.evaluation.gate``.
Reused from the main SkillOpt package (import-light, no `openai` needed):
* skillopt.evaluation.gate.evaluate_gate / select_gate_score
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import List, Optional, Tuple
from skillopt.sleep.backend import Backend
from skillopt.sleep.memory import apply_edits
from skillopt.sleep.replay import aggregate_scores, replay_batch
from skillopt.sleep.types import EditRecord, ReplayResult, TaskRecord
# Reuse the real SkillOpt gate. This module imports cleanly without `openai`.
# _HAVE_REPO_GATE records whether that import succeeded; when it did not,
# only select_gate_score gets a local stand-in (evaluate_gate stays undefined,
# so callers must check _HAVE_REPO_GATE before using it).
try:
    from skillopt.evaluation.gate import evaluate_gate, select_gate_score
    _HAVE_REPO_GATE = True
except Exception:  # pragma: no cover - fallback keeps engine standalone
    _HAVE_REPO_GATE = False

    def select_gate_score(hard, soft, metric="hard", mixed_weight=0.5):  # type: ignore
        """Fallback gate score: hard, soft, or a weighted blend of the two.

        Any ``metric`` other than "hard" or "soft" is treated as mixed;
        ``mixed_weight`` is clamped to [0, 1] and is the weight on ``soft``.
        """
        if metric == "hard":
            return float(hard)
        if metric == "soft":
            return float(soft)
        w = max(0.0, min(1.0, float(mixed_weight)))
        return (1 - w) * float(hard) + w * float(soft)
@dataclass
class ConsolidationResult:
    """Outcome of one consolidation epoch, as returned by ``consolidate``."""

    accepted: bool  # whether the candidate skill/memory was accepted
    gate_action: str  # gate verdict label (e.g. "accept", "reject", "greedy_*")
    baseline_score: float  # gate score of the original docs on the val slice
    candidate_score: float  # gate score of the final candidate on the val slice
    new_skill: str  # candidate skill if accepted, else the original skill
    new_memory: str  # candidate memory if accepted, else the original memory
    applied_edits: List[EditRecord]  # edits that passed the per-document gate
    rejected_edits: List[EditRecord]  # edits applied to a trial doc but gated out
    holdout_baseline: float  # hard score of the original docs on the val slice
    holdout_candidate: float  # hard score of the final candidate on the val slice
def _split(tasks: List[TaskRecord]) -> Tuple[List[TaskRecord], List[TaskRecord]]:
    """Return (train_tasks, val_tasks).

    train drives reflect; val gates updates. Tasks whose split is "test" are
    normally excluded from both lists (the caller scores them). Legacy split
    names are accepted: "replay" counts as train and "holdout" as val.

    Fallbacks keep a night productive when a split is empty:

    * no val tasks  -> use train; if that is empty too, every non-test task;
      and only as a last resort (nothing but test tasks exist) all tasks.
    * no train tasks -> reuse the val list.
    """
    def _norm(s: str) -> str:
        return {"replay": "train", "holdout": "val"}.get(s, s)

    train = [t for t in tasks if _norm(t.split) == "train"]
    val = [t for t in tasks if _norm(t.split) == "val"]
    if not val:
        # Prefer train as the gate reference over nothing, then all-but-test.
        # The final `or tasks` only triggers when every task is a test task.
        val = train or [t for t in tasks if _norm(t.split) != "test"] or tasks
    if not train:
        train = val
    return train, val
def consolidate(
    backend: Backend,
    tasks: List[TaskRecord],
    skill: str,
    memory: str,
    *,
    edit_budget: int = 4,
    gate_metric: str = "mixed",
    gate_mixed_weight: float = 0.5,
    gate_mode: str = "on",  # "on" (hard/soft per gate_metric) | "off" (greedy)
    rollouts_k: int = 1,  # >1 => multi-rollout contrastive reflection
    evolve_skill: bool = True,
    evolve_memory: bool = True,
    night: int = 1,
) -> ConsolidationResult:
    """Run one consolidation epoch: reflect -> bounded edit -> gate.

    train tasks drive reflect; val tasks gate the update (test is held out by
    the caller). With ``gate_mode='off'`` edits are accepted greedily (no
    val-improve requirement) — the user opts out of hard filtering — but val
    scores are still recorded so the report shows whether quality moved.
    Skill and memory are evolved in sequence (skill first if both enabled).

    Args:
        backend: used for replay and for proposing edits via ``reflect``.
        tasks: all tasks for the night; partitioned by ``_split``.
        skill: current skill document text.
        memory: current memory document text.
        edit_budget: forwarded to reflect as the edit limit.
        gate_metric: metric name passed to ``select_gate_score``.
        gate_mixed_weight: blend weight passed to ``select_gate_score``.
        gate_mode: "off"/"none"/"false"/"greedy" (case-insensitive) disables
            the strict-improvement requirement; anything else keeps it on.
        rollouts_k: when > 1, the skill step tries contrastive reflection
            over K rollouts per train task before single-shot reflect.
        evolve_skill: whether to propose and gate skill edits.
        evolve_memory: whether to propose and gate memory edits.
        night: night counter, forwarded to ``evaluate_gate`` as the step.

    Returns:
        A ConsolidationResult. When the candidate is not accepted, its
        ``new_skill`` / ``new_memory`` are the unchanged inputs.
    """
    train_tasks, val_tasks = _split(tasks)
    gate_off = str(gate_mode).strip().lower() in {"off", "none", "false", "greedy"}
    # ── baseline on the VAL slice (the gate reference) ────────────────────
    base_pairs = replay_batch(backend, val_tasks, skill, memory)
    base_hard, base_soft = aggregate_scores(base_pairs)
    base_score = select_gate_score(base_hard, base_soft, gate_metric, gate_mixed_weight)
    # ── reflect over TRAIN-split failures/successes ───────────────────────
    train_pairs = replay_batch(backend, train_tasks, skill, memory)
    # A task counts as a failure unless its hard score reaches 1.0.
    failures = [(t, r) for (t, r) in train_pairs if r.hard < 1.0]
    successes = [(t, r) for (t, r) in train_pairs if r.hard >= 1.0]
    cand_skill, cand_memory = skill, memory
    all_applied: List[EditRecord] = []
    all_rejected: List[EditRecord] = []

    def _gate_apply(doc: str, edits: List[EditRecord], which: str) -> str:
        """Apply *edits* to *doc* and keep the result only if the gate allows.

        ``which`` is "skill" or "memory" and selects which candidate document
        the trial replaces while scoring. Returns the new document when it is
        kept, otherwise the unchanged *doc*. Accepted/rejected edits are
        appended to the enclosing lists, and the running ``base_score`` is
        raised when a candidate is kept.
        """
        nonlocal cand_skill, cand_memory, base_score, all_applied, all_rejected
        if not edits:
            return doc
        new_doc, applied = apply_edits(doc, edits)
        if not applied:
            return doc
        # score the candidate on the VAL slice
        trial_skill = new_doc if which == "skill" else cand_skill
        trial_memory = new_doc if which == "memory" else cand_memory
        pairs = replay_batch(backend, val_tasks, trial_skill, trial_memory)
        h, s = aggregate_scores(pairs)
        cand_score = select_gate_score(h, s, gate_metric, gate_mixed_weight)
        # gate OFF: accept greedily (no regression check); gate ON: strict improve
        if gate_off or cand_score > base_score:
            # max() keeps the bar from dropping when a greedy accept regressed
            base_score = max(base_score, cand_score)
            all_applied.extend(applied)
            return new_doc
        all_rejected.extend(applied)
        return doc

    if evolve_skill:
        if rollouts_k > 1:
            # multi-rollout contrastive reflection: run each train task K times
            # and distill a rule from the good-vs-bad contrast (the
            # "imagination" signal).
            from skillopt.sleep.rollout import multi_rollout, contrastive_reflect
            sets = [multi_rollout(backend, t, cand_skill, cand_memory, k=rollouts_k)
                    for t in train_tasks]
            edits = contrastive_reflect(
                backend, sets, cand_skill, cand_memory,
                edit_budget=edit_budget, target="skill",
            )
            # fall back to single-shot reflect if contrast yielded nothing
            if not edits:
                edits = backend.reflect(
                    failures, successes, cand_skill, cand_memory,
                    edit_budget=edit_budget, evolve_skill=True, evolve_memory=False,
                )
        else:
            edits = backend.reflect(
                failures, successes, cand_skill, cand_memory,
                edit_budget=edit_budget, evolve_skill=True, evolve_memory=False,
            )
        cand_skill = _gate_apply(cand_skill, edits, "skill")
    if evolve_memory:
        # re-evaluate failures under the (possibly improved) skill
        train_pairs2 = replay_batch(backend, train_tasks, cand_skill, cand_memory)
        failures2 = [(t, r) for (t, r) in train_pairs2 if r.hard < 1.0]
        successes2 = [(t, r) for (t, r) in train_pairs2 if r.hard >= 1.0]
        edits_m = backend.reflect(
            failures2, successes2, cand_skill, cand_memory,
            edit_budget=edit_budget, evolve_skill=False, evolve_memory=True,
        )
        cand_memory = _gate_apply(cand_memory, edits_m, "memory")
    # ── final decision, scored on the VAL slice ───────────────────────────
    final_pairs = replay_batch(backend, val_tasks, cand_skill, cand_memory)
    final_hard, final_soft = aggregate_scores(final_pairs)
    final_score = select_gate_score(final_hard, final_soft, gate_metric, gate_mixed_weight)
    # Recomputed from the untouched baseline scores: base_score itself may
    # have been raised inside _gate_apply.
    base_gate_score = select_gate_score(base_hard, base_soft, gate_metric, gate_mixed_weight)
    if gate_off:
        # greedy mode: keep whatever edits we applied; report quality movement
        accepted = bool(all_applied)
        if final_score > base_gate_score:
            action = "greedy_improved"
        elif final_score < base_gate_score:
            action = "greedy_regressed"
        else:
            action = "greedy_flat" if all_applied else "greedy_noop"
    elif _HAVE_REPO_GATE:
        # The repo gate supplies only the action label here; acceptance is
        # decided below by the same strict-improvement rule as the fallback.
        gate = evaluate_gate(
            candidate_skill=cand_skill,
            cand_hard=final_hard,
            current_skill=skill,
            current_score=base_gate_score,
            best_skill=skill,
            best_score=base_gate_score,
            best_step=night - 1,
            global_step=night,
            cand_soft=final_soft,
            metric=gate_metric,
            mixed_weight=gate_mixed_weight,
        )
        action = gate.action
        accepted = bool(all_applied) and final_score > base_gate_score
    else:
        action = "accept" if final_score > base_gate_score else "reject"
        accepted = bool(all_applied) and final_score > base_gate_score
    return ConsolidationResult(
        accepted=accepted,
        gate_action=action,
        baseline_score=base_gate_score,
        candidate_score=final_score,
        # on rejection the caller gets the original documents back
        new_skill=cand_skill if accepted else skill,
        new_memory=cand_memory if accepted else memory,
        applied_edits=all_applied,
        rejected_edits=all_rejected,
        holdout_baseline=base_hard,
        holdout_candidate=final_hard,
    )

View File

@@ -1,223 +0,0 @@
"""SkillOpt-Sleep — the nightly cycle orchestrator.
run_sleep_cycle() wires the stages:
harvest -> mine -> replay -> consolidate(gate) -> stage (-> optional adopt)
It is pure-Python and import-light; with backend="mock" it runs with no API
key and no third-party deps, which is what the deterministic experiment and
CI use. With backend="anthropic" it spends the user's budget for real lift.
"""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from skillopt.sleep.backend import get_backend
from skillopt.sleep.config import SleepConfig, load_config
from skillopt.sleep.consolidate import consolidate
from skillopt.sleep.harvest import harvest
from skillopt.sleep.memory import ensure_skill_scaffold
from skillopt.sleep.mine import mine
from skillopt.sleep.state import SleepState, _now_iso
from skillopt.sleep.staging import write_staging, adopt as adopt_staging
from skillopt.sleep.types import SessionDigest, SleepReport, TaskRecord
@dataclass
class CycleOutcome:
    """Result of ``run_sleep_cycle``."""

    report: SleepReport  # the night's report
    staging_dir: str  # staging location; "" when nothing was staged
    adopted: bool  # whether staged changes were adopted in this run
    adopted_paths: List[str]  # paths returned by the adopt step (may be empty)
def _project_paths(cfg: SleepConfig) -> str:
    """Return the project directory being evolved (where CLAUDE.md lives).

    This is the configured ``invoked_project`` when it is set, otherwise the
    current working directory.
    """
    invoked = cfg.get("invoked_project")
    if invoked:
        return invoked
    # nothing configured: fall back to the invoked cwd
    return os.getcwd()
def _read(path: str) -> str:
    """Return the UTF-8 text stored at *path*, or "" if it cannot be read."""
    text = ""
    try:
        with open(path, encoding="utf-8") as handle:
            text = handle.read()
    except Exception:
        # best-effort: a missing or unreadable document counts as empty
        text = ""
    return text
def _render_report_md(report: SleepReport, cfg: SleepConfig) -> str:
    """Render the night's report as a Markdown document.

    Layout: a title and summary bullets, then optional "Accepted edits",
    "Rejected by gate" and "Notes" sections (each only when non-empty),
    followed by a closing review hint.
    """
    md: List[str] = []

    # -- header / summary ---------------------------------------------------
    md.append(f"# SkillOpt-Sleep — night {report.night} report")
    md.append("")
    md.append(f"- project: `{report.project}`")
    md.append(f"- backend: `{cfg.get('backend')}` replay: `{cfg.get('replay_mode')}`")
    md.append(f"- sessions harvested: {report.n_sessions}")
    md.append(f"- tasks mined: {report.n_tasks} (replayed: {report.n_replayed})")
    md.append(f"- held-out score: {report.baseline_score:.3f} -> {report.candidate_score:.3f}")
    md.append(f"- gate: **{report.gate_action}** (accepted={report.accepted})")
    md.append(f"- tokens used: {report.tokens_used}")
    md.append("")

    # -- optional sections --------------------------------------------------
    if report.edits:
        md.append("## Accepted edits")
        md.extend(
            f"- [{e.target}/{e.op}] {e.content} \n _why: {e.rationale}_"
            for e in report.edits
        )
        md.append("")
    if report.rejected_edits:
        md.append("## Rejected by gate (kept as negative feedback)")
        md.extend(f"- [{e.target}/{e.op}] {e.content}" for e in report.rejected_edits)
        md.append("")
    if report.notes:
        md.append("## Notes")
        md.extend(f"- {n}" for n in report.notes)
        md.append("")

    md.append("_Review, then run `/sleep adopt` to apply, or discard this folder._")
    return "\n".join(md)
def run_sleep_cycle(
    cfg: Optional[SleepConfig] = None,
    *,
    seed_tasks: Optional[List[TaskRecord]] = None,
    dry_run: bool = False,
    clock: Optional[float] = None,
) -> CycleOutcome:
    """Run one full sleep cycle and return the outcome.

    Stages: harvest -> mine -> consolidate (replay + gate) -> stage, with an
    optional adopt step when ``auto_adopt`` is configured.

    Parameters
    ----------
    cfg : SleepConfig
        Configuration; ``load_config()`` is used when omitted.
    seed_tasks : optional pre-built TaskRecords (used by the experiment to
        inject a known persona instead of harvesting ~/.claude).
    dry_run : harvest+mine+replay but DO NOT stage/adopt (report only).
    clock : fixed epoch seconds for deterministic timestamps in tests.

    Returns
    -------
    CycleOutcome
        The report, the staging directory ("" when nothing was staged), and
        whether/which paths were adopted.
    """
    cfg = cfg or load_config()
    state = SleepState.load(cfg.state_path)
    night = state.begin_night(clock)
    project = _project_paths(cfg)
    started = _now_iso(clock)
    backend = get_backend(
        cfg.get("backend", "mock"),
        model=cfg.get("model", ""),
        codex_path=cfg.get("codex_path", ""),
    )
    # ── 1+2. harvest + mine (unless seed_tasks injected) ─────────────────
    digests: List[SessionDigest] = []
    if seed_tasks is not None:
        tasks = seed_tasks
        n_sessions = 0
    else:
        since = state.last_harvest_for(project)
        digests = harvest(
            cfg.transcripts_dir,
            scope=cfg.get("projects", "invoked"),
            invoked_project=cfg.get("invoked_project", ""),
            since_iso=since,
            # harvest up to 3x the task cap worth of sessions
            limit=cfg.get("max_tasks_per_night", 40) * 3,
        )
        n_sessions = len(digests)
        # When a real backend is configured, use it to mine checkable tasks from
        # the transcripts (rubric/rule judges); otherwise fall back to the
        # heuristic miner (no API, no checkable reference).
        llm_miner = None
        if cfg.get("backend", "mock") != "mock" and cfg.get("llm_mine", True):
            try:
                from skillopt.sleep.llm_miner import make_llm_miner
                llm_miner = make_llm_miner(backend, max_tasks=cfg.get("max_tasks_per_night", 40))
            except Exception:
                # any failure building the LLM miner degrades to the heuristic miner
                llm_miner = None
        # NOTE(review): only holdout_fraction is forwarded here; the
        # val_fraction / test_fraction config keys are not read — confirm.
        tasks = mine(
            digests,
            max_tasks=cfg.get("max_tasks_per_night", 40),
            holdout_fraction=cfg.get("holdout_fraction", 0.34),
            seed=cfg.get("seed", 42),
            llm_miner=llm_miner,
        )
    # ── live skill/memory docs ───────────────────────────────────────────
    live_memory_path = os.path.join(project, "CLAUDE.md")
    live_skill_path = cfg.managed_skill_path()
    skill = _read(live_skill_path)
    memory = _read(live_memory_path)
    if not skill:
        # no managed skill on disk yet: start from a generated scaffold
        skill = ensure_skill_scaffold(
            "", name=cfg.get("managed_skill_name", "skillopt-sleep-learned"),
            description="Preferences and procedures learned from past Claude Code sessions.",
        )
    report = SleepReport(
        night=night, project=project, started_at=started,
        n_sessions=n_sessions, n_tasks=len(tasks),
    )
    if not tasks:
        # Early exit: record the (empty) night; persist only on non-dry runs.
        report.ended_at = _now_iso(clock)
        report.notes.append("no tasks mined — nothing to consolidate")
        state.set_last_harvest(project, started)
        state.record_night({"night": night, "accepted": False, "n_tasks": 0})
        if not dry_run:
            state.save()
        staging_dir = ""
        return CycleOutcome(report, staging_dir, False, [])
    # ── 3+4. replay + consolidate (gate) ─────────────────────────────────
    result = consolidate(
        backend, tasks, skill, memory,
        edit_budget=cfg.get("edit_budget", 4),
        gate_metric=cfg.get("gate_metric", "mixed"),
        gate_mixed_weight=cfg.get("gate_mixed_weight", 0.5),
        gate_mode=cfg.get("gate_mode", "on"),
        evolve_skill=cfg.get("evolve_skill", True),
        evolve_memory=cfg.get("evolve_memory", True),
        night=night,
    )
    report.n_replayed = len(tasks)
    report.baseline_score = result.baseline_score
    report.candidate_score = result.candidate_score
    report.accepted = result.accepted
    report.gate_action = result.gate_action
    report.edits = result.applied_edits
    report.rejected_edits = result.rejected_edits
    report.tokens_used = backend.tokens_used()
    report.ended_at = _now_iso(clock)
    # ── 5. stage (unless dry-run) ────────────────────────────────────────
    staging_dir = ""
    adopted = False
    adopted_paths: List[str] = []
    # NOTE(review): on a dry run nothing below executes, so state is neither
    # updated nor saved for this night — confirm that is the intended nesting.
    if not dry_run:
        report_md = _render_report_md(report, cfg)
        # propose a document only when it was evolved AND the gate accepted
        proposed_skill = result.new_skill if (cfg.get("evolve_skill") and result.accepted) else None
        proposed_memory = result.new_memory if (cfg.get("evolve_memory") and result.accepted) else None
        staging_dir = write_staging(
            project,
            report=report,
            proposed_skill=proposed_skill,
            proposed_memory=proposed_memory,
            live_skill_path=live_skill_path,
            live_memory_path=live_memory_path,
            report_md=report_md,
        )
        state.set_last_harvest(project, started)
        state.record_night({
            "night": night, "accepted": result.accepted,
            "baseline": result.baseline_score, "candidate": result.candidate_score,
            "n_tasks": len(tasks), "staging": staging_dir,
        })
        # ── 6. adopt (opt-in) ────────────────────────────────────────────
        if cfg.get("auto_adopt") and result.accepted:
            adopted_paths = adopt_staging(staging_dir)
            adopted = bool(adopted_paths)
        state.save()
    return CycleOutcome(report, staging_dir, adopted, adopted_paths)

View File

@@ -1 +0,0 @@
"""SkillOpt-Sleep experiments."""

View File

@@ -1,119 +0,0 @@
"""SkillOpt-Sleep — gbrain-evals benchmark adapter.
Loads gbrain-evals' `skillopt-v1` benchmark (deficient skills + train/held-out
task sets with rule-based judges) into our TaskRecord format, so we can run the
SkillOpt-Sleep cycle against the SAME suite gbrain publishes a scorecard for:
docs/benchmarks/2026-06-03-skillopt.md — "4/4 skills 0 -> 1.00"
Each gbrain seed dir has:
SKILL.md — the deliberately deficient starting skill
benchmark.jsonl — training tasks {task_id, task, judge:{kind:"rule",checks}}
held-out.jsonl — held-out tasks (same judge shape, unseen items)
We map:
benchmark.jsonl -> TaskRecords split deterministically into split="train" / split="val"
held-out.jsonl -> TaskRecords with split="test" (the final held-out measure)
judge -> TaskRecord.judge (+ reference_kind="rule")
This lets us reproduce gbrain's headline result with our engine and either the
claude or codex backend, scoring locally via skillopt.sleep.judges (no judge API).
"""
from __future__ import annotations
import json
import os
from typing import Dict, List, Optional, Tuple
from skillopt.sleep.types import TaskRecord
# Maps a skill (seed) name to the name of its directory under the benchmark
# data root; load_seed() falls back to the seed name itself for unknown keys.
SEED_DIRS = {
    "brief-writer": "seed-missing-structure",
    "thorough-analyst": "seed-verbose",
    "advisor": "seed-no-verdict",
    "quick-answerer": "seed-no-brain-first",
}
def _load_jsonl(path: str) -> List[dict]:
    """Read a JSON Lines file and return its object rows.

    Best-effort by design: a missing file yields ``[]``, blank lines are
    ignored, and lines that are not valid JSON are skipped. Rows whose value
    is not a JSON object (e.g. a bare number or a list) are skipped as well,
    so every returned element supports ``dict`` access.
    """
    out: List[dict] = []
    if not os.path.exists(path):
        return out
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(rec, dict):
                out.append(rec)
    return out
def _to_task(rec: dict, *, seed: str, split: str) -> TaskRecord:
    """Convert one benchmark row into a rule-judged TaskRecord.

    The task id is ``"<seed>:<task_id>"``; missing ``task_id`` / ``task``
    fields become empty strings and a missing or empty ``judge`` becomes ``{}``.
    """
    task_id = rec.get("task_id", "")
    judge_spec = rec.get("judge", {}) or {}
    prompt = str(rec.get("task", ""))
    return TaskRecord(
        id=f"{seed}:{task_id}",
        project=f"gbrain/{seed}",
        intent=prompt,
        reference_kind="rule",
        judge=judge_spec,
        tags=[f"seed:{seed}"],
        split=split,
    )
def load_seed(data_root: str, seed: str, *, val_fraction: float = 0.34,
              split_seed: int = 42) -> Tuple[str, List[TaskRecord]]:
    """Return (deficient_skill_md, tasks) for one gbrain seed.

    Split mapping:

    * held-out.jsonl  -> ``test`` (the true final measure)
    * benchmark.jsonl -> ``train`` or ``val``, chosen per task from a SHA-256
      hash of ``split_seed`` + task id, so the assignment is deterministic
      (val gates updates; train drives reflect)

    If hashing leaves no ``val`` task, the first ``train`` task is moved to
    ``val``. The skill text is "" when the seed has no SKILL.md.
    """
    import hashlib

    seed_dir = os.path.join(data_root, SEED_DIRS.get(seed, seed))

    # -- the deficient starting skill ---------------------------------------
    skill = ""
    skill_path = os.path.join(seed_dir, "SKILL.md")
    if os.path.exists(skill_path):
        with open(skill_path, encoding="utf-8") as fh:
            skill = fh.read()

    # -- benchmark pool -> train/val ----------------------------------------
    val_cut = int(round(val_fraction * 100))

    def _bucket(task_id: str) -> int:
        digest = hashlib.sha256((str(split_seed) + task_id).encode()).hexdigest()
        return int(digest, 16) % 100

    tasks: List[TaskRecord] = []
    for row in _load_jsonl(os.path.join(seed_dir, "benchmark.jsonl")):
        task = _to_task(row, seed=seed, split="train")
        task.split = "val" if _bucket(task.id) < val_cut else "train"
        tasks.append(task)

    # -- held-out -> test ---------------------------------------------------
    tasks.extend(
        _to_task(row, seed=seed, split="test")
        for row in _load_jsonl(os.path.join(seed_dir, "held-out.jsonl"))
    )

    # -- guarantee a non-empty val ------------------------------------------
    if all(t.split != "val" for t in tasks):
        first_train = next((t for t in tasks if t.split == "train"), None)
        if first_train is not None:
            first_train.split = "val"
    return skill, tasks
def available_seeds(data_root: str) -> List[str]:
    """List the known seed names whose directory exists under *data_root*."""
    found: List[str] = []
    for seed_name, subdir in SEED_DIRS.items():
        if os.path.isdir(os.path.join(data_root, subdir)):
            found.append(seed_name)
    return found
def find_data_root(explicit: str = "") -> Optional[str]:
    """Locate eval/data/skillopt-v1 from common clone locations.

    An *explicit* path, when given, is tried first; the first candidate that
    is an existing directory is returned, or None if none is.
    """
    well_known = [
        os.path.expanduser("~/git/gbrain-evals/eval/data/skillopt-v1"),
        "/tmp/gbrain-evals/eval/data/skillopt-v1",
        os.path.expanduser("~/gbrain-evals/eval/data/skillopt-v1"),
    ]
    search_order = ([explicit] if explicit else []) + well_known
    return next((p for p in search_order if p and os.path.isdir(p)), None)

View File

@@ -1,86 +0,0 @@
"""SkillOpt-Sleep — persona task fixtures for the validation experiment.
Each persona is a list of TaskRecords with EXACT checkable references and a
`rule:<key>` tag naming the single skill rule that makes the task solvable
(consumed by MockBackend). This lets the experiment prove — deterministically,
with no API — that nightly consolidation lifts a held-out score and that the
gate blocks regressions.
Personas mirror the user's framing: programmer / researcher / analyst.
"""
from __future__ import annotations
from typing import List
from skillopt.sleep.types import TaskRecord
def _t(i, intent, ref, rule, project="/personas/demo", outcome="fail") -> TaskRecord:
    """Build one persona task with an exact reference and a ``rule:<key>`` tag.

    ``i`` numbers the task within its persona and is used in both the task id
    and the synthetic source-session name.
    """
    task_id = f"persona_{rule}_{i}"
    rule_tag = f"rule:{rule}"
    session = f"sess_{i}"
    return TaskRecord(
        id=task_id,
        project=project,
        intent=intent,
        context_excerpt="",
        attempted_solution="",
        outcome=outcome,
        reference_kind="exact",
        reference=ref,
        tags=[rule_tag],
        source_sessions=[session],
    )
def researcher_persona() -> List[TaskRecord]:
    """Researcher who always wants arXiv ids wrapped in <answer> tags."""
    qa_pairs = [
        ("Give me the arXiv id for the SkillOpt paper", "arXiv:2605.23904"),
        ("What's the arXiv id of the Attention paper?", "arXiv:1706.03762"),
        ("arXiv id for the GAN paper?", "arXiv:1406.2661"),
        ("arXiv id for BERT?", "arXiv:1810.04805"),
        ("arXiv id for the ResNet paper?", "arXiv:1512.03385"),
        ("arXiv id for the Adam optimizer paper?", "arXiv:1412.6980"),
        ("arXiv id for Dropout?", "arXiv:1207.0580"),
        ("arXiv id for the Transformer-XL paper?", "arXiv:1901.02860"),
        ("arXiv id for word2vec?", "arXiv:1301.3781"),
        ("arXiv id for the VAE paper?", "arXiv:1312.6114"),
        ("arXiv id for batch norm?", "arXiv:1502.03167"),
        ("arXiv id for GPT-3?", "arXiv:2005.14165"),
    ]

    def _two_rule_task(idx, question, arxiv_id):
        # Both rules required: format the id (arxiv-id) AND wrap in answer tags.
        task = _t(idx, question, arxiv_id, "wrap-answer")
        task.tags = ["rule:wrap-answer", "rule:arxiv-id"]
        return task

    return [
        _two_rule_task(idx, question, arxiv_id)
        for idx, (question, arxiv_id) in enumerate(qa_pairs)
    ]
def programmer_persona() -> List[TaskRecord]:
    """Programmer who wants imperative-mood commit subjects."""
    requests = [
        ("commit message for adding a login form", "Add login form"),
        ("commit message for fixing the null pointer bug", "Fix null pointer in parser"),
        ("commit message for updating the README", "Update README"),
        ("commit message for removing dead code", "Remove dead code"),
        ("commit message for bumping the version", "Bump version to 1.2.0"),
        ("commit message for refactoring the auth module", "Refactor auth module"),
        ("commit message for adding tests", "Add unit tests for scheduler"),
        ("commit message for fixing the CI pipeline", "Fix CI pipeline"),
    ]
    tasks: List[TaskRecord] = []
    for idx, (request, subject) in enumerate(requests):
        tasks.append(_t(idx, request, subject, "commit-imperative"))
    return tasks
def harmful_edit_task() -> TaskRecord:
    """Return the probe task used to show that the gate rejects regressions.

    The task is tagged with the ``__harmful__`` rule. The MockBackend proposes
    the harmful rule on this failure, but applying it does NOT raise the
    held-out score, so the gate must reject it.
    """
    probe = _t(99, "answer this freely", "THIS_WILL_NOT_MATCH", "__harmful__")
    # The reference is overwritten after construction with a string the
    # harmful rule cannot produce.
    probe.reference = "an-answer-that-the-harmful-rule-cannot-produce"
    return probe
# Registry mapping a persona name to the zero-argument factory that builds its
# task list. harmful_edit_task is deliberately not registered here: it returns
# a single probe task, not a persona.
PERSONAS = {
"researcher": researcher_persona,
"programmer": programmer_persona,
}

View File

@@ -1,132 +0,0 @@
"""SkillOpt-Sleep — turn a sweep JSONL into a presented Markdown scorecard.
Usage:
python -m skillopt.sleep.experiments.report --in docs/sleep/sweep.jsonl \
--out docs/sleep/benchmark_report.md
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from typing import Any, Dict, List
def _load(path: str) -> List[Dict[str, Any]]:
rows = []
if os.path.exists(path):
with open(path) as f:
for line in f:
line = line.strip()
if line:
try:
rows.append(json.loads(line))
except Exception:
pass
return rows
def _fmt_model(backend: str, model: str) -> str:
    """Return ``backend:model``, using ``default`` when the model is empty."""
    return f"{backend}:{model or 'default'}"


def render(rows: List[Dict[str, Any]]) -> str:
    """Render sweep rows as a Markdown benchmark report.

    Rows are grouped by ``cfg["kind"]``: ``direct``/``dual`` rows go into the
    direct-improvement table, ``transfer`` rows into the cross-model transfer
    table (emitted only when such rows exist), and every row that has an
    ``error`` key is listed in a separate section instead of a table.

    Args:
        rows: Parsed sweep rows (dicts), as written by the sweep driver.

    Returns:
        The full report as a single newline-joined string.
    """
    direct = [r for r in rows
              if r.get("cfg", {}).get("kind") in ("direct", "dual") and "error" not in r]
    transfer = [r for r in rows
                if r.get("cfg", {}).get("kind") == "transfer" and "error" not in r]
    errors = [r for r in rows if "error" in r]

    out: List[str] = []
    out.append("# SkillOpt-Sleep — benchmark report")
    out.append("")
    out.append("Auto-generated from `sweep.jsonl`. Benchmark: "
               "[gbrain-evals](https://github.com/garrytan/gbrain-evals) `skillopt-v1` "
               "(deficient skills, train/held-out split, local rule judge — no judge-API).")
    out.append("Held-out scores are computed by the harness, not the optimizer.")
    out.append("")

    # ── direct improvement table ──────────────────────────────────────────
    out.append("## Direct improvement (optimize, then deploy)")
    out.append("")
    out.append("| Optimizer → Target | Seed | Held-out before | Held-out after | Nights | Tokens |")
    out.append("|---|---|---|---|---|---|")
    for r in direct:
        c = r["cfg"]
        if c.get("kind") == "dual":
            optimizer = _fmt_model(c["optimizer_backend"], c.get("optimizer_model", ""))
            target = _fmt_model(c["target_backend"], c.get("target_model", ""))
        else:
            # A plain direct run optimizes and deploys on the same model.
            optimizer = target = _fmt_model(c["backend"], c.get("model", ""))
        # Separate the two names with the same arrow the column header uses;
        # without it the cell reads e.g. "claude:sonnetclaude:haiku".
        label = f"{optimizer} → {target}"
        out.append(f"| {label} | {c['seed']} | "
                   f"{r['baseline']:.2f} | **{r['after']:.2f}** | {c['nights']} | "
                   f"{r.get('tokens','?')} |")
    if direct:
        n_imp = sum(1 for r in direct if r.get("improved"))
        out.append("")
        out.append(f"**{n_imp}/{len(direct)} configurations improved on held-out.**")
    out.append("")

    # ── transfer table ────────────────────────────────────────────────────
    if transfer:
        out.append("## Cross-model transfer (optimize on SOURCE, deploy frozen on TARGET)")
        out.append("")
        out.append("The price-difference story: spend cheap tokens optimizing overnight, "
                   "then deploy the frozen skill on any model with no further optimization.")
        out.append("")
        out.append("| Source (optimizer) | Target (deploy) | Seed | Target baseline | Transferred | Gain |")
        out.append("|---|---|---|---|---|---|")
        for r in transfer:
            c = r["cfg"]
            s = _fmt_model(c["source_backend"], c.get("source_model", ""))
            t = _fmt_model(c["target_backend"], c.get("target_model", ""))
            out.append(f"| {s} | {t} | {c['seed']} | {r['baseline_target']:.2f} | "
                       f"**{r['transferred']:.2f}** | {r['transfer_gain']:+.2f} |")
        n_pos = sum(1 for r in transfer if r.get("transfer_gain", 0) > 0)
        out.append("")
        out.append(f"**{n_pos}/{len(transfer)} transfers were positive** "
                   "(frozen skill helped a different model than it was optimized on).")
        out.append("")

    # ── errors (honest reporting) ─────────────────────────────────────────
    if errors:
        out.append("## Configs that errored (reported, not hidden)")
        out.append("")
        for r in errors:
            # .get(): an error row without a cfg is still reported (as null)
            # rather than crashing the whole report.
            out.append(f"- `{json.dumps(r.get('cfg'))}` → {r['error']}")
        out.append("")

    out.append("## How to reproduce")
    out.append("")
    out.append("```bash")
    out.append("git clone https://github.com/garrytan/gbrain-evals /tmp/gbrain-evals")
    out.append("python -m skillopt.sleep.experiments.sweep --plan full \\")
    out.append(" --data-root /tmp/gbrain-evals/eval/data/skillopt-v1 --out docs/sleep/sweep.jsonl")
    out.append("python -m skillopt.sleep.experiments.report \\")
    out.append(" --in docs/sleep/sweep.jsonl --out docs/sleep/benchmark_report.md")
    out.append("```")
    out.append("")
    return "\n".join(out)
def main(argv=None) -> int:
    """CLI entry point: render a sweep JSONL file into a Markdown report.

    Args:
        argv: Argument list for argparse; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` after writing the report, ``1`` when the input has no rows.
    """
    ap = argparse.ArgumentParser(description="Render SkillOpt-Sleep sweep report")
    ap.add_argument("--in", dest="inp", default="docs/sleep/sweep.jsonl")
    ap.add_argument("--out", default="docs/sleep/benchmark_report.md")
    args = ap.parse_args(argv)

    rows = _load(args.inp)
    if not rows:
        print(f"no rows in {args.inp}", file=sys.stderr)
        return 1

    md = render(rows)
    os.makedirs(os.path.dirname(args.out) or ".", exist_ok=True)
    # The report always contains non-ASCII characters (e.g. "—" and "→"), so
    # write UTF-8 explicitly instead of relying on the locale default, which
    # may be unable to encode them.
    with open(args.out, "w", encoding="utf-8") as f:
        f.write(md)
    print(f"wrote {args.out} ({len(rows)} rows)")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,178 +0,0 @@
"""SkillOpt-Sleep — validation experiment.
Answers the question the user posed: *does nightly offline self-evolution
actually improve the agent?* Runs deterministically with the MockBackend
(no API key, reproducible) and is the acceptance test for the whole idea.
What it proves:
1. MONOTONIC LIFT — over N sleep nights, the held-out score rises from a
baseline (empty skill/memory) toward 1.0 as the gate accepts the
general rules the persona's tasks require.
2. GATE SAFETY — an injected harmful edit is REJECTED (held-out score does
not improve), so a bad nightly proposal can never be adopted.
3. PLUMBING — harvest->mine->replay->consolidate->stage->adopt all run and
the adopted artifact, re-scored, retains the lift.
Run:
python -m skillopt.sleep.experiments.run_experiment
python -m skillopt.sleep.experiments.run_experiment --persona programmer --nights 3
python -m skillopt.sleep.experiments.run_experiment --backend claude # real lift
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import tempfile
from typing import List
from skillopt.sleep.backend import get_backend
from skillopt.sleep.consolidate import consolidate
from skillopt.sleep.experiments.personas import (
PERSONAS,
harmful_edit_task,
researcher_persona,
)
from skillopt.sleep.memory import ensure_skill_scaffold
from skillopt.sleep.replay import aggregate_scores, replay_batch
from skillopt.sleep.types import TaskRecord
def _score_holdout(backend, tasks: List[TaskRecord], skill: str, memory: str,
                   metric: str = "mixed", w: float = 0.5) -> float:
    """Replay the held-out tasks and return their gate score.

    The persona experiment uses a 2-way split (train/val, no test), so tasks
    whose split is ``val`` or ``holdout`` are scored; when there are none,
    every task is scored instead.
    """
    from skillopt.sleep.consolidate import select_gate_score

    held_out = [task for task in tasks if task.split in ("val", "holdout")]
    if not held_out:
        held_out = tasks
    replayed = replay_batch(backend, held_out, skill, memory)
    hard, soft = aggregate_scores(replayed)
    return select_gate_score(hard, soft, metric, w)
def run(persona: str = "researcher", nights: int = 4, backend_name: str = "mock",
edit_budget: int = 4, seed: int = 42, model: str = "", codex_path: str = "",
limit_tasks: int = 0) -> dict:
"""Run the persona experiment end to end and return a summary dict.

Steps: build the persona's tasks, split them, score the held-out baseline
with an empty skill and empty memory, run up to ``nights`` consolidation
rounds (keeping a round's skill/memory only when it is accepted), re-score
the held-out set, and -- for the mock backend only -- probe that a harmful
edit is not adopted.

Args:
    persona: Key into PERSONAS; unknown names fall back to the researcher.
    nights: Maximum number of consolidation rounds.
    backend_name: Backend name passed to get_backend.
    edit_budget: Forwarded to consolidate as ``edit_budget``.
    seed: Seed forwarded to assign_splits.
    model: Backend model override ("" is reported as "(default)").
    codex_path: Forwarded to get_backend.
    limit_tasks: Cap on the number of persona tasks; 0 means no cap.

Returns:
    A dict with baseline/after held-out scores, the lift, a per-night
    trace, token usage, the tail of the final skill, and
    ``gate_blocks_harmful`` (None unless the backend is the mock).
"""
from skillopt.sleep.mine import assign_splits
# Unknown persona names silently fall back to the researcher persona.
make = PERSONAS.get(persona, researcher_persona)
items = make()
# limit_tasks == 0 (the default) leaves the task list uncapped.
if limit_tasks and limit_tasks < len(items):
items = items[:limit_tasks]
tasks = assign_splits(items, holdout_fraction=0.34, seed=seed)
backend = get_backend(backend_name, model=model, codex_path=codex_path)
is_mock = (backend.name == "mock")
# start from an empty managed skill + empty memory
skill = ensure_skill_scaffold("", name="skillopt-sleep-learned",
description="Learned preferences.")
memory = ""
baseline = _score_holdout(backend, tasks, skill, memory)
# trace[0] is the baseline; one more entry is appended per night actually run.
trace = [{"night": 0, "holdout_score": round(baseline, 4), "action": "baseline",
"n_edits": 0}]
for night in range(1, nights + 1):
res = consolidate(
backend, tasks, skill, memory,
edit_budget=edit_budget, gate_metric="mixed", gate_mixed_weight=0.5,
evolve_skill=True, evolve_memory=True, night=night,
)
# Only an accepted round replaces the current skill/memory.
if res.accepted:
skill, memory = res.new_skill, res.new_memory
# The trace records the candidate's score whether or not it was accepted.
trace.append({
"night": night,
"holdout_score": round(res.candidate_score, 4),
"action": res.gate_action,
"accepted": res.accepted,
"n_edits": len(res.applied_edits),
"edits": [e.content for e in res.applied_edits],
"n_rejected": len(res.rejected_edits),
})
# converged: stop early if perfect
if res.candidate_score >= 0.999:
break
# Re-score the skill/memory that was actually kept.
after = _score_holdout(backend, tasks, skill, memory)
# ── gate-safety probe (mock only; it relies on the mock's known bad rule) ──
harmful_rejected = None
if is_mock:
# Probe set: the harmful task plus the first three persona tasks.
harmful_tasks = assign_splits([harmful_edit_task()] + make()[:3],
holdout_fraction=0.5, seed=seed)
# The returned score is discarded; the call still runs a replay on the backend.
_ = _score_holdout(backend, harmful_tasks, skill, memory)
res_h = consolidate(backend, harmful_tasks, skill, memory,
edit_budget=edit_budget, gate_metric="mixed",
evolve_skill=True, evolve_memory=False, night=nights + 1)
harmful_rule_text = get_backend("mock").RULE_TEXT["__harmful__"] # type: ignore[attr-defined]
# "Rejected" means the harmful rule text is absent from the resulting skill.
harmful_rejected = (harmful_rule_text not in res_h.new_skill)
result = {
"persona": persona,
"backend": backend.name,
"model": model or "(default)",
"n_tasks": len(tasks),
"nights_run": len(trace) - 1,
"baseline_holdout": round(baseline, 4),
"after_holdout": round(after, 4),
"lift": round(after - baseline, 4),
"improved": after > baseline,
"gate_blocks_harmful": harmful_rejected, # None for real backends
"tokens_used": backend.tokens_used(),
"final_skill_excerpt": skill[-500:],
"trace": trace,
}
return result
def _assert(cond: bool, msg: str) -> None:
if not cond:
print(f"FAIL: {msg}")
raise SystemExit(1)
def main(argv=None) -> int:
"""CLI entry point for the persona validation experiment.

Parses the options, calls run(), prints the result as JSON (``--json``) or
as a human-readable summary, and with ``--assert-improves`` exits nonzero
(via _assert) unless the held-out score improved and, when the
gate-safety probe ran, the harmful edit was blocked.

Args:
    argv: Argument list for argparse; ``None`` means ``sys.argv[1:]``.

Returns:
    ``0`` when no assertion failed.
"""
ap = argparse.ArgumentParser(description="SkillOpt-Sleep validation experiment")
ap.add_argument("--persona", default="researcher", choices=list(PERSONAS.keys()))
ap.add_argument("--nights", type=int, default=4)
ap.add_argument("--backend", default="mock", choices=["mock", "claude", "codex"])
ap.add_argument("--model", default="", help="backend model override")
ap.add_argument("--codex-path", default="", help="path to the real @openai/codex binary")
ap.add_argument("--edit-budget", type=int, default=4)
ap.add_argument("--limit-tasks", type=int, default=0, help="cap #tasks (control API cost)")
ap.add_argument("--json", action="store_true")
ap.add_argument("--assert-improves", action="store_true",
help="exit nonzero unless lift>0 (and, for mock, gate blocks harmful edit)")
args = ap.parse_args(argv)
# No seed option is exposed, so run() uses its default seed.
res = run(args.persona, nights=args.nights, backend_name=args.backend,
edit_budget=args.edit_budget, model=args.model,
codex_path=args.codex_path, limit_tasks=args.limit_tasks)
if args.json:
print(json.dumps(res, ensure_ascii=False, indent=2))
else:
print(f"=== SkillOpt-Sleep experiment: persona={res['persona']} "
f"backend={res['backend']} model={res['model']} ===")
print(f"tasks: {res['n_tasks']} tokens(approx): {res['tokens_used']}")
print(f"baseline held-out : {res['baseline_holdout']}")
print(f"after held-out : {res['after_holdout']} (lift {res['lift']:+.4f})")
# gate_blocks_harmful is None when the probe did not run (non-mock backends).
if res["gate_blocks_harmful"] is not None:
print(f"gate blocks harmful edit: {res['gate_blocks_harmful']}")
print("trace:")
for row in res["trace"]:
# The baseline row has no "edits" key; the joined edit text is cut to 80 chars.
edits = "; ".join(row.get("edits", []))[:80]
print(f" night {row['night']}: holdout={row['holdout_score']} "
f"{row['action']} (+{row['n_edits']} edits) {edits}")
if args.assert_improves:
# _assert raises SystemExit(1) on failure, so the PASS lines only print on success.
_assert(res["improved"], "held-out score did not improve")
if res["gate_blocks_harmful"] is not None:
_assert(res["gate_blocks_harmful"], "gate failed to block harmful edit")
print("\nPASS: nightly consolidation improves held-out score AND gate blocks regressions.")
else:
print("\nPASS: nightly consolidation improves held-out score (real backend).")
return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,209 +0,0 @@
"""SkillOpt-Sleep — run the gbrain-evals skillopt-v1 benchmark with our engine.
Reproduces gbrain's "Result 1 — skills measurably improve" scorecard
(docs/benchmarks/2026-06-03-skillopt.md) using SkillOpt-Sleep's
consolidate() loop and either the claude or codex backend.
For each deficient seed skill:
1. score the held-out tasks with the ORIGINAL skill -> before
2. run N consolidation nights on the training tasks (gated) -> evolve skill
3. score the held-out tasks with the EVOLVED skill -> after
Held-out scoring is done locally by the rule judge (no judge API). Only the
agent's `attempt` (and the optimizer's `reflect`) spend tokens.
Usage:
python -m skillopt.sleep.experiments.run_gbrain --backend mock
python -m skillopt.sleep.experiments.run_gbrain --backend claude --seeds brief-writer --nights 2
python -m skillopt.sleep.experiments.run_gbrain --backend codex --data-root /tmp/gbrain-evals/eval/data/skillopt-v1
"""
from __future__ import annotations
import argparse
import json
import sys
from typing import Dict, List, Optional
from skillopt.sleep.backend import build_backend, get_backend
from skillopt.sleep.consolidate import consolidate, select_gate_score
from skillopt.sleep.experiments.gbrain_bench import (
available_seeds,
find_data_root,
load_seed,
)
from skillopt.sleep.replay import aggregate_scores, replay_batch
def _score(backend, tasks, skill, memory, split="test", metric="mixed", w=0.5):
    """Replay one split and return ``(hard, soft, gate_score)``.

    Falls back to the ``val`` split, and then to every task, when the
    requested split is empty, so the score is never computed on nothing.
    """
    def _in_split(name):
        return [task for task in tasks if task.split == name]

    subset = _in_split(split) or _in_split("val") or tasks
    replayed = replay_batch(backend, subset, skill, memory)
    hard, soft = aggregate_scores(replayed)
    return hard, soft, select_gate_score(hard, soft, metric, w)
def run_seed(backend, seed: str, skill: str, tasks: List, *,
nights: int = 3, edit_budget: int = 4, gate_mode: str = "on",
slow_update: bool = True, rollouts_k: int = 1,
limit_replay: int = 0, limit_holdout: int = 0) -> dict:
"""Evolve one seed skill for up to ``nights`` rounds and report its TEST score.

The TEST split is scored before any optimization, after every night, and
once more at the end; consolidate() is called with the full task list.
Memory stays empty throughout (``evolve_memory=False``).

Args:
    backend: Backend used for replay and consolidation.
    seed: Seed name, echoed in the result.
    skill: The starting skill text.
    tasks: Tasks carrying a ``split`` attribute (train/val/test).
    nights: Maximum number of consolidation rounds.
    edit_budget: Forwarded to consolidate.
    gate_mode: Forwarded to consolidate.
    slow_update: Whether to run the slow update (also needs nights >= 2).
    rollouts_k: Forwarded to consolidate.
    limit_replay: Cap on train tasks; 0 means no cap.
    limit_holdout: Cap applied to both val and test; 0 means no cap.

Returns:
    A dict with the before/after held-out (test, hard) scores, whether the
    score improved, the number of nights run, the per-night trace, the
    slow-update text (or None) and the tail of the final skill.
"""
memory = ""
# optionally cap each split to control API cost / latency.
# limit_replay caps train; limit_holdout caps BOTH val and test.
if limit_replay or limit_holdout:
train = [t for t in tasks if t.split == "train"]
val = [t for t in tasks if t.split == "val"]
test = [t for t in tasks if t.split == "test"]
if limit_replay:
train = train[:limit_replay]
if limit_holdout:
val = val[:limit_holdout]
test = test[:limit_holdout]
# NOTE: tasks whose split is none of train/val/test are dropped here.
tasks = train + val + test
# final measure is TEST (the gbrain held-out set); val gates internally
# Only the hard score (bh) is used below; bs and bscore are unused.
bh, bs, bscore = _score(backend, tasks, skill, memory, split="test")
trace = [{"night": 0, "test_hard": round(bh, 3), "action": "baseline"}]
cur = skill
# Skill as of the end of night 1; the slow update compares it against the final skill.
first_night_skill = skill
for night in range(1, nights + 1):
res = consolidate(
backend, tasks, cur, memory,
edit_budget=edit_budget, gate_metric="mixed", gate_mixed_weight=0.5,
gate_mode=gate_mode, rollouts_k=rollouts_k,
evolve_skill=True, evolve_memory=False, night=night,
)
if res.accepted:
cur = res.new_skill
# Captured after night 1 whether or not that night's edit was accepted.
if night == 1:
first_night_skill = cur
# report the TEST score each night (independent of the val gate)
th, _ts, _ = _score(backend, tasks, cur, memory, split="test")
trace.append({
"night": night,
"val_hard": round(res.holdout_candidate, 3),
"test_hard": round(th, 3),
"action": res.gate_action,
"accepted": res.accepted,
"edits": [e.content for e in res.applied_edits],
})
# Stop early once the test score is effectively perfect.
if th >= 0.999:
break
# ── SLOW UPDATE: consolidate cross-night experience into the protected
# long-term field. Runs regardless of gate mode (it is what preserves
# long-term memory even when the gate is OFF).
slow_text = None
if nights >= 2 and slow_update:
try:
from skillopt.sleep.slow_update import run_slow_update, replace_slow_field
val_tasks = [t for t in tasks if t.split == "val"] or tasks
# Replay the same val tasks under the night-1 skill and the final skill.
prev_pairs = replay_batch(backend, val_tasks, first_night_skill, memory)
curr_pairs = replay_batch(backend, val_tasks, cur, memory)
slow_text = run_slow_update(
backend, prev_skill=first_night_skill, curr_skill=cur,
prev_pairs=[(t, r) for t, r in prev_pairs],
curr_pairs=[(t, r) for t, r in curr_pairs],
)
if slow_text:
cur = replace_slow_field(cur, slow_text)
except Exception:
# Best-effort: any failure (including the import) is swallowed and
# reported as "no slow update"; the error itself is not recorded.
slow_text = None
# Final test score includes the slow update, if one was applied.
# Only the hard score (ah) is used; as_ and ascore are unused.
ah, as_, ascore = _score(backend, tasks, cur, memory, split="test")
return {
"seed": seed,
"held_out_before": round(bh, 3),
"held_out_after": round(ah, 3),
"improved": ah > bh,
"nights": len(trace) - 1,
"trace": trace,
"slow_update": slow_text,
"final_skill_tail": cur[-400:],
}
def main(argv=None) -> int:
"""CLI entry point: run the gbrain-evals skillopt-v1 benchmark over the chosen seeds.

Builds one backend for all seeds, runs run_seed() per seed (skipping
seeds with no tasks) and prints either a JSON summary or one line per
seed plus a final tally.

Args:
    argv: Argument list for argparse; ``None`` means ``sys.argv[1:]``.

Returns:
    ``0`` on completion, ``2`` when the benchmark data root is not found.
"""
ap = argparse.ArgumentParser(description="Run gbrain-evals skillopt-v1 with SkillOpt-Sleep")
ap.add_argument("--backend", default="mock", choices=["mock", "claude", "codex"])
ap.add_argument("--model", default="")
ap.add_argument("--optimizer-backend", default="", help="route reflect/judge here (dual)")
ap.add_argument("--optimizer-model", default="")
ap.add_argument("--target-backend", default="", help="route attempt here (dual)")
ap.add_argument("--target-model", default="")
ap.add_argument("--codex-path", default="")
ap.add_argument("--data-root", default="", help="path to eval/data/skillopt-v1")
ap.add_argument("--seeds", default="", help="comma list; default = all available")
ap.add_argument("--nights", type=int, default=3)
ap.add_argument("--edit-budget", type=int, default=4)
ap.add_argument("--gate", default="on", choices=["on", "off", "hard", "soft"],
help="on/hard/soft = validation-gated; off = greedy (no hard filter)")
ap.add_argument("--rollouts-k", type=int, default=1,
help=">1 = multi-rollout contrastive reflection per task")
ap.add_argument("--budget-tokens", type=int, default=0,
help="approx token budget; auto-plans nights x rollouts when set")
# NOTE(review): --budget-minutes is parsed but never read in this function.
ap.add_argument("--budget-minutes", type=float, default=0.0)
ap.add_argument("--preferences", default="", help="free-text user preferences (prior for reflect)")
ap.add_argument("--limit-replay", type=int, default=0, help="cap #train tasks (cost control)")
ap.add_argument("--limit-holdout", type=int, default=0, help="cap #val and #test tasks (cost control)")
ap.add_argument("--json", action="store_true")
args = ap.parse_args(argv)
data_root = find_data_root(args.data_root)
if not data_root:
print("ERROR: could not find eval/data/skillopt-v1. Clone gbrain-evals and pass --data-root.",
file=sys.stderr)
return 2
# An empty --seeds (or one containing only commas/blanks) selects every available seed.
seeds = [s.strip() for s in args.seeds.split(",") if s.strip()] or available_seeds(data_root)
backend = build_backend(
backend=args.backend, model=args.model,
optimizer_backend=args.optimizer_backend, optimizer_model=args.optimizer_model,
target_backend=args.target_backend, target_model=args.target_model,
codex_path=args.codex_path, preferences=args.preferences,
)
results = []
for seed in seeds:
skill, tasks = load_seed(data_root, seed)
# Seeds that load no tasks are skipped silently and do not appear in the results.
if not tasks:
continue
# budget auto-planning: derive nights x rollouts_k from a token budget
nights, rollouts_k = args.nights, args.rollouts_k
if args.budget_tokens:
from skillopt.sleep.budget import Budget, plan_depth
# Plan on the train-task count, or on all tasks if none are marked train.
n_train = len([t for t in tasks if t.split == "train"]) or len(tasks)
nights, rollouts_k = plan_depth(
Budget(max_tokens=args.budget_tokens), n_tasks=n_train,
default_nights=args.nights, default_k=args.rollouts_k,
)
if not args.json:
print(f" [budget] {args.budget_tokens} tok -> nights={nights} rollouts_k={rollouts_k}")
# --gate hard/soft are both passed to run_seed as "on"; only "off" disables the gate.
r = run_seed(backend, seed, skill, tasks, nights=nights,
edit_budget=args.edit_budget, rollouts_k=rollouts_k,
gate_mode=("off" if args.gate == "off" else "on"),
limit_replay=args.limit_replay, limit_holdout=args.limit_holdout)
results.append(r)
if not args.json:
print(f" {seed:<18} held-out {r['held_out_before']:.2f} -> {r['held_out_after']:.2f}"
f" ({'IMPROVED' if r['improved'] else 'no change'}, {r['nights']} nights)")
n_improved = sum(1 for r in results if r["improved"])
summary = {
"benchmark": "gbrain-evals/skillopt-v1",
"backend": backend.name,
"model": args.model or "(default)",
"n_seeds": len(results),
"n_improved": n_improved,
# One backend instance serves every seed, so this is the total across seeds.
"tokens_used": backend.tokens_used(),
"results": results,
}
if args.json:
print(json.dumps(summary, ensure_ascii=False, indent=2))
else:
print(f"\n=== {n_improved}/{len(results)} seeds improved on held-out "
f"(backend={backend.name}, ~{backend.tokens_used()} tokens) ===")
return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,155 +0,0 @@
"""SkillOpt-Sleep — skill-transfer experiment (sleep scenario).
Answers: "if I optimize a skill while the agent sleeps using a CHEAP model,
does the learned skill still help an EXPENSIVE model at deploy time?" — and the
reverse. This is the SkillOpt paper's cross-model transfer result, reproduced
in the sleep setting, and it is the core price-difference value proposition:
spend cheap tokens overnight, deploy the frozen skill anywhere.
Protocol, per gbrain seed:
1. baseline_target = held-out score of the DEFICIENT skill, run on TARGET model
2. optimize the skill for N nights using the SOURCE model (attempt+reflect)
3. transferred = held-out score of the LEARNED skill, run on TARGET model,
with NO further optimization
4. (reference) direct = held-out score of a skill optimized AND run on TARGET
Report baseline / direct / transferred, mirroring SkillOpt Table "transfer".
Usage:
python -m skillopt.sleep.experiments.run_transfer \
--source-backend claude --source-model haiku \
--target-backend claude --target-model sonnet \
--seeds brief-writer --nights 2
"""
from __future__ import annotations
import argparse
import json
import sys
from typing import List, Optional
from skillopt.sleep.backend import get_backend
from skillopt.sleep.consolidate import consolidate, select_gate_score
from skillopt.sleep.experiments.gbrain_bench import (
available_seeds, find_data_root, load_seed,
)
from skillopt.sleep.replay import aggregate_scores, replay_batch
def _holdout_hard(backend, tasks, skill, memory="") -> float:
    """Return the hard score of *skill* on the held-out tasks.

    Transfer is measured on the TEST split. When there are no test tasks the
    ``val``/``holdout`` tasks are used, and failing that, every task.
    """
    test_tasks = [task for task in tasks if task.split == "test"]
    fallback_tasks = None
    if not test_tasks:
        fallback_tasks = [task for task in tasks if task.split in ("val", "holdout")]
    held_out = test_tasks or fallback_tasks or tasks
    hard, _soft = aggregate_scores(replay_batch(backend, held_out, skill, memory))
    return hard
def _optimize(backend, skill, tasks, *, nights, edit_budget) -> str:
    """Run up to ``nights`` gated consolidation rounds and return the resulting skill.

    Memory is always the empty string and is never evolved. A round's skill
    is adopted only when the round is accepted, and the loop ends early once
    a round's ``holdout_candidate`` score reaches 0.999.
    """
    current = skill
    for night in range(1, nights + 1):
        outcome = consolidate(
            backend, tasks, current, "",
            edit_budget=edit_budget,
            gate_metric="mixed",
            evolve_skill=True,
            evolve_memory=False,
            night=night,
        )
        current = outcome.new_skill if outcome.accepted else current
        if outcome.holdout_candidate >= 0.999:
            break
    return current
def run_seed(seed, skill, tasks, *, source, target, nights, edit_budget,
             limit_replay, limit_holdout, do_direct=True) -> dict:
    """Measure cross-model transfer for one seed skill.

    Order of work: score the original skill on TARGET (baseline), optimize on
    SOURCE, score that frozen skill on TARGET (transferred), and -- only when
    ``do_direct`` is true -- optimize and score on TARGET as a reference.

    ``limit_replay`` caps the train tasks; ``limit_holdout`` caps both val and
    test. A value of 0 leaves the corresponding split uncapped.

    Returns a dict with the rounded baseline, direct (or None), transferred
    and gain scores, plus the tail of the source-optimized skill.
    """
    if limit_replay or limit_holdout:
        by_split = {}
        for split_name in ("train", "val", "test"):
            by_split[split_name] = [t for t in tasks if t.split == split_name]
        if limit_replay:
            by_split["train"] = by_split["train"][:limit_replay]
        if limit_holdout:
            by_split["val"] = by_split["val"][:limit_holdout]
            by_split["test"] = by_split["test"][:limit_holdout]
        tasks = by_split["train"] + by_split["val"] + by_split["test"]

    baseline_target = _holdout_hard(target, tasks, skill)

    # optimize on SOURCE, evaluate frozen skill on TARGET
    learned_on_source = _optimize(source, skill, tasks,
                                  nights=nights, edit_budget=edit_budget)
    transferred = _holdout_hard(target, tasks, learned_on_source)

    direct = None
    if do_direct:
        learned_on_target = _optimize(target, skill, tasks,
                                      nights=nights, edit_budget=edit_budget)
        direct = _holdout_hard(target, tasks, learned_on_target)

    direct_rounded = None if direct is None else round(direct, 3)
    summary = {
        "seed": seed,
        "baseline_target": round(baseline_target, 3),
        "direct_target": direct_rounded,
        "transferred": round(transferred, 3),
        "transfer_gain": round(transferred - baseline_target, 3),
        "learned_skill_tail": learned_on_source[-300:],
    }
    return summary
def main(argv=None) -> int:
"""CLI entry point for the cross-model skill-transfer experiment.

Builds a SOURCE and a TARGET backend, runs run_seed() for each requested
seed (skipping seeds with no tasks) and prints a JSON summary or one line
per seed plus a tally of positive transfers.

Args:
    argv: Argument list for argparse; ``None`` means ``sys.argv[1:]``.

Returns:
    ``0`` on completion, ``2`` when the benchmark data root is not found.
"""
ap = argparse.ArgumentParser(description="SkillOpt-Sleep cross-model transfer")
ap.add_argument("--source-backend", default="claude")
ap.add_argument("--source-model", default="haiku")
ap.add_argument("--target-backend", default="claude")
ap.add_argument("--target-model", default="sonnet")
ap.add_argument("--codex-path", default="")
ap.add_argument("--data-root", default="")
ap.add_argument("--seeds", default="brief-writer")
ap.add_argument("--nights", type=int, default=2)
ap.add_argument("--edit-budget", type=int, default=4)
ap.add_argument("--limit-replay", type=int, default=3)
ap.add_argument("--limit-holdout", type=int, default=3)
ap.add_argument("--no-direct", action="store_true", help="skip the direct reference (saves cost)")
ap.add_argument("--json", action="store_true")
args = ap.parse_args(argv)
data_root = find_data_root(args.data_root)
if not data_root:
print("ERROR: gbrain-evals skillopt-v1 data not found; pass --data-root", file=sys.stderr)
return 2
# The same --codex-path is given to both backends.
source = get_backend(args.source_backend, model=args.source_model, codex_path=args.codex_path)
target = get_backend(args.target_backend, model=args.target_model, codex_path=args.codex_path)
# A --seeds value with no non-blank entries selects every available seed.
seeds = [s.strip() for s in args.seeds.split(",") if s.strip()] or available_seeds(data_root)
results = []
for seed in seeds:
skill, tasks = load_seed(data_root, seed)
# Seeds that load no tasks are skipped silently.
if not tasks:
continue
r = run_seed(seed, skill, tasks, source=source, target=target,
nights=args.nights, edit_budget=args.edit_budget,
limit_replay=args.limit_replay, limit_holdout=args.limit_holdout,
do_direct=not args.no_direct)
results.append(r)
if not args.json:
# direct_target is None when --no-direct was given; omit it from the line then.
d = f" direct={r['direct_target']}" if r['direct_target'] is not None else ""
print(f" {seed:<16} baseline={r['baseline_target']:.2f}"
f" transferred={r['transferred']:.2f}{d}"
f" (gain {r['transfer_gain']:+.2f})")
summary = {
"experiment": "skillopt-sleep/transfer",
"source": f"{args.source_backend}:{args.source_model}",
"target": f"{args.target_backend}:{args.target_model}",
# Token totals cover all seeds, since each backend instance is shared.
"tokens_source": source.tokens_used(),
"tokens_target": target.tokens_used(),
"results": results,
}
if args.json:
print(json.dumps(summary, ensure_ascii=False, indent=2))
else:
print(f"\n=== transfer {summary['source']} -> {summary['target']}: "
f"{sum(1 for r in results if r['transfer_gain'] > 0)}/{len(results)} positive ===")
return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,164 +0,0 @@
"""SkillOpt-Sleep — benchmark sweep driver.
Runs many (backend, model, seed, transfer-pair) configurations SEQUENTIALLY in
one process, appending each result to a JSONL file as it finishes. Designed to
run unattended in the background; safe to interrupt (already-written rows
survive) and resume (skip configs whose row already exists).
Then `report.py` turns the JSONL into a presented Markdown scorecard.
Usage:
python -m skillopt.sleep.experiments.sweep --plan quick --out docs/sleep/sweep.jsonl
python -m skillopt.sleep.experiments.sweep --plan full --out docs/sleep/sweep.jsonl
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from typing import Any, Dict, List
from skillopt.sleep.backend import build_backend, get_backend
from skillopt.sleep.experiments.gbrain_bench import find_data_root, load_seed
from skillopt.sleep.experiments.run_gbrain import run_seed as bench_seed
from skillopt.sleep.experiments.run_transfer import run_seed as transfer_seed
# Plans: lists of config dicts. Kept small per-run to bound cost/latency.
def _direct_cfg(backend, model, seed, nights=2):
return {"kind": "direct", "backend": backend, "model": model, "seed": seed, "nights": nights}
def _dual_cfg(opt_backend, opt_model, tgt_backend, tgt_model, seed, nights=2):
# a 'direct' run on a DualBackend: strong optimizer proposes, weak target runs
return {"kind": "dual", "optimizer_backend": opt_backend, "optimizer_model": opt_model,
"target_backend": tgt_backend, "target_model": tgt_model, "seed": seed, "nights": nights}
def _transfer_cfg(sb, sm, tb, tm, seed, nights=2):
return {"kind": "transfer", "source_backend": sb, "source_model": sm,
"target_backend": tb, "target_model": tm, "seed": seed, "nights": nights}
# Named sweep plans: plan name -> ordered list of config dicts. Configs with
# no explicit nights argument use the builders' default of 2.
PLANS: Dict[str, List[Dict[str, Any]]] = {
# one cheap seed each, both backends — fast sanity
"quick": [
_direct_cfg("claude", "haiku", "brief-writer", 1),
_direct_cfg("codex", "", "brief-writer", 2),
],
# SkillOpt-faithful: STRONG optimizer (sonnet) proposes, WEAK target (haiku)
# runs — the reliable config. Plus Codex self-optimized. All 4 gbrain seeds,
# including quick-answerer (real tool loop).
# NOTE(review): the Codex entries below cover only 3 seeds (no
# thorough-analyst) — confirm that is intended.
"direct": [
_dual_cfg("claude", "sonnet", "claude", "haiku", "brief-writer"),
_dual_cfg("claude", "sonnet", "claude", "haiku", "advisor"),
_dual_cfg("claude", "sonnet", "claude", "haiku", "thorough-analyst"),
_dual_cfg("claude", "sonnet", "claude", "haiku", "quick-answerer"),
_direct_cfg("codex", "", "brief-writer"),
_direct_cfg("codex", "", "advisor"),
_direct_cfg("codex", "", "quick-answerer"),
],
# the price-difference story: optimize cheap, deploy expensive (and reverse)
"transfer": [
_transfer_cfg("claude", "haiku", "claude", "sonnet", "brief-writer"),
_transfer_cfg("claude", "sonnet", "claude", "haiku", "brief-writer"),
_transfer_cfg("codex", "", "claude", "haiku", "brief-writer"),
_transfer_cfg("claude", "haiku", "codex", "", "brief-writer"),
],
}
# "full" is the direct plan followed by the transfer plan ("quick" is not included).
PLANS["full"] = PLANS["direct"] + PLANS["transfer"]
def _cfg_key(c: Dict[str, Any]) -> str:
return json.dumps({k: c[k] for k in sorted(c)}, ensure_ascii=False)
def _load_done(out_path: str) -> set:
done = set()
if os.path.exists(out_path):
with open(out_path) as f:
for line in f:
try:
row = json.loads(line)
if "cfg_key" in row:
done.add(row["cfg_key"])
except Exception:
pass
return done
def _append(out_path: str, row: Dict[str, Any]) -> None:
os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
with open(out_path, "a") as f:
f.write(json.dumps(row, ensure_ascii=False) + "\n")
def run_one(cfg: Dict[str, Any], data_root: str, codex_path: str,
limit_replay: int, limit_holdout: int) -> Dict[str, Any]:
"""Run a single sweep config and return its result row.

``direct`` and ``dual`` configs run the gbrain benchmark for the seed;
any other kind is treated as a ``transfer`` config.

Args:
    cfg: Config dict as built by the plan builders.
    data_root: Benchmark data root passed to load_seed.
    codex_path: Forwarded to the backend constructors.
    limit_replay: Forwarded as ``limit_replay``.
    limit_holdout: Forwarded as ``limit_holdout``.

Returns:
    The scores for the config plus ``cfg``, ``cfg_key`` and ``elapsed_s``.
"""
seed = cfg["seed"]
skill, tasks = load_seed(data_root, seed)
# Timing starts after the seed is loaded, so elapsed_s excludes load time.
t0 = time.time()
if cfg["kind"] in ("direct", "dual"):
if cfg["kind"] == "dual":
# dual: separate optimizer and target routed through build_backend.
be = build_backend(
optimizer_backend=cfg["optimizer_backend"], optimizer_model=cfg.get("optimizer_model", ""),
target_backend=cfg["target_backend"], target_model=cfg.get("target_model", ""),
codex_path=codex_path,
)
else:
be = get_backend(cfg["backend"], model=cfg.get("model", ""), codex_path=codex_path)
r = bench_seed(be, seed, skill, tasks, nights=cfg["nights"],
limit_replay=limit_replay, limit_holdout=limit_holdout)
out = {"baseline": r["held_out_before"], "after": r["held_out_after"],
"improved": r["improved"], "tokens": be.tokens_used()}
else:
# Anything that is not direct/dual is handled as a transfer config.
src = get_backend(cfg["source_backend"], model=cfg.get("source_model", ""), codex_path=codex_path)
tgt = get_backend(cfg["target_backend"], model=cfg.get("target_model", ""), codex_path=codex_path)
# edit_budget is fixed at 4 and the direct reference run is always skipped here.
r = transfer_seed(seed, skill, tasks, source=src, target=tgt, nights=cfg["nights"],
edit_budget=4, limit_replay=limit_replay, limit_holdout=limit_holdout,
do_direct=False)
out = {"baseline_target": r["baseline_target"], "transferred": r["transferred"],
"transfer_gain": r["transfer_gain"],
"tokens": src.tokens_used() + tgt.tokens_used()}
# cfg_key is what the resume logic matches on.
out.update({"cfg": cfg, "cfg_key": _cfg_key(cfg), "elapsed_s": round(time.time() - t0, 1)})
return out
def main(argv=None) -> int:
"""CLI entry point: run every config of a plan, appending one JSONL row per config.

Configs whose key is already present in the output file are skipped, so an
interrupted sweep can be resumed by re-running the same command.

Args:
    argv: Argument list for argparse; ``None`` means ``sys.argv[1:]``.

Returns:
    ``0`` on completion, ``2`` when the benchmark data root is not found.
"""
ap = argparse.ArgumentParser(description="SkillOpt-Sleep benchmark sweep")
ap.add_argument("--plan", default="quick", choices=list(PLANS.keys()))
ap.add_argument("--out", default="docs/sleep/sweep.jsonl")
ap.add_argument("--data-root", default="")
ap.add_argument("--codex-path", default="")
ap.add_argument("--limit-replay", type=int, default=3)
ap.add_argument("--limit-holdout", type=int, default=3)
args = ap.parse_args(argv)
data_root = find_data_root(args.data_root)
if not data_root:
print("ERROR: gbrain-evals data not found; pass --data-root", file=sys.stderr)
return 2
plan = PLANS[args.plan]
# Keys are read once up front. Rows that recorded an error also carry a
# cfg_key, so errored configs are skipped on resume as well.
done = _load_done(args.out)
print(f"[sweep] plan={args.plan} configs={len(plan)} already_done={len(done)} -> {args.out}")
for i, cfg in enumerate(plan, 1):
key = _cfg_key(cfg)
if key in done:
print(f"[sweep] ({i}/{len(plan)}) skip (done): {cfg}")
continue
print(f"[sweep] ({i}/{len(plan)}) running: {cfg}", flush=True)
try:
row = run_one(cfg, data_root, args.codex_path, args.limit_replay, args.limit_holdout)
except Exception as e: # never let one config kill the sweep
row = {"cfg": cfg, "cfg_key": key, "error": f"{type(e).__name__}: {e}"}
# Persist each row as soon as it exists so an interruption loses at most the running config.
_append(args.out, row)
print(f"[sweep] -> {json.dumps({k: v for k, v in row.items() if k not in ('cfg','cfg_key')})}", flush=True)
# The count printed here is the number of distinct cfg_key values in the file.
print(f"[sweep] done. rows in {args.out}: {len(_load_done(args.out))}")
return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,236 +0,0 @@
"""SkillOpt-Sleep — Stage 1: harvest.
Read the user's local Claude Code records (read-only) and normalize them
into :class:`SessionDigest` objects.
Sources (verified schema):
* ~/.claude/history.jsonl — one JSON/line:
{"display": <prompt text>, "pastedContents": {...},
"timestamp": <epoch ms>, "project": <abs path>}
* ~/.claude/projects/<slug>/<sessionId>.jsonl — one record/line; the
records we care about have type "user"/"assistant" and carry:
message{role, content}, cwd, gitBranch, timestamp, sessionId, version
This module performs NO writes and NO network calls.
"""
from __future__ import annotations
import json
import os
from typing import Any, Dict, Iterable, List, Optional
from skillopt.sleep.types import SessionDigest
# Heuristic phrases that signal the user (dis)approving of prior output.
# _detect_feedback lower-cases the prompt and tests each phrase as a plain
# substring, so entries must be lower-case; short entries such as "wrong" or
# "no," also match inside longer phrases.
_NEGATIVE_FEEDBACK = (
    "still broken", "still not", "still wrong", "doesn't work", "does not work",
    "not working", "that's wrong", "thats wrong", "incorrect", "wrong",
    "no,", "nope", "fix it", "didn't", "did not", "broken", "error again",
    "still failing", "still fails", "not fixed", "revert", "undo",
    "不对", "还是不对", "还是不行", "不行", "错了", "有问题", "没修好",
)
# Phrases taken as approval of the assistant's output (same substring matching).
_POSITIVE_FEEDBACK = (
    "thanks", "thank you", "perfect", "great", "works now", "fixed",
    "that works", "lgtm", "looks good", "nice", "awesome", "correct",
    "完美", "可以了", "好的", "搞定", "对了", "正确", "谢谢",
)
def _iter_jsonl(path: str) -> Iterable[Dict[str, Any]]:
    """Yield the JSON-object records of a JSONL file, skipping unusable lines.

    Blank lines, lines that are not valid JSON, and valid JSON values that are
    not objects (numbers, lists, strings, ...) are skipped, so every yielded
    record supports ``.get``. Undecodable bytes are replaced rather than
    aborting the read. A missing, unreadable or directory path yields nothing.
    """
    try:
        # errors="replace": one bad byte must not discard the whole transcript.
        with open(path, encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except (ValueError, RecursionError):
                    # json.JSONDecodeError is a ValueError subclass.
                    continue
                if isinstance(rec, dict):
                    yield rec
    except (FileNotFoundError, IsADirectoryError, PermissionError):
        return
def _text_from_content(content: Any) -> str:
    """Return the plain text carried by a ``message.content`` value.

    A string is returned unchanged. For a list, the non-empty ``text`` of every
    dict block whose ``type`` is ``"text"`` is joined with newlines. Any other
    value produces an empty string.
    """
    if isinstance(content, list):
        return "\n".join(
            str(block["text"])
            for block in content
            if isinstance(block, dict)
            and block.get("type") == "text"
            and block.get("text")
        )
    return content if isinstance(content, str) else ""
def _tool_names_from_content(content: Any) -> List[str]:
    """List the names of the ``tool_use`` blocks in a content list, in order.

    Non-list content, non-dict blocks and blocks without a truthy ``name`` are
    ignored. Duplicates are kept.
    """
    if not isinstance(content, list):
        return []
    return [
        str(block["name"])
        for block in content
        if isinstance(block, dict)
        and block.get("type") == "tool_use"
        and block.get("name")
    ]
def _detect_feedback(text: str) -> List[str]:
    """Return the feedback phrases found in ``text`` as tagged signals.

    The text is lower-cased and every phrase is tested as a substring. Hits
    are reported as ``"neg:<phrase>"`` for all negative phrases first, then
    ``"pos:<phrase>"`` for the positive ones, each group in tuple order.
    """
    lowered = text.lower()
    negatives = ["neg:" + phrase for phrase in _NEGATIVE_FEEDBACK if phrase in lowered]
    positives = ["pos:" + phrase for phrase in _POSITIVE_FEEDBACK if phrase in lowered]
    return negatives + positives
def _is_meta_prompt(text: str) -> bool:
    """Tell whether a user message is noise rather than a real request.

    Treated as noise: blank text, text wrapped in ``<...>``, a slash-command of
    at most three whitespace-separated words, and text starting with
    ``[Pasted text`` or ``Caveat:``.
    """
    stripped = text.strip()
    if not stripped:
        return True
    wrapped_in_tag = stripped.startswith("<") and stripped.endswith(">")
    short_slash_command = stripped.startswith("/") and len(stripped.split()) <= 3
    system_noise = stripped.startswith(("[Pasted text", "Caveat:"))
    return wrapped_in_tag or short_slash_command or system_noise
def digest_transcript(path: str) -> Optional[SessionDigest]:
    """Summarise one ``<sessionId>.jsonl`` transcript as a SessionDigest.

    The session id is the file name without its extension. Project and git
    branch come from the first record carrying ``cwd`` / ``gitBranch``; the
    start and end times are the first and last non-empty string timestamps.
    User prompts exclude meta prompts; only the last five assistant texts are
    kept. Returns ``None`` when the file has no counted user or assistant turn.
    """
    project = ""
    git_branch = ""
    first_ts = ""
    last_ts = ""
    prompts: List[str] = []
    finals: List[str] = []
    tool_names: List[str] = []
    touched: List[str] = []
    signals: List[str] = []
    user_turns = 0
    assistant_turns = 0

    for rec in _iter_jsonl(path):
        ts = rec.get("timestamp")
        if isinstance(ts, str) and ts:
            first_ts = first_ts or ts
            last_ts = ts
        if not project and rec.get("cwd"):
            project = str(rec.get("cwd"))
        if not git_branch and rec.get("gitBranch"):
            git_branch = str(rec.get("gitBranch"))

        if rec.get("type") == "file-history-snapshot":
            # NOTE(review): the keys of this mapping are taken as file paths;
            # confirm against the actual snapshot record layout.
            snap = rec.get("snapshot") or rec.get("files") or {}
            if isinstance(snap, dict):
                touched.extend([str(k) for k in list(snap.keys())[:20]])

        msg = rec.get("message")
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        content = msg.get("content")
        if role == "user":
            text = _text_from_content(content)
            if text and not _is_meta_prompt(text):
                user_turns += 1
                prompts.append(text.strip())
                signals.extend(_detect_feedback(text))
        elif role == "assistant":
            assistant_turns += 1
            tool_names.extend(_tool_names_from_content(content))
            reply = _text_from_content(content).strip()
            if reply:
                finals.append(reply)

    if user_turns == 0 and assistant_turns == 0:
        return None

    return SessionDigest(
        session_id=os.path.splitext(os.path.basename(path))[0],
        project=project,
        git_branch=git_branch,
        started_at=first_ts,
        ended_at=last_ts,
        user_prompts=prompts,
        assistant_finals=finals[-5:],  # last few finals are the useful ones
        # dict.fromkeys removes duplicates while keeping first-seen order
        tools_used=list(dict.fromkeys(tool_names)),
        files_touched=list(dict.fromkeys(touched)),
        feedback_signals=signals,
        n_user_turns=user_turns,
        n_assistant_turns=assistant_turns,
        raw_path=path,
    )
def _project_matches(project: str, scope: Any, invoked: str) -> bool:
    """Decide whether a session's project falls inside the requested scope.

    ``"all"`` accepts everything. A list or tuple accepts a project whose
    absolute path equals one of its entries. Any other scope value compares
    against ``invoked``: an empty ``invoked`` accepts everything, otherwise the
    project must be the invoked directory, a directory below it, or one of its
    ancestors.
    """
    if scope == "all":
        return True
    if isinstance(scope, (list, tuple)):
        for candidate in scope:
            if os.path.abspath(project) == os.path.abspath(candidate):
                return True
        return False
    # "invoked": match the invoked project (or a subdir of it)
    if not invoked:
        return True
    proj_abs = os.path.abspath(project)
    inv_abs = os.path.abspath(invoked)
    if proj_abs == inv_abs:
        return True
    return proj_abs.startswith(inv_abs + os.sep) or inv_abs.startswith(proj_abs + os.sep)
def harvest(
    transcripts_dir: str,
    *,
    scope: Any = "all",
    invoked_project: str = "",
    since_iso: Optional[str] = None,
    limit: int = 0,
) -> List[SessionDigest]:
    """Walk a transcripts directory and return digests matching scope/time.

    Parameters
    ----------
    transcripts_dir : str  directory searched recursively for ``*.jsonl``
                           (e.g. ~/.claude/projects)
    scope : "all" | "invoked" | list[path]
    invoked_project : str  used when scope == "invoked"
    since_iso : str|None   sessions whose ``ended_at`` compares (as a string)
                           below this value are dropped; sessions without an
                           end time are kept
    limit : int            cap number of digests (0 = no cap)

    Transcripts are visited newest-first by modification time. A missing
    directory yields an empty list.
    """
    digests: List[SessionDigest] = []
    if not os.path.isdir(transcripts_dir):
        return digests

    def _mtime(p: str) -> float:
        # A transcript can be removed between the walk and the sort; such a
        # file sorts last and is later skipped because it yields no digest.
        try:
            return os.path.getmtime(p)
        except OSError:
            return 0.0

    paths: List[str] = [
        os.path.join(root, fn)
        for root, _dirs, files in os.walk(transcripts_dir)
        for fn in files
        if fn.endswith(".jsonl")
    ]
    # newest first by mtime
    paths.sort(key=_mtime, reverse=True)

    for p in paths:
        d = digest_transcript(p)
        if d is None:
            continue
        if not _project_matches(d.project or "", scope, invoked_project):
            continue
        if since_iso and d.ended_at and d.ended_at < since_iso:
            continue
        digests.append(d)
        if limit and len(digests) >= limit:
            break
    return digests

View File

@@ -1,84 +0,0 @@
"""SkillOpt-Sleep — rule-based judges (gbrain-evals compatible).
Implements the programmatic check operators used by gbrain-evals'
skillopt-v1 benchmark so we can score skill outputs locally, with NO judge
API call:
* section_present <name> — a markdown heading containing <name> exists
* regex <pattern> — the pattern matches the response
* max_chars <n> — response length <= n
* min_chars <n> — response length >= n
* contains <text> — substring present (case-insensitive)
* tool_called <name> — a tool with <name> was invoked (needs a tool loop;
in single-shot replay we approximate via an
explicit "TOOL_CALL: <name>" marker the agent emits)
A task whose judge is {"kind": "rule", "checks": [...]} passes (hard=1.0) iff
ALL checks pass; soft = fraction of checks passed. This mirrors gbrain's
all-checks-must-pass rule scoring and gives the gate a smooth signal.
"""
from __future__ import annotations
import re
from typing import Any, Dict, List, Tuple
def _section_present(response: str, name: str) -> bool:
    """Tell whether ``response`` has a section labelled ``name``.

    Matching is case-insensitive and line-based. A section is any of:
      * a markdown heading line (``#`` .. ``######``) that contains ``name``
        anywhere, e.g. ``## Summary of findings`` or ``## Summary:``;
      * a bold line ``**...name...**`` with an optional trailing colon;
      * a ``name:`` label at the start of a line.

    ``name`` is matched literally (regex metacharacters are escaped).
    """
    text = response or ""
    needle = re.escape(name)
    patterns = (
        # Heading: text may follow the name, as it already may on bold lines.
        r"(?im)^\s{0,3}#{1,6}\s*.*%s.*$" % needle,
        r"(?im)^\s{0,3}\*\*.*%s.*\*\*\s*:?\s*$" % needle,
        # also accept "Name:" style label at line start
        r"(?im)^\s*%s\s*:" % needle,
    )
    return any(re.search(pattern, text) for pattern in patterns)
def _check(op: str, arg: Any, response: str, tools_called: List[str]) -> bool:
    """Evaluate one rule-judge check against a response.

    Supported operators: ``section_present``, ``regex``, ``max_chars``,
    ``min_chars``, ``contains`` (case-insensitive) and ``tool_called``.
    A malformed argument fails the check instead of raising: an invalid regex
    or a length limit that cannot be converted to ``int``. An unknown
    operator passes so that it never blocks a task.
    """
    r = response or ""
    if op == "section_present":
        return _section_present(r, str(arg))
    if op == "regex":
        try:
            return bool(re.search(str(arg), r))
        except re.error:
            return False
    if op in ("max_chars", "min_chars"):
        # Checks may come from unvalidated JSON; a missing or non-numeric
        # limit is a failed check, mirroring the invalid-regex case.
        try:
            limit = int(arg)
        except (TypeError, ValueError, OverflowError):
            return False
        return len(r) <= limit if op == "max_chars" else len(r) >= limit
    if op == "contains":
        return str(arg).lower() in r.lower()
    if op == "tool_called":
        name = str(arg).lower()
        if any(name == t.lower() for t in tools_called):
            return True
        # single-shot approximation: the agent emits an explicit marker
        return bool(re.search(r"(?i)\btool_call\s*:\s*%s\b" % re.escape(name), r))
    # unknown op: do not block
    return True
def score_rule_judge(
    judge: Dict[str, Any],
    response: str,
    tools_called: List[str] | None = None,
) -> Tuple[float, float, str]:
    """Score a response against a rule judge and return (hard, soft, rationale).

    ``hard`` is 1.0 only when every check passes, ``soft`` is the fraction of
    checks passed, and ``rationale`` names the failed checks as ``op=arg``.
    A judge without checks scores ``(0.0, 0.0, "no checks")``.
    """
    checks = (judge or {}).get("checks", []) or []
    if not checks:
        return 0.0, 0.0, "no checks"

    called = tools_called or []
    failures: List[str] = [
        f"{c.get('op')}={c.get('arg')}"
        for c in checks
        if not _check(c.get("op", ""), c.get("arg"), response, called)
    ]
    total = len(checks)
    soft = (total - len(failures)) / total
    if failures:
        return 0.0, soft, "failed: " + ", ".join(failures)
    return 1.0, soft, "all checks passed"

View File

@@ -1,134 +0,0 @@
"""SkillOpt-Sleep — LLM-backed task miner.
The heuristic miner (mine.py) produces TaskRecords without a checkable
reference, so real harvested transcripts can't show measurable lift. This
module uses an optimizer backend to turn session digests into TaskRecords
WITH a checkable rubric judge — the missing piece for real-data improvement.
For each recurring intent it extracts:
* a clean, generalized `intent` (the reusable task, stripped of one-off specifics)
* a `rubric` (what a good answer must satisfy) -> stored as a rule judge of
`contains`/`regex`/`section_present` checks the local judge can score, OR a
free-text rubric scored by the backend's judge() when no programmatic check fits
* a preference signal (was the user satisfied?) to weight failures
It is deliberately conservative: it only emits a task when it can name a
concrete, checkable success criterion, so the gate has real signal. Tasks it
can't make checkable are dropped (logged), not faked.
"""
from __future__ import annotations
import json
import re
from typing import Any, Callable, Dict, List
from skillopt.sleep.backend import Backend, _extract_json
from skillopt.sleep.types import SessionDigest, TaskRecord
# Prompt template sent to the backend for one session. _digest_to_prompt fills
# the __PROJECT__, __PROMPTS__, __FINAL__ and __FEEDBACK__ placeholders with
# str.replace; the model is asked to answer with a JSON array of task objects
# (intent / checks / rubric / satisfied), which _mk_task then validates.
_MINER_PROMPT = """You are mining a user's past AI-assistant sessions to find RECURRING tasks
worth optimizing a skill for. From the session below, extract 0-3 reusable tasks.
A good task is something the user asks for repeatedly or had to correct, where a
GENERAL rule would help next time (formatting, structure, tool-use, conventions).
Skip one-off or purely exploratory requests.
For each task return:
- "intent": the reusable request, generalized (no one-off specifics)
- "checks": a list of programmatic success checks a grader can run on a future
answer. Each check is one of:
{"op":"section_present","arg":"<heading text>"}
{"op":"regex","arg":"<python regex the answer must match>"}
{"op":"contains","arg":"<substring the answer must contain>"}
{"op":"max_chars","arg":<int>}
Only include checks you are confident a GOOD answer must satisfy.
- "rubric": a one-sentence description of what a good answer looks like
- "satisfied": true/false — did the user seem satisfied with the assistant's answer?
Return ONLY a JSON array (possibly empty). No prose.
# Session
project: __PROJECT__
user prompts:
__PROMPTS__
assistant final (last):
__FINAL__
feedback signals: __FEEDBACK__
"""
def _digest_to_prompt(d: SessionDigest) -> str:
    """Fill the miner prompt template with one session's details.

    Uses at most six user prompts (240 characters each), the first 400
    characters of the last assistant text and at most six feedback signals.
    Missing parts are rendered as ``(none)`` / ``(unknown)``.
    """
    bullets = "\n".join(f" - {p[:240]}" for p in d.user_prompts[:6]) or " (none)"
    last_final = d.assistant_finals[-1][:400] if d.assistant_finals else "(none)"
    signals = ", ".join(d.feedback_signals[:6]) or "(none)"

    filled = _MINER_PROMPT
    # Substituted in this fixed order, one placeholder at a time.
    for placeholder, value in (
        ("__PROJECT__", d.project or "(unknown)"),
        ("__PROMPTS__", bullets),
        ("__FINAL__", last_final),
        ("__FEEDBACK__", signals),
    ):
        filled = filled.replace(placeholder, value)
    return filled
def _mk_task(d: SessionDigest, obj: Dict[str, Any], idx: int) -> TaskRecord | None:
    """Turn one mined JSON object into a checkable TaskRecord, or drop it.

    Returns ``None`` when the intent is shorter than 8 characters or when the
    object offers neither a usable programmatic check nor a rubric.

    A check is kept only if the local rule judge can evaluate it:
      * ``max_chars`` / ``min_chars`` need an integer-convertible ``arg``
        (stored as ``int``);
      * ``section_present`` / ``contains`` / ``regex`` need a non-blank string
        ``arg``, and a ``regex`` must compile.
    Anything else — including a ``checks`` value that is not a list — is
    discarded, so a malformed model answer cannot crash or fake the judge.

    ``idx`` is accepted for call compatibility and is not used.
    """
    import hashlib

    intent = str(obj.get("intent", "")).strip()
    if len(intent) < 8:
        return None
    rubric = str(obj.get("rubric", "")).strip()

    # bool("false") is True, so textual answers are interpreted explicitly.
    raw_satisfied = obj.get("satisfied", False)
    if isinstance(raw_satisfied, str):
        satisfied = raw_satisfied.strip().lower() in {"true", "yes", "1"}
    else:
        satisfied = bool(raw_satisfied)

    raw_checks = obj.get("checks")
    if not isinstance(raw_checks, list):
        raw_checks = []

    # keep only well-formed checks
    clean_checks = []
    for c in raw_checks:
        if not isinstance(c, dict):
            continue
        op, arg = c.get("op"), c.get("arg")
        if op in ("max_chars", "min_chars"):
            if isinstance(arg, bool):
                continue
            try:
                arg = int(arg)
            except (TypeError, ValueError, OverflowError):
                continue
        elif op in ("section_present", "contains", "regex"):
            if not isinstance(arg, str) or not arg.strip():
                continue
            if op == "regex":
                try:
                    re.compile(arg)
                except re.error:
                    continue
        else:
            continue
        clean_checks.append({"op": op, "arg": arg})

    tid = "llm_" + hashlib.sha256((d.project + intent).encode()).hexdigest()[:12]
    outcome = "success" if satisfied else "fail"
    if clean_checks:
        return TaskRecord(
            id=tid, project=d.project, intent=intent,
            reference_kind="rule", judge={"kind": "rule", "checks": clean_checks},
            outcome=outcome,
            tags=["mined:llm"], source_sessions=[d.session_id],
        )
    if rubric:
        return TaskRecord(
            id=tid, project=d.project, intent=intent,
            reference_kind="rubric", reference=rubric,
            outcome=outcome,
            tags=["mined:llm"], source_sessions=[d.session_id],
        )
    return None  # not checkable -> drop
def make_llm_miner(
    backend: Backend,
    *,
    max_sessions: int = 20,
    max_tasks: int = 40,
) -> Callable[[List[SessionDigest]], List[TaskRecord]]:
    """Build a miner function bound to ``backend``.

    The returned callable looks at the first ``max_sessions`` digests, skips
    those without user prompts, asks the backend for a JSON array per session
    and converts at most three objects per session into tasks. It stops as
    soon as ``max_tasks`` tasks have been collected.
    """

    def _miner(digests: List[SessionDigest]) -> List[TaskRecord]:
        mined: List[TaskRecord] = []
        for digest in digests[:max_sessions]:
            if not digest.user_prompts:
                continue
            raw = backend._call(_digest_to_prompt(digest), max_tokens=800)  # type: ignore[attr-defined]
            parsed = _extract_json(raw, "array")
            if not isinstance(parsed, list):
                continue
            for idx, candidate in enumerate(parsed[:3]):
                if not isinstance(candidate, dict):
                    continue
                task = _mk_task(digest, candidate, idx)
                if task is None:
                    continue
                mined.append(task)
                if len(mined) >= max_tasks:
                    return mined
        return mined

    return _miner

View File

@@ -1,130 +0,0 @@
"""SkillOpt-Sleep — skill/memory document manipulation.
Applies bounded EditRecords to a skill (SKILL.md body) or memory (CLAUDE.md)
document, and provides Dream-style consolidation helpers (dedup near-identical
lines, drop contradictions). All edits live inside a protected, clearly-marked
region so the sleep cycle never clobbers the user's hand-written content.
"""
from __future__ import annotations
import re
from typing import List, Tuple
from skillopt.sleep.types import EditRecord
# HTML-comment markers that delimit the protected learned region of a document.
# extract_learned reads the text between them; _strip_learned removes the
# region; set_learned rewrites it.
LEARNED_START = "<!-- SKILLOPT-SLEEP:LEARNED START -->"
LEARNED_END = "<!-- SKILLOPT-SLEEP:LEARNED END -->"
# Note that set_learned writes below the region's heading, above the bullets.
_BANNER = (
    "_This block is maintained by SkillOpt-Sleep. Edits here are proposed "
    "offline, validated against your past tasks, and adopted only after you "
    "approve them. Hand-edits outside this block are never touched._"
)
def extract_learned(doc: str) -> str:
    """Return the stripped text between the first learned-region markers.

    The end marker is searched only after the start marker, so a stray end
    marker earlier in the document does not hide the region. Returns an empty
    string when either marker is missing.
    """
    start = doc.find(LEARNED_START)
    if start == -1:
        return ""
    body_from = start + len(LEARNED_START)
    end = doc.find(LEARNED_END, body_from)
    if end == -1:
        return ""
    return doc[body_from:end].strip()
def _strip_learned(doc: str) -> str:
    """Remove every learned region from ``doc`` and tidy the whitespace.

    Each start..end span is cut out; a start marker without an end marker
    truncates the document at that marker. Runs of three or more newlines are
    then reduced to two and trailing whitespace is removed.
    """
    start = doc.find(LEARNED_START)
    while start != -1:
        end = doc.find(LEARNED_END, start)
        if end == -1:
            doc = doc[:start]
            break
        doc = doc[:start] + doc[end + len(LEARNED_END):]
        start = doc.find(LEARNED_START)
    return re.sub(r"\n{3,}", "\n\n", doc).rstrip()
def set_learned(doc: str, learned_lines: List[str]) -> str:
    """Replace the protected learned region with the given bullet lines.

    Any existing learned region is removed and a fresh one is appended after
    the remaining content. Each line becomes one ``- `` bullet; an existing
    leading ``- `` bullet marker is removed first, and blank lines are
    skipped. Only the bullet marker itself is removed, so a rule that starts
    with dashes (``--flag``, ``-v``) keeps them.
    """
    base = _strip_learned(doc)

    bullets: List[str] = []
    for raw in learned_lines:
        text = raw.strip()
        # Drop bullet markers only ("- " prefix), never dashes that belong to
        # the rule text; str.lstrip("- ") would strip any run of '-' and ' '.
        while text.startswith("- "):
            text = text[2:].lstrip()
        if text == "-":
            text = ""
        if text:
            bullets.append(f"- {text}")
    body = "\n".join(bullets)

    block = (
        f"\n\n{LEARNED_START}\n"
        f"## Learned preferences & procedures\n\n{_BANNER}\n\n{body}\n"
        f"{LEARNED_END}\n"
    )
    return (base + block).lstrip("\n")
def current_learned_lines(doc: str) -> List[str]:
    """Return the bullet texts of the learned region, without the ``- `` prefix.

    Lines inside the region that do not start with ``- `` (heading, banner,
    blanks) are ignored.
    """
    stripped = (ln.strip() for ln in extract_learned(doc).splitlines())
    return [ln[2:].strip() for ln in stripped if ln.startswith("- ")]
def _norm(s: str) -> str:
    """Lower-case ``s``, collapse whitespace runs to one space and trim it.

    A falsy value (``None`` or ``""``) normalizes to the empty string.
    """
    lowered = (s or "").lower()
    collapsed = re.sub(r"\s+", " ", lowered)
    return collapsed.strip()
def apply_edits(doc: str, edits: List[EditRecord]) -> Tuple[str, List[EditRecord]]:
    """Apply add/delete/replace edits to the protected learned region.

    Returns ``(new_doc, applied_edits)``.

    * ``add``: appends the content unless it is blank or already present
      (compared after normalization).
    * ``delete``: removes every line containing the normalized anchor (the
      edit's ``anchor``, or its ``content`` when no anchor is given).
    * ``replace``: replaces every line containing the normalized anchor with
      the content.

    An edit with an empty anchor is ignored for ``delete`` and ``replace``:
    the empty string is a substring of every line, so it would otherwise
    match — and for ``delete`` erase — the whole region. Edits with any other
    ``op`` are ignored.
    """
    lines = current_learned_lines(doc)
    norm_set = {_norm(l) for l in lines}
    applied: List[EditRecord] = []
    for e in edits:
        op = (e.op or "add").lower()
        content = (e.content or "").strip()
        if op == "add":
            key = _norm(content)
            if not content or key in norm_set:
                continue
            lines.append(content)
            norm_set.add(key)
            applied.append(e)
        elif op == "delete":
            anchor = _norm(e.anchor or e.content)
            if not anchor:
                continue
            keep = [l for l in lines if anchor not in _norm(l)]
            if len(keep) != len(lines):
                lines = keep
                norm_set = {_norm(l) for l in lines}
                applied.append(e)
        elif op == "replace":
            anchor = _norm(e.anchor)
            if not anchor:
                continue
            if any(anchor in _norm(l) for l in lines):
                lines = [content if anchor in _norm(l) else l for l in lines]
                norm_set = {_norm(l) for l in lines}
                applied.append(e)
    return set_learned(doc, lines), applied
def ensure_skill_scaffold(doc: str, *, name: str, description: str) -> str:
    """Prepend YAML frontmatter and a title to a SKILL.md that lacks them.

    A document whose first non-whitespace characters are ``---`` is returned
    unchanged. Otherwise a frontmatter block with ``name`` and ``description``,
    a ``# name`` heading and a one-line introduction are placed before it.
    """
    if doc.lstrip().startswith("---"):
        return doc
    header_lines = [
        "---",
        f"name: {name}",
        f"description: {description}",
        "---",
        "",
        f"# {name}",
        "",
        "Preferences and procedures learned from your past Claude Code sessions.",
        "",  # yields the trailing newline before the original document
    ]
    return "\n".join(header_lines) + doc

View File

@@ -1,210 +0,0 @@
"""SkillOpt-Sleep — Stage 2: mine.
Turn :class:`SessionDigest` objects into :class:`TaskRecord` training units.
Two miners:
* heuristic_mine — deterministic, no API. Detects retry chains (a prompt
re-asked after negative feedback => the early attempt failed), extracts
the user's recurring intents, and labels outcomes from feedback signals.
* llm_mine — optional; uses an optimizer backend to produce richer
TaskRecords with checkable references. Falls back to heuristic on error.
The heuristic miner is what makes the whole cycle runnable offline and is the
basis of the deterministic experiment.
"""
from __future__ import annotations
import hashlib
import re
from typing import Any, Callable, List, Optional
from skillopt.sleep.types import SessionDigest, TaskRecord
def _tid(project: str, intent: str) -> str:
    """Return a stable task id: ``task_`` plus 12 hex digits.

    The digits are the start of the SHA-256 of ``project::intent``, so the same
    project and intent always map to the same id.
    """
    payload = (project + "::" + intent).encode("utf-8")
    return "task_" + hashlib.sha256(payload).hexdigest()[:12]
def _short(text: str, n: int = 600) -> str:
    """Strip ``text`` and cut it to at most ``n`` characters.

    A falsy value becomes the empty string. No truncation marker is appended.
    """
    cleaned = (text or "").strip()
    if len(cleaned) > n:
        return cleaned[:n]
    return cleaned
def _looks_negative(signals: List[str]) -> bool:
    """Return True when at least one signal is tagged ``neg:``."""
    for signal in signals:
        if signal.startswith("neg:"):
            return True
    return False
def _looks_positive(signals: List[str]) -> bool:
    """Return True when at least one signal is tagged ``pos:``."""
    for signal in signals:
        if signal.startswith("pos:"):
            return True
    return False
def heuristic_mine(
    digests: List[SessionDigest],
    *,
    max_tasks: int = 40,
) -> List[TaskRecord]:
    """Deterministic miner — no API calls.

    Each session with at least one user prompt yields one TaskRecord whose
    intent is the FIRST prompt; sessions whose first prompt has fewer than 8
    non-blank-trimmed characters are skipped.

    Outcome:
      * any negative feedback signal            -> "fail"
      * positive signals only                   -> "success"
      * no signals, three or more user turns    -> "mixed"
      * otherwise                               -> "unknown"

    ``attempted_solution`` is the last assistant text, ``context_excerpt``
    lists up to three follow-up prompts, and ``reference_kind`` is always
    "none". Mining stops once ``max_tasks`` tasks have been collected.
    """
    mined: List[TaskRecord] = []
    for d in digests:
        if not d.user_prompts:
            continue
        intent = d.user_prompts[0]
        if len(intent.strip()) < 8:
            continue

        negative = _looks_negative(d.feedback_signals)
        positive = _looks_positive(d.feedback_signals)
        if negative:
            outcome = "fail"
        elif positive:
            outcome = "success"
        elif d.n_user_turns >= 3:
            outcome = "mixed"
        else:
            outcome = "unknown"

        attempted = d.assistant_finals[-1] if d.assistant_finals else ""

        follow_ups = d.user_prompts[1:4] if len(d.user_prompts) > 1 else []
        context = ""
        if follow_ups:
            # later prompts often carry the corrective detail / real constraints
            context = "Follow-up constraints from the same session:\n- " + "\n- ".join(
                _short(p, 200) for p in follow_ups
            )

        tags = []
        if d.tools_used:
            tags.append("tools:" + "+".join(d.tools_used[:4]))
        if d.git_branch:
            tags.append("branch:" + d.git_branch)

        record = TaskRecord(
            id=_tid(d.project, intent),
            project=d.project,
            intent=_short(intent, 800),
            context_excerpt=_short(context, 600),
            attempted_solution=_short(attempted, 600),
            outcome=outcome,
            reference_kind="none",
            reference="",
            tags=tags,
            source_sessions=[d.session_id],
        )
        mined.append(record)
        if len(mined) >= max_tasks:
            break
    return mined
def dedup_tasks(tasks: List[TaskRecord]) -> List[TaskRecord]:
    """Merge tasks that share an id, keeping the first occurrence of each.

    The kept task absorbs the source sessions of later duplicates (order
    preserved, no repeats) and takes a duplicate's outcome when that outcome
    ranks higher: success > fail > mixed > unknown.
    """
    # prefer a resolved outcome if either session resolved it
    rank = {"success": 3, "fail": 2, "mixed": 1, "unknown": 0}
    merged: dict = {}
    for task in tasks:
        if task.id not in merged:
            merged[task.id] = task
            continue
        kept = merged[task.id]
        kept.source_sessions = list(dict.fromkeys(kept.source_sessions + task.source_sessions))
        if rank.get(task.outcome, 0) > rank.get(kept.outcome, 0):
            kept.outcome = task.outcome
    return list(merged.values())
def assign_splits(
    tasks: List[TaskRecord],
    *,
    val_fraction: float = 0.34,
    test_fraction: float = 0.0,
    holdout_fraction: float | None = None,  # legacy alias for val_fraction
    seed: int = 42,
) -> List[TaskRecord]:
    """Set ``task.split`` to train / val / test in place and return ``tasks``.

    * Tasks with ``origin == "dream"`` always go to ``train``.
    * Every other ("real") task is placed by a stable bucket in 0..99 derived
      from SHA-256 of ``seed`` + task id: buckets below the val cut-off go to
      ``val``, the next ``test_fraction`` share to ``test``, the rest to
      ``train``. The same id and seed therefore always land in the same split.
    * ``holdout_fraction``, when given, overrides ``val_fraction``.

    Fix-ups applied afterwards, in this order:
      1. with >= 2 real tasks and no val task, the last real task becomes val;
      2. with >= 2 real tasks and no train task at all, the first real task
         becomes train;
      3. with ``test_fraction > 0``, >= 3 real tasks and no test task, the
         first real train task becomes test.
    """
    if holdout_fraction is not None:
        val_fraction = holdout_fraction

    real = []
    for task in tasks:
        if task.origin == "dream":
            # all dream tasks go to train, unconditionally
            task.split = "train"
        else:
            real.append(task)

    val_cut = int(round(val_fraction * 100))
    test_cut = val_cut + int(round(test_fraction * 100))
    for task in real:
        digest = hashlib.sha256((str(seed) + task.id).encode()).hexdigest()
        bucket = int(digest, 16) % 100
        if bucket < val_cut:
            task.split = "val"
        elif bucket < test_cut:
            task.split = "test"
        else:
            task.split = "train"

    if len(real) >= 2:
        # guarantee val (the gate) is non-empty
        if all(task.split != "val" for task in real):
            real[-1].split = "val"
        # guarantee a train pool exists (dream or real)
        if all(task.split != "train" for task in tasks):
            real[0].split = "train"

    # if test was requested but ended up empty with >=3 real tasks, carve one
    if test_fraction > 0 and len(real) >= 3 and all(task.split != "test" for task in real):
        first_train = next((task for task in real if task.split == "train"), None)
        if first_train is not None:
            first_train.split = "test"
    return tasks
def normalize_legacy_split(value: str) -> str:
    """Translate the old split names: ``replay`` -> ``train``, ``holdout`` -> ``val``.

    Any other value is returned unchanged.
    """
    if value == "replay":
        return "train"
    if value == "holdout":
        return "val"
    return value
def mine(
    digests: List[SessionDigest],
    *,
    max_tasks: int = 40,
    holdout_fraction: float = 0.34,
    seed: int = 42,
    llm_miner: Optional[Callable[[List[SessionDigest]], List[TaskRecord]]] = None,
) -> List[TaskRecord]:
    """Mine tasks from digests, de-duplicate them and assign their splits.

    ``llm_miner`` is tried first when given. If it raises or returns nothing,
    the heuristic miner is used instead (``max_tasks`` applies to that
    fallback). ``holdout_fraction`` and ``seed`` are passed to
    ``assign_splits``.
    """
    mined: List[TaskRecord] = []
    if llm_miner is not None:
        try:
            mined = llm_miner(digests) or []
        except Exception:
            # Deliberate best-effort: any miner failure falls back to heuristics.
            mined = []
    if not mined:
        mined = heuristic_mine(digests, max_tasks=max_tasks)
    unique = dedup_tasks(mined)
    return assign_splits(unique, holdout_fraction=holdout_fraction, seed=seed)

View File

@@ -1,118 +0,0 @@
"""SkillOpt-Sleep — Stage 3: replay.
Re-run mined TaskRecords offline under a given (skill, memory) and score
them, producing the (hard, soft) signal SkillOpt's gate consumes.
Single-shot text replay by default. Tasks whose rule judge requires a tool
call (gbrain's `tool_called`) are run through the backend's real tool loop
(attempt_with_tools), so tool use is verified honestly rather than self-reported.
"""
from __future__ import annotations
from typing import List, Tuple
from skillopt.sleep.backend import Backend
from skillopt.sleep.types import ReplayResult, TaskRecord
def _required_tools(task: TaskRecord) -> List[str]:
    """Return the tool names demanded by a rule judge's ``tool_called`` checks.

    Empty when the task is not rule-judged or has no judge. Checks that are
    not dicts or have no truthy ``arg`` are ignored.
    """
    if task.reference_kind != "rule" or not task.judge:
        return []
    checks = task.judge.get("checks", []) or []
    return [
        str(check["arg"])
        for check in checks
        if isinstance(check, dict)
        and check.get("op") == "tool_called"
        and check.get("arg")
    ]
def replay_one(backend: Backend, task: TaskRecord, skill: str, memory: str) -> ReplayResult:
    """Run one task under ``(skill, memory)`` and score the response.

    A task whose rule judge requires tools is run through
    ``backend.attempt_with_tools``; every other task through
    ``backend.attempt``. Rule-judged tasks are scored locally with
    ``score_rule_judge``; all others by ``backend.judge``. Token usage is the
    backend's counter delta, or a length-based estimate (characters // 4) when
    that delta is zero. Latency covers the attempt only, in milliseconds.
    """
    import time

    required = _required_tools(task)
    tools_called: List[str] = []

    started = time.time()
    tokens_before = backend.tokens_used()
    if required:
        response, tools_called = backend.attempt_with_tools(task, skill, memory, required)
    else:
        response = backend.attempt(task, skill, memory)
    latency_ms = (time.time() - started) * 1000.0

    tokens = max(0, backend.tokens_used() - tokens_before)
    if tokens == 0:
        # if the backend doesn't track tokens (e.g. mock), approximate from text length
        total_chars = len(skill) + len(memory) + len(task.intent) + len(response)
        tokens = total_chars // 4

    # rule judges may need the detected tool calls; score locally when possible
    if task.reference_kind == "rule" and task.judge:
        from skillopt.sleep.judges import score_rule_judge
        hard, soft, rationale = score_rule_judge(task.judge, response, tools_called)
    else:
        hard, soft, rationale = backend.judge(task, response)

    fail_reason = "" if hard >= 1.0 else (rationale or "below threshold")
    task_type = task.tags[0] if task.tags else "task"
    return ReplayResult(
        id=task.id,
        hard=float(hard),
        soft=float(soft),
        response=response,
        fail_reason=fail_reason,
        task_type=task_type,
        judge_rationale=rationale,
        tools_called=tools_called,
        tokens=int(tokens),
        latency_ms=round(latency_ms, 1),
    )
def replay_batch(
    backend: Backend,
    tasks: List[TaskRecord],
    skill: str,
    memory: str,
) -> List[Tuple[TaskRecord, ReplayResult]]:
    """Replay every task in order and pair each task with its result."""
    pairs: List[Tuple[TaskRecord, ReplayResult]] = []
    for task in tasks:
        pairs.append((task, replay_one(backend, task, skill, memory)))
    return pairs
def aggregate_scores(pairs: List[Tuple[TaskRecord, ReplayResult]]) -> Tuple[float, float]:
    """Return the mean ``(hard, soft)`` score over the results; zeros if empty."""
    if not pairs:
        return 0.0, 0.0
    count = len(pairs)
    hard_total = 0
    soft_total = 0
    for _task, result in pairs:
        hard_total += result.hard
        soft_total += result.soft
    return hard_total / count, soft_total / count
def aggregate_cost(pairs: List[Tuple[TaskRecord, ReplayResult]]) -> Tuple[float, float]:
    """Return the mean ``(tokens, latency_ms)`` per task; zeros if empty."""
    if not pairs:
        return 0.0, 0.0
    count = len(pairs)
    token_total = 0
    latency_total = 0
    for _task, result in pairs:
        token_total += result.tokens
        latency_total += result.latency_ms
    return token_total / count, latency_total / count
def multi_objective_reward(
    pairs: List[Tuple[TaskRecord, ReplayResult]],
    *,
    w_acc: float = 1.0,
    w_tokens: float = 0.0,
    w_latency: float = 0.0,
    token_ref: float = 2000.0,
    latency_ref_ms: float = 15000.0,
) -> float:
    """Weighted reward = accuracy↑, tokens↓, latency↓.

    Accuracy is the mean hard score. Each cost score is
    ``max(0, 1 - mean_cost / reference)``: 1.0 at zero cost, falling linearly
    to 0.0 at or above the reference (the reference is floored at 1.0; a
    reference of 0 disables the term with a score of 0.0). The result is the
    weight-normalized sum. With the default weights only accuracy counts; if
    the weights sum to zero or less, accuracy is returned. Empty input gives
    0.0.
    """
    if not pairs:
        return 0.0
    accuracy, _soft = aggregate_scores(pairs)
    mean_tokens, mean_latency = aggregate_cost(pairs)

    token_score = 0.0
    if token_ref:
        token_score = max(0.0, 1.0 - mean_tokens / max(1.0, token_ref))
    latency_score = 0.0
    if latency_ref_ms:
        latency_score = max(0.0, 1.0 - mean_latency / max(1.0, latency_ref_ms))

    weight_sum = w_acc + w_tokens + w_latency
    if weight_sum <= 0:
        return accuracy
    weighted = w_acc * accuracy + w_tokens * token_score + w_latency * latency_score
    return weighted / weight_sum

View File

@@ -1,122 +0,0 @@
"""SkillOpt-Sleep — multi-rollout + contrastive reflection ("脑补推演" core).
The user's insight: let the agent re-run the SAME task many times, then look at
which rollouts went well vs badly and distill a rule from the *contrast*. This
is a much stronger learning signal than a single failure, and it is the essence
of the offline "dream/imagination" process — train-time rollouts are synthetic,
so doing many is fine.
Pieces:
* multi_rollout — run one task K times under (skill, memory), return scored attempts
* contrastive_reflect — given good vs bad attempts of the same tasks, ask the
optimizer what distinguishes them and propose a general rule
Driven through the Backend abstraction (mock/claude/codex), import-light.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from skillopt.sleep.backend import Backend, _extract_json
from skillopt.sleep.replay import replay_one
from skillopt.sleep.types import EditRecord, ReplayResult, TaskRecord
@dataclass
class RolloutSet:
    """The scored attempts made at one task under a fixed (skill, memory)."""

    task: TaskRecord
    attempts: List[ReplayResult] = field(default_factory=list)

    @property
    def best(self) -> Optional[ReplayResult]:
        """First attempt with the highest hard score, or None without attempts."""
        if not self.attempts:
            return None
        return max(self.attempts, key=lambda attempt: attempt.hard)

    @property
    def worst(self) -> Optional[ReplayResult]:
        """First attempt with the lowest hard score, or None without attempts."""
        if not self.attempts:
            return None
        return min(self.attempts, key=lambda attempt: attempt.hard)

    @property
    def spread(self) -> float:
        """Highest minus lowest hard score; 0.0 without attempts."""
        scores = [attempt.hard for attempt in self.attempts]
        if not scores:
            return 0.0
        return max(scores) - min(scores)

    @property
    def pass_rate(self) -> float:
        """Share of attempts whose hard score is at least 1.0; 0.0 without attempts."""
        if not self.attempts:
            return 0.0
        passed = [attempt for attempt in self.attempts if attempt.hard >= 1.0]
        return len(passed) / len(self.attempts)
def multi_rollout(
    backend: Backend,
    task: TaskRecord,
    skill: str,
    memory: str,
    *,
    k: int = 3,
) -> RolloutSet:
    """Replay ``task`` ``k`` times (at least once) and collect the attempts.

    Every attempt is produced by ``replay_one`` with the same skill and
    memory; attempts are stored in the order they were run.
    """
    runs = max(1, k)
    attempts = [replay_one(backend, task, skill, memory) for _ in range(runs)]
    return RolloutSet(task=task, attempts=attempts)
def contrastive_reflect(
    backend: Backend,
    rollout_sets: List[RolloutSet],
    skill: str,
    memory: str,
    *,
    edit_budget: int = 4,
    target: str = "skill",
) -> List[EditRecord]:
    """Ask the optimizer for rules that separate good from bad attempts.

    Only rollout sets with a positive score spread are used, highest spread
    first, at most six. For each, the prompt shows the best and worst attempt
    (truncated) and the worst attempt's failure reason. The backend's answer
    is parsed as a JSON array; up to ``edit_budget`` entries with non-blank
    ``content`` become EditRecords for ``target``. Returns an empty list when
    no rollout set is informative. ``skill`` and ``memory`` are not used here.
    """
    candidates = [rs for rs in rollout_sets if rs.spread > 0 and rs.best and rs.worst]
    informative = sorted(candidates, key=lambda rs: rs.spread, reverse=True)[:6]
    if not informative:
        return []

    blocks = [
        (
            f"## Task: {rs.task.intent[:160]}\n"
            f"- GOOD attempt (score {rs.best.hard:.1f}): {rs.best.response[:200]}\n"
            f"- BAD attempt (score {rs.worst.hard:.1f}): {rs.worst.response[:200]}\n"
            f" (bad failed: {rs.worst.fail_reason[:100]})"
        )
        for rs in informative
    ]
    instructions = (
        "You are SkillOpt's optimizer doing CONTRASTIVE reflection. For each task "
        "below the agent was run multiple times; some attempts succeeded and some "
        "failed. Identify what the GOOD attempts did that the BAD ones did not, "
        f"and propose at most {edit_budget} SHORT, GENERAL, reusable rules for the "
        f"{target} that would make the good behavior reliable every time. Quote "
        "concrete thresholds/formats verbatim; do not paraphrase vaguely. "
        'Return ONLY a JSON array: '
        '[{"op":"add","content":"<rule>","rationale":"<what good did that bad didnt>"}].\n\n'
    )
    prompt = instructions + "\n\n".join(blocks)

    raw = backend._call(prompt, max_tokens=1024)  # type: ignore[attr-defined]
    parsed = _extract_json(raw, "array")
    if not isinstance(parsed, list):
        return []

    edits: List[EditRecord] = []
    for entry in parsed[:edit_budget]:
        if not isinstance(entry, dict):
            continue
        if not str(entry.get("content", "")).strip():
            continue
        edits.append(
            EditRecord(
                target=target,
                op=str(entry.get("op", "add")).strip().lower(),
                content=str(entry["content"]).strip(),
                rationale=str(entry.get("rationale", "")).strip(),
            )
        )
    return edits

View File

@@ -1,142 +0,0 @@
"""SkillOpt-Sleep — slow update (cross-night long-term memory).
This is the deployment-time analogue of SkillOpt's epoch-wise slow/meta update
(paper §3.6). Step-level edits (consolidate) learn from one night's batch; the
slow update learns across nights and writes a durable "longitudinal guidance"
block into a PROTECTED field of the skill that step-level edits never touch.
It reuses the exact protected-field marker convention from the main repo
(``skillopt/optimizer/slow_update.py``) so the artifact is compatible:
<!-- SLOW_UPDATE_START --> ... <!-- SLOW_UPDATE_END -->
Why it matters: even when the user turns the validation gate OFF (greedy mode),
the slow update still runs at the end of the run, so short-term nightly
experience is consolidated into long-term memory rather than lost. The cross-night
content is carried in ``state.slow_memory``.
Driven through the Backend abstraction (mock/claude/codex), so it stays
import-light — no `openai` dependency.
"""
from __future__ import annotations
import re
from typing import List, Optional, Tuple
from skillopt.sleep.backend import Backend, _extract_json
from skillopt.sleep.types import ReplayResult, TaskRecord
# Marker pair delimiting the protected slow-update block inside a skill
# document; the helpers below locate, strip and rewrite the text between them.
SLOW_UPDATE_START = "<!-- SLOW_UPDATE_START -->"
SLOW_UPDATE_END = "<!-- SLOW_UPDATE_END -->"
# ── protected-field helpers (mirror skillopt/optimizer/slow_update.py) ─────────
def has_slow_field(skill: str) -> bool:
    """Report whether both slow-update markers occur somewhere in ``skill``.

    Only presence is checked; the relative order of the two markers is not.
    """
    return all(
        marker in skill for marker in (SLOW_UPDATE_START, SLOW_UPDATE_END)
    )
def extract_slow_field(skill: str) -> str:
    """Return the text inside the first slow-update block of ``skill``.

    The closing marker is searched for *after* the opening marker, so a stray
    ``SLOW_UPDATE_END`` earlier in the document no longer hides a well-formed
    block that follows it. This matches how ``_strip_slow_fields`` pairs the
    markers.

    Args:
        skill: Full skill document text.

    Returns:
        The stripped content between the first START marker and the next END
        marker, or ``""`` when either marker is missing.
    """
    start = skill.find(SLOW_UPDATE_START)
    if start == -1:
        return ""
    body_start = start + len(SLOW_UPDATE_START)
    # Anchor the search at the block body so only an END that closes this
    # START can match.
    end = skill.find(SLOW_UPDATE_END, body_start)
    if end == -1:
        return ""
    return skill[body_start:end].strip()
def _strip_slow_fields(skill: str) -> str:
    """Return ``skill`` with every slow-update block removed.

    Each START..END span is cut out in turn. A START with no END after it
    truncates the text at that START. Leftover END markers are deleted, runs
    of three or more newlines collapse to two, and trailing whitespace is
    trimmed.
    """
    text = skill
    start = text.find(SLOW_UPDATE_START)
    while start != -1:
        end = text.find(SLOW_UPDATE_END, start)
        if end == -1:
            # Unterminated block: drop everything from the opening marker on.
            text = text[:start]
            break
        text = text[:start] + text[end + len(SLOW_UPDATE_END):]
        start = text.find(SLOW_UPDATE_START)
    text = text.replace(SLOW_UPDATE_END, "")
    # Removing blocks can leave stacked blank lines behind.
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.rstrip()
def replace_slow_field(skill: str, content: str) -> str:
    """Rewrite ``skill`` so it carries exactly one slow-update block.

    Any existing blocks are stripped first. When ``content`` is blank the
    stripped skill is returned as is; otherwise a single marker-delimited
    block holding the trimmed content is appended.
    """
    stripped_skill = _strip_slow_fields(skill)
    guidance = content.strip()
    if guidance:
        return (
            f"{stripped_skill}\n\n{SLOW_UPDATE_START}\n{guidance}\n{SLOW_UPDATE_END}\n"
        )
    return stripped_skill
# ── the slow-update synthesis ──────────────────────────────────────────────────
def _summarize_pairs(
    prev_pairs: List[Tuple[TaskRecord, ReplayResult]],
    curr_pairs: List[Tuple[TaskRecord, ReplayResult]],
) -> Tuple[str, dict]:
    """Classify each task's outcome change between two replay runs.

    Tasks are matched by ``task.id``; tasks in ``curr_pairs`` with no
    counterpart in ``prev_pairs`` are skipped. Comparing the ``hard`` scores
    puts every matched task in one of four categories: ``improved``,
    ``regressed``, ``stable_success`` (unchanged and >= 1.0) or
    ``persistent_fail`` (unchanged and below 1.0).

    Args:
        prev_pairs: ``(task, result)`` pairs from the earlier run.
        curr_pairs: ``(task, result)`` pairs from the later run.

    Returns:
        ``(summary, counts)``. ``summary`` is a ``key=value`` header line
        followed by up to eight detail lines for regressed or persistently
        failing tasks. ``counts`` maps each category name to its tally.
    """
    max_detail_lines = 8  # keeps the summary short enough for a prompt
    prev_by = {task.id: result for task, result in prev_pairs}
    counts = {"improved": 0, "regressed": 0, "persistent_fail": 0, "stable_success": 0}
    lines: List[str] = []
    for task, result in curr_pairs:
        earlier = prev_by.get(task.id)
        if earlier is None:
            continue  # no baseline to compare against
        before, after = earlier.hard, result.hard
        if after > before:
            cat = "improved"
        elif after < before:
            cat = "regressed"
        elif after >= 1.0:
            cat = "stable_success"
        else:
            cat = "persistent_fail"
        counts[cat] += 1
        if cat in ("regressed", "persistent_fail") and len(lines) < max_detail_lines:
            lines.append(
                f"- [{cat}] {task.intent[:120]} (why: {result.fail_reason[:80]})"
            )
    head = ", ".join(f"{k}={v}" for k, v in counts.items())
    summary = head + "\n" + "\n".join(lines) if lines else head
    return summary, counts
def run_slow_update(
    backend: Backend,
    *,
    prev_skill: str,
    curr_skill: str,
    prev_pairs: List[Tuple[TaskRecord, ReplayResult]],
    curr_pairs: List[Tuple[TaskRecord, ReplayResult]],
    prev_slow_content: str = "",
) -> Optional[str]:
    """Distill long-term guidance from how outcomes moved between two runs.

    The per-task outcome summary from ``_summarize_pairs`` and any prior
    guidance are sent to the backend, which is asked for a short JSON-wrapped
    bullet list.

    Args:
        backend: Object whose ``_call(prompt, max_tokens=...)`` returns the
            raw reply text.
        prev_skill: Not read by this function.
        curr_skill: Not read by this function.
        prev_pairs: ``(task, result)`` pairs from the earlier run.
        curr_pairs: ``(task, result)`` pairs from the later run.
        prev_slow_content: Existing guidance to refine, if any.

    Returns:
        The guidance text, or ``None`` when there is nothing to write: no
        regressions, no persistent failures and no prior guidance, or an
        empty reply. If the reply carries no usable ``guidance`` value, its
        first 400 characters are returned instead.
    """
    summary, counts = _summarize_pairs(prev_pairs, curr_pairs)  # type: ignore[misc]

    # Skip the model call when nothing went wrong and there is no earlier
    # guidance to refine.
    has_trouble = counts["regressed"] != 0 or counts["persistent_fail"] != 0
    if not has_trouble and not prev_slow_content:
        return None

    prior = prev_slow_content or "(none)"
    prompt = "".join([
        "You are SkillOpt's SLOW UPDATE — the long-term memory pass that runs ",
        "across nights. Write a SHORT, durable guidance block (2-5 bullet ",
        "points) capturing the longitudinal lessons: behaviors that reliably ",
        "help and should be preserved, and regressions/persistent failures to ",
        "avoid. Keep it GENERAL and stable (not tied to one task). If prior ",
        "guidance is given, refine it rather than restate it.\n",
        'Return ONLY JSON: {"guidance": "<bullet list as one string>"}.\n\n',
        f"# Cross-night outcome summary\n{summary}\n\n",
        f"# Prior long-term guidance (refine this)\n{prior}",
    ])
    reply = backend._call(prompt, max_tokens=600)  # type: ignore[attr-defined]

    parsed = _extract_json(reply, "object")
    guidance = (
        str(parsed.get("guidance", "")).strip() if isinstance(parsed, dict) else ""
    )
    if guidance:
        return guidance

    # Fallback for a prose reply: keep a bounded prefix of it.
    leftover = (reply or "").strip()
    return leftover[:400] or None

View File

@@ -1,103 +0,0 @@
"""SkillOpt-Sleep — Stage 5/6: staging and adoption.
Implements the Dreams safety contract: the cycle never mutates the user's
live CLAUDE.md / SKILL.md. It writes proposals + a human-readable report into
a staging directory; a separate, explicit `adopt` step copies them over the
live files after taking a backup.
"""
from __future__ import annotations
import json
import os
import shutil
import time
from typing import List, Optional
from skillopt.sleep.types import SleepReport
def _ts_dir() -> str:
    """Return the current local time formatted as ``YYYYMMDD-HHMMSS``."""
    now = time.localtime()
    return time.strftime("%Y%m%d-%H%M%S", now)
def staging_root(project: str) -> str:
    """Return ``<project>/.skillopt-sleep/staging``, the staging parent directory."""
    sleep_dir = os.path.join(project, ".skillopt-sleep")
    return os.path.join(sleep_dir, "staging")
def latest_staging(project: str) -> Optional[str]:
    """Return the most recently modified staging directory for ``project``.

    Only sub-directories of the staging root are considered, so stray files
    there (editor droppings, ``.DS_Store`` and the like) can never be handed
    back as a staging directory. When modification times tie, the
    lexicographically greatest path wins, which makes the result
    deterministic.

    Args:
        project: Project root whose staging area is inspected.

    Returns:
        The chosen directory path, or ``None`` when the staging root does not
        exist or holds no sub-directories.
    """
    root = staging_root(project)
    if not os.path.isdir(root):
        return None
    candidates = [
        path
        for path in (os.path.join(root, entry) for entry in os.listdir(root))
        if os.path.isdir(path)
    ]
    if not candidates:
        return None
    # max() avoids sorting the whole listing just to pick one element.
    return max(candidates, key=lambda path: (os.path.getmtime(path), path))
def write_staging(
    project: str,
    *,
    report: SleepReport,
    proposed_skill: Optional[str],
    proposed_memory: Optional[str],
    live_skill_path: str,
    live_memory_path: str,
    report_md: str,
) -> str:
    """Write one cycle's proposals and reports into a new staging directory.

    The directory is ``staging_root(project)/<timestamp>``. It receives
    ``proposed_SKILL.md`` and ``proposed_CLAUDE.md`` (each only when the
    corresponding proposal is not ``None``), ``report.json``, ``report.md``
    and a ``manifest.json`` recording the live paths, which proposals exist
    and ``report.accepted``.

    Returns:
        Path of the staging directory that was written.
    """
    out = os.path.join(staging_root(project), _ts_dir())
    os.makedirs(out, exist_ok=True)

    def _write_text(name: str, text: str) -> None:
        with open(os.path.join(out, name), "w", encoding="utf-8") as handle:
            handle.write(text)

    def _write_json(name: str, payload) -> None:
        with open(os.path.join(out, name), "w", encoding="utf-8") as handle:
            json.dump(payload, handle, ensure_ascii=False, indent=2)

    # Built before anything is written, as in the original statement order.
    manifest = {
        "live_skill_path": live_skill_path,
        "live_memory_path": live_memory_path,
        "has_skill": proposed_skill is not None,
        "has_memory": proposed_memory is not None,
        "accepted": report.accepted,
    }

    if proposed_skill is not None:
        _write_text("proposed_SKILL.md", proposed_skill)
    if proposed_memory is not None:
        _write_text("proposed_CLAUDE.md", proposed_memory)
    _write_json("report.json", report.to_dict())
    _write_text("report.md", report_md)
    _write_json("manifest.json", manifest)
    return out
def _backup(path: str, backup_dir: str) -> None:
    """Copy ``path`` into ``backup_dir`` under its own basename, if it exists.

    Does nothing, and does not create ``backup_dir``, when ``path`` is absent.
    """
    if not os.path.exists(path):
        return
    os.makedirs(backup_dir, exist_ok=True)
    destination = os.path.join(backup_dir, os.path.basename(path))
    shutil.copy2(path, destination)
def adopt(staging_dir: str) -> List[str]:
    """Copy staged proposals over the live files, backing each one up first.

    ``manifest.json`` in ``staging_dir`` names the live paths and says which
    proposals exist. For each proposal present, the live file's parent
    directory is created if needed, the current live file (if any) is copied
    into ``<staging_dir>/backup``, and the proposal then replaces it.

    Args:
        staging_dir: A directory previously produced by ``write_staging``.

    Returns:
        The live paths that were updated, skill first and then memory.

    Raises:
        OSError: If the manifest or a staged proposal cannot be read, or a
            live file cannot be written.
        ValueError: If the manifest is not valid JSON.
        KeyError: If the manifest flags a proposal but lacks its live path.
    """
    # The manifest is written as UTF-8 with ensure_ascii=False, so it has to
    # be read back as UTF-8 regardless of the platform's default encoding.
    with open(os.path.join(staging_dir, "manifest.json"), encoding="utf-8") as f:
        manifest = json.load(f)

    backup_dir = os.path.join(staging_dir, "backup")
    plan = (
        ("has_skill", "live_skill_path", "proposed_SKILL.md"),
        ("has_memory", "live_memory_path", "proposed_CLAUDE.md"),
    )
    updated: List[str] = []
    for flag, live_key, staged_name in plan:
        if not manifest.get(flag):
            continue
        live = manifest[live_key]
        parent = os.path.dirname(live)
        if parent:
            # A bare filename has no parent component; os.makedirs("") raises.
            os.makedirs(parent, exist_ok=True)
        _backup(live, backup_dir)
        shutil.copy2(os.path.join(staging_dir, staged_name), live)
        updated.append(live)
    return updated

View File

@@ -1,83 +0,0 @@
"""SkillOpt-Sleep — persistent cross-night state.
state.json lives in ~/.skillopt-sleep and is the "long-term" store that
turns nightly episodes into durable competence (the Agent-Sleep paper's
short-term -> long-term transfer). It records:
- night counter
- last harvest timestamp per project (so each night only sees new data)
- cross-night "slow/meta" memory (lessons that persisted across nights)
- per-night history (scores, accept/reject) for trend reporting
"""
from __future__ import annotations
import json
import os
from typing import Any, Dict, List, Optional
def _now_iso(clock: Optional[float] = None) -> str:
    """Format a timestamp as local time ``YYYY-MM-DDTHH:MM:SS``.

    ``clock`` is an epoch timestamp in seconds; when it is ``None`` the
    current time is used.
    """
    # Imported here so that loading this module does not import ``time``.
    import time as _t

    moment = _t.time() if clock is None else clock
    return _t.strftime("%Y-%m-%dT%H:%M:%S", _t.localtime(moment))
# Template for a brand-new state file. It contains mutable values, so it must
# be deep-copied before being handed to a SleepState (see _fresh_defaults).
DEFAULT_STATE: Dict[str, Any] = {
    "version": 1,
    "night": 0,
    "last_harvest": {},  # project -> iso timestamp of last harvested record
    "slow_memory": "",   # cross-night consolidated lessons (meta-skill analogue)
    "history": [],       # list of per-night summaries
}


class SleepState:
    """Cross-night state backed by a JSON file.

    ``data`` is a plain dict shaped like ``DEFAULT_STATE``. Every instance
    built from the defaults gets its own deep copy, so mutating one state
    (``set_last_harvest``, ``record_night``) can never leak into
    ``DEFAULT_STATE`` or into another instance.
    """

    def __init__(self, path: str, data: Optional[Dict[str, Any]] = None) -> None:
        """Bind the state to ``path``; ``data`` defaults to fresh defaults."""
        self.path = path
        self.data = data if data is not None else self._fresh_defaults()

    @staticmethod
    def _fresh_defaults() -> Dict[str, Any]:
        """Return an independent deep copy of ``DEFAULT_STATE``."""
        import copy  # local import, in the same spirit as _now_iso above

        return copy.deepcopy(DEFAULT_STATE)

    # io ---------------------------------------------------------------------
    @classmethod
    def load(cls, path: str) -> "SleepState":
        """Load state from ``path``, falling back to defaults.

        Keys missing from the file are filled in from the defaults. A file
        that is missing, unreadable, not valid JSON, or not a JSON object
        yields a fresh default state instead of an error (best effort).
        """
        data: Any = None
        if os.path.exists(path):
            try:
                with open(path, encoding="utf-8") as f:
                    data = json.load(f)
            except (OSError, ValueError):
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                data = None
        merged = cls._fresh_defaults()
        if isinstance(data, dict):
            merged.update(data)
        return cls(path, merged)

    def save(self) -> None:
        """Write the state to ``self.path`` via a temp file and ``os.replace``."""
        parent = os.path.dirname(self.path)
        if parent:
            # A bare filename has no parent component; os.makedirs("") raises.
            os.makedirs(parent, exist_ok=True)
        tmp = self.path + ".tmp"
        # ensure_ascii=False emits raw non-ASCII text, so pin the encoding
        # instead of relying on the platform default.
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(self.data, f, ensure_ascii=False, indent=2)
        os.replace(tmp, self.path)

    # accessors --------------------------------------------------------------
    @property
    def night(self) -> int:
        """Current night counter (0 before the first night)."""
        return int(self.data.get("night", 0))

    def last_harvest_for(self, project: str) -> Optional[str]:
        """Return the stored last-harvest timestamp for ``project``, if any."""
        return self.data.get("last_harvest", {}).get(project)

    def set_last_harvest(self, project: str, iso_ts: str) -> None:
        """Record ``iso_ts`` as the last-harvest timestamp for ``project``."""
        self.data.setdefault("last_harvest", {})[project] = iso_ts

    @property
    def slow_memory(self) -> str:
        """Cross-night guidance text (empty string when unset)."""
        return str(self.data.get("slow_memory", ""))

    def set_slow_memory(self, content: str) -> None:
        """Replace the cross-night guidance text."""
        self.data["slow_memory"] = content

    def begin_night(self, clock: Optional[float] = None) -> int:
        """Increment the night counter and return it; ``clock`` is not read."""
        self.data["night"] = self.night + 1
        return self.night

    def record_night(self, summary: Dict[str, Any]) -> None:
        """Append one per-night summary to the history list."""
        self.data.setdefault("history", []).append(summary)

View File

@@ -1,140 +0,0 @@
"""SkillOpt-Sleep — core data types.
These dataclasses are the interfaces between the sleep-cycle stages
(harvest -> mine -> replay -> consolidate -> stage). They are intentionally
plain (no slots, no heavy deps) so the package imports cleanly on any
Python 3.8+ interpreter and the deterministic experiment runs with zero
external dependencies.
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
# ── Stage 1: harvest ──────────────────────────────────────────────────────────
@dataclass
class SessionDigest:
    """Normalized summary of a single Claude Code session transcript.

    Built by :mod:`skillopt.sleep.harvest` from a ``<sessionId>.jsonl``
    transcript together with ``history.jsonl`` entries.
    """

    # identity
    session_id: str
    project: str
    git_branch: str = ""
    # time span of the session
    started_at: str = ""
    ended_at: str = ""
    # conversation content
    user_prompts: List[str] = field(default_factory=list)
    assistant_finals: List[str] = field(default_factory=list)
    tools_used: List[str] = field(default_factory=list)
    files_touched: List[str] = field(default_factory=list)
    # user reactions such as "still broken" or "perfect"
    feedback_signals: List[str] = field(default_factory=list)
    # turn counts
    n_user_turns: int = 0
    n_assistant_turns: int = 0
    # transcript path this digest was built from
    raw_path: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the digest as a plain dict (via ``dataclasses.asdict``)."""
        payload: Dict[str, Any] = asdict(self)
        return payload
# ── Stage 2: mine ─────────────────────────────────────────────────────────────
@dataclass
class TaskRecord:
    """One self-contained recurring task mined from one or more sessions.

    The training unit of the sleep cycle, the counterpart of a SkillOpt
    benchmark item.
    """

    id: str
    project: str
    intent: str  # the user's request (the "question")
    context_excerpt: str = ""  # smallest context needed to attempt the task
    attempted_solution: str = ""  # what the agent produced earlier
    outcome: str = "unknown"  # success | fail | mixed | unknown
    reference_kind: str = "none"  # exact | rubric | rule | none
    reference: str = ""  # the exact answer, or rubric text
    judge: Dict[str, Any] = field(default_factory=dict)  # gbrain-style rule judge
    tags: List[str] = field(default_factory=list)
    source_sessions: List[str] = field(default_factory=list)
    # split is one of {train, val, test}. val and test hold ONLY real mined
    # tasks and never overlap: val gates updates, test is the final held-out
    # measure. train may be dream-augmented (see origin).
    # NOTE(review): the original note says legacy values replay->train and
    # holdout->val are normalized on load; from_dict below does not do this
    # itself, so presumably a loader elsewhere does. TODO confirm.
    split: str = "train"
    # origin is one of {real, dream}. 'real' tasks were mined from the user's
    # actual sessions; 'dream' tasks are synthetic or augmented training data
    # and are NEVER allowed into val/test (the anti-overfitting guarantee).
    origin: str = "real"
    derived_from: str = ""  # dream tasks only: id of the real task being varied

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict (via ``dataclasses.asdict``)."""
        payload: Dict[str, Any] = asdict(self)
        return payload

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "TaskRecord":
        """Build a record from ``d``, silently ignoring unknown keys."""
        allowed = set(cls.__dataclass_fields__)  # type: ignore[attr-defined]
        kwargs: Dict[str, Any] = {}
        for key, value in d.items():
            if key in allowed:
                kwargs[key] = value
        return cls(**kwargs)
# ── Stage 3: replay ───────────────────────────────────────────────────────────
@dataclass
class ReplayResult:
    """Result of replaying one TaskRecord offline under a given skill+memory."""

    id: str
    hard: float = 0.0  # 0/1 for exact matching, or a continuous reward
    soft: float = 0.0  # partial credit / judge score in 0..1
    response: str = ""
    fail_reason: str = ""
    task_type: str = "task"
    judge_rationale: str = ""
    tools_called: List[str] = field(default_factory=list)
    tokens: int = 0  # approximate token cost of this rollout (token objective)
    latency_ms: float = 0.0  # wall-clock time of this rollout (latency objective)

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict (via ``dataclasses.asdict``)."""
        payload: Dict[str, Any] = asdict(self)
        return payload
# ── Stage 4/5: consolidation report ───────────────────────────────────────────
@dataclass
class EditRecord:
    """A single bounded edit proposed for, or applied to, skill or memory."""

    target: str  # "skill" | "memory"
    op: str  # add | delete | replace
    content: str = ""
    anchor: str = ""  # replace/delete only: the text being changed
    rationale: str = ""
@dataclass
class SleepReport:
    """Full record of one night's output, written to staging for review."""

    night: int
    project: str
    # time span of the cycle
    started_at: str = ""
    ended_at: str = ""
    # volume counters
    n_sessions: int = 0
    n_tasks: int = 0
    n_replayed: int = 0
    # gate outcome
    baseline_score: float = 0.0
    candidate_score: float = 0.0
    accepted: bool = False
    gate_action: str = ""
    # edits that were kept and edits that were turned down
    edits: List[EditRecord] = field(default_factory=list)
    rejected_edits: List[EditRecord] = field(default_factory=list)
    tokens_used: int = 0
    notes: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the report as a plain dict (via ``dataclasses.asdict``)."""
        return asdict(self)