Files
carpedkm 01075c90d3 fix: address codex round 2 — revert harvest break + allow lookback 0
- harvest.py: revert break to continue — mtime ordering can diverge
  from embedded ended_at timestamps (copy/touch), so we must check all
  files rather than early-exiting on the first old one
- cycle.py: use `is not None and > 0` so lookback_hours=0 means
  "scan full history" (opt-out of the cutoff)
- __main__.py: propagate --lookback-hours 0 to config as explicit 0

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-20 14:21:18 +00:00

305 lines
10 KiB
Python

"""SkillOpt-Sleep — Stage 1: harvest.
Read the user's local Claude Code records (read-only) and normalize them
into :class:`SessionDigest` objects.

Sources (verified schema):

* ``~/.claude/history.jsonl`` — one JSON object per line::

      {"display": <prompt text>, "pastedContents": {...},
       "timestamp": <epoch ms>, "project": <abs path>}

* ``~/.claude/projects/<slug>/<sessionId>.jsonl`` — one record per line; the
  records we care about have type "user"/"assistant" and carry:
  message{role, content}, cwd, gitBranch, timestamp, sessionId, version

This module performs NO writes and NO network calls.
"""
from __future__ import annotations
import json
import os
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional
from skillopt_sleep.types import SessionDigest
# Phrases whose presence in a user message is taken as a hint that the user
# is rejecting (negative) or approving (positive) the previous output.
# The built-in lists are English-only. Sessions in other languages can be
# covered by listing extra phrases, comma-separated, in the
# SKILLOPT_SLEEP_NEG_FEEDBACK / SKILLOPT_SLEEP_POS_FEEDBACK environment
# variables; those are appended to the tuples at import time (see below).
_NEGATIVE_FEEDBACK = (
    "still broken",
    "still not",
    "still wrong",
    "doesn't work",
    "does not work",
    "not working",
    "that's wrong",
    "thats wrong",
    "incorrect",
    "wrong",
    "no,",
    "nope",
    "fix it",
    "didn't",
    "did not",
    "broken",
    "error again",
    "still failing",
    "still fails",
    "not fixed",
    "revert",
    "undo",
)
_POSITIVE_FEEDBACK = (
    "thanks",
    "thank you",
    "perfect",
    "great",
    "works now",
    "fixed",
    "that works",
    "lgtm",
    "looks good",
    "nice",
    "awesome",
    "correct",
)


def _extra_phrases(env_var: str) -> tuple:
    """Return the user-supplied phrases stored in environment variable *env_var*.

    The value is split on commas; every piece is stripped of surrounding
    whitespace and lower-cased, and pieces that are empty after stripping are
    dropped. An unset (or empty) variable yields an empty tuple.
    """
    cleaned = []
    for piece in os.environ.get(env_var, "").split(","):
        piece = piece.strip()
        if piece:
            cleaned.append(piece.lower())
    return tuple(cleaned)


# Extend the built-in phrase lists with whatever the environment provides.
_NEGATIVE_FEEDBACK += _extra_phrases("SKILLOPT_SLEEP_NEG_FEEDBACK")
_POSITIVE_FEEDBACK += _extra_phrases("SKILLOPT_SLEEP_POS_FEEDBACK")
def _iter_jsonl(path: str) -> Iterable[Dict[str, Any]]:
    """Yield the JSON objects stored one per line in the file at *path*.

    This is a best-effort, read-only reader:

    * blank lines and lines that are not valid JSON are skipped;
    * lines holding valid JSON that is not an object (a list, number,
      string, ...) are skipped too, so callers can rely on getting dicts;
    * bytes that are not valid UTF-8 are replaced instead of aborting the
      read;
    * if the file cannot be opened or read (missing, a directory, no
      permission, or any other ``OSError``) the generator simply ends.
    """
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # Parse outside of the ``yield`` so that only decoding
                # problems are swallowed here.
                try:
                    rec = json.loads(line)
                except (ValueError, RecursionError):
                    # JSONDecodeError is a ValueError; RecursionError covers
                    # pathologically nested input.
                    continue
                if isinstance(rec, dict):
                    yield rec
    except OSError:
        return
def _text_from_content(content: Any) -> str:
    """Flatten a ``message.content`` value into plain text.

    A string is returned unchanged. For a list, the ``text`` of every dict
    block whose ``type`` is ``"text"`` (and whose text is non-empty) is
    collected and the pieces are joined with newlines. Anything else gives
    an empty string.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    return "\n".join(
        str(block["text"])
        for block in content
        if isinstance(block, dict)
        and block.get("type") == "text"
        and block.get("text")
    )
def _tool_names_from_content(content: Any) -> List[str]:
    """Return the names of all ``tool_use`` blocks in a ``message.content``.

    Only list content is inspected; within it, dict blocks whose ``type`` is
    ``"tool_use"`` and that carry a non-empty ``name`` contribute that name
    (as a string), in order of appearance and without de-duplication.
    """
    if not isinstance(content, list):
        return []
    return [
        str(block["name"])
        for block in content
        if isinstance(block, dict)
        and block.get("type") == "tool_use"
        and block.get("name")
    ]
def _detect_feedback(text: str) -> List[str]:
    """Return the feedback signals found in *text*.

    The text is lower-cased and checked, by plain substring search, against
    every phrase in ``_NEGATIVE_FEEDBACK`` and then ``_POSITIVE_FEEDBACK``.
    Each hit is reported as ``"neg:<phrase>"`` or ``"pos:<phrase>"``; all
    negative hits come first, each group in the order of its phrase tuple.
    """
    lowered = text.lower()
    negatives = ["neg:" + phrase for phrase in _NEGATIVE_FEEDBACK if phrase in lowered]
    positives = ["pos:" + phrase for phrase in _POSITIVE_FEEDBACK if phrase in lowered]
    return negatives + positives
def _is_meta_prompt(text: str) -> bool:
    """Tell whether *text* is slash-command / system noise, not a user intent.

    After stripping whitespace, a prompt counts as "meta" when it is empty,
    is wrapped in angle brackets (starts with ``<`` and ends with ``>``),
    is a slash-command of at most three whitespace-separated tokens, or
    starts with ``[Pasted text`` or ``Caveat:``.
    """
    stripped = text.strip()
    if not stripped:
        return True
    angle_wrapped = stripped.startswith("<") and stripped.endswith(">")
    short_slash_command = stripped.startswith("/") and len(stripped.split()) <= 3
    pasted_or_caveat = stripped.startswith(("[Pasted text", "Caveat:"))
    return angle_wrapped or short_slash_command or pasted_or_caveat
# ── Issue #62: filter headless replay sessions ─────────────────────────
# Prompt markers generated by the engine's own headless `claude -p` calls
# (judge, reflect, attempt). If the sole user prompt in a single-turn
# session contains any of these, the session is engine-generated, not a
# real user task.
_REPLAY_PROMPT_MARKERS = (
    "## CURRENT SKILL",
    "## FAILED TASKS",
    "## SUCCESSFUL TASKS",
    "## OUTPUT FORMAT",
    "You are a strict grader",
    "Score the response 0.0-1.0",
    "You are SkillOpt-Sleep",
    "## TASK\n",
    "## SKILL\n",
)


def _parse_transcript_ts(ts: str) -> datetime:
    """Parse the leading ``YYYY-MM-DDTHH:MM:SS[.fraction]`` part of *ts*.

    The fractional seconds, when present, are kept (to microsecond
    precision) so that short durations are not distorted by truncation.
    Anything after the fraction (e.g. a ``Z`` or UTC offset) is ignored and
    the result is a naive ``datetime``.

    Raises ``ValueError`` if the first 19 characters do not match the
    expected layout, and ``TypeError`` if *ts* is not a string.
    """
    parsed = datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S")
    tail = ts[19:]
    if tail[:1] != ".":
        return parsed
    digits = ""
    for ch in tail[1:]:
        if ch not in "0123456789":
            break
        digits += ch
    if not digits:
        return parsed
    # Pad/truncate the fraction to exactly six digits (microseconds).
    return parsed.replace(microsecond=int(digits[:6].ljust(6, "0")))


def _is_headless_replay(digest: "SessionDigest") -> bool:
    """Detect sessions created by the engine's own headless replay calls.

    Heuristics (conservatively applied):

    * Sessions with more than one user turn are always kept (interactive
      by definition) -> ``False``.
    * Sessions with no user turn at all are treated as replays -> ``True``.
    * For single-turn sessions the result is ``True`` when the sole prompt
      contains one of ``_REPLAY_PROMPT_MARKERS``, OR when the prompt is
      short (< 200 chars) and the session lasted less than 3 seconds
      (programmatic, not interactive).

    The duration is computed from ``started_at`` / ``ended_at`` including
    their fractional seconds; timestamps that cannot be parsed make the
    duration rule not apply.
    """
    if digest.n_user_turns > 1:
        return False
    if digest.n_user_turns == 0:
        return True

    prompt = digest.user_prompts[0] if digest.user_prompts else ""
    if any(marker in prompt for marker in _REPLAY_PROMPT_MARKERS):
        return True

    # Sub-3-second single-turn sessions with short prompts are almost
    # certainly programmatic (engine grader/judge calls). We require the
    # prompt to also be short (<200 chars) to avoid false-positives on
    # real one-shot questions that Claude happens to answer quickly.
    if digest.started_at and digest.ended_at and len(prompt) < 200:
        try:
            started = _parse_transcript_ts(digest.started_at)
            ended = _parse_transcript_ts(digest.ended_at)
        except (ValueError, TypeError):
            return False
        if (ended - started).total_seconds() < 3:
            return True
    return False
def digest_transcript(path: str) -> Optional[SessionDigest]:
    """Build a SessionDigest from one ``<sessionId>.jsonl`` transcript.

    The session id is the file name without its extension. Records are read
    with ``_iter_jsonl`` and folded into the digest as follows:

    * ``started_at`` / ``ended_at`` are the first and last non-empty string
      ``timestamp`` values seen, in file order;
    * ``project`` / ``git_branch`` are the first truthy ``cwd`` /
      ``gitBranch`` values;
    * ``file-history-snapshot`` records contribute up to 20 keys of their
      ``snapshot`` (or ``files``) mapping to ``files_touched``;
    * user messages that have text and are not meta prompts count as user
      turns, are stored stripped, and are scanned for feedback phrases;
    * every assistant message counts as an assistant turn; its tool names
      and (non-blank) text are collected.

    Returns ``None`` when the transcript holds neither a counted user turn
    nor an assistant turn.
    """
    session_id, _ext = os.path.splitext(os.path.basename(path))

    project_dir = ""
    branch = ""
    first_ts = ""
    last_ts = ""
    prompts: List[str] = []
    replies: List[str] = []
    tool_names: List[str] = []
    touched: List[str] = []
    signals: List[str] = []
    user_turns = 0
    assistant_turns = 0

    for record in _iter_jsonl(path):
        stamp = record.get("timestamp")
        if isinstance(stamp, str) and stamp:
            first_ts = first_ts or stamp
            last_ts = stamp

        cwd = record.get("cwd")
        if cwd and not project_dir:
            project_dir = str(cwd)

        git_branch_value = record.get("gitBranch")
        if git_branch_value and not branch:
            branch = str(git_branch_value)

        if record.get("type") == "file-history-snapshot":
            snapshot = record.get("snapshot") or record.get("files") or {}
            if isinstance(snapshot, dict):
                # Only the first 20 keys of each snapshot are recorded.
                touched.extend(str(key) for key in list(snapshot)[:20])

        message = record.get("message")
        if not isinstance(message, dict):
            continue

        role = message.get("role")
        body = message.get("content")
        if role == "user":
            prompt_text = _text_from_content(body)
            if not prompt_text or _is_meta_prompt(prompt_text):
                continue
            user_turns += 1
            prompts.append(prompt_text.strip())
            signals.extend(_detect_feedback(prompt_text))
        elif role == "assistant":
            assistant_turns += 1
            tool_names.extend(_tool_names_from_content(body))
            reply_text = _text_from_content(body).strip()
            if reply_text:
                replies.append(reply_text)

    if not (user_turns or assistant_turns):
        return None

    return SessionDigest(
        session_id=session_id,
        project=project_dir,
        git_branch=branch,
        started_at=first_ts,
        ended_at=last_ts,
        user_prompts=prompts,
        assistant_finals=replies[-5:],  # last few finals are the useful ones
        # dict.fromkeys de-duplicates while preserving first-seen order
        tools_used=list(dict.fromkeys(tool_names)),
        files_touched=list(dict.fromkeys(touched)),
        feedback_signals=signals,
        n_user_turns=user_turns,
        n_assistant_turns=assistant_turns,
        raw_path=path,
    )
def _project_matches(project: str, scope: Any, invoked: str) -> bool:
    """Tell whether a session's *project* directory falls inside *scope*.

    Parameters
    ----------
    project : str   the session's project directory ("" when unknown)
    scope   : "all" | "invoked" | list/tuple of paths
    invoked : str   the invoked project path, used when scope is not
                    "all" and not a list/tuple

    Rules
    -----
    * ``"all"`` matches everything.
    * A list/tuple matches when *project* equals one of its paths (compared
      as absolute paths).
    * Otherwise ("invoked"): an empty *invoked* matches everything; else
      *project* matches when it equals *invoked*, lies below it, or is one
      of its parent directories.

    A session with an unknown (empty) *project* never matches a specific
    path: ``os.path.abspath("")`` resolves to the current working
    directory, so without this guard such sessions would be attributed to
    whatever directory the process happens to run from.
    """
    if scope == "all":
        return True
    if isinstance(scope, (list, tuple)):
        if not project:
            return False
        target = os.path.abspath(project)
        return any(target == os.path.abspath(p) for p in scope)
    # "invoked": match the invoked project (or a subdir / parent of it)
    if not invoked:
        return True
    if not project:
        return False
    a = os.path.abspath(project)
    b = os.path.abspath(invoked)
    return a == b or a.startswith(b + os.sep) or b.startswith(a + os.sep)
def harvest(
    transcripts_dir: str,
    *,
    scope: Any = "all",
    invoked_project: str = "",
    since_iso: Optional[str] = None,
    limit: int = 0,
) -> List[SessionDigest]:
    """Walk ~/.claude/projects and return digests matching scope/time.

    Parameters
    ----------
    transcripts_dir : str       ~/.claude/projects
    scope : "all" | "invoked" | list[path]
    invoked_project : str       used when scope == "invoked"
    since_iso : str|None        ISO8601; sessions whose ``ended_at`` sorts
                                before this string are dropped (plain string
                                comparison). Sessions without an ``ended_at``
                                are kept.
    limit : int                 cap number of digests (0 = no cap)

    Returns
    -------
    list[SessionDigest]
        Matching digests, visited newest-first by file modification time.
        Empty if *transcripts_dir* is not a directory.
    """
    digests: List[SessionDigest] = []
    if not os.path.isdir(transcripts_dir):
        return digests

    def _mtime_or_zero(p: str) -> float:
        # A transcript can be removed or become unreadable between the
        # directory walk and the sort; such files sort last instead of
        # aborting the whole harvest.
        try:
            return os.path.getmtime(p)
        except OSError:
            return 0.0

    paths: List[str] = [
        os.path.join(root, fn)
        for root, _dirs, files in os.walk(transcripts_dir)
        for fn in files
        if fn.endswith(".jsonl")
    ]
    # newest first by mtime
    paths.sort(key=_mtime_or_zero, reverse=True)

    for p in paths:
        d = digest_transcript(p)
        if d is None:
            continue
        if _is_headless_replay(d):
            continue  # Issue #62: skip engine's own headless replay sessions
        if not _project_matches(d.project or "", scope, invoked_project):
            continue
        if since_iso and d.ended_at and d.ended_at < since_iso:
            # Note: files are sorted by mtime but we compare the embedded
            # ended_at timestamp — mtime can diverge (copy/touch), so we
            # cannot break here; we must continue to check all files.
            continue
        digests.append(d)
        if limit and len(digests) >= limit:
            break
    return digests