Add reviewed task-file flow for Codex sleep runs

This commit is contained in:
Kirill Kostarev
2026-06-15 14:45:46 +03:00
committed by carpedkm
parent 382811ddcc
commit 05cdc26beb
9 changed files with 780 additions and 52 deletions

View File

@@ -49,18 +49,43 @@ Or call the engine directly:
```bash
python -m skillopt_sleep dry-run --project "$(pwd)" --source codex --backend mock
python -m skillopt_sleep run --project "$(pwd)" --source codex --backend codex
python -m skillopt_sleep run --project "$(pwd)" --source codex --backend codex \
--max-sessions 5 --max-tasks 3 --progress
python -m skillopt_sleep run --project "$(pwd)" --source codex --backend codex \
--target-skill-path .agents/skills/example/SKILL.md \
--max-sessions 5 --max-tasks 3 --progress
```
`--source codex` reads Codex Desktop archived sessions from
`~/.codex/archived_sessions`. Use `--codex-home /path/to/.codex` to point at a
different Codex home, or `--source auto` to try Codex archives first and fall
back to Claude Code transcripts. Default backend is `mock` (no API spend).
`--backend codex` uses your Codex budget for real improvement. All the
`--backend codex` uses your Codex budget for real improvement. Bound live runs
with `--max-sessions` and `--max-tasks`; add `--progress` because Codex-backed
mining, replay, and reflection can be slow and otherwise quiet. Use
`--target-skill-path` to stage/adopt into a repo-scoped Codex skill such as
`.agents/skills/<name>/SKILL.md`; target runs over-sample mined tasks and
prefer tasks that match the target skill's path, headings, and content. All the
controllable knobs (`--gate on|off`, `--rollouts-k`, `--budget-tokens`,
`--preferences`, optimizer/target split) work identically — see
[the SkillOpt-Sleep guide section](https://microsoft.github.io/SkillOpt/docs/guideline.html#sleep).
For privacy-sensitive projects, split the run into reviewable steps:
```bash
python -m skillopt_sleep harvest --project "$(pwd)" --source codex \
--target-skill-path .agents/skills/example/SKILL.md \
--max-sessions 5 --max-tasks 3 \
--output reviewed-tasks.json
python -m skillopt_sleep dry-run --project "$(pwd)" --backend codex \
--tasks-file reviewed-tasks.json --progress --json
```
Inspect/redact the JSON and set `"reviewed": true` before using a real backend.
`--tasks-file` skips archive harvest/mining and replays only the reviewed JSON
tasks; real backends refuse task files still marked `"reviewed": false`.
## Notes / status
- Codex's `exec` runs shell, so the real-tool-loop replay (e.g. the

View File

@@ -9,6 +9,10 @@
Common flags:
--project PATH project to evolve (default: cwd)
--scope all|invoked harvest scope (default: invoked)
--max-sessions N cap transcript sessions per run
--max-tasks N cap mined tasks per run
--target-skill-path PATH explicit live SKILL.md to stage/adopt
--tasks-file PATH reviewed TaskRecord JSON file to replay instead of harvesting
--backend mock|claude|codex|copilot
--source claude|codex|auto
--model NAME
@@ -31,6 +35,35 @@ from skillopt_sleep.mine import mine
from skillopt_sleep.staging import adopt as adopt_staging
from skillopt_sleep.staging import latest_staging
from skillopt_sleep.state import SleepState
from skillopt_sleep.tasks_file import load_tasks_file, make_tasks_payload, write_tasks_file
def _read_text(path: str) -> str:
try:
with open(path, encoding="utf-8") as f:
return f.read()
except Exception:
return ""
def _report_payload(rep, outcome) -> Dict[str, Any]:
return {
"night": rep.night,
"accepted": rep.accepted,
"gate_action": rep.gate_action,
"no_edits_reason": getattr(rep, "no_edits_reason", ""),
"baseline": rep.baseline_score,
"candidate": rep.candidate_score,
"n_tasks": rep.n_tasks,
"n_sessions": rep.n_sessions,
"n_accepted_edits": len(rep.edits),
"n_rejected_edits": len(rep.rejected_edits),
"edits": [e.__dict__ for e in rep.edits],
"rejected_edits": [e.__dict__ for e in rep.rejected_edits],
"notes": rep.notes,
"staging_dir": outcome.staging_dir,
"adopted": outcome.adopted,
}
def _add_common(p: argparse.ArgumentParser) -> None:
@@ -45,11 +78,21 @@ def _add_common(p: argparse.ArgumentParser) -> None:
help="session transcript source")
p.add_argument("--lookback-hours", type=int, default=0)
p.add_argument("--edit-budget", type=int, default=0)
p.add_argument("--max-sessions", type=int, default=0,
help="cap harvested sessions before mining; default derives from max tasks")
p.add_argument("--max-tasks", type=int, default=0,
help="cap mined tasks for this run")
p.add_argument("--target-skill-path", default="",
help="explicit live SKILL.md path to evolve/stage/adopt")
p.add_argument("--tasks-file", default="",
help="reviewed TaskRecord JSON file to replay instead of harvesting")
p.add_argument("--progress", action="store_true",
help="print phase progress to stderr")
p.add_argument("--auto-adopt", action="store_true")
p.add_argument("--json", action="store_true")
def _cfg_from_args(args) -> Any:
def _cfg_from_args(args, task_meta: Dict[str, Any] | None = None) -> Any:
overrides: Dict[str, Any] = {}
if args.project:
overrides["invoked_project"] = os.path.abspath(args.project)
@@ -72,30 +115,63 @@ def _cfg_from_args(args) -> Any:
overrides["lookback_hours"] = args.lookback_hours
if getattr(args, "edit_budget", 0):
overrides["edit_budget"] = args.edit_budget
if getattr(args, "max_sessions", 0):
overrides["max_sessions_per_night"] = args.max_sessions
if getattr(args, "max_tasks", 0):
overrides["max_tasks_per_night"] = args.max_tasks
target_skill_path = getattr(args, "target_skill_path", "")
if not target_skill_path and task_meta:
target_skill_path = str(task_meta.get("target_skill_path") or "")
if target_skill_path:
path = os.path.expanduser(target_skill_path)
if args.project and not os.path.isabs(path):
path = os.path.join(os.path.abspath(args.project), path)
overrides["target_skill_path"] = os.path.abspath(path)
if getattr(args, "progress", False):
overrides["progress"] = True
if getattr(args, "auto_adopt", False):
overrides["auto_adopt"] = True
return load_config(**overrides)
def cmd_run(args, dry: bool = False) -> int:
cfg = _cfg_from_args(args)
outcome = run_sleep_cycle(cfg, dry_run=dry)
task_meta: Dict[str, Any] = {}
tasks = None
if getattr(args, "tasks_file", ""):
# Load once before config so target_skill_path can default from metadata.
tasks, task_meta = load_tasks_file(args.tasks_file)
cfg = _cfg_from_args(args, task_meta=task_meta)
if getattr(args, "tasks_file", ""):
tasks, task_meta = load_tasks_file(
args.tasks_file,
holdout_fraction=cfg.get("holdout_fraction", 0.34),
seed=cfg.get("seed", 42),
)
if cfg.get("backend", "mock") != "mock" and task_meta.get("reviewed") is not True:
print(
"[sleep] refusing real-backend replay from an unreviewed tasks file; "
"inspect/redact it and set \"reviewed\": true first",
file=sys.stderr,
)
return 2
outcome = run_sleep_cycle(cfg, seed_tasks=tasks, dry_run=dry)
rep = outcome.report
if args.json:
print(json.dumps({
"night": rep.night, "accepted": rep.accepted,
"gate_action": rep.gate_action,
"baseline": rep.baseline_score, "candidate": rep.candidate_score,
"n_tasks": rep.n_tasks, "n_sessions": rep.n_sessions,
"edits": [e.__dict__ for e in rep.edits],
"staging_dir": outcome.staging_dir, "adopted": outcome.adopted,
}, ensure_ascii=False, indent=2))
payload = _report_payload(rep, outcome)
if task_meta:
payload["tasks_file"] = task_meta.get("tasks_file", "")
payload["tasks_reviewed"] = task_meta.get("reviewed", False)
print(json.dumps(payload, ensure_ascii=False, indent=2))
else:
print(f"[sleep] night {rep.night}: {rep.n_sessions} sessions -> {rep.n_tasks} tasks")
print(f"[sleep] held-out {rep.baseline_score:.3f} -> {rep.candidate_score:.3f} "
f"=> {rep.gate_action} (accepted={rep.accepted})")
for e in rep.edits:
print(f" + [{e.target}/{e.op}] {e.content}")
if rep.rejected_edits:
print("[sleep] rejected by gate:")
for e in rep.rejected_edits:
print(f" - [{e.target}/{e.op}] {e.content}")
if outcome.staging_dir:
print(f"[sleep] staged: {outcome.staging_dir}")
if not outcome.adopted:
@@ -152,16 +228,42 @@ def cmd_adopt(args) -> int:
def cmd_harvest(args) -> int:
cfg = _cfg_from_args(args)
digests = harvest_for_config(cfg, limit=cfg.get("max_tasks_per_night", 40) * 3)
tasks = mine(digests, max_tasks=cfg.get("max_tasks_per_night", 40),
holdout_fraction=cfg.get("holdout_fraction", 0.34), seed=cfg.get("seed", 42))
session_limit = cfg.get("max_sessions_per_night", 0) or cfg.get("max_tasks_per_night", 40) * 3
target_skill_path = cfg.managed_skill_path() if cfg.get("target_skill_path", "") else ""
target_skill_text = _read_text(target_skill_path) if target_skill_path else ""
max_tasks = cfg.get("max_tasks_per_night", 40)
candidate_limit = max_tasks
if cfg.get("target_task_filter", True) and target_skill_text:
candidate_limit = max(max_tasks, max_tasks * 3)
digests = harvest_for_config(cfg, limit=session_limit)
tasks = mine(
digests,
max_tasks=max_tasks,
candidate_limit=candidate_limit,
holdout_fraction=cfg.get("holdout_fraction", 0.34),
seed=cfg.get("seed", 42),
target_skill_text=target_skill_text,
target_skill_path=target_skill_path,
)
payload = make_tasks_payload(
tasks,
project=cfg.get("invoked_project") or os.getcwd(),
transcript_source=cfg.get("transcript_source", ""),
n_sessions=len(digests),
target_skill_path=target_skill_path,
)
output_path = ""
if getattr(args, "output", ""):
output_path = write_tasks_file(args.output, payload)
if args.json:
print(json.dumps({
"n_sessions": len(digests),
"tasks": [t.to_dict() for t in tasks],
}, ensure_ascii=False, indent=2))
json_payload = dict(payload)
if output_path:
json_payload["output"] = output_path
print(json.dumps(json_payload, ensure_ascii=False, indent=2))
else:
print(f"[sleep] {len(digests)} sessions -> {len(tasks)} tasks")
if output_path:
print(f"[sleep] wrote reviewed-task draft: {output_path}")
for t in tasks:
print(f" [{t.split}/{t.outcome}] {t.intent[:90]}")
return 0
@@ -207,6 +309,7 @@ def main(argv=None) -> int:
p_adopt.add_argument("--staging", default="", help="specific staging dir")
p_harvest = sub.add_parser("harvest", help="debug: show mined tasks")
_add_common(p_harvest)
p_harvest.add_argument("--output", default="", help="write mined tasks JSON for review")
p_sched = sub.add_parser("schedule", help="install a nightly cron entry for this project")
_add_common(p_sched)
p_sched.add_argument("--hour", type=int, default=3)

View File

@@ -316,6 +316,8 @@ class CliBackend(Backend):
self.timeout = timeout
self._tokens = 0
self._cache: Dict[str, str] = {}
self.last_call_error = ""
self.last_reflect_raw = ""
# subclasses override --------------------------------------------------
def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
@@ -692,15 +694,25 @@ class CodexCliBackend(CliBackend):
name = "codex"
def __init__(self, model: str = "", codex_path: str = "", timeout: int = 240,
sandbox: str = "read-only") -> None:
def __init__(
self,
model: str = "",
codex_path: str = "",
timeout: int = 240,
sandbox: str = "read-only",
project_dir: str = "",
) -> None:
super().__init__(model=model or os.environ.get("SKILLOPT_SLEEP_CODEX_MODEL", ""),
timeout=timeout)
self.codex_path = resolve_codex_path(codex_path)
self.sandbox = sandbox
self.project_dir = (
os.path.abspath(os.path.expanduser(project_dir)) if project_dir else ""
)
def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
import tempfile
self.last_call_error = ""
out_path = tempfile.NamedTemporaryFile(
prefix="codex_last_", suffix=".txt", delete=False
).name
@@ -709,18 +721,39 @@ class CodexCliBackend(CliBackend):
"--color", "never", "--sandbox", self.sandbox,
"-o", out_path,
]
if self.project_dir:
cmd[3:3] = ["-C", self.project_dir]
if self.model:
cmd += ["-m", self.model]
cmd += ["--", prompt]
proc = None
try:
subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout)
except Exception:
return ""
try:
with open(out_path, encoding="utf-8") as f:
return f.read().strip()
except Exception:
return ""
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.timeout,
cwd=self.project_dir or None,
)
except subprocess.TimeoutExpired:
self.last_call_error = f"codex exec timed out after {self.timeout}s"
return ""
except Exception as exc:
self.last_call_error = f"codex exec failed: {exc}"
return ""
try:
with open(out_path, encoding="utf-8") as f:
out = f.read().strip()
if out:
return out
except Exception as exc:
self.last_call_error = f"could not read codex output file: {exc}"
stdout = (proc.stdout or "").strip() if proc is not None else ""
stderr = (proc.stderr or "").strip() if proc is not None else ""
if proc is not None and proc.returncode != 0 and not self.last_call_error:
self.last_call_error = f"codex exec exited {proc.returncode}: {stderr[:500]}"
return stdout or stderr
finally:
try:
os.unlink(out_path)
@@ -1238,12 +1271,13 @@ def get_backend(
claude_path: str = "claude",
codex_path: str = "",
azure_endpoint: str = "",
project_dir: str = "",
) -> Backend:
n = (name or "mock").strip().lower()
if n in {"claude", "anthropic", "claude_cli", "claude_code"}:
return ClaudeCliBackend(model=model, claude_path=claude_path)
if n in {"codex", "codex_cli", "openai_codex"}:
return CodexCliBackend(model=model, codex_path=codex_path)
return CodexCliBackend(model=model, codex_path=codex_path, project_dir=project_dir)
if n in {"azure", "azure_openai", "aoai"}:
return AzureOpenAIBackend(deployment=model, endpoint=azure_endpoint)
if n in {"azure-responses", "azure_responses", "aoai-responses", "responses"}:
@@ -1265,6 +1299,7 @@ def build_backend(
codex_path: str = "",
azure_endpoint: str = "",
preferences: str = "",
project_dir: str = "",
) -> Backend:
"""Build a single or dual backend.
@@ -1275,13 +1310,21 @@ def build_backend(
"""
has_split = any([optimizer_backend, optimizer_model, target_backend, target_model])
if not has_split:
be = get_backend(backend, model=model, codex_path=codex_path, azure_endpoint=azure_endpoint)
be = get_backend(
backend,
model=model,
codex_path=codex_path,
azure_endpoint=azure_endpoint,
project_dir=project_dir,
)
be.preferences = preferences
return be
tgt = get_backend(target_backend or backend, model=target_model or model,
codex_path=codex_path, azure_endpoint=azure_endpoint)
codex_path=codex_path, azure_endpoint=azure_endpoint,
project_dir=project_dir)
opt = get_backend(optimizer_backend or backend, model=optimizer_model or model,
codex_path=codex_path, azure_endpoint=azure_endpoint)
codex_path=codex_path, azure_endpoint=azure_endpoint,
project_dir=project_dir)
opt.preferences = preferences # reflect runs on the optimizer
dual = DualBackend(target=tgt, optimizer=opt)
dual.preferences = preferences

View File

@@ -51,6 +51,9 @@ DEFAULTS: Dict[str, Any] = {
"evolve_memory": True, # consolidate CLAUDE.md
"evolve_skill": True, # consolidate the managed SKILL.md
"llm_mine": True, # use the backend to mine checkable tasks (real backends)
"target_skill_path": "", # explicit SKILL.md target for repo-scoped agents
"target_task_filter": True, # prefer mined tasks matching target_skill_path/text
"progress": False, # print phase progress to stderr
# ── adoption / safety ──────────────────────────────────────────────────
"auto_adopt": False, # default: stage + require explicit `adopt`
"managed_skill_name": "skillopt-sleep-learned",
@@ -113,6 +116,13 @@ class SleepConfig:
return os.path.join(self.data["claude_home"], "skills")
def managed_skill_path(self) -> str:
    """Return the path of the SKILL.md file this configuration manages.

    Without a ``target_skill_path`` setting, the path is
    ``<skills_dir>/<managed_skill_name>/SKILL.md``. With one, ``~`` is
    expanded, a relative path is anchored at ``invoked_project`` (or the
    current working directory when that is unset), and the absolute path is
    returned.
    """
    explicit = self.data.get("target_skill_path") or ""
    if not explicit:
        return os.path.join(
            self.skills_dir, self.data["managed_skill_name"], "SKILL.md"
        )
    resolved = os.path.expanduser(str(explicit))
    if not os.path.isabs(resolved):
        root = self.data.get("invoked_project") or os.getcwd()
        resolved = os.path.join(root, resolved)
    return os.path.abspath(resolved)

View File

@@ -10,6 +10,7 @@ CI use. With backend="anthropic" it spends the user's budget for real lift.
from __future__ import annotations
import os
import sys
from dataclasses import dataclass
from typing import List, Optional
@@ -49,6 +50,11 @@ def _read(path: str) -> str:
return ""
def _progress(cfg: SleepConfig, message: str) -> None:
if cfg.get("progress", False):
print(f"[sleep] {message}", file=sys.stderr, flush=True)
def _render_report_md(report: SleepReport, cfg: SleepConfig) -> str:
lines = [
f"# SkillOpt-Sleep — night {report.night} report",
@@ -108,6 +114,26 @@ def run_sleep_cycle(
cfg.get("backend", "mock"),
model=cfg.get("model", ""),
codex_path=cfg.get("codex_path", ""),
project_dir=project,
)
_progress(cfg, f"night {night}: project={project} backend={backend.name}")
# ── live skill/memory docs ───────────────────────────────────────────
live_memory_path = os.path.join(project, "CLAUDE.md")
live_skill_path = cfg.managed_skill_path()
_progress(cfg, f"live skill: {live_skill_path}")
raw_skill = _read(live_skill_path)
skill = raw_skill
memory = _read(live_memory_path)
if not skill:
skill = ensure_skill_scaffold(
"", name=cfg.get("managed_skill_name", "skillopt-sleep-learned"),
description="Preferences and procedures learned from past local agent sessions.",
)
target_filter = bool(
cfg.get("target_task_filter", True)
and cfg.get("target_skill_path", "")
and raw_skill
)
# ── 1+2. harvest + mine (unless seed_tasks injected) ─────────────────
@@ -115,14 +141,25 @@ def run_sleep_cycle(
if seed_tasks is not None:
tasks = seed_tasks
n_sessions = 0
_progress(cfg, f"using {len(tasks)} seeded tasks")
else:
since = state.last_harvest_for(project)
max_tasks = cfg.get("max_tasks_per_night", 40)
max_sessions = cfg.get("max_sessions_per_night", 0) or max_tasks * 3
candidate_limit = max_tasks
if target_filter:
candidate_limit = max(max_tasks, max_tasks * 3)
_progress(
cfg,
f"harvest start: source={cfg.get('transcript_source')} max_sessions={max_sessions}",
)
digests = harvest_for_config(
cfg,
since_iso=since,
limit=cfg.get("max_tasks_per_night", 40) * 3,
limit=max_sessions,
)
n_sessions = len(digests)
_progress(cfg, f"harvest done: sessions={n_sessions}")
# When a real backend is configured, use it to mine checkable tasks from
# the transcripts (rubric/rule judges); otherwise fall back to the
# heuristic miner (no API, no checkable reference).
@@ -130,27 +167,29 @@ def run_sleep_cycle(
if cfg.get("backend", "mock") != "mock" and cfg.get("llm_mine", True):
try:
from skillopt_sleep.llm_miner import make_llm_miner
llm_miner = make_llm_miner(backend, max_tasks=cfg.get("max_tasks_per_night", 40))
llm_miner = make_llm_miner(
backend,
max_sessions=max_sessions,
max_tasks=candidate_limit,
)
except Exception:
llm_miner = None
_progress(
cfg,
f"mine start: max_tasks={max_tasks} candidate_limit={candidate_limit} "
f"llm_mine={llm_miner is not None} target_filter={target_filter}",
)
tasks = mine(
digests,
max_tasks=cfg.get("max_tasks_per_night", 40),
max_tasks=max_tasks,
candidate_limit=candidate_limit,
holdout_fraction=cfg.get("holdout_fraction", 0.34),
seed=cfg.get("seed", 42),
llm_miner=llm_miner,
target_skill_text=raw_skill if target_filter else "",
target_skill_path=live_skill_path if target_filter else "",
)
# ── live skill/memory docs ───────────────────────────────────────────
live_memory_path = os.path.join(project, "CLAUDE.md")
live_skill_path = cfg.managed_skill_path()
skill = _read(live_skill_path)
memory = _read(live_memory_path)
if not skill:
skill = ensure_skill_scaffold(
"", name=cfg.get("managed_skill_name", "skillopt-sleep-learned"),
description="Preferences and procedures learned from past local agent sessions.",
)
_progress(cfg, f"mine done: tasks={len(tasks)}")
report = SleepReport(
night=night, project=project, started_at=started,
@@ -172,6 +211,7 @@ def run_sleep_cycle(
# / dream_factor enrich the training signal. With the defaults (recall_k=0,
# dream_rollouts=1, dream_factor=0) this is exactly the prior single-shot
# consolidate — behavior is unchanged unless the user opts in.
_progress(cfg, "consolidate start")
recall_k = int(cfg.get("recall_k", 0) or 0)
history_tasks = []
if recall_k > 0:
@@ -192,12 +232,18 @@ def run_sleep_cycle(
)
# archive tonight's real (non-dream) tasks so future nights can recall them
state.add_to_archive([t.to_dict() for t in tasks if t.origin != "dream"])
_progress(
cfg,
f"consolidate done: gate={result.gate_action} accepted={result.accepted} "
f"edits={len(result.applied_edits)} rejected={len(result.rejected_edits)}",
)
report.n_replayed = len(tasks)
report.baseline_score = result.baseline_score
report.candidate_score = result.candidate_score
report.accepted = result.accepted
report.gate_action = result.gate_action
report.no_edits_reason = getattr(result, "no_edits_reason", "")
report.edits = result.applied_edits
report.rejected_edits = result.rejected_edits
report.tokens_used = backend.tokens_used()
@@ -208,6 +254,7 @@ def run_sleep_cycle(
adopted = False
adopted_paths: List[str] = []
if not dry_run:
_progress(cfg, "staging start")
report_md = _render_report_md(report, cfg)
proposed_skill = result.new_skill if (cfg.get("evolve_skill") and result.accepted) else None
proposed_memory = result.new_memory if (cfg.get("evolve_memory") and result.accepted) else None

View File

@@ -15,8 +15,10 @@ basis of the deterministic experiment.
from __future__ import annotations
import hashlib
import os
import re
from typing import Any, Callable, List, Optional
from collections import Counter
from typing import Any, Callable, List, Optional, Set, Tuple
from skillopt_sleep.types import SessionDigest, TaskRecord
@@ -39,6 +41,99 @@ def _looks_positive(signals: List[str]) -> bool:
return any(s.startswith("pos:") for s in signals)
_TARGET_STOPWORDS = {
"about", "after", "again", "agent", "agents", "all", "also", "always",
"and", "any", "are", "before", "being", "but", "can", "codex",
"current", "default", "docs", "does", "done", "each", "file", "files",
"for", "from", "have", "into", "keep", "must", "not", "only", "path",
"paths", "project", "read", "repo", "request", "requests", "rule",
"rules", "same", "should", "skill", "skills", "source", "start",
"task", "tasks", "that", "the", "their", "then", "this", "unless",
"update", "user", "users", "when", "with", "work", "workflow",
}
def _target_tokens(text: str) -> List[str]:
tokens: List[str] = []
for raw in re.findall(r"[\w][\w.-]*", (text or "").lower(), flags=re.UNICODE):
parts = [raw] + re.split(r"[\W_]+", raw, flags=re.UNICODE)
for part in parts:
if len(part) < 3 or part.isdigit() or part in _TARGET_STOPWORDS:
continue
tokens.append(part)
return tokens
def _expand_target_keywords(keywords: Set[str]) -> None:
if "mcp" in keywords:
keywords.update({
"configure", "configuration", "connect", "connected", "enable",
"enabled", "install", "installed", "server", "servers",
"настрой", "настроить", "подключи", "подключить",
})
if {"conflict", "conflicts"} & keywords:
keywords.update({
"cherry", "conflict", "conflicts", "git", "merge", "rebase",
"unmerged", "конфликт", "конфликты",
})
def target_task_keywords(
    target_skill_text: str,
    target_skill_path: str = "",
    *,
    limit: int = 180,
) -> Tuple[Set[str], Set[str]]:
    """Return ``(strong, weak)`` keyword sets describing a target skill.

    ``strong`` holds tokens taken from the skill's path components and its
    Markdown heading lines. ``weak`` holds everything in ``strong`` plus the
    ``limit`` most frequent tokens of the whole skill text. Both sets are then
    passed through ``_expand_target_keywords``.
    """
    skill_text = target_skill_text or ""
    heading_titles = re.findall(r"(?m)^#+\s+(.+)$", skill_text)
    path_words = (target_skill_path or "").replace(os.sep, " ")

    strong = set(_target_tokens(path_words + "\n" + "\n".join(heading_titles)))
    most_frequent = Counter(_target_tokens(skill_text)).most_common(limit)
    weak = strong | {token for token, _ in most_frequent}

    for keyword_set in (strong, weak):
        _expand_target_keywords(keyword_set)
    return strong, weak
def _task_search_text(task: TaskRecord) -> str:
return "\n".join([
task.intent or "",
task.context_excerpt or "",
" ".join(task.tags or []),
])
def filter_tasks_for_target(
    tasks: List[TaskRecord],
    target_skill_text: str,
    target_skill_path: str = "",
) -> List[TaskRecord]:
    """Rank tasks by vocabulary overlap with the explicit target skill.

    A task is kept when it shares at least one strong keyword, or at least two
    weak keywords, with the skill. Kept tasks are ordered by score (a strong
    hit counts three, a weak hit one), ties keeping their original order.

    The input list is returned unchanged when it is empty, when the skill
    yields no keywords, or when no task matches. A targeted run therefore
    still has tasks to work with if transcripts are sparse or the skill is
    too generic.
    """
    strong, weak = target_task_keywords(target_skill_text, target_skill_path)
    if not tasks:
        return tasks
    if not strong and not weak:
        return tasks

    scored = []
    for position, candidate in enumerate(tasks):
        words = set(_target_tokens(_task_search_text(candidate)))
        n_strong = len(words & strong)
        n_weak = len(words & weak)
        if n_strong or n_weak >= 2:
            scored.append((3 * n_strong + n_weak, position, candidate))

    if not scored:
        return tasks
    # Highest score first; the original position breaks ties deterministically.
    scored.sort(key=lambda entry: (-entry[0], entry[1]))
    return [candidate for _, _, candidate in scored]
def heuristic_mine(
digests: List[SessionDigest],
*,
@@ -192,11 +287,15 @@ def mine(
digests: List[SessionDigest],
*,
max_tasks: int = 40,
candidate_limit: int = 0,
holdout_fraction: float = 0.34,
seed: int = 42,
llm_miner: Optional[Callable[[List[SessionDigest]], List[TaskRecord]]] = None,
target_skill_text: str = "",
target_skill_path: str = "",
) -> List[TaskRecord]:
"""Top-level miner. Uses ``llm_miner`` if provided, else heuristic."""
candidate_limit = candidate_limit or max_tasks
tasks: List[TaskRecord] = []
if llm_miner is not None:
try:
@@ -204,7 +303,10 @@ def mine(
except Exception:
tasks = []
if not tasks:
tasks = heuristic_mine(digests, max_tasks=max_tasks)
tasks = heuristic_mine(digests, max_tasks=candidate_limit)
tasks = dedup_tasks(tasks)
if target_skill_text or target_skill_path:
tasks = filter_tasks_for_target(tasks, target_skill_text, target_skill_path)
tasks = tasks[:max_tasks]
tasks = assign_splits(tasks, holdout_fraction=holdout_fraction, seed=seed)
return tasks

View File

@@ -0,0 +1,81 @@
"""Reviewed task-file helpers for privacy-safe SkillOpt-Sleep runs."""
from __future__ import annotations
import json
import os
from typing import Any, Dict, List, Tuple
from skillopt_sleep.mine import assign_splits, normalize_legacy_split
from skillopt_sleep.types import TaskRecord
def make_tasks_payload(
    tasks: List[TaskRecord],
    *,
    project: str,
    transcript_source: str = "",
    n_sessions: int = 0,
    target_skill_path: str = "",
) -> Dict[str, Any]:
    """Build the JSON-serialisable document for a mined-task draft.

    The document carries the format tag ``skillopt_sleep.tasks.v1``, the run
    metadata passed in, and each task converted with ``to_dict()``. A new
    draft is always marked ``"reviewed": False``.
    """
    payload: Dict[str, Any] = {}
    payload["format"] = "skillopt_sleep.tasks.v1"
    payload["project"] = project
    payload["transcript_source"] = transcript_source
    payload["n_sessions"] = n_sessions
    payload["target_skill_path"] = target_skill_path
    payload["reviewed"] = False
    payload["tasks"] = [record.to_dict() for record in tasks]
    return payload
def write_tasks_file(path: str, payload: Dict[str, Any]) -> str:
    """Write ``payload`` as indented UTF-8 JSON and return the absolute path.

    ``~`` in ``path`` is expanded and missing parent directories are created.
    Non-ASCII characters are written as-is, and the file ends with a single
    trailing newline.
    """
    destination = os.path.abspath(os.path.expanduser(path))
    folder = os.path.dirname(destination)
    if folder:
        os.makedirs(folder, exist_ok=True)
    with open(destination, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
        handle.write("\n")
    return destination
def _normalize_tasks(
    tasks: List[TaskRecord],
    *,
    holdout_fraction: float,
    seed: int,
) -> List[TaskRecord]:
    """Normalise task splits and make sure a held-out split exists.

    Each task's ``split`` is rewritten in place through
    ``normalize_legacy_split`` (a missing split is treated as ``"train"``).
    If there are at least two tasks and none of them ends up in ``val`` or
    ``test``, the result of ``assign_splits`` is returned instead of the input
    list.
    """
    holdout_names = {"val", "test"}
    has_holdout = False
    for record in tasks:
        record.split = normalize_legacy_split(record.split or "train")
        if record.split in holdout_names:
            has_holdout = True
    if len(tasks) < 2 or has_holdout:
        return tasks
    return assign_splits(tasks, holdout_fraction=holdout_fraction, seed=seed)
def load_tasks_file(
    path: str,
    *,
    holdout_fraction: float = 0.34,
    seed: int = 42,
) -> Tuple[List[TaskRecord], Dict[str, Any]]:
    """Load a tasks JSON file and return ``(tasks, meta)``.

    Two layouts are accepted: a JSON object with a ``tasks`` array, or a bare
    JSON array of task objects. For an object, ``meta`` is every top-level key
    except ``tasks``; for a bare array it only carries the format tag. In both
    cases ``meta["tasks_file"]`` is the absolute path that was read. Tasks are
    built with ``TaskRecord.from_dict`` and passed through
    ``_normalize_tasks``.

    Raises:
        ValueError: if the document is neither an object nor an array, if
            ``tasks`` is not an array, or if a task entry is not an object.

    Errors from opening the file or parsing the JSON are not caught here.
    """
    source = os.path.abspath(os.path.expanduser(path))
    with open(source, encoding="utf-8") as handle:
        payload = json.load(handle)

    meta: Dict[str, Any]
    if isinstance(payload, dict):
        meta = {key: value for key, value in payload.items() if key != "tasks"}
        meta["tasks_file"] = source
        raw_tasks = payload.get("tasks", [])
    elif isinstance(payload, list):
        meta = {"format": "skillopt_sleep.tasks.v1", "tasks_file": source}
        raw_tasks = payload
    else:
        raise ValueError("tasks file must contain a JSON object with tasks or a JSON task array")

    if not isinstance(raw_tasks, list):
        raise ValueError("tasks file field 'tasks' must be an array")

    records: List[TaskRecord] = []
    for entry in raw_tasks:
        if not isinstance(entry, dict):
            raise ValueError("each task entry must be a JSON object")
        records.append(TaskRecord.from_dict(entry))

    normalized = _normalize_tasks(records, holdout_fraction=holdout_fraction, seed=seed)
    return normalized, meta

View File

@@ -135,6 +135,7 @@ class SleepReport:
candidate_score: float = 0.0
accepted: bool = False
gate_action: str = ""
no_edits_reason: str = ""
edits: List[EditRecord] = field(default_factory=list)
rejected_edits: List[EditRecord] = field(default_factory=list)
tokens_used: int = 0

View File

@@ -10,6 +10,7 @@ import json
import os
import tempfile
import unittest
from unittest import mock
from skillopt_sleep.backend import MockBackend, exact_score, keyword_soft_score
from skillopt_sleep.config import load_config
@@ -18,9 +19,9 @@ from skillopt_sleep.cycle import run_sleep_cycle
from skillopt_sleep.experiments.personas import programmer_persona, researcher_persona
from skillopt_sleep.harvest import _detect_feedback, _is_meta_prompt, digest_transcript
from skillopt_sleep.memory import apply_edits, current_learned_lines, extract_learned, set_learned
from skillopt_sleep.mine import assign_splits, heuristic_mine
from skillopt_sleep.mine import assign_splits, filter_tasks_for_target, heuristic_mine, mine
from skillopt_sleep.staging import adopt
from skillopt_sleep.types import EditRecord, SessionDigest, TaskRecord
from skillopt_sleep.types import EditRecord, SessionDigest, SleepReport, TaskRecord
class TestScoring(unittest.TestCase):
@@ -180,6 +181,208 @@ class TestHarvest(unittest.TestCase):
self.assertEqual(digests[0].session_id, "rollout-yoshi")
self.assertEqual(digests[0].user_prompts, ["fix Yoshi"])
def test_cli_exposes_limits_progress_and_target_skill_path(self):
"""Session/task limits, progress and target-skill flags must reach the config."""
from skillopt_sleep.__main__ import _cfg_from_args
with tempfile.TemporaryDirectory() as project:
# Minimal stand-in for an argparse namespace: only the attributes set here exist.
Args = type("Args", (), {
"project": project,
"scope": "",
"backend": "codex",
"model": "",
"codex_path": "",
"claude_home": "",
"codex_home": "",
"source": "codex",
"lookback_hours": 0,
"edit_budget": 2,
"max_sessions": 5,
"max_tasks": 3,
"target_skill_path": ".agents/skills/taste-skill/SKILL.md",
"progress": True,
"auto_adopt": False,
})
cfg = _cfg_from_args(Args())
self.assertEqual(cfg.get("backend"), "codex")
self.assertEqual(cfg.get("max_sessions_per_night"), 5)
self.assertEqual(cfg.get("max_tasks_per_night"), 3)
self.assertTrue(cfg.get("progress"))
# The relative target path is expected to be anchored at the project directory.
self.assertEqual(
cfg.managed_skill_path(),
os.path.join(project, ".agents/skills/taste-skill/SKILL.md"),
)
def test_cli_report_payload_includes_rejected_edits(self):
"""The JSON report payload must count and list rejected edits alongside accepted ones."""
from skillopt_sleep.__main__ import _report_payload
report = SleepReport(
night=1,
project="/p",
edits=[EditRecord("skill", "add", "accepted rule")],
rejected_edits=[EditRecord("skill", "add", "rejected rule")],
)
# Lightweight outcome object exposing only the two attributes set here.
outcome = type("Outcome", (), {"staging_dir": "", "adopted": False})()
payload = _report_payload(report, outcome)
self.assertEqual(payload["n_accepted_edits"], 1)
self.assertEqual(payload["n_rejected_edits"], 1)
self.assertEqual(payload["rejected_edits"][0]["content"], "rejected rule")
def test_tasks_file_roundtrip_and_split_assignment(self):
"""A written tasks file must load back with its metadata, task order and a ``val`` split."""
from skillopt_sleep.tasks_file import load_tasks_file, make_tasks_payload, write_tasks_file
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "tasks.json")
payload = make_tasks_payload(
[
TaskRecord(id="t1", project="/p", intent="configure MCP server"),
TaskRecord(id="t2", project="/p", intent="resolve Git conflict"),
],
project="/p",
transcript_source="codex",
n_sessions=2,
target_skill_path="/p/.agents/skills/yoshi-monorepo/SKILL.md",
)
written = write_tasks_file(path, payload)
tasks, meta = load_tasks_file(written, holdout_fraction=0.5, seed=1)
self.assertEqual(meta["target_skill_path"], "/p/.agents/skills/yoshi-monorepo/SKILL.md")
# Task order must survive the round trip.
self.assertEqual([t.id for t in tasks], ["t1", "t2"])
# At least one task must land in the "val" split after loading.
self.assertIn("val", {t.split for t in tasks})
def test_cfg_uses_tasks_file_target_skill_path_metadata(self):
"""With no --target-skill-path flag, the tasks-file metadata must supply the target."""
from skillopt_sleep.__main__ import _cfg_from_args
# The CLI target flag is left empty so that only ``task_meta`` can provide the path.
Args = type("Args", (), {
"project": "/repo/Yoshi",
"scope": "",
"backend": "",
"model": "",
"codex_path": "",
"claude_home": "",
"codex_home": "",
"source": "",
"lookback_hours": 0,
"edit_budget": 0,
"max_sessions": 0,
"max_tasks": 0,
"target_skill_path": "",
"progress": False,
"auto_adopt": False,
})
cfg = _cfg_from_args(Args(), task_meta={
"target_skill_path": ".agents/skills/yoshi-monorepo/SKILL.md",
})
# The relative metadata path is expected to resolve against the project directory.
self.assertEqual(
cfg.managed_skill_path(),
"/repo/Yoshi/.agents/skills/yoshi-monorepo/SKILL.md",
)
def test_cmd_run_uses_tasks_file_without_harvest(self):
    """A dry run given a tasks file reports its tasks and zero harvested sessions."""
    from contextlib import redirect_stdout
    from io import StringIO
    from skillopt_sleep.__main__ import cmd_run
    from skillopt_sleep.tasks_file import make_tasks_payload, write_tasks_file

    with tempfile.TemporaryDirectory() as project, tempfile.TemporaryDirectory() as home:
        # Create the target skill file on disk inside the project.
        target = os.path.join(project, ".agents/skills/yoshi-monorepo/SKILL.md")
        os.makedirs(os.path.dirname(target))
        with open(target, "w", encoding="utf-8") as skill_file:
            skill_file.write("# Yoshi Monorepo\n")

        # Write a two-task tasks file outside the project directory.
        tasks_path = os.path.join(home, "reviewed-tasks.json")
        records = [
            TaskRecord(id="t1", project=project, intent="configure MCP server"),
            TaskRecord(id="t2", project=project, intent="resolve Git conflict"),
        ]
        payload_to_write = make_tasks_payload(
            records,
            project=project,
            n_sessions=2,
            target_skill_path=target,
        )
        write_tasks_file(tasks_path, payload_to_write)

        cli_attrs = {
            "project": project,
            "scope": "",
            "backend": "mock",
            "model": "",
            "codex_path": "",
            "claude_home": os.path.join(home, ".claude"),
            "codex_home": "",
            "source": "",
            "lookback_hours": 0,
            "edit_budget": 2,
            "max_sessions": 5,
            "max_tasks": 3,
            "target_skill_path": "",
            "tasks_file": tasks_path,
            "progress": False,
            "auto_adopt": False,
            "json": True,
        }
        Args = type("Args", (), cli_attrs)

        # With "json" set, the command's stdout is captured and parsed as JSON.
        captured = StringIO()
        with redirect_stdout(captured):
            exit_code = cmd_run(Args(), dry=True)
        payload = json.loads(captured.getvalue())

        self.assertEqual(exit_code, 0)
        self.assertEqual(payload["n_sessions"], 0)
        self.assertEqual(payload["n_tasks"], 2)
        self.assertEqual(payload["tasks_file"], tasks_path)
def test_cmd_run_refuses_unreviewed_tasks_file_for_real_backend(self):
    """With the codex backend, an unreviewed tasks file makes cmd_run return 2."""
    from contextlib import redirect_stderr
    from io import StringIO
    from skillopt_sleep.__main__ import cmd_run
    from skillopt_sleep.tasks_file import make_tasks_payload, write_tasks_file

    with tempfile.TemporaryDirectory() as project, tempfile.TemporaryDirectory() as home:
        tasks_path = os.path.join(home, "reviewed-tasks.json")
        skill_path = os.path.join(project, ".agents/skills/yoshi-monorepo/SKILL.md")
        # The payload is written exactly as make_tasks_payload produces it;
        # nothing here marks it as reviewed.
        fresh_payload = make_tasks_payload(
            [TaskRecord(id="t1", project=project, intent="configure MCP server")],
            project=project,
            target_skill_path=skill_path,
        )
        write_tasks_file(tasks_path, fresh_payload)

        cli_attrs = {
            "project": project,
            "scope": "",
            "backend": "codex",
            "model": "",
            "codex_path": "",
            "claude_home": os.path.join(home, ".claude"),
            "codex_home": "",
            "source": "",
            "lookback_hours": 0,
            "edit_budget": 2,
            "max_sessions": 0,
            "max_tasks": 0,
            "target_skill_path": "",
            "tasks_file": tasks_path,
            "progress": False,
            "auto_adopt": False,
            "json": True,
        }
        Args = type("Args", (), cli_attrs)

        # The refusal message is expected on stderr.
        captured_err = StringIO()
        with redirect_stderr(captured_err):
            exit_code = cmd_run(Args(), dry=True)

        self.assertEqual(exit_code, 2)
        self.assertIn("unreviewed tasks file", captured_err.getvalue())
class TestMine(unittest.TestCase):
def _digest(self, prompts, feedback):
@@ -220,6 +423,59 @@ class TestMine(unittest.TestCase):
# and val/test are disjoint (a task is in exactly one split)
self.assertTrue(any(t.split == "val" for t in tasks))
def test_target_filter_prefers_matching_skill_terms(self):
    """Only the tasks whose intent matches the target skill survive filtering."""
    skill = """# Yoshi Monorepo
## MCP Setup Requests
Configure Codex MCP servers from linked setup docs.
## Local Git Conflicts
Resolve local Git conflicts during merge, rebase, or cherry-pick.
"""
    # (task id, intent) pairs: two relate to the skill text, two do not.
    candidates = [
        ("ios", "polish SwiftUI onboarding spacing"),
        ("mcp", "configure an MCP server from docs"),
        ("git", "resolve a local Git conflict"),
        ("api", "deploy the Rails API with Kamal"),
    ]
    tasks = [
        TaskRecord(id=task_id, project="/p", intent=intent)
        for task_id, intent in candidates
    ]

    filtered = filter_tasks_for_target(
        tasks,
        skill,
        ".agents/skills/yoshi-monorepo/SKILL.md",
    )

    kept_ids = {task.id for task in filtered}
    self.assertEqual(kept_ids, {"mcp", "git"})
def test_mine_oversamples_before_target_filtering(self):
    """With candidate_limit above max_tasks, mining still yields both skill-matching tasks."""
    skill = """# Yoshi Monorepo
## MCP Setup Requests
Configure Codex MCP servers.
## Local Git Conflicts
Resolve local Git conflicts.
"""
    # One digest per prompt; the first prompt does not relate to the skill text.
    prompts = [
        "polish SwiftUI onboarding spacing",
        "configure an MCP server from docs",
        "resolve a local Git conflict",
    ]
    digests = [self._digest([prompt], ["neg:missed"]) for prompt in prompts]

    tasks = mine(
        digests,
        max_tasks=2,
        candidate_limit=3,
        target_skill_text=skill,
        target_skill_path=".agents/skills/yoshi-monorepo/SKILL.md",
        seed=42,
    )

    mined_intents = {task.intent for task in tasks}
    self.assertEqual(mined_intents, {
        "configure an MCP server from docs",
        "resolve a local Git conflict",
    })
class TestConsolidateGate(unittest.TestCase):
def test_accepts_helpful_rejects_harmful(self):
@@ -366,6 +622,39 @@ class TestMultiObjectiveAndPrefs(unittest.TestCase):
self.assertGreaterEqual(r.latency_ms, 0.0)
class TestCodexBackend(unittest.TestCase):
    """Tests for CodexCliBackend with subprocess.run patched out."""

    def test_codex_cli_backend_runs_exec_in_project_dir(self):
        """The backend runs its command with cwd and ``-C`` set to the absolute project dir."""
        from skillopt_sleep.backend import CodexCliBackend

        recorded = []

        def fake_run(cmd, **kwargs):
            # Record the invocation, then write the reply to the path that
            # follows "-o" in the command, which is where the backend is
            # expected to read its result from.
            recorded.append((cmd, kwargs))
            reply_path = cmd[cmd.index("-o") + 1]
            with open(reply_path, "w", encoding="utf-8") as reply_file:
                reply_file.write("ok")

            class Proc:
                returncode = 0
                stdout = ""
                stderr = ""

            return Proc()

        with tempfile.TemporaryDirectory() as project:
            expected_project = os.path.abspath(project)
            backend = CodexCliBackend(codex_path="codex", project_dir=project)
            with mock.patch("skillopt_sleep.backend.subprocess.run", side_effect=fake_run):
                reply = backend._call("hello")
                self.assertEqual(reply, "ok")

            self.assertEqual(len(recorded), 1)
            cmd, kwargs = recorded[0]
            self.assertEqual(kwargs["cwd"], expected_project)
            self.assertIn("-C", cmd)
            project_flag_value = cmd[cmd.index("-C") + 1]
            self.assertEqual(project_flag_value, expected_project)
class TestMultiRolloutAndBudget(unittest.TestCase):
def test_rolloutset_stats(self):
from skillopt_sleep.rollout import RolloutSet
@@ -508,6 +797,33 @@ class TestFullCycleAndAdopt(unittest.TestCase):
with open(live_skill) as f:
self.assertIn("answer", f.read().lower())
def test_cycle_can_target_repo_scoped_skill_path(self):
    """A cycle stages edits for a repo-scoped skill path and adopt() then creates it."""
    with tempfile.TemporaryDirectory() as proj, tempfile.TemporaryDirectory() as home:
        target = os.path.join(proj, ".agents/skills/taste-skill/SKILL.md")
        cfg = load_config(
            invoked_project=proj,
            projects="invoked",
            backend="mock",
            claude_home=os.path.join(home, ".claude"),
            target_skill_path=target,
            auto_adopt=False,
        )
        seed_tasks = assign_splits(programmer_persona(), holdout_fraction=0.34, seed=42)

        outcome = run_sleep_cycle(cfg, seed_tasks=seed_tasks)
        self.assertTrue(outcome.report.accepted)

        # The staged manifest must name the requested target as its live skill path.
        manifest_file = os.path.join(outcome.staging_dir, "manifest.json")
        with open(manifest_file, encoding="utf-8") as fh:
            manifest = json.load(fh)
        self.assertEqual(manifest["live_skill_path"], target)

        # auto_adopt is off, so the target file must not exist until adopt() runs.
        self.assertFalse(os.path.exists(target))
        updated = adopt(outcome.staging_dir)
        self.assertIn(target, updated)
        self.assertTrue(os.path.exists(target))
class TestCopilotBackend(unittest.TestCase):
"""Pure-logic tests for CopilotCliBackend — no `copilot` CLI required."""