diff --git a/skillopt/sleep/backend.py b/skillopt/sleep/backend.py
index eec4367..bdc8e57 100644
--- a/skillopt/sleep/backend.py
+++ b/skillopt/sleep/backend.py
@@ -464,6 +464,58 @@ class CodexCliBackend(CliBackend):
         pass
 
 
+# ── Dual backend: target runs the task, optimizer proposes/judges edits ───────
+
+class DualBackend(Backend):
+    """Route operations to two backends, à la SkillOpt's target vs optimizer.
+
+    * attempt -> TARGET backend (the model the skill is deployed on)
+    * reflect -> OPTIMIZER backend (the stronger/cheaper model writing edits)
+    * judge   -> TARGET for local rule/exact references, OPTIMIZER otherwise
+    * _call   -> OPTIMIZER backend (raw prompt access used by the LLM miner)
+
+    This lets you optimize a skill with one model and run tasks on another, and
+    is the basis of the sleep-scenario transfer experiment (optimize cheap,
+    deploy expensive — or vice-versa).
+    """
+
+    name = "dual"
+
+    def __init__(self, target: Backend, optimizer: Backend) -> None:
+        self.target = target
+        self.optimizer = optimizer
+        # Instance attribute shadows the class-level default name.
+        self.name = f"target={target.name}/optimizer={optimizer.name}"
+
+    def attempt(self, task, skill, memory):
+        """Run the task on the target backend."""
+        return self.target.attempt(task, skill, memory)
+
+    def judge(self, task, response):
+        """Grade a response: rule/exact via the target, anything else via the optimizer."""
+        # local rule/exact judging needs no model; delegate to target which
+        # already short-circuits those. For rubric judging use the optimizer.
+        if task.reference_kind in {"rule", "exact"}:
+            return self.target.judge(task, response)
+        return self.optimizer.judge(task, response)
+
+    def reflect(self, failures, successes, skill, memory, **kw):
+        """Propose skill/memory edits with the optimizer backend."""
+        return self.optimizer.reflect(failures, successes, skill, memory, **kw)
+
+    def _call(self, prompt, *, max_tokens=1024):
+        """Send a raw prompt to the optimizer backend."""
+        # used by the LLM miner; prefer the optimizer (the "thinking" model)
+        return self.optimizer._call(prompt, max_tokens=max_tokens)  # type: ignore[attr-defined]
+
+    def tokens_used(self):
+        """Return total tokens across both backends, counting a shared one once."""
+        if self.target is self.optimizer:
+            # Same object passed for both roles: summing would double-count.
+            return self.target.tokens_used()
+        return self.target.tokens_used() + self.optimizer.tokens_used()
+
+
 def get_backend(
     name: str,
     *,
@@ -477,3 +529,28 @@ def get_backend(
     if n in {"codex", "codex_cli", "openai_codex"}:
         return CodexCliBackend(model=model, codex_path=codex_path)
     return MockBackend()
+
+
+def build_backend(
+    *,
+    backend: str = "mock",
+    model: str = "",
+    optimizer_backend: str = "",
+    optimizer_model: str = "",
+    target_backend: str = "",
+    target_model: str = "",
+    codex_path: str = "",
+) -> Backend:
+    """Build a single or dual backend.
+
+    If any optimizer_* or target_* value is given, returns a DualBackend
+    routing attempt->target and reflect/judge->optimizer; a half that is left
+    unset falls back to (backend, model). Otherwise returns a single backend
+    from (backend, model).
+    """
+    has_split = any((optimizer_backend, optimizer_model, target_backend, target_model))
+    if not has_split:
+        return get_backend(backend, model=model, codex_path=codex_path)
+    tgt = get_backend(target_backend or backend, model=target_model or model, codex_path=codex_path)
+    opt = get_backend(optimizer_backend or backend, model=optimizer_model or model, codex_path=codex_path)
+    return DualBackend(target=tgt, optimizer=opt)
diff --git a/skillopt/sleep/config.py b/skillopt/sleep/config.py
index 7fa3b47..88b969c 100644
--- a/skillopt/sleep/config.py
+++ b/skillopt/sleep/config.py
@@ -41,6 +41,7 @@ DEFAULTS: Dict[str, Any] = {
     "replay_mode": "mock",  # "mock" (sandboxed prompt) | "fresh" (worktree)
     "evolve_memory": True,  # consolidate CLAUDE.md
     "evolve_skill": True,  # consolidate the managed SKILL.md
+    "llm_mine": True,  # use the backend to mine checkable tasks (real backends)
     # ── adoption / safety ──────────────────────────────────────────────────
     "auto_adopt": False,  # default: stage + require explicit `adopt`
     "managed_skill_name": "skillopt-sleep-learned",
diff --git a/skillopt/sleep/cycle.py b/skillopt/sleep/cycle.py
index 7b9b20b..a410e79 100644
--- a/skillopt/sleep/cycle.py
+++ b/skillopt/sleep/cycle.py
@@ -125,11 +125,27 @@ def run_sleep_cycle(
         limit=cfg.get("max_tasks_per_night", 40) * 3,
     )
     n_sessions = len(digests)
+    # When a real backend is configured, use it to mine checkable tasks from
+    # the transcripts (rubric/rule judges); otherwise fall back to the
+    # heuristic miner (no API, no checkable reference).
+    llm_miner = None
+    if cfg.get("backend", "mock") != "mock" and cfg.get("llm_mine", True):
+        try:
+            from skillopt.sleep.llm_miner import make_llm_miner
+            llm_miner = make_llm_miner(backend, max_tasks=cfg.get("max_tasks_per_night", 40))
+        except Exception as exc:
+            # Best-effort: keep the heuristic miner, but record why the LLM
+            # miner was skipped instead of swallowing the error silently.
+            import logging
+            logging.getLogger(__name__).warning(
+                "LLM miner unavailable; using heuristic miner: %s", exc)
+            llm_miner = None
     tasks = mine(
         digests,
         max_tasks=cfg.get("max_tasks_per_night", 40),
         holdout_fraction=cfg.get("holdout_fraction", 0.34),
         seed=cfg.get("seed", 42),
+        llm_miner=llm_miner,
     )
 
     # ── live skill/memory docs ───────────────────────────────────────────
diff --git a/skillopt/sleep/experiments/run_gbrain.py b/skillopt/sleep/experiments/run_gbrain.py
index 06819e4..63feec4 100644
--- a/skillopt/sleep/experiments/run_gbrain.py
+++ b/skillopt/sleep/experiments/run_gbrain.py
@@ -24,7 +24,7 @@ import json
 import sys
 from typing import Dict, List, Optional
 
-from skillopt.sleep.backend import get_backend
+from skillopt.sleep.backend import build_backend, get_backend
 from skillopt.sleep.consolidate import consolidate, select_gate_score
 from skillopt.sleep.experiments.gbrain_bench import (
     available_seeds,
@@ -90,6 +90,10 @@ def main(argv=None) -> int:
     ap = argparse.ArgumentParser(description="Run gbrain-evals skillopt-v1 with SkillOpt-Sleep")
     ap.add_argument("--backend", default="mock", choices=["mock", "claude", "codex"])
     ap.add_argument("--model", default="")
+    ap.add_argument("--optimizer-backend", default="", choices=["", "mock", "claude", "codex"], help="route reflect/judge here (dual)")
+    ap.add_argument("--optimizer-model", default="")
+    ap.add_argument("--target-backend", default="", choices=["", "mock", "claude", "codex"], help="route attempt here (dual)")
+    ap.add_argument("--target-model", default="")
     ap.add_argument("--codex-path", default="")
     ap.add_argument("--data-root", default="", help="path to eval/data/skillopt-v1")
     ap.add_argument("--seeds", default="", help="comma list; default = all available")
@@ -107,7 +111,12 @@ def main(argv=None) -> int:
         return 2
 
     seeds = [s.strip() for s in args.seeds.split(",") if s.strip()] or available_seeds(data_root)
-    backend = get_backend(args.backend, model=args.model, codex_path=args.codex_path)
+    backend = build_backend(
+        backend=args.backend, model=args.model,
+        optimizer_backend=args.optimizer_backend, optimizer_model=args.optimizer_model,
+        target_backend=args.target_backend, target_model=args.target_model,
+        codex_path=args.codex_path,
+    )
 
     results = []
     for seed in seeds:
diff --git a/skillopt/sleep/experiments/run_transfer.py b/skillopt/sleep/experiments/run_transfer.py
new file mode 100644
index 0000000..af26685
--- /dev/null
+++ b/skillopt/sleep/experiments/run_transfer.py
@@ -0,0 +1,163 @@
+"""SkillOpt-Sleep — skill-transfer experiment (sleep scenario).
+
+Answers: "if I optimize a skill while the agent sleeps using a CHEAP model,
+does the learned skill still help an EXPENSIVE model at deploy time?" — and the
+reverse. This is the SkillOpt paper's cross-model transfer result, reproduced
+in the sleep setting, and it is the core price-difference value proposition:
+spend cheap tokens overnight, deploy the frozen skill anywhere.
+
+Protocol, per gbrain seed:
+  1. baseline_target = held-out score of the DEFICIENT skill, run on TARGET model
+  2. optimize the skill for N nights using the SOURCE model (attempt+reflect)
+  3. transferred = held-out score of the LEARNED skill, run on TARGET model,
+     with NO further optimization
+  4. (reference) direct = held-out score of a skill optimized AND run on TARGET
+
+Report baseline / direct / transferred, mirroring SkillOpt Table "transfer".
+
+Usage:
+  python -m skillopt.sleep.experiments.run_transfer \
+    --source-backend claude --source-model haiku \
+    --target-backend claude --target-model sonnet \
+    --seeds brief-writer --nights 2
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from typing import List, Optional
+
+from skillopt.sleep.backend import get_backend
+from skillopt.sleep.consolidate import consolidate, select_gate_score
+from skillopt.sleep.experiments.gbrain_bench import (
+    available_seeds, find_data_root, load_seed,
+)
+from skillopt.sleep.replay import aggregate_scores, replay_batch
+
+
+def _holdout_hard(backend, tasks, skill, memory="") -> float:
+    """Replay the holdout split (all tasks if none are tagged) and return its first aggregate score."""
+    holdout = [t for t in tasks if t.split == "holdout"] or tasks
+    hard, _soft = aggregate_scores(replay_batch(backend, holdout, skill, memory))
+    return hard
+
+
+def _optimize(backend, skill, tasks, *, nights, edit_budget) -> str:
+    """Run up to *nights* consolidation rounds on *backend*; return the final skill.
+
+    Only accepted candidates replace the current skill. The loop stops early
+    once a candidate's holdout score reaches 0.999 or more.
+    """
+    current = skill
+    for night in range(1, nights + 1):
+        res = consolidate(backend, tasks, current, "",
+                          edit_budget=edit_budget, gate_metric="mixed",
+                          evolve_skill=True, evolve_memory=False, night=night)
+        if res.accepted:
+            current = res.new_skill
+        if res.holdout_candidate >= 0.999:
+            break
+    return current
+
+
+def run_seed(seed, skill, tasks, *, source, target, nights, edit_budget,
+             limit_replay, limit_holdout, do_direct=True) -> dict:
+    """Score one seed: baseline and transferred (and optionally direct) on *target*.
+
+    A non-zero limit_replay / limit_holdout caps that split before anything
+    runs. The skill is optimized on *source* and then scored, frozen, on
+    *target*; with do_direct it is also optimized on *target* as a reference.
+    """
+    if limit_replay or limit_holdout:
+        replay = [t for t in tasks if t.split == "replay"]
+        holdout = [t for t in tasks if t.split == "holdout"]
+        if limit_replay:
+            replay = replay[:limit_replay]
+        if limit_holdout:
+            holdout = holdout[:limit_holdout]
+        tasks = replay + holdout
+
+    baseline_target = _holdout_hard(target, tasks, skill)
+
+    # optimize on SOURCE, evaluate frozen skill on TARGET
+    learned_on_source = _optimize(source, skill, tasks, nights=nights, edit_budget=edit_budget)
+    transferred = _holdout_hard(target, tasks, learned_on_source)
+
+    direct = None
+    if do_direct:
+        learned_on_target = _optimize(target, skill, tasks, nights=nights, edit_budget=edit_budget)
+        direct = _holdout_hard(target, tasks, learned_on_target)
+
+    return {
+        "seed": seed,
+        "baseline_target": round(baseline_target, 3),
+        "direct_target": (round(direct, 3) if direct is not None else None),
+        "transferred": round(transferred, 3),
+        "transfer_gain": round(transferred - baseline_target, 3),
+        "learned_skill_tail": learned_on_source[-300:],
+    }
+
+
+def main(argv=None) -> int:
+    """CLI entry point; returns 0 on success, 2 when the data root is not found."""
+    ap = argparse.ArgumentParser(description="SkillOpt-Sleep cross-model transfer")
+    ap.add_argument("--source-backend", default="claude")
+    ap.add_argument("--source-model", default="haiku")
+    ap.add_argument("--target-backend", default="claude")
+    ap.add_argument("--target-model", default="sonnet")
+    ap.add_argument("--codex-path", default="")
+    ap.add_argument("--data-root", default="")
+    ap.add_argument("--seeds", default="brief-writer")
+    ap.add_argument("--nights", type=int, default=2)
+    ap.add_argument("--edit-budget", type=int, default=4)
+    ap.add_argument("--limit-replay", type=int, default=3)
+    ap.add_argument("--limit-holdout", type=int, default=3)
+    ap.add_argument("--no-direct", action="store_true", help="skip the direct reference (saves cost)")
+    ap.add_argument("--json", action="store_true")
+    args = ap.parse_args(argv)
+
+    data_root = find_data_root(args.data_root)
+    if not data_root:
+        print("ERROR: gbrain-evals skillopt-v1 data not found; pass --data-root", file=sys.stderr)
+        return 2
+
+    source = get_backend(args.source_backend, model=args.source_model, codex_path=args.codex_path)
+    target = get_backend(args.target_backend, model=args.target_model, codex_path=args.codex_path)
+
+    seeds = [s.strip() for s in args.seeds.split(",") if s.strip()] or available_seeds(data_root)
+    results = []
+    for seed in seeds:
+        skill, tasks = load_seed(data_root, seed)
+        if not tasks:
+            print(f"WARNING: seed {seed!r} has no tasks; skipping", file=sys.stderr)
+            continue
+        r = run_seed(seed, skill, tasks, source=source, target=target,
+                     nights=args.nights, edit_budget=args.edit_budget,
+                     limit_replay=args.limit_replay, limit_holdout=args.limit_holdout,
+                     do_direct=not args.no_direct)
+        results.append(r)
+        if not args.json:
+            d = f" direct={r['direct_target']}" if r['direct_target'] is not None else ""
+            print(f" {seed:<16} baseline={r['baseline_target']:.2f}"
+                  f" transferred={r['transferred']:.2f}{d}"
+                  f" (gain {r['transfer_gain']:+.2f})")
+
+    summary = {
+        "experiment": "skillopt-sleep/transfer",
+        "source": f"{args.source_backend}:{args.source_model}",
+        "target": f"{args.target_backend}:{args.target_model}",
+        "tokens_source": source.tokens_used(),
+        "tokens_target": target.tokens_used(),
+        "results": results,
+    }
+    if args.json:
+        print(json.dumps(summary, ensure_ascii=False, indent=2))
+    else:
+        print(f"\n=== transfer {summary['source']} -> {summary['target']}: "
+              f"{sum(1 for r in results if r['transfer_gain'] > 0)}/{len(results)} positive ===")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/skillopt/sleep/llm_miner.py b/skillopt/sleep/llm_miner.py
new file mode 100644
index 0000000..374b787
--- /dev/null
+++ b/skillopt/sleep/llm_miner.py
@@ -0,0 +1,185 @@
+"""SkillOpt-Sleep — LLM-backed task miner.
+
+The heuristic miner (mine.py) produces TaskRecords without a checkable
+reference, so real harvested transcripts can't show measurable lift. This
+module uses an optimizer backend to turn session digests into TaskRecords
+WITH a checkable rubric judge — the missing piece for real-data improvement.
+
+For each recurring intent it extracts:
+  * a clean, generalized `intent` (the reusable task, stripped of one-off specifics)
+  * a `rubric` (what a good answer must satisfy) -> stored as a rule judge of
+    `contains`/`regex`/`section_present` checks the local judge can score, OR a
+    free-text rubric scored by the backend's judge() when no programmatic check fits
+  * a preference signal (was the user satisfied?) to weight failures
+
+It is deliberately conservative: it only emits a task when it can name a
+concrete, checkable success criterion, so the gate has real signal. Tasks it
+can't make checkable are dropped, not faked.
+"""
+from __future__ import annotations
+
+import hashlib
+import json
+import re
+from typing import Any, Callable, Dict, List
+
+from skillopt.sleep.backend import Backend, _extract_json
+from skillopt.sleep.types import SessionDigest, TaskRecord
+
+
+_MINER_PROMPT = """You are mining a user's past AI-assistant sessions to find RECURRING tasks
+worth optimizing a skill for. From the session below, extract 0-3 reusable tasks.
+
+A good task is something the user asks for repeatedly or had to correct, where a
+GENERAL rule would help next time (formatting, structure, tool-use, conventions).
+Skip one-off or purely exploratory requests.
+
+For each task return:
+  - "intent": the reusable request, generalized (no one-off specifics)
+  - "checks": a list of programmatic success checks a grader can run on a future
+    answer. Each check is one of:
+      {"op":"section_present","arg":"<heading text>"}
+      {"op":"regex","arg":"<python regex>"}
+      {"op":"contains","arg":"<substring>"}
+      {"op":"max_chars","arg":<int>}
+    Only include checks you are confident a GOOD answer must satisfy.
+  - "rubric": a one-sentence description of what a good answer looks like
+  - "satisfied": true/false — did the user seem satisfied with the assistant's answer?
+
+Return ONLY a JSON array (possibly empty). No prose.
+
+# Session
+project: __PROJECT__
+user prompts:
+__PROMPTS__
+assistant final (last):
+__FINAL__
+feedback signals: __FEEDBACK__
+"""
+
+
+# Check ops grouped by the kind of argument they take.
+_TEXT_OPS = frozenset({"section_present", "regex", "contains"})
+_SIZE_OPS = frozenset({"max_chars", "min_chars"})
+
+
+def _digest_to_prompt(d: SessionDigest) -> str:
+    """Fill the miner prompt with a truncated view of one session digest."""
+    prompts = "\n".join(f" - {p[:240]}" for p in d.user_prompts[:6]) or " (none)"
+    final = (d.assistant_finals[-1][:400] if d.assistant_finals else "(none)")
+    return (
+        _MINER_PROMPT
+        .replace("__PROJECT__", d.project or "(unknown)")
+        .replace("__PROMPTS__", prompts)
+        .replace("__FINAL__", final)
+        .replace("__FEEDBACK__", ", ".join(d.feedback_signals[:6]) or "(none)")
+    )
+
+
+def _clean_check(c: Any) -> Dict[str, Any] | None:
+    """Return a normalized {"op", "arg"} check, or None when *c* is malformed."""
+    if not isinstance(c, dict):
+        return None
+    op, arg = c.get("op"), c.get("arg")
+    if op in _TEXT_OPS:
+        if not isinstance(arg, str) or not arg.strip():
+            return None
+        if op == "regex":
+            try:
+                re.compile(arg)
+            except re.error:
+                return None  # the model produced a pattern that does not compile
+        return {"op": op, "arg": arg}
+    if op in _SIZE_OPS:
+        # bool is an int subclass; reject it so true/false is not read as 1/0
+        if isinstance(arg, bool):
+            return None
+        try:
+            size = int(arg)
+        except (TypeError, ValueError, OverflowError):
+            return None
+        return {"op": op, "arg": size} if size >= 0 else None
+    return None
+
+
+def _mk_task(d: SessionDigest, obj: Dict[str, Any], idx: int) -> TaskRecord | None:
+    """Build a checkable TaskRecord from one mined JSON object, or None to drop it.
+
+    Well-formed rule checks take precedence over the free-text rubric. Objects
+    with an intent shorter than 8 characters, or with neither usable checks
+    nor a rubric, are dropped. *idx* is unused and kept for the call signature.
+    """
+    # `or ""` keeps an explicit JSON null from turning into the string "None"
+    intent = str(obj.get("intent") or "").strip()
+    if len(intent) < 8:
+        return None
+    rubric = str(obj.get("rubric") or "").strip()
+
+    # The flag may arrive as a JSON string, and bool("false") would be True.
+    flag = obj.get("satisfied", False)
+    if isinstance(flag, str):
+        satisfied = flag.strip().lower() in {"true", "yes", "1"}
+    else:
+        satisfied = bool(flag)
+    outcome = "success" if satisfied else "fail"
+
+    # keep only well-formed checks
+    raw_checks = obj.get("checks")
+    candidates = raw_checks if isinstance(raw_checks, list) else []
+    clean_checks = [chk for chk in map(_clean_check, candidates) if chk is not None]
+
+    # project may be empty or None; hash it as "" rather than raising
+    digest = hashlib.sha256(((d.project or "") + intent).encode()).hexdigest()
+    tid = "llm_" + digest[:12]
+
+    if clean_checks:
+        return TaskRecord(
+            id=tid, project=d.project, intent=intent,
+            reference_kind="rule", judge={"kind": "rule", "checks": clean_checks},
+            outcome=outcome,
+            tags=["mined:llm"], source_sessions=[d.session_id],
+        )
+    if rubric:
+        return TaskRecord(
+            id=tid, project=d.project, intent=intent,
+            reference_kind="rubric", reference=rubric,
+            outcome=outcome,
+            tags=["mined:llm"], source_sessions=[d.session_id],
+        )
+    return None  # not checkable -> drop
+
+
+def make_llm_miner(
+    backend: Backend,
+    *,
+    max_sessions: int = 20,
+    max_tasks: int = 40,
+) -> Callable[[List[SessionDigest]], List[TaskRecord]]:
+    """Return an llm_miner(digests) -> list[TaskRecord] bound to a backend.
+
+    The miner calls backend._call once per digest that has user prompts
+    (looking at no more than the first max_sessions digests), keeps at most 3
+    tasks per session, and returns as soon as max_tasks have been collected.
+    """
+
+    def _miner(digests: List[SessionDigest]) -> List[TaskRecord]:
+        out: List[TaskRecord] = []
+        for d in digests[:max_sessions]:
+            if not d.user_prompts:
+                continue
+            raw = backend._call(_digest_to_prompt(d), max_tokens=800)  # type: ignore[attr-defined]
+            arr = _extract_json(raw, "array")
+            if not isinstance(arr, list):
+                continue
+            for i, obj in enumerate(arr[:3]):
+                if not isinstance(obj, dict):
+                    continue
+                t = _mk_task(d, obj, i)
+                if t is None:
+                    continue
+                out.append(t)
+                if len(out) >= max_tasks:
+                    return out
+        return out
+
+    return _miner
diff --git a/tests/test_sleep_engine.py b/tests/test_sleep_engine.py
index 8cdf9ab..d409bb2 100644
--- a/tests/test_sleep_engine.py
+++ b/tests/test_sleep_engine.py
@@ -177,6 +177,61 @@ class TestGbrainLoader(unittest.TestCase):
         self.assertEqual(score_rule_judge(ho.judge, skill)[0], 0.0)
 
 
+class TestLlmMiner(unittest.TestCase):
+    def test_miner_emits_checkable_tasks(self):
+        # a stub backend whose _call returns canned miner JSON => deterministic
+        from skillopt.sleep.backend import Backend
+        from skillopt.sleep.llm_miner import make_llm_miner
+
+        class StubBackend(Backend):
+            name = "stub"
+            def _call(self, prompt, *, max_tokens=1024):
+                return ('[{"intent":"write a research brief",'
+                        '"checks":[{"op":"section_present","arg":"Key Risks"}],'
+                        '"rubric":"has a risks section","satisfied":false}]')
+
+        digest = SessionDigest(session_id="s1", project="/p",
+                               user_prompts=["write a brief on X"],
+                               assistant_finals=["a brief"], n_user_turns=1)
+        miner = make_llm_miner(StubBackend())
+        tasks = miner([digest])
+        self.assertEqual(len(tasks), 1)
+        self.assertEqual(tasks[0].reference_kind, "rule")
+        self.assertEqual(tasks[0].judge["checks"][0]["op"], "section_present")
+
+    def test_miner_drops_uncheckable(self):
+        from skillopt.sleep.backend import Backend
+        from skillopt.sleep.llm_miner import make_llm_miner
+
+        class EmptyBackend(Backend):
+            name = "stub"
+            def _call(self, prompt, *, max_tokens=1024):
+                return "[]"
+
+        digest = SessionDigest(session_id="s1", project="/p",
+                               user_prompts=["chat"], n_user_turns=1)
+        self.assertEqual(make_llm_miner(EmptyBackend())([digest]), [])
+
+    def test_miner_drops_malformed_checks(self):
+        # an uncompilable regex is discarded, so the task falls back to its rubric
+        from skillopt.sleep.backend import Backend
+        from skillopt.sleep.llm_miner import make_llm_miner
+
+        class BadCheckBackend(Backend):
+            name = "stub"
+            def _call(self, prompt, *, max_tokens=1024):
+                return ('[{"intent":"write a research brief",'
+                        '"checks":[{"op":"regex","arg":"("}],'
+                        '"rubric":"has a risks section","satisfied":"false"}]')
+
+        digest = SessionDigest(session_id="s1", project="/p",
+                               user_prompts=["write a brief on X"], n_user_turns=1)
+        tasks = make_llm_miner(BadCheckBackend())([digest])
+        self.assertEqual(len(tasks), 1)
+        self.assertEqual(tasks[0].reference_kind, "rubric")
+        self.assertEqual(tasks[0].outcome, "fail")
+
+
 class TestFullCycleAndAdopt(unittest.TestCase):
     def test_cycle_stage_then_adopt_with_backup(self):
         with tempfile.TemporaryDirectory() as proj, tempfile.TemporaryDirectory() as home: