mirror of
https://github.com/microsoft/SkillOpt.git
synced 2026-07-03 14:02:58 +08:00
Three additions driven by the goal of price-aware, model-flexible sleep: 1. DualBackend + build_backend(): route attempt->TARGET model and reflect/judge->OPTIMIZER model (SkillOpt's target-vs-optimizer split). gbrain runner gains --optimizer-backend/-model + --target-backend/-model. 2. run_transfer.py: sleep-scenario cross-model transfer. Optimize a skill on a SOURCE model (e.g. cheap haiku), freeze it, evaluate held-out on a TARGET model (e.g. expensive sonnet) with no further optimization — plus a direct reference. Mirrors the SkillOpt paper's transfer table; quantifies the "optimize cheap overnight, deploy anywhere" value prop. 3. llm_miner.py: turn real harvested transcripts into TaskRecords WITH checkable rule/rubric judges, wired into the cycle for non-mock backends, so real-data lift becomes measurable (heuristic miner remains the no-API fallback). Fixed a str.format brace bug the new unit test caught. 19 tests pass. Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
247 lines
11 KiB
Python
247 lines
11 KiB
Python
"""Tests for the SkillOpt-Sleep engine.
|
|
|
|
Pure-stdlib (unittest), deterministic, no API key, no third-party deps.
|
|
Run: python3.12 -m pytest tests/test_sleep_engine.py
|
|
or: python3.12 -m unittest skillopt.sleep ... (see bottom)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import unittest
|
|
|
|
from skillopt.sleep.backend import MockBackend, exact_score, keyword_soft_score
|
|
from skillopt.sleep.config import load_config
|
|
from skillopt.sleep.consolidate import consolidate
|
|
from skillopt.sleep.cycle import run_sleep_cycle
|
|
from skillopt.sleep.experiments.personas import researcher_persona, programmer_persona
|
|
from skillopt.sleep.harvest import digest_transcript, _detect_feedback, _is_meta_prompt
|
|
from skillopt.sleep.memory import apply_edits, current_learned_lines, extract_learned, set_learned
|
|
from skillopt.sleep.mine import assign_splits, heuristic_mine, dedup_tasks
|
|
from skillopt.sleep.staging import adopt, latest_staging
|
|
from skillopt.sleep.types import EditRecord, SessionDigest, TaskRecord
|
|
|
|
|
|
class TestScoring(unittest.TestCase):
|
|
def test_exact_score(self):
|
|
self.assertEqual(exact_score("arXiv:1706.03762", "the id is arXiv:1706.03762 ok"), 1.0)
|
|
self.assertEqual(exact_score("arXiv:1706.03762", "approximately arXiv:1706.037"), 0.0)
|
|
|
|
def test_keyword_soft(self):
|
|
self.assertGreater(keyword_soft_score("add login form", "please add the login form"), 0.5)
|
|
|
|
|
|
class TestMemoryEdits(unittest.TestCase):
|
|
def test_add_and_dedup(self):
|
|
doc = set_learned("# skill\n", [])
|
|
doc2, applied = apply_edits(doc, [EditRecord("skill", "add", "Rule A"),
|
|
EditRecord("skill", "add", "Rule A")])
|
|
self.assertEqual(len(applied), 1)
|
|
self.assertIn("Rule A", extract_learned(doc2))
|
|
|
|
def test_protected_region_roundtrip(self):
|
|
base = "# My hand-written skill\nkeep me\n"
|
|
doc = set_learned(base, ["Rule X"])
|
|
self.assertIn("keep me", doc)
|
|
self.assertEqual(current_learned_lines(doc), ["Rule X"])
|
|
# replacing learned region must preserve hand-written content
|
|
doc2 = set_learned(doc, ["Rule Y"])
|
|
self.assertIn("keep me", doc2)
|
|
self.assertEqual(current_learned_lines(doc2), ["Rule Y"])
|
|
|
|
def test_replace_and_delete(self):
|
|
doc = set_learned("", ["old rule about commits"])
|
|
doc, _ = apply_edits(doc, [EditRecord("skill", "replace", "new rule", anchor="old rule")])
|
|
self.assertIn("new rule", extract_learned(doc))
|
|
doc, _ = apply_edits(doc, [EditRecord("skill", "delete", "", anchor="new rule")])
|
|
self.assertEqual(current_learned_lines(doc), [])
|
|
|
|
|
|
class TestHarvest(unittest.TestCase):
|
|
def test_feedback_detection(self):
|
|
self.assertTrue(any(s.startswith("neg:") for s in _detect_feedback("this is still broken")))
|
|
self.assertTrue(any(s.startswith("pos:") for s in _detect_feedback("perfect, thanks")))
|
|
|
|
def test_meta_prompt_filter(self):
|
|
self.assertTrue(_is_meta_prompt("/clear"))
|
|
self.assertTrue(_is_meta_prompt("<system-reminder>x</system-reminder>"))
|
|
self.assertFalse(_is_meta_prompt("please refactor the auth module"))
|
|
|
|
def test_digest_real_transcript_if_present(self):
|
|
# uses the live machine's transcripts when available; skips otherwise
|
|
base = os.path.expanduser("~/.claude/projects")
|
|
if not os.path.isdir(base):
|
|
self.skipTest("no ~/.claude/projects on this machine")
|
|
found = None
|
|
for root, _d, files in os.walk(base):
|
|
for fn in files:
|
|
if fn.endswith(".jsonl"):
|
|
found = os.path.join(root, fn)
|
|
break
|
|
if found:
|
|
break
|
|
if not found:
|
|
self.skipTest("no transcripts")
|
|
d = digest_transcript(found)
|
|
# may be None for empty transcripts; if not, it must have core fields
|
|
if d is not None:
|
|
self.assertIsInstance(d.session_id, str)
|
|
self.assertGreaterEqual(d.n_user_turns + d.n_assistant_turns, 0)
|
|
|
|
|
|
class TestMine(unittest.TestCase):
|
|
def _digest(self, prompts, feedback):
|
|
return SessionDigest(
|
|
session_id="s1", project="/p", user_prompts=prompts,
|
|
assistant_finals=["did stuff"], feedback_signals=feedback,
|
|
n_user_turns=len(prompts), n_assistant_turns=1,
|
|
)
|
|
|
|
def test_outcome_inference(self):
|
|
fail = heuristic_mine([self._digest(["fix the parser bug please"], ["neg:still broken"])])
|
|
self.assertEqual(fail[0].outcome, "fail")
|
|
ok = heuristic_mine([self._digest(["format the output"], ["pos:perfect"])])
|
|
self.assertEqual(ok[0].outcome, "success")
|
|
|
|
def test_split_stable_and_nonempty(self):
|
|
tasks = assign_splits(researcher_persona(), holdout_fraction=0.34, seed=42)
|
|
splits = {t.split for t in tasks}
|
|
self.assertIn("replay", splits)
|
|
self.assertIn("holdout", splits)
|
|
# stable across calls
|
|
again = assign_splits(researcher_persona(), holdout_fraction=0.34, seed=42)
|
|
self.assertEqual([t.split for t in tasks], [t.split for t in again])
|
|
|
|
|
|
class TestConsolidateGate(unittest.TestCase):
|
|
def test_accepts_helpful_rejects_harmful(self):
|
|
be = MockBackend()
|
|
tasks = assign_splits(researcher_persona(), holdout_fraction=0.34, seed=42)
|
|
res = consolidate(be, tasks, set_learned("", []), "", edit_budget=4,
|
|
gate_metric="mixed", night=1)
|
|
self.assertTrue(res.accepted)
|
|
self.assertGreater(res.candidate_score, res.baseline_score)
|
|
|
|
def test_no_op_when_already_optimal(self):
|
|
be = MockBackend()
|
|
tasks = assign_splits(programmer_persona(), holdout_fraction=0.34, seed=1)
|
|
# first night learns the rule
|
|
r1 = consolidate(be, tasks, set_learned("", []), "", edit_budget=4, night=1)
|
|
# second night on the learned skill should find nothing to add
|
|
r2 = consolidate(be, tasks, r1.new_skill, r1.new_memory, edit_budget=4, night=2)
|
|
self.assertEqual(len(r2.applied_edits), 0)
|
|
|
|
|
|
class TestRuleJudge(unittest.TestCase):
|
|
def test_section_and_regex(self):
|
|
from skillopt.sleep.judges import score_rule_judge
|
|
j = {"kind": "rule", "checks": [
|
|
{"op": "section_present", "arg": "Key Risks"},
|
|
{"op": "regex", "arg": r"[Cc]onfidence\s*[:=]"},
|
|
]}
|
|
ok = "# Brief\n## Key Risks\nstuff\nConfidence: High"
|
|
self.assertEqual(score_rule_judge(j, ok)[0], 1.0)
|
|
self.assertEqual(score_rule_judge(j, "just an answer")[0], 0.0)
|
|
|
|
def test_max_chars(self):
|
|
from skillopt.sleep.judges import score_rule_judge
|
|
j = {"checks": [{"op": "max_chars", "arg": 50}]}
|
|
self.assertEqual(score_rule_judge(j, "x" * 10)[0], 1.0)
|
|
self.assertEqual(score_rule_judge(j, "x" * 100)[0], 0.0)
|
|
|
|
def test_partial_soft_score(self):
|
|
from skillopt.sleep.judges import score_rule_judge
|
|
j = {"checks": [
|
|
{"op": "contains", "arg": "alpha"},
|
|
{"op": "contains", "arg": "beta"},
|
|
]}
|
|
h, s, _ = score_rule_judge(j, "only alpha here")
|
|
self.assertEqual(h, 0.0)
|
|
self.assertAlmostEqual(s, 0.5)
|
|
|
|
|
|
class TestGbrainLoader(unittest.TestCase):
|
|
def test_loads_when_present(self):
|
|
from skillopt.sleep.experiments.gbrain_bench import find_data_root, load_seed
|
|
root = find_data_root()
|
|
if not root:
|
|
self.skipTest("gbrain-evals data not present")
|
|
skill, tasks = load_seed(root, "brief-writer")
|
|
self.assertTrue(skill)
|
|
self.assertTrue(any(t.split == "holdout" for t in tasks))
|
|
self.assertTrue(all(t.reference_kind == "rule" for t in tasks))
|
|
# the deficient skill must FAIL its own held-out checks (baseline 0)
|
|
from skillopt.sleep.judges import score_rule_judge
|
|
ho = [t for t in tasks if t.split == "holdout"][0]
|
|
self.assertEqual(score_rule_judge(ho.judge, skill)[0], 0.0)
|
|
|
|
|
|
class TestLlmMiner(unittest.TestCase):
|
|
def test_miner_emits_checkable_tasks(self):
|
|
# a stub backend whose _call returns canned miner JSON => deterministic
|
|
from skillopt.sleep.backend import Backend
|
|
from skillopt.sleep.llm_miner import make_llm_miner
|
|
|
|
class StubBackend(Backend):
|
|
name = "stub"
|
|
def _call(self, prompt, *, max_tokens=1024):
|
|
return ('[{"intent":"write a research brief",'
|
|
'"checks":[{"op":"section_present","arg":"Key Risks"}],'
|
|
'"rubric":"has a risks section","satisfied":false}]')
|
|
|
|
digest = SessionDigest(session_id="s1", project="/p",
|
|
user_prompts=["write a brief on X"],
|
|
assistant_finals=["a brief"], n_user_turns=1)
|
|
miner = make_llm_miner(StubBackend())
|
|
tasks = miner([digest])
|
|
self.assertEqual(len(tasks), 1)
|
|
self.assertEqual(tasks[0].reference_kind, "rule")
|
|
self.assertEqual(tasks[0].judge["checks"][0]["op"], "section_present")
|
|
|
|
def test_miner_drops_uncheckable(self):
|
|
from skillopt.sleep.backend import Backend
|
|
from skillopt.sleep.llm_miner import make_llm_miner
|
|
|
|
class EmptyBackend(Backend):
|
|
name = "stub"
|
|
def _call(self, prompt, *, max_tokens=1024):
|
|
return "[]"
|
|
|
|
digest = SessionDigest(session_id="s1", project="/p",
|
|
user_prompts=["chat"], n_user_turns=1)
|
|
self.assertEqual(make_llm_miner(EmptyBackend())([digest]), [])
|
|
|
|
|
|
class TestFullCycleAndAdopt(unittest.TestCase):
|
|
def test_cycle_stage_then_adopt_with_backup(self):
|
|
with tempfile.TemporaryDirectory() as proj, tempfile.TemporaryDirectory() as home:
|
|
cfg = load_config(
|
|
invoked_project=proj, projects="invoked", backend="mock",
|
|
claude_home=os.path.join(home, ".claude"),
|
|
managed_skill_name="skillopt-sleep-learned",
|
|
auto_adopt=False,
|
|
)
|
|
# seed a known persona so we don't depend on ~/.claude
|
|
tasks = assign_splits(researcher_persona(), holdout_fraction=0.34, seed=42)
|
|
|
|
outcome = run_sleep_cycle(cfg, seed_tasks=tasks)
|
|
self.assertTrue(outcome.report.accepted)
|
|
self.assertTrue(os.path.isdir(outcome.staging_dir))
|
|
self.assertTrue(os.path.exists(os.path.join(outcome.staging_dir, "report.md")))
|
|
|
|
# nothing live touched yet
|
|
live_skill = cfg.managed_skill_path()
|
|
self.assertFalse(os.path.exists(live_skill))
|
|
|
|
# adopt -> live file created, backup dir exists
|
|
updated = adopt(outcome.staging_dir)
|
|
self.assertTrue(any("SKILL.md" in p for p in updated))
|
|
self.assertTrue(os.path.exists(live_skill))
|
|
with open(live_skill) as f:
|
|
self.assertIn("answer", f.read().lower())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(verbosity=2)
|