Files
microsoft-SkillOpt/skillopt_sleep/mine.py
Yifan Yang b02ffc2c99 refactor(sleep): decouple engine to top-level skillopt_sleep/ (zero research dep)
Open-source-tool / research-code separation:
  - git mv skillopt/sleep/ -> skillopt_sleep/ (top-level, sibling to the research
    skillopt/ package). History preserved as renames.
  - All imports skillopt.sleep.* -> skillopt_sleep.*.
  - Vendor the validation gate into skillopt_sleep/gate.py (a self-contained copy
    of skillopt.evaluation.gate). The engine now has ZERO dependency on the
    research package — verified: grep finds no `from skillopt.` in skillopt_sleep/,
    and consolidate's gate resolves to skillopt_sleep.gate.
  - Plugin scripts/commands/skill call `-m skillopt_sleep`.

29 tests pass; `python -m skillopt_sleep` runs standalone.

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
2026-06-08 14:31:52 +00:00

211 lines
7.5 KiB
Python

"""SkillOpt-Sleep — Stage 2: mine.
Turn :class:`SessionDigest` objects into :class:`TaskRecord` training units.
Two miners:
* heuristic_mine — deterministic, no API. Detects retry chains (a prompt
re-asked after negative feedback => the early attempt failed), extracts
the user's recurring intents, and labels outcomes from feedback signals.
* llm_mine — optional; uses an optimizer backend to produce richer
TaskRecords with checkable references. Falls back to heuristic on error.
The heuristic miner is what makes the whole cycle runnable offline and is the
basis of the deterministic experiment.
"""
from __future__ import annotations
import hashlib
import re
from typing import Any, Callable, List, Optional
from skillopt_sleep.types import SessionDigest, TaskRecord
def _tid(project: str, intent: str) -> str:
    """Derive a stable task id from a project name and an intent string.

    The id is ``"task_"`` followed by the first 12 hex characters of the
    SHA-256 digest of ``project + "::" + intent`` (UTF-8 encoded), so the same
    project/intent pair always produces the same id.
    """
    key = "::".join((project, intent)).encode("utf-8")
    digest = hashlib.sha256(key).hexdigest()
    return "task_" + digest[:12]
def _short(text: str, n: int = 600) -> str:
    """Strip *text* and cap it at *n* characters.

    ``None`` or empty input yields ``""``. Text that still exceeds *n*
    characters after stripping is cut to its first *n* characters and
    suffixed with a single ``"…"`` so that downstream readers can tell the
    value was truncated; a truncated result is therefore ``n + 1`` characters
    long. Text of length ``<= n`` is returned unchanged (apart from the strip).
    """
    text = (text or "").strip()
    if len(text) <= n:
        return text
    # Mark the cut explicitly; appending an empty string here would make a
    # truncated excerpt indistinguishable from a complete one.
    return text[:n] + "…"
def _looks_negative(signals: List[str]) -> bool:
    """Return True when at least one feedback signal starts with ``"neg:"``."""
    for signal in signals:
        if signal.startswith("neg:"):
            return True
    return False
def _looks_positive(signals: List[str]) -> bool:
    """Return True when at least one feedback signal starts with ``"pos:"``."""
    for signal in signals:
        if signal.startswith("pos:"):
            return True
    return False
def heuristic_mine(
    digests: List[SessionDigest],
    *,
    max_tasks: int = 40,
) -> List[TaskRecord]:
    """Deterministic miner — no API calls.

    Each digest is visited in order and yields at most one ``TaskRecord``:

    * Digests with no user prompts, or whose FIRST prompt is shorter than
      8 characters after stripping, are skipped. Otherwise the first prompt
      is the task intent (the original ask).
    * Outcome is inferred from the digest, in this order of precedence:
        - any ``neg:`` feedback signal          -> "fail"
        - otherwise any ``pos:`` feedback signal -> "success"
        - otherwise ``n_user_turns >= 3``        -> "mixed"
        - otherwise                              -> "unknown"
    * ``attempted_solution`` is the last assistant final (empty if none).
    * ``context_excerpt`` lists up to three follow-up prompts (the 2nd to
      4th prompts of the session), each shortened to 200 characters.
    * ``tags`` records the first four tools used and the git branch, when
      the digest has them.
    * ``reference_kind`` is always "none" and ``reference`` is empty; per the
      original design notes, checkable references are supplied elsewhere
      (experiment data or the LLM miner).

    Args:
        digests: Session digests to mine, in priority order.
        max_tasks: Upper bound on the number of records returned. Values
            ``<= 0`` yield an empty list.

    Returns:
        The mined task records, at most ``max_tasks`` of them.
    """
    tasks: List[TaskRecord] = []
    for d in digests:
        # Check the cap before producing anything so that max_tasks <= 0
        # returns no tasks instead of one.
        if len(tasks) >= max_tasks:
            break
        if not d.user_prompts:
            continue
        intent = d.user_prompts[0]
        if len(intent.strip()) < 8:
            continue

        # Negative feedback wins over positive feedback in the same session.
        if _looks_negative(d.feedback_signals):
            outcome = "fail"
        elif _looks_positive(d.feedback_signals):
            outcome = "success"
        elif d.n_user_turns >= 3:
            outcome = "mixed"
        else:
            outcome = "unknown"

        attempted = d.assistant_finals[-1] if d.assistant_finals else ""

        context = ""
        if len(d.user_prompts) > 1:
            # later prompts often carry the corrective detail / real constraints
            context = "Follow-up constraints from the same session:\n- " + "\n- ".join(
                _short(p, 200) for p in d.user_prompts[1:4]
            )

        tags = []
        if d.tools_used:
            tags.append("tools:" + "+".join(d.tools_used[:4]))
        if d.git_branch:
            tags.append("branch:" + d.git_branch)

        tasks.append(
            TaskRecord(
                # The id is derived from the full, untruncated intent.
                id=_tid(d.project, intent),
                project=d.project,
                intent=_short(intent, 800),
                context_excerpt=_short(context, 600),
                attempted_solution=_short(attempted, 600),
                outcome=outcome,
                reference_kind="none",
                reference="",
                tags=tags,
                source_sessions=[d.session_id],
            )
        )
    return tasks
def dedup_tasks(tasks: List[TaskRecord]) -> List[TaskRecord]:
    """Collapse tasks that share an id (same project+intent across sessions).

    The first task seen for an id is kept (and mutated in place); later
    duplicates contribute their ``source_sessions`` (order-preserving, no
    repeats) and upgrade the kept task's ``outcome`` when theirs ranks higher
    (success > fail > mixed > unknown; unrecognised outcomes rank as unknown).
    The result preserves first-seen order.
    """
    # Higher number == more informative outcome.
    rank = {"success": 3, "fail": 2, "mixed": 1, "unknown": 0}
    kept_by_id: dict = {}
    for task in tasks:
        if task.id not in kept_by_id:
            kept_by_id[task.id] = task
            continue
        kept = kept_by_id[task.id]
        combined = kept.source_sessions + task.source_sessions
        # dict.fromkeys drops repeats while keeping first-seen order.
        kept.source_sessions = list(dict.fromkeys(combined))
        # prefer a resolved outcome if either session resolved it
        if rank.get(task.outcome, 0) > rank.get(kept.outcome, 0):
            kept.outcome = task.outcome
    return [*kept_by_id.values()]
def assign_splits(
    tasks: List[TaskRecord],
    *,
    val_fraction: float = 0.34,
    test_fraction: float = 0.0,
    holdout_fraction: float | None = None,  # legacy alias for val_fraction
    seed: int = 42,
) -> List[TaskRecord]:
    """Deterministically split tasks into train / val / test (in place).

    Anti-overfitting contract:

    * ``val`` and ``test`` are drawn ONLY from real tasks (``origin`` other
      than ``"dream"``) and never overlap.
    * Every task with ``origin == "dream"`` is placed in ``train``.

    Each real task is bucketed by ``sha256(str(seed) + task.id) % 100``:
    buckets below ``round(val_fraction * 100)`` go to ``val``, the next
    ``round(test_fraction * 100)`` buckets go to ``test``, the rest to
    ``train``. The same id and seed therefore always land in the same bucket.

    After hashing, small task sets are repaired in priority order, and a
    later repair never empties a split an earlier one guaranteed:

    1. ``val`` is non-empty when there are >= 2 real tasks.
    2. ``train`` is non-empty (dream or real) when there are >= 2 real tasks.
    3. ``test`` is non-empty when ``test_fraction > 0`` and there are >= 3
       real tasks.

    Back-compat: with ``test_fraction == 0`` (default) real tasks divide into
    train + val only. ``holdout_fraction``, when given, overrides
    ``val_fraction``.

    Args:
        tasks: Tasks to label; each needs ``id``, ``origin`` and a writable
            ``split`` attribute.
        val_fraction: Approximate share of real tasks assigned to ``val``.
        test_fraction: Approximate share of real tasks assigned to ``test``.
        holdout_fraction: Legacy alias for ``val_fraction``.
        seed: Salt mixed into the bucket hash.

    Returns:
        The same ``tasks`` list, with every task's ``split`` set.
    """
    if holdout_fraction is not None:
        val_fraction = holdout_fraction

    dream = [t for t in tasks if t.origin == "dream"]
    real = [t for t in tasks if t.origin != "dream"]

    # all dream tasks go to train, unconditionally
    for t in dream:
        t.split = "train"

    val_cut = int(round(val_fraction * 100))
    test_cut = val_cut + int(round(test_fraction * 100))
    for t in real:
        digest = hashlib.sha256((str(seed) + t.id).encode()).hexdigest()
        bucket = int(digest, 16) % 100
        if bucket < val_cut:
            t.split = "val"
        elif bucket < test_cut:
            t.split = "test"
        else:
            t.split = "train"

    def members(split: str, pool: List[TaskRecord]) -> List[TaskRecord]:
        # Tasks of `pool` currently assigned to `split`, in pool order.
        return [t for t in pool if t.split == split]

    # 1) guarantee val (the gate) is non-empty when we have >=2 real tasks
    if len(real) >= 2 and not members("val", real):
        real[-1].split = "val"

    # 2) guarantee a train pool exists (dream or real) when possible.
    #    Take the first real task, but never the only val task: that would
    #    silently undo guarantee 1.
    if len(real) >= 2 and not members("train", tasks):
        val_pool = members("val", real)
        for t in real:
            if len(val_pool) == 1 and t is val_pool[0]:
                continue
            t.split = "train"
            break

    # 3) if test was requested but ended up empty with >=3 real tasks, carve
    #    one from a split that can spare a task: train first (when more than
    #    one train task exists overall), otherwise val (when it has > 1).
    if test_fraction > 0 and len(real) >= 3 and not members("test", real):
        donor = None
        if len(members("train", tasks)) > 1:
            donor = next(iter(members("train", real)), None)
        if donor is None:
            val_pool = members("val", real)
            if len(val_pool) > 1:
                donor = val_pool[0]
        if donor is not None:
            donor.split = "test"

    return tasks
def normalize_legacy_split(value: str) -> str:
    """Translate a legacy split name into the current vocabulary.

    ``"replay"`` becomes ``"train"`` and ``"holdout"`` becomes ``"val"``;
    any other value is returned unchanged.
    """
    legacy_to_current = {"replay": "train", "holdout": "val"}
    try:
        return legacy_to_current[value]
    except KeyError:
        return value
def mine(
    digests: List[SessionDigest],
    *,
    max_tasks: int = 40,
    holdout_fraction: float = 0.34,
    seed: int = 42,
    llm_miner: Optional[Callable[[List[SessionDigest]], List[TaskRecord]]] = None,
) -> List[TaskRecord]:
    """Top-level miner. Uses ``llm_miner`` if provided, else heuristic.

    The pipeline is: mine -> ``dedup_tasks`` -> ``assign_splits``.

    Args:
        digests: Session digests to turn into task records.
        max_tasks: Cap forwarded to ``heuristic_mine``; it is not applied to
            the output of ``llm_miner``.
        holdout_fraction: Forwarded to ``assign_splits`` as its
            ``holdout_fraction``.
        seed: Forwarded to ``assign_splits``.
        llm_miner: Optional callable producing task records from the digests.
            If it raises, or returns nothing, the heuristic miner is used
            instead.

    Returns:
        De-duplicated task records with their ``split`` assigned.
    """
    tasks: List[TaskRecord] = []
    if llm_miner is not None:
        try:
            tasks = llm_miner(digests) or []
        except Exception:
            # Best-effort by design: an LLM failure must not abort the cycle.
            # Log it (with traceback) so the silent downgrade to the heuristic
            # miner is at least visible to whoever runs the pipeline.
            import logging

            logging.getLogger(__name__).warning(
                "llm_miner failed; falling back to heuristic_mine",
                exc_info=True,
            )
            tasks = []
    if not tasks:
        tasks = heuristic_mine(digests, max_tasks=max_tasks)
    tasks = dedup_tasks(tasks)
    tasks = assign_splits(tasks, holdout_fraction=holdout_fraction, seed=seed)
    return tasks