mirror of
https://github.com/microsoft/SkillOpt.git
synced 2026-07-03 14:02:58 +08:00
PR #92 added a per-cycle diagnostics.json that surfaces backend stderr, optimizer replies, and task responses so a 0.0 night is self-diagnosing. Those free-text fields can carry credentials (e.g. a codex 401 stderr dump containing an auth token), so persisting them verbatim was a new on-disk leak surface. - Add a shared redact_secrets() in staging.py and route diagnostics.json's call_error / reflect_raw_head / holdout_detail through it before writing. - Redact the codex and Claude auth-error log lines too (a secondary sink when a file log handler is attached); last_call_error stays raw in memory so _AUTH_MARKERS matching is unaffected. - Centralize _SECRET_PATTERNS in staging.py (harvest_codex now reuses them) and extend coverage to AWS / GitHub / Slack / Google / JWT token shapes. - Tests: secret-shape coverage, private-key blocks, recursive/scalar passthrough, no over-redaction of plain prose, fail-fast auth-error log redaction, and an end-to-end check that diagnostics.json has no secret. Observability-only; the gate and learning algorithm are unchanged. Co-Authored-By: Claude <noreply@anthropic.com>
234 lines
7.2 KiB
Python
234 lines
7.2 KiB
Python
"""SkillOpt-Sleep Codex Desktop session harvesting.
|
|
|
|
Reads Codex Desktop archived session JSONL files and normalizes them into
|
|
``SessionDigest`` records without copying developer/system instructions, tool
|
|
arguments, or raw tool outputs.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
from typing import Any, Dict, Iterable, List, Optional
|
|
|
|
from skillopt_sleep.harvest import (
|
|
_detect_feedback,
|
|
_is_meta_prompt,
|
|
_iter_jsonl,
|
|
_project_matches,
|
|
)
|
|
from skillopt_sleep.staging import _SECRET_PATTERNS
|
|
from skillopt_sleep.types import SessionDigest
|
|
|
|
|
|
def _payload(rec: Dict[str, Any]) -> Dict[str, Any]:
|
|
payload = rec.get("payload")
|
|
return payload if isinstance(payload, dict) else {}
|
|
|
|
|
|
def _timestamp(rec: Dict[str, Any], payload: Dict[str, Any]) -> str:
|
|
for value in (
|
|
payload.get("timestamp"),
|
|
rec.get("timestamp"),
|
|
payload.get("started_at"),
|
|
payload.get("completed_at"),
|
|
):
|
|
if isinstance(value, str) and value:
|
|
return value
|
|
return ""
|
|
|
|
|
|
def _text_from_any(content: Any) -> str:
|
|
if isinstance(content, str):
|
|
return content
|
|
if isinstance(content, list):
|
|
parts: List[str] = []
|
|
for item in content:
|
|
if isinstance(item, str):
|
|
parts.append(item)
|
|
elif isinstance(item, dict):
|
|
if item.get("type") == "text" and item.get("text"):
|
|
parts.append(str(item["text"]))
|
|
elif item.get("text"):
|
|
parts.append(str(item["text"]))
|
|
return "\n".join(parts)
|
|
if isinstance(content, dict):
|
|
if content.get("text"):
|
|
return str(content["text"])
|
|
if content.get("content"):
|
|
return _text_from_any(content["content"])
|
|
return ""
|
|
|
|
|
|
def _strip_codex_meta(text: str) -> str:
|
|
stripped = text.strip()
|
|
if not stripped:
|
|
return ""
|
|
if stripped.startswith("<codex_internal_context"):
|
|
return ""
|
|
if stripped.startswith("<environment_context"):
|
|
return ""
|
|
if stripped.startswith("# AGENTS.md instructions") or "--- project-doc ---" in stripped:
|
|
for marker in ("</environment_context>", "</INSTRUCTIONS>"):
|
|
idx = stripped.rfind(marker)
|
|
if idx == -1:
|
|
continue
|
|
tail = stripped[idx + len(marker):].strip()
|
|
if tail and not tail.startswith("<"):
|
|
return tail
|
|
return ""
|
|
return stripped
|
|
|
|
|
|
def _sanitize_text(text: str) -> str:
|
|
sanitized = _strip_codex_meta(text).replace("\x00", "").strip()
|
|
if not sanitized or _is_meta_prompt(sanitized):
|
|
return ""
|
|
for pattern, replacement in _SECRET_PATTERNS:
|
|
sanitized = pattern.sub(replacement, sanitized)
|
|
return sanitized
|
|
|
|
|
|
def _sanitize_tool_name(name: str) -> str:
|
|
return re.sub(r"[^A-Za-z0-9_.:-]+", "_", name)[:80]
|
|
|
|
|
|
def _tool_name(payload: Dict[str, Any]) -> str:
|
|
payload_type = payload.get("type")
|
|
name = payload.get("name")
|
|
if isinstance(name, str) and name:
|
|
return _sanitize_tool_name(name)
|
|
if payload_type == "exec_command_end":
|
|
return "exec_command"
|
|
if payload_type == "patch_apply_end":
|
|
return "apply_patch"
|
|
if payload_type == "web_search_call":
|
|
return "web_search"
|
|
if payload_type == "tool_search_call":
|
|
return "tool_search"
|
|
if isinstance(payload_type, str) and payload_type.endswith("_tool_call"):
|
|
return _sanitize_tool_name(payload_type)
|
|
return ""
|
|
|
|
|
|
def _dedup(xs: Iterable[str]) -> List[str]:
|
|
seen = set()
|
|
out: List[str] = []
|
|
for x in xs:
|
|
if x not in seen:
|
|
seen.add(x)
|
|
out.append(x)
|
|
return out
|
|
|
|
|
|
def digest_codex_archived_session(path: str, project: str = "") -> Optional[SessionDigest]:
|
|
"""Build a ``SessionDigest`` from one Codex Desktop archived session."""
|
|
session_id = os.path.splitext(os.path.basename(path))[0]
|
|
started = ""
|
|
ended = ""
|
|
session_project = ""
|
|
user_prompts: List[str] = []
|
|
assistant_finals: List[str] = []
|
|
tools: List[str] = []
|
|
feedback: List[str] = []
|
|
n_user = 0
|
|
n_asst = 0
|
|
|
|
for rec in _iter_jsonl(path):
|
|
payload = _payload(rec)
|
|
payload_type = payload.get("type")
|
|
ts = _timestamp(rec, payload)
|
|
if ts:
|
|
if not started:
|
|
started = ts
|
|
ended = ts
|
|
cwd = payload.get("cwd")
|
|
if isinstance(cwd, str) and cwd:
|
|
if not session_project:
|
|
session_project = cwd
|
|
if project and _project_matches(cwd, "invoked", project):
|
|
session_project = cwd
|
|
|
|
role = payload.get("role")
|
|
text = ""
|
|
output_role = ""
|
|
if payload_type == "user_message":
|
|
text = _text_from_any(payload.get("message"))
|
|
output_role = "user"
|
|
elif payload_type == "agent_message":
|
|
text = _text_from_any(payload.get("message"))
|
|
output_role = "assistant"
|
|
elif payload_type == "message" and role in {"user", "assistant"}:
|
|
text = _text_from_any(payload.get("content"))
|
|
output_role = str(role)
|
|
else:
|
|
tool = _tool_name(payload)
|
|
if tool:
|
|
tools.append(tool)
|
|
continue
|
|
|
|
sanitized = _sanitize_text(text)
|
|
if not sanitized:
|
|
continue
|
|
if output_role == "user":
|
|
n_user += 1
|
|
user_prompts.append(sanitized)
|
|
feedback.extend(_detect_feedback(sanitized))
|
|
elif output_role == "assistant":
|
|
n_asst += 1
|
|
assistant_finals.append(sanitized)
|
|
|
|
if project and not _project_matches(session_project or "", "invoked", project):
|
|
return None
|
|
if n_user == 0 and n_asst == 0:
|
|
return None
|
|
|
|
return SessionDigest(
|
|
session_id=session_id,
|
|
project=session_project,
|
|
started_at=started,
|
|
ended_at=ended,
|
|
user_prompts=user_prompts,
|
|
assistant_finals=assistant_finals[-5:],
|
|
tools_used=_dedup(tools),
|
|
files_touched=[],
|
|
feedback_signals=feedback,
|
|
n_user_turns=n_user,
|
|
n_assistant_turns=n_asst,
|
|
raw_path=path,
|
|
)
|
|
|
|
|
|
def harvest_codex(
|
|
archived_sessions_dir: str,
|
|
*,
|
|
scope: Any = "all",
|
|
invoked_project: str = "",
|
|
since_iso: Optional[str] = None,
|
|
limit: int = 0,
|
|
) -> List[SessionDigest]:
|
|
"""Walk ``~/.codex/archived_sessions`` and return matching digests."""
|
|
digests: List[SessionDigest] = []
|
|
if not os.path.isdir(archived_sessions_dir):
|
|
return digests
|
|
|
|
paths = [
|
|
os.path.join(archived_sessions_dir, fn)
|
|
for fn in os.listdir(archived_sessions_dir)
|
|
if fn.endswith(".jsonl")
|
|
]
|
|
paths.sort(key=lambda p: os.path.getmtime(p), reverse=True)
|
|
|
|
project_hint = invoked_project if scope == "invoked" else ""
|
|
for path in paths:
|
|
digest = digest_codex_archived_session(path, project=project_hint)
|
|
if digest is None:
|
|
continue
|
|
if not _project_matches(digest.project or "", scope, invoked_project):
|
|
continue
|
|
if since_iso and digest.ended_at and digest.ended_at < since_iso:
|
|
continue
|
|
digests.append(digest)
|
|
if limit and len(digests) >= limit:
|
|
break
|
|
return digests
|