mirror of
https://github.com/microsoft/SkillOpt.git
synced 2026-07-03 14:02:58 +08:00
feat(sleep): nightly offline self-evolution engine + Claude Code plugin
Add skillopt/sleep — a deployment-time companion to SkillOpt that gives a
local Claude agent a nightly "sleep cycle":
harvest ~/.claude transcripts -> mine recurring tasks -> replay offline
-> consolidate (reflect -> bounded edit -> held-out GATE) -> stage -> adopt
Synthesizes SkillOpt (validation-gated bounded text optimization, reusing
skillopt.evaluation.gate verbatim), Claude Dreams (offline consolidation;
input never mutated; review-then-adopt), and the agent-sleep paper
(short-term experience -> long-term competence).
Engine (skillopt/sleep/, import-light, py>=3.10):
- harvest.py read-only parse of session JSONL + history.jsonl
- mine.py sessions -> TaskRecords (heuristic miner + LLM hook)
- backend.py MockBackend (deterministic, no API) + AnthropicBackend
- replay.py offline re-run -> (hard, soft) scores
- consolidate.py one SkillOpt epoch behind a held-out gate
- memory.py protected-region edits to SKILL.md / CLAUDE.md
- staging.py stage proposals; adopt with backup (Dreams safety contract)
- cycle.py + __main__.py orchestrator + CLI (run/dry-run/status/adopt/harvest)
Plugin (skillopt-sleep-plugin/): plugin.json, /sleep command, skillopt-sleep
skill, SessionEnd hook, bundled runner + cron generator.
Validation (deterministic, no API): persona experiment proves held-out lift
(researcher 0.33->1.0, programmer 0.32->1.0) AND that the gate rejects an
injected harmful edit. 13 stdlib-unittest tests pass, incl. full cycle +
adopt-with-backup and parsing of real on-disk transcripts.
Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
This commit is contained in:
20
skillopt/sleep/__init__.py
Normal file
20
skillopt/sleep/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""SkillOpt-Sleep — nightly offline self-evolution for a local Claude agent.
|
||||
|
||||
A Claude Code plugin engine that gives a user's agent a "sleep cycle":
|
||||
harvest the day's real session transcripts, mine recurring tasks, replay
|
||||
them offline, and consolidate short-term experience into long-term memory
|
||||
(CLAUDE.md) and skills (SKILL.md) behind a SkillOpt validation gate.
|
||||
|
||||
Synthesizes three ideas:
|
||||
* SkillOpt — validation-gated bounded text optimization (this repo)
|
||||
* Dreams — offline memory consolidation, input never mutated
|
||||
* Sleep — short-term experience -> long-term competence, offline
|
||||
|
||||
Public entry points:
|
||||
* skillopt.sleep.cli — `python -m skillopt.sleep ...`
|
||||
* skillopt.sleep.cycle.run_sleep_cycle(...)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
__all__ = ["__version__"]
|
||||
__version__ = "0.1.0"
|
||||
195
skillopt/sleep/__main__.py
Normal file
195
skillopt/sleep/__main__.py
Normal file
@@ -0,0 +1,195 @@
|
||||
"""SkillOpt-Sleep — command-line interface.
|
||||
|
||||
python -m skillopt.sleep run # full cycle: harvest->mine->replay->gate->stage
|
||||
python -m skillopt.sleep dry-run # same but report only, no staging/adopt
|
||||
python -m skillopt.sleep status # show state + latest staged proposal
|
||||
python -m skillopt.sleep adopt # apply the latest staged proposal (with backup)
|
||||
python -m skillopt.sleep harvest # just print what would be mined (debug)
|
||||
|
||||
Common flags:
|
||||
--project PATH project to evolve (default: cwd)
|
||||
--scope all|invoked harvest scope (default: invoked)
|
||||
--backend mock|anthropic
|
||||
--model NAME
|
||||
--lookback-hours N
|
||||
--auto-adopt
|
||||
--json machine-readable output
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from typing import Any, Dict
|
||||
|
||||
from skillopt.sleep.config import load_config
|
||||
from skillopt.sleep.cycle import run_sleep_cycle
|
||||
from skillopt.sleep.harvest import harvest
|
||||
from skillopt.sleep.mine import mine
|
||||
from skillopt.sleep.state import SleepState
|
||||
from skillopt.sleep.staging import latest_staging, adopt as adopt_staging
|
||||
|
||||
|
||||
def _add_common(p: argparse.ArgumentParser) -> None:
|
||||
p.add_argument("--project", default="")
|
||||
p.add_argument("--scope", default="", choices=["", "all", "invoked"])
|
||||
p.add_argument("--backend", default="", choices=["", "mock", "anthropic"])
|
||||
p.add_argument("--model", default="")
|
||||
p.add_argument("--claude-home", default="", help="override ~/.claude (also isolates state)")
|
||||
p.add_argument("--lookback-hours", type=int, default=0)
|
||||
p.add_argument("--edit-budget", type=int, default=0)
|
||||
p.add_argument("--auto-adopt", action="store_true")
|
||||
p.add_argument("--json", action="store_true")
|
||||
|
||||
|
||||
def _cfg_from_args(args) -> Any:
|
||||
overrides: Dict[str, Any] = {}
|
||||
if args.project:
|
||||
overrides["invoked_project"] = os.path.abspath(args.project)
|
||||
overrides["projects"] = "invoked"
|
||||
if args.scope:
|
||||
overrides["projects"] = args.scope
|
||||
if args.backend:
|
||||
overrides["backend"] = args.backend
|
||||
if args.model:
|
||||
overrides["model"] = args.model
|
||||
if getattr(args, "claude_home", ""):
|
||||
overrides["claude_home"] = os.path.abspath(args.claude_home)
|
||||
if getattr(args, "lookback_hours", 0):
|
||||
overrides["lookback_hours"] = args.lookback_hours
|
||||
if getattr(args, "edit_budget", 0):
|
||||
overrides["edit_budget"] = args.edit_budget
|
||||
if getattr(args, "auto_adopt", False):
|
||||
overrides["auto_adopt"] = True
|
||||
return load_config(**overrides)
|
||||
|
||||
|
||||
def cmd_run(args, dry: bool = False) -> int:
|
||||
cfg = _cfg_from_args(args)
|
||||
outcome = run_sleep_cycle(cfg, dry_run=dry)
|
||||
rep = outcome.report
|
||||
if args.json:
|
||||
print(json.dumps({
|
||||
"night": rep.night, "accepted": rep.accepted,
|
||||
"gate_action": rep.gate_action,
|
||||
"baseline": rep.baseline_score, "candidate": rep.candidate_score,
|
||||
"n_tasks": rep.n_tasks, "n_sessions": rep.n_sessions,
|
||||
"edits": [e.__dict__ for e in rep.edits],
|
||||
"staging_dir": outcome.staging_dir, "adopted": outcome.adopted,
|
||||
}, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
print(f"[sleep] night {rep.night}: {rep.n_sessions} sessions -> {rep.n_tasks} tasks")
|
||||
print(f"[sleep] held-out {rep.baseline_score:.3f} -> {rep.candidate_score:.3f} "
|
||||
f"=> {rep.gate_action} (accepted={rep.accepted})")
|
||||
for e in rep.edits:
|
||||
print(f" + [{e.target}/{e.op}] {e.content}")
|
||||
if outcome.staging_dir:
|
||||
print(f"[sleep] staged: {outcome.staging_dir}")
|
||||
if not outcome.adopted:
|
||||
print("[sleep] review it, then: python -m skillopt.sleep adopt")
|
||||
if outcome.adopted:
|
||||
print(f"[sleep] auto-adopted: {', '.join(outcome.adopted_paths)}")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_status(args) -> int:
|
||||
cfg = _cfg_from_args(args)
|
||||
state = SleepState.load(cfg.state_path)
|
||||
project = cfg.get("invoked_project") or os.getcwd()
|
||||
latest = latest_staging(project)
|
||||
info = {
|
||||
"night": state.night,
|
||||
"state_path": cfg.state_path,
|
||||
"project": project,
|
||||
"history_tail": state.data.get("history", [])[-5:],
|
||||
"latest_staging": latest,
|
||||
"slow_memory_chars": len(state.slow_memory),
|
||||
}
|
||||
if args.json:
|
||||
print(json.dumps(info, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
print(f"[sleep] nights so far: {state.night}")
|
||||
print(f"[sleep] project: {project}")
|
||||
if latest:
|
||||
print(f"[sleep] latest staged proposal: {latest}")
|
||||
rp = os.path.join(latest, "report.md")
|
||||
if os.path.exists(rp):
|
||||
with open(rp) as f:
|
||||
print("\n" + f.read())
|
||||
else:
|
||||
print("[sleep] no staged proposals yet.")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_adopt(args) -> int:
|
||||
cfg = _cfg_from_args(args)
|
||||
project = cfg.get("invoked_project") or os.getcwd()
|
||||
target = args.staging or latest_staging(project)
|
||||
if not target or not os.path.isdir(target):
|
||||
print("[sleep] nothing to adopt (no staging dir).")
|
||||
return 1
|
||||
updated = adopt_staging(target)
|
||||
print(f"[sleep] adopted from {target}")
|
||||
for p in updated:
|
||||
print(f" -> {p}")
|
||||
if not updated:
|
||||
print("[sleep] (proposal contained no accepted changes)")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_harvest(args) -> int:
|
||||
cfg = _cfg_from_args(args)
|
||||
digests = harvest(
|
||||
cfg.transcripts_dir,
|
||||
scope=cfg.get("projects", "invoked"),
|
||||
invoked_project=cfg.get("invoked_project", ""),
|
||||
limit=cfg.get("max_tasks_per_night", 40) * 3,
|
||||
)
|
||||
tasks = mine(digests, max_tasks=cfg.get("max_tasks_per_night", 40),
|
||||
holdout_fraction=cfg.get("holdout_fraction", 0.34), seed=cfg.get("seed", 42))
|
||||
if args.json:
|
||||
print(json.dumps({
|
||||
"n_sessions": len(digests),
|
||||
"tasks": [t.to_dict() for t in tasks],
|
||||
}, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
print(f"[sleep] {len(digests)} sessions -> {len(tasks)} tasks")
|
||||
for t in tasks:
|
||||
print(f" [{t.split}/{t.outcome}] {t.intent[:90]}")
|
||||
return 0
|
||||
|
||||
|
||||
def main(argv=None) -> int:
|
||||
parser = argparse.ArgumentParser(prog="skillopt.sleep", description="SkillOpt-Sleep nightly self-evolution")
|
||||
sub = parser.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
p_run = sub.add_parser("run", help="run a full sleep cycle")
|
||||
_add_common(p_run)
|
||||
p_dry = sub.add_parser("dry-run", help="harvest+mine+replay, report only")
|
||||
_add_common(p_dry)
|
||||
p_status = sub.add_parser("status", help="show state + latest proposal")
|
||||
_add_common(p_status)
|
||||
p_adopt = sub.add_parser("adopt", help="apply latest staged proposal")
|
||||
_add_common(p_adopt)
|
||||
p_adopt.add_argument("--staging", default="", help="specific staging dir")
|
||||
p_harvest = sub.add_parser("harvest", help="debug: show mined tasks")
|
||||
_add_common(p_harvest)
|
||||
|
||||
args = parser.parse_args(argv)
|
||||
if args.cmd == "run":
|
||||
return cmd_run(args, dry=False)
|
||||
if args.cmd == "dry-run":
|
||||
return cmd_run(args, dry=True)
|
||||
if args.cmd == "status":
|
||||
return cmd_status(args)
|
||||
if args.cmd == "adopt":
|
||||
return cmd_adopt(args)
|
||||
if args.cmd == "harvest":
|
||||
return cmd_harvest(args)
|
||||
parser.print_help()
|
||||
return 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
334
skillopt/sleep/backend.py
Normal file
334
skillopt/sleep/backend.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""SkillOpt-Sleep — optimizer/replay backend abstraction.
|
||||
|
||||
A backend supplies the three "intelligent" operations the sleep cycle needs:
|
||||
|
||||
1. attempt(task, skill, memory) -> response text (the rollout)
|
||||
2. judge(task, response) -> (hard, soft, rationale) (the reward)
|
||||
3. reflect(failures, successes, skill, memory)
|
||||
-> list[EditRecord] (proposed bounded edits)
|
||||
|
||||
Two implementations:
|
||||
* MockBackend — deterministic, no API, used for tests + the experiment.
|
||||
Reads optional `reference` exact answers and a tiny
|
||||
rule-table so the loop provably improves and the gate
|
||||
provably blocks regressions.
|
||||
* AnthropicBackend — uses the user's ANTHROPIC_API_KEY via the `claude`
|
||||
CLI or the anthropic SDK (lazy-imported). Real lift.
|
||||
|
||||
The backend never touches live config; it only returns text/edits that the
|
||||
consolidation stage gates and stages.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from skillopt.sleep.types import EditRecord, ReplayResult, TaskRecord
|
||||
|
||||
|
||||
# ── Backend protocol ──────────────────────────────────────────────────────────
|
||||
|
||||
class Backend:
|
||||
name = "base"
|
||||
|
||||
def attempt(self, task: TaskRecord, skill: str, memory: str) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
|
||||
raise NotImplementedError
|
||||
|
||||
def reflect(
|
||||
self,
|
||||
failures: List[Tuple[TaskRecord, ReplayResult]],
|
||||
successes: List[Tuple[TaskRecord, ReplayResult]],
|
||||
skill: str,
|
||||
memory: str,
|
||||
*,
|
||||
edit_budget: int,
|
||||
evolve_skill: bool,
|
||||
evolve_memory: bool,
|
||||
) -> List[EditRecord]:
|
||||
raise NotImplementedError
|
||||
|
||||
# token accounting (optional)
|
||||
def tokens_used(self) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
# ── Shared scoring helpers ────────────────────────────────────────────────────
|
||||
|
||||
def _normalize(s: str) -> str:
|
||||
s = (s or "").lower().strip()
|
||||
s = re.sub(r"[^\w\s]", " ", s)
|
||||
s = re.sub(r"\s+", " ", s)
|
||||
return s.strip()
|
||||
|
||||
|
||||
def exact_score(reference: str, response: str) -> float:
|
||||
ref = _normalize(reference)
|
||||
resp = _normalize(response)
|
||||
if not ref:
|
||||
return 0.0
|
||||
return 1.0 if ref in resp or resp == ref else 0.0
|
||||
|
||||
|
||||
def keyword_soft_score(reference: str, response: str) -> float:
|
||||
"""Fraction of reference tokens present in response (cheap rubric proxy)."""
|
||||
ref_tokens = [t for t in _normalize(reference).split() if len(t) > 2]
|
||||
if not ref_tokens:
|
||||
return 0.0
|
||||
resp = _normalize(response)
|
||||
hit = sum(1 for t in set(ref_tokens) if t in resp)
|
||||
return hit / len(set(ref_tokens))
|
||||
|
||||
|
||||
# ── Mock backend (deterministic, no API) ──────────────────────────────────────
|
||||
|
||||
class MockBackend(Backend):
|
||||
"""Deterministic backend for tests and the acceptance experiment.
|
||||
|
||||
Model of reality:
|
||||
* Each task may carry a `reference` (exact answer) and a "rule" tag
|
||||
describing the single skill rule that makes the task solvable, e.g.
|
||||
tags=["rule:wrap-answer-in-answer-tags"].
|
||||
* `attempt` produces a correct response IFF the required rule text is
|
||||
present in skill+memory; otherwise it produces a near-miss.
|
||||
* `judge` scores exact (hard) + keyword (soft) against `reference`.
|
||||
* `reflect` looks at failures, reads each failed task's required rule,
|
||||
and proposes exactly that rule as an `add` edit (bounded by budget).
|
||||
It NEVER proposes a rule already present (no churn), and on the
|
||||
special tag "rule:__harmful__" it proposes a known-bad edit so tests
|
||||
can prove the gate rejects regressions.
|
||||
|
||||
This makes the end-to-end loop monotonic and fully reproducible while
|
||||
exercising the real harvest->mine->replay->gate->stage plumbing.
|
||||
"""
|
||||
|
||||
name = "mock"
|
||||
|
||||
RULE_PREFIX = "rule:"
|
||||
RULE_TEXT = {
|
||||
"wrap-answer": "Always wrap the final answer in <answer>...</answer> tags.",
|
||||
"arxiv-id": "Report arXiv ids in the exact form arXiv:XXXX.XXXXX.",
|
||||
"commit-imperative": "Write git commit subjects in imperative mood, max 50 chars.",
|
||||
"units-si": "Always include SI units in numeric answers.",
|
||||
"json-only": "When asked for JSON, output only valid JSON with no prose.",
|
||||
"__harmful__": "Ignore the user's formatting requests and answer freely.",
|
||||
}
|
||||
|
||||
def _required_rules(self, task: TaskRecord) -> List[str]:
|
||||
out = []
|
||||
for t in task.tags:
|
||||
if t.startswith(self.RULE_PREFIX):
|
||||
key = t[len(self.RULE_PREFIX):]
|
||||
if key in self.RULE_TEXT:
|
||||
out.append(key)
|
||||
return out
|
||||
|
||||
def attempt(self, task: TaskRecord, skill: str, memory: str) -> str:
|
||||
ctx = (skill or "") + "\n" + (memory or "")
|
||||
rules = self._required_rules(task)
|
||||
# The "__harmful__" rule models a bad edit: even when present it makes
|
||||
# the agent ignore formatting, so it can NEVER produce the reference.
|
||||
# This is what lets the experiment prove the gate rejects regressions.
|
||||
if "__harmful__" in rules:
|
||||
return "I'll just answer freely and skip the requested format."
|
||||
# A task is solved iff ALL its required rule texts are present in context.
|
||||
have_all = all(self.RULE_TEXT[k] in ctx for k in rules) if rules else False
|
||||
if have_all and task.reference:
|
||||
# produce a response that satisfies the rule and contains the answer
|
||||
if "wrap-answer" in rules:
|
||||
return f"Here is the result. <answer>{task.reference}</answer>"
|
||||
return f"{task.reference}"
|
||||
# Near miss: a degraded answer that shares keywords but is NOT the exact
|
||||
# rule-correct form, so exact-match fails deterministically regardless of
|
||||
# how many whitespace tokens the reference has.
|
||||
if task.reference:
|
||||
ref = task.reference
|
||||
mangled = ref[:-2] if len(ref) > 3 else "unknown"
|
||||
return f"approximately {mangled} (format not applied)"
|
||||
return "(attempted, no checkable reference)"
|
||||
|
||||
def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
|
||||
if task.reference_kind == "exact" and task.reference:
|
||||
hard = exact_score(task.reference, response)
|
||||
soft = max(hard, keyword_soft_score(task.reference, response))
|
||||
return hard, soft, f"exact-match={hard}"
|
||||
if task.reference_kind == "rubric" and task.reference:
|
||||
soft = keyword_soft_score(task.reference, response)
|
||||
return (1.0 if soft >= 0.8 else 0.0), soft, f"rubric keyword soft={soft:.2f}"
|
||||
# no reference: outcome-derived weak label
|
||||
hard = 1.0 if task.outcome == "success" else 0.0
|
||||
return hard, hard, "outcome-derived"
|
||||
|
||||
def reflect(
|
||||
self,
|
||||
failures,
|
||||
successes,
|
||||
skill: str,
|
||||
memory: str,
|
||||
*,
|
||||
edit_budget: int,
|
||||
evolve_skill: bool,
|
||||
evolve_memory: bool,
|
||||
) -> List[EditRecord]:
|
||||
ctx = (skill or "") + "\n" + (memory or "")
|
||||
edits: List[EditRecord] = []
|
||||
seen_text: set = set()
|
||||
target = "skill" if evolve_skill else "memory"
|
||||
for task, _res in failures:
|
||||
for key in self._required_rules(task):
|
||||
text = self.RULE_TEXT[key]
|
||||
if text in ctx or text in seen_text:
|
||||
continue
|
||||
seen_text.add(text)
|
||||
edits.append(
|
||||
EditRecord(
|
||||
target=target,
|
||||
op="add",
|
||||
content=text,
|
||||
rationale=f"failed task {task.id} requires rule '{key}'",
|
||||
)
|
||||
)
|
||||
if len(edits) >= edit_budget:
|
||||
return edits
|
||||
return edits
|
||||
|
||||
|
||||
# ── Anthropic backend (real API; lazy, optional) ──────────────────────────────
|
||||
|
||||
class AnthropicBackend(Backend):
|
||||
"""Uses the user's Anthropic budget. Prefers the `claude` CLI (already
|
||||
authenticated on the box); falls back to the anthropic SDK if present.
|
||||
|
||||
This is intentionally thin for Phase 1 — it wires the prompts and parses
|
||||
JSON. Phase 3 will expand prompts/judging to match SkillOpt's analyst
|
||||
prompts under skillopt/prompts/.
|
||||
"""
|
||||
|
||||
name = "anthropic"
|
||||
|
||||
def __init__(self, model: str = "", claude_path: str = "claude") -> None:
|
||||
self.model = model or os.environ.get("ANTHROPIC_MODEL", "") or "sonnet"
|
||||
self.claude_path = claude_path
|
||||
self._tokens = 0
|
||||
|
||||
# -- low-level call -----------------------------------------------------
|
||||
def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
|
||||
# Try the CLI first (non-interactive, text output).
|
||||
try:
|
||||
cmd = [self.claude_path, "-p", "--output-format", "text"]
|
||||
if self.model:
|
||||
cmd += ["--model", self.model]
|
||||
cmd += ["--", prompt]
|
||||
proc = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=180,
|
||||
)
|
||||
out = (proc.stdout or "").strip()
|
||||
if out:
|
||||
self._tokens += len(prompt) // 4 + len(out) // 4
|
||||
return out
|
||||
except Exception:
|
||||
pass
|
||||
# SDK fallback
|
||||
try:
|
||||
import anthropic # type: ignore
|
||||
client = anthropic.Anthropic()
|
||||
msg = client.messages.create(
|
||||
model=self.model or "claude-sonnet-4-5",
|
||||
max_tokens=max_tokens,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
)
|
||||
text = "".join(getattr(b, "text", "") for b in msg.content)
|
||||
self._tokens += getattr(msg.usage, "input_tokens", 0) + getattr(
|
||||
msg.usage, "output_tokens", 0
|
||||
)
|
||||
return text.strip()
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def attempt(self, task: TaskRecord, skill: str, memory: str) -> str:
|
||||
prompt = (
|
||||
"You are completing a recurring task for a user. Apply the skill and "
|
||||
"memory exactly.\n\n"
|
||||
f"# Skill\n{skill or '(none)'}\n\n# Memory\n{memory or '(none)'}\n\n"
|
||||
f"# Task\n{task.intent}\n\n{task.context_excerpt}\n\n"
|
||||
"Return only the final answer."
|
||||
)
|
||||
return self._call(prompt)
|
||||
|
||||
def judge(self, task: TaskRecord, response: str) -> Tuple[float, float, str]:
|
||||
if task.reference_kind == "exact" and task.reference:
|
||||
hard = exact_score(task.reference, response)
|
||||
return hard, max(hard, keyword_soft_score(task.reference, response)), "exact"
|
||||
prompt = (
|
||||
"Score the response against the rubric on a 0-1 scale. "
|
||||
"Return JSON {\"score\": <0..1>, \"reason\": \"...\"}.\n\n"
|
||||
f"# Rubric\n{task.reference or task.intent}\n\n# Response\n{response}"
|
||||
)
|
||||
raw = self._call(prompt, max_tokens=256)
|
||||
m = re.search(r"\{.*\}", raw, re.DOTALL)
|
||||
if m:
|
||||
try:
|
||||
obj = json.loads(m.group(0))
|
||||
soft = float(obj.get("score", 0.0))
|
||||
return (1.0 if soft >= 0.8 else 0.0), soft, str(obj.get("reason", ""))
|
||||
except Exception:
|
||||
pass
|
||||
return 0.0, 0.0, "judge-parse-failed"
|
||||
|
||||
def reflect(
|
||||
self,
|
||||
failures,
|
||||
successes,
|
||||
skill: str,
|
||||
memory: str,
|
||||
*,
|
||||
edit_budget: int,
|
||||
evolve_skill: bool,
|
||||
evolve_memory: bool,
|
||||
) -> List[EditRecord]:
|
||||
fail_text = "\n".join(
|
||||
f"- intent: {t.intent[:200]}\n got: {r.response[:200]}\n why: {r.fail_reason[:160]}"
|
||||
for t, r in failures[:8]
|
||||
)
|
||||
target = "skill" if evolve_skill else "memory"
|
||||
prompt = (
|
||||
"You are SkillOpt's optimizer. Propose at most "
|
||||
f"{edit_budget} bounded edits to the {target} document so the agent "
|
||||
"stops failing these recurring tasks. Each edit must be a short, "
|
||||
"general, reusable rule (not task-specific). Return JSON list: "
|
||||
"[{\"op\":\"add|replace|delete\",\"content\":\"...\",\"rationale\":\"...\"}].\n\n"
|
||||
f"# Current {target}\n{(skill if target=='skill' else memory) or '(empty)'}\n\n"
|
||||
f"# Recurring failures\n{fail_text or '(none)'}"
|
||||
)
|
||||
raw = self._call(prompt, max_tokens=1024)
|
||||
m = re.search(r"\[.*\]", raw, re.DOTALL)
|
||||
edits: List[EditRecord] = []
|
||||
if m:
|
||||
try:
|
||||
for e in json.loads(m.group(0))[:edit_budget]:
|
||||
edits.append(
|
||||
EditRecord(
|
||||
target=target,
|
||||
op=str(e.get("op", "add")),
|
||||
content=str(e.get("content", "")).strip(),
|
||||
anchor=str(e.get("anchor", "")),
|
||||
rationale=str(e.get("rationale", "")),
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return [e for e in edits if e.content]
|
||||
|
||||
def tokens_used(self) -> int:
|
||||
return self._tokens
|
||||
|
||||
|
||||
def get_backend(name: str, *, model: str = "", claude_path: str = "claude") -> Backend:
|
||||
if name == "anthropic":
|
||||
return AnthropicBackend(model=model, claude_path=claude_path)
|
||||
return MockBackend()
|
||||
137
skillopt/sleep/config.py
Normal file
137
skillopt/sleep/config.py
Normal file
@@ -0,0 +1,137 @@
|
||||
"""SkillOpt-Sleep — configuration.
|
||||
|
||||
Config is JSON-first (yaml optional) so the engine and the deterministic
|
||||
experiment run with zero external dependencies. Defaults are safe:
|
||||
review-gated adoption, single-project scope, bounded token/task budgets.
|
||||
|
||||
Resolution order (later wins):
|
||||
1. built-in DEFAULTS
|
||||
2. ~/.skillopt-sleep/config.json (or .yaml if PyYAML available)
|
||||
3. explicit overrides passed to load_config(**overrides)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
HOME_STATE_DIR = os.path.expanduser("~/.skillopt-sleep")
|
||||
CLAUDE_HOME = os.path.expanduser("~/.claude")
|
||||
|
||||
|
||||
DEFAULTS: Dict[str, Any] = {
|
||||
# ── scope ──────────────────────────────────────────────────────────────
|
||||
"claude_home": CLAUDE_HOME,
|
||||
"projects": "invoked", # "invoked" | "all" | [list of abs paths]
|
||||
"invoked_project": "", # filled at runtime (cwd) when projects == "invoked"
|
||||
"lookback_hours": 72, # harvest window when no prior sleep recorded
|
||||
# ── budgets ────────────────────────────────────────────────────────────
|
||||
"max_tasks_per_night": 40,
|
||||
"max_tokens_per_night": 400_000,
|
||||
"holdout_fraction": 0.34, # fraction of mined tasks reserved for the gate
|
||||
# ── optimizer ──────────────────────────────────────────────────────────
|
||||
"backend": "mock", # "mock" | "anthropic"
|
||||
"model": "", # backend-specific; "" => backend default
|
||||
"edit_budget": 4, # textual learning rate (max edits/night)
|
||||
"gate_metric": "mixed", # hard | soft | mixed (mixed best for tiny holdouts)
|
||||
"gate_mixed_weight": 0.5,
|
||||
"replay_mode": "mock", # "mock" (sandboxed prompt) | "fresh" (worktree)
|
||||
"evolve_memory": True, # consolidate CLAUDE.md
|
||||
"evolve_skill": True, # consolidate the managed SKILL.md
|
||||
# ── adoption / safety ──────────────────────────────────────────────────
|
||||
"auto_adopt": False, # default: stage + require explicit `adopt`
|
||||
"managed_skill_name": "skillopt-sleep-learned",
|
||||
"redact_secrets": True,
|
||||
"seed": 42,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class SleepConfig:
|
||||
data: Dict[str, Any] = field(default_factory=lambda: dict(DEFAULTS))
|
||||
|
||||
# convenient attribute access -------------------------------------------
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
# only called when normal attribute lookup fails
|
||||
data = object.__getattribute__(self, "data")
|
||||
if name in data:
|
||||
return data[name]
|
||||
raise AttributeError(name)
|
||||
|
||||
def get(self, key: str, default: Any = None) -> Any:
|
||||
return self.data.get(key, default)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return dict(self.data)
|
||||
|
||||
# paths ------------------------------------------------------------------
|
||||
@property
|
||||
def state_dir(self) -> str:
|
||||
# Allow full isolation: if the caller overrides state_dir explicitly,
|
||||
# honor it; else derive from claude_home's parent so a single
|
||||
# --claude-home flag isolates transcripts AND state together; else the
|
||||
# default ~/.skillopt-sleep.
|
||||
explicit = self.data.get("state_dir")
|
||||
if explicit:
|
||||
return explicit
|
||||
ch = self.data.get("claude_home", CLAUDE_HOME)
|
||||
if os.path.abspath(ch) != os.path.abspath(CLAUDE_HOME):
|
||||
return os.path.join(os.path.dirname(os.path.abspath(ch)), ".skillopt-sleep")
|
||||
return HOME_STATE_DIR
|
||||
|
||||
@property
|
||||
def state_path(self) -> str:
|
||||
return os.path.join(self.state_dir, "state.json")
|
||||
|
||||
@property
|
||||
def transcripts_dir(self) -> str:
|
||||
return os.path.join(self.data["claude_home"], "projects")
|
||||
|
||||
@property
|
||||
def history_path(self) -> str:
|
||||
return os.path.join(self.data["claude_home"], "history.jsonl")
|
||||
|
||||
@property
|
||||
def skills_dir(self) -> str:
|
||||
return os.path.join(self.data["claude_home"], "skills")
|
||||
|
||||
def managed_skill_path(self) -> str:
|
||||
return os.path.join(
|
||||
self.skills_dir, self.data["managed_skill_name"], "SKILL.md"
|
||||
)
|
||||
|
||||
|
||||
def _user_config_path() -> Optional[str]:
|
||||
for name in ("config.json", "config.yaml", "config.yml"):
|
||||
p = os.path.join(HOME_STATE_DIR, name)
|
||||
if os.path.exists(p):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _load_file(path: str) -> Dict[str, Any]:
|
||||
if path.endswith((".yaml", ".yml")):
|
||||
try:
|
||||
import yaml # optional
|
||||
with open(path) as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
except Exception:
|
||||
return {}
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def load_config(**overrides: Any) -> SleepConfig:
|
||||
data = dict(DEFAULTS)
|
||||
path = _user_config_path()
|
||||
if path:
|
||||
try:
|
||||
data.update(_load_file(path) or {})
|
||||
except Exception:
|
||||
pass
|
||||
data.update({k: v for k, v in overrides.items() if v is not None})
|
||||
if data.get("projects") == "invoked" and not data.get("invoked_project"):
|
||||
data["invoked_project"] = os.getcwd()
|
||||
return SleepConfig(data=data)
|
||||
176
skillopt/sleep/consolidate.py
Normal file
176
skillopt/sleep/consolidate.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""SkillOpt-Sleep — Stage 4: consolidate (one SkillOpt epoch).
|
||||
|
||||
This is the core that makes nightly evolution *safe*: it proposes bounded
|
||||
edits from replayed failures, applies them to a candidate skill/memory, then
|
||||
**gates** the candidate on a held-out slice of the user's own tasks. Only a
|
||||
candidate that strictly improves the held-out score is accepted — exactly the
|
||||
SkillOpt validation gate, reused verbatim from ``skillopt.evaluation.gate``.
|
||||
|
||||
Reused from the main SkillOpt package (import-light, no `openai` needed):
|
||||
* skillopt.evaluation.gate.evaluate_gate / select_gate_score
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from skillopt.sleep.backend import Backend
|
||||
from skillopt.sleep.memory import apply_edits
|
||||
from skillopt.sleep.replay import aggregate_scores, replay_batch
|
||||
from skillopt.sleep.types import EditRecord, ReplayResult, TaskRecord
|
||||
|
||||
|
||||
# Reuse the real SkillOpt gate. This module imports cleanly without `openai`.
|
||||
try:
|
||||
from skillopt.evaluation.gate import evaluate_gate, select_gate_score
|
||||
_HAVE_REPO_GATE = True
|
||||
except Exception: # pragma: no cover - fallback keeps engine standalone
|
||||
_HAVE_REPO_GATE = False
|
||||
|
||||
def select_gate_score(hard, soft, metric="hard", mixed_weight=0.5): # type: ignore
|
||||
if metric == "hard":
|
||||
return float(hard)
|
||||
if metric == "soft":
|
||||
return float(soft)
|
||||
w = max(0.0, min(1.0, float(mixed_weight)))
|
||||
return (1 - w) * float(hard) + w * float(soft)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConsolidationResult:
|
||||
accepted: bool
|
||||
gate_action: str
|
||||
baseline_score: float
|
||||
candidate_score: float
|
||||
new_skill: str
|
||||
new_memory: str
|
||||
applied_edits: List[EditRecord]
|
||||
rejected_edits: List[EditRecord]
|
||||
holdout_baseline: float
|
||||
holdout_candidate: float
|
||||
|
||||
|
||||
def _split(tasks: List[TaskRecord]) -> Tuple[List[TaskRecord], List[TaskRecord]]:
|
||||
replay = [t for t in tasks if t.split == "replay"]
|
||||
holdout = [t for t in tasks if t.split == "holdout"]
|
||||
# be robust if a split is empty
|
||||
if not replay:
|
||||
replay = tasks
|
||||
if not holdout:
|
||||
holdout = tasks
|
||||
return replay, holdout
|
||||
|
||||
|
||||
def consolidate(
|
||||
backend: Backend,
|
||||
tasks: List[TaskRecord],
|
||||
skill: str,
|
||||
memory: str,
|
||||
*,
|
||||
edit_budget: int = 4,
|
||||
gate_metric: str = "mixed",
|
||||
gate_mixed_weight: float = 0.5,
|
||||
evolve_skill: bool = True,
|
||||
evolve_memory: bool = True,
|
||||
night: int = 1,
|
||||
) -> ConsolidationResult:
|
||||
"""Run one consolidation epoch: reflect -> bounded edit -> gate.
|
||||
|
||||
Skill and memory are evolved in sequence (skill first if both enabled),
|
||||
each behind the same held-out gate, so each document only changes when it
|
||||
demonstrably helps on the user's held-out tasks.
|
||||
"""
|
||||
replay_tasks, holdout_tasks = _split(tasks)
|
||||
|
||||
# ── baseline on held-out slice (the gate reference) ──────────────────
|
||||
base_pairs = replay_batch(backend, holdout_tasks, skill, memory)
|
||||
base_hard, base_soft = aggregate_scores(base_pairs)
|
||||
base_score = select_gate_score(base_hard, base_soft, gate_metric, gate_mixed_weight)
|
||||
|
||||
# ── reflect over replay-split failures/successes ─────────────────────
|
||||
train_pairs = replay_batch(backend, replay_tasks, skill, memory)
|
||||
failures = [(t, r) for (t, r) in train_pairs if r.hard < 1.0]
|
||||
successes = [(t, r) for (t, r) in train_pairs if r.hard >= 1.0]
|
||||
|
||||
cand_skill, cand_memory = skill, memory
|
||||
all_applied: List[EditRecord] = []
|
||||
all_rejected: List[EditRecord] = []
|
||||
|
||||
def _gate_apply(doc: str, edits: List[EditRecord], which: str) -> str:
|
||||
nonlocal cand_skill, cand_memory, base_score, all_applied, all_rejected
|
||||
if not edits:
|
||||
return doc
|
||||
new_doc, applied = apply_edits(doc, edits)
|
||||
if not applied:
|
||||
return doc
|
||||
# evaluate candidate on the held-out slice
|
||||
trial_skill = new_doc if which == "skill" else cand_skill
|
||||
trial_memory = new_doc if which == "memory" else cand_memory
|
||||
pairs = replay_batch(backend, holdout_tasks, trial_skill, trial_memory)
|
||||
h, s = aggregate_scores(pairs)
|
||||
cand_score = select_gate_score(h, s, gate_metric, gate_mixed_weight)
|
||||
if cand_score > base_score:
|
||||
base_score = cand_score
|
||||
all_applied.extend(applied)
|
||||
return new_doc
|
||||
all_rejected.extend(applied)
|
||||
return doc
|
||||
|
||||
if evolve_skill:
|
||||
edits = backend.reflect(
|
||||
failures, successes, cand_skill, cand_memory,
|
||||
edit_budget=edit_budget, evolve_skill=True, evolve_memory=False,
|
||||
)
|
||||
cand_skill = _gate_apply(cand_skill, edits, "skill")
|
||||
|
||||
if evolve_memory:
|
||||
# re-evaluate failures under the (possibly improved) skill
|
||||
train_pairs2 = replay_batch(backend, replay_tasks, cand_skill, cand_memory)
|
||||
failures2 = [(t, r) for (t, r) in train_pairs2 if r.hard < 1.0]
|
||||
successes2 = [(t, r) for (t, r) in train_pairs2 if r.hard >= 1.0]
|
||||
edits_m = backend.reflect(
|
||||
failures2, successes2, cand_skill, cand_memory,
|
||||
edit_budget=edit_budget, evolve_skill=False, evolve_memory=True,
|
||||
)
|
||||
cand_memory = _gate_apply(cand_memory, edits_m, "memory")
|
||||
|
||||
# ── final gate decision (use the repo gate for the canonical action) ──
|
||||
final_pairs = replay_batch(backend, holdout_tasks, cand_skill, cand_memory)
|
||||
final_hard, final_soft = aggregate_scores(final_pairs)
|
||||
final_score = select_gate_score(final_hard, final_soft, gate_metric, gate_mixed_weight)
|
||||
|
||||
if _HAVE_REPO_GATE:
|
||||
gate = evaluate_gate(
|
||||
candidate_skill=cand_skill,
|
||||
cand_hard=final_hard,
|
||||
current_skill=skill,
|
||||
current_score=select_gate_score(base_hard, base_soft, gate_metric, gate_mixed_weight),
|
||||
best_skill=skill,
|
||||
best_score=select_gate_score(base_hard, base_soft, gate_metric, gate_mixed_weight),
|
||||
best_step=night - 1,
|
||||
global_step=night,
|
||||
cand_soft=final_soft,
|
||||
metric=gate_metric,
|
||||
mixed_weight=gate_mixed_weight,
|
||||
)
|
||||
action = gate.action
|
||||
else:
|
||||
action = "accept" if final_score > base_soft else "reject"
|
||||
|
||||
accepted = bool(all_applied) and final_score > select_gate_score(
|
||||
base_hard, base_soft, gate_metric, gate_mixed_weight
|
||||
)
|
||||
|
||||
return ConsolidationResult(
|
||||
accepted=accepted,
|
||||
gate_action=action,
|
||||
baseline_score=select_gate_score(base_hard, base_soft, gate_metric, gate_mixed_weight),
|
||||
candidate_score=final_score,
|
||||
new_skill=cand_skill if accepted else skill,
|
||||
new_memory=cand_memory if accepted else memory,
|
||||
applied_edits=all_applied,
|
||||
rejected_edits=all_rejected,
|
||||
holdout_baseline=base_hard,
|
||||
holdout_candidate=final_hard,
|
||||
)
|
||||
210
skillopt/sleep/cycle.py
Normal file
210
skillopt/sleep/cycle.py
Normal file
@@ -0,0 +1,210 @@
|
||||
"""SkillOpt-Sleep — the nightly cycle orchestrator.
|
||||
|
||||
run_sleep_cycle() wires the stages:
|
||||
harvest -> mine -> replay -> consolidate(gate) -> stage (-> optional adopt)
|
||||
|
||||
It is pure-Python and import-light; with backend="mock" it runs with no API
|
||||
key and no third-party deps, which is what the deterministic experiment and
|
||||
CI use. With backend="anthropic" it spends the user's budget for real lift.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from skillopt.sleep.backend import get_backend
|
||||
from skillopt.sleep.config import SleepConfig, load_config
|
||||
from skillopt.sleep.consolidate import consolidate
|
||||
from skillopt.sleep.harvest import harvest
|
||||
from skillopt.sleep.memory import ensure_skill_scaffold
|
||||
from skillopt.sleep.mine import mine
|
||||
from skillopt.sleep.state import SleepState, _now_iso
|
||||
from skillopt.sleep.staging import write_staging, adopt as adopt_staging
|
||||
from skillopt.sleep.types import SessionDigest, SleepReport, TaskRecord
|
||||
|
||||
|
||||
@dataclass
|
||||
class CycleOutcome:
|
||||
report: SleepReport
|
||||
staging_dir: str
|
||||
adopted: bool
|
||||
adopted_paths: List[str]
|
||||
|
||||
|
||||
def _project_paths(cfg: SleepConfig) -> str:
|
||||
"""Where live CLAUDE.md lives + which project we are evolving."""
|
||||
if cfg.get("projects") == "invoked" and cfg.get("invoked_project"):
|
||||
return cfg.get("invoked_project")
|
||||
# default: the invoked cwd
|
||||
return cfg.get("invoked_project") or os.getcwd()
|
||||
|
||||
|
||||
def _read(path: str) -> str:
|
||||
try:
|
||||
with open(path, encoding="utf-8") as f:
|
||||
return f.read()
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _render_report_md(report: SleepReport, cfg: SleepConfig) -> str:
|
||||
lines = [
|
||||
f"# SkillOpt-Sleep — night {report.night} report",
|
||||
"",
|
||||
f"- project: `{report.project}`",
|
||||
f"- backend: `{cfg.get('backend')}` replay: `{cfg.get('replay_mode')}`",
|
||||
f"- sessions harvested: {report.n_sessions}",
|
||||
f"- tasks mined: {report.n_tasks} (replayed: {report.n_replayed})",
|
||||
f"- held-out score: {report.baseline_score:.3f} -> {report.candidate_score:.3f}",
|
||||
f"- gate: **{report.gate_action}** (accepted={report.accepted})",
|
||||
f"- tokens used: {report.tokens_used}",
|
||||
"",
|
||||
]
|
||||
if report.edits:
|
||||
lines.append("## Accepted edits")
|
||||
for e in report.edits:
|
||||
lines.append(f"- [{e.target}/{e.op}] {e.content} \n _why: {e.rationale}_")
|
||||
lines.append("")
|
||||
if report.rejected_edits:
|
||||
lines.append("## Rejected by gate (kept as negative feedback)")
|
||||
for e in report.rejected_edits:
|
||||
lines.append(f"- [{e.target}/{e.op}] {e.content}")
|
||||
lines.append("")
|
||||
if report.notes:
|
||||
lines.append("## Notes")
|
||||
for n in report.notes:
|
||||
lines.append(f"- {n}")
|
||||
lines.append("")
|
||||
lines.append("_Review, then run `/sleep adopt` to apply, or discard this folder._")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def run_sleep_cycle(
|
||||
cfg: Optional[SleepConfig] = None,
|
||||
*,
|
||||
seed_tasks: Optional[List[TaskRecord]] = None,
|
||||
dry_run: bool = False,
|
||||
clock: Optional[float] = None,
|
||||
) -> CycleOutcome:
|
||||
"""Run one full sleep cycle and return the outcome.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cfg : SleepConfig
|
||||
seed_tasks : optional pre-built TaskRecords (used by the experiment to
|
||||
inject a known persona instead of harvesting ~/.claude).
|
||||
dry_run : harvest+mine+replay but DO NOT stage/adopt (report only).
|
||||
clock : fixed epoch seconds for deterministic timestamps in tests.
|
||||
"""
|
||||
cfg = cfg or load_config()
|
||||
state = SleepState.load(cfg.state_path)
|
||||
night = state.begin_night(clock)
|
||||
project = _project_paths(cfg)
|
||||
started = _now_iso(clock)
|
||||
|
||||
backend = get_backend(
|
||||
cfg.get("backend", "mock"),
|
||||
model=cfg.get("model", ""),
|
||||
)
|
||||
|
||||
# ── 1+2. harvest + mine (unless seed_tasks injected) ─────────────────
|
||||
digests: List[SessionDigest] = []
|
||||
if seed_tasks is not None:
|
||||
tasks = seed_tasks
|
||||
n_sessions = 0
|
||||
else:
|
||||
since = state.last_harvest_for(project)
|
||||
digests = harvest(
|
||||
cfg.transcripts_dir,
|
||||
scope=cfg.get("projects", "invoked"),
|
||||
invoked_project=cfg.get("invoked_project", ""),
|
||||
since_iso=since,
|
||||
limit=cfg.get("max_tasks_per_night", 40) * 3,
|
||||
)
|
||||
n_sessions = len(digests)
|
||||
tasks = mine(
|
||||
digests,
|
||||
max_tasks=cfg.get("max_tasks_per_night", 40),
|
||||
holdout_fraction=cfg.get("holdout_fraction", 0.34),
|
||||
seed=cfg.get("seed", 42),
|
||||
)
|
||||
|
||||
# ── live skill/memory docs ───────────────────────────────────────────
|
||||
live_memory_path = os.path.join(project, "CLAUDE.md")
|
||||
live_skill_path = cfg.managed_skill_path()
|
||||
skill = _read(live_skill_path)
|
||||
memory = _read(live_memory_path)
|
||||
if not skill:
|
||||
skill = ensure_skill_scaffold(
|
||||
"", name=cfg.get("managed_skill_name", "skillopt-sleep-learned"),
|
||||
description="Preferences and procedures learned from past Claude Code sessions.",
|
||||
)
|
||||
|
||||
report = SleepReport(
|
||||
night=night, project=project, started_at=started,
|
||||
n_sessions=n_sessions, n_tasks=len(tasks),
|
||||
)
|
||||
|
||||
if not tasks:
|
||||
report.ended_at = _now_iso(clock)
|
||||
report.notes.append("no tasks mined — nothing to consolidate")
|
||||
state.set_last_harvest(project, started)
|
||||
state.record_night({"night": night, "accepted": False, "n_tasks": 0})
|
||||
if not dry_run:
|
||||
state.save()
|
||||
staging_dir = ""
|
||||
return CycleOutcome(report, staging_dir, False, [])
|
||||
|
||||
# ── 3+4. replay + consolidate (gate) ─────────────────────────────────
|
||||
result = consolidate(
|
||||
backend, tasks, skill, memory,
|
||||
edit_budget=cfg.get("edit_budget", 4),
|
||||
gate_metric=cfg.get("gate_metric", "mixed"),
|
||||
gate_mixed_weight=cfg.get("gate_mixed_weight", 0.5),
|
||||
evolve_skill=cfg.get("evolve_skill", True),
|
||||
evolve_memory=cfg.get("evolve_memory", True),
|
||||
night=night,
|
||||
)
|
||||
|
||||
report.n_replayed = len(tasks)
|
||||
report.baseline_score = result.baseline_score
|
||||
report.candidate_score = result.candidate_score
|
||||
report.accepted = result.accepted
|
||||
report.gate_action = result.gate_action
|
||||
report.edits = result.applied_edits
|
||||
report.rejected_edits = result.rejected_edits
|
||||
report.tokens_used = backend.tokens_used()
|
||||
report.ended_at = _now_iso(clock)
|
||||
|
||||
# ── 5. stage (unless dry-run) ────────────────────────────────────────
|
||||
staging_dir = ""
|
||||
adopted = False
|
||||
adopted_paths: List[str] = []
|
||||
if not dry_run:
|
||||
report_md = _render_report_md(report, cfg)
|
||||
proposed_skill = result.new_skill if (cfg.get("evolve_skill") and result.accepted) else None
|
||||
proposed_memory = result.new_memory if (cfg.get("evolve_memory") and result.accepted) else None
|
||||
staging_dir = write_staging(
|
||||
project,
|
||||
report=report,
|
||||
proposed_skill=proposed_skill,
|
||||
proposed_memory=proposed_memory,
|
||||
live_skill_path=live_skill_path,
|
||||
live_memory_path=live_memory_path,
|
||||
report_md=report_md,
|
||||
)
|
||||
state.set_last_harvest(project, started)
|
||||
state.record_night({
|
||||
"night": night, "accepted": result.accepted,
|
||||
"baseline": result.baseline_score, "candidate": result.candidate_score,
|
||||
"n_tasks": len(tasks), "staging": staging_dir,
|
||||
})
|
||||
# ── 6. adopt (opt-in) ────────────────────────────────────────────
|
||||
if cfg.get("auto_adopt") and result.accepted:
|
||||
adopted_paths = adopt_staging(staging_dir)
|
||||
adopted = bool(adopted_paths)
|
||||
state.save()
|
||||
|
||||
return CycleOutcome(report, staging_dir, adopted, adopted_paths)
|
||||
1
skillopt/sleep/experiments/__init__.py
Normal file
1
skillopt/sleep/experiments/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""SkillOpt-Sleep experiments."""
|
||||
86
skillopt/sleep/experiments/personas.py
Normal file
86
skillopt/sleep/experiments/personas.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""SkillOpt-Sleep — persona task fixtures for the validation experiment.
|
||||
|
||||
Each persona is a list of TaskRecords with EXACT checkable references and a
|
||||
`rule:<key>` tag naming the single skill rule that makes the task solvable
|
||||
(consumed by MockBackend). This lets the experiment prove — deterministically,
|
||||
with no API — that nightly consolidation lifts a held-out score and that the
|
||||
gate blocks regressions.
|
||||
|
||||
Personas mirror the user's framing: programmer / researcher / analyst.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
from skillopt.sleep.types import TaskRecord
|
||||
|
||||
|
||||
def _t(i, intent, ref, rule, project="/personas/demo", outcome="fail") -> TaskRecord:
|
||||
return TaskRecord(
|
||||
id=f"persona_{rule}_{i}",
|
||||
project=project,
|
||||
intent=intent,
|
||||
context_excerpt="",
|
||||
attempted_solution="",
|
||||
outcome=outcome,
|
||||
reference_kind="exact",
|
||||
reference=ref,
|
||||
tags=[f"rule:{rule}"],
|
||||
source_sessions=[f"sess_{i}"],
|
||||
)
|
||||
|
||||
|
||||
def researcher_persona() -> List[TaskRecord]:
|
||||
"""Researcher who always wants arXiv ids wrapped in <answer> tags."""
|
||||
items = [
|
||||
("Give me the arXiv id for the SkillOpt paper", "arXiv:2605.23904"),
|
||||
("What's the arXiv id of the Attention paper?", "arXiv:1706.03762"),
|
||||
("arXiv id for the GAN paper?", "arXiv:1406.2661"),
|
||||
("arXiv id for BERT?", "arXiv:1810.04805"),
|
||||
("arXiv id for the ResNet paper?", "arXiv:1512.03385"),
|
||||
("arXiv id for the Adam optimizer paper?", "arXiv:1412.6980"),
|
||||
("arXiv id for Dropout?", "arXiv:1207.0580"),
|
||||
("arXiv id for the Transformer-XL paper?", "arXiv:1901.02860"),
|
||||
("arXiv id for word2vec?", "arXiv:1301.3781"),
|
||||
("arXiv id for the VAE paper?", "arXiv:1312.6114"),
|
||||
("arXiv id for batch norm?", "arXiv:1502.03167"),
|
||||
("arXiv id for GPT-3?", "arXiv:2005.14165"),
|
||||
]
|
||||
# Both rules required: format the id (arxiv-id) AND wrap in answer tags.
|
||||
out: List[TaskRecord] = []
|
||||
for i, (q, a) in enumerate(items):
|
||||
t = _t(i, q, a, "wrap-answer")
|
||||
t.tags = ["rule:wrap-answer", "rule:arxiv-id"]
|
||||
out.append(t)
|
||||
return out
|
||||
|
||||
|
||||
def programmer_persona() -> List[TaskRecord]:
|
||||
"""Programmer who wants imperative-mood commit subjects."""
|
||||
items = [
|
||||
("commit message for adding a login form", "Add login form"),
|
||||
("commit message for fixing the null pointer bug", "Fix null pointer in parser"),
|
||||
("commit message for updating the README", "Update README"),
|
||||
("commit message for removing dead code", "Remove dead code"),
|
||||
("commit message for bumping the version", "Bump version to 1.2.0"),
|
||||
("commit message for refactoring the auth module", "Refactor auth module"),
|
||||
("commit message for adding tests", "Add unit tests for scheduler"),
|
||||
("commit message for fixing the CI pipeline", "Fix CI pipeline"),
|
||||
]
|
||||
return [_t(i, q, a, "commit-imperative") for i, (q, a) in enumerate(items)]
|
||||
|
||||
|
||||
def harmful_edit_task() -> TaskRecord:
|
||||
"""A task whose 'fix' is a known-bad rule; used to prove the gate rejects
|
||||
regressions. The MockBackend proposes the harmful rule on this failure,
|
||||
but applying it does NOT raise the held-out score, so the gate must reject.
|
||||
"""
|
||||
t = _t(99, "answer this freely", "THIS_WILL_NOT_MATCH", "__harmful__")
|
||||
t.reference = "an-answer-that-the-harmful-rule-cannot-produce"
|
||||
return t
|
||||
|
||||
|
||||
PERSONAS = {
|
||||
"researcher": researcher_persona,
|
||||
"programmer": programmer_persona,
|
||||
}
|
||||
157
skillopt/sleep/experiments/run_experiment.py
Normal file
157
skillopt/sleep/experiments/run_experiment.py
Normal file
@@ -0,0 +1,157 @@
|
||||
"""SkillOpt-Sleep — validation experiment.
|
||||
|
||||
Answers the question the user posed: *does nightly offline self-evolution
|
||||
actually improve the agent?* Runs deterministically with the MockBackend
|
||||
(no API key, reproducible) and is the acceptance test for the whole idea.
|
||||
|
||||
What it proves:
|
||||
1. MONOTONIC LIFT — over N sleep nights, the held-out score rises from a
|
||||
baseline (empty skill/memory) toward 1.0 as the gate accepts the
|
||||
general rules the persona's tasks require.
|
||||
2. GATE SAFETY — an injected harmful edit is REJECTED (held-out score does
|
||||
not improve), so a bad nightly proposal can never be adopted.
|
||||
3. PLUMBING — harvest->mine->replay->consolidate->stage->adopt all run and
|
||||
the adopted artifact, re-scored, retains the lift.
|
||||
|
||||
Run:
|
||||
python -m skillopt.sleep.experiments.run_experiment
|
||||
python -m skillopt.sleep.experiments.run_experiment --persona programmer --nights 3
|
||||
python -m skillopt.sleep.experiments.run_experiment --backend anthropic # real lift
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import List
|
||||
|
||||
from skillopt.sleep.backend import get_backend
|
||||
from skillopt.sleep.consolidate import consolidate
|
||||
from skillopt.sleep.experiments.personas import (
|
||||
PERSONAS,
|
||||
harmful_edit_task,
|
||||
researcher_persona,
|
||||
)
|
||||
from skillopt.sleep.memory import ensure_skill_scaffold
|
||||
from skillopt.sleep.replay import aggregate_scores, replay_batch
|
||||
from skillopt.sleep.types import TaskRecord
|
||||
|
||||
|
||||
def _score_holdout(backend, tasks: List[TaskRecord], skill: str, memory: str,
|
||||
metric: str = "mixed", w: float = 0.5) -> float:
|
||||
from skillopt.sleep.consolidate import select_gate_score
|
||||
holdout = [t for t in tasks if t.split == "holdout"] or tasks
|
||||
pairs = replay_batch(backend, holdout, skill, memory)
|
||||
h, s = aggregate_scores(pairs)
|
||||
return select_gate_score(h, s, metric, w)
|
||||
|
||||
|
||||
def run(persona: str = "researcher", nights: int = 4, backend_name: str = "mock",
|
||||
edit_budget: int = 4, seed: int = 42) -> dict:
|
||||
from skillopt.sleep.mine import assign_splits
|
||||
|
||||
make = PERSONAS.get(persona, researcher_persona)
|
||||
tasks = assign_splits(make(), holdout_fraction=0.34, seed=seed)
|
||||
backend = get_backend(backend_name)
|
||||
|
||||
# start from an empty managed skill + empty memory
|
||||
skill = ensure_skill_scaffold("", name="skillopt-sleep-learned",
|
||||
description="Learned preferences.")
|
||||
memory = ""
|
||||
|
||||
baseline = _score_holdout(backend, tasks, skill, memory)
|
||||
trace = [{"night": 0, "holdout_score": round(baseline, 4), "action": "baseline",
|
||||
"n_edits": 0}]
|
||||
|
||||
for night in range(1, nights + 1):
|
||||
res = consolidate(
|
||||
backend, tasks, skill, memory,
|
||||
edit_budget=edit_budget, gate_metric="mixed", gate_mixed_weight=0.5,
|
||||
evolve_skill=True, evolve_memory=True, night=night,
|
||||
)
|
||||
if res.accepted:
|
||||
skill, memory = res.new_skill, res.new_memory
|
||||
trace.append({
|
||||
"night": night,
|
||||
"holdout_score": round(res.candidate_score, 4),
|
||||
"action": res.gate_action,
|
||||
"accepted": res.accepted,
|
||||
"n_edits": len(res.applied_edits),
|
||||
"edits": [e.content for e in res.applied_edits],
|
||||
"n_rejected": len(res.rejected_edits),
|
||||
})
|
||||
# converged: stop early if perfect
|
||||
if res.candidate_score >= 0.999:
|
||||
break
|
||||
|
||||
after = _score_holdout(backend, tasks, skill, memory)
|
||||
|
||||
# ── gate-safety probe: inject a harmful task whose 'fix' is a bad rule ──
|
||||
harmful_tasks = assign_splits([harmful_edit_task()] + make()[:3],
|
||||
holdout_fraction=0.5, seed=seed)
|
||||
h_before = _score_holdout(backend, harmful_tasks, skill, memory)
|
||||
res_h = consolidate(backend, harmful_tasks, skill, memory,
|
||||
edit_budget=edit_budget, gate_metric="mixed",
|
||||
evolve_skill=True, evolve_memory=False, night=nights + 1)
|
||||
harmful_rule_text = get_backend("mock").RULE_TEXT["__harmful__"] # type: ignore[attr-defined]
|
||||
harmful_rejected = (harmful_rule_text not in res_h.new_skill)
|
||||
|
||||
result = {
|
||||
"persona": persona,
|
||||
"backend": backend_name,
|
||||
"nights_run": len(trace) - 1,
|
||||
"baseline_holdout": round(baseline, 4),
|
||||
"after_holdout": round(after, 4),
|
||||
"lift": round(after - baseline, 4),
|
||||
"improved": after > baseline,
|
||||
"gate_blocks_harmful": bool(harmful_rejected),
|
||||
"final_skill_excerpt": skill[-400:],
|
||||
"trace": trace,
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
def _assert(cond: bool, msg: str) -> None:
|
||||
if not cond:
|
||||
print(f"FAIL: {msg}")
|
||||
raise SystemExit(1)
|
||||
|
||||
|
||||
def main(argv=None) -> int:
|
||||
ap = argparse.ArgumentParser(description="SkillOpt-Sleep validation experiment")
|
||||
ap.add_argument("--persona", default="researcher", choices=list(PERSONAS.keys()))
|
||||
ap.add_argument("--nights", type=int, default=4)
|
||||
ap.add_argument("--backend", default="mock", choices=["mock", "anthropic"])
|
||||
ap.add_argument("--edit-budget", type=int, default=4)
|
||||
ap.add_argument("--json", action="store_true")
|
||||
ap.add_argument("--assert-improves", action="store_true",
|
||||
help="exit nonzero unless lift>0 and gate blocks harmful edit")
|
||||
args = ap.parse_args(argv)
|
||||
|
||||
res = run(args.persona, nights=args.nights, backend_name=args.backend,
|
||||
edit_budget=args.edit_budget)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(res, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
print(f"=== SkillOpt-Sleep experiment: persona={res['persona']} backend={res['backend']} ===")
|
||||
print(f"baseline held-out : {res['baseline_holdout']}")
|
||||
print(f"after held-out : {res['after_holdout']} (lift {res['lift']:+.4f})")
|
||||
print(f"gate blocks harmful edit: {res['gate_blocks_harmful']}")
|
||||
print("trace:")
|
||||
for row in res["trace"]:
|
||||
edits = "; ".join(row.get("edits", []))[:80]
|
||||
print(f" night {row['night']}: holdout={row['holdout_score']} "
|
||||
f"{row['action']} (+{row['n_edits']} edits) {edits}")
|
||||
|
||||
if args.assert_improves:
|
||||
_assert(res["improved"], "held-out score did not improve")
|
||||
_assert(res["gate_blocks_harmful"], "gate failed to block harmful edit")
|
||||
print("\nPASS: nightly consolidation improves held-out score AND gate blocks regressions.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
236
skillopt/sleep/harvest.py
Normal file
236
skillopt/sleep/harvest.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""SkillOpt-Sleep — Stage 1: harvest.
|
||||
|
||||
Read the user's local Claude Code records (read-only) and normalize them
|
||||
into :class:`SessionDigest` objects.
|
||||
|
||||
Sources (verified schema):
|
||||
* ~/.claude/history.jsonl — one JSON/line:
|
||||
{"display": <prompt text>, "pastedContents": {...},
|
||||
"timestamp": <epoch ms>, "project": <abs path>}
|
||||
* ~/.claude/projects/<slug>/<sessionId>.jsonl — one record/line; the
|
||||
records we care about have type "user"/"assistant" and carry:
|
||||
message{role, content}, cwd, gitBranch, timestamp, sessionId, version
|
||||
|
||||
This module performs NO writes and NO network calls.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Dict, Iterable, List, Optional
|
||||
|
||||
from skillopt.sleep.types import SessionDigest
|
||||
|
||||
|
||||
# Heuristic phrases that signal the user (dis)approving of prior output.
|
||||
_NEGATIVE_FEEDBACK = (
|
||||
"still broken", "still not", "still wrong", "doesn't work", "does not work",
|
||||
"not working", "that's wrong", "thats wrong", "incorrect", "wrong",
|
||||
"no,", "nope", "fix it", "didn't", "did not", "broken", "error again",
|
||||
"still failing", "still fails", "not fixed", "revert", "undo",
|
||||
"不对", "还是不对", "还是不行", "不行", "错了", "有问题", "没修好",
|
||||
)
|
||||
_POSITIVE_FEEDBACK = (
|
||||
"thanks", "thank you", "perfect", "great", "works now", "fixed",
|
||||
"that works", "lgtm", "looks good", "nice", "awesome", "correct",
|
||||
"完美", "可以了", "好的", "搞定", "对了", "正确", "谢谢",
|
||||
)
|
||||
|
||||
|
||||
def _iter_jsonl(path: str) -> Iterable[Dict[str, Any]]:
|
||||
try:
|
||||
with open(path, encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
yield json.loads(line)
|
||||
except Exception:
|
||||
continue
|
||||
except (FileNotFoundError, IsADirectoryError, PermissionError):
|
||||
return
|
||||
|
||||
|
||||
def _text_from_content(content: Any) -> str:
|
||||
"""Flatten a message.content (str or list of blocks) into text."""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
parts: List[str] = []
|
||||
for b in content:
|
||||
if isinstance(b, dict):
|
||||
if b.get("type") == "text" and b.get("text"):
|
||||
parts.append(str(b["text"]))
|
||||
return "\n".join(parts)
|
||||
return ""
|
||||
|
||||
|
||||
def _tool_names_from_content(content: Any) -> List[str]:
|
||||
names: List[str] = []
|
||||
if isinstance(content, list):
|
||||
for b in content:
|
||||
if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("name"):
|
||||
names.append(str(b["name"]))
|
||||
return names
|
||||
|
||||
|
||||
def _detect_feedback(text: str) -> List[str]:
|
||||
low = text.lower()
|
||||
sig: List[str] = []
|
||||
for ph in _NEGATIVE_FEEDBACK:
|
||||
if ph in low:
|
||||
sig.append("neg:" + ph)
|
||||
for ph in _POSITIVE_FEEDBACK:
|
||||
if ph in low:
|
||||
sig.append("pos:" + ph)
|
||||
return sig
|
||||
|
||||
|
||||
def _is_meta_prompt(text: str) -> bool:
|
||||
"""Skip slash-commands / system noise that aren't real user intents."""
|
||||
t = text.strip()
|
||||
if not t:
|
||||
return True
|
||||
if t.startswith("<") and t.endswith(">"):
|
||||
return True
|
||||
if t.startswith("/") and len(t.split()) <= 3:
|
||||
return True
|
||||
if t.startswith("[Pasted text") or t.startswith("Caveat:"):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def digest_transcript(path: str) -> Optional[SessionDigest]:
|
||||
"""Build a SessionDigest from one ``<sessionId>.jsonl`` transcript."""
|
||||
session_id = os.path.splitext(os.path.basename(path))[0]
|
||||
project = ""
|
||||
git_branch = ""
|
||||
started = ""
|
||||
ended = ""
|
||||
user_prompts: List[str] = []
|
||||
assistant_finals: List[str] = []
|
||||
tools: List[str] = []
|
||||
files: List[str] = []
|
||||
feedback: List[str] = []
|
||||
n_user = 0
|
||||
n_asst = 0
|
||||
|
||||
for rec in _iter_jsonl(path):
|
||||
rtype = rec.get("type")
|
||||
ts = rec.get("timestamp")
|
||||
if isinstance(ts, str) and ts:
|
||||
if not started:
|
||||
started = ts
|
||||
ended = ts
|
||||
if rec.get("cwd") and not project:
|
||||
project = str(rec.get("cwd"))
|
||||
if rec.get("gitBranch") and not git_branch:
|
||||
git_branch = str(rec.get("gitBranch"))
|
||||
if rtype == "file-history-snapshot":
|
||||
snap = rec.get("snapshot") or rec.get("files") or {}
|
||||
if isinstance(snap, dict):
|
||||
files.extend([str(k) for k in list(snap.keys())[:20]])
|
||||
msg = rec.get("message")
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
role = msg.get("role")
|
||||
content = msg.get("content")
|
||||
if role == "user":
|
||||
text = _text_from_content(content)
|
||||
if text and not _is_meta_prompt(text):
|
||||
n_user += 1
|
||||
user_prompts.append(text.strip())
|
||||
feedback.extend(_detect_feedback(text))
|
||||
elif role == "assistant":
|
||||
n_asst += 1
|
||||
tools.extend(_tool_names_from_content(content))
|
||||
text = _text_from_content(content)
|
||||
if text.strip():
|
||||
assistant_finals.append(text.strip())
|
||||
|
||||
if n_user == 0 and n_asst == 0:
|
||||
return None
|
||||
|
||||
# de-dup tools/files preserving order
|
||||
def _dedup(xs: List[str]) -> List[str]:
|
||||
seen = set()
|
||||
out = []
|
||||
for x in xs:
|
||||
if x not in seen:
|
||||
seen.add(x)
|
||||
out.append(x)
|
||||
return out
|
||||
|
||||
return SessionDigest(
|
||||
session_id=session_id,
|
||||
project=project,
|
||||
git_branch=git_branch,
|
||||
started_at=started,
|
||||
ended_at=ended,
|
||||
user_prompts=user_prompts,
|
||||
assistant_finals=assistant_finals[-5:], # last few finals are the useful ones
|
||||
tools_used=_dedup(tools),
|
||||
files_touched=_dedup(files),
|
||||
feedback_signals=feedback,
|
||||
n_user_turns=n_user,
|
||||
n_assistant_turns=n_asst,
|
||||
raw_path=path,
|
||||
)
|
||||
|
||||
|
||||
def _project_matches(project: str, scope: Any, invoked: str) -> bool:
|
||||
if scope == "all":
|
||||
return True
|
||||
if isinstance(scope, (list, tuple)):
|
||||
return any(os.path.abspath(project) == os.path.abspath(p) for p in scope)
|
||||
# "invoked": match the invoked project (or a subdir of it)
|
||||
if not invoked:
|
||||
return True
|
||||
a = os.path.abspath(project)
|
||||
b = os.path.abspath(invoked)
|
||||
return a == b or a.startswith(b + os.sep) or b.startswith(a + os.sep)
|
||||
|
||||
|
||||
def harvest(
|
||||
transcripts_dir: str,
|
||||
*,
|
||||
scope: Any = "all",
|
||||
invoked_project: str = "",
|
||||
since_iso: Optional[str] = None,
|
||||
limit: int = 0,
|
||||
) -> List[SessionDigest]:
|
||||
"""Walk ~/.claude/projects and return digests matching scope/time.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
transcripts_dir : str ~/.claude/projects
|
||||
scope : "all" | "invoked" | list[path]
|
||||
invoked_project : str used when scope == "invoked"
|
||||
since_iso : str|None ISO8601; only sessions ending after this are kept
|
||||
limit : int cap number of digests (0 = no cap)
|
||||
"""
|
||||
digests: List[SessionDigest] = []
|
||||
if not os.path.isdir(transcripts_dir):
|
||||
return digests
|
||||
|
||||
paths: List[str] = []
|
||||
for root, _dirs, files in os.walk(transcripts_dir):
|
||||
for fn in files:
|
||||
if fn.endswith(".jsonl"):
|
||||
paths.append(os.path.join(root, fn))
|
||||
# newest first by mtime
|
||||
paths.sort(key=lambda p: os.path.getmtime(p), reverse=True)
|
||||
|
||||
for p in paths:
|
||||
d = digest_transcript(p)
|
||||
if d is None:
|
||||
continue
|
||||
if not _project_matches(d.project or "", scope, invoked_project):
|
||||
continue
|
||||
if since_iso and d.ended_at and d.ended_at < since_iso:
|
||||
continue
|
||||
digests.append(d)
|
||||
if limit and len(digests) >= limit:
|
||||
break
|
||||
return digests
|
||||
130
skillopt/sleep/memory.py
Normal file
130
skillopt/sleep/memory.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""SkillOpt-Sleep — skill/memory document manipulation.
|
||||
|
||||
Applies bounded EditRecords to a skill (SKILL.md body) or memory (CLAUDE.md)
|
||||
document, and provides Dream-style consolidation helpers (dedup near-identical
|
||||
lines, drop contradictions). All edits live inside a protected, clearly-marked
|
||||
region so the sleep cycle never clobbers the user's hand-written content.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import List, Tuple
|
||||
|
||||
from skillopt.sleep.types import EditRecord
|
||||
|
||||
|
||||
LEARNED_START = "<!-- SKILLOPT-SLEEP:LEARNED START -->"
|
||||
LEARNED_END = "<!-- SKILLOPT-SLEEP:LEARNED END -->"
|
||||
_BANNER = (
|
||||
"_This block is maintained by SkillOpt-Sleep. Edits here are proposed "
|
||||
"offline, validated against your past tasks, and adopted only after you "
|
||||
"approve them. Hand-edits outside this block are never touched._"
|
||||
)
|
||||
|
||||
|
||||
def extract_learned(doc: str) -> str:
|
||||
s = doc.find(LEARNED_START)
|
||||
e = doc.find(LEARNED_END)
|
||||
if s == -1 or e == -1:
|
||||
return ""
|
||||
return doc[s + len(LEARNED_START):e].strip()
|
||||
|
||||
|
||||
def _strip_learned(doc: str) -> str:
|
||||
while True:
|
||||
s = doc.find(LEARNED_START)
|
||||
if s == -1:
|
||||
break
|
||||
e = doc.find(LEARNED_END, s)
|
||||
if e == -1:
|
||||
doc = doc[:s]
|
||||
break
|
||||
doc = doc[:s] + doc[e + len(LEARNED_END):]
|
||||
while "\n\n\n" in doc:
|
||||
doc = doc.replace("\n\n\n", "\n\n")
|
||||
return doc.rstrip()
|
||||
|
||||
|
||||
def set_learned(doc: str, learned_lines: List[str]) -> str:
|
||||
"""Replace the protected learned region with the given bullet lines."""
|
||||
base = _strip_learned(doc)
|
||||
body = "\n".join(f"- {ln.strip().lstrip('- ').strip()}" for ln in learned_lines if ln.strip())
|
||||
block = (
|
||||
f"\n\n{LEARNED_START}\n"
|
||||
f"## Learned preferences & procedures\n\n{_BANNER}\n\n{body}\n"
|
||||
f"{LEARNED_END}\n"
|
||||
)
|
||||
return (base + block).lstrip("\n")
|
||||
|
||||
|
||||
def current_learned_lines(doc: str) -> List[str]:
|
||||
inner = extract_learned(doc)
|
||||
lines: List[str] = []
|
||||
for ln in inner.splitlines():
|
||||
ln = ln.strip()
|
||||
if ln.startswith("- "):
|
||||
lines.append(ln[2:].strip())
|
||||
return lines
|
||||
|
||||
|
||||
def _norm(s: str) -> str:
|
||||
return re.sub(r"\s+", " ", (s or "").lower()).strip()
|
||||
|
||||
|
||||
def apply_edits(doc: str, edits: List[EditRecord]) -> Tuple[str, List[EditRecord]]:
|
||||
"""Apply add/delete/replace edits to the protected learned region.
|
||||
|
||||
Returns (new_doc, applied_edits). Dedups: an `add` whose content already
|
||||
exists (normalized) is skipped. `delete`/`replace` match on normalized
|
||||
anchor substring.
|
||||
"""
|
||||
lines = current_learned_lines(doc)
|
||||
norm_set = {_norm(l) for l in lines}
|
||||
applied: List[EditRecord] = []
|
||||
|
||||
for e in edits:
|
||||
op = (e.op or "add").lower()
|
||||
if op == "add":
|
||||
if _norm(e.content) in norm_set or not e.content.strip():
|
||||
continue
|
||||
lines.append(e.content.strip())
|
||||
norm_set.add(_norm(e.content))
|
||||
applied.append(e)
|
||||
elif op == "delete":
|
||||
anchor = _norm(e.anchor or e.content)
|
||||
keep = [l for l in lines if anchor not in _norm(l)]
|
||||
if len(keep) != len(lines):
|
||||
lines = keep
|
||||
norm_set = {_norm(l) for l in lines}
|
||||
applied.append(e)
|
||||
elif op == "replace":
|
||||
anchor = _norm(e.anchor)
|
||||
new_lines = []
|
||||
changed = False
|
||||
for l in lines:
|
||||
if anchor and anchor in _norm(l):
|
||||
new_lines.append(e.content.strip())
|
||||
changed = True
|
||||
else:
|
||||
new_lines.append(l)
|
||||
if changed:
|
||||
lines = new_lines
|
||||
norm_set = {_norm(l) for l in lines}
|
||||
applied.append(e)
|
||||
|
||||
return set_learned(doc, lines), applied
|
||||
|
||||
|
||||
def ensure_skill_scaffold(doc: str, *, name: str, description: str) -> str:
|
||||
"""Ensure a SKILL.md has YAML frontmatter so Claude Code loads it."""
|
||||
if doc.lstrip().startswith("---"):
|
||||
return doc
|
||||
fm = (
|
||||
"---\n"
|
||||
f"name: {name}\n"
|
||||
f"description: {description}\n"
|
||||
"---\n\n"
|
||||
f"# {name}\n\n"
|
||||
"Preferences and procedures learned from your past Claude Code sessions.\n"
|
||||
)
|
||||
return fm + doc
|
||||
168
skillopt/sleep/mine.py
Normal file
168
skillopt/sleep/mine.py
Normal file
@@ -0,0 +1,168 @@
|
||||
"""SkillOpt-Sleep — Stage 2: mine.
|
||||
|
||||
Turn :class:`SessionDigest` objects into :class:`TaskRecord` training units.
|
||||
|
||||
Two miners:
|
||||
* heuristic_mine — deterministic, no API. Detects retry chains (a prompt
|
||||
re-asked after negative feedback => the early attempt failed), extracts
|
||||
the user's recurring intents, and labels outcomes from feedback signals.
|
||||
* llm_mine — optional; uses an optimizer backend to produce richer
|
||||
TaskRecords with checkable references. Falls back to heuristic on error.
|
||||
|
||||
The heuristic miner is what makes the whole cycle runnable offline and is the
|
||||
basis of the deterministic experiment.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
from typing import Any, Callable, List, Optional
|
||||
|
||||
from skillopt.sleep.types import SessionDigest, TaskRecord
|
||||
|
||||
|
||||
def _tid(project: str, intent: str) -> str:
|
||||
h = hashlib.sha256((project + "::" + intent).encode("utf-8")).hexdigest()[:12]
|
||||
return "task_" + h
|
||||
|
||||
|
||||
def _short(text: str, n: int = 600) -> str:
|
||||
text = (text or "").strip()
|
||||
return text if len(text) <= n else text[:n] + " …"
|
||||
|
||||
|
||||
def _looks_negative(signals: List[str]) -> bool:
|
||||
return any(s.startswith("neg:") for s in signals)
|
||||
|
||||
|
||||
def _looks_positive(signals: List[str]) -> bool:
|
||||
return any(s.startswith("pos:") for s in signals)
|
||||
|
||||
|
||||
def heuristic_mine(
|
||||
digests: List[SessionDigest],
|
||||
*,
|
||||
max_tasks: int = 40,
|
||||
) -> List[TaskRecord]:
|
||||
"""Deterministic miner — no API calls.
|
||||
|
||||
Strategy:
|
||||
* Each session with >=1 real user prompt yields one TaskRecord whose
|
||||
intent is the FIRST substantive prompt (the original ask).
|
||||
* Outcome is inferred:
|
||||
- negative feedback present and no later positive -> "fail"
|
||||
- positive feedback present -> "success"
|
||||
- re-asks (multiple user turns) without resolution -> "mixed"
|
||||
- otherwise -> "unknown"
|
||||
* attempted_solution = the last assistant final (what was produced).
|
||||
* reference_kind defaults to "none"; the consolidation step will use a
|
||||
rubric judge for these. (Exact refs are added by the experiment data
|
||||
or by the LLM miner when it can derive a checkable answer.)
|
||||
"""
|
||||
tasks: List[TaskRecord] = []
|
||||
for d in digests:
|
||||
if not d.user_prompts:
|
||||
continue
|
||||
intent = d.user_prompts[0]
|
||||
if len(intent.strip()) < 8:
|
||||
continue
|
||||
if _looks_positive(d.feedback_signals) and not _looks_negative(d.feedback_signals):
|
||||
outcome = "success"
|
||||
elif _looks_negative(d.feedback_signals):
|
||||
outcome = "fail"
|
||||
elif d.n_user_turns >= 3:
|
||||
outcome = "mixed"
|
||||
else:
|
||||
outcome = "unknown"
|
||||
|
||||
attempted = d.assistant_finals[-1] if d.assistant_finals else ""
|
||||
context = ""
|
||||
if len(d.user_prompts) > 1:
|
||||
# later prompts often carry the corrective detail / real constraints
|
||||
context = "Follow-up constraints from the same session:\n- " + "\n- ".join(
|
||||
_short(p, 200) for p in d.user_prompts[1:4]
|
||||
)
|
||||
tags = []
|
||||
if d.tools_used:
|
||||
tags.append("tools:" + "+".join(d.tools_used[:4]))
|
||||
if d.git_branch:
|
||||
tags.append("branch:" + d.git_branch)
|
||||
|
||||
tasks.append(
|
||||
TaskRecord(
|
||||
id=_tid(d.project, intent),
|
||||
project=d.project,
|
||||
intent=_short(intent, 800),
|
||||
context_excerpt=_short(context, 600),
|
||||
attempted_solution=_short(attempted, 600),
|
||||
outcome=outcome,
|
||||
reference_kind="none",
|
||||
reference="",
|
||||
tags=tags,
|
||||
source_sessions=[d.session_id],
|
||||
)
|
||||
)
|
||||
if len(tasks) >= max_tasks:
|
||||
break
|
||||
return tasks
|
||||
|
||||
|
||||
def dedup_tasks(tasks: List[TaskRecord]) -> List[TaskRecord]:
|
||||
"""Merge tasks sharing an id (same project+intent across sessions)."""
|
||||
by_id: dict = {}
|
||||
for t in tasks:
|
||||
if t.id in by_id:
|
||||
ex = by_id[t.id]
|
||||
ex.source_sessions = list(dict.fromkeys(ex.source_sessions + t.source_sessions))
|
||||
# prefer a resolved outcome if either session resolved it
|
||||
order = {"success": 3, "fail": 2, "mixed": 1, "unknown": 0}
|
||||
if order.get(t.outcome, 0) > order.get(ex.outcome, 0):
|
||||
ex.outcome = t.outcome
|
||||
else:
|
||||
by_id[t.id] = t
|
||||
return list(by_id.values())
|
||||
|
||||
|
||||
def assign_splits(
|
||||
tasks: List[TaskRecord],
|
||||
*,
|
||||
holdout_fraction: float = 0.34,
|
||||
seed: int = 42,
|
||||
) -> List[TaskRecord]:
|
||||
"""Deterministically split tasks into replay (train) / holdout (test).
|
||||
|
||||
Uses a stable hash of the task id so the same task always lands in the
|
||||
same split across nights (a fixed held-out gate, like SkillOpt's D_sel).
|
||||
"""
|
||||
for t in tasks:
|
||||
bucket = int(hashlib.sha256((str(seed) + t.id).encode()).hexdigest(), 16) % 100
|
||||
t.split = "holdout" if bucket < int(holdout_fraction * 100) else "replay"
|
||||
# guarantee both splits non-empty when possible
|
||||
splits = {t.split for t in tasks}
|
||||
if len(tasks) >= 2 and "holdout" not in splits:
|
||||
tasks[-1].split = "holdout"
|
||||
if len(tasks) >= 2 and "replay" not in splits:
|
||||
tasks[0].split = "replay"
|
||||
return tasks
|
||||
|
||||
|
||||
def mine(
|
||||
digests: List[SessionDigest],
|
||||
*,
|
||||
max_tasks: int = 40,
|
||||
holdout_fraction: float = 0.34,
|
||||
seed: int = 42,
|
||||
llm_miner: Optional[Callable[[List[SessionDigest]], List[TaskRecord]]] = None,
|
||||
) -> List[TaskRecord]:
|
||||
"""Top-level miner. Uses ``llm_miner`` if provided, else heuristic."""
|
||||
tasks: List[TaskRecord] = []
|
||||
if llm_miner is not None:
|
||||
try:
|
||||
tasks = llm_miner(digests) or []
|
||||
except Exception:
|
||||
tasks = []
|
||||
if not tasks:
|
||||
tasks = heuristic_mine(digests, max_tasks=max_tasks)
|
||||
tasks = dedup_tasks(tasks)
|
||||
tasks = assign_splits(tasks, holdout_fraction=holdout_fraction, seed=seed)
|
||||
return tasks
|
||||
46
skillopt/sleep/replay.py
Normal file
46
skillopt/sleep/replay.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""SkillOpt-Sleep — Stage 3: replay.
|
||||
|
||||
Re-run mined TaskRecords offline under a given (skill, memory) and score
|
||||
them, producing the (hard, soft) signal SkillOpt's gate consumes.
|
||||
|
||||
For Phase 1 the replay is "mock mode": a sandboxed single-shot attempt via
|
||||
the chosen backend (MockBackend = deterministic; AnthropicBackend = real).
|
||||
"fresh" worktree replay is Phase 3 and is intentionally not wired here.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Tuple
|
||||
|
||||
from skillopt.sleep.backend import Backend
|
||||
from skillopt.sleep.types import ReplayResult, TaskRecord
|
||||
|
||||
|
||||
def replay_one(backend: Backend, task: TaskRecord, skill: str, memory: str) -> ReplayResult:
|
||||
response = backend.attempt(task, skill, memory)
|
||||
hard, soft, rationale = backend.judge(task, response)
|
||||
return ReplayResult(
|
||||
id=task.id,
|
||||
hard=float(hard),
|
||||
soft=float(soft),
|
||||
response=response,
|
||||
fail_reason="" if hard >= 1.0 else (rationale or "below threshold"),
|
||||
task_type=(task.tags[0] if task.tags else "task"),
|
||||
judge_rationale=rationale,
|
||||
)
|
||||
|
||||
|
||||
def replay_batch(
|
||||
backend: Backend,
|
||||
tasks: List[TaskRecord],
|
||||
skill: str,
|
||||
memory: str,
|
||||
) -> List[Tuple[TaskRecord, ReplayResult]]:
|
||||
return [(t, replay_one(backend, t, skill, memory)) for t in tasks]
|
||||
|
||||
|
||||
def aggregate_scores(pairs: List[Tuple[TaskRecord, ReplayResult]]) -> Tuple[float, float]:
|
||||
if not pairs:
|
||||
return 0.0, 0.0
|
||||
hard = sum(r.hard for _t, r in pairs) / len(pairs)
|
||||
soft = sum(r.soft for _t, r in pairs) / len(pairs)
|
||||
return hard, soft
|
||||
103
skillopt/sleep/staging.py
Normal file
103
skillopt/sleep/staging.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""SkillOpt-Sleep — Stage 5/6: staging and adoption.
|
||||
|
||||
Implements the Dreams safety contract: the cycle never mutates the user's
|
||||
live CLAUDE.md / SKILL.md. It writes proposals + a human-readable report into
|
||||
a staging directory; a separate, explicit `adopt` step copies them over the
|
||||
live files after taking a backup.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
from skillopt.sleep.types import SleepReport
|
||||
|
||||
|
||||
def _ts_dir() -> str:
|
||||
return time.strftime("%Y%m%d-%H%M%S", time.localtime())
|
||||
|
||||
|
||||
def staging_root(project: str) -> str:
|
||||
return os.path.join(project, ".skillopt-sleep", "staging")
|
||||
|
||||
|
||||
def latest_staging(project: str) -> Optional[str]:
|
||||
root = staging_root(project)
|
||||
if not os.path.isdir(root):
|
||||
return None
|
||||
subs = sorted(
|
||||
(os.path.join(root, d) for d in os.listdir(root)),
|
||||
key=lambda p: os.path.getmtime(p),
|
||||
reverse=True,
|
||||
)
|
||||
return subs[0] if subs else None
|
||||
|
||||
|
||||
def write_staging(
|
||||
project: str,
|
||||
*,
|
||||
report: SleepReport,
|
||||
proposed_skill: Optional[str],
|
||||
proposed_memory: Optional[str],
|
||||
live_skill_path: str,
|
||||
live_memory_path: str,
|
||||
report_md: str,
|
||||
) -> str:
|
||||
"""Write proposals + report into staging/<ts>/ and return that path."""
|
||||
out = os.path.join(staging_root(project), _ts_dir())
|
||||
os.makedirs(out, exist_ok=True)
|
||||
|
||||
manifest = {
|
||||
"live_skill_path": live_skill_path,
|
||||
"live_memory_path": live_memory_path,
|
||||
"has_skill": proposed_skill is not None,
|
||||
"has_memory": proposed_memory is not None,
|
||||
"accepted": report.accepted,
|
||||
}
|
||||
if proposed_skill is not None:
|
||||
with open(os.path.join(out, "proposed_SKILL.md"), "w", encoding="utf-8") as f:
|
||||
f.write(proposed_skill)
|
||||
if proposed_memory is not None:
|
||||
with open(os.path.join(out, "proposed_CLAUDE.md"), "w", encoding="utf-8") as f:
|
||||
f.write(proposed_memory)
|
||||
with open(os.path.join(out, "report.json"), "w", encoding="utf-8") as f:
|
||||
json.dump(report.to_dict(), f, ensure_ascii=False, indent=2)
|
||||
with open(os.path.join(out, "report.md"), "w", encoding="utf-8") as f:
|
||||
f.write(report_md)
|
||||
with open(os.path.join(out, "manifest.json"), "w", encoding="utf-8") as f:
|
||||
json.dump(manifest, f, ensure_ascii=False, indent=2)
|
||||
return out
|
||||
|
||||
|
||||
def _backup(path: str, backup_dir: str) -> None:
|
||||
if os.path.exists(path):
|
||||
os.makedirs(backup_dir, exist_ok=True)
|
||||
shutil.copy2(path, os.path.join(backup_dir, os.path.basename(path)))
|
||||
|
||||
|
||||
def adopt(staging_dir: str) -> List[str]:
|
||||
"""Copy staged proposals over the live files, backing up first.
|
||||
|
||||
Returns the list of live paths that were updated.
|
||||
"""
|
||||
with open(os.path.join(staging_dir, "manifest.json")) as f:
|
||||
manifest = json.load(f)
|
||||
backup_dir = os.path.join(staging_dir, "backup")
|
||||
updated: List[str] = []
|
||||
|
||||
if manifest.get("has_skill"):
|
||||
live = manifest["live_skill_path"]
|
||||
os.makedirs(os.path.dirname(live), exist_ok=True)
|
||||
_backup(live, backup_dir)
|
||||
shutil.copy2(os.path.join(staging_dir, "proposed_SKILL.md"), live)
|
||||
updated.append(live)
|
||||
if manifest.get("has_memory"):
|
||||
live = manifest["live_memory_path"]
|
||||
os.makedirs(os.path.dirname(live), exist_ok=True)
|
||||
_backup(live, backup_dir)
|
||||
shutil.copy2(os.path.join(staging_dir, "proposed_CLAUDE.md"), live)
|
||||
updated.append(live)
|
||||
return updated
|
||||
83
skillopt/sleep/state.py
Normal file
83
skillopt/sleep/state.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""SkillOpt-Sleep — persistent cross-night state.
|
||||
|
||||
state.json lives in ~/.skillopt-sleep and is the "long-term" store that
|
||||
turns nightly episodes into durable competence (the Agent-Sleep paper's
|
||||
short-term -> long-term transfer). It records:
|
||||
|
||||
- night counter
|
||||
- last harvest timestamp per project (so each night only sees new data)
|
||||
- cross-night "slow/meta" memory (lessons that persisted across nights)
|
||||
- per-night history (scores, accept/reject) for trend reporting
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
def _now_iso(clock: Optional[float] = None) -> str:
|
||||
# caller passes a timestamp; we avoid importing time at module import
|
||||
import time as _t
|
||||
return _t.strftime("%Y-%m-%dT%H:%M:%S", _t.localtime(clock if clock is not None else _t.time()))
|
||||
|
||||
|
||||
DEFAULT_STATE: Dict[str, Any] = {
|
||||
"version": 1,
|
||||
"night": 0,
|
||||
"last_harvest": {}, # project -> iso timestamp of last harvested record
|
||||
"slow_memory": "", # cross-night consolidated lessons (meta-skill analogue)
|
||||
"history": [], # list of per-night summaries
|
||||
}
|
||||
|
||||
|
||||
class SleepState:
|
||||
def __init__(self, path: str, data: Optional[Dict[str, Any]] = None) -> None:
|
||||
self.path = path
|
||||
self.data = data if data is not None else dict(DEFAULT_STATE)
|
||||
|
||||
# io ---------------------------------------------------------------------
|
||||
@classmethod
|
||||
def load(cls, path: str) -> "SleepState":
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
with open(path) as f:
|
||||
data = json.load(f)
|
||||
merged = dict(DEFAULT_STATE)
|
||||
merged.update(data if isinstance(data, dict) else {})
|
||||
return cls(path, merged)
|
||||
except Exception:
|
||||
pass
|
||||
return cls(path, dict(DEFAULT_STATE))
|
||||
|
||||
def save(self) -> None:
|
||||
os.makedirs(os.path.dirname(self.path), exist_ok=True)
|
||||
tmp = self.path + ".tmp"
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(self.data, f, ensure_ascii=False, indent=2)
|
||||
os.replace(tmp, self.path)
|
||||
|
||||
# accessors --------------------------------------------------------------
|
||||
@property
|
||||
def night(self) -> int:
|
||||
return int(self.data.get("night", 0))
|
||||
|
||||
def last_harvest_for(self, project: str) -> Optional[str]:
|
||||
return self.data.get("last_harvest", {}).get(project)
|
||||
|
||||
def set_last_harvest(self, project: str, iso_ts: str) -> None:
|
||||
self.data.setdefault("last_harvest", {})[project] = iso_ts
|
||||
|
||||
@property
|
||||
def slow_memory(self) -> str:
|
||||
return str(self.data.get("slow_memory", ""))
|
||||
|
||||
def set_slow_memory(self, content: str) -> None:
|
||||
self.data["slow_memory"] = content
|
||||
|
||||
def begin_night(self, clock: Optional[float] = None) -> int:
|
||||
self.data["night"] = self.night + 1
|
||||
return self.night
|
||||
|
||||
def record_night(self, summary: Dict[str, Any]) -> None:
|
||||
self.data.setdefault("history", []).append(summary)
|
||||
127
skillopt/sleep/types.py
Normal file
127
skillopt/sleep/types.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""SkillOpt-Sleep — core data types.
|
||||
|
||||
These dataclasses are the interfaces between the sleep-cycle stages
|
||||
(harvest -> mine -> replay -> consolidate -> stage). They are intentionally
|
||||
plain (no slots, no heavy deps) so the package imports cleanly on any
|
||||
Python 3.8+ interpreter and the deterministic experiment runs with zero
|
||||
external dependencies.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
# ── Stage 1: harvest ──────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class SessionDigest:
|
||||
"""A normalized summary of one Claude Code session transcript.
|
||||
|
||||
Produced by :mod:`skillopt.sleep.harvest` from a ``<sessionId>.jsonl``
|
||||
transcript plus ``history.jsonl`` entries.
|
||||
"""
|
||||
|
||||
session_id: str
|
||||
project: str
|
||||
git_branch: str = ""
|
||||
started_at: str = ""
|
||||
ended_at: str = ""
|
||||
user_prompts: List[str] = field(default_factory=list)
|
||||
assistant_finals: List[str] = field(default_factory=list)
|
||||
tools_used: List[str] = field(default_factory=list)
|
||||
files_touched: List[str] = field(default_factory=list)
|
||||
feedback_signals: List[str] = field(default_factory=list) # "still broken", "perfect", ...
|
||||
n_user_turns: int = 0
|
||||
n_assistant_turns: int = 0
|
||||
raw_path: str = ""
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
# ── Stage 2: mine ─────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class TaskRecord:
|
||||
"""A self-contained recurring task mined from one or more sessions.
|
||||
|
||||
This is the *training unit* of the sleep cycle — the analogue of a
|
||||
SkillOpt benchmark item.
|
||||
"""
|
||||
|
||||
id: str
|
||||
project: str
|
||||
intent: str # what the user wanted (the "question")
|
||||
context_excerpt: str = "" # minimal context needed to attempt it
|
||||
attempted_solution: str = "" # what the agent produced before
|
||||
outcome: str = "unknown" # success | fail | mixed | unknown
|
||||
reference_kind: str = "none" # exact | rubric | none
|
||||
reference: str = "" # exact answer, or rubric text
|
||||
tags: List[str] = field(default_factory=list)
|
||||
source_sessions: List[str] = field(default_factory=list)
|
||||
split: str = "replay" # replay (train) | holdout (test)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: Dict[str, Any]) -> "TaskRecord":
|
||||
known = {f for f in cls.__dataclass_fields__} # type: ignore[attr-defined]
|
||||
return cls(**{k: v for k, v in d.items() if k in known})
|
||||
|
||||
|
||||
# ── Stage 3: replay ───────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class ReplayResult:
|
||||
"""Outcome of re-running one TaskRecord offline under a given skill+memory."""
|
||||
|
||||
id: str
|
||||
hard: float = 0.0 # 0/1 exact, or continuous reward
|
||||
soft: float = 0.0 # partial credit / judge score 0..1
|
||||
response: str = ""
|
||||
fail_reason: str = ""
|
||||
task_type: str = "task"
|
||||
judge_rationale: str = ""
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
# ── Stage 4/5: consolidation report ───────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class EditRecord:
|
||||
"""One bounded edit proposed/applied to skill or memory."""
|
||||
|
||||
target: str # "skill" | "memory"
|
||||
op: str # add | delete | replace
|
||||
content: str = ""
|
||||
anchor: str = "" # for replace/delete: text being changed
|
||||
rationale: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class SleepReport:
|
||||
"""Everything one night produced — written to staging for review."""
|
||||
|
||||
night: int
|
||||
project: str
|
||||
started_at: str = ""
|
||||
ended_at: str = ""
|
||||
n_sessions: int = 0
|
||||
n_tasks: int = 0
|
||||
n_replayed: int = 0
|
||||
baseline_score: float = 0.0
|
||||
candidate_score: float = 0.0
|
||||
accepted: bool = False
|
||||
gate_action: str = ""
|
||||
edits: List[EditRecord] = field(default_factory=list)
|
||||
rejected_edits: List[EditRecord] = field(default_factory=list)
|
||||
tokens_used: int = 0
|
||||
notes: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
d = asdict(self)
|
||||
return d
|
||||
Reference in New Issue
Block a user