diff --git a/docs/public/telemetry.mdx b/docs/public/telemetry.mdx index fbd79dc04..ab8113a5c 100644 --- a/docs/public/telemetry.mdx +++ b/docs/public/telemetry.mdx @@ -109,6 +109,24 @@ One value is derived server-side rather than sent by the client: PostHog resolve | `hook_failed` | A claude-mem hook fails hard — the worker is unreachable past the fail-loud threshold, or a blocking error occurs | `hook_type`, `error_mode`, `consecutive_failures`, `threshold_tripped` | | `error_occurred` | The worker returns an HTTP 5xx | `error_category` | +## Historical backfill + +Telemetry shipped later than claude-mem itself, so installs that predate it have activity the live events never saw. On the first worker start after upgrading, claude-mem performs a **one-time** backfill of that pre-telemetry history — anonymized counts only, passed through the same whitelist scrubber as everything else: + +| Event | When (timestamp) | What it carries | +|---|---|---| +| `historical_activity` | One per day the install was active, stamped on that historical day | Daily activity counts only: observations, sessions, summaries, prompts, distinct-project **count**, observation-type buckets (`obs_type_*`), session outcomes (`session_completed_count` / `session_failed_count`), per-platform session counts (`sessions_claude_count` etc.), `subagent_obs_count`, `discovery_tokens`, plus `backfilled: true`. Profile-less (`$process_person_profile: false`), like all high-volume events | +| `install_inferred` | Once, stamped on the install's first recorded activity day | `first_active_date` (a date string, e.g. `2025-10-19`) and `backfilled: true` | + +Like everything else, these are counts and closed-set values only — **never titles, prompts, file contents, or project names**. The same anonymous install UUID identifies them, and every property passes the whitelist scrubber. + +A few things worth knowing: + +- **It runs once.** A completion marker (`backfill.json` in the claude-mem data directory) is written after a successful send and prevents the backfill from ever running again. Until a run succeeds, no marker is written, so a failed attempt simply retries on the next worker start. +- **It honors the exact same consent gates as live telemetry** — `DO_NOT_TRACK`, `CLAUDE_MEM_TELEMETRY=0`, and `enabled: false` in `telemetry.json` all block it, and [debug mode](#debug-mode) prints the would-be payload without sending. +- **Opting out before the first worker start after upgrading prevents it entirely.** Nothing is sent and no marker is written while you are opted out — though if you opt back in later, the backfill will then run. +- **Location is upload-time, not historical.** The coarse location PostHog derives at ingestion (see above) reflects where the events were uploaded from, not where you were on the historical dates they describe. + ## What is NEVER collected | Never collected | Notes | diff --git a/plans/2026-06-11-posthog-historical-backfill.md b/plans/2026-06-11-posthog-historical-backfill.md new file mode 100644 index 000000000..bfc5bc909 --- /dev/null +++ b/plans/2026-06-11-posthog-historical-backfill.md @@ -0,0 +1,502 @@ +# PostHog Historical Backfill — Anonymized Observation Metadata Import + +**Goal**: One-time, per-install backfill of anonymized daily activity rollups from the local +SQLite DB into PostHog, using PostHog's historical-migration ingestion mode, so growth +(installs over time, reconstructed WAU/MAU, cohort retention) is visible for activity that +predates telemetry shipping. + +**What gets sent** (anonymous counts/sums only — never titles, text, prompts, project names, +or any raw string column): +- One `historical_activity` event per active day per install, profile-less + (`$process_person_profile: false`), carrying ONLY rollup counters + `backfilled: true` + (no `buildBaseProperties()` — stamping the *current* version/os onto 2025-dated events + would permanently poison version-over-time charts): + `{ observation_count, session_count, summary_count, prompt_count, project_count, + discovery_tokens, obs_type_bugfix, obs_type_discovery, obs_type_decision, + obs_type_refactor, obs_type_other, session_completed_count, session_failed_count, + sessions_claude_count, sessions_codex_count, sessions_gemini_count, + sessions_other_platform_count, subagent_obs_count, backfilled: true }`. +- One `install_inferred` event at noon UTC of the install day, `person: true`, with + `first_active_date` in properties and `$set` — this single event draws the adoption curve. + +**Known caveats (accept, do not solve)**: +- Survivorship bias — only currently-installed, telemetry-consenting users backfill. The + curve shows the history of the *retained* base. +- The last ~2.5 days before each install's first post-upgrade worker start are never + backfilled (PostHog's 48h rule + whole-day windowing). Do NOT "fix" this by re-running + without the marker — duplicates are worse than the gap. +- Geo properties on backfilled events reflect location at *upload* time, not the historical + date (`disableGeoip: false` kept for consistency with live telemetry). +- Dashboards slicing by `version` must filter `backfilled != true`; combined install metrics + must dedupe `install_inferred` ∪ `install_completed(is_update=false)` per distinct_id + (telemetry-era installs emit both). +- Event-style live metrics (searches, injections, compression spend) are intentionally absent + from backfill — they are not stored locally and cannot be reconstructed. + +--- + +## Phase 0: Documentation Discovery — COMPLETE (findings consolidated below) + +All implementation phases MUST use only the APIs listed here. Citations were verified against +the working tree, the published posthog-node v5.36.15 source, and the real +`~/.claude-mem/claude-mem.db` (read-only) on 2026-06-11, then adversarially re-verified on +2026-06-12 (see Review log at the bottom). + +**Precondition**: this worktree has no `node_modules` — run `bun install` (or `npm install`) +before Phase 1 so imports and `bun test` resolve. + +### Allowed APIs + +**posthog-node ^5.36.15** (root `package.json:143`, imported in +`src/services/telemetry/telemetry.ts:1`; caret range — if the installed major ever drifts, +re-verify `historicalMigration` forwarding in `dist/`, search for `historical_migration`): +- `new PostHog(apiKey, options)` — `PostHogOptions = Omit & {...}`. + `PostHogCoreOptions` **includes** `historicalMigration?: boolean` ("Special flag to indicate + ingested data is for a historical migration", default false). It is forwarded as + `historical_migration: true` on every `/batch` request body (verified in + `@posthog/core@1.32.1` shipped source). Also valid: `host`, `flushAt`, `maxBatchSize`, + `maxQueueSize`, `disableGeoip`. +- `client.capture(msg: EventMessage)` — `EventMessage` includes `distinctId`, `event`, + `properties`, **`timestamp?: Date`** (a `Date` object, NOT an ISO string), **`uuid?: string`**. + Both are forwarded end-to-end to the payload. +- **capture() enqueues asynchronously** (a multi-microtask promise chain runs before the event + reaches the queue). A bare `await client.flush()` does NOT join those pending captures and + can resolve while events are still un-enqueued. Only `client.shutdown()` joins pending + capture promises and then loops flush until the queue is empty — **shutdown() is the only + delivery barrier**. +- `shutdown()` SWALLOWS fetch errors internally (`logFlushError`); its resolution proves + nothing about delivery. Delivery failure is observable only via the public error emitter: + `client.on('error', handler)` (`@posthog/core` `posthog-core-stateless.ts:301`). +- The in-memory queue silently drops the OLDEST event past `maxQueueSize` (default 1,000). + Multi-year installs can exceed 1,000 active days — `maxQueueSize` must be raised. +- Background flushes fire automatically when the queue reaches `flushAt`; their errors are + swallowed AND a non-network (4xx) failure removes the batch from the queue before throwing. + Therefore: set `flushAt`/`maxBatchSize`/`maxQueueSize` all to 5000 so NO background flush + ever fires and the entire backfill goes as one request at shutdown (the SDK auto-halves the + batch on HTTP 413). One request also makes a cross-restart retry byte-identical — the best + possible dedupe-key match. + +**PostHog historical-migration rules** (https://posthog.com/docs/migrate): +- Set `historicalMigration: true` so events bypass standard ingestion/billing handling. +- Event timestamps must be **at least 48 hours in the past**. Server behavior for violating + events is undocumented (rejection, billing, silent acceptance all possible) — the + client-side day window is the only guard, and it applies to **every** event including + `install_inferred`. +- "There is no way to selectively delete event data in PostHog" — idempotency is mandatory + before anything ships (deterministic `uuid` + completion marker). +- uuid dedupe (https://posthog.com/docs/data/events) is **eventual and best-effort** + (ClickHouse merge-time, not query-time) and keyed on + `(toDate(timestamp), event, distinct_id, uuid)` — retried events must carry byte-identical + timestamp+uuid+event+distinctId. The marker (not the uuid) is the PRIMARY idempotency gate; + the uuid minimizes damage in the crash-retry window. Residual accepted risk: a crash between + server-side ingest and marker write can leave transient duplicates until ClickHouse merges. + +**Existing telemetry modules to copy from** (do not reinvent): +- Consent: `resolveTelemetryConsent(process.env, loadTelemetryConfig())` — + `src/services/telemetry/consent.ts:68-73`. Install UUID: `getOrCreateInstallId()` — + `consent.ts:114-124` (random v4 UUID persisted to `telemetry.json` via `getTelemetryConfigPath()`). +- Scrub: `scrubProperties(props)` whitelist scrubber — `src/services/telemetry/scrub.ts:126-149`; + whitelist set `ALLOWED_PROPERTY_KEYS` at `scrub.ts:8-117`. **Properties not in the whitelist + are silently dropped** — new keys MUST be added there. Already present (verified): the + `obs_type_*` family (live `context_injected` ships them), `session_count`, + `observation_count`. Confirmed ABSENT (must add): `discovery_tokens`. +- Key/host: `getTelemetryApiKey()`, `getTelemetryHost()` — `src/services/telemetry/common.ts:22-28`. + Note: `getTelemetryApiKey()` falls back to the **embedded production key** and is never + falsy — every worker boot anywhere can send real data (see Phase 2/3 sequencing). + Base props: `buildBaseProperties()` — `common.ts:100-113` (returns CURRENT version, + os_version, runtime_version, locale, is_ci — historical events must NOT carry it). + Person-prop subset: `PERSON_PROPERTY_KEYS` + `buildPersonSet()` — `common.ts:36-76`. + `buildPersonSet` only copies keys PRESENT on the event's properties — a person trait that is + never assigned to the event silently never ships. +- Capture-path conventions (consent gate → scrub → debug-mode stderr print → no-key no-op → + capture; swallow all errors): `captureEvent()` — `src/services/telemetry/telemetry.ts:72-117`. +- DB-reading telemetry pattern: `collectInstallStats(db: Database)` — + `src/services/telemetry/install-stats.ts:29-99`. Per-block try/catch; typed `.get()` casts. + Keep the per-block try/catch in the rollup too: older installs may lack a table or column — + skip that block's keys, never throw. +- Epoch normalization (legacy rows store **seconds**, newer rows milliseconds — verified + against a real DB where 273 rows render as 1970 without it): + `asMs(col)` → `` `CASE WHEN ${col} < 1000000000000 THEN ${col} * 1000 ELSE ${col} END` `` — + `install-stats.ts:23-25`. `DAY_MS = 86_400_000` — `install-stats.ts:27`. Note `MIN(asMs(x))` + applies normalization INSIDE the MIN (correct); `asMs(MIN(x))` would not be. +- Deterministic UUID: **`Bun.randomUUIDv5(name, namespace)`** — exists and is deterministic on + the worker's embedded Bun 1.3.9 (verified by execution). Use it with one fixed namespace + UUID constant in the module. The npm `uuid` package is not installed and must not be added. +- Discovery-token storage semantics (load-bearing for aggregation): + `src/services/sqlite/SessionStore.ts:1901-2015` — ONE `discoveryTokens` value per + compression turn is written identically to EVERY observation row of that turn (line 1962) + AND to the turn's `session_summaries` row (line 2004). Summing across observations + multi-counts by the obs-per-turn factor. **Sum `session_summaries.discovery_tokens` only.** + +**Database access** (`bun:sqlite`, synchronous): +- `import { Database } from 'bun:sqlite'` — pattern at `src/services/sqlite/SessionStore.ts:1-2`. +- Query: `db.query(sql).get(...)` / `.all(...)` with `as` type casts — + e.g. `src/services/worker-service.ts:508-513`, `install-stats.ts:36-50`. +- Relevant columns: base schema in `src/services/sqlite/migrations/runner.ts:43-112` — + `observations(project, created_at_epoch, memory_session_id, type, agent_type)`, + `sdk_sessions(project, started_at_epoch, memory_session_id, status, platform_source)`, + `session_summaries(project, created_at_epoch)`, `user_prompts(created_at_epoch)` + (counts only — NEVER select `prompt_text`). **`discovery_tokens` is NOT in 43-112**: it is + added by `ensureDiscoveryTokensColumn` at `runner.ts:381-402` + (`ALTER TABLE ... ADD COLUMN discovery_tokens INTEGER DEFAULT 0` on both observations and + session_summaries). Query only columns created in runner.ts — the dev DB has extra columns + other installs lack. +- Observation `type` buckets: use the closed `STAT_TYPE_BUCKETS` set from + `src/services/context/ContextBuilder.ts` (bugfix/discovery/decision/refactor/other) so the + backfill vocabulary is identical to live `context_injected`. +- `platform_source` is a user-influenceable string — bucket in JS to the closed enum + {claude, codex, gemini, other}; never ship the raw value. + +**Worker integration points** (`src/services/worker-service.ts`): +- `worker_started` capture: lines 532-540 (inside `initializeBackground()`). +- Fire-and-forget background-task precedent to copy: + `ChromaSync.backfillAllProjects(...)` `.then/.catch` block at lines 551-556. +- Daily heartbeat precedent: `setInterval(..., 24*60*60*1000)` + `.unref?.()` at lines 544-547. +- Shutdown order: `worker_stopped` capture BEFORE `shutdownTelemetry()` (lines 699-703). + +**State files in `~/.claude-mem`**: +- Path root: `DATA_DIR` / `paths.dataDir()` — `src/shared/paths.ts:40, 129-151`. +- Read with `readJsonSafe(path, fallback)` from `src/utils/json-utils.js` (as + `consent.ts:5` imports it). **There is no JSON-write helper** — write with + `mkdirSync` + `writeFileSync` exactly as `saveTelemetryConfig` does at `consent.ts:103-107`. +- The marker is its own file `backfill.json` — do NOT merge it into `telemetry.json` + (the consent save path would clobber it). + +**Logger** (console.* is forbidden in services — enforced by +`scripts/check-hook-io-discipline.cjs` / `tests/logger-usage-standards.test.ts`): +- `import { logger } from '../utils/logger.js'`; `logger.info('SYSTEM', msg, ctx, err?)`. + `'TELEMETRY'` is NOT in the `Component` union (`src/utils/logger.ts:15-52`) — use `'SYSTEM'`. + +**Tests** (`bun test`; global posthog-node mock): +- Global mock of `posthog-node` in `tests/preload.ts:45-55` records + `postHogConstructorCalls` / `postHogCaptureCalls` — extend, don't replace. The mock class + has NO `flush()`, `shutdown()` race-compatible signature, or `on()` — add no-op + `flush()`/`shutdown()`/`on()` to it. +- State reset: `__resetTelemetryForTests()` — `src/services/telemetry/telemetry.ts:125-129`. +- Env/temp-dir isolation pattern: `tests/telemetry/telemetry-client.test.ts:30-53` + (`CLAUDE_MEM_DATA_DIR = mkdtempSync(...)`). +- In-memory DB schema pattern: `tests/telemetry/install-stats.test.ts:8-28` — but its `makeDb` + schema LACKS `discovery_tokens`, `memory_session_id`, `type`, `agent_type`, `status`, + `platform_source`, and the `user_prompts` table. Extend the test schema with every column + the rollup queries touch before writing tests, or the queries throw `no such column` (and + the per-block try/catch would mask it as silently-empty rollups). + +### Anti-patterns (DO NOT) + +- ❌ `historical_migration` (snake_case) as a posthog-node constructor option — the SDK option + is **`historicalMigration`** (camelCase). Snake_case is only for the raw `/batch` HTTP API. +- ❌ Passing `timestamp` as an ISO string to `client.capture()` — `EventMessage.timestamp` is + typed `Date`. Construct `new Date(...)`. +- ❌ Reusing the live singleton client in `telemetry.ts` for backfill — it lacks + `historicalMigration` and its `isShutdown` latch must stay untouched. Build a dedicated, + short-lived client. +- ❌ Sending properties without adding them to `ALLOWED_PROPERTY_KEYS` — the scrubber drops + them silently and the backfill would ship empty events. +- ❌ Raw `created_at_epoch` / `started_at_epoch` math without `asMs()` — legacy second-unit + rows land in 1970 and poison the earliest cohort. Additionally apply the project-epoch + floor (below): corrupt epochs can also land on plausible-looking wrong days, not just 1970. +- ❌ Filtering rollup rows by raw epoch against a cutoff instant — that ships a permanently + truncated PARTIAL day. Always include whole UTC day buckets only (window rule in Phase 1). +- ❌ Summing `discovery_tokens` across observations AND session_summaries — the same per-turn + value is stored on every row of the turn (see Phase 0); sum summaries only. +- ❌ `buildBaseProperties()` on `historical_activity` events — current version/os on + 2025-dated events is permanently wrong version-over-time data. +- ❌ The npm `uuid` package — use `Bun.randomUUIDv5` (verified present and deterministic). +- ❌ `console.*` anywhere in `src/services/` — use `logger` (debug-mode payload printing is the + one exception and must use `process.stderr.write`, copying `telemetry.ts:97-103`). +- ❌ Writing the completion marker before delivery is confirmed — and "delivery confirmed" + means `await client.shutdown()` resolved AND zero `client.on('error')` events, NOT a bare + `flush()` (see Phase 0 SDK facts). A crash mid-send must retry on next startup. +- ❌ Booting the worker on the dev machine during Phases 1–3 without + `CLAUDE_MEM_TELEMETRY=0` (or `CLAUDE_MEM_TELEMETRY_DEBUG=1`) exported — the embedded + production key + default-on consent means a casual `npm run build-and-sync` ships the dev + machine's entire history to production PostHog before the dry-run gate. + +--- + +## Phase 1: Backfill module + tests (rollups, events, transport, marker) + +**Create `src/services/telemetry/backfill.ts`** (one module, one test file). + +### 1.1 Day window (one rule, used everywhere) + +- `lastFullDay = utcDayString(nowMs - 60 * 3_600_000)` — 60h = 48h (PostHog contract) + 12h + (noon-UTC event timestamps). Noon of any included day is then guaranteed ≥48h old. +- `PROJECT_EPOCH_FLOOR = Date.parse('2024-01-01T00:00:00Z')` — predates claude-mem's first + release; rows with normalized epoch below it are corrupt and ignored everywhere (rollups + AND first-activity MIN). +- `installDay = utcDayString(firstActivityEpochMs)` (1.3). Include only whole UTC days where + `installDay <= day <= lastFullDay`, comparing **day strings** (YYYY-MM-DD compares + lexicographically) — never raw epochs, so no partial days can ship. The lower bound discards + backdated artifact rows (verified on the reference DB: obs ids 66888/66889 carry a + 2025-08-12 epoch but belong to a session started 2026-04-10). + +### 1.2 `collectDailyRollups(db: Database, lastFullDay: string, installDay: string): DailyRollup[]` + +Copy the query style of `collectInstallStats` (per-block try/catch — a missing table/column +skips that block's keys), bucketing every table by +`date(/1000,'unixepoch') AS day`, merged in a `Map`. Pinned +semantics — these exact aggregations, so any two implementers ship identical numbers: + +| key | source (per day) | +|---|---| +| `observation_count` | `COUNT(*)` FROM observations | +| `obs_type_bugfix/discovery/decision/refactor/other` | observations `GROUP BY day, type`, bucketed via `STAT_TYPE_BUCKETS` | +| `subagent_obs_count` | `COUNT(*)` FROM observations WHERE `agent_type IS NOT NULL` | +| `session_count` | `COUNT(*)` FROM sdk_sessions **only** (do NOT add observations' distinct memory_session_id — same sessions, double count) | +| `session_completed_count` / `session_failed_count` | sdk_sessions `GROUP BY day, status` (closed enum) | +| `sessions_claude_count` / `sessions_codex_count` / `sessions_gemini_count` / `sessions_other_platform_count` | sdk_sessions `platform_source`, bucketed in JS to the closed enum | +| `summary_count` | `COUNT(*)` FROM session_summaries | +| `discovery_tokens` | `SUM(discovery_tokens)` FROM **session_summaries only** (per-turn cost — see Phase 0 storage semantics) | +| `prompt_count` | `COUNT(*)` FROM user_prompts (count only — never `prompt_text`) | +| `project_count` | `COUNT(DISTINCT project)` over `(SELECT day, project FROM observations UNION SELECT day, project FROM sdk_sessions)` — cross-table distinct in ONE query; never sum per-table distincts (triple-counts the same project) | + +Omitted on purpose: session durations (verified dirty — 22% of completed sessions show >24h +stale spans), dev-only columns not in runner.ts, anything string-valued. + +### 1.3 `findFirstActivityEpochMs(db: Database): number | null` + +`SELECT MIN(asMs(started_at_epoch)) FROM sdk_sessions WHERE asMs(started_at_epoch) >= FLOOR` +(copy `install-stats.ts:56-61` — note asMs INSIDE the MIN), falling back to the observations +MIN **only if sdk_sessions is empty**. Keep sessions-first: session timestamps are write-time +and trustworthy, while observation epochs can be backdated artifacts (verified — see 1.1); +a cross-table MIN would bake an artifact date into undeletable data. + +### 1.4 `deterministicEventUuid(installId, event, day): string` + +`Bun.randomUUIDv5(`${installId}|${event}|${day}`, BACKFILL_NAMESPACE)` where +`BACKFILL_NAMESPACE` is one fixed UUID constant in the module. One line; deterministic; +no hand-rolled hashing. + +### 1.5 `buildBackfillEvents(db, installId, nowMs): EventMessage-like[]` + +Pure assembly: +- Each rollup → `historical_activity` with `timestamp: new Date(day + 'T12:00:00Z')` and the + deterministic uuid. Noon UTC is load-bearing TWICE: it keeps the event inside its day for + dashboard timezones in UTC-12..+11 (keep the PostHog project timezone on UTC), and it makes + the timestamp retry-stable, which the dedupe key requires — do not "simplify" to a + non-deterministic timestamp. Properties: the rollup counters + `backfilled: true`, passed + through `scrubProperties(...)`, then `$process_person_profile: false`. + **No `buildBaseProperties()`** (see anti-patterns). +- One `install_inferred` with `timestamp: new Date(installDay + 'T12:00:00Z')` (noon UTC — + retry-stable even if the sessions/observations MIN source flips between runs) and uuid keyed + on day `'install'`. Properties: `{ ...buildBaseProperties(), first_active_date: installDay, + backfilled: true }` through `scrubProperties`, then `$set = buildPersonSet(props)` + (copy `telemetry.ts:91-95`). `first_active_date` MUST be assigned here — `buildPersonSet` + only copies keys present on the event, and a whitelisted-but-never-assigned trait silently + never ships. Base props are fine on this one person-event ($set = current person state). +- If `installDay > lastFullDay` (install younger than ~60h): return `[]`. Such installs have + live telemetry coverage for their entire life — there is no pre-telemetry history to + reconstruct, and shipping a <48h timestamp violates the migration contract. + +### 1.6 Whitelist additions + +Add to `ALLOWED_PROPERTY_KEYS` (`scrub.ts:8-117`) — verify each against the file first: +`discovery_tokens` (confirmed absent), `summary_count`, `prompt_count`, `project_count`, +`backfilled`, `first_active_date`, `session_completed_count`, `session_failed_count`, +`sessions_claude_count`, `sessions_codex_count`, `sessions_gemini_count`, +`sessions_other_platform_count`, `subagent_obs_count`. +Already whitelisted — reuse, do not re-add: `observation_count`, `session_count`, +the `obs_type_*` family. (`session_count`/`observation_count` carry different semantics on +live `context_injected` — acceptable: PostHog properties are filtered per-event in practice; +note the collision in the PR description.) +Add `first_active_date` to `PERSON_PROPERTY_KEYS` (`common.ts:36-60`). + +### 1.7 Marker + transport: `runHistoricalBackfill(db: Database): Promise` + +Marker file `~/.claude-mem/backfill.json` at `join(dataDir, 'backfill.json')` (mirror +`getTelemetryConfigPath()`, `consent.ts:76-78`). Shape: +`{ completedAt: string, throughDay: string, eventCount: number, installId: string }`. +Read with `readJsonSafe`; write with `mkdirSync` + `writeFileSync` (as +`saveTelemetryConfig`, `consent.ts:103-107`). + +Gate sequence (ORDER MATTERS — debug must precede every marker write): +1. Marker exists → return (idempotency gate #1). +2. `resolveTelemetryConsent(process.env, loadTelemetryConfig())` false → return + **without writing the marker** (a later opt-in still backfills). +3. Build events via 1.5. +4. `CLAUDE_MEM_TELEMETRY_DEBUG === '1'` → `process.stderr.write` one summary line + (event count + day range) then one line per event (copy `telemetry.ts:97-103`), do NOT + send, do NOT write marker — even when the event list is empty. This dry-run intentionally + re-runs on every debug-mode worker start; the marker must never latch from debug mode. + (Only the exact value `'1'` activates it, matching `captureEvent`.) +5. Zero events → write marker, return. (Fresh installs land here: nothing pre-telemetry + exists and live `install_completed`/daily events cover them from day 0 — `install_inferred` + is intentionally NOT emitted for them.) +6. `!getTelemetryApiKey()` → return without marker. (Vestigial — the embedded key makes this + unreachable; keep the one-liner for symmetry with `captureEvent`, write no test for it.) +7. Dedicated client: + `new PostHog(getTelemetryApiKey(), { host: getTelemetryHost(), historicalMigration: true, + flushAt: 5000, maxBatchSize: 5000, maxQueueSize: 5000, disableGeoip: false })` — the 5000s + guarantee a single batch, no swallowed background flushes, and no silent queue-cap drops + (see Phase 0 SDK facts). +8. `const errors: unknown[] = []; client.on('error', e => errors.push(e))`, then + `client.capture({ distinctId: getOrCreateInstallId(), event, properties, timestamp, uuid })` + per event, then `await client.shutdown()` — NO separate `flush()` call (it is not a + delivery barrier) and NO 3s race (this runs fire-and-forget in the background; the SDK's + default shutdown timeout is fine). +9. Write the marker ONLY if shutdown resolved AND `errors.length === 0`. Wrap the whole body + in try/catch that logs via `logger` (`'SYSTEM'`) and never throws (telemetry must never + break the worker — `telemetry.ts:114-116`). + +**Verification checklist**: +- [ ] New test `tests/telemetry/backfill.test.ts` using the `:memory:` pattern from + `tests/telemetry/install-stats.test.ts:8-28` with the schema EXTENDED per Phase 0 + (discovery_tokens, memory_session_id, type, agent_type, status, platform_source, + user_prompts table). Cases: + (a) mixed second/ms epochs land on the correct day (insert `created_at_epoch = + 1755000000` seconds; assert no 1970 day); + (b) day window: a row 47h old is excluded AND no partial day ever ships (whole-day + buckets only); a row before `PROJECT_EPOCH_FLOOR` or before `installDay` produces no day; + (c) `deterministicEventUuid` stable across calls, general UUID shape (it is a v5 — do + not assert a v4 version nibble); + (d) every property in built events survives `scrubProperties` (no silent drops); + (e) empty DB → zero events, no throw; + (f) discovery_tokens dedupe: one turn = 3 observations + 1 summary all carrying 100 → + day total **100**, not 400; + (g) one session + one observation in it, same day → `session_count` 1 and + `project_count` 1 (no double counting); + (h) `install_inferred` uses the sessions MIN, is stamped noon UTC of `installDay`, and + its `$set` contains `first_active_date`; + (i) first activity 1h ago → zero events; + (j) consent-off → zero `postHogCaptureCalls`, no marker; marker present → zero calls; + debug mode → zero calls, no marker **including on an empty DB**; second invocation after + success → zero calls; + (k) happy path → constructor received `historicalMigration: true`, every capture has + `uuid` + `Date` timestamp, marker written with correct `throughDay`; an `error` emitted + via the mock's `on` handler → NO marker. +- [ ] `bun test tests/telemetry/` passes. + +**Anti-pattern guards**: no marker write reachable before the debug gate; sum +summaries-only for discovery_tokens; day-string windowing only; no `buildBaseProperties` on +`historical_activity`; no `console.*`; no raw epoch math without `asMs`. + +--- + +## Phase 2: Live dry-run against the real DB — THE SHIP/NO-SHIP GATE + +This phase MUST complete before Phase 3 wires anything into the worker, because PostHog data +cannot be selectively deleted and `getTelemetryApiKey()` always returns the embedded +production key. + +1. Run `runHistoricalBackfill` against the real `~/.claude-mem/claude-mem.db` with + `CLAUDE_MEM_TELEMETRY_DEBUG=1` (small runner script or one-off test), and eyeball the + stderr payload dump: + - First day = **2025-10-19** for this machine (NOT Aug 2025 — the two 2025-08-12 rows are + verified backdated artifacts and must be absent thanks to the installDay clamp). + - No day newer than `lastFullDay` (≈ T-2.5 days), **no 1970 days**, no day before + `install_inferred`'s `first_active_date`. + - Plausible counts (~215 active days, ~93k total observations on this machine). + - Exactly one `install_inferred`, `first_active_date: 2025-10-19`, noon-UTC timestamp. + - No string property that looks like user content; every property is a number, boolean, or + the single `first_active_date` date string. +2. Repeat runs are expected and safe (debug mode never writes the marker). If a non-debug test + run ever latches the marker locally, the reset is `rm ~/.claude-mem/backfill.json`. + +--- + +## Phase 3: Worker wiring + telemetry docs disclosure + +**Export `CLAUDE_MEM_TELEMETRY=0` (or `CLAUDE_MEM_TELEMETRY_DEBUG=1`) in the dev shell for +every worker boot in this phase** — `npm run build-and-sync` restarts the worker, and an +unguarded boot performs the real production send from the dev machine. + +1. **Wire into startup**: in `src/services/worker-service.ts`, immediately after the + `worker_started` capture (lines 532-540) and alongside the ChromaSync fire-and-forget + precedent (lines 551-556), add: + + ```typescript + runHistoricalBackfill(this.dbManager.getConnection()).catch(error => { + logger.error('SYSTEM', 'Telemetry historical backfill failed (non-blocking)', {}, error as Error); + }); + ``` + + Non-blocking, after core init. Do not add it to the heartbeat — it is one-shot by marker + (a failed run retries on the NEXT worker start because no marker was written). + +2. **Docs**: update `docs/public/telemetry.mdx` — add a short "Historical backfill" section: + what is sent (daily anonymous counts + inferred install date), that it runs once, that it + honors the same consent gates (`DO_NOT_TRACK`, `CLAUDE_MEM_TELEMETRY=0`, `telemetry.json`), + that opting out before first worker start after upgrade prevents it entirely, and that geo + properties on backfilled events reflect upload-time location. Follow the existing page's + tone/structure (read it first). + +**Verification checklist**: +- [ ] `bun test` (full suite) passes — especially `tests/logger-usage-standards.test.ts`. +- [ ] `grep -rn "runHistoricalBackfill" src/` shows exactly two hits: definition + the one + worker-service call site. +- [ ] Worker boots clean **with telemetry disabled in the shell**: `CLAUDE_MEM_TELEMETRY=0 + npm run build-and-sync`, then confirm via worker log (`~/.claude-mem/logs/`) that + startup completes and no backfill error is logged. + +**Anti-pattern guards**: do not block `initializeBackground()` on the backfill promise; do not +capture `worker_stopped`-style events from inside backfill; no unguarded dev-machine boots. + +--- + +## Phase 4: Final Verification + +1. **Anti-pattern greps** (all must return nothing): + - `grep -rn "historical_migration" src/` (wrong spelling for SDK path) + - `grep -rn "console\." src/services/telemetry/backfill.ts` + - `grep -rn "from 'uuid'" src/` + - `grep -n "buildBaseProperties" src/services/telemetry/backfill.ts` → must appear ONLY in + the install_inferred builder, never for historical_activity. +2. **Whitelist proof**: test asserting `scrubProperties(buildBackfillEvents(...)[i].properties)` + retains every expected key on both event types. +3. **Full suite**: `bun test` green; `npm run build-and-sync` (telemetry-guarded shell) + succeeds; worker starts. +4. **Re-run the Phase 2 dry-run** one final time on the release build (the marker is still + absent on the dev machine if Phase 3 boots were guarded — if not, `rm + ~/.claude-mem/backfill.json` first). +5. **Post-ship validation (manual, in PostHog UI after release)**: + - BEFORE trusting reconstructed WAU: build a unique-users trend mixing one person event + (`worker_started`) and one profile-less event (`session_compressed`) for a known + installId and confirm it counts 1 user — this validates that profile-less + `historical_activity` and person `install_inferred` merge as one unique user. + - Adoption curve = `install_inferred` ∪ `install_completed(is_update=false)`, deduped per + distinct_id (telemetry-era installs emit both — document on the dashboard). + - Trend on unique `historical_activity` users by week = reconstructed WAU. + - Annotate dashboards: survivorship bias; `version` slicing must filter + `backfilled != true`; geo on backfilled events is upload-time. + +--- + +## Phase ordering & session boundaries + +Phase 1 (module + tests) → Phase 2 (live dry-run gate, BEFORE any wiring) → Phase 3 +(wiring + docs, telemetry-guarded) → Phase 4 (verification + post-ship). An executor starting +any phase cold should read this file's Phase 0 plus the cited source files for that phase +before writing code. Run `bun install` first — the worktree has no `node_modules`. + +--- + +## Review log (2026-06-12) + +Adversarially reviewed by a 55-agent workflow (5 dimensions × independent skeptic +verification; 26 findings confirmed, 6 downgraded, 1 disputed, 0 refuted). Material changes +vs. the first draft: +- **Pinned rollup semantics** (session_count from sdk_sessions only; project_count via UNION + distinct; discovery_tokens from summaries only — was multi-counted ~Nx per obs-per-turn). +- **Day window redefined** (whole-day buckets ≤ `utcDay(now-60h)`; was an ambiguous row-level + 48h filter that could ship truncated days and <48h timestamps). +- **install_inferred**: kept sessions-first MIN (a proposed cross-table MIN would have baked + two verified backdated artifact rows — 2025-08-12 epochs written by a 2026-04-10 session — + into undeletable data); added installDay clamp on rollups, 60h skip rule, noon-UTC + retry-stable timestamp, and explicit `first_active_date` assignment (was whitelisted but + never attached to any event). +- **Marker gating rebuilt on real SDK semantics**: bare `flush()` is not a delivery barrier + and `shutdown()` swallows errors — single-batch config + `on('error')` latch + marker only + on clean shutdown. Dropped the 3s race and the dead "flush every 5,000" advice. +- **Phase order fixed**: the live dry-run now precedes worker wiring (the embedded prod key + + default-on consent meant the old Phase 3 "boot the worker" step performed the real + irreversible send before the old Phase 4 dry-run, whose marker then made that dry-run a + silent no-op). +- **More usage data, same privacy posture**: added prompt_count, obs_type_* breakdown + (whitelist keys already exist from live telemetry), session outcome counts, platform + buckets, subagent count; project_count now includes session-only days. +- **Simplifications**: Bun.randomUUIDv5 one-liner replaces hand-rolled sha256 nibble-forcing + (the "invented API" anti-pattern claim was false — verified on Bun 1.3.9); deleted dead + whitelist keys (backfill_days, backfill_events); old Phases 1+2 merged; debug gate moved + ahead of all marker writes; corrected citations (discovery_tokens lives in a migration at + runner.ts:381-402, not the base schema; there is no JSON-write helper). diff --git a/src/services/telemetry/backfill.ts b/src/services/telemetry/backfill.ts new file mode 100644 index 000000000..0942cbd31 --- /dev/null +++ b/src/services/telemetry/backfill.ts @@ -0,0 +1,563 @@ +import { join } from 'path'; +import { mkdirSync, writeFileSync } from 'fs'; +import { PostHog } from 'posthog-node'; +import type { Database } from 'bun:sqlite'; +import { resolveDataDir } from '../../shared/paths.js'; +import { readJsonSafe } from '../../utils/json-utils.js'; +import { logger } from '../../utils/logger.js'; +import { + resolveTelemetryConsent, + loadTelemetryConfig, + getOrCreateInstallId, +} from './consent.js'; +import { scrubProperties } from './scrub.js'; +import { + getTelemetryApiKey, + getTelemetryHost, + buildBaseProperties, + buildPersonSet, +} from './common.js'; + +/** + * One-time historical backfill of anonymized daily activity rollups into + * PostHog (historical-migration ingestion mode), so growth metrics cover + * activity that predates telemetry shipping. + * + * What ships (counts/sums only — never titles, text, prompts, project names, + * or any raw string column): + * - one profile-less `historical_activity` event per active UTC day, and + * - one `install_inferred` person event at noon UTC of the inferred + * install day. + * + * Idempotency: a completion marker (~/.claude-mem/backfill.json) is the + * primary gate; deterministic per-event UUIDs minimize damage in the + * crash-retry window (PostHog dedupe is best-effort, merge-time). + */ + +/** + * PostHog's historical-migration contract requires event timestamps at least + * 48 hours in the past. Events are stamped at noon UTC of their day, so the + * newest includable day is the UTC day of (now - 60h): 48h contract + 12h + * noon offset. Noon of any included day is then guaranteed >= 48h old. + */ +const BACKFILL_LAG_MS = 60 * 3_600_000; + +/** + * Predates claude-mem's first release. Rows whose normalized epoch falls + * below this are corrupt (e.g. backdated artifacts) and are ignored + * everywhere — rollups AND the first-activity MIN. + */ +export const PROJECT_EPOCH_FLOOR = Date.parse('2024-01-01T00:00:00Z'); + +/** + * Fixed namespace for deterministic (UUIDv5) backfill event ids. Never change + * this value: retried events must carry byte-identical uuids for PostHog's + * dedupe key to match. + */ +const BACKFILL_NAMESPACE = '8a9c2f4e-31b7-5d68-9c4a-f02e6d5b8a17'; + +const BACKFILL_MARKER_FILENAME = 'backfill.json'; + +/** + * Mirror of the private STAT_TYPE_BUCKETS set in + * src/services/context/ContextBuilder.ts — the closed observation-type + * vocabulary live `context_injected` events use. Everything else buckets to + * 'other' so the backfill vocabulary is identical to live telemetry. + */ +const STAT_TYPE_BUCKETS = new Set(['bugfix', 'discovery', 'decision', 'refactor']); + +/** + * Epoch columns hold mixed units historically: a few hundred legacy rows were + * written in seconds, everything since in milliseconds. Normalize to ms in + * SQL before any date math (same rule as install-stats.ts — and note it must + * be applied INSIDE aggregate functions like MIN, never outside). + */ +function asMs(col: string): string { + return `CASE WHEN ${col} < 1000000000000 THEN ${col} * 1000 ELSE ${col} END`; +} + +/** YYYY-MM-DD (UTC) for an epoch-milliseconds instant. */ +export function utcDayString(epochMs: number): string { + return new Date(epochMs).toISOString().slice(0, 10); +} + +export interface DailyRollup { + day: string; + counters: Record; +} + +export interface BackfillEvent { + event: string; + properties: Record; + timestamp: Date; + uuid: string; +} + +interface BackfillMarker { + completedAt: string; + throughDay: string; + eventCount: number; + installId: string; +} + +function getBackfillMarkerPath(): string { + return join(resolveDataDir(), BACKFILL_MARKER_FILENAME); +} + +/** + * True when a completion marker exists. A corrupt marker file still counts as + * complete: a marker was written at some point, and duplicate sends are worse + * than a gap (PostHog data cannot be selectively deleted). + */ +function isBackfillComplete(): boolean { + try { + return readJsonSafe | null>(getBackfillMarkerPath(), null) !== null; + } catch { + return true; + } +} + +function writeBackfillMarker(marker: BackfillMarker): void { + const dataDir = resolveDataDir(); + mkdirSync(dataDir, { recursive: true }); + writeFileSync(getBackfillMarkerPath(), JSON.stringify(marker, null, 2) + '\n'); +} + +/** + * Per-day anonymous activity rollups, bucketed by UTC day. Only whole UTC + * days inside `installDay <= day <= lastFullDay` are included, comparing + * day strings (YYYY-MM-DD compares lexicographically) — never raw epochs, so + * no partial day can ever ship. Rows below PROJECT_EPOCH_FLOOR are ignored. + * + * Each query block is independently best-effort (a missing table/column on an + * older install skips that block's keys, never throws) — same pattern as + * collectInstallStats. + */ +export function collectDailyRollups( + db: Database, + lastFullDay: string, + installDay: string +): DailyRollup[] { + const byDay = new Map>(); + + const add = (day: string | null | undefined, key: string, value: number): void => { + if (!day) return; + let counters = byDay.get(day); + if (!counters) { + counters = {}; + byDay.set(day, counters); + } + counters[key] = (counters[key] ?? 0) + value; + }; + + /** Shared per-table SQL fragments: day bucket + window/floor filter. */ + const frag = (epochCol: string): { day: string; where: string } => { + const ms = asMs(epochCol); + const day = `date((${ms})/1000, 'unixepoch')`; + return { + day, + where: `${ms} >= ?1 AND ${day} >= ?2 AND ${day} <= ?3`, + }; + }; + const params = [PROJECT_EPOCH_FLOOR, installDay, lastFullDay] as const; + + // observation_count + try { + const f = frag('created_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, COUNT(*) AS c FROM observations WHERE ${f.where} GROUP BY day` + ) + .all(...params) as Array<{ day: string; c: number }>; + for (const row of rows) add(row.day, 'observation_count', row.c); + } catch { + // Table not created yet on this install — skip this block's keys. + } + + // obs_type_* — closed vocabulary via STAT_TYPE_BUCKETS, zero-filled for any + // day that has observations (matches live context_injected event shape). + try { + const f = frag('created_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, type, COUNT(*) AS c FROM observations WHERE ${f.where} GROUP BY day, type` + ) + .all(...params) as Array<{ day: string; type: string | null; c: number }>; + for (const row of rows) { + for (const bucket of ['bugfix', 'discovery', 'decision', 'refactor', 'other']) { + add(row.day, `obs_type_${bucket}`, 0); + } + const bucket = row.type && STAT_TYPE_BUCKETS.has(row.type) ? row.type : 'other'; + add(row.day, `obs_type_${bucket}`, row.c); + } + } catch { + // Missing table/column — skip. + } + + // subagent_obs_count + try { + const f = frag('created_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, COUNT(*) AS c FROM observations WHERE ${f.where} AND agent_type IS NOT NULL GROUP BY day` + ) + .all(...params) as Array<{ day: string; c: number }>; + for (const row of rows) add(row.day, 'subagent_obs_count', row.c); + } catch { + // agent_type arrives via migration — older installs skip this key. + } + + // session_count — sdk_sessions ONLY (observations' memory_session_id covers + // the same sessions; adding it would double count). + try { + const f = frag('started_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, COUNT(*) AS c FROM sdk_sessions WHERE ${f.where} GROUP BY day` + ) + .all(...params) as Array<{ day: string; c: number }>; + for (const row of rows) add(row.day, 'session_count', row.c); + } catch { + // No sessions table yet. + } + + // session_completed_count / session_failed_count — closed status enum; + // 'active' rows are counted by session_count only. + try { + const f = frag('started_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, status, COUNT(*) AS c FROM sdk_sessions WHERE ${f.where} GROUP BY day, status` + ) + .all(...params) as Array<{ day: string; status: string | null; c: number }>; + for (const row of rows) { + if (row.status === 'completed') add(row.day, 'session_completed_count', row.c); + else if (row.status === 'failed') add(row.day, 'session_failed_count', row.c); + } + } catch { + // Missing table/column — skip. + } + + // sessions_{claude,codex,gemini,other_platform}_count — platform_source is + // user-influenceable; bucket in JS to the closed enum, never ship raw. + try { + const f = frag('started_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, platform_source, COUNT(*) AS c FROM sdk_sessions WHERE ${f.where} GROUP BY day, platform_source` + ) + .all(...params) as Array<{ day: string; platform_source: string | null; c: number }>; + for (const row of rows) { + const platform = + row.platform_source === 'claude' || row.platform_source === 'codex' || row.platform_source === 'gemini' + ? row.platform_source + : 'other_platform'; + add(row.day, `sessions_${platform}_count`, row.c); + } + } catch { + // platform_source arrives via migration — skip. + } + + // summary_count + try { + const f = frag('created_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, COUNT(*) AS c FROM session_summaries WHERE ${f.where} GROUP BY day` + ) + .all(...params) as Array<{ day: string; c: number }>; + for (const row of rows) add(row.day, 'summary_count', row.c); + } catch { + // No summaries table yet. + } + + // discovery_tokens — session_summaries ONLY. The same per-turn value is + // written to every observation row of the turn AND the turn's summary row; + // summing across observations multi-counts by the obs-per-turn factor. + try { + const f = frag('created_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, COALESCE(SUM(discovery_tokens), 0) AS total FROM session_summaries WHERE ${f.where} GROUP BY day` + ) + .all(...params) as Array<{ day: string; total: number }>; + for (const row of rows) add(row.day, 'discovery_tokens', row.total); + } catch { + // discovery_tokens arrives via migration — skip. + } + + // prompt_count — COUNT only; prompt_text is never selected. + try { + const f = frag('created_at_epoch'); + const rows = db + .query( + `SELECT ${f.day} AS day, COUNT(*) AS c FROM user_prompts WHERE ${f.where} GROUP BY day` + ) + .all(...params) as Array<{ day: string; c: number }>; + for (const row of rows) add(row.day, 'prompt_count', row.c); + } catch { + // No user_prompts table yet. + } + + // project_count — cross-table distinct in ONE query (UNION dedupes the + // same project appearing in both tables on the same day; summing per-table + // distincts would multi-count). + try { + const fo = frag('created_at_epoch'); + const fs = frag('started_at_epoch'); + const rows = db + .query( + `SELECT day, COUNT(DISTINCT project) AS c FROM ( + SELECT ${fo.day} AS day, project FROM observations WHERE ${fo.where} + UNION + SELECT ${fs.day} AS day, project FROM sdk_sessions WHERE ${fs.where} + ) GROUP BY day` + ) + .all(...params) as Array<{ day: string; c: number }>; + for (const row of rows) add(row.day, 'project_count', row.c); + } catch { + // Either table missing — skip. + } + + return Array.from(byDay.entries()) + .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0)) + .map(([day, counters]) => ({ day, counters })); +} + +/** + * Earliest trustworthy activity epoch (ms). Sessions-first: session start + * timestamps are write-time and trustworthy, while observation epochs can be + * backdated artifacts. The observations MIN is consulted only when + * sdk_sessions has no usable rows at all. + */ +export function findFirstActivityEpochMs(db: Database): number | null { + try { + const ms = asMs('started_at_epoch'); + const row = db + .query(`SELECT MIN(${ms}) AS epoch FROM sdk_sessions WHERE ${ms} >= ?1`) + .get(PROJECT_EPOCH_FLOOR) as { epoch: number | null } | null; + if (row?.epoch) return row.epoch; + } catch { + // No sessions table yet — fall through to observations. + } + + try { + const ms = asMs('created_at_epoch'); + const row = db + .query(`SELECT MIN(${ms}) AS epoch FROM observations WHERE ${ms} >= ?1`) + .get(PROJECT_EPOCH_FLOOR) as { epoch: number | null } | null; + if (row?.epoch) return row.epoch; + } catch { + // No observations table either. + } + + return null; +} + +/** + * Deterministic (UUIDv5) event id so a crash-window retry carries a + * byte-identical uuid — PostHog's dedupe key is + * (toDate(timestamp), event, distinct_id, uuid). + */ +export function deterministicEventUuid(installId: string, event: string, day: string): string { + return Bun.randomUUIDv5(`${installId}|${event}|${day}`, BACKFILL_NAMESPACE); +} + +/** + * Pure assembly of the full backfill payload: + * - one `historical_activity` per active day — rollup counters + + * backfilled:true, scrubbed, profile-less. NO buildBaseProperties(): + * stamping the CURRENT version/os onto historical days would permanently + * poison version-over-time charts. + * - one `install_inferred` at noon UTC of the install day — base props + + * first_active_date, scrubbed, with $set person traits ($set = current + * person state, so base props are correct here). + * + * Noon UTC is load-bearing twice: it keeps each event inside its UTC day for + * dashboards in UTC-12..+11, and it is retry-stable (the dedupe key needs a + * byte-identical timestamp). + * + * Installs younger than the lag window (installDay > lastFullDay) return []: + * live telemetry covers their entire life, and shipping a <48h timestamp + * would violate the historical-migration contract. + */ +export function buildBackfillEvents( + db: Database, + installId: string, + nowMs: number +): BackfillEvent[] { + const lastFullDay = utcDayString(nowMs - BACKFILL_LAG_MS); + + const firstActivityEpochMs = findFirstActivityEpochMs(db); + if (firstActivityEpochMs === null) return []; + + const installDay = utcDayString(firstActivityEpochMs); + if (installDay > lastFullDay) return []; + + const events: BackfillEvent[] = []; + + for (const rollup of collectDailyRollups(db, lastFullDay, installDay)) { + const properties: Record = scrubProperties({ + ...rollup.counters, + backfilled: true, + }); + // $-prefixed PostHog directives are not user data and bypass the + // whitelist; added AFTER scrubbing (same as captureEvent). + properties.$process_person_profile = false; + events.push({ + event: 'historical_activity', + properties, + timestamp: new Date(rollup.day + 'T12:00:00Z'), + uuid: deterministicEventUuid(installId, 'historical_activity', rollup.day), + }); + } + + const installProps: Record = scrubProperties({ + ...buildBaseProperties(), + // Explicit assignment is load-bearing: buildPersonSet only copies keys + // PRESENT on the event's properties. + first_active_date: installDay, + backfilled: true, + }); + installProps.$set = buildPersonSet(installProps); + events.push({ + event: 'install_inferred', + properties: installProps, + timestamp: new Date(installDay + 'T12:00:00Z'), + uuid: deterministicEventUuid(installId, 'install_inferred', 'install'), + }); + + return events; +} + +/** + * One-shot historical backfill. Fire-and-forget from worker startup; never + * throws (telemetry must never break the worker). + * + * Gate sequence (ORDER MATTERS — the debug dry-run must precede every marker + * write so debug mode can never latch the marker): + * 1. completion marker exists -> return + * 2. no telemetry consent -> return (no marker — later opt-in still backfills) + * 3. build events + * 4. CLAUDE_MEM_TELEMETRY_DEBUG=1 -> stderr dry-run, NO send, NO marker + * 5. zero events -> write marker, return + * 6. no API key -> return (vestigial: the embedded key is never falsy) + * 7. dedicated historicalMigration client, single-batch sizing + * 8. on('error') latch + capture all + await shutdown() (the ONLY delivery barrier) + * 9. marker ONLY on clean shutdown with zero emitted errors + */ +export async function runHistoricalBackfill(db: Database): Promise { + try { + if (isBackfillComplete()) return; + + if (!resolveTelemetryConsent(process.env, loadTelemetryConfig())) return; + + const nowMs = Date.now(); + const lastFullDay = utcDayString(nowMs - BACKFILL_LAG_MS); + const installId = getOrCreateInstallId(); + const events = buildBackfillEvents(db, installId, nowMs); + + if (process.env.CLAUDE_MEM_TELEMETRY_DEBUG === '1') { + // Dry-run: print the exact payload to stderr (debug mode is a human in + // the foreground — same convention as captureEvent), send nothing, + // write no marker. Intentionally re-runs on every debug worker start. + const days = events + .filter(e => e.event === 'historical_activity') + .map(e => e.timestamp.toISOString().slice(0, 10)); + const dayRange = days.length > 0 ? `${days[0]}..${days[days.length - 1]}` : '(none)'; + process.stderr.write( + `[telemetry-backfill] dry-run: ${events.length} events, days ${dayRange}, lastFullDay ${lastFullDay}\n` + ); + for (const e of events) { + process.stderr.write( + '[telemetry-backfill] ' + + JSON.stringify({ + event: e.event, + timestamp: e.timestamp.toISOString(), + uuid: e.uuid, + properties: e.properties, + }) + + '\n' + ); + } + return; + } + + if (events.length === 0) { + // Fresh installs land here: nothing pre-telemetry exists, and live + // telemetry covers them from day 0 — latch so we never rescan. + writeBackfillMarker({ + completedAt: new Date().toISOString(), + throughDay: lastFullDay, + eventCount: 0, + installId, + }); + return; + } + + if (!getTelemetryApiKey()) return; + + // Dedicated short-lived client — the live singleton lacks + // historicalMigration and its shutdown latch must stay untouched. The + // 5000s make flushAt unreachable (no swallowed background flushes) and + // keep the whole backfill in ONE request at shutdown, with no silent + // queue-cap drops for multi-year installs. + const client = new PostHog(getTelemetryApiKey(), { + host: getTelemetryHost(), + historicalMigration: true, + flushAt: 5000, + maxBatchSize: 5000, + maxQueueSize: 5000, + disableGeoip: false, + }); + + // shutdown() swallows fetch errors internally; the public error emitter + // is the only delivery-failure signal. + const errors: unknown[] = []; + client.on('error', (err: unknown) => { + errors.push(err); + }); + + for (const e of events) { + client.capture({ + distinctId: installId, + event: e.event, + properties: e.properties, + timestamp: e.timestamp, + uuid: e.uuid, + }); + } + + // shutdown() is the only delivery barrier: it joins pending capture + // promises, then loops flush until the queue drains. A bare flush() can + // resolve while captures are still un-enqueued. + await client.shutdown(); + + if (errors.length === 0) { + writeBackfillMarker({ + completedAt: new Date().toISOString(), + throughDay: lastFullDay, + eventCount: events.length, + installId, + }); + logger.info('SYSTEM', 'Telemetry historical backfill complete', { + eventCount: events.length, + throughDay: lastFullDay, + }); + } else { + // No marker: the next worker start retries with byte-identical events + // (deterministic uuid + noon-UTC timestamps make the retry dedupable). + logger.warn('SYSTEM', 'Telemetry historical backfill delivery errored; will retry on next worker start', { + eventCount: events.length, + errorCount: errors.length, + }); + } + } catch (error) { + logger.error( + 'SYSTEM', + 'Telemetry historical backfill failed (non-blocking)', + {}, + error instanceof Error ? error : new Error(String(error)) + ); + } +} diff --git a/src/services/telemetry/common.ts b/src/services/telemetry/common.ts index 499706e79..f1c3cc0c1 100644 --- a/src/services/telemetry/common.ts +++ b/src/services/telemetry/common.ts @@ -46,6 +46,10 @@ export const PERSON_PROPERTY_KEYS = [ 'runtime_mode', 'install_method', 'claude_code_version', + // Inferred install day (YYYY-MM-DD) from the one-time historical backfill's + // install_inferred event — anchors the adoption curve for installs that + // predate telemetry. + 'first_active_date', // Install snapshot (refreshed by the daily worker_started heartbeat) — // lets cohorts slice by install scale, age, and activity. 'db_observation_count', diff --git a/src/services/telemetry/scrub.ts b/src/services/telemetry/scrub.ts index 1861036d1..8b560d074 100644 --- a/src/services/telemetry/scrub.ts +++ b/src/services/telemetry/scrub.ts @@ -114,6 +114,25 @@ export const ALLOWED_PROPERTY_KEYS: Set = new Set([ 'error_mode', 'consecutive_failures', 'threshold_tripped', + // Historical backfill (backfill.ts) — anonymous per-day rollup counters, + // the backfilled:true flag, and the single inferred-install date string + // (YYYY-MM-DD). Counts/sums and closed-enum buckets only — never project + // names, prompt text, or any raw string column. observation_count, + // session_count, and the obs_type_* family are shared with live + // context_injected above (different per-event semantics — see PR notes). + 'discovery_tokens', + 'summary_count', + 'prompt_count', + 'project_count', + 'backfilled', + 'first_active_date', + 'session_completed_count', + 'session_failed_count', + 'sessions_claude_count', + 'sessions_codex_count', + 'sessions_gemini_count', + 'sessions_other_platform_count', + 'subagent_obs_count', ]); const MAX_STRING_LENGTH = 200; diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index a0297df3c..170f4e0ef 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -23,6 +23,7 @@ import { ensureWorkerStarted as ensureWorkerStartedShared, type WorkerStartResul import { acquireSpawnLock, releaseSpawnLock } from '../shared/worker-spawn-gate.js'; import { captureEvent, shutdownTelemetry } from './telemetry/telemetry.js'; import { collectInstallStats } from './telemetry/install-stats.js'; +import { runHistoricalBackfill } from './telemetry/backfill.js'; export { isPluginDisabledInClaudeSettings } from '../shared/plugin-state.js'; import { isPluginDisabledInClaudeSettings } from '../shared/plugin-state.js'; @@ -546,6 +547,17 @@ export class WorkerService implements WorkerRef { }, 24 * 60 * 60 * 1000); this.telemetryHeartbeat.unref?.(); + // One-time historical telemetry backfill (anonymized daily rollups). + // Fire-and-forget: gated internally by the backfill.json marker and the + // same consent checks as live telemetry; a failed run retries on the + // next worker start because no marker is written. + // runHistoricalBackfill never rejects by contract (its body is fully + // try/caught), so this .catch is an unhandled-rejection backstop that + // keeps the worker alive if that contract ever regresses. + runHistoricalBackfill(this.dbManager.getConnection()).catch(error => { + logger.error('SYSTEM', 'Telemetry historical backfill failed (non-blocking)', {}, error as Error); + }); + await this.startTranscriptWatcher(settings); if (this.chromaMcpManager) { diff --git a/tests/preload.ts b/tests/preload.ts index a6276eb25..e84951f58 100644 --- a/tests/preload.ts +++ b/tests/preload.ts @@ -42,14 +42,40 @@ export type PostHogConstructorCall = { apiKey: string; options: Record> = []; +/** + * Behavior knob for the mock below. When `emitErrorOnShutdown` is set, the + * next shutdown() call emits it through every handler registered via + * on('error', ...) before resolving — simulating posthog-node's real + * delivery-failure surface (shutdown() swallows fetch errors internally; the + * public error emitter is the only failure signal). Tests that set it MUST + * reset it to null afterwards. + */ +export const postHogMockBehavior: { emitErrorOnShutdown: unknown | null } = { + emitErrorOnShutdown: null, +}; + mock.module('posthog-node', () => ({ PostHog: class { + private handlers: Record void>> = {}; constructor(apiKey: string, options: Record) { postHogConstructorCalls.push({ apiKey, options }); } capture(payload: Record): void { postHogCaptureCalls.push(payload); } - async shutdown(): Promise {} + on(event: string, handler: (...args: unknown[]) => void): () => void { + (this.handlers[event] ??= []).push(handler); + return () => {}; + } + /** Test-only trigger mirroring the real SDK's internal emitter. */ + emit(event: string, ...args: unknown[]): void { + for (const handler of this.handlers[event] ?? []) handler(...args); + } + async flush(): Promise {} + async shutdown(): Promise { + if (postHogMockBehavior.emitErrorOnShutdown !== null) { + this.emit('error', postHogMockBehavior.emitErrorOnShutdown); + } + } }, })); diff --git a/tests/telemetry/backfill.test.ts b/tests/telemetry/backfill.test.ts new file mode 100644 index 000000000..c246974b3 --- /dev/null +++ b/tests/telemetry/backfill.test.ts @@ -0,0 +1,496 @@ +import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'bun:test'; +import { Database } from 'bun:sqlite'; +import { mkdtempSync, rmSync, existsSync, readFileSync, writeFileSync } from 'fs'; +import { tmpdir } from 'os'; +import { join } from 'path'; +import { + postHogConstructorCalls, + postHogCaptureCalls, + postHogMockBehavior, +} from '../preload'; +import { + utcDayString, + collectDailyRollups, + findFirstActivityEpochMs, + deterministicEventUuid, + buildBackfillEvents, + runHistoricalBackfill, + PROJECT_EPOCH_FLOOR, +} from '../../src/services/telemetry/backfill'; +import { scrubProperties } from '../../src/services/telemetry/scrub'; +import { __resetTelemetryForTests } from '../../src/services/telemetry/telemetry'; + +const HOUR_MS = 3_600_000; +const DAY_MS = 86_400_000; +const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/; + +/** + * In-memory DB matching the runner.ts schema for every column the rollup + * queries touch (install-stats test pattern, EXTENDED: discovery_tokens on + * both observations and session_summaries, memory_session_id, type, + * agent_type, status, platform_source, and the user_prompts table). + */ +function makeDb(): Database { + const db = new Database(':memory:'); + db.run(` + CREATE TABLE sdk_sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + memory_session_id TEXT, + project TEXT NOT NULL, + platform_source TEXT NOT NULL DEFAULT 'claude', + started_at_epoch INTEGER NOT NULL, + status TEXT NOT NULL DEFAULT 'active' + ); + CREATE TABLE observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + memory_session_id TEXT, + project TEXT NOT NULL, + type TEXT NOT NULL DEFAULT 'other', + agent_type TEXT, + created_at_epoch INTEGER NOT NULL, + discovery_tokens INTEGER DEFAULT 0 + ); + CREATE TABLE session_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + memory_session_id TEXT, + project TEXT NOT NULL, + created_at_epoch INTEGER NOT NULL, + discovery_tokens INTEGER DEFAULT 0 + ); + CREATE TABLE user_prompts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content_session_id TEXT, + prompt_number INTEGER, + prompt_text TEXT, + created_at_epoch INTEGER NOT NULL + ); + `); + return db; +} + +type SessionRow = { + epoch: number; + project?: string; + memId?: string | null; + status?: string; + platform?: string; +}; +function insertSession(db: Database, row: SessionRow): void { + db.prepare( + 'INSERT INTO sdk_sessions (memory_session_id, project, platform_source, started_at_epoch, status) VALUES (?, ?, ?, ?, ?)' + ).run(row.memId ?? null, row.project ?? 'alpha', row.platform ?? 'claude', row.epoch, row.status ?? 'completed'); +} + +type ObsRow = { + epoch: number; + project?: string; + memId?: string | null; + type?: string; + agentType?: string | null; + tokens?: number; +}; +function insertObs(db: Database, row: ObsRow): void { + db.prepare( + 'INSERT INTO observations (memory_session_id, project, type, agent_type, created_at_epoch, discovery_tokens) VALUES (?, ?, ?, ?, ?, ?)' + ).run(row.memId ?? null, row.project ?? 'alpha', row.type ?? 'other', row.agentType ?? null, row.epoch, row.tokens ?? 0); +} + +type SummaryRow = { epoch: number; project?: string; memId?: string | null; tokens?: number }; +function insertSummary(db: Database, row: SummaryRow): void { + db.prepare( + 'INSERT INTO session_summaries (memory_session_id, project, created_at_epoch, discovery_tokens) VALUES (?, ?, ?, ?)' + ).run(row.memId ?? null, row.project ?? 'alpha', row.epoch, row.tokens ?? 0); +} + +function insertPrompt(db: Database, epoch: number): void { + db.prepare( + 'INSERT INTO user_prompts (content_session_id, prompt_number, prompt_text, created_at_epoch) VALUES (?, ?, ?, ?)' + ).run('cs-1', 1, '', epoch); +} + +function historicalDays(events: ReturnType): string[] { + return events + .filter(e => e.event === 'historical_activity') + .map(e => e.timestamp.toISOString().slice(0, 10)); +} + +/** Capture process.stderr.write output for the duration of fn. */ +async function withStderrCapture(fn: () => Promise): Promise { + const lines: string[] = []; + const original = process.stderr.write; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + process.stderr.write = ((chunk: any) => { + lines.push(String(chunk)); + return true; + }) as typeof process.stderr.write; + try { + await fn(); + } finally { + process.stderr.write = original; + } + return lines; +} + +// Fixed "now" for the pure builders: 2026-06-12T23:00:00Z. +// lastFullDay = utcDay(NOW - 60h) = utcDay(2026-06-10T11:00Z) = 2026-06-10. +const NOW = Date.UTC(2026, 5, 12, 23, 0, 0); +const LAST_FULL_DAY = '2026-06-10'; + +let tempDir: string; +const savedEnv: Record = {}; +const ENV_KEYS = [ + 'CLAUDE_MEM_DATA_DIR', + 'CLAUDE_MEM_TELEMETRY', + 'CLAUDE_MEM_TELEMETRY_DEBUG', + 'CLAUDE_MEM_TELEMETRY_KEY', + 'CLAUDE_MEM_TELEMETRY_HOST', + 'DO_NOT_TRACK', +]; + +beforeAll(() => { + for (const key of ENV_KEYS) savedEnv[key] = process.env[key]; +}); + +beforeEach(() => { + // Fresh data dir per test so marker/telemetry.json state never leaks + // between tests — and never touches the real ~/.claude-mem. + tempDir = mkdtempSync(join(tmpdir(), 'claude-mem-backfill-')); + process.env.CLAUDE_MEM_DATA_DIR = tempDir; + process.env.CLAUDE_MEM_TELEMETRY = '1'; + delete process.env.CLAUDE_MEM_TELEMETRY_DEBUG; + delete process.env.CLAUDE_MEM_TELEMETRY_KEY; + delete process.env.CLAUDE_MEM_TELEMETRY_HOST; + delete process.env.DO_NOT_TRACK; + postHogConstructorCalls.length = 0; + postHogCaptureCalls.length = 0; + postHogMockBehavior.emitErrorOnShutdown = null; + __resetTelemetryForTests(); +}); + +afterEach(() => { + postHogMockBehavior.emitErrorOnShutdown = null; + rmSync(tempDir, { recursive: true, force: true }); +}); + +afterAll(() => { + for (const key of ENV_KEYS) { + if (savedEnv[key] === undefined) delete process.env[key]; + else process.env[key] = savedEnv[key]; + } + __resetTelemetryForTests(); +}); + +function markerPath(): string { + return join(tempDir, 'backfill.json'); +} + +function readMarker(): Record { + return JSON.parse(readFileSync(markerPath(), 'utf-8')); +} + +describe('collectDailyRollups epoch normalization', () => { + it('(a) lands mixed second/ms epochs on the correct UTC day, never 1970', () => { + const db = makeDb(); + // Legacy seconds-unit rows: 1755000000 s = 2025-08-12T12:00:00Z. + insertObs(db, { epoch: 1_755_000_000 }); + // Same day, written in milliseconds an hour later. + insertObs(db, { epoch: 1_755_000_000_000 + HOUR_MS }); + + const rollups = collectDailyRollups(db, LAST_FULL_DAY, '2024-01-01'); + + expect(rollups.length).toBe(1); + expect(rollups[0].day).toBe('2025-08-12'); + expect(rollups[0].counters.observation_count).toBe(2); + for (const rollup of rollups) { + expect(rollup.day.startsWith('1970')).toBe(false); + } + }); +}); + +describe('day window (whole UTC days only)', () => { + it('(b) excludes a 47h-old row, includes whole day buckets, drops floor/pre-install rows', () => { + const db = makeDb(); + const memId = 's-1'; + // installDay anchor: session 10 days back -> 2026-06-02. + insertSession(db, { epoch: NOW - 10 * DAY_MS, memId }); + // 47h-old session: 2026-06-11T00:00Z, day AFTER lastFullDay -> excluded. + insertSession(db, { epoch: NOW - 47 * HOUR_MS, memId: 's-47h' }); + // Two sessions on lastFullDay itself: one 70h old, one only 49h old — + // BOTH included because the whole UTC day 2026-06-10 is included + // (day-bucket rule, not a raw 60h row cutoff -> no partial day ships). + insertSession(db, { epoch: NOW - 70 * HOUR_MS, memId: 's-70h' }); + insertSession(db, { epoch: NOW - 49 * HOUR_MS, memId: 's-49h' }); + // Corrupt row below PROJECT_EPOCH_FLOOR -> ignored entirely. + insertObs(db, { epoch: Date.UTC(2023, 5, 1) }); + expect(Date.UTC(2023, 5, 1)).toBeLessThan(PROJECT_EPOCH_FLOOR); + // Backdated artifact: observation before installDay -> no day produced. + insertObs(db, { epoch: NOW - 23 * DAY_MS }); + + const events = buildBackfillEvents(db, 'install-1', NOW); + const days = historicalDays(events); + + expect(days).toContain('2026-06-02'); + expect(days).toContain(LAST_FULL_DAY); + expect(days).not.toContain('2026-06-11'); + for (const day of days) { + expect(day <= LAST_FULL_DAY).toBe(true); + expect(day >= '2026-06-02').toBe(true); + expect(day.startsWith('1970')).toBe(false); + } + const lastFullDayEvent = events.find( + e => e.event === 'historical_activity' && e.timestamp.toISOString().startsWith(LAST_FULL_DAY) + ); + expect(lastFullDayEvent?.properties.session_count).toBe(2); + }); +}); + +describe('deterministicEventUuid', () => { + it('(c) is stable across calls and shaped like a UUID', () => { + const a = deterministicEventUuid('install-1', 'historical_activity', '2025-10-19'); + const b = deterministicEventUuid('install-1', 'historical_activity', '2025-10-19'); + expect(a).toBe(b); + expect(a).toMatch(UUID_RE); + // Distinct inputs produce distinct ids. + expect(deterministicEventUuid('install-1', 'historical_activity', '2025-10-20')).not.toBe(a); + expect(deterministicEventUuid('install-2', 'historical_activity', '2025-10-19')).not.toBe(a); + expect(deterministicEventUuid('install-1', 'install_inferred', '2025-10-19')).not.toBe(a); + }); +}); + +describe('buildBackfillEvents properties', () => { + it('(d) every built property survives scrubProperties — no silent drops', () => { + const db = makeDb(); + const epoch = Date.UTC(2026, 0, 15, 10, 0, 0); // 2026-01-15 + insertSession(db, { epoch, memId: 's-1', status: 'completed', platform: 'claude' }); + insertSession(db, { epoch: epoch + 1000, memId: 's-2', status: 'failed', platform: 'codex' }); + insertSession(db, { epoch: epoch + 2000, memId: 's-3', status: 'completed', platform: 'gemini' }); + insertSession(db, { epoch: epoch + 3000, memId: 's-4', status: 'active', platform: 'weird-thing', project: 'beta' }); + insertObs(db, { epoch, memId: 's-1', type: 'bugfix' }); + insertObs(db, { epoch: epoch + 1000, memId: 's-1', type: 'discovery' }); + insertObs(db, { epoch: epoch + 2000, memId: 's-1', type: 'decision' }); + insertObs(db, { epoch: epoch + 3000, memId: 's-1', type: 'refactor' }); + insertObs(db, { epoch: epoch + 4000, memId: 's-1', type: 'weirdtype', agentType: 'task' }); + insertSummary(db, { epoch, memId: 's-1', tokens: 250 }); + insertPrompt(db, epoch); + + const events = buildBackfillEvents(db, 'install-1', NOW); + expect(events.length).toBe(2); + + const activity = events.find(e => e.event === 'historical_activity'); + expect(activity).toBeDefined(); + const props = activity!.properties as Record; + const expectedCounters: Record = { + observation_count: 5, + obs_type_bugfix: 1, + obs_type_discovery: 1, + obs_type_decision: 1, + obs_type_refactor: 1, + obs_type_other: 1, + subagent_obs_count: 1, + session_count: 4, + session_completed_count: 2, + session_failed_count: 1, + sessions_claude_count: 1, + sessions_codex_count: 1, + sessions_gemini_count: 1, + sessions_other_platform_count: 1, + summary_count: 1, + discovery_tokens: 250, + prompt_count: 1, + project_count: 2, + }; + for (const [key, value] of Object.entries(expectedCounters)) { + expect(props[key]).toBe(value); + } + expect(props.backfilled).toBe(true); + expect(props.$process_person_profile).toBe(false); + // No buildBaseProperties on historical_activity — current version/os on + // historical days would poison version-over-time charts. + expect(props.version).toBeUndefined(); + expect(props.os).toBeUndefined(); + // Re-scrubbing the non-$ properties changes nothing: nothing the builder + // attached gets silently dropped by the whitelist. + const { $process_person_profile: _directive, ...rest } = props; + expect(scrubProperties(rest)).toEqual(rest as Record); + + const install = events.find(e => e.event === 'install_inferred'); + expect(install).toBeDefined(); + const installProps = install!.properties as Record; + expect(installProps.first_active_date).toBe('2026-01-15'); + expect(installProps.backfilled).toBe(true); + // Base props ARE expected on the one person event ($set = current state). + expect(typeof installProps.version).toBe('string'); + const { $set: _set, ...installRest } = installProps; + expect(scrubProperties(installRest)).toEqual( + installRest as Record + ); + }); + + it('(e) empty DB produces zero events without throwing', () => { + expect(buildBackfillEvents(makeDb(), 'install-1', NOW)).toEqual([]); + // Even a database with no tables at all. + expect(buildBackfillEvents(new Database(':memory:'), 'install-1', NOW)).toEqual([]); + expect(findFirstActivityEpochMs(new Database(':memory:'))).toBeNull(); + }); + + it('(f) discovery_tokens sums session_summaries only — one turn counts once', () => { + const db = makeDb(); + const epoch = Date.UTC(2025, 2, 3, 9, 0, 0); // 2025-03-03 + // One compression turn: the same 100-token cost is written to every + // observation row of the turn AND the turn's summary row. + insertObs(db, { epoch, memId: 's-1', tokens: 100 }); + insertObs(db, { epoch: epoch + 1000, memId: 's-1', tokens: 100 }); + insertObs(db, { epoch: epoch + 2000, memId: 's-1', tokens: 100 }); + insertSummary(db, { epoch: epoch + 3000, memId: 's-1', tokens: 100 }); + + const rollups = collectDailyRollups(db, LAST_FULL_DAY, '2024-01-01'); + expect(rollups.length).toBe(1); + expect(rollups[0].day).toBe('2025-03-03'); + expect(rollups[0].counters.discovery_tokens).toBe(100); + }); + + it('(g) one session with one observation: no double counting', () => { + const db = makeDb(); + const epoch = Date.UTC(2025, 3, 4, 14, 0, 0); // 2025-04-04 + insertSession(db, { epoch, memId: 's-1', project: 'alpha' }); + insertObs(db, { epoch: epoch + 1000, memId: 's-1', project: 'alpha' }); + + const rollups = collectDailyRollups(db, LAST_FULL_DAY, '2024-01-01'); + expect(rollups.length).toBe(1); + expect(rollups[0].counters.session_count).toBe(1); + expect(rollups[0].counters.project_count).toBe(1); + expect(rollups[0].counters.observation_count).toBe(1); + }); + + it('(h) install_inferred uses the sessions MIN at noon UTC with first_active_date in $set', () => { + const db = makeDb(); + // Backdated artifact observation: 2025-08-12 (legacy seconds epoch). + insertObs(db, { epoch: 1_755_000_000 }); + // Trustworthy session start: 2025-10-19T08:00Z. + insertSession(db, { epoch: Date.UTC(2025, 9, 19, 8, 0, 0), memId: 's-1' }); + + const events = buildBackfillEvents(db, 'install-1', NOW); + const install = events.filter(e => e.event === 'install_inferred'); + expect(install.length).toBe(1); + expect(install[0].timestamp.toISOString()).toBe('2025-10-19T12:00:00.000Z'); + expect(install[0].properties.first_active_date).toBe('2025-10-19'); + const set = install[0].properties.$set as Record; + expect(set.first_active_date).toBe('2025-10-19'); + expect(install[0].uuid).toBe(deterministicEventUuid('install-1', 'install_inferred', 'install')); + // The backdated artifact day is clamped out of the rollups entirely. + expect(historicalDays(events)).not.toContain('2025-08-12'); + }); + + it('(i) first activity 1h ago (installDay > lastFullDay) yields zero events', () => { + const db = makeDb(); + insertSession(db, { epoch: NOW - 1 * HOUR_MS, memId: 's-1' }); + insertObs(db, { epoch: NOW - 1 * HOUR_MS, memId: 's-1' }); + expect(buildBackfillEvents(db, 'install-1', NOW)).toEqual([]); + }); +}); + +describe('runHistoricalBackfill gates', () => { + function seedHistoricalDb(): Database { + const db = makeDb(); + const epoch = Date.now() - 10 * DAY_MS; + insertSession(db, { epoch, memId: 's-1' }); + insertObs(db, { epoch: epoch + 1000, memId: 's-1' }); + return db; + } + + it('(j) consent off: no client, no captures, no marker', async () => { + process.env.CLAUDE_MEM_TELEMETRY = '0'; + await runHistoricalBackfill(seedHistoricalDb()); + expect(postHogConstructorCalls.length).toBe(0); + expect(postHogCaptureCalls.length).toBe(0); + expect(existsSync(markerPath())).toBe(false); + }); + + it('(j) existing marker: returns before doing anything', async () => { + writeFileSync( + markerPath(), + JSON.stringify({ completedAt: 'x', throughDay: '2026-01-01', eventCount: 1, installId: 'i' }) + ); + await runHistoricalBackfill(seedHistoricalDb()); + expect(postHogConstructorCalls.length).toBe(0); + expect(postHogCaptureCalls.length).toBe(0); + }); + + it('(j) debug mode: stderr dry-run, no send, no marker', async () => { + process.env.CLAUDE_MEM_TELEMETRY_DEBUG = '1'; + const db = seedHistoricalDb(); + const lines = await withStderrCapture(async () => { + await runHistoricalBackfill(db); + }); + expect(lines.some(l => l.includes('[telemetry-backfill]'))).toBe(true); + expect(postHogConstructorCalls.length).toBe(0); + expect(postHogCaptureCalls.length).toBe(0); + expect(existsSync(markerPath())).toBe(false); + }); + + it('(j) debug mode on an EMPTY DB: still no marker (debug never latches)', async () => { + process.env.CLAUDE_MEM_TELEMETRY_DEBUG = '1'; + const db = makeDb(); + const lines = await withStderrCapture(async () => { + await runHistoricalBackfill(db); + }); + expect(lines.some(l => l.includes('[telemetry-backfill]'))).toBe(true); + expect(postHogCaptureCalls.length).toBe(0); + expect(existsSync(markerPath())).toBe(false); + }); + + it('zero events (fresh install): writes the marker without sending', async () => { + await runHistoricalBackfill(makeDb()); + expect(postHogConstructorCalls.length).toBe(0); + expect(postHogCaptureCalls.length).toBe(0); + expect(existsSync(markerPath())).toBe(true); + expect(readMarker().eventCount).toBe(0); + }); + + it('(j) second invocation after success sends nothing', async () => { + const db = seedHistoricalDb(); + await runHistoricalBackfill(db); + expect(existsSync(markerPath())).toBe(true); + const sentCount = postHogCaptureCalls.length; + expect(sentCount).toBeGreaterThan(0); + await runHistoricalBackfill(db); + expect(postHogCaptureCalls.length).toBe(sentCount); + }); + + it('(k) happy path: historicalMigration client, uuid + Date timestamps, marker with throughDay', async () => { + const expectedThroughBefore = utcDayString(Date.now() - 60 * HOUR_MS); + await runHistoricalBackfill(seedHistoricalDb()); + const expectedThroughAfter = utcDayString(Date.now() - 60 * HOUR_MS); + + expect(postHogConstructorCalls.length).toBe(1); + const options = postHogConstructorCalls[0].options; + expect(options.historicalMigration).toBe(true); + expect(options.flushAt).toBe(5000); + expect(options.maxBatchSize).toBe(5000); + expect(options.maxQueueSize).toBe(5000); + expect(options.disableGeoip).toBe(false); + + // One active day + one install_inferred. + expect(postHogCaptureCalls.length).toBe(2); + for (const call of postHogCaptureCalls) { + expect(typeof call.uuid).toBe('string'); + expect(call.uuid as string).toMatch(UUID_RE); + expect(call.timestamp instanceof Date).toBe(true); + expect(typeof call.distinctId).toBe('string'); + } + expect(postHogCaptureCalls.map(c => c.event)).toContain('install_inferred'); + + expect(existsSync(markerPath())).toBe(true); + const marker = readMarker(); + expect([expectedThroughBefore, expectedThroughAfter]).toContain(marker.throughDay as string); + expect(marker.eventCount).toBe(2); + expect(marker.installId).toBe(postHogCaptureCalls[0].distinctId as string); + }); + + it('(k) an emitted client error prevents the marker (retry on next start)', async () => { + postHogMockBehavior.emitErrorOnShutdown = new Error('delivery failed'); + await runHistoricalBackfill(seedHistoricalDb()); + expect(postHogCaptureCalls.length).toBe(2); + expect(existsSync(markerPath())).toBe(false); + }); +});