Files
Yif Yang 5487e2c426 fix(skillopt-sleep): redact secrets before persisting cycle diagnostics
PR #92 added a per-cycle diagnostics.json that surfaces backend stderr,
optimizer replies, and task responses so a 0.0 night is self-diagnosing.
Those free-text fields can carry credentials (e.g. a codex 401 stderr dump
containing an auth token), so persisting them verbatim was a new on-disk
leak surface.

- Add a shared redact_secrets() in staging.py and route diagnostics.json's
  call_error / reflect_raw_head / holdout_detail through it before writing.
- Redact the codex and Claude auth-error log lines too (a secondary sink
  when a file log handler is attached); last_call_error stays raw in memory
  so _AUTH_MARKERS matching is unaffected.
- Centralize _SECRET_PATTERNS in staging.py (harvest_codex now reuses them)
  and extend coverage to AWS / GitHub / Slack / Google / JWT token shapes.
- Tests: secret-shape coverage, private-key blocks, recursive/scalar
  passthrough, no over-redaction of plain prose, fail-fast auth-error log
  redaction, and an end-to-end check that diagnostics.json has no secret.

Observability-only; the gate and learning algorithm are unchanged.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-30 19:47:36 +00:00

160 lines
5.8 KiB
Python

"""SkillOpt-Sleep — Stage 5/6: staging and adoption.
Implements the Dreams safety contract: the cycle never mutates the user's
live CLAUDE.md / SKILL.md. It writes proposals + a human-readable report into
a staging directory; a separate, explicit `adopt` step copies them over the
live files after taking a backup.
"""
from __future__ import annotations
import json
import os
import re
import shutil
import time
from typing import Any, List, Optional
from skillopt_sleep.types import SleepReport
# Secret patterns scrubbed from any free-text we persist to the staging dir
# (diagnostics, reports). Kept here so every on-disk artifact shares one
# redaction pass; harvest_codex reuses these for session text too.
_SECRET_PATTERNS: tuple[tuple[re.Pattern[str], str], ...] = (
(re.compile(r"sk-[A-Za-z0-9_-]{10,}"), "[REDACTED_OPENAI_KEY]"),
# Distinctive vendor token prefixes (low false-positive: these prefixes do
# not occur in normal diagnostic prose).
(re.compile(r"\bAKIA[0-9A-Z]{16}\b"), "[REDACTED_AWS_KEY]"),
(re.compile(r"\bgh[pousr]_[A-Za-z0-9]{20,}\b"), "[REDACTED_GITHUB_TOKEN]"),
(re.compile(r"\bxox[baprs]-[A-Za-z0-9-]{10,}\b"), "[REDACTED_SLACK_TOKEN]"),
(re.compile(r"\bAIza[0-9A-Za-z_-]{20,}\b"), "[REDACTED_GOOGLE_KEY]"),
# Bare JWT (three base64url segments) — e.g. a leaked bearer body without
# the "Authorization:" prefix.
(re.compile(r"\beyJ[A-Za-z0-9_-]{8,}\.[A-Za-z0-9_-]{8,}\.[A-Za-z0-9_-]{8,}\b"),
"[REDACTED_JWT]"),
(re.compile(r"(?i)(Authorization:\s*Bearer\s+)[^\s\"']+"), r"\1[REDACTED]"),
(re.compile(r"(?i)(Authorization:\s*Basic\s+)[^\s\"']+"), r"\1[REDACTED]"),
(
re.compile(r"(?i)\b(api[_-]?key|token|password|secret)\b(\s*[:=]\s*)[^\s\"']+"),
r"\1\2[REDACTED]",
),
(
re.compile(r"(?i)\b(api[_-]?key|token|password|secret)\b(\s+)[^\s\"']+"),
r"\1\2[REDACTED]",
),
(
re.compile(
r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----",
re.DOTALL,
),
"[REDACTED_PRIVATE_KEY]",
),
)
def redact_secrets(value: Any) -> Any:
"""Scrub secret-looking substrings (API keys, bearer tokens, private keys)
from a string, or recursively from the string leaves of a list/dict.
Used before writing backend stderr / optimizer replies / task responses to
on-disk diagnostics: those are surfaced for debugging, but the underlying
text (e.g. a codex 401 stderr dump) can carry credentials. Non-string
scalars pass through unchanged.
"""
if isinstance(value, str):
out = value
for pattern, replacement in _SECRET_PATTERNS:
out = pattern.sub(replacement, out)
return out
if isinstance(value, list):
return [redact_secrets(v) for v in value]
if isinstance(value, dict):
return {k: redact_secrets(v) for k, v in value.items()}
return value
def _ts_dir() -> str:
return time.strftime("%Y%m%d-%H%M%S", time.localtime())
def staging_root(project: str) -> str:
return os.path.join(project, ".skillopt-sleep", "staging")
def latest_staging(project: str) -> Optional[str]:
root = staging_root(project)
if not os.path.isdir(root):
return None
subs = sorted(
(os.path.join(root, d) for d in os.listdir(root)),
key=lambda p: os.path.getmtime(p),
reverse=True,
)
return subs[0] if subs else None
def write_staging(
project: str,
*,
report: SleepReport,
proposed_skill: Optional[str],
proposed_memory: Optional[str],
live_skill_path: str,
live_memory_path: str,
report_md: str,
) -> str:
"""Write proposals + report into staging/<ts>/ and return that path."""
out = os.path.join(staging_root(project), _ts_dir())
os.makedirs(out, exist_ok=True)
manifest = {
"live_skill_path": live_skill_path,
"live_memory_path": live_memory_path,
"has_skill": proposed_skill is not None,
"has_memory": proposed_memory is not None,
"accepted": report.accepted,
}
if proposed_skill is not None:
with open(os.path.join(out, "proposed_SKILL.md"), "w", encoding="utf-8") as f:
f.write(proposed_skill)
if proposed_memory is not None:
with open(os.path.join(out, "proposed_CLAUDE.md"), "w", encoding="utf-8") as f:
f.write(proposed_memory)
with open(os.path.join(out, "report.json"), "w", encoding="utf-8") as f:
json.dump(report.to_dict(), f, ensure_ascii=False, indent=2)
with open(os.path.join(out, "report.md"), "w", encoding="utf-8") as f:
f.write(report_md)
with open(os.path.join(out, "manifest.json"), "w", encoding="utf-8") as f:
json.dump(manifest, f, ensure_ascii=False, indent=2)
return out
def _backup(path: str, backup_dir: str) -> None:
if os.path.exists(path):
os.makedirs(backup_dir, exist_ok=True)
shutil.copy2(path, os.path.join(backup_dir, os.path.basename(path)))
def adopt(staging_dir: str) -> List[str]:
"""Copy staged proposals over the live files, backing up first.
Returns the list of live paths that were updated.
"""
with open(os.path.join(staging_dir, "manifest.json")) as f:
manifest = json.load(f)
backup_dir = os.path.join(staging_dir, "backup")
updated: List[str] = []
if manifest.get("has_skill"):
live = manifest["live_skill_path"]
os.makedirs(os.path.dirname(live), exist_ok=True)
_backup(live, backup_dir)
shutil.copy2(os.path.join(staging_dir, "proposed_SKILL.md"), live)
updated.append(live)
if manifest.get("has_memory"):
live = manifest["live_memory_path"]
os.makedirs(os.path.dirname(live), exist_ok=True)
_backup(live, backup_dir)
shutil.copy2(os.path.join(staging_dir, "proposed_CLAUDE.md"), live)
updated.append(live)
return updated