mirror of
https://github.com/microsoft/SkillOpt.git
synced 2026-07-03 14:02:58 +08:00
- harvest.py: revert break to continue — mtime ordering can diverge from embedded ended_at timestamps (copy/touch), so we must check all files rather than early-exiting on the first old one
- cycle.py: use `is not None and > 0` so lookback_hours=0 means "scan full history" (opt-out of the cutoff)
- __main__.py: propagate --lookback-hours 0 to config as explicit 0

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
305 lines
10 KiB
Python
305 lines
10 KiB
Python
"""SkillOpt-Sleep — Stage 1: harvest.
|
|
|
|
Read the user's local Claude Code records (read-only) and normalize them
|
|
into :class:`SessionDigest` objects.
|
|
|
|
Sources (verified schema):
|
|
* ~/.claude/history.jsonl — one JSON/line:
|
|
{"display": <prompt text>, "pastedContents": {...},
|
|
"timestamp": <epoch ms>, "project": <abs path>}
|
|
* ~/.claude/projects/<slug>/<sessionId>.jsonl — one record/line; the
|
|
records we care about have type "user"/"assistant" and carry:
|
|
message{role, content}, cwd, gitBranch, timestamp, sessionId, version
|
|
|
|
This module performs NO writes and NO network calls.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Any, Dict, Iterable, List, Optional
|
|
|
|
from skillopt_sleep.types import SessionDigest
|
|
|
|
|
|
# Substring heuristics for spotting the user's reaction to earlier output.
# Only English phrases ship by default. Sessions held in another language can
# be covered by listing extra comma-separated phrases in the
# SKILLOPT_SLEEP_NEG_FEEDBACK / SKILLOPT_SLEEP_POS_FEEDBACK environment
# variables, so no locale has to be hardcoded here.
_NEGATIVE_FEEDBACK = (
    "still broken",
    "still not",
    "still wrong",
    "doesn't work",
    "does not work",
    "not working",
    "that's wrong",
    "thats wrong",
    "incorrect",
    "wrong",
    "no,",
    "nope",
    "fix it",
    "didn't",
    "did not",
    "broken",
    "error again",
    "still failing",
    "still fails",
    "not fixed",
    "revert",
    "undo",
)
_POSITIVE_FEEDBACK = (
    "thanks",
    "thank you",
    "perfect",
    "great",
    "works now",
    "fixed",
    "that works",
    "lgtm",
    "looks good",
    "nice",
    "awesome",
    "correct",
)
|
|
|
|
|
|
def _extra_phrases(env_var: str) -> tuple:
    """Return the user-supplied phrases stored in environment variable *env_var*.

    The value is read as a comma-separated list. Each entry is trimmed and
    lower-cased; entries that are blank after trimming are dropped. An unset
    or empty variable yields an empty tuple.
    """
    phrases = []
    for chunk in os.environ.get(env_var, "").split(","):
        cleaned = chunk.strip()
        if cleaned:
            phrases.append(cleaned.lower())
    return tuple(phrases)
|
|
|
|
|
|
# Extend the built-in phrase lists with whatever the user configured through
# the environment (read once, at import time).
_NEGATIVE_FEEDBACK += _extra_phrases("SKILLOPT_SLEEP_NEG_FEEDBACK")
_POSITIVE_FEEDBACK += _extra_phrases("SKILLOPT_SLEEP_POS_FEEDBACK")
|
|
|
|
|
|
def _iter_jsonl(path: str) -> Iterable[Dict[str, Any]]:
    """Yield every JSON object found in the JSON-lines file at *path*.

    Reading is best-effort:

    * blank lines and lines that are not valid JSON are skipped;
    * lines holding valid JSON that is not an object (list, string, number,
      ...) are skipped, so callers can rely on receiving dicts;
    * bytes that are not valid UTF-8 are replaced instead of aborting the
      scan of the whole file;
    * if the file cannot be opened or read (missing, a directory, no
      permission, other I/O error) the generator simply ends.
    """
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except (ValueError, RecursionError):
                    # json.JSONDecodeError is a ValueError; RecursionError
                    # covers pathologically nested input.
                    continue
                if isinstance(rec, dict):
                    yield rec
    except OSError:
        return
|
|
|
|
|
|
def _text_from_content(content: Any) -> str:
    """Collapse a ``message.content`` value into plain text.

    A string is returned unchanged. For a list, the ``text`` of every dict
    block whose ``type`` is ``"text"`` (and whose text is non-empty) is
    collected and the pieces are joined with newlines. Any other value gives
    an empty string.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    return "\n".join(
        str(block["text"])
        for block in content
        if isinstance(block, dict)
        and block.get("type") == "text"
        and block.get("text")
    )
|
|
|
|
|
|
def _tool_names_from_content(content: Any) -> List[str]:
    """List the names of the ``tool_use`` blocks in a ``message.content`` value.

    Only list content is inspected; within it, each dict block whose ``type``
    is ``"tool_use"`` and that has a non-empty ``name`` contributes that name
    (as a string), in order of appearance. Anything else gives ``[]``.
    """
    if not isinstance(content, list):
        return []
    return [
        str(block["name"])
        for block in content
        if isinstance(block, dict)
        and block.get("type") == "tool_use"
        and block.get("name")
    ]
|
|
|
|
|
|
def _detect_feedback(text: str) -> List[str]:
    """Return the feedback signals found in *text*.

    The text is lower-cased and searched for every phrase in
    ``_NEGATIVE_FEEDBACK`` and ``_POSITIVE_FEEDBACK`` using plain substring
    matching. Each hit is reported as ``"neg:<phrase>"`` or
    ``"pos:<phrase>"``; all negative hits come first, each group in the order
    of its phrase tuple.
    """
    lowered = text.lower()
    negatives = ["neg:" + phrase for phrase in _NEGATIVE_FEEDBACK if phrase in lowered]
    positives = ["pos:" + phrase for phrase in _POSITIVE_FEEDBACK if phrase in lowered]
    return negatives + positives
|
|
|
|
|
|
def _is_meta_prompt(text: str) -> bool:
    """Tell whether *text* is slash-command / system noise, not a real user intent.

    After trimming surrounding whitespace, a prompt counts as "meta" when it:

    * is empty,
    * starts with ``<`` and ends with ``>``,
    * starts with ``/`` and has at most three whitespace-separated tokens, or
    * starts with ``[Pasted text`` or ``Caveat:``.
    """
    stripped = text.strip()
    if not stripped:
        return True
    tag_wrapped = stripped.startswith("<") and stripped.endswith(">")
    short_slash_command = stripped.startswith("/") and len(stripped.split()) <= 3
    boilerplate = stripped.startswith(("[Pasted text", "Caveat:"))
    return tag_wrapped or short_slash_command or boilerplate
|
|
|
|
|
|
# ── Issue #62: filter headless replay sessions ─────────────────────────

# Text fragments that appear in prompts the engine itself sends through
# headless `claude -p` calls (judge, reflect, attempt). A single-turn session
# whose only user prompt contains one of them is treated as engine-generated
# rather than as a real user task.
_REPLAY_PROMPT_MARKERS = (
    "## CURRENT SKILL", "## FAILED TASKS", "## SUCCESSFUL TASKS",
    "## OUTPUT FORMAT",
    "You are a strict grader", "Score the response 0.0-1.0",
    "You are SkillOpt-Sleep",
    "## TASK\n", "## SKILL\n",
)
|
|
|
|
|
|
def _is_headless_replay(digest: "SessionDigest") -> bool:
    """Decide whether *digest* came from the engine's own headless replay calls.

    Rules, applied in order:

    * more than one user turn: never a replay (interactive by definition);
    * zero user turns: treated as a replay;
    * otherwise (single turn) it is a replay when the first prompt contains
      any entry of ``_REPLAY_PROMPT_MARKERS``, or when the prompt is shorter
      than 200 characters and the session spans less than 3 seconds.

    The duration is computed from the first 19 characters
    (``YYYY-MM-DDTHH:MM:SS``) of ``started_at`` / ``ended_at``; timestamps
    that are missing or do not parse make the duration rule not apply.
    """
    turns = digest.n_user_turns
    if turns > 1:
        return False
    if turns == 0:
        return True

    prompt = digest.user_prompts[0] if digest.user_prompts else ""
    if any(marker in prompt for marker in _REPLAY_PROMPT_MARKERS):
        return True

    # The duration rule only applies to short prompts, to avoid discarding
    # real one-shot questions that merely got a fast answer.
    if not (digest.started_at and digest.ended_at) or len(prompt) >= 200:
        return False
    stamp_format = "%Y-%m-%dT%H:%M:%S"
    try:
        began = datetime.strptime(digest.started_at[:19], stamp_format)
        finished = datetime.strptime(digest.ended_at[:19], stamp_format)
    except (ValueError, TypeError):
        return False
    return (finished - began).total_seconds() < 3
|
|
|
|
|
|
def digest_transcript(path: str) -> Optional[SessionDigest]:
    """Build a SessionDigest from one ``<sessionId>.jsonl`` transcript.

    The session id is the file name without its extension. Records are read
    in file order and contribute as follows:

    * ``timestamp`` (non-empty string): the first one seen becomes
      ``started_at``, the last one seen becomes ``ended_at``;
    * ``cwd`` / ``gitBranch``: the first non-empty value becomes ``project`` /
      ``git_branch``;
    * ``file-history-snapshot`` records: up to 20 keys of the snapshot mapping
      are added to ``files_touched``;
    * user messages: counted and kept (with feedback detection) unless their
      text is empty or a meta prompt;
    * assistant messages: always counted; tool names and non-blank text are
      collected, and only the last five texts are kept.

    Returns ``None`` when the file yields neither a counted user prompt nor
    an assistant message.
    """
    session_id = os.path.splitext(os.path.basename(path))[0]
    project = ""
    git_branch = ""
    started = ""
    ended = ""
    user_prompts: List[str] = []
    assistant_finals: List[str] = []
    tools: List[str] = []
    files: List[str] = []
    feedback: List[str] = []
    n_user = 0
    n_asst = 0

    for rec in _iter_jsonl(path):
        # A JSON line may legally hold a list/string/number; only objects
        # carry the fields read below, and .get() would fail on the others.
        if not isinstance(rec, dict):
            continue
        rtype = rec.get("type")
        ts = rec.get("timestamp")
        if isinstance(ts, str) and ts:
            if not started:
                started = ts
            ended = ts
        if rec.get("cwd") and not project:
            project = str(rec.get("cwd"))
        if rec.get("gitBranch") and not git_branch:
            git_branch = str(rec.get("gitBranch"))
        if rtype == "file-history-snapshot":
            snap = rec.get("snapshot") or rec.get("files") or {}
            if isinstance(snap, dict):
                # NOTE(review): this assumes the mapping's keys are file
                # paths — confirm against the transcript schema.
                files.extend(str(k) for k in list(snap)[:20])
        msg = rec.get("message")
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        content = msg.get("content")
        if role == "user":
            text = _text_from_content(content)
            if text and not _is_meta_prompt(text):
                n_user += 1
                user_prompts.append(text.strip())
                feedback.extend(_detect_feedback(text))
        elif role == "assistant":
            n_asst += 1
            tools.extend(_tool_names_from_content(content))
            text = _text_from_content(content)
            if text.strip():
                assistant_finals.append(text.strip())

    if n_user == 0 and n_asst == 0:
        return None

    return SessionDigest(
        session_id=session_id,
        project=project,
        git_branch=git_branch,
        started_at=started,
        ended_at=ended,
        user_prompts=user_prompts,
        assistant_finals=assistant_finals[-5:],  # last few finals are the useful ones
        # dict.fromkeys de-duplicates while keeping first-occurrence order.
        tools_used=list(dict.fromkeys(tools)),
        files_touched=list(dict.fromkeys(files)),
        feedback_signals=feedback,
        n_user_turns=n_user,
        n_assistant_turns=n_asst,
        raw_path=path,
    )
|
|
|
|
|
|
def _project_matches(project: str, scope: Any, invoked: str) -> bool:
    """Tell whether a session recorded in *project* falls inside *scope*.

    * ``scope == "all"``: every session matches.
    * *scope* is a list/tuple of paths: *project* must equal one of them
      (compared as absolute paths).
    * anything else ("invoked" mode): with an empty *invoked* nothing is
      filtered; otherwise *project* must be the invoked directory, a
      directory below it, or one of its ancestors.

    A session whose project is unknown (empty string) never matches a
    concrete path. ``os.path.abspath("")`` is the current working directory,
    so without this guard such sessions would match whichever directory the
    process happens to run in.
    """
    if scope == "all":
        return True
    if isinstance(scope, (list, tuple)):
        if not project:
            return False
        target = os.path.abspath(project)
        return any(target == os.path.abspath(p) for p in scope)
    # "invoked": match the invoked project, a subdir of it, or a parent of it
    if not invoked:
        return True
    if not project:
        return False
    a = os.path.abspath(project)
    b = os.path.abspath(invoked)
    # rstrip keeps the prefix test valid when a path is the filesystem root,
    # which already ends with a separator.
    return (
        a == b
        or a.startswith(b.rstrip(os.sep) + os.sep)
        or b.startswith(a.rstrip(os.sep) + os.sep)
    )
|
|
|
|
|
|
def _mtime_or_zero(path: str) -> float:
    """Return *path*'s modification time, or 0.0 if it can no longer be stat'ed."""
    try:
        return os.path.getmtime(path)
    except OSError:
        return 0.0


def _parse_iso(stamp: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp (a trailing ``Z`` is accepted); None if unparseable."""
    try:
        if stamp[-1:] in ("Z", "z"):
            # datetime.fromisoformat only understands "Z" from Python 3.11 on.
            stamp = stamp[:-1] + "+00:00"
        return datetime.fromisoformat(stamp)
    except (ValueError, TypeError, AttributeError):
        return None


def _ended_before(ended_at: str, since_iso: str) -> bool:
    """Tell whether *ended_at* is strictly earlier than *since_iso*.

    Both values are compared as datetimes when they parse and are comparable,
    so different spellings of the same offset ("Z" vs "+00:00") or different
    offsets are handled. Otherwise the raw strings are compared.
    """
    ended = _parse_iso(ended_at)
    since = _parse_iso(since_iso)
    if ended is not None and since is not None:
        try:
            return ended < since
        except TypeError:
            # One value is timezone-aware and the other naive; fall through.
            pass
    return ended_at < since_iso


def harvest(
    transcripts_dir: str,
    *,
    scope: Any = "all",
    invoked_project: str = "",
    since_iso: Optional[str] = None,
    limit: int = 0,
) -> "List[SessionDigest]":
    """Walk ~/.claude/projects and return digests matching scope/time.

    Every ``*.jsonl`` file under *transcripts_dir* is digested, newest first
    by file modification time. A digest is dropped when it is empty, looks
    like one of the engine's own headless replay sessions, is outside the
    requested project scope, or ended before *since_iso*.

    Parameters
    ----------
    transcripts_dir : str       ~/.claude/projects
    scope : "all" | "invoked" | list[path]
    invoked_project : str       used when scope == "invoked"
    since_iso : str|None        ISO8601; sessions whose ``ended_at`` is earlier
                                are dropped (sessions without ``ended_at`` are kept)
    limit : int                 cap number of digests (0 = no cap)

    Returns
    -------
    list of SessionDigest, empty if *transcripts_dir* is not a directory.
    """
    digests: List[SessionDigest] = []
    if not os.path.isdir(transcripts_dir):
        return digests

    paths: List[str] = [
        os.path.join(root, fn)
        for root, _dirs, files in os.walk(transcripts_dir)
        for fn in files
        if fn.endswith(".jsonl")
    ]
    # Newest first by mtime. A transcript can disappear between the walk and
    # the sort; such a file sorts last instead of raising.
    paths.sort(key=_mtime_or_zero, reverse=True)

    for p in paths:
        d = digest_transcript(p)
        if d is None:
            continue
        if _is_headless_replay(d):
            continue  # Issue #62: skip engine's own headless replay sessions
        if not _project_matches(d.project or "", scope, invoked_project):
            continue
        if since_iso and d.ended_at and _ended_before(d.ended_at, since_iso):
            # Files are sorted by mtime but the filter uses the embedded
            # ended_at timestamp. mtime can diverge (copy/touch), so we
            # cannot break here; every file must be checked.
            continue
        digests.append(d)
        if limit and len(digests) >= limit:
            break
    return digests
|