@dataclass
class Budget:
    """Spending cap for one run, expressed in tokens and/or wall-clock minutes.

    A limit that is ``None`` (or ``0``) counts as "no limit": every check
    below tests the limit for truthiness.  Call :meth:`start` once to record
    the baseline clock reading and token counter; all later queries are
    measured relative to that baseline.

    The clock is injected as a zero-argument callable returning seconds (its
    result is divided by 60 to obtain minutes), so the caller decides which
    clock is used.
    """

    max_tokens: Optional[int] = None  # None = unlimited
    max_minutes: Optional[float] = None  # None = unlimited
    # Baseline captured by start(); kept as regular dataclass fields so the
    # generated constructor signature is unchanged.
    _start_time: Optional[float] = None
    _tokens_at_start: int = 0

    def start(self, clock_fn, tokens_now: int) -> None:
        """Record the baseline clock reading and token counter."""
        self._start_time = clock_fn()
        self._tokens_at_start = tokens_now

    def tokens_spent(self, tokens_now: int) -> int:
        """Return tokens used since :meth:`start`, clamped to be non-negative."""
        return max(0, tokens_now - self._tokens_at_start)

    def minutes_elapsed(self, clock_fn) -> float:
        """Return minutes since :meth:`start`, or ``0.0`` if never started."""
        if self._start_time is None:
            return 0.0
        return (clock_fn() - self._start_time) / 60.0

    def remaining_fraction(self, *, tokens_now: int, clock_fn) -> float:
        """Smallest remaining fraction across all active limits (1.0 = fresh)."""
        # Seed with 1.0 so an unbounded budget always reports "fresh".
        fracs = [1.0]
        if self.max_tokens:
            fracs.append(max(0.0, 1.0 - self.tokens_spent(tokens_now) / self.max_tokens))
        if self.max_minutes:
            fracs.append(max(0.0, 1.0 - self.minutes_elapsed(clock_fn) / self.max_minutes))
        return min(fracs)

    def exhausted(self, *, tokens_now: int, clock_fn) -> bool:
        """Return True as soon as any active limit has been reached."""
        if self.max_tokens and self.tokens_spent(tokens_now) >= self.max_tokens:
            return True
        if self.max_minutes and self.minutes_elapsed(clock_fn) >= self.max_minutes:
            return True
        return False

    def status(self, *, tokens_now: int, clock_fn) -> str:
        """Return a short human-readable usage summary, or ``"unbounded"``."""
        parts = []
        if self.max_tokens:
            parts.append(f"tokens {self.tokens_spent(tokens_now)}/{self.max_tokens}")
        if self.max_minutes:
            parts.append(f"minutes {self.minutes_elapsed(clock_fn):.1f}/{self.max_minutes}")
        return ", ".join(parts) or "unbounded"


def plan_depth(budget: Budget, *, n_tasks: int,
               default_nights: int = 2, default_k: int = 1,
               tokens_per_rollout: float = 1500.0,
               max_nights: int = 4, max_k: int = 5) -> tuple[int, int]:
    """Heuristically choose (nights, rollouts_per_task) from a token budget.

    Rough cost model: one rollout ~ 1 unit; a night does ~n_tasks*k rollouts
    plus reflect/gate (~2*n_tasks).  Nights are bought first (at k=1), then
    whatever is left is spent on a larger k.

    Args:
        budget: only ``budget.max_tokens`` is consulted.
        n_tasks: number of tasks replayed per night (values < 1 count as 1).
        default_nights: returned unchanged when no token budget is set.
        default_k: returned unchanged when no token budget is set.
        tokens_per_rollout: planning constant converting tokens to rollouts.
        max_nights: upper bound on the planned number of nights.
        max_k: upper bound on the planned rollouts per task.

    Returns:
        ``(nights, k)``, each at least 1.  Whenever the budget covers at
        least one k=1 night, ``nights * n_tasks * (k + 2)`` does not exceed
        the affordable rollout count.

    Raises:
        ValueError: if ``tokens_per_rollout`` is not positive.
    """
    if not budget.max_tokens:
        return default_nights, default_k
    if tokens_per_rollout <= 0:
        raise ValueError("tokens_per_rollout must be positive")

    tasks = max(1, n_tasks)
    rollouts_affordable = budget.max_tokens / tokens_per_rollout
    per_night = tasks * 3  # rollouts + reflect + gate, k=1
    nights = max(1, min(max_nights, int(rollouts_affordable // per_night)))

    # Spend surplus on more rollouts-per-task (contrastive signal).  Raising k
    # by one costs `tasks` extra rollouts on EVERY night, so the surplus must
    # be divided by nights * tasks; dividing by tasks alone overshoots the
    # budget as soon as more than one night is planned.
    surplus = rollouts_affordable - nights * per_night
    extra_k = int(surplus // (nights * tasks))
    k = max(1, min(max_k, 1 + extra_k))
    return nights, k
+ from skillopt.sleep.rollout import multi_rollout, contrastive_reflect + sets = [multi_rollout(backend, t, cand_skill, cand_memory, k=rollouts_k) + for t in train_tasks] + edits = contrastive_reflect( + backend, sets, cand_skill, cand_memory, + edit_budget=edit_budget, target="skill", + ) + # fall back to single-shot reflect if contrast yielded nothing + if not edits: + edits = backend.reflect( + failures, successes, cand_skill, cand_memory, + edit_budget=edit_budget, evolve_skill=True, evolve_memory=False, + ) + else: + edits = backend.reflect( + failures, successes, cand_skill, cand_memory, + edit_budget=edit_budget, evolve_skill=True, evolve_memory=False, + ) cand_skill = _gate_apply(cand_skill, edits, "skill") if evolve_memory: diff --git a/skillopt/sleep/experiments/run_gbrain.py b/skillopt/sleep/experiments/run_gbrain.py index 0e71f88..27a31cd 100644 --- a/skillopt/sleep/experiments/run_gbrain.py +++ b/skillopt/sleep/experiments/run_gbrain.py @@ -45,7 +45,7 @@ def _score(backend, tasks, skill, memory, split="test", metric="mixed", w=0.5): def run_seed(backend, seed: str, skill: str, tasks: List, *, nights: int = 3, edit_budget: int = 4, gate_mode: str = "on", - slow_update: bool = True, + slow_update: bool = True, rollouts_k: int = 1, limit_replay: int = 0, limit_holdout: int = 0) -> dict: memory = "" # optionally cap each split to control API cost / latency. 
@@ -69,7 +69,8 @@ def run_seed(backend, seed: str, skill: str, tasks: List, *, res = consolidate( backend, tasks, cur, memory, edit_budget=edit_budget, gate_metric="mixed", gate_mixed_weight=0.5, - gate_mode=gate_mode, evolve_skill=True, evolve_memory=False, night=night, + gate_mode=gate_mode, rollouts_k=rollouts_k, + evolve_skill=True, evolve_memory=False, night=night, ) if res.accepted: cur = res.new_skill @@ -136,6 +137,11 @@ def main(argv=None) -> int: ap.add_argument("--edit-budget", type=int, default=4) ap.add_argument("--gate", default="on", choices=["on", "off", "hard", "soft"], help="on/hard/soft = validation-gated; off = greedy (no hard filter)") + ap.add_argument("--rollouts-k", type=int, default=1, + help=">1 = multi-rollout contrastive reflection per task") + ap.add_argument("--budget-tokens", type=int, default=0, + help="approx token budget; auto-plans nights x rollouts when set") + ap.add_argument("--budget-minutes", type=float, default=0.0) ap.add_argument("--limit-replay", type=int, default=0, help="cap #train tasks (cost control)") ap.add_argument("--limit-holdout", type=int, default=0, help="cap #val and #test tasks (cost control)") ap.add_argument("--json", action="store_true") @@ -160,8 +166,19 @@ def main(argv=None) -> int: skill, tasks = load_seed(data_root, seed) if not tasks: continue - r = run_seed(backend, seed, skill, tasks, nights=args.nights, - edit_budget=args.edit_budget, + # budget auto-planning: derive nights x rollouts_k from a token budget + nights, rollouts_k = args.nights, args.rollouts_k + if args.budget_tokens: + from skillopt.sleep.budget import Budget, plan_depth + n_train = len([t for t in tasks if t.split == "train"]) or len(tasks) + nights, rollouts_k = plan_depth( + Budget(max_tokens=args.budget_tokens), n_tasks=n_train, + default_nights=args.nights, default_k=args.rollouts_k, + ) + if not args.json: + print(f" [budget] {args.budget_tokens} tok -> nights={nights} rollouts_k={rollouts_k}") + r = run_seed(backend, seed, 
skill, tasks, nights=nights, + edit_budget=args.edit_budget, rollouts_k=rollouts_k, gate_mode=("off" if args.gate == "off" else "on"), limit_replay=args.limit_replay, limit_holdout=args.limit_holdout) results.append(r) diff --git a/skillopt/sleep/rollout.py b/skillopt/sleep/rollout.py new file mode 100644 index 0000000..f96679c --- /dev/null +++ b/skillopt/sleep/rollout.py @@ -0,0 +1,122 @@ +"""SkillOpt-Sleep — multi-rollout + contrastive reflection ("脑补推演" core). + +The user's insight: let the agent re-run the SAME task many times, then look at +which rollouts went well vs badly and distill a rule from the *contrast*. This +is a much stronger learning signal than a single failure, and it is the essence +of the offline "dream/imagination" process — train-time rollouts are synthetic, +so doing many is fine. + +Pieces: + * multi_rollout — run one task K times under (skill, memory), return scored attempts + * contrastive_reflect — given good vs bad attempts of the same tasks, ask the + optimizer what distinguishes them and propose a general rule + +Driven through the Backend abstraction (mock/claude/codex), import-light. 
@dataclass
class RolloutSet:
    """K scored attempts at one task under a fixed (skill, memory)."""

    task: TaskRecord
    attempts: List[ReplayResult] = field(default_factory=list)

    @property
    def best(self) -> Optional[ReplayResult]:
        """Attempt with the highest ``hard`` score, or ``None`` when empty."""
        return max(self.attempts, key=lambda r: r.hard, default=None)

    @property
    def worst(self) -> Optional[ReplayResult]:
        """Attempt with the lowest ``hard`` score, or ``None`` when empty."""
        return min(self.attempts, key=lambda r: r.hard, default=None)

    @property
    def spread(self) -> float:
        """Highest minus lowest ``hard`` score; ``0.0`` when empty."""
        if not self.attempts:
            return 0.0
        hs = [r.hard for r in self.attempts]
        return max(hs) - min(hs)

    @property
    def pass_rate(self) -> float:
        """Fraction of attempts whose ``hard`` score is >= 1.0; ``0.0`` when empty."""
        if not self.attempts:
            return 0.0
        return sum(1 for r in self.attempts if r.hard >= 1.0) / len(self.attempts)


def multi_rollout(
    backend: Backend,
    task: TaskRecord,
    skill: str,
    memory: str,
    *,
    k: int = 3,
) -> RolloutSet:
    """Run ``task`` K times. replay_one is deterministic for mock; for real
    backends the model's own sampling yields variation across attempts.

    ``k`` is clamped to at least 1, so the returned set always holds at least
    one attempt.
    """
    rs = RolloutSet(task=task)
    for _ in range(max(1, k)):
        rs.attempts.append(replay_one(backend, task, skill, memory))
    return rs


def contrastive_reflect(
    backend: Backend,
    rollout_sets: List[RolloutSet],
    skill: str,
    memory: str,
    *,
    edit_budget: int = 4,
    target: str = "skill",
) -> List[EditRecord]:
    """Distill a rule from the contrast between good and bad attempts.

    Only rollout sets with a positive score spread are used.  The six sets
    with the largest spread are shown to the optimizer as one high-scoring
    and one low-scoring attempt each, and it is asked for general rules that
    make the good behavior reliable.

    Args:
        backend: object whose ``_call(prompt, max_tokens=...)`` returns text.
        rollout_sets: scored attempts, one set per task.
        skill: accepted for interface symmetry; not used in the prompt.
        memory: accepted for interface symmetry; not used in the prompt.
        edit_budget: maximum number of edits to request and return.
        target: value stored on each returned edit and named in the prompt.

    Returns:
        At most ``edit_budget`` edits.  Empty when the budget is not
        positive, when no set shows any contrast, or when the reply yields
        no usable entries; the backend is not called in the first two cases.
    """
    # A non-positive budget can never yield edits.  Bail out before paying
    # for a backend call; this also avoids `arr[:negative]`, which would
    # drop trailing items instead of returning none.
    if edit_budget <= 0:
        return []

    # Resolve best/worst once per set; the properties rescan the attempts on
    # every access.
    contrasts = []
    for rs in rollout_sets:
        if rs.spread <= 0:
            continue
        good, bad = rs.best, rs.worst
        if good is None or bad is None:
            continue
        contrasts.append((rs, good, bad))
    # Stable sort: sets with equal spread keep their input order.
    contrasts.sort(key=lambda item: item[0].spread, reverse=True)
    contrasts = contrasts[:6]
    if not contrasts:
        return []

    blocks = [
        f"## Task: {rs.task.intent[:160]}\n"
        f"- GOOD attempt (score {good.hard:.1f}): {good.response[:200]}\n"
        f"- BAD attempt (score {bad.hard:.1f}): {bad.response[:200]}\n"
        f" (bad failed: {bad.fail_reason[:100]})"
        for rs, good, bad in contrasts
    ]
    prompt = (
        "You are SkillOpt's optimizer doing CONTRASTIVE reflection. For each task "
        "below the agent was run multiple times; some attempts succeeded and some "
        "failed. Identify what the GOOD attempts did that the BAD ones did not, "
        f"and propose at most {edit_budget} SHORT, GENERAL, reusable rules for the "
        f"{target} that would make the good behavior reliable every time. Quote "
        "concrete thresholds/formats verbatim; do not paraphrase vaguely. "
        'Return ONLY a JSON array: '
        '[{"op":"add","content":"","rationale":""}].\n\n'
        + "\n\n".join(blocks)
    )
    raw = backend._call(prompt, max_tokens=1024)  # type: ignore[attr-defined]
    # NOTE(review): _extract_json is assumed to return a list on success and
    # some non-list value otherwise -- confirm against its definition.
    arr = _extract_json(raw, "array")
    edits: List[EditRecord] = []
    if isinstance(arr, list):
        for entry in arr[:edit_budget]:
            if not isinstance(entry, dict):
                continue
            content = str(entry.get("content", "")).strip()
            if not content:
                continue  # entries without text carry no rule
            edits.append(EditRecord(
                target=target,
                op=str(entry.get("op", "add")).strip().lower(),
                content=content,
                rationale=str(entry.get("rationale", "")).strip(),
            ))
    return edits
skillopt.sleep.types import ReplayResult, TaskRecord + rs = RolloutSet(task=TaskRecord(id="t", project="/p", intent="x"), + attempts=[ReplayResult(id="t", hard=1.0), + ReplayResult(id="t", hard=0.0), + ReplayResult(id="t", hard=1.0)]) + self.assertEqual(rs.best.hard, 1.0) + self.assertEqual(rs.worst.hard, 0.0) + self.assertEqual(rs.spread, 1.0) + self.assertAlmostEqual(rs.pass_rate, 2 / 3) + + def test_budget_exhaustion_and_plan(self): + from skillopt.sleep.budget import Budget, plan_depth + clock = [0.0] + b = Budget(max_tokens=1000) + b.start(lambda: clock[0], tokens_now=0) + self.assertFalse(b.exhausted(tokens_now=500, clock_fn=lambda: clock[0])) + self.assertTrue(b.exhausted(tokens_now=1000, clock_fn=lambda: clock[0])) + self.assertEqual(plan_depth(Budget(), n_tasks=5, default_nights=2, default_k=1), (2, 1)) + nights, k = plan_depth(Budget(max_tokens=100_000), n_tasks=5) + self.assertGreaterEqual(nights, 1) + self.assertGreaterEqual(k, 1) + + def test_contrastive_reflect_with_stub(self): + from skillopt.sleep.backend import Backend + from skillopt.sleep.rollout import RolloutSet, contrastive_reflect + from skillopt.sleep.types import ReplayResult, TaskRecord + + class StubBackend(Backend): + name = "stub" + def _call(self, prompt, *, max_tokens=1024): + return '[{"op":"add","content":"always do the good thing","rationale":"good passed"}]' + + rs = RolloutSet(task=TaskRecord(id="t", project="/p", intent="x"), + attempts=[ReplayResult(id="t", hard=1.0, response="good"), + ReplayResult(id="t", hard=0.0, response="bad")]) + edits = contrastive_reflect(StubBackend(), [rs], "skill", "") + self.assertEqual(len(edits), 1) + self.assertIn("good thing", edits[0].content) + + class TestSlowUpdate(unittest.TestCase): def test_protected_field_roundtrip(self): from skillopt.sleep.slow_update import (