mirror of
https://github.com/microsoft/SkillOpt.git
synced 2026-07-03 14:02:58 +08:00
Open-source-tool / research-code separation:
- git mv skillopt/sleep/ -> skillopt_sleep/ (top-level, sibling to the research
skillopt/ package). History preserved as renames.
- All imports skillopt.sleep.* -> skillopt_sleep.*.
- Vendor the validation gate into skillopt_sleep/gate.py (a self-contained copy
of skillopt.evaluation.gate). The engine now has ZERO dependency on the
research package — verified: grep finds no `from skillopt.` in skillopt_sleep/,
and consolidate's gate resolves to skillopt_sleep.gate.
- Plugin scripts/commands/skill call `-m skillopt_sleep`.
29 tests pass; `python -m skillopt_sleep` runs standalone.
Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
211 lines
7.5 KiB
Python
211 lines
7.5 KiB
Python
"""SkillOpt-Sleep — Stage 2: mine.
|
|
|
|
Turn :class:`SessionDigest` objects into :class:`TaskRecord` training units.
|
|
|
|
Two miners:
|
|
* heuristic_mine — deterministic, no API. Detects retry chains (a prompt
|
|
re-asked after negative feedback => the early attempt failed), extracts
|
|
the user's recurring intents, and labels outcomes from feedback signals.
|
|
* llm_mine — optional; uses an optimizer backend to produce richer
|
|
TaskRecords with checkable references. Falls back to heuristic on error.
|
|
|
|
The heuristic miner is what makes the whole cycle runnable offline and is the
|
|
basis of the deterministic experiment.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import re
|
|
from typing import Any, Callable, List, Optional
|
|
|
|
from skillopt_sleep.types import SessionDigest, TaskRecord
|
|
|
|
|
|
def _tid(project: str, intent: str) -> str:
|
|
h = hashlib.sha256((project + "::" + intent).encode("utf-8")).hexdigest()[:12]
|
|
return "task_" + h
|
|
|
|
|
|
def _short(text: str, n: int = 600) -> str:
|
|
text = (text or "").strip()
|
|
return text if len(text) <= n else text[:n] + " …"
|
|
|
|
|
|
def _looks_negative(signals: List[str]) -> bool:
|
|
return any(s.startswith("neg:") for s in signals)
|
|
|
|
|
|
def _looks_positive(signals: List[str]) -> bool:
|
|
return any(s.startswith("pos:") for s in signals)
|
|
|
|
|
|
def heuristic_mine(
|
|
digests: List[SessionDigest],
|
|
*,
|
|
max_tasks: int = 40,
|
|
) -> List[TaskRecord]:
|
|
"""Deterministic miner — no API calls.
|
|
|
|
Strategy:
|
|
* Each session with >=1 real user prompt yields one TaskRecord whose
|
|
intent is the FIRST substantive prompt (the original ask).
|
|
* Outcome is inferred:
|
|
- negative feedback present and no later positive -> "fail"
|
|
- positive feedback present -> "success"
|
|
- re-asks (multiple user turns) without resolution -> "mixed"
|
|
- otherwise -> "unknown"
|
|
* attempted_solution = the last assistant final (what was produced).
|
|
* reference_kind defaults to "none"; the consolidation step will use a
|
|
rubric judge for these. (Exact refs are added by the experiment data
|
|
or by the LLM miner when it can derive a checkable answer.)
|
|
"""
|
|
tasks: List[TaskRecord] = []
|
|
for d in digests:
|
|
if not d.user_prompts:
|
|
continue
|
|
intent = d.user_prompts[0]
|
|
if len(intent.strip()) < 8:
|
|
continue
|
|
if _looks_positive(d.feedback_signals) and not _looks_negative(d.feedback_signals):
|
|
outcome = "success"
|
|
elif _looks_negative(d.feedback_signals):
|
|
outcome = "fail"
|
|
elif d.n_user_turns >= 3:
|
|
outcome = "mixed"
|
|
else:
|
|
outcome = "unknown"
|
|
|
|
attempted = d.assistant_finals[-1] if d.assistant_finals else ""
|
|
context = ""
|
|
if len(d.user_prompts) > 1:
|
|
# later prompts often carry the corrective detail / real constraints
|
|
context = "Follow-up constraints from the same session:\n- " + "\n- ".join(
|
|
_short(p, 200) for p in d.user_prompts[1:4]
|
|
)
|
|
tags = []
|
|
if d.tools_used:
|
|
tags.append("tools:" + "+".join(d.tools_used[:4]))
|
|
if d.git_branch:
|
|
tags.append("branch:" + d.git_branch)
|
|
|
|
tasks.append(
|
|
TaskRecord(
|
|
id=_tid(d.project, intent),
|
|
project=d.project,
|
|
intent=_short(intent, 800),
|
|
context_excerpt=_short(context, 600),
|
|
attempted_solution=_short(attempted, 600),
|
|
outcome=outcome,
|
|
reference_kind="none",
|
|
reference="",
|
|
tags=tags,
|
|
source_sessions=[d.session_id],
|
|
)
|
|
)
|
|
if len(tasks) >= max_tasks:
|
|
break
|
|
return tasks
|
|
|
|
|
|
def dedup_tasks(tasks: List[TaskRecord]) -> List[TaskRecord]:
|
|
"""Merge tasks sharing an id (same project+intent across sessions)."""
|
|
by_id: dict = {}
|
|
for t in tasks:
|
|
if t.id in by_id:
|
|
ex = by_id[t.id]
|
|
ex.source_sessions = list(dict.fromkeys(ex.source_sessions + t.source_sessions))
|
|
# prefer a resolved outcome if either session resolved it
|
|
order = {"success": 3, "fail": 2, "mixed": 1, "unknown": 0}
|
|
if order.get(t.outcome, 0) > order.get(ex.outcome, 0):
|
|
ex.outcome = t.outcome
|
|
else:
|
|
by_id[t.id] = t
|
|
return list(by_id.values())
|
|
|
|
|
|
def assign_splits(
|
|
tasks: List[TaskRecord],
|
|
*,
|
|
val_fraction: float = 0.34,
|
|
test_fraction: float = 0.0,
|
|
holdout_fraction: float | None = None, # legacy alias for val_fraction
|
|
seed: int = 42,
|
|
) -> List[TaskRecord]:
|
|
"""Deterministically split tasks into train / val / test.
|
|
|
|
Anti-overfitting contract (the user's design):
|
|
* ``val`` and ``test`` are drawn ONLY from REAL mined tasks (origin=='real')
|
|
and never overlap. val gates updates; test is the final held-out measure.
|
|
* ``train`` may include DREAM-augmented tasks (origin=='dream'); those are
|
|
NEVER placed in val/test.
|
|
|
|
A stable hash of the task id keeps the same real task in the same split across
|
|
nights (a fixed held-out gate, like SkillOpt's D_sel/D_test).
|
|
|
|
Back-compat: if ``test_fraction`` is 0 (default), this behaves like the old
|
|
two-way replay/holdout split — real tasks divide into train + val, no test.
|
|
``holdout_fraction`` is accepted as an alias for ``val_fraction``.
|
|
"""
|
|
if holdout_fraction is not None:
|
|
val_fraction = holdout_fraction
|
|
|
|
dream = [t for t in tasks if t.origin == "dream"]
|
|
real = [t for t in tasks if t.origin != "dream"]
|
|
|
|
# all dream tasks go to train, unconditionally
|
|
for t in dream:
|
|
t.split = "train"
|
|
|
|
val_cut = int(round(val_fraction * 100))
|
|
test_cut = val_cut + int(round(test_fraction * 100))
|
|
for t in real:
|
|
bucket = int(hashlib.sha256((str(seed) + t.id).encode()).hexdigest(), 16) % 100
|
|
if bucket < val_cut:
|
|
t.split = "val"
|
|
elif bucket < test_cut:
|
|
t.split = "test"
|
|
else:
|
|
t.split = "train"
|
|
|
|
# guarantee val (the gate) is non-empty when we have >=2 real tasks
|
|
real_splits = {t.split for t in real}
|
|
if len(real) >= 2 and "val" not in real_splits:
|
|
real[-1].split = "val"
|
|
# guarantee a train pool exists (dream or real) when possible
|
|
if not any(t.split == "train" for t in tasks) and len(real) >= 2:
|
|
real[0].split = "train"
|
|
# if test was requested but ended up empty with >=3 real tasks, carve one
|
|
if test_fraction > 0 and len(real) >= 3 and not any(t.split == "test" for t in real):
|
|
for t in real:
|
|
if t.split == "train":
|
|
t.split = "test"
|
|
break
|
|
return tasks
|
|
|
|
|
|
def normalize_legacy_split(value: str) -> str:
|
|
"""Map old split names to the new vocabulary."""
|
|
return {"replay": "train", "holdout": "val"}.get(value, value)
|
|
|
|
|
|
def mine(
|
|
digests: List[SessionDigest],
|
|
*,
|
|
max_tasks: int = 40,
|
|
holdout_fraction: float = 0.34,
|
|
seed: int = 42,
|
|
llm_miner: Optional[Callable[[List[SessionDigest]], List[TaskRecord]]] = None,
|
|
) -> List[TaskRecord]:
|
|
"""Top-level miner. Uses ``llm_miner`` if provided, else heuristic."""
|
|
tasks: List[TaskRecord] = []
|
|
if llm_miner is not None:
|
|
try:
|
|
tasks = llm_miner(digests) or []
|
|
except Exception:
|
|
tasks = []
|
|
if not tasks:
|
|
tasks = heuristic_mine(digests, max_tasks=max_tasks)
|
|
tasks = dedup_tasks(tasks)
|
|
tasks = assign_splits(tasks, holdout_fraction=holdout_fraction, seed=seed)
|
|
return tasks
|