feat(sleep): multi-rollout contrastive reflection + token/time budget

The "脑补推演" core the user described — re-run the same task many times and
learn from the contrast between good and bad rollouts:

  - rollout.py: multi_rollout(task, k) runs K scored attempts; RolloutSet exposes
    best/worst/spread/pass_rate. contrastive_reflect picks the highest-spread
    tasks (some attempts passed, some failed — most informative) and asks the
    optimizer what the GOOD attempts did that the BAD ones didn't, distilling a
    general rule. Far stronger signal than a single failure.
  - consolidate(rollouts_k>1) uses contrastive reflection (falls back to
    single-shot reflect if it yields nothing).
  - budget.py: Budget(max_tokens|max_minutes) tracks spend; plan_depth() derives
    (nights, rollouts_k) from a token budget. run_gbrain gains --rollouts-k,
    --budget-tokens, --budget-minutes (auto-plans depth).

3 new tests (rollout stats, budget+plan, contrastive stub). 26 tests pass.

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
This commit is contained in:
Yifan Yang
2026-06-08 14:31:51 +00:00
parent c179a24c45
commit 77ac33e8bf
5 changed files with 283 additions and 8 deletions

75
skillopt/sleep/budget.py Normal file
View File

@@ -0,0 +1,75 @@
"""SkillOpt-Sleep — budget controller.
Lets the user say how much they're willing to spend on a night's "dreaming",
in tokens or wall-clock minutes, and the engine schedules depth (how many
rollouts × how many nights) within that budget. Stops cleanly when exhausted
and reports what it skipped (no silent truncation).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
@dataclass
class Budget:
    """Spend tracker for one run, limited by tokens and/or wall-clock minutes.

    A limit that is ``None`` (or any other falsy value such as ``0``) is
    treated as "no limit" by every method below. Call :meth:`start` once to
    record the baseline clock reading and token counter; later queries are
    measured relative to that baseline.
    """

    max_tokens: Optional[int] = None      # falsy => no token limit
    max_minutes: Optional[float] = None   # falsy => no time limit
    _start_time: Optional[float] = None   # clock reading captured by start()
    _tokens_at_start: int = 0             # token counter captured by start()

    def start(self, clock_fn, tokens_now: int) -> None:
        """Record the baseline: current clock reading and token counter."""
        self._tokens_at_start = tokens_now
        self._start_time = clock_fn()

    def tokens_spent(self, tokens_now: int) -> int:
        """Tokens consumed since the baseline, never negative."""
        delta = tokens_now - self._tokens_at_start
        return max(0, delta)

    def minutes_elapsed(self, clock_fn) -> float:
        """Minutes since :meth:`start`; ``0.0`` if it was never called.

        The division by 60 implies ``clock_fn`` returns seconds.
        """
        began = self._start_time
        if began is None:
            # Not started yet: do not consult the clock at all.
            return 0.0
        return (clock_fn() - began) / 60.0

    def remaining_fraction(self, *, tokens_now: int, clock_fn) -> float:
        """Smallest remaining fraction across all active limits (1.0 = fresh)."""
        lowest = 1.0
        if self.max_tokens:
            used = self.tokens_spent(tokens_now) / self.max_tokens
            lowest = min(lowest, max(0.0, 1.0 - used))
        if self.max_minutes:
            used = self.minutes_elapsed(clock_fn) / self.max_minutes
            lowest = min(lowest, max(0.0, 1.0 - used))
        return lowest

    def exhausted(self, *, tokens_now: int, clock_fn) -> bool:
        """Return True once any active limit has been reached or exceeded."""
        out_of_tokens = (
            bool(self.max_tokens)
            and self.tokens_spent(tokens_now) >= self.max_tokens
        )
        if out_of_tokens:
            return True
        # The clock is only consulted when a time limit is actually set.
        return (
            bool(self.max_minutes)
            and self.minutes_elapsed(clock_fn) >= self.max_minutes
        )

    def status(self, *, tokens_now: int, clock_fn) -> str:
        """Human-readable "spent/limit" summary, or "unbounded" with no limits."""
        pieces = []
        if self.max_tokens:
            spent = self.tokens_spent(tokens_now)
            pieces.append(f"tokens {spent}/{self.max_tokens}")
        if self.max_minutes:
            elapsed = self.minutes_elapsed(clock_fn)
            pieces.append(f"minutes {elapsed:.1f}/{self.max_minutes}")
        summary = ", ".join(pieces)
        return summary if summary else "unbounded"
def plan_depth(budget: Budget, *, n_tasks: int,
               default_nights: int = 2, default_k: int = 1) -> tuple[int, int]:
    """Heuristically choose (nights, rollouts_per_task) from a token budget.

    Rough cost model: one rollout ≈ 1 unit (planned at ~1.5k tokens); a night
    does ~n_tasks*k rollouts plus reflect/gate (~2*n_tasks), i.e. about
    ``n_tasks * (k + 2)`` units. Nights are bought first at k=1 (capped at 4),
    then whatever is left over is spent on extra rollouts per task (k capped
    at 5).

    Args:
        budget: Only ``budget.max_tokens`` is read; a falsy value means
            "no token budget".
        n_tasks: Number of tasks replayed per night; values below 1 are
            treated as 1.
        default_nights: Returned unchanged when no token budget is set.
        default_k: Returned unchanged when no token budget is set.

    Returns:
        ``(nights, k)``. With no budget set, returns the defaults.
    """
    if not budget.max_tokens:
        return default_nights, default_k

    task_count = max(1, n_tasks)
    # assume ~1.5k tokens per rollout as a planning constant
    rollouts_affordable = budget.max_tokens / 1500.0
    per_night = task_count * 3  # rollouts + reflect + gate, k=1
    nights = max(1, min(4, int(rollouts_affordable // per_night)))

    # Spend surplus on more rollouts-per-task (contrastive signal). Raising k
    # by one costs task_count extra rollouts on EVERY planned night, so the
    # surplus has to be divided by nights * task_count; dividing by task_count
    # alone plans more rollouts than the budget can pay for when nights > 1.
    surplus = rollouts_affordable - nights * per_night
    extra_k = int(surplus // (nights * task_count))
    k = max(1, min(5, 1 + extra_k))
    return nights, k

View File

@@ -84,6 +84,7 @@ def consolidate(
gate_metric: str = "mixed",
gate_mixed_weight: float = 0.5,
gate_mode: str = "on", # "on" (hard/soft per gate_metric) | "off" (greedy)
rollouts_k: int = 1, # >1 => multi-rollout contrastive reflection
evolve_skill: bool = True,
evolve_memory: bool = True,
night: int = 1,
@@ -136,10 +137,27 @@ def consolidate(
return doc
if evolve_skill:
edits = backend.reflect(
failures, successes, cand_skill, cand_memory,
edit_budget=edit_budget, evolve_skill=True, evolve_memory=False,
)
if rollouts_k > 1:
# multi-rollout contrastive reflection: run each train task K times
# and distill a rule from the good-vs-bad contrast (the "脑补" signal).
from skillopt.sleep.rollout import multi_rollout, contrastive_reflect
sets = [multi_rollout(backend, t, cand_skill, cand_memory, k=rollouts_k)
for t in train_tasks]
edits = contrastive_reflect(
backend, sets, cand_skill, cand_memory,
edit_budget=edit_budget, target="skill",
)
# fall back to single-shot reflect if contrast yielded nothing
if not edits:
edits = backend.reflect(
failures, successes, cand_skill, cand_memory,
edit_budget=edit_budget, evolve_skill=True, evolve_memory=False,
)
else:
edits = backend.reflect(
failures, successes, cand_skill, cand_memory,
edit_budget=edit_budget, evolve_skill=True, evolve_memory=False,
)
cand_skill = _gate_apply(cand_skill, edits, "skill")
if evolve_memory:

View File

@@ -45,7 +45,7 @@ def _score(backend, tasks, skill, memory, split="test", metric="mixed", w=0.5):
def run_seed(backend, seed: str, skill: str, tasks: List, *,
nights: int = 3, edit_budget: int = 4, gate_mode: str = "on",
slow_update: bool = True,
slow_update: bool = True, rollouts_k: int = 1,
limit_replay: int = 0, limit_holdout: int = 0) -> dict:
memory = ""
# optionally cap each split to control API cost / latency.
@@ -69,7 +69,8 @@ def run_seed(backend, seed: str, skill: str, tasks: List, *,
res = consolidate(
backend, tasks, cur, memory,
edit_budget=edit_budget, gate_metric="mixed", gate_mixed_weight=0.5,
gate_mode=gate_mode, evolve_skill=True, evolve_memory=False, night=night,
gate_mode=gate_mode, rollouts_k=rollouts_k,
evolve_skill=True, evolve_memory=False, night=night,
)
if res.accepted:
cur = res.new_skill
@@ -136,6 +137,11 @@ def main(argv=None) -> int:
ap.add_argument("--edit-budget", type=int, default=4)
ap.add_argument("--gate", default="on", choices=["on", "off", "hard", "soft"],
help="on/hard/soft = validation-gated; off = greedy (no hard filter)")
ap.add_argument("--rollouts-k", type=int, default=1,
help=">1 = multi-rollout contrastive reflection per task")
ap.add_argument("--budget-tokens", type=int, default=0,
help="approx token budget; auto-plans nights x rollouts when set")
ap.add_argument("--budget-minutes", type=float, default=0.0)
ap.add_argument("--limit-replay", type=int, default=0, help="cap #train tasks (cost control)")
ap.add_argument("--limit-holdout", type=int, default=0, help="cap #val and #test tasks (cost control)")
ap.add_argument("--json", action="store_true")
@@ -160,8 +166,19 @@ def main(argv=None) -> int:
skill, tasks = load_seed(data_root, seed)
if not tasks:
continue
r = run_seed(backend, seed, skill, tasks, nights=args.nights,
edit_budget=args.edit_budget,
# budget auto-planning: derive nights x rollouts_k from a token budget
nights, rollouts_k = args.nights, args.rollouts_k
if args.budget_tokens:
from skillopt.sleep.budget import Budget, plan_depth
n_train = len([t for t in tasks if t.split == "train"]) or len(tasks)
nights, rollouts_k = plan_depth(
Budget(max_tokens=args.budget_tokens), n_tasks=n_train,
default_nights=args.nights, default_k=args.rollouts_k,
)
if not args.json:
print(f" [budget] {args.budget_tokens} tok -> nights={nights} rollouts_k={rollouts_k}")
r = run_seed(backend, seed, skill, tasks, nights=nights,
edit_budget=args.edit_budget, rollouts_k=rollouts_k,
gate_mode=("off" if args.gate == "off" else "on"),
limit_replay=args.limit_replay, limit_holdout=args.limit_holdout)
results.append(r)

122
skillopt/sleep/rollout.py Normal file
View File

@@ -0,0 +1,122 @@
"""SkillOpt-Sleep — multi-rollout + contrastive reflection ("脑补推演" core).
The user's insight: let the agent re-run the SAME task many times, then look at
which rollouts went well vs badly and distill a rule from the *contrast*. This
is a much stronger learning signal than a single failure, and it is the essence
of the offline "dream/imagination" process — train-time rollouts are synthetic,
so doing many is fine.
Pieces:
* multi_rollout — run one task K times under (skill, memory), return scored attempts
* contrastive_reflect — given good vs bad attempts of the same tasks, ask the
optimizer what distinguishes them and propose a general rule
Driven through the Backend abstraction (mock/claude/codex), import-light.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from skillopt.sleep.backend import Backend, _extract_json
from skillopt.sleep.replay import replay_one
from skillopt.sleep.types import EditRecord, ReplayResult, TaskRecord
@dataclass
class RolloutSet:
    """The scored attempts collected for a single task.

    All statistics are derived from each attempt's ``hard`` score. With no
    attempts recorded, ``best``/``worst`` are ``None`` and the numeric
    statistics are ``0.0``.
    """

    task: TaskRecord
    attempts: List[ReplayResult] = field(default_factory=list)

    @property
    def best(self) -> Optional[ReplayResult]:
        """Attempt with the highest ``hard`` score (first one wins ties)."""
        if not self.attempts:
            return None
        return max(self.attempts, key=lambda attempt: attempt.hard)

    @property
    def worst(self) -> Optional[ReplayResult]:
        """Attempt with the lowest ``hard`` score (first one wins ties)."""
        if not self.attempts:
            return None
        return min(self.attempts, key=lambda attempt: attempt.hard)

    @property
    def spread(self) -> float:
        """Gap between the highest and lowest ``hard`` score."""
        scores = [attempt.hard for attempt in self.attempts]
        if scores:
            return max(scores) - min(scores)
        return 0.0

    @property
    def pass_rate(self) -> float:
        """Fraction of attempts whose ``hard`` score is at least 1.0."""
        total = len(self.attempts)
        if total == 0:
            return 0.0
        passed = [attempt for attempt in self.attempts if attempt.hard >= 1.0]
        return len(passed) / total
def multi_rollout(
    backend: Backend,
    task: TaskRecord,
    skill: str,
    memory: str,
    *,
    k: int = 3,
) -> RolloutSet:
    """Replay ``task`` K times under one (skill, memory) and collect the results.

    At least one attempt is always made, even when ``k`` is below 1. Per the
    original note: replay_one is deterministic for the mock backend, while
    real backends vary across attempts through the model's own sampling.
    """
    attempt_count = max(1, k)
    results = [
        replay_one(backend, task, skill, memory)
        for _ in range(attempt_count)
    ]
    return RolloutSet(task=task, attempts=results)
def contrastive_reflect(
    backend: Backend,
    rollout_sets: List[RolloutSet],
    skill: str,
    memory: str,
    *,
    edit_budget: int = 4,
    target: str = "skill",
) -> List[EditRecord]:
    """Ask the optimizer for rules that explain good-vs-bad attempts.

    Only rollout sets with a positive score spread (and a truthy best and
    worst attempt) are used; the six widest-spread sets are kept. For each one
    the prompt shows the task intent, the best and the worst attempt, and the
    worst attempt's failure reason, all truncated. The backend's reply is
    parsed as a JSON array and converted into edit records.

    Args:
        backend: Object whose ``_call(prompt, max_tokens=...)`` is invoked.
        rollout_sets: Candidate rollout sets to mine for contrast.
        skill: Not read by this function.
        memory: Not read by this function.
        edit_budget: Maximum number of rules requested and accepted.
        target: Written into the prompt and stamped on every returned edit.

    Returns:
        Up to ``edit_budget`` edits; an empty list when no set is informative
        or the reply does not parse to a list.
    """
    candidates = [
        rs for rs in rollout_sets
        if rs.spread > 0 and rs.best and rs.worst
    ]
    # Widest spread first; only the top six are shown to the optimizer.
    chosen = sorted(candidates, key=lambda rs: rs.spread, reverse=True)[:6]
    if not chosen:
        return []

    sections = []
    for rs in chosen:
        good, bad = rs.best, rs.worst
        sections.append(
            f"## Task: {rs.task.intent[:160]}\n"
            f"- GOOD attempt (score {good.hard:.1f}): {good.response[:200]}\n"
            f"- BAD attempt (score {bad.hard:.1f}): {bad.response[:200]}\n"
            f" (bad failed: {bad.fail_reason[:100]})"
        )

    instructions = (
        "You are SkillOpt's optimizer doing CONTRASTIVE reflection. For each task "
        "below the agent was run multiple times; some attempts succeeded and some "
        "failed. Identify what the GOOD attempts did that the BAD ones did not, "
        f"and propose at most {edit_budget} SHORT, GENERAL, reusable rules for the "
        f"{target} that would make the good behavior reliable every time. Quote "
        "concrete thresholds/formats verbatim; do not paraphrase vaguely. "
        'Return ONLY a JSON array: '
        '[{"op":"add","content":"<rule>","rationale":"<what good did that bad didnt>"}].\n\n'
    )
    prompt = instructions + "\n\n".join(sections)

    reply = backend._call(prompt, max_tokens=1024)  # type: ignore[attr-defined]
    parsed = _extract_json(reply, "array")
    if not isinstance(parsed, list):
        return []

    edits: List[EditRecord] = []
    for item in parsed[:edit_budget]:
        if not isinstance(item, dict):
            continue
        rule_text = str(item.get("content", "")).strip()
        if not rule_text:
            # Entries without usable rule text are dropped.
            continue
        edits.append(EditRecord(
            target=target,
            op=str(item.get("op", "add")).strip().lower(),
            content=rule_text,
            rationale=str(item.get("rationale", "")).strip(),
        ))
    return edits