Add Postgres observation storage foundation

This commit is contained in:
Alex Newman
2026-05-07 18:26:12 -07:00
parent 3315838ef9
commit 4e0fc77af3
15 changed files with 4004 additions and 0 deletions

View File

@@ -136,6 +136,7 @@
"glob": "^13.0.6",
"handlebars": "^4.7.9",
"ioredis": "^5.10.1",
"pg": "^8.20.0",
"picocolors": "^1.1.1",
"react": "^19.2.5",
"react-dom": "^19.2.5",
@@ -156,6 +157,7 @@
"@types/dompurify": "^3.2.0",
"@types/express": "^5.0.6",
"@types/node": "^25.6.0",
"@types/pg": "^8.20.0",
"@types/react": "^19.2.14",
"@types/react-dom": "^19.2.3",
"esbuild": "^0.28.0",

View File

@@ -0,0 +1,987 @@
# Claude-Mem 13 Server Beta: Independent BullMQ Observation Runtime
Status: implementation plan
Date: 2026-05-07
Release target: claude-mem 13 Server (beta)
Relationship to prior plans:
- Extends `plans/2026-05-07-claude-mem-server-apache-bullmq-team-auth.md`.
- Supersedes the worker-parity parts of `plans/2026-05-07-claude-mem-13-server-beta-full-worker-parity.md` where that plan allowed Server beta to wrap/copy `WorkerService`.
- Keeps the existing worker in place, but makes Server beta a fully independent runtime, not a facade over worker internals.
## Executive Decision
Server beta must own its runtime end to end:
```text
REST/MCP/hooks -> Server beta HTTP/API layer -> BullMQ observation jobs -> provider generation -> server storage/search
```
The worker remains the stable legacy runtime, but Server beta must not depend on `WorkerService`, worker HTTP routes, worker queue consumers, or worker process lifecycle to generate observations.
Server beta should use BullMQ/Valkey as its canonical queue and Postgres as its canonical observation store. SQLite remains the legacy worker/local compatibility store only. Redis/Valkey is runtime infrastructure for jobs, retries, concurrency, and observability, not the source of truth for observations.
## Terminology Decision
Claude-mem's domain object is an **observation**. Server beta must preserve that wording in user-facing APIs, docs, jobs, storage names, tests, logs, and implementation plans.
Use "memory" only for legacy compatibility names that already exist in worker-era code or for external library/API concepts that cannot be renamed cleanly. New Server beta/Postgres concepts should be named around observations:
- `observations`, not `memory_items`
- `observation_sources`, not `memory_sources`
- `ObservationRepository`, not `MemoryItemsRepository`
- `GenerateObservationsForEventJob`, not generic memory generation
- `/v1/observations` and observation-focused MCP tools as the canonical surface
If any compatibility endpoint still uses `/v1/memories`, it should be treated as an alias over observations, not the canonical Server beta model.
## Phase 0: Documentation Discovery
### Local Sources Read
- `plans/2026-05-07-claude-mem-server-apache-bullmq-team-auth.md`
- `plans/2026-05-07-claude-mem-13-server-beta-full-worker-parity.md`
- `/Users/alexnewman/Downloads/claude-mem-handoff-docs/claude-mem-server-plan.md`
- `src/server/routes/v1/ServerV1Routes.ts`
- `src/server/queue/BullMqObservationQueueEngine.ts`
- `src/server/queue/ObservationQueueEngine.ts`
- `src/services/worker-service.ts`
- `src/services/worker/SessionManager.ts`
- `src/services/worker/agents/ResponseProcessor.ts`
- `src/services/worker/ClaudeProvider.ts`
- `src/services/worker/GeminiProvider.ts`
- `src/services/worker/OpenRouterProvider.ts`
- `src/services/worker/http/shared.ts`
- `src/storage/sqlite/agent-events.ts`
- `src/storage/sqlite/memory-items.ts`
- `src/core/schemas/agent-event.ts`
- `src/core/schemas/memory-item.ts`
- `scripts/e2e-server-beta-docker.sh`
- `docker/e2e/server-beta-e2e.mjs`
### External Docs Read
- BullMQ Workers: https://docs.bullmq.io/guide/workers
- BullMQ Worker Concurrency: https://docs.bullmq.io/guide/workers/concurrency
- BullMQ Stalled Jobs: https://docs.bullmq.io/guide/jobs/stalled
- Better Auth Express integration: https://better-auth.com/docs/integrations/express
### Concrete Findings
- The current `/v1` server route stores supplied events and direct observation records under legacy "memory" route/repository names:
- `src/server/routes/v1/ServerV1Routes.ts` registers `POST /v1/events`, `POST /v1/events/batch`, and `POST /v1/memories`.
- Those routes call `AgentEventsRepository.create(...)` and `MemoryItemsRepository.create(...)`.
- They do not currently enqueue a provider generation job.
- The current AI observation generation path is worker-owned:
- `src/services/worker/SessionManager.ts` consumes queued messages through `getMessageIterator(...)`.
- `src/services/worker-service.ts` starts provider sessions through `startSessionProcessor(...)`.
- `src/services/worker/agents/ResponseProcessor.ts` parses provider XML with `parseAgentXml(...)` and writes observations through `sessionStore.storeObservations(...)`.
- The existing v2 parity plan names `Claude/Gemini/OpenRouter providers`, session ingest routes, queue semantics, and hook routing as parity requirements, but it does not explicitly require `/v1/events` to generate observations.
- BullMQ official docs establish the primitives Server beta should use directly:
- `Worker` processes jobs and moves successful jobs to completed or thrown jobs to failed.
- BullMQ workers should attach an `error` listener.
- Workers support `autorun: false`.
- Workers support concurrency via the worker options object.
- Multiple workers are the recommended way to improve availability.
- Active jobs can stall and be retried when workers stop renewing locks.
- Better Auth Express docs require the auth handler to mount before `express.json()` and use `/api/auth/*splat` for Express 5.
### Allowed APIs And Patterns
- Copy Express pre-body route mounting from `src/services/server/Server.ts` plus Better Auth docs.
- Copy API-key auth from `src/server/middleware/auth.ts` and `src/server/auth/api-key-service.ts`.
- Copy repository behavior where useful, but implement Server beta repositories against Postgres; do not reuse worker legacy `SessionStore` as the server observation model.
- Copy provider request construction from `src/services/worker/ClaudeProvider.ts`, `GeminiProvider.ts`, and `OpenRouterProvider.ts`, then move shared logic into `src/server/generation` or `src/core/generation`.
- Copy XML parsing from `src/sdk/parser.ts` and current post-processing rules from `src/services/worker/agents/ResponseProcessor.ts`.
- Use BullMQ `Queue`, `Worker`, and `QueueEvents` directly for Server beta generation queues.
- Keep Valkey/Redis health checks from `src/server/queue/redis-config.ts` and existing Docker E2E setup.
### Anti-Pattern Guards
- Do not make Server beta call `new WorkerService()`.
- Do not make Server beta depend on worker HTTP route classes for generation.
- Do not make `/v1` a write-only event archive while claiming Server beta generates observations.
- Do not use the legacy SQLite pending-message queue for Server beta generation.
- Do not store canonical observation records in Redis.
- Do not remove or destabilize the existing worker.
- Do not silently fall back from explicit Server beta BullMQ mode to SQLite.
- Do not mount Better Auth after `express.json()`.
## Target Architecture
### Runtime Separation
```text
src/services/worker-service.ts
Legacy worker runtime. Stable compatibility path. May import shared core pieces later.
src/server/runtime/ServerBetaService.ts
Independent server runtime. Owns HTTP server, BullMQ queues, provider generation workers,
server storage repositories, auth, health, and Docker deployment.
```
### Server Beta Flow
```text
POST /v1/events
POST /v1/events/batch
Claude Code hook routed to Server beta
MCP observation_record_* tool
|
v
AgentEventsRepository transaction
|
v
ObservationGenerationJobRepository outbox row
|
v
BullMQ Queue.add(...)
|
v
BullMQ Worker processor
|
v
ProviderObservationGenerator
|
v
parseAgentXml / structured parser
|
v
ObservationRepository.create(...) + ObservationSourcesRepository.addSource(...)
|
v
QueueEvents/SSE/audit/search index update
```
## Phase 1: Postgres Observation Storage Foundation
### What To Implement
- Add Server beta Postgres configuration:
- add package dependencies `pg` and `@types/pg` to the Node/Bun TypeScript package manifest used by this repo;
- centralize Postgres storage code under:
- `src/storage/postgres/config.ts` for environment parsing, pool sizing, timeouts, and SSL settings;
- `src/storage/postgres/pool.ts` for the shared `pg.Pool` factory, health check, transactions, and graceful shutdown;
- `src/storage/postgres/schema.ts` for migration/bootstrap SQL and schema version constants;
- `src/storage/postgres/index.ts` for exports used by Server beta runtime wiring;
- `CLAUDE_MEM_SERVER_DATABASE_URL`;
- connection pool size and timeout settings;
- startup validation that fails Server beta when Postgres is required but unavailable;
- graceful shutdown that drains and closes the Postgres pool.
- Add a migration/bootstrap helper for Server beta storage:
- creates required schemas/tables/indexes;
- records applied migration versions;
- is safe to run repeatedly on startup and in tests.
- Define canonical Postgres tables:
- `teams`;
- `projects`;
- `team_members`;
- `api_keys`;
- `audit_log`;
- `server_sessions`;
- `agent_events`;
- `observations`;
- `observation_sources`;
- `observation_generation_jobs`;
- `observation_generation_job_events`.
- Implement the initial schema contract explicitly in Phase 1 migrations. Column names can be refined only if all repository contracts and tests are updated in the same phase:
```sql
CREATE TABLE teams (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE projects (
id TEXT PRIMARY KEY,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
name TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (id, team_id)
);
CREATE TABLE team_members (
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id TEXT NOT NULL,
role TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (team_id, user_id)
);
CREATE TABLE api_keys (
id TEXT PRIMARY KEY,
key_hash TEXT NOT NULL UNIQUE,
team_id TEXT REFERENCES teams(id) ON DELETE CASCADE,
project_id TEXT REFERENCES projects(id) ON DELETE CASCADE,
actor_id TEXT NOT NULL,
scopes JSONB NOT NULL DEFAULT '[]'::jsonb,
revoked_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CHECK (project_id IS NULL OR team_id IS NOT NULL),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE audit_log (
id TEXT PRIMARY KEY,
team_id TEXT REFERENCES teams(id) ON DELETE SET NULL,
project_id TEXT REFERENCES projects(id) ON DELETE SET NULL,
actor_id TEXT,
api_key_id TEXT REFERENCES api_keys(id) ON DELETE SET NULL,
action TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT,
details JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CHECK (project_id IS NULL OR team_id IS NOT NULL),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE SET NULL
);
CREATE TABLE server_sessions (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
external_session_id TEXT,
content_session_id TEXT,
agent_id TEXT,
agent_type TEXT,
platform_source TEXT,
generation_status TEXT NOT NULL DEFAULT 'idle',
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
ended_at TIMESTAMPTZ,
last_generated_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (project_id, external_session_id),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE agent_events (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
server_session_id TEXT REFERENCES server_sessions(id) ON DELETE SET NULL,
source_adapter TEXT NOT NULL,
source_event_id TEXT,
idempotency_key TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
occurred_at TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (idempotency_key),
UNIQUE (id, project_id, team_id),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE observation_generation_jobs (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
agent_event_id TEXT REFERENCES agent_events(id) ON DELETE CASCADE,
source_type TEXT NOT NULL CHECK (source_type IN ('agent_event', 'session_summary', 'observation_reindex')),
source_id TEXT NOT NULL,
server_session_id TEXT REFERENCES server_sessions(id) ON DELETE SET NULL,
job_type TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'cancelled')),
idempotency_key TEXT NOT NULL UNIQUE,
bullmq_job_id TEXT UNIQUE,
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
next_attempt_at TIMESTAMPTZ,
locked_at TIMESTAMPTZ,
locked_by TEXT,
completed_at TIMESTAMPTZ,
failed_at TIMESTAMPTZ,
cancelled_at TIMESTAMPTZ,
last_error JSONB,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (team_id, project_id, source_type, source_id, job_type),
CHECK (
(source_type = 'agent_event' AND agent_event_id IS NOT NULL AND source_id = agent_event_id)
OR
(source_type = 'session_summary' AND agent_event_id IS NULL AND server_session_id IS NOT NULL AND source_id = server_session_id)
OR
(source_type = 'observation_reindex' AND agent_event_id IS NULL)
),
FOREIGN KEY (agent_event_id, project_id, team_id) REFERENCES agent_events(id, project_id, team_id) ON DELETE CASCADE,
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE observations (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
server_session_id TEXT REFERENCES server_sessions(id) ON DELETE SET NULL,
kind TEXT NOT NULL DEFAULT 'observation',
content TEXT NOT NULL,
content_search TSVECTOR GENERATED ALWAYS AS (to_tsvector('english', content)) STORED,
generation_key TEXT,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
embedding JSONB,
created_by_job_id TEXT REFERENCES observation_generation_jobs(id) ON DELETE SET NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (team_id, project_id, generation_key),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE observation_sources (
id TEXT PRIMARY KEY,
observation_id TEXT NOT NULL REFERENCES observations(id) ON DELETE CASCADE,
agent_event_id TEXT REFERENCES agent_events(id) ON DELETE CASCADE,
generation_job_id TEXT REFERENCES observation_generation_jobs(id) ON DELETE SET NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('agent_event', 'session_summary', 'observation_reindex', 'manual')),
source_id TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (observation_id, source_type, source_id),
UNIQUE (source_type, source_id, generation_job_id, observation_id),
CHECK (
(source_type = 'agent_event' AND agent_event_id IS NOT NULL AND source_id = agent_event_id)
OR
(source_type <> 'agent_event' AND agent_event_id IS NULL)
)
);
CREATE TABLE observation_generation_job_events (
id TEXT PRIMARY KEY,
generation_job_id TEXT NOT NULL REFERENCES observation_generation_jobs(id) ON DELETE CASCADE,
event_type TEXT NOT NULL CHECK (event_type IN ('queued', 'enqueued', 'processing', 'retry_scheduled', 'completed', 'failed', 'cancelled')),
status_after TEXT NOT NULL CHECK (status_after IN ('queued', 'processing', 'completed', 'failed', 'cancelled')),
attempt INTEGER NOT NULL DEFAULT 0,
details JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_agent_events_project_session ON agent_events(project_id, server_session_id, occurred_at);
CREATE INDEX idx_projects_team ON projects(team_id, id);
CREATE INDEX idx_agent_events_team_project ON agent_events(team_id, project_id, occurred_at);
CREATE INDEX idx_observations_project_session ON observations(project_id, server_session_id, created_at);
CREATE INDEX idx_observations_team_project ON observations(team_id, project_id, created_at);
CREATE INDEX idx_observations_content_search ON observations USING GIN (content_search);
CREATE INDEX idx_observation_sources_event ON observation_sources(agent_event_id);
CREATE INDEX idx_observation_sources_source ON observation_sources(source_type, source_id);
CREATE INDEX idx_observation_jobs_status_next_attempt ON observation_generation_jobs(status, next_attempt_at, created_at);
CREATE INDEX idx_observation_jobs_team_project ON observation_generation_jobs(team_id, project_id, status, created_at);
CREATE INDEX idx_observation_jobs_event ON observation_generation_jobs(agent_event_id);
CREATE INDEX idx_observation_jobs_source ON observation_generation_jobs(source_type, source_id);
CREATE INDEX idx_observation_job_events_job_created ON observation_generation_job_events(generation_job_id, created_at);
CREATE INDEX idx_audit_log_scope_created ON audit_log(project_id, team_id, created_at);
```
- Define event/outbox relationships:
- `agent_events` is the canonical Postgres table for raw ingested agent events and their project/session/team ownership;
- every project is owned by exactly one team through `projects.team_id`; Server beta has no unowned/default project mode in the Postgres canonical store;
- repositories and routes must resolve project ownership from `projects.team_id`, require the caller's team/API-key scope to match it, and reject any request body or repository write where `team_id` disagrees with the project's owner;
- project-scoped rows that carry both `project_id` and `team_id` must use FK-backed ownership validation through `FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id)`;
- `observation_generation_jobs.source_type` and `observation_generation_jobs.source_id` identify the durable source of work for event, summary, and reindex jobs without overloading event-only columns;
- event generation jobs use `source_type = 'agent_event'`, `source_id = agent_event_id`, and a non-null `agent_event_id` FK to the source `agent_events` row being processed;
- session summary jobs use `source_type = 'session_summary'`, `source_id = server_session_id`, and `agent_event_id = NULL`;
- reindex jobs use `source_type = 'observation_reindex'`, `source_id` set to the target observation ID or deterministic reindex scope ID, and `agent_event_id = NULL`;
- repositories must validate non-event `source_id` ownership before job insert: session summary jobs must load the `server_sessions` row under the same `project_id`/`team_id`, and observation reindex jobs must load the target observation or documented reindex scope under the same `project_id`/`team_id`;
- `observation_generation_job_events` records durable lifecycle/outbox events for each observation generation job, including enqueue, processing, retry, completion, and failure state changes;
- `observation_generation_job_events` may reference `agent_events` through its job relationship, but it is not a replacement for `agent_events` and must not store raw event payloads as the canonical event record.
- Define outbox status and idempotency rules:
- `observation_generation_jobs.status` is constrained to `queued`, `processing`, `completed`, `failed`, or `cancelled`;
- legal lifecycle is `queued -> processing -> completed`, `queued -> processing -> failed`, `queued -> cancelled`, and retry transitions from stale/failed retryable work back to `queued` only when `attempts < max_attempts`;
- `attempts` increments only when a worker transitions a job to `processing`;
- `next_attempt_at` gates retry/reconciliation eligibility;
- `locked_at` and `locked_by` are set while a worker owns processing and are cleared or superseded on completion, failure, cancellation, or stale-lock recovery;
- `completed_at`, `failed_at`, and `cancelled_at` are terminal timestamps and exactly one may be non-null for terminal jobs;
- `agent_events.source_event_id` is optional adapter metadata only and must not be used as the sole idempotency authority;
- `agent_events.idempotency_key` is required and deterministic: when `source_event_id` is present, derive it from `team_id`, `project_id`, `source_adapter`, and `source_event_id`; when omitted, derive it from `team_id`, `project_id`, `source_adapter`, `server_session_id`, `event_type`, `occurred_at`, and a canonical JSON hash of `payload`;
- `UNIQUE (idempotency_key)` on `agent_events` suppresses duplicate ingestion for native event IDs, batch imports, and clients with omitted source event IDs;
- job `idempotency_key` must be deterministic from `team_id`, `project_id`, `source_type`, `source_id`, and `job_type`, and `UNIQUE (idempotency_key)` suppresses duplicate outbox rows;
- `UNIQUE (team_id, project_id, source_type, source_id, job_type)` guarantees one source/job relationship per generation kind within the owning project/team scope across event, summary, and reindex jobs;
- `bullmq_job_id` must be deterministic and unique when present so reconciliation can safely re-add or replace terminal BullMQ jobs;
- `observations.generation_key` is nullable for direct/manual observations and required for provider/generated observations;
- provider-generated `generation_key` must be deterministic as `generation:v1:{generation_job_id}:{parsed_observation_index}:{canonical_content_fingerprint}` where the content fingerprint is computed after parser normalization and before persistence;
- `UNIQUE (team_id, project_id, generation_key)` on `observations` is the primary retry idempotency guard within the owning project/team scope: retrying the same job and parsed observation must upsert/reload the existing observation instead of creating a new row;
- `observations.created_by_job_id` is a nullable foreign key to `observation_generation_jobs(id)`; provider-generated observations must set it to the durable Postgres generation job that created the observation;
- `observation_sources.generation_job_id` is a nullable foreign key to `observation_generation_jobs(id)`; generated observation source rows must set it when the observation came from a generation job;
- `observation_sources.source_type` and `observation_sources.source_id` mirror the job source model so generated observations can link to events, session summaries, reindex scopes, or manual/direct sources without ambiguous nullable uniqueness;
- `UNIQUE (observation_id, source_type, source_id)` guarantees a source cannot be linked to the same observation more than once;
- generated observation writes must also be idempotent through `observation_sources`: the same `source_type`, `source_id`, `generation_job_id`, and `observation_id` relationship must not be inserted twice;
- mutation APIs that touch observation sources, generation job status, or generation job lifecycle events must require `project_id` and `team_id` and include them in the mutating SQL predicate before changing rows;
- `ObservationRepository.search(...)` must use the generated `observations.content_search` `tsvector`, the GIN index on `content_search`, and `websearch_to_tsquery('english', query)` for scoped full-text search;
- provider retries must reload the Postgres job row and the authoritative source row before side effects; for event jobs that source row is `agent_events`, for summary jobs it is `server_sessions`, and for reindex jobs it is the target observation or documented reindex scope. BullMQ payload data is advisory execution data, not authority.
- Define repository interfaces and Postgres implementations:
- `ProjectRepository`;
- `TeamRepository`;
- `ObservationRepository`;
- `ObservationSourcesRepository`;
- `ObservationGenerationJobRepository`;
- `ObservationGenerationJobEventsRepository` for durable lifecycle/outbox events such as queued, enqueued, processing, retry scheduled, completed, failed, and cancelled;
- `AgentEventsRepository` backed by the Server beta Postgres connection.
- Keep legacy names as adapters only:
- existing `memory_items` data can be migrated or viewed as observations;
- existing `MemoryItemsRepository` remains a current-code compatibility reference, not the Server beta repository contract.
- Add test helpers that skip Postgres-backed integration tests when no test Postgres URL is configured.
### Documentation References
- Copy current repository behavior and field validation from existing storage code, but implement the canonical Server beta storage in Postgres.
- Copy compatible field constraints from `src/core/schemas/memory-item.ts` only to preserve legacy import/alias behavior; new Server beta schemas should be named around observations.
- Copy migration idempotency patterns from existing storage bootstrap code where applicable.
- Use prior SQLite storage decisions as superseded context only where they conflict with Postgres as canonical Server beta storage.
### Verification Checklist
- Unit tests for repository interfaces using fake adapters where useful.
- Postgres integration tests for:
- migration/bootstrap idempotency;
- `ProjectRepository.create(...)` requires a valid `team_id`, lookup returns the owning team, and project-scoped repository writes reject mismatched `team_id`/`project_id` pairs;
- `ObservationRepository.create(...)` and lookup by project/session/team;
- `ObservationRepository.search(...)` uses the generated `content_search` column with the GIN-backed `websearch_to_tsquery` path and returns only rows for the requested project/team scope;
- `ObservationSourcesRepository.addSource(...)` idempotency;
- `ObservationSourcesRepository.addSource(...)` requires project/team scope and rejects wrong-scope observation/source/job relationships without inserting rows;
- `AgentEventsRepository.create(...)`, batch insert/reload, lookup by project/session/team, deterministic `idempotency_key` generation when `source_event_id` is present, and deterministic `idempotency_key` fallback when `source_event_id` is omitted;
- ingesting the same event twice with omitted source event IDs must not create duplicate `agent_events` rows and must not duplicate generation jobs;
- `ObservationGenerationJobRepository` create/status transition/reload and duplicate-job suppression for event, session summary, and reindex jobs using deterministic `source_type`, `source_id`, and `idempotency_key`;
- `ObservationGenerationJobRepository.transitionStatus(...)` requires project/team scope in both the conditional update and fallback reload and must not mutate rows when called with the wrong scope;
- generated observation retry idempotency through `observations.generation_key`, including retrying the same job and parsed observation index/content without creating a duplicate observation;
- `ObservationGenerationJobEventsRepository` lifecycle append/list tests and outbox event linking through `observation_generation_job_events`;
- `ObservationGenerationJobEventsRepository.append(...)` requires project/team scope and appends only when the referenced job belongs to that project/team.
- Integration tests skip cleanly with an explicit skip reason when no Postgres test URL is configured.
- `rg -n "MemoryItemsRepository" src/server`
- new Server beta implementation source must not use legacy repository contracts except in explicit compatibility adapters.
### Anti-Pattern Guards
- Do not make SQLite the canonical Server beta observation store.
- Do not add new Server beta tables named `memory_items` or new repositories named `MemoryItemsRepository`.
- Do not let BullMQ or Redis/Valkey be the source of truth for observations or outbox history.
- Do not hide missing Postgres by silently falling back to worker SQLite.
## Phase 2: Define Server Runtime Boundary
### What To Implement
- Add `src/server/runtime/ServerBetaService.ts`.
- Add `src/server/runtime/create-server-beta-service.ts`.
- Add `src/server/runtime/types.ts` for the service graph:
- Postgres connection pool;
- initialized Phase 1 storage bootstrap/migration status;
- auth mode;
- queue manager boundary as an inert interface with a disabled/no-op adapter;
- generation worker manager boundary as an inert interface with a disabled/no-op adapter;
- provider registry boundary as an inert interface with a disabled/no-op adapter;
- SSE/event broadcaster boundary as an inert interface with a disabled/no-op adapter;
- server storage repositories.
- Phase 2 creates lifecycle/runtime boundaries only. It must not implement BullMQ queue processing, provider-backed observation generation, generation workers, or SSE broadcasting; actual queue manager implementation starts in Phase 3, provider/generation implementation starts in later generation phases, and the real event broadcaster is wired only when its phase requires it.
- Route `claude-mem server start|stop|restart|status` to `ServerBetaService`, not `WorkerService`.
- Keep worker commands routed to `WorkerService`.
- Add separate runtime state files:
- `.server-beta.pid`
- `.server-beta.port`
- `.server-beta.runtime.json`
- Add `/v1/info.runtime = "server-beta"` and `/api/health.runtime = "server-beta"` in Server beta.
### Documentation References
- Copy the route-handler composition style from `src/services/server/Server.ts`.
- Copy only lifecycle primitives from `src/services/worker-service.ts`; do not copy the full worker class.
- Copy PID-file safety patterns from `src/services/infrastructure/ProcessManager.ts`.
- Use the prior parity plan section "Phase 2: Independent Server Beta Lifecycle" as the baseline, but strengthen it: independent means no `WorkerService` dependency.
### Verification Checklist
- `rg -n "WorkerService|services/worker-service|worker/http" src/server src/npx-cli/commands/server.ts src/npx-cli/commands/worker.ts`
- Server runtime source must not import or instantiate `WorkerService`.
- `npx claude-mem server status` reports server-beta state independently of worker state.
- Worker `start|stop|status` commands still work.
- Server beta can start while worker is stopped.
- Server beta can stop without touching worker.
### Anti-Pattern Guards
- Do not overload worker PID/port files.
- Do not implement Server beta by booting worker in the background.
- Do not use worker health as the server health source.
## Phase 3: BullMQ-First Server Queue
### What To Implement
- Add `src/server/jobs/types.ts`:
- `ServerGenerationJob`
- `GenerateObservationsForEventJob`
- `GenerateObservationsForEventBatchJob`
- `GenerateSessionSummaryJob`
- `ReindexObservationJob`
- every job type must carry `team_id`, `project_id`, `source_type`, `source_id`, and `generation_job_id`; event jobs additionally carry `agent_event_id`, summary jobs carry `server_session_id`, and reindex jobs carry the target observation ID or deterministic reindex scope ID.
- Add `src/server/jobs/ServerJobQueue.ts` wrapping BullMQ `Queue`, `Worker`, and `QueueEvents`.
- Add `src/server/jobs/job-id.ts` for deterministic, colon-free job IDs.
- Add `src/server/jobs/outbox.ts` using `ObservationGenerationJobRepository`:
- durable rows live in `observation_generation_jobs`;
- source identity lives in `source_type`/`source_id`; lifecycle events live in `observation_generation_job_events`;
- status fields: `queued`, `processing`, `completed`, `failed`, `cancelled`;
- attempts, last error, timestamps, project/session/team IDs.
- Make the outbox the durable source of "what should be generated"; BullMQ is the execution transport.
- Add startup reconciliation:
- enqueue outbox rows in `queued` or stale `processing`;
- do not enqueue rows for already completed jobs;
- remove or replace terminal BullMQ jobs before deterministic job ID reuse.
- Add queue health to `/v1/info`, `/api/health`, and `claude-mem server status`.
### Documentation References
- BullMQ Workers docs: use `new Worker(queueName, async job => ...)`, attach `worker.on('error', ...)`, and use worker events for completion/failure.
- BullMQ Concurrency docs: use explicit worker `concurrency`, default conservative value `1` per provider/session lane, configurable later.
- BullMQ Stalled Jobs docs: design jobs as idempotent because active jobs may be moved back to waiting.
- Existing `src/server/queue/BullMqObservationQueueEngine.ts` has tested deterministic job IDs and Redis health wiring; copy its safe ID and health patterns, not its worker-iterator compatibility shape.
### Verification Checklist
- Unit tests for:
- job ID stability;
- duplicate enqueue suppression;
- terminal job replacement;
- outbox restart reconciliation;
- failed job retained in Postgres and BullMQ;
- Redis unavailable fails Server beta startup when BullMQ is selected.
- Integration tests with a fake processor:
- start Server beta queue manager + Postgres + Valkey;
- create outbox rows directly through `ObservationGenerationJobRepository`;
- enqueue fake jobs;
- restart before fake processing completes;
- assert reconciliation resumes jobs and marks the outbox exactly once.
### Anti-Pattern Guards
- Do not treat BullMQ completed/failed state as canonical history.
- Do not require event route wiring or provider generation for this phase to pass.
- Do not allow duplicate processor side effects on retry; later observation writes must be idempotent by deterministic observation generation key and source/job ID.
- Do not use BullMQ Pro-only groups.
- Do not leave pending work only in Redis.
## Phase 4: Server-Owned Event-To-Generation-Job Pipeline
### What To Implement
- Change `POST /v1/events` and `POST /v1/events/batch` to:
1. validate auth and project/team scope;
2. insert events transactionally;
3. create server outbox generation jobs in the same transaction;
4. enqueue corresponding BullMQ jobs after commit.
- Add opt-in request control:
- default: enqueue generation asynchronously;
- `?generate=false`: store event only;
- `?wait=true`: if implemented in this phase, wait only for bounded queue acceptance or job status and return queued/accepted/job status. It must not claim observations were generated.
- Add `GET /v1/jobs/:id` for generation status.
- Keep `POST /v1/memories` only as a compatibility alias for manual/direct observation insertion. It must not call the generator.
### Documentation References
- Copy current REST validation/auth style from `src/server/routes/v1/ServerV1Routes.ts`.
- Copy atomic write approach from the existing fixed `/v1/events/batch` transaction.
- Copy JSON serde and repository behavior from current storage implementations while implementing Postgres-backed Server beta repositories.
- Copy Docker E2E style from `docker/e2e/server-beta-e2e.mjs`.
### Verification Checklist
- `POST /v1/events` returns `event` and `generationJob`.
- `POST /v1/events?generate=false` returns no generation job.
- Event insert and outbox generation-job creation are committed transactionally: no event without its required outbox/job row, and no outbox/job row without its event link.
- A successful event request enqueues the corresponding BullMQ job after commit.
- Mixed-project batch pre-validation rejects the request before any event, outbox/job, or BullMQ enqueue side effect occurs.
- `POST /v1/events?wait=true`, if implemented, returns queued/accepted/job status only; it does not return generated observation IDs or imply provider generation completed.
- Project-scoped API key cannot enqueue generation for another project.
### Anti-Pattern Guards
- Do not call worker `/api/sessions/observations`.
- Do not make `/v1/events` depend on Claude Code-specific hook payload shape.
- Do not generate observations inside the HTTP request without queueing first.
- Do not require provider generation, generated observation IDs, or generated observation duplicate checks for Phase 4 verification.
## Phase 5: Extract Provider Generation Without Worker Coupling
### What To Implement
- Add `src/server/generation/ProviderObservationGenerator.ts`.
- Add provider adapters under `src/server/generation/providers/`:
- `ClaudeObservationProvider`
- `GeminiObservationProvider`
- `OpenRouterObservationProvider`
- Extract common prompt construction and provider-call code from worker providers into reusable modules.
- Keep worker providers as compatibility wrappers that can call the shared provider adapters later.
- Add `src/server/generation/processGeneratedResponse.ts`:
- parse response with `parseAgentXml(...)`;
- map parsed observations to a new server observation create schema/repository input;
- store via `ObservationRepository`;
- link sources to event/job IDs;
- update outbox status;
- audit observation generation.
- Add `GET /v1/events/:id/observations` to inspect generated observations for an event.
- Add `observation_sources.sourceType = "agent_event"` support if not already present, or add a server-specific source table mapping event IDs to observation IDs.
- Add a stable server generation prompt:
- input: list of `AgentEvent` records plus project/session metadata;
- output: XML or structured JSON accepted by existing parser;
- include `<private>` skip behavior.
### Documentation References
- Copy parse/store behavior from `src/services/worker/agents/ResponseProcessor.ts`.
- Copy provider-specific auth and request construction from:
- `src/services/worker/ClaudeProvider.ts`
- `src/services/worker/GeminiProvider.ts`
- `src/services/worker/OpenRouterProvider.ts`
- Copy compatible field constraints from the existing legacy observation schema in `src/core/schemas/memory-item.ts`, but expose the Server beta create contract as an observation schema.
- Keep provider error classification semantics from `src/services/worker/provider-errors.ts`.
### Verification Checklist
- Unit tests using fake provider:
- valid XML yields an observation;
- skip/private response marks job completed with no observation;
- malformed response fails job or marks retryable according to policy;
- generated observation preserves project/session/source metadata.
- `POST /v1/events?wait=true` returns generated observation IDs only after Phase 5 provider generation and persistence are wired and the job finishes within timeout.
- Replaying the same event/job after restart does not duplicate generated observations.
- Provider classification tests still pass.
- Worker response processor tests still pass.
- `rg -n "services/worker/(ClaudeProvider|GeminiProvider|OpenRouterProvider|agents/ResponseProcessor)" src/server`
- must return no direct imports from Server beta generation.
### Anti-Pattern Guards
- Do not import `WorkerRef`, `ActiveSession`, or legacy worker session types into server generation.
- Do not mutate legacy `SessionStore` tables from Server beta generation.
- Do not make server provider code assume a Claude Code transcript.
## Phase 6: Server Session Semantics Independent Of Worker Sessions
### What To Implement
- Treat `server_sessions` as the canonical Server beta session model.
- Add fields needed for generation:
- `contentSessionId` or generic external session ID;
- `agentId`;
- `agentType`;
- `platformSource`;
- `generationStatus`;
- `lastGeneratedAtEpoch`.
- Add `ServerSessionRuntimeRepository` helpers:
- get active session;
- list unprocessed events;
- mark generation started/completed/failed.
- Add session-level generation policies:
- generate per event;
- batch small event bursts by short debounce window;
- generate summary on `/v1/sessions/:id/end`.
- Make this policy configurable with server settings.
### Documentation References
- Copy server session repository behavior from current storage code while implementing the Server beta session repository against Postgres.
- Copy queue idle/claim semantics from current BullMQ tests only where they serve idempotency and retry behavior.
- Copy current summary behavior from worker providers, but store summaries as observation records with kind/type `"summary"`.
### Verification Checklist
- Starting/ending a server session does not touch legacy worker session rows except through explicit migration/import code.
- Ending a session enqueues a summary generation job.
- Re-ending a session is idempotent.
- Session-scoped API keys remain project-scoped.
### Anti-Pattern Guards
- Do not require a legacy worker session ID to generate Server beta observations.
- Do not use worker `ActiveSession` as the server runtime state object.
## Phase 7: Hook Routing To Server Beta Without Worker Dependency
### What To Implement
- When installer selects Server beta, hooks should call Server beta endpoints directly:
- SessionStart -> `/v1/sessions/start` or compatibility endpoint;
- PostToolUse -> `/v1/events`;
- Stop/Summarize -> `/v1/sessions/:id/end`.
- Keep worker fallback only as fallback:
- if Server beta is selected but unhealthy, hook can fall back to worker and log a warning;
- fallback must be observable in hook output/logs.
- Add a server API-key bootstrap for local hooks:
- install creates a local hook API key scoped to local project/user;
- key is stored in local settings with correct file permissions;
- key rotation command exists.
- Keep existing hook JSON outputs unchanged.
### Documentation References
- Copy hook commands and expected outputs from `plugin/hooks/hooks.json`.
- Copy current hook HTTP call patterns from source files that generate the worker-service bundle, not from the generated bundle itself.
- Copy current installer prompt/setting pattern from `src/npx-cli/commands/install.ts`.
### Verification Checklist
- Lifecycle hook tests pass in worker mode.
- Lifecycle hook tests pass in server-beta mode.
- Server-beta mode with server down falls back to worker and logs one warning.
- Server-beta mode with server healthy does not start worker.
- Generated observation appears after a PostToolUse hook using only Server beta.
### Anti-Pattern Guards
- Do not route Server beta hooks through worker `/api/sessions/observations`.
- Do not silently start worker when Server beta is healthy.
- Do not store hook API keys in generated bundles.
## Phase 8: MCP Uses Server Runtime Directly
### What To Implement
- Add MCP tools backed by Server beta APIs/core logic:
- `observation_add`
- `observation_record_event`
- `observation_search`
- `observation_context`
- `observation_generation_status`
- Existing `memory_*` MCP names may remain only as compatibility aliases over the observation tools.
- Existing MCP search tools may continue to work with worker, but Server beta mode must not require worker.
- MCP write tools should create events or direct observations through the same service methods as REST.
### Documentation References
- Copy current MCP tool schema style from `src/servers/mcp-server.ts`.
- Copy new REST schemas from `src/core/schemas/*`.
- Copy auth mode rules from Server beta API-key middleware.
### Verification Checklist
- MCP client can record an event and retrieve generated context without worker running.
- MCP client can search generated observations.
- Existing MCP search tests remain green.
### Anti-Pattern Guards
- Do not duplicate generation logic in MCP tools.
- Do not import `WorkerService` into MCP server mode.
## Phase 9: Compatibility Without Coupling
### What To Implement
- Keep compatibility routes only as adapters:
- `/api/sessions/observations` -> convert legacy payload to `AgentEvent` -> enqueue Server beta generation job.
- `/api/sessions/summarize` -> convert legacy payload to session-end/summary job.
- legacy data/search routes -> read from Server beta repositories or explicit migration views.
- Compatibility adapters may live in `src/server/compat/*`.
- They must call Server beta services, not worker route classes.
- Add a parity map documenting each legacy route:
- native server implementation;
- adapter implementation;
- intentionally unsupported in Server beta.
### Documentation References
- Copy payload normalization from `src/services/worker/http/shared.ts`.
- Copy Claude Code mapper style from `src/adapters/claude-code/mapper.ts`.
- Copy route response snapshots from existing worker route tests.
### Verification Checklist
- `rg -n "services/worker/http/routes|WorkerService" src/server/compat src/server/runtime`
- must return no imports.
- Legacy PostToolUse route on Server beta creates an event and generation job.
- Viewer compatibility routes do not require worker.
### Anti-Pattern Guards
- Do not copy worker route classes wholesale into Server beta.
- Do not let compatibility adapters become the canonical Server API.
## Phase 10: Docker And Deployable Runtime
### What To Implement
- Docker image starts Server beta only:
- no worker process;
- no worker PID;
- no worker health dependency.
- Compose stack includes:
- Server beta container;
- Postgres container for canonical observation/job/session storage;
- Valkey container for BullMQ.
- Add env validation:
- `CLAUDE_MEM_RUNTIME=server-beta`
- `CLAUDE_MEM_QUEUE_ENGINE=bullmq`
- Postgres URL required.
- Redis/Valkey URL required.
- API-key auth required by default.
- Add optional separate generation worker process mode:
- `claude-mem server worker start`
- same codebase, separate process, same BullMQ queues.
### Documentation References
- Copy current Docker E2E style from `scripts/e2e-server-beta-docker.sh`.
- Copy current Docker image layout from `docker/claude-mem/Dockerfile`.
- Copy Valkey settings from `plans/2026-05-06-redis-dependency-strategy.md`.
### Verification Checklist
- Docker E2E starts no worker.
- `docker compose ps` shows server + Postgres + Valkey.
- `/v1/events?wait=true` creates generated observations.
- Restart server mid-job and verify retry/idempotency.
- Revoke API key and verify write/search denial.
### Anti-Pattern Guards
- Do not install or spawn worker in the Server beta container.
- Do not use local-dev auth in Docker.
- Do not use a process-local queue in Docker.
## Phase 11: Team-Aware Generation
### What To Implement
- Ensure every generation job carries:
- `team_id`;
- `project_id`;
- actor/API-key ID;
- source adapter.
- Enforce scopes before event insert and before job execution.
- Store generated observations with team/project metadata.
- Audit:
- event received;
- job queued;
- provider generation started;
- observation generated;
- observation served.
- Add team-level queue status endpoint:
- `/v1/teams/:id/jobs`
- `/v1/projects/:id/jobs`
### Documentation References
- Copy API-key/team storage patterns from `src/storage/sqlite/teams.ts` and `src/storage/sqlite/auth.ts`.
- Copy project-scoping guards from `src/server/routes/v1/ServerV1Routes.ts`.
- Copy audit repository style from current server storage.
### Verification Checklist
- Team-scoped key cannot read/write/generate outside team projects.
- Project-scoped key cannot enqueue generation for another project.
- Generated observation includes correct team/project IDs.
- Audit records include generation job IDs.
### Anti-Pattern Guards
- Do not let BullMQ job data become an auth bypass.
- Do not trust job payload project/team IDs without reloading the outbox row from Postgres.
## Phase 12: Observability And Operations
### What To Implement
- Add `claude-mem server jobs status`.
- Add `claude-mem server jobs retry <id>`.
- Add `claude-mem server jobs cancel <id>`.
- Add `claude-mem server jobs failed`.
- Add queue metrics:
- waiting;
- active;
- completed;
- failed;
- delayed;
- stalled event count.
- Add logs with request ID/job ID correlation.
- Add `/v1/jobs` list endpoint.
### Documentation References
- BullMQ Workers docs for worker `completed`, `failed`, `progress`, and `error` events.
- BullMQ Stalled Jobs docs for stalled event behavior and rare-stall assumption.
- Existing `src/services/worker/http/routes/LogsRoutes.ts` for log tailing style.
### Verification Checklist
- Failed provider response appears in `server jobs failed`.
- Retry moves job back to queued and generates an observation once.
- Cancel prevents later generation.
- Stalled events are logged with job ID.
### Anti-Pattern Guards
- Do not expose full sensitive event payloads in queue status by default.
- Do not retry non-idempotently.
## Phase 13: Final Verification Gate
Phase 13 is not an implementation phase and does not need the implementation-phase template. It is the final release gate for proving the independently implemented Server beta runtime is complete, durable, and still compatible with the legacy worker runtime.
### Required Automated Tests
- Unit:
- provider generation parser;
- event-to-job transaction;
- job ID/idempotency;
- team/project auth on generation;
- compatibility route adapters.
- Integration:
- Server beta starts without worker;
- `/v1/events` generates observations;
- hook PostToolUse generates observations through Server beta;
- MCP event write generates observations through Server beta;
- restart during active generation retries safely.
- Docker:
- Server beta + Postgres + Valkey;
- API-key auth;
- event generation;
- restart persistence;
- revoked-key denial;
- no worker process.
### Required Greps
```bash
rg -n "new WorkerService|services/worker-service|services/worker/http/routes" src/server
rg -n "PendingMessageStore|SessionQueueProcessor" src/server
rg -n "CLAUDE_MEM_AUTH_MODE=local-dev|ALLOW_LOCAL_DEV_BYPASS" docker docs/server.md
rg -n "POST /v1/events|generationJob|wait=true" docs README.md
```
Expected:
- First two greps return no Server beta runtime imports.
- Docker docs do not recommend local-dev auth.
- Docs mention event generation semantics.
### Manual Verification
1. Start worker, confirm existing worker flow still works.
2. Stop worker.
3. Start Server beta with Valkey.
4. Submit a generic REST event.
5. Confirm observations appear without worker running.
6. Submit a Claude Code PostToolUse payload through Server beta hook routing.
7. Confirm observations appear without worker running.
8. Restart Server beta during a provider call.
9. Confirm the job retries and generates once.
### Exit Criteria
Server beta is independent when all are true:
- Server beta can generate observations while worker is stopped.
- Docker Server beta image does not spawn worker.
- `/v1/events` can enqueue and generate observations.
- Hook routing to Server beta generates observations when healthy.
- BullMQ queue state survives restart and retries safely.
- Postgres server storage is the source of truth for observations and generation job history.
- The worker remains available as a separate stable runtime.

View File

@@ -0,0 +1,186 @@
// SPDX-License-Identifier: Apache-2.0
import type { JsonObject, JsonValue, PostgresQueryable } from './utils.js';
import {
assertProjectOwnership,
assertSessionOwnership,
canonicalJson,
deterministicKey,
newId,
queryOne,
toEpoch,
toJsonObject
} from './utils.js';
export interface PostgresAgentEvent {
id: string;
projectId: string;
teamId: string;
serverSessionId: string | null;
sourceAdapter: string;
sourceEventId: string | null;
idempotencyKey: string;
eventType: string;
payload: JsonValue;
metadata: JsonObject;
occurredAtEpoch: number;
receivedAtEpoch: number;
createdAtEpoch: number;
}
export interface CreatePostgresAgentEventInput {
id?: string;
projectId: string;
teamId: string;
serverSessionId?: string | null;
sourceAdapter: string;
sourceEventId?: string | null;
eventType: string;
payload?: JsonValue;
metadata?: JsonObject;
occurredAt: Date | string | number;
}
interface AgentEventRow {
id: string;
project_id: string;
team_id: string;
server_session_id: string | null;
source_adapter: string;
source_event_id: string | null;
idempotency_key: string;
event_type: string;
payload: unknown;
metadata: unknown;
occurred_at: Date;
received_at: Date;
created_at: Date;
}
export class PostgresAgentEventsRepository {
constructor(private client: PostgresQueryable) {}
async create(input: CreatePostgresAgentEventInput): Promise<PostgresAgentEvent> {
await assertProjectOwnership(this.client, input.projectId, input.teamId);
if (input.serverSessionId) {
await assertSessionOwnership(this.client, input.serverSessionId, input.projectId, input.teamId);
}
const idempotencyKey = buildAgentEventIdempotencyKey(input);
const row = await queryOne<AgentEventRow>(
this.client,
`
INSERT INTO agent_events (
id, project_id, team_id, server_session_id, source_adapter,
source_event_id, idempotency_key, event_type, payload, metadata, occurred_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb, $10::jsonb, $11)
ON CONFLICT (idempotency_key) DO UPDATE SET
metadata = agent_events.metadata || excluded.metadata
RETURNING *
`,
[
input.id ?? newId(),
input.projectId,
input.teamId,
input.serverSessionId ?? null,
input.sourceAdapter,
input.sourceEventId ?? null,
idempotencyKey,
input.eventType,
JSON.stringify(input.payload ?? {}),
JSON.stringify(input.metadata ?? {}),
new Date(input.occurredAt)
]
);
return mapAgentEventRow(row!);
}
async createMany(inputs: CreatePostgresAgentEventInput[]): Promise<PostgresAgentEvent[]> {
const events: PostgresAgentEvent[] = [];
for (const input of inputs) {
events.push(await this.create(input));
}
return events;
}
async getByIdForScope(input: {
id: string;
projectId: string;
teamId: string;
}): Promise<PostgresAgentEvent | null> {
const row = await queryOne<AgentEventRow>(
this.client,
'SELECT * FROM agent_events WHERE id = $1 AND project_id = $2 AND team_id = $3',
[input.id, input.projectId, input.teamId]
);
return row ? mapAgentEventRow(row) : null;
}
async listByProject(input: {
projectId: string;
teamId: string;
serverSessionId?: string | null;
limit?: number;
}): Promise<PostgresAgentEvent[]> {
const result = await this.client.query<AgentEventRow>(
`
SELECT * FROM agent_events
WHERE project_id = $1
AND team_id = $2
AND ($3::text IS NULL OR server_session_id = $3)
ORDER BY occurred_at DESC
LIMIT $4
`,
[input.projectId, input.teamId, input.serverSessionId ?? null, input.limit ?? 100]
);
return result.rows.map(mapAgentEventRow);
}
}
export function buildAgentEventIdempotencyKey(input: {
teamId: string;
projectId: string;
sourceAdapter: string;
sourceEventId?: string | null;
serverSessionId?: string | null;
eventType: string;
occurredAt: Date | string | number;
payload?: JsonValue;
}): string {
if (input.sourceEventId) {
return `agent_event:v1:${deterministicKey([
input.teamId,
input.projectId,
input.sourceAdapter,
input.sourceEventId
])}`;
}
return `agent_event:v1:${deterministicKey([
input.teamId,
input.projectId,
input.sourceAdapter,
input.serverSessionId ?? null,
input.eventType,
new Date(input.occurredAt).toISOString(),
canonicalJson(input.payload ?? {})
])}`;
}
function mapAgentEventRow(row: AgentEventRow): PostgresAgentEvent {
return {
id: row.id,
projectId: row.project_id,
teamId: row.team_id,
serverSessionId: row.server_session_id,
sourceAdapter: row.source_adapter,
sourceEventId: row.source_event_id,
idempotencyKey: row.idempotency_key,
eventType: row.event_type,
payload: row.payload,
metadata: toJsonObject(row.metadata),
occurredAtEpoch: toEpoch(row.occurred_at),
receivedAtEpoch: toEpoch(row.received_at),
createdAtEpoch: toEpoch(row.created_at)
};
}

View File

@@ -0,0 +1,168 @@
// SPDX-License-Identifier: Apache-2.0
import type { JsonObject, PostgresQueryable } from './utils.js';
import { assertProjectOwnership, newId, queryOne, toDate, toEpoch, toJsonArray, toJsonObject } from './utils.js';
export interface PostgresApiKey {
id: string;
keyHash: string;
teamId: string | null;
projectId: string | null;
actorId: string;
scopes: unknown[];
revokedAtEpoch: number | null;
expiresAtEpoch: number | null;
createdAtEpoch: number;
updatedAtEpoch: number;
}
export interface PostgresAuditLog {
id: string;
teamId: string | null;
projectId: string | null;
actorId: string | null;
apiKeyId: string | null;
action: string;
resourceType: string;
resourceId: string | null;
details: JsonObject;
createdAtEpoch: number;
}
interface ApiKeyRow {
id: string;
key_hash: string;
team_id: string | null;
project_id: string | null;
actor_id: string;
scopes: unknown;
revoked_at: Date | null;
expires_at: Date | null;
created_at: Date;
updated_at: Date;
}
interface AuditLogRow {
id: string;
team_id: string | null;
project_id: string | null;
actor_id: string | null;
api_key_id: string | null;
action: string;
resource_type: string;
resource_id: string | null;
details: unknown;
created_at: Date;
}
export class PostgresAuthRepository {
constructor(private client: PostgresQueryable) {}
async createApiKey(input: {
id?: string;
keyHash: string;
teamId?: string | null;
projectId?: string | null;
actorId: string;
scopes?: unknown[];
expiresAt?: Date | null;
}): Promise<PostgresApiKey> {
if (input.projectId && input.teamId) {
await assertProjectOwnership(this.client, input.projectId, input.teamId);
}
const id = input.id ?? newId();
const row = await queryOne<ApiKeyRow>(
this.client,
`
INSERT INTO api_keys (id, key_hash, team_id, project_id, actor_id, scopes, expires_at)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7)
RETURNING *
`,
[
id,
input.keyHash,
input.teamId ?? null,
input.projectId ?? null,
input.actorId,
JSON.stringify(input.scopes ?? []),
input.expiresAt ?? null
]
);
return mapApiKeyRow(row!);
}
async createAuditLog(input: {
id?: string;
teamId?: string | null;
projectId?: string | null;
actorId?: string | null;
apiKeyId?: string | null;
action: string;
resourceType: string;
resourceId?: string | null;
details?: JsonObject;
}): Promise<PostgresAuditLog> {
if (input.projectId && input.teamId) {
await assertProjectOwnership(this.client, input.projectId, input.teamId);
}
const id = input.id ?? newId();
const row = await queryOne<AuditLogRow>(
this.client,
`
INSERT INTO audit_log (
id, team_id, project_id, actor_id, api_key_id, action,
resource_type, resource_id, details
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb)
RETURNING *
`,
[
id,
input.teamId ?? null,
input.projectId ?? null,
input.actorId ?? null,
input.apiKeyId ?? null,
input.action,
input.resourceType,
input.resourceId ?? null,
JSON.stringify(input.details ?? {})
]
);
return mapAuditLogRow(row!);
}
async getApiKeyByHash(keyHash: string): Promise<PostgresApiKey | null> {
const row = await queryOne<ApiKeyRow>(this.client, 'SELECT * FROM api_keys WHERE key_hash = $1', [keyHash]);
return row ? mapApiKeyRow(row) : null;
}
}
function mapApiKeyRow(row: ApiKeyRow): PostgresApiKey {
return {
id: row.id,
keyHash: row.key_hash,
teamId: row.team_id,
projectId: row.project_id,
actorId: row.actor_id,
scopes: toJsonArray(row.scopes),
revokedAtEpoch: toDate(row.revoked_at)?.getTime() ?? null,
expiresAtEpoch: toDate(row.expires_at)?.getTime() ?? null,
createdAtEpoch: toEpoch(row.created_at),
updatedAtEpoch: toEpoch(row.updated_at)
};
}
function mapAuditLogRow(row: AuditLogRow): PostgresAuditLog {
return {
id: row.id,
teamId: row.team_id,
projectId: row.project_id,
actorId: row.actor_id,
apiKeyId: row.api_key_id,
action: row.action,
resourceType: row.resource_type,
resourceId: row.resource_id,
details: toJsonObject(row.details),
createdAtEpoch: toEpoch(row.created_at)
};
}

View File

@@ -0,0 +1,72 @@
// SPDX-License-Identifier: Apache-2.0
export interface PostgresConfig {
connectionString: string;
max: number;
idleTimeoutMillis: number;
connectionTimeoutMillis: number;
statementTimeoutMillis: number;
ssl: boolean | { rejectUnauthorized: boolean };
}
export interface ParsePostgresConfigOptions {
env?: NodeJS.ProcessEnv;
requireDatabaseUrl?: boolean;
}
const DEFAULT_POOL_MAX = 10;
const DEFAULT_IDLE_TIMEOUT_MS = 30_000;
const DEFAULT_CONNECTION_TIMEOUT_MS = 5_000;
const DEFAULT_STATEMENT_TIMEOUT_MS = 30_000;
export function getPostgresDatabaseUrl(env: NodeJS.ProcessEnv = process.env): string | null {
return env.CLAUDE_MEM_SERVER_DATABASE_URL || null;
}
export function parsePostgresConfig(options: ParsePostgresConfigOptions = {}): PostgresConfig | null {
const env = options.env ?? process.env;
const connectionString = getPostgresDatabaseUrl(env);
if (!connectionString) {
if (options.requireDatabaseUrl) {
throw new Error('Postgres requires CLAUDE_MEM_SERVER_DATABASE_URL');
}
return null;
}
return {
connectionString,
max: parsePositiveInt(env.CLAUDE_MEM_POSTGRES_POOL_MAX, DEFAULT_POOL_MAX),
idleTimeoutMillis: parsePositiveInt(env.CLAUDE_MEM_POSTGRES_IDLE_TIMEOUT_MS, DEFAULT_IDLE_TIMEOUT_MS),
connectionTimeoutMillis: parsePositiveInt(env.CLAUDE_MEM_POSTGRES_CONNECTION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS),
statementTimeoutMillis: parsePositiveInt(env.CLAUDE_MEM_POSTGRES_STATEMENT_TIMEOUT_MS, DEFAULT_STATEMENT_TIMEOUT_MS),
ssl: parseSsl(connectionString, env)
};
}
function parsePositiveInt(value: string | undefined, fallback: number): number {
if (!value) {
return fallback;
}
const parsed = Number.parseInt(value, 10);
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}
function parseSsl(connectionString: string, env: NodeJS.ProcessEnv): boolean | { rejectUnauthorized: boolean } {
if (env.CLAUDE_MEM_POSTGRES_SSL === 'disable' || env.PGSSLMODE === 'disable') {
return false;
}
if (env.CLAUDE_MEM_POSTGRES_SSL === 'require' || env.PGSSLMODE === 'require') {
return { rejectUnauthorized: false };
}
try {
const url = new URL(connectionString);
if (url.searchParams.get('sslmode') === 'require') {
return { rejectUnauthorized: false };
}
} catch {
return false;
}
return false;
}

View File

@@ -0,0 +1,457 @@
// SPDX-License-Identifier: Apache-2.0
import type { JsonObject, PostgresQueryable } from './utils.js';
import {
assertProjectOwnership,
assertSessionOwnership,
deterministicKey,
newId,
queryOne,
toDate,
toEpoch,
toJsonObject
} from './utils.js';
export type ObservationGenerationJobSourceType = 'agent_event' | 'session_summary' | 'observation_reindex';
export type ObservationGenerationJobStatus = 'queued' | 'processing' | 'completed' | 'failed' | 'cancelled';
export type ObservationGenerationJobEventType =
| 'queued'
| 'enqueued'
| 'processing'
| 'retry_scheduled'
| 'completed'
| 'failed'
| 'cancelled';
export interface PostgresObservationGenerationJob {
id: string;
projectId: string;
teamId: string;
agentEventId: string | null;
sourceType: ObservationGenerationJobSourceType;
sourceId: string;
serverSessionId: string | null;
jobType: string;
status: ObservationGenerationJobStatus;
idempotencyKey: string;
bullmqJobId: string | null;
attempts: number;
maxAttempts: number;
nextAttemptAtEpoch: number | null;
lockedAtEpoch: number | null;
lockedBy: string | null;
completedAtEpoch: number | null;
failedAtEpoch: number | null;
cancelledAtEpoch: number | null;
lastError: JsonObject | null;
payload: JsonObject;
createdAtEpoch: number;
updatedAtEpoch: number;
}
export interface PostgresObservationGenerationJobEvent {
id: string;
generationJobId: string;
eventType: ObservationGenerationJobEventType;
statusAfter: ObservationGenerationJobStatus;
attempt: number;
details: JsonObject;
createdAtEpoch: number;
}
interface JobRow {
id: string;
project_id: string;
team_id: string;
agent_event_id: string | null;
source_type: ObservationGenerationJobSourceType;
source_id: string;
server_session_id: string | null;
job_type: string;
status: ObservationGenerationJobStatus;
idempotency_key: string;
bullmq_job_id: string | null;
attempts: number;
max_attempts: number;
next_attempt_at: Date | null;
locked_at: Date | null;
locked_by: string | null;
completed_at: Date | null;
failed_at: Date | null;
cancelled_at: Date | null;
last_error: unknown | null;
payload: unknown;
created_at: Date;
updated_at: Date;
}
interface JobEventRow {
id: string;
generation_job_id: string;
event_type: ObservationGenerationJobEventType;
status_after: ObservationGenerationJobStatus;
attempt: number;
details: unknown;
created_at: Date;
}
export class PostgresObservationGenerationJobRepository {
constructor(private client: PostgresQueryable) {}
async create(input: {
id?: string;
projectId: string;
teamId: string;
sourceType: ObservationGenerationJobSourceType;
sourceId: string;
agentEventId?: string | null;
serverSessionId?: string | null;
jobType: string;
status?: ObservationGenerationJobStatus;
bullmqJobId?: string | null;
maxAttempts?: number;
payload?: JsonObject;
}): Promise<PostgresObservationGenerationJob> {
await this.validateSource(input);
const sourceModel = normalizeSourceModel(input);
const idempotencyKey = buildObservationGenerationJobIdempotencyKey(input);
const row = await queryOne<JobRow>(
this.client,
`
INSERT INTO observation_generation_jobs (
id, project_id, team_id, agent_event_id, source_type, source_id,
server_session_id, job_type, status, idempotency_key, bullmq_job_id,
max_attempts, payload
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13::jsonb)
ON CONFLICT (idempotency_key) DO UPDATE SET
payload = observation_generation_jobs.payload || excluded.payload,
updated_at = now()
RETURNING *
`,
[
input.id ?? newId(),
input.projectId,
input.teamId,
sourceModel.agentEventId,
input.sourceType,
input.sourceId,
sourceModel.serverSessionId,
input.jobType,
input.status ?? 'queued',
idempotencyKey,
input.bullmqJobId ?? null,
input.maxAttempts ?? 3,
JSON.stringify(input.payload ?? {})
]
);
return mapJobRow(row!);
}
async getByIdForScope(input: {
id: string;
projectId: string;
teamId: string;
}): Promise<PostgresObservationGenerationJob | null> {
const row = await queryOne<JobRow>(
this.client,
'SELECT * FROM observation_generation_jobs WHERE id = $1 AND project_id = $2 AND team_id = $3',
[input.id, input.projectId, input.teamId]
);
return row ? mapJobRow(row) : null;
}
async transitionStatus(input: {
id: string;
projectId: string;
teamId: string;
status: ObservationGenerationJobStatus;
lockedBy?: string | null;
lastError?: JsonObject | null;
nextAttemptAt?: Date | null;
}): Promise<PostgresObservationGenerationJob | null> {
const row = await queryOne<JobRow>(
this.client,
`
UPDATE observation_generation_jobs
SET
status = $2,
attempts = CASE WHEN $2 = 'processing' THEN attempts + 1 ELSE attempts END,
locked_at = CASE WHEN $2 = 'processing' THEN now() ELSE NULL END,
locked_by = CASE WHEN $2 = 'processing' THEN $3 ELSE NULL END,
next_attempt_at = CASE WHEN $2 = 'queued' THEN $4::timestamptz ELSE NULL::timestamptz END,
completed_at = CASE WHEN $2 = 'completed' THEN now() ELSE NULL END,
failed_at = CASE WHEN $2 = 'failed' THEN now() ELSE NULL END,
cancelled_at = CASE WHEN $2 = 'cancelled' THEN now() ELSE NULL END,
last_error = $5::jsonb,
updated_at = now()
WHERE id = $1
AND project_id = $6
AND team_id = $7
AND (
(status = 'queued' AND $2 IN ('processing', 'failed', 'cancelled'))
OR
(status = 'processing' AND $2 IN ('queued', 'completed', 'failed', 'cancelled'))
)
AND ($2 <> 'processing' OR attempts < max_attempts)
AND ($2 <> 'queued' OR attempts < max_attempts)
RETURNING *
`,
[
input.id,
input.status,
input.lockedBy ?? null,
input.nextAttemptAt ?? null,
input.lastError == null ? null : JSON.stringify(input.lastError),
input.projectId,
input.teamId
]
);
if (row) {
return mapJobRow(row);
}
const current = await queryOne<JobRow>(
this.client,
'SELECT * FROM observation_generation_jobs WHERE id = $1 AND project_id = $2 AND team_id = $3',
[input.id, input.projectId, input.teamId]
);
if (!current) {
return null;
}
assertValidJobStatusTransition(mapJobRow(current), input.status);
throw new Error('observation generation job status transition was not applied');
}
async listByStatusForScope(input: {
status: ObservationGenerationJobStatus;
projectId: string;
teamId: string;
limit?: number;
}): Promise<PostgresObservationGenerationJob[]> {
const result = await this.client.query<JobRow>(
`
SELECT * FROM observation_generation_jobs
WHERE status = $1 AND project_id = $2 AND team_id = $3
ORDER BY created_at ASC
LIMIT $4
`,
[input.status, input.projectId, input.teamId, input.limit ?? 100]
);
return result.rows.map(mapJobRow);
}
private async validateSource(input: {
projectId: string;
teamId: string;
sourceType: ObservationGenerationJobSourceType;
sourceId: string;
agentEventId?: string | null;
serverSessionId?: string | null;
}): Promise<void> {
await assertProjectOwnership(this.client, input.projectId, input.teamId);
if (input.sourceType === 'agent_event') {
const eventId = input.agentEventId ?? input.sourceId;
const row = await queryOne<{ id: string; server_session_id: string | null }>(
this.client,
'SELECT id, server_session_id FROM agent_events WHERE id = $1 AND project_id = $2 AND team_id = $3',
[eventId, input.projectId, input.teamId]
);
if (!row || input.sourceId !== eventId) {
throw new Error('agent_event source_id must belong to project_id and team_id');
}
if (input.serverSessionId) {
await assertSessionOwnership(this.client, input.serverSessionId, input.projectId, input.teamId);
if (row.server_session_id && row.server_session_id !== input.serverSessionId) {
throw new Error('server_session_id must match the agent_event server_session_id');
}
}
return;
}
if (input.sourceType === 'session_summary') {
const sessionId = input.serverSessionId ?? input.sourceId;
await assertSessionOwnership(this.client, sessionId, input.projectId, input.teamId);
if (input.sourceId !== sessionId) {
throw new Error('session_summary source_id must equal server_session_id');
}
return;
}
const observation = await queryOne<{ id: string }>(
this.client,
'SELECT id FROM observations WHERE id = $1 AND project_id = $2 AND team_id = $3',
[input.sourceId, input.projectId, input.teamId]
);
if (!observation) {
throw new Error('observation_reindex source_id must belong to project_id and team_id');
}
if (input.serverSessionId) {
await assertSessionOwnership(this.client, input.serverSessionId, input.projectId, input.teamId);
}
}
}
export class PostgresObservationGenerationJobEventsRepository {
constructor(private client: PostgresQueryable) {}
async append(input: {
id?: string;
generationJobId: string;
projectId: string;
teamId: string;
eventType: ObservationGenerationJobEventType;
statusAfter: ObservationGenerationJobStatus;
attempt?: number;
details?: JsonObject;
}): Promise<PostgresObservationGenerationJobEvent> {
const row = await queryOne<JobEventRow>(
this.client,
`
INSERT INTO observation_generation_job_events (
id, generation_job_id, event_type, status_after, attempt, details
)
SELECT $1, jobs.id, $4, $5, $6, $7::jsonb
FROM observation_generation_jobs jobs
WHERE jobs.id = $2
AND jobs.project_id = $3
AND jobs.team_id = $8
RETURNING observation_generation_job_events.*
`,
[
input.id ?? newId(),
input.generationJobId,
input.projectId,
input.eventType,
input.statusAfter,
input.attempt ?? 0,
JSON.stringify(input.details ?? {}),
input.teamId
]
);
if (!row) {
throw new Error('generation_job_id must belong to project_id and team_id');
}
return mapJobEventRow(row!);
}
async listByJobForScope(input: {
generationJobId: string;
projectId: string;
teamId: string;
}): Promise<PostgresObservationGenerationJobEvent[]> {
const result = await this.client.query<JobEventRow>(
`
SELECT events.*
FROM observation_generation_job_events events
INNER JOIN observation_generation_jobs jobs ON jobs.id = events.generation_job_id
WHERE events.generation_job_id = $1 AND jobs.project_id = $2 AND jobs.team_id = $3
ORDER BY events.created_at ASC
`,
[input.generationJobId, input.projectId, input.teamId]
);
return result.rows.map(mapJobEventRow);
}
}
export function buildObservationGenerationJobIdempotencyKey(input: {
teamId: string;
projectId: string;
sourceType: ObservationGenerationJobSourceType;
sourceId: string;
jobType: string;
}): string {
return `observation_generation_job:v1:${deterministicKey([
input.teamId,
input.projectId,
input.sourceType,
input.sourceId,
input.jobType
])}`;
}
function normalizeSourceModel(input: {
sourceType: ObservationGenerationJobSourceType;
sourceId: string;
agentEventId?: string | null;
serverSessionId?: string | null;
}): { agentEventId: string | null; serverSessionId: string | null } {
if (input.sourceType === 'agent_event') {
return { agentEventId: input.agentEventId ?? input.sourceId, serverSessionId: input.serverSessionId ?? null };
}
if (input.sourceType === 'session_summary') {
return { agentEventId: null, serverSessionId: input.serverSessionId ?? input.sourceId };
}
return { agentEventId: null, serverSessionId: input.serverSessionId ?? null };
}
const TERMINAL_JOB_STATUSES = new Set<ObservationGenerationJobStatus>(['completed', 'failed', 'cancelled']);
const ALLOWED_JOB_TRANSITIONS: Record<ObservationGenerationJobStatus, readonly ObservationGenerationJobStatus[]> = {
queued: ['processing', 'failed', 'cancelled'],
processing: ['queued', 'completed', 'failed', 'cancelled'],
completed: [],
failed: [],
cancelled: []
};
function assertValidJobStatusTransition(
current: PostgresObservationGenerationJob,
nextStatus: ObservationGenerationJobStatus
): void {
if (TERMINAL_JOB_STATUSES.has(current.status)) {
throw new Error(`cannot transition observation generation job from terminal status ${current.status}`);
}
if (!ALLOWED_JOB_TRANSITIONS[current.status].includes(nextStatus)) {
throw new Error(`illegal observation generation job transition from ${current.status} to ${nextStatus}`);
}
if (nextStatus === 'processing' && current.attempts >= current.maxAttempts) {
throw new Error('cannot process observation generation job after max_attempts is reached');
}
if (nextStatus === 'queued' && current.attempts >= current.maxAttempts) {
throw new Error('cannot retry observation generation job after max_attempts is reached');
}
}
function mapJobRow(row: JobRow): PostgresObservationGenerationJob {
return {
id: row.id,
projectId: row.project_id,
teamId: row.team_id,
agentEventId: row.agent_event_id,
sourceType: row.source_type,
sourceId: row.source_id,
serverSessionId: row.server_session_id,
jobType: row.job_type,
status: row.status,
idempotencyKey: row.idempotency_key,
bullmqJobId: row.bullmq_job_id,
attempts: row.attempts,
maxAttempts: row.max_attempts,
nextAttemptAtEpoch: toDate(row.next_attempt_at)?.getTime() ?? null,
lockedAtEpoch: toDate(row.locked_at)?.getTime() ?? null,
lockedBy: row.locked_by,
completedAtEpoch: toDate(row.completed_at)?.getTime() ?? null,
failedAtEpoch: toDate(row.failed_at)?.getTime() ?? null,
cancelledAtEpoch: toDate(row.cancelled_at)?.getTime() ?? null,
lastError: row.last_error == null ? null : toJsonObject(row.last_error),
payload: toJsonObject(row.payload),
createdAtEpoch: toEpoch(row.created_at),
updatedAtEpoch: toEpoch(row.updated_at)
};
}
function mapJobEventRow(row: JobEventRow): PostgresObservationGenerationJobEvent {
return {
id: row.id,
generationJobId: row.generation_job_id,
eventType: row.event_type,
statusAfter: row.status_after,
attempt: row.attempt,
details: toJsonObject(row.details),
createdAtEpoch: toEpoch(row.created_at)
};
}

View File

@@ -0,0 +1,51 @@
// SPDX-License-Identifier: Apache-2.0
import type { PostgresQueryable } from './utils.js';
import { PostgresAgentEventsRepository } from './agent-events.js';
import { PostgresAuthRepository } from './auth.js';
import {
PostgresObservationGenerationJobEventsRepository,
PostgresObservationGenerationJobRepository
} from './generation-jobs.js';
import { PostgresObservationRepository, PostgresObservationSourcesRepository } from './observations.js';
import { PostgresProjectsRepository } from './projects.js';
import { PostgresServerSessionsRepository } from './server-sessions.js';
import { PostgresTeamsRepository } from './teams.js';
export * from './agent-events.js';
export * from './auth.js';
export * from './config.js';
export * from './generation-jobs.js';
export * from './observations.js';
export * from './pool.js';
export * from './projects.js';
export * from './schema.js';
export * from './server-sessions.js';
export * from './teams.js';
export type * from './utils.js';
export interface PostgresStorageRepositories {
teams: PostgresTeamsRepository;
projects: PostgresProjectsRepository;
auth: PostgresAuthRepository;
sessions: PostgresServerSessionsRepository;
agentEvents: PostgresAgentEventsRepository;
observations: PostgresObservationRepository;
observationSources: PostgresObservationSourcesRepository;
observationGenerationJobs: PostgresObservationGenerationJobRepository;
observationGenerationJobEvents: PostgresObservationGenerationJobEventsRepository;
}
export function createPostgresStorageRepositories(client: PostgresQueryable): PostgresStorageRepositories {
return {
teams: new PostgresTeamsRepository(client),
projects: new PostgresProjectsRepository(client),
auth: new PostgresAuthRepository(client),
sessions: new PostgresServerSessionsRepository(client),
agentEvents: new PostgresAgentEventsRepository(client),
observations: new PostgresObservationRepository(client),
observationSources: new PostgresObservationSourcesRepository(client),
observationGenerationJobs: new PostgresObservationGenerationJobRepository(client),
observationGenerationJobEvents: new PostgresObservationGenerationJobEventsRepository(client)
};
}

View File

@@ -0,0 +1,395 @@
// SPDX-License-Identifier: Apache-2.0
import type { JsonObject, JsonValue, PostgresQueryable } from './utils.js';
import {
assertProjectOwnership,
assertSessionOwnership,
canonicalJson,
deterministicKey,
newId,
queryOne,
toEpoch,
toJsonObject
} from './utils.js';
export type ObservationSourceType = 'agent_event' | 'session_summary' | 'observation_reindex' | 'manual';
export interface PostgresObservation {
id: string;
projectId: string;
teamId: string;
serverSessionId: string | null;
kind: string;
content: string;
generationKey: string | null;
metadata: JsonObject;
embedding: JsonValue | null;
createdByJobId: string | null;
createdAtEpoch: number;
updatedAtEpoch: number;
}
export interface PostgresObservationSource {
id: string;
observationId: string;
agentEventId: string | null;
generationJobId: string | null;
sourceType: ObservationSourceType;
sourceId: string;
metadata: JsonObject;
createdAtEpoch: number;
}
interface ObservationRow {
id: string;
project_id: string;
team_id: string;
server_session_id: string | null;
kind: string;
content: string;
generation_key: string | null;
metadata: unknown;
embedding: unknown | null;
created_by_job_id: string | null;
created_at: Date;
updated_at: Date;
}
interface ObservationSourceRow {
id: string;
observation_id: string;
agent_event_id: string | null;
generation_job_id: string | null;
source_type: ObservationSourceType;
source_id: string;
metadata: unknown;
created_at: Date;
}
export class PostgresObservationRepository {
constructor(private client: PostgresQueryable) {}
async create(input: {
id?: string;
projectId: string;
teamId: string;
serverSessionId?: string | null;
kind?: string;
content: string;
generationKey?: string | null;
metadata?: JsonObject;
embedding?: JsonValue | null;
createdByJobId?: string | null;
}): Promise<PostgresObservation> {
await assertProjectOwnership(this.client, input.projectId, input.teamId);
if (input.serverSessionId) {
await assertSessionOwnership(this.client, input.serverSessionId, input.projectId, input.teamId);
}
if (input.createdByJobId) {
await assertJobOwnership(this.client, input.createdByJobId, input.projectId, input.teamId);
}
const row = await queryOne<ObservationRow>(
this.client,
`
INSERT INTO observations (
id, project_id, team_id, server_session_id, kind, content,
generation_key, metadata, embedding, created_by_job_id
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9::jsonb, $10)
ON CONFLICT (team_id, project_id, generation_key) WHERE generation_key IS NOT NULL DO UPDATE SET
updated_at = observations.updated_at
RETURNING *
`,
[
input.id ?? newId(),
input.projectId,
input.teamId,
input.serverSessionId ?? null,
input.kind ?? 'observation',
input.content,
input.generationKey ?? null,
JSON.stringify(input.metadata ?? {}),
input.embedding == null ? null : JSON.stringify(input.embedding),
input.createdByJobId ?? null
]
);
return mapObservationRow(row!);
}
async getByIdForScope(input: {
id: string;
projectId: string;
teamId: string;
}): Promise<PostgresObservation | null> {
const row = await queryOne<ObservationRow>(
this.client,
'SELECT * FROM observations WHERE id = $1 AND project_id = $2 AND team_id = $3',
[input.id, input.projectId, input.teamId]
);
return row ? mapObservationRow(row) : null;
}
async listByProject(input: {
projectId: string;
teamId: string;
serverSessionId?: string | null;
limit?: number;
}): Promise<PostgresObservation[]> {
const result = await this.client.query<ObservationRow>(
`
SELECT * FROM observations
WHERE project_id = $1
AND team_id = $2
AND ($3::text IS NULL OR server_session_id = $3)
ORDER BY created_at DESC
LIMIT $4
`,
[input.projectId, input.teamId, input.serverSessionId ?? null, input.limit ?? 100]
);
return result.rows.map(mapObservationRow);
}
async search(input: {
projectId: string;
teamId: string;
query: string;
limit?: number;
}): Promise<PostgresObservation[]> {
const result = await this.client.query<ObservationRow>(
`
SELECT * FROM observations
WHERE project_id = $1
AND team_id = $2
AND content_search @@ websearch_to_tsquery('english', $3)
ORDER BY ts_rank(content_search, websearch_to_tsquery('english', $3)) DESC, updated_at DESC
LIMIT $4
`,
[input.projectId, input.teamId, input.query, input.limit ?? 20]
);
return result.rows.map(mapObservationRow);
}
}
export class PostgresObservationSourcesRepository {
constructor(private client: PostgresQueryable) {}
async addSource(input: {
id?: string;
observationId: string;
projectId: string;
teamId: string;
sourceType: ObservationSourceType;
sourceId: string;
agentEventId?: string | null;
generationJobId?: string | null;
metadata?: JsonObject;
}): Promise<PostgresObservationSource> {
const observation = await queryOne<{ id: string }>(
this.client,
'SELECT id FROM observations WHERE id = $1 AND project_id = $2 AND team_id = $3',
[input.observationId, input.projectId, input.teamId]
);
if (!observation) {
throw new Error('observation_id does not exist');
}
const agentEventId = input.sourceType === 'agent_event'
? input.agentEventId ?? input.sourceId
: null;
if (input.sourceType === 'agent_event') {
if (agentEventId !== input.sourceId) {
throw new Error('agent_event source_id must equal agent_event_id');
}
await assertAgentEventOwnership(this.client, input.sourceId, input.projectId, input.teamId);
} else if (input.sourceType === 'session_summary' && !input.generationJobId) {
await assertSessionOwnership(this.client, input.sourceId, input.projectId, input.teamId);
} else if (input.sourceType === 'observation_reindex' && !input.generationJobId) {
await assertObservationOwnership(this.client, input.sourceId, input.projectId, input.teamId);
}
if (input.generationJobId) {
await assertGenerationJobMatchesSource(this.client, {
generationJobId: input.generationJobId,
projectId: input.projectId,
teamId: input.teamId,
sourceType: input.sourceType,
sourceId: input.sourceId,
agentEventId
});
}
const row = await queryOne<ObservationSourceRow>(
this.client,
`
INSERT INTO observation_sources (
id, observation_id, agent_event_id, generation_job_id,
source_type, source_id, metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
ON CONFLICT (observation_id, source_type, source_id) DO UPDATE SET
metadata = observation_sources.metadata || excluded.metadata
RETURNING *
`,
[
input.id ?? newId(),
input.observationId,
agentEventId,
input.generationJobId ?? null,
input.sourceType,
input.sourceId,
JSON.stringify(input.metadata ?? {})
]
);
return mapObservationSourceRow(row!);
}
async listByObservationForScope(input: {
observationId: string;
projectId: string;
teamId: string;
}): Promise<PostgresObservationSource[]> {
const result = await this.client.query<ObservationSourceRow>(
`
SELECT observation_sources.*
FROM observation_sources
INNER JOIN observations
ON observations.id = observation_sources.observation_id
WHERE observation_sources.observation_id = $1
AND observations.project_id = $2
AND observations.team_id = $3
ORDER BY observation_sources.created_at ASC
`,
[input.observationId, input.projectId, input.teamId]
);
return result.rows.map(mapObservationSourceRow);
}
}
export function buildObservationGenerationKey(input: {
generationJobId: string;
parsedObservationIndex: number;
content: string;
}): string {
return `generation:v1:${input.generationJobId}:${input.parsedObservationIndex}:${deterministicKey([
canonicalJson(input.content.trim())
])}`;
}
async function assertJobOwnership(
client: PostgresQueryable,
generationJobId: string,
projectId: string,
teamId: string
): Promise<void> {
const row = await queryOne<{ id: string }>(
client,
'SELECT id FROM observation_generation_jobs WHERE id = $1 AND project_id = $2 AND team_id = $3',
[generationJobId, projectId, teamId]
);
if (!row) {
throw new Error('generation_job_id must belong to project_id and team_id');
}
}
async function assertGenerationJobMatchesSource(
client: PostgresQueryable,
input: {
generationJobId: string;
projectId: string;
teamId: string;
sourceType: ObservationSourceType;
sourceId: string;
agentEventId: string | null;
}
): Promise<void> {
if (input.sourceType === 'manual') {
throw new Error('manual observation sources cannot be linked to a generation_job_id');
}
const row = await queryOne<{
id: string;
source_type: string;
source_id: string;
agent_event_id: string | null;
}>(
client,
`
SELECT id, source_type, source_id, agent_event_id
FROM observation_generation_jobs
WHERE id = $1 AND project_id = $2 AND team_id = $3
`,
[input.generationJobId, input.projectId, input.teamId]
);
if (!row) {
throw new Error('generation_job_id must belong to project_id and team_id');
}
if (row.source_type !== input.sourceType || row.source_id !== input.sourceId) {
throw new Error('generation_job_id source model must match observation source');
}
if (input.sourceType === 'agent_event' && row.agent_event_id !== input.agentEventId) {
throw new Error('generation_job_id agent_event_id must match observation source');
}
}
async function assertAgentEventOwnership(
client: PostgresQueryable,
agentEventId: string,
projectId: string,
teamId: string
): Promise<void> {
const row = await queryOne<{ id: string }>(
client,
'SELECT id FROM agent_events WHERE id = $1 AND project_id = $2 AND team_id = $3',
[agentEventId, projectId, teamId]
);
if (!row) {
throw new Error('agent_event_id must belong to project_id and team_id');
}
}
async function assertObservationOwnership(
client: PostgresQueryable,
observationId: string,
projectId: string,
teamId: string
): Promise<void> {
const row = await queryOne<{ id: string }>(
client,
'SELECT id FROM observations WHERE id = $1 AND project_id = $2 AND team_id = $3',
[observationId, projectId, teamId]
);
if (!row) {
throw new Error('observation_reindex source_id must belong to project_id and team_id');
}
}
function mapObservationRow(row: ObservationRow): PostgresObservation {
return {
id: row.id,
projectId: row.project_id,
teamId: row.team_id,
serverSessionId: row.server_session_id,
kind: row.kind,
content: row.content,
generationKey: row.generation_key,
metadata: toJsonObject(row.metadata),
embedding: row.embedding,
createdByJobId: row.created_by_job_id,
createdAtEpoch: toEpoch(row.created_at),
updatedAtEpoch: toEpoch(row.updated_at)
};
}
function mapObservationSourceRow(row: ObservationSourceRow): PostgresObservationSource {
return {
id: row.id,
observationId: row.observation_id,
agentEventId: row.agent_event_id,
generationJobId: row.generation_job_id,
sourceType: row.source_type,
sourceId: row.source_id,
metadata: toJsonObject(row.metadata),
createdAtEpoch: toEpoch(row.created_at)
};
}

View File

@@ -0,0 +1,68 @@
// SPDX-License-Identifier: Apache-2.0
import pg, { type Pool as PgPool, type PoolClient as PgPoolClient } from 'pg';
import { parsePostgresConfig, type PostgresConfig } from './config.js';
const { Pool } = pg;
export type PostgresPool = PgPool;
export type PostgresPoolClient = PgPoolClient;
let sharedPool: PostgresPool | null = null;
export function createPostgresPool(config: PostgresConfig): PostgresPool {
return new Pool({
connectionString: config.connectionString,
max: config.max,
idleTimeoutMillis: config.idleTimeoutMillis,
connectionTimeoutMillis: config.connectionTimeoutMillis,
statement_timeout: config.statementTimeoutMillis,
ssl: config.ssl
});
}
export function getSharedPostgresPool(options: { requireDatabaseUrl?: boolean } = {}): PostgresPool {
if (sharedPool) {
return sharedPool;
}
const config = parsePostgresConfig({ requireDatabaseUrl: options.requireDatabaseUrl ?? true });
if (!config) {
throw new Error('Postgres requires CLAUDE_MEM_SERVER_DATABASE_URL');
}
sharedPool = createPostgresPool(config);
return sharedPool;
}
export async function checkPostgresHealth(pool: PostgresPool): Promise<boolean> {
try {
await pool.query('SELECT 1');
return true;
} catch {
return false;
}
}
export async function withPostgresTransaction<T>(
pool: PostgresPool,
fn: (client: PostgresPoolClient) => Promise<T>
): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await fn(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
export async function closePostgresPool(pool: PostgresPool): Promise<void> {
if (pool === sharedPool) {
sharedPool = null;
}
await pool.end();
}

View File

@@ -0,0 +1,65 @@
// SPDX-License-Identifier: Apache-2.0
import type { JsonObject, PostgresQueryable } from './utils.js';
import { newId, queryOne, toEpoch, toJsonObject } from './utils.js';
export interface PostgresProject {
id: string;
teamId: string;
name: string;
metadata: JsonObject;
createdAtEpoch: number;
updatedAtEpoch: number;
}
interface ProjectRow {
id: string;
team_id: string;
name: string;
metadata: unknown;
created_at: Date;
updated_at: Date;
}
export class PostgresProjectsRepository {
constructor(private client: PostgresQueryable) {}
async create(input: {
id?: string;
teamId: string;
name: string;
metadata?: JsonObject;
}): Promise<PostgresProject> {
const id = input.id ?? newId();
const row = await queryOne<ProjectRow>(
this.client,
`
INSERT INTO projects (id, team_id, name, metadata)
VALUES ($1, $2, $3, $4::jsonb)
RETURNING *
`,
[id, input.teamId, input.name, JSON.stringify(input.metadata ?? {})]
);
return mapProjectRow(row!);
}
async getByIdForTeam(id: string, teamId: string): Promise<PostgresProject | null> {
const row = await queryOne<ProjectRow>(
this.client,
'SELECT * FROM projects WHERE id = $1 AND team_id = $2',
[id, teamId]
);
return row ? mapProjectRow(row) : null;
}
}
function mapProjectRow(row: ProjectRow): PostgresProject {
return {
id: row.id,
teamId: row.team_id,
name: row.name,
metadata: toJsonObject(row.metadata),
createdAtEpoch: toEpoch(row.created_at),
updatedAtEpoch: toEpoch(row.updated_at)
};
}

View File

@@ -0,0 +1,283 @@
// SPDX-License-Identifier: Apache-2.0
import type { PostgresQueryable } from './utils.js';
export const SERVER_BETA_POSTGRES_SCHEMA_VERSION = 1;
export const SERVER_BETA_POSTGRES_TABLES = [
'server_beta_schema_migrations',
'teams',
'projects',
'team_members',
'api_keys',
'audit_log',
'server_sessions',
'agent_events',
'observation_generation_jobs',
'observations',
'observation_sources',
'observation_generation_job_events'
] as const;
export async function bootstrapServerBetaPostgresSchema(client: PostgresQueryable): Promise<void> {
if (isPostgresPool(client)) {
const poolClient = await client.connect();
try {
await bootstrapServerBetaPostgresSchema(poolClient);
} finally {
poolClient.release();
}
return;
}
await client.query('BEGIN');
try {
await client.query(PHASE_1_SCHEMA_SQL);
await client.query(
`
INSERT INTO server_beta_schema_migrations (version, description)
VALUES ($1, $2)
ON CONFLICT (version) DO NOTHING
`,
[SERVER_BETA_POSTGRES_SCHEMA_VERSION, 'phase 1 postgres observation storage foundation']
);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
}
}
interface PostgresPoolLike extends PostgresQueryable {
connect(): Promise<PostgresQueryable & { release(): void }>;
}
function isPostgresPool(client: PostgresQueryable): client is PostgresPoolLike {
const candidate = client as {
connect?: unknown;
release?: unknown;
totalCount?: unknown;
idleCount?: unknown;
waitingCount?: unknown;
};
return (
typeof candidate.connect === 'function'
&& typeof candidate.release !== 'function'
&& typeof candidate.totalCount === 'number'
&& typeof candidate.idleCount === 'number'
&& typeof candidate.waitingCount === 'number'
);
}
const PHASE_1_SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS server_beta_schema_migrations (
version INTEGER PRIMARY KEY,
description TEXT NOT NULL,
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS teams (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
name TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (id, team_id)
);
CREATE TABLE IF NOT EXISTS team_members (
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id TEXT NOT NULL,
role TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (team_id, user_id)
);
CREATE TABLE IF NOT EXISTS api_keys (
id TEXT PRIMARY KEY,
key_hash TEXT NOT NULL UNIQUE,
team_id TEXT REFERENCES teams(id) ON DELETE CASCADE,
project_id TEXT REFERENCES projects(id) ON DELETE CASCADE,
actor_id TEXT NOT NULL,
scopes JSONB NOT NULL DEFAULT '[]'::jsonb,
revoked_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CHECK (project_id IS NULL OR team_id IS NOT NULL),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY,
team_id TEXT REFERENCES teams(id) ON DELETE SET NULL,
project_id TEXT REFERENCES projects(id) ON DELETE SET NULL,
actor_id TEXT,
api_key_id TEXT REFERENCES api_keys(id) ON DELETE SET NULL,
action TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT,
details JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CHECK (project_id IS NULL OR team_id IS NOT NULL),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS server_sessions (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
external_session_id TEXT,
idempotency_key TEXT,
content_session_id TEXT,
agent_id TEXT,
agent_type TEXT,
platform_source TEXT,
generation_status TEXT NOT NULL DEFAULT 'idle',
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
ended_at TIMESTAMPTZ,
last_generated_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (project_id, external_session_id),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS agent_events (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
server_session_id TEXT REFERENCES server_sessions(id) ON DELETE SET NULL,
source_adapter TEXT NOT NULL,
source_event_id TEXT,
idempotency_key TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
occurred_at TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (idempotency_key),
UNIQUE (id, project_id, team_id),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS observation_generation_jobs (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
agent_event_id TEXT REFERENCES agent_events(id) ON DELETE CASCADE,
source_type TEXT NOT NULL CHECK (source_type IN ('agent_event', 'session_summary', 'observation_reindex')),
source_id TEXT NOT NULL,
server_session_id TEXT REFERENCES server_sessions(id) ON DELETE SET NULL,
job_type TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'cancelled')),
idempotency_key TEXT NOT NULL UNIQUE,
bullmq_job_id TEXT UNIQUE,
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
next_attempt_at TIMESTAMPTZ,
locked_at TIMESTAMPTZ,
locked_by TEXT,
completed_at TIMESTAMPTZ,
failed_at TIMESTAMPTZ,
cancelled_at TIMESTAMPTZ,
last_error JSONB,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CHECK (
(source_type = 'agent_event' AND agent_event_id IS NOT NULL AND source_id = agent_event_id)
OR
(source_type = 'session_summary' AND agent_event_id IS NULL AND server_session_id IS NOT NULL AND source_id = server_session_id)
OR
(source_type = 'observation_reindex' AND agent_event_id IS NULL)
),
FOREIGN KEY (agent_event_id, project_id, team_id) REFERENCES agent_events(id, project_id, team_id) ON DELETE CASCADE,
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS observations (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
server_session_id TEXT REFERENCES server_sessions(id) ON DELETE SET NULL,
kind TEXT NOT NULL DEFAULT 'observation',
content TEXT NOT NULL,
content_search TSVECTOR GENERATED ALWAYS AS (to_tsvector('english', content)) STORED,
generation_key TEXT,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
embedding JSONB,
created_by_job_id TEXT REFERENCES observation_generation_jobs(id) ON DELETE SET NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
FOREIGN KEY (project_id, team_id) REFERENCES projects(id, team_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS observation_sources (
id TEXT PRIMARY KEY,
observation_id TEXT NOT NULL REFERENCES observations(id) ON DELETE CASCADE,
agent_event_id TEXT REFERENCES agent_events(id) ON DELETE CASCADE,
generation_job_id TEXT REFERENCES observation_generation_jobs(id) ON DELETE SET NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('agent_event', 'session_summary', 'observation_reindex', 'manual')),
source_id TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (observation_id, source_type, source_id),
UNIQUE (source_type, source_id, generation_job_id, observation_id),
CHECK (
(source_type = 'agent_event' AND agent_event_id IS NOT NULL AND source_id = agent_event_id)
OR
(source_type <> 'agent_event' AND agent_event_id IS NULL)
)
);
CREATE TABLE IF NOT EXISTS observation_generation_job_events (
id TEXT PRIMARY KEY,
generation_job_id TEXT NOT NULL REFERENCES observation_generation_jobs(id) ON DELETE CASCADE,
event_type TEXT NOT NULL CHECK (event_type IN ('queued', 'enqueued', 'processing', 'retry_scheduled', 'completed', 'failed', 'cancelled')),
status_after TEXT NOT NULL CHECK (status_after IN ('queued', 'processing', 'completed', 'failed', 'cancelled')),
attempt INTEGER NOT NULL DEFAULT 0,
details JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_agent_events_project_session ON agent_events(project_id, server_session_id, occurred_at);
ALTER TABLE server_sessions ADD COLUMN IF NOT EXISTS idempotency_key TEXT;
ALTER TABLE observations ADD COLUMN IF NOT EXISTS content_search TSVECTOR GENERATED ALWAYS AS (to_tsvector('english', content)) STORED;
ALTER TABLE observations DROP CONSTRAINT IF EXISTS observations_generation_key_key;
ALTER TABLE observation_generation_jobs DROP CONSTRAINT IF EXISTS observation_generation_jobs_source_type_source_id_job_type_key;
CREATE UNIQUE INDEX IF NOT EXISTS idx_server_sessions_project_idempotency
ON server_sessions(project_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_generation_key_scope
ON observations(team_id, project_id, generation_key)
WHERE generation_key IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_observation_jobs_source_scope
ON observation_generation_jobs(team_id, project_id, source_type, source_id, job_type);
CREATE INDEX IF NOT EXISTS idx_projects_team ON projects(team_id, id);
CREATE INDEX IF NOT EXISTS idx_agent_events_team_project ON agent_events(team_id, project_id, occurred_at);
CREATE INDEX IF NOT EXISTS idx_observations_project_session ON observations(project_id, server_session_id, created_at);
CREATE INDEX IF NOT EXISTS idx_observations_team_project ON observations(team_id, project_id, created_at);
CREATE INDEX IF NOT EXISTS idx_observations_content_search ON observations USING GIN (content_search);
CREATE INDEX IF NOT EXISTS idx_observation_sources_event ON observation_sources(agent_event_id);
CREATE INDEX IF NOT EXISTS idx_observation_sources_source ON observation_sources(source_type, source_id);
CREATE INDEX IF NOT EXISTS idx_observation_jobs_status_next_attempt ON observation_generation_jobs(status, next_attempt_at, created_at);
CREATE INDEX IF NOT EXISTS idx_observation_jobs_team_project ON observation_generation_jobs(team_id, project_id, status, created_at);
CREATE INDEX IF NOT EXISTS idx_observation_jobs_event ON observation_generation_jobs(agent_event_id);
CREATE INDEX IF NOT EXISTS idx_observation_jobs_source ON observation_generation_jobs(source_type, source_id);
CREATE INDEX IF NOT EXISTS idx_observation_job_events_job_created ON observation_generation_job_events(generation_job_id, created_at);
CREATE INDEX IF NOT EXISTS idx_audit_log_scope_created ON audit_log(project_id, team_id, created_at);
`;

View File

@@ -0,0 +1,186 @@
// SPDX-License-Identifier: Apache-2.0
import type { JsonObject, PostgresQueryable } from './utils.js';
import { assertProjectOwnership, deterministicKey, newId, queryOne, toDate, toEpoch, toJsonObject } from './utils.js';
export interface PostgresServerSession {
id: string;
projectId: string;
teamId: string;
externalSessionId: string | null;
idempotencyKey: string | null;
contentSessionId: string | null;
agentId: string | null;
agentType: string | null;
platformSource: string | null;
generationStatus: string;
metadata: JsonObject;
startedAtEpoch: number;
endedAtEpoch: number | null;
lastGeneratedAtEpoch: number | null;
createdAtEpoch: number;
updatedAtEpoch: number;
}
interface ServerSessionRow {
id: string;
project_id: string;
team_id: string;
external_session_id: string | null;
idempotency_key: string | null;
content_session_id: string | null;
agent_id: string | null;
agent_type: string | null;
platform_source: string | null;
generation_status: string;
metadata: unknown;
started_at: Date;
ended_at: Date | null;
last_generated_at: Date | null;
created_at: Date;
updated_at: Date;
}
export class PostgresServerSessionsRepository {
constructor(private client: PostgresQueryable) {}
async create(input: {
id?: string;
projectId: string;
teamId: string;
externalSessionId?: string | null;
contentSessionId?: string | null;
agentId?: string | null;
agentType?: string | null;
platformSource?: string | null;
generationStatus?: string;
metadata?: JsonObject;
}): Promise<PostgresServerSession> {
await assertProjectOwnership(this.client, input.projectId, input.teamId);
const id = input.id ?? newId();
const idempotencyKey = buildServerSessionIdempotencyKey(input);
const row = await queryOne<ServerSessionRow>(
this.client,
`
INSERT INTO server_sessions (
id, project_id, team_id, external_session_id, idempotency_key, content_session_id,
agent_id, agent_type, platform_source, generation_status, metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11::jsonb)
ON CONFLICT (project_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO UPDATE SET
external_session_id = excluded.external_session_id,
content_session_id = excluded.content_session_id,
agent_id = excluded.agent_id,
agent_type = excluded.agent_type,
platform_source = excluded.platform_source,
generation_status = excluded.generation_status,
metadata = excluded.metadata,
updated_at = now()
RETURNING *
`,
[
id,
input.projectId,
input.teamId,
input.externalSessionId ?? null,
idempotencyKey,
input.contentSessionId ?? null,
input.agentId ?? null,
input.agentType ?? null,
input.platformSource ?? null,
input.generationStatus ?? 'idle',
JSON.stringify(input.metadata ?? {})
]
);
return mapServerSessionRow(row!);
}
async getByIdForScope(input: {
id: string;
projectId: string;
teamId: string;
}): Promise<PostgresServerSession | null> {
const row = await queryOne<ServerSessionRow>(
this.client,
'SELECT * FROM server_sessions WHERE id = $1 AND project_id = $2 AND team_id = $3',
[input.id, input.projectId, input.teamId]
);
return row ? mapServerSessionRow(row) : null;
}
async listByProject(projectId: string, teamId: string): Promise<PostgresServerSession[]> {
const result = await this.client.query<ServerSessionRow>(
`
SELECT * FROM server_sessions
WHERE project_id = $1 AND team_id = $2
ORDER BY started_at DESC
`,
[projectId, teamId]
);
return result.rows.map(mapServerSessionRow);
}
}
export function buildServerSessionIdempotencyKey(input: {
projectId: string;
teamId: string;
externalSessionId?: string | null;
contentSessionId?: string | null;
agentId?: string | null;
agentType?: string | null;
platformSource?: string | null;
}): string | null {
if (input.externalSessionId) {
return `server_session:v1:${deterministicKey([
input.teamId,
input.projectId,
'external',
input.externalSessionId
])}`;
}
if (input.contentSessionId) {
return `server_session:v1:${deterministicKey([
input.teamId,
input.projectId,
'content',
input.platformSource ?? null,
input.agentId ?? null,
input.contentSessionId
])}`;
}
if (input.agentId && input.platformSource) {
return `server_session:v1:${deterministicKey([
input.teamId,
input.projectId,
'agent',
input.platformSource,
input.agentId,
input.agentType ?? null
])}`;
}
return null;
}
function mapServerSessionRow(row: ServerSessionRow): PostgresServerSession {
return {
id: row.id,
projectId: row.project_id,
teamId: row.team_id,
externalSessionId: row.external_session_id,
idempotencyKey: row.idempotency_key,
contentSessionId: row.content_session_id,
agentId: row.agent_id,
agentType: row.agent_type,
platformSource: row.platform_source,
generationStatus: row.generation_status,
metadata: toJsonObject(row.metadata),
startedAtEpoch: toEpoch(row.started_at),
endedAtEpoch: toDate(row.ended_at)?.getTime() ?? null,
lastGeneratedAtEpoch: toDate(row.last_generated_at)?.getTime() ?? null,
createdAtEpoch: toEpoch(row.created_at),
updatedAtEpoch: toEpoch(row.updated_at)
};
}

View File

@@ -0,0 +1,127 @@
// SPDX-License-Identifier: Apache-2.0
import type { PostgresQueryable, JsonObject } from './utils.js';
import { newId, queryOne, toEpoch, toJsonObject } from './utils.js';
export type PostgresTeamRole = 'owner' | 'admin' | 'member' | 'viewer';
export interface PostgresTeam {
id: string;
name: string;
metadata: JsonObject;
createdAtEpoch: number;
updatedAtEpoch: number;
}
export interface PostgresTeamMember {
teamId: string;
userId: string;
role: PostgresTeamRole;
metadata: JsonObject;
createdAtEpoch: number;
updatedAtEpoch: number;
}
interface TeamRow {
id: string;
name: string;
metadata: unknown;
created_at: Date;
updated_at: Date;
}
interface TeamMemberRow {
team_id: string;
user_id: string;
role: PostgresTeamRole;
metadata: unknown;
created_at: Date;
updated_at: Date;
}
export class PostgresTeamsRepository {
constructor(private client: PostgresQueryable) {}
async create(input: { id?: string; name: string; metadata?: JsonObject }): Promise<PostgresTeam> {
const id = input.id ?? newId();
const row = await queryOne<TeamRow>(
this.client,
`
INSERT INTO teams (id, name, metadata)
VALUES ($1, $2, $3::jsonb)
RETURNING *
`,
[id, input.name, JSON.stringify(input.metadata ?? {})]
);
return mapTeamRow(row!);
}
async addMember(input: {
teamId: string;
userId: string;
role: PostgresTeamRole;
metadata?: JsonObject;
}): Promise<PostgresTeamMember> {
const row = await queryOne<TeamMemberRow>(
this.client,
`
INSERT INTO team_members (team_id, user_id, role, metadata)
VALUES ($1, $2, $3, $4::jsonb)
ON CONFLICT (team_id, user_id) DO UPDATE SET
role = excluded.role,
metadata = excluded.metadata,
updated_at = now()
RETURNING *
`,
[input.teamId, input.userId, input.role, JSON.stringify(input.metadata ?? {})]
);
return mapTeamMemberRow(row!);
}
async getByIdForUser(input: {
id: string;
userId: string;
}): Promise<PostgresTeam | null> {
const row = await queryOne<TeamRow>(
this.client,
`
SELECT teams.*
FROM teams
INNER JOIN team_members ON team_members.team_id = teams.id
WHERE teams.id = $1 AND team_members.user_id = $2
`,
[input.id, input.userId]
);
return row ? mapTeamRow(row) : null;
}
async getMember(teamId: string, userId: string): Promise<PostgresTeamMember | null> {
const row = await queryOne<TeamMemberRow>(
this.client,
'SELECT * FROM team_members WHERE team_id = $1 AND user_id = $2',
[teamId, userId]
);
return row ? mapTeamMemberRow(row) : null;
}
}
function mapTeamRow(row: TeamRow): PostgresTeam {
return {
id: row.id,
name: row.name,
metadata: toJsonObject(row.metadata),
createdAtEpoch: toEpoch(row.created_at),
updatedAtEpoch: toEpoch(row.updated_at)
};
}
function mapTeamMemberRow(row: TeamMemberRow): PostgresTeamMember {
return {
teamId: row.team_id,
userId: row.user_id,
role: row.role,
metadata: toJsonObject(row.metadata),
createdAtEpoch: toEpoch(row.created_at),
updatedAtEpoch: toEpoch(row.updated_at)
};
}

View File

@@ -0,0 +1,107 @@
// SPDX-License-Identifier: Apache-2.0
import { createHash, randomUUID } from 'crypto';
import type { QueryResult, QueryResultRow } from 'pg';
export type JsonObject = Record<string, unknown>;
export type JsonValue = unknown;
export interface PostgresQueryable {
query<T extends QueryResultRow = QueryResultRow>(text: string, values?: unknown[]): Promise<QueryResult<T>>;
}
export function newId(): string {
return randomUUID();
}
export function toJsonObject(value: unknown): JsonObject {
if (value && typeof value === 'object' && !Array.isArray(value)) {
return value as JsonObject;
}
return {};
}
export function toJsonArray(value: unknown): unknown[] {
return Array.isArray(value) ? value : [];
}
export function toEpoch(value: Date | string | number): number {
if (typeof value === 'number') {
return value;
}
return new Date(value).getTime();
}
export function toDate(value: Date | string | number | null | undefined): Date | null {
if (value == null) {
return null;
}
return value instanceof Date ? value : new Date(value);
}
export async function queryOne<T extends QueryResultRow>(
client: PostgresQueryable,
text: string,
values: unknown[] = []
): Promise<T | null> {
const result = await client.query<T>(text, values);
return result.rows[0] ?? null;
}
export async function assertProjectOwnership(
client: PostgresQueryable,
projectId: string,
teamId: string
): Promise<void> {
const row = await queryOne<{ id: string }>(
client,
'SELECT id FROM projects WHERE id = $1 AND team_id = $2',
[projectId, teamId]
);
if (!row) {
throw new Error('project_id must belong to team_id');
}
}
export async function assertSessionOwnership(
client: PostgresQueryable,
serverSessionId: string,
projectId: string,
teamId: string
): Promise<void> {
const row = await queryOne<{ id: string }>(
client,
'SELECT id FROM server_sessions WHERE id = $1 AND project_id = $2 AND team_id = $3',
[serverSessionId, projectId, teamId]
);
if (!row) {
throw new Error('server_session_id must belong to project_id and team_id');
}
}
export function canonicalJson(value: unknown): string {
return JSON.stringify(sortJson(value));
}
export function deterministicKey(parts: readonly unknown[]): string {
const fingerprint = createHash('sha256')
.update(canonicalJson(parts))
.digest('hex');
return fingerprint;
}
function sortJson(value: unknown): unknown {
if (Array.isArray(value)) {
return value.map(sortJson);
}
if (value && typeof value === 'object') {
const record = value as Record<string, unknown>;
return Object.keys(record)
.sort()
.reduce<Record<string, unknown>>((acc, key) => {
acc[key] = sortJson(record[key]);
return acc;
}, {});
}
return value;
}

View File

@@ -0,0 +1,850 @@
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import pg from 'pg';
import {
SERVER_BETA_POSTGRES_TABLES,
bootstrapServerBetaPostgresSchema,
buildObservationGenerationKey,
createPostgresStorageRepositories,
type PostgresPoolClient,
type PostgresStorageRepositories
} from '../../../src/storage/postgres/index.js';
const testDatabaseUrl = process.env.CLAUDE_MEM_TEST_POSTGRES_URL;
describe('server beta postgres schema bootstrap', () => {
it('acquires and releases a client when bootstrapping from a pool', async () => {
const queries: string[] = [];
let released = false;
const pool = {
totalCount: 0,
idleCount: 0,
waitingCount: 0,
async connect() {
return {
release(): void {
released = true;
},
async query(text: string) {
queries.push(text);
return { rows: [], rowCount: 0 };
}
};
},
async query(): Promise<never> {
throw new Error('pool query should not be used for schema bootstrap');
}
};
await bootstrapServerBetaPostgresSchema(pool);
expect(queries[0]).toBe('BEGIN');
expect(queries.at(-1)).toBe('COMMIT');
expect(released).toBe(true);
});
it('uses an already-connected pool client without reconnecting it', async () => {
const queries: string[] = [];
const client = {
async connect(): Promise<never> {
throw new Error('client should not reconnect');
},
release(): void {},
async query(text: string) {
queries.push(text);
return { rows: [], rowCount: 0 };
}
} as unknown as PostgresPoolClient;
await bootstrapServerBetaPostgresSchema(client);
expect(queries[0]).toBe('BEGIN');
expect(queries.at(-1)).toBe('COMMIT');
});
});
describe('server beta postgres observation storage', () => {
if (!testDatabaseUrl) {
it.skip('requires explicit CLAUDE_MEM_TEST_POSTGRES_URL for Postgres integration tests', () => {});
return;
}
const pool = new pg.Pool({ connectionString: testDatabaseUrl });
let client: PostgresPoolClient;
let schemaName: string;
let storage: PostgresStorageRepositories;
beforeEach(async () => {
client = await pool.connect();
schemaName = `cm_pg_test_${crypto.randomUUID().replaceAll('-', '_')}`;
await client.query(`CREATE SCHEMA ${quoteIdentifier(schemaName)}`);
await client.query(`SET search_path TO ${quoteIdentifier(schemaName)}`);
await bootstrapServerBetaPostgresSchema(client);
storage = createPostgresStorageRepositories(client);
});
afterEach(async () => {
if (client) {
await client.query(`DROP SCHEMA IF EXISTS ${quoteIdentifier(schemaName)} CASCADE`);
client.release();
}
});
it('creates the Phase 1 schema idempotently', async () => {
await bootstrapServerBetaPostgresSchema(client);
const result = await client.query<{ table_name: string }>(
`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1
`,
[schemaName]
);
const tables = new Set(result.rows.map(row => row.table_name));
for (const table of SERVER_BETA_POSTGRES_TABLES) {
expect(tables.has(table)).toBe(true);
}
});
it('enforces project/team ownership for project-scoped writes', async () => {
const teamA = await storage.teams.create({ name: 'Team A' });
const teamB = await storage.teams.create({ name: 'Team B' });
const projectA = await storage.projects.create({ teamId: teamA.id, name: 'Project A' });
await expect(storage.projects.create({ teamId: 'missing-team', name: 'Invalid' })).rejects.toThrow();
await expect(storage.sessions.create({
projectId: projectA.id,
teamId: teamB.id
})).rejects.toThrow(/project_id must belong to team_id/);
});
it('deduplicates agent events with deterministic idempotency keys when source event IDs are omitted', async () => {
const { project, session } = await createFixtureScope(storage);
const occurredAt = new Date('2026-05-07T20:00:00.000Z');
const payload = { message: 'same payload', nested: { b: 2, a: 1 } };
const first = await storage.agentEvents.create({
projectId: project.id,
teamId: project.teamId,
serverSessionId: session.id,
sourceAdapter: 'claude-code',
eventType: 'user_prompt',
payload,
occurredAt
});
const second = await storage.agentEvents.create({
projectId: project.id,
teamId: project.teamId,
serverSessionId: session.id,
sourceAdapter: 'claude-code',
eventType: 'user_prompt',
payload: { nested: { a: 1, b: 2 }, message: 'same payload' },
occurredAt
});
const withNativeId = await storage.agentEvents.create({
projectId: project.id,
teamId: project.teamId,
sourceAdapter: 'cursor',
sourceEventId: 'event-1',
eventType: 'tool_call',
payload: { one: true },
occurredAt
});
const duplicateNativeId = await storage.agentEvents.create({
projectId: project.id,
teamId: project.teamId,
sourceAdapter: 'cursor',
sourceEventId: 'event-1',
eventType: 'tool_call',
payload: { two: true },
occurredAt
});
expect(second.id).toBe(first.id);
expect(second.idempotencyKey).toBe(first.idempotencyKey);
expect(duplicateNativeId.id).toBe(withNativeId.id);
});
it('creates observations, searches content, links sources, and preserves generation retry idempotency', async () => {
const { project, session, event, eventJob } = await createFixtureScopeWithEventJob(storage);
const generationKey = buildObservationGenerationKey({
generationJobId: eventJob.id,
parsedObservationIndex: 0,
content: 'Postgres is the canonical observation store'
});
const observation = await storage.observations.create({
projectId: project.id,
teamId: project.teamId,
serverSessionId: session.id,
content: 'Postgres is the canonical observation store',
generationKey,
createdByJobId: eventJob.id,
metadata: { generated: true }
});
const retry = await storage.observations.create({
projectId: project.id,
teamId: project.teamId,
serverSessionId: session.id,
content: 'Postgres is the canonical observation store',
generationKey,
createdByJobId: eventJob.id
});
const source = await storage.observationSources.addSource({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
generationJobId: eventJob.id
});
const duplicateSource = await storage.observationSources.addSource({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
generationJobId: eventJob.id
});
const search = await storage.observations.search({
projectId: project.id,
teamId: project.teamId,
query: 'canonical observation'
});
expect(retry.id).toBe(observation.id);
expect(source.id).toBe(duplicateSource.id);
expect(search.map(item => item.id)).toContain(observation.id);
await expect(storage.observationSources.listByObservationForScope({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toHaveLength(1);
});
it('scopes observation generation_key idempotency to project and team', async () => {
const firstScope = await createFixtureScope(storage);
const secondScope = await createFixtureScope(storage);
const generationKey = 'shared-generation-key';
const first = await storage.observations.create({
projectId: firstScope.project.id,
teamId: firstScope.project.teamId,
content: 'First scoped generation key observation',
generationKey
});
const retry = await storage.observations.create({
projectId: firstScope.project.id,
teamId: firstScope.project.teamId,
content: 'First scoped generation key observation retry',
generationKey
});
const second = await storage.observations.create({
projectId: secondScope.project.id,
teamId: secondScope.project.teamId,
content: 'Second scoped generation key observation',
generationKey
});
expect(retry.id).toBe(first.id);
expect(second.id).not.toBe(first.id);
expect(second.projectId).toBe(secondScope.project.id);
expect(second.teamId).toBe(secondScope.project.teamId);
});
it('scopes observation source reads to the observation project and team', async () => {
const { project, event, eventJob } = await createFixtureScopeWithEventJob(storage);
const other = await createFixtureScope(storage);
const observation = await storage.observations.create({
projectId: project.id,
teamId: project.teamId,
content: 'Scoped observation source reader'
});
await storage.observationSources.addSource({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
generationJobId: eventJob.id
});
await expect(storage.observationSources.listByObservationForScope({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toHaveLength(1);
await expect(storage.observationSources.listByObservationForScope({
observationId: observation.id,
projectId: other.project.id,
teamId: other.project.teamId
})).resolves.toEqual([]);
});
it('does not mutate scoped observation source, job transition, or job event writes with the wrong scope', async () => {
const { project, event, eventJob } = await createFixtureScopeWithEventJob(storage);
const other = await createFixtureScope(storage);
const observation = await storage.observations.create({
projectId: project.id,
teamId: project.teamId,
content: 'Wrong-scope mutation guard'
});
await expect(storage.observationSources.addSource({
observationId: observation.id,
projectId: other.project.id,
teamId: other.project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
generationJobId: eventJob.id
})).rejects.toThrow(/observation_id/);
await expect(storage.observationGenerationJobs.transitionStatus({
id: eventJob.id,
projectId: other.project.id,
teamId: other.project.teamId,
status: 'processing',
lockedBy: 'wrong-scope-worker'
})).resolves.toBeNull();
await expect(storage.observationGenerationJobEvents.append({
generationJobId: eventJob.id,
projectId: other.project.id,
teamId: other.project.teamId,
eventType: 'processing',
statusAfter: 'processing'
})).rejects.toThrow(/generation_job_id must belong/);
await expect(storage.observationSources.listByObservationForScope({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toEqual([]);
await expect(storage.observationGenerationJobs.getByIdForScope({
id: eventJob.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toMatchObject({ status: 'queued', attempts: 0, lockedBy: null });
await expect(storage.observationGenerationJobEvents.listByJobForScope({
generationJobId: eventJob.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toEqual([]);
});
it('deduplicates sessions by deterministic identity when external session IDs are omitted', async () => {
const { project } = await createFixtureScope(storage);
const first = await storage.sessions.create({
projectId: project.id,
teamId: project.teamId,
contentSessionId: 'content-session-1',
agentId: 'agent-1',
platformSource: 'claude-code',
metadata: { first: true }
});
const second = await storage.sessions.create({
projectId: project.id,
teamId: project.teamId,
contentSessionId: 'content-session-1',
agentId: 'agent-1',
platformSource: 'claude-code',
metadata: { second: true }
});
expect(second.id).toBe(first.id);
expect(second.idempotencyKey).toBe(first.idempotencyKey);
expect(second.idempotencyKey).not.toBeNull();
});
it('exposes scoped getters for auth-visible project resources', async () => {
const { project, session, event, eventJob } = await createFixtureScopeWithEventJob(storage);
const other = await createFixtureScope(storage);
const observation = await storage.observations.create({
projectId: project.id,
teamId: project.teamId,
serverSessionId: session.id,
content: 'Scoped getter observation',
createdByJobId: eventJob.id
});
await expect(storage.projects.getByIdForTeam(project.id, project.teamId)).resolves.toMatchObject({ id: project.id });
await expect(storage.sessions.getByIdForScope({
id: session.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toMatchObject({ id: session.id });
await expect(storage.agentEvents.getByIdForScope({
id: event.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toMatchObject({ id: event.id });
await expect(storage.observationGenerationJobs.getByIdForScope({
id: eventJob.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toMatchObject({ id: eventJob.id });
await expect(storage.observations.getByIdForScope({
id: observation.id,
projectId: project.id,
teamId: project.teamId
})).resolves.toMatchObject({ id: observation.id });
await expect(storage.projects.getByIdForTeam(project.id, other.project.teamId)).resolves.toBeNull();
await expect(storage.sessions.getByIdForScope({
id: session.id,
projectId: other.project.id,
teamId: other.project.teamId
})).resolves.toBeNull();
await expect(storage.agentEvents.getByIdForScope({
id: event.id,
projectId: other.project.id,
teamId: other.project.teamId
})).resolves.toBeNull();
await expect(storage.observationGenerationJobs.getByIdForScope({
id: eventJob.id,
projectId: other.project.id,
teamId: other.project.teamId
})).resolves.toBeNull();
await expect(storage.observations.getByIdForScope({
id: observation.id,
projectId: other.project.id,
teamId: other.project.teamId
})).resolves.toBeNull();
});
it('does not expose unscoped auth-visible getters on exported repositories', async () => {
for (const repository of [
storage.projects,
storage.sessions,
storage.agentEvents,
storage.observationGenerationJobs,
storage.observations,
storage.observationSources
]) {
const exposed = repository as unknown as Record<string, unknown>;
expect(exposed.getById).toBeUndefined();
expect(exposed[['getById', 'Internal'].join('')]).toBeUndefined();
expect(exposed[['listBy', 'Status'].join('')]).toBeUndefined();
expect(exposed[['listBy', 'Job'].join('')]).toBeUndefined();
expect(exposed[['listBy', 'Observation'].join('')]).toBeUndefined();
}
});
it('scopes team lookup by membership', async () => {
const team = await storage.teams.create({ name: 'Scoped Team' });
await storage.teams.addMember({ teamId: team.id, userId: 'member-1', role: 'viewer' });
await expect(storage.teams.getByIdForUser({
id: team.id,
userId: 'member-1'
})).resolves.toMatchObject({ id: team.id });
await expect(storage.teams.getByIdForUser({
id: team.id,
userId: 'outsider'
})).resolves.toBeNull();
});
it('rejects illegal generation job lifecycle transitions and max-attempt retries', async () => {
const { project, event } = await createFixtureScopeWithEventJob(storage);
const job = await storage.observationGenerationJobs.create({
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
agentEventId: event.id,
jobType: 'single_attempt_generate',
maxAttempts: 1
});
const processing = await storage.observationGenerationJobs.transitionStatus({
id: job.id,
projectId: project.id,
teamId: project.teamId,
status: 'processing',
lockedBy: 'worker-1'
});
await expect(storage.observationGenerationJobs.transitionStatus({
id: job.id,
projectId: project.id,
teamId: project.teamId,
status: 'queued',
nextAttemptAt: new Date('2026-05-07T22:00:00.000Z')
})).rejects.toThrow(/max_attempts/);
const failed = await storage.observationGenerationJobs.transitionStatus({
id: job.id,
projectId: project.id,
teamId: project.teamId,
status: 'failed',
lastError: { message: 'attempt failed' }
});
expect(processing?.attempts).toBe(1);
expect(failed?.failedAtEpoch).not.toBeNull();
expect(failed?.completedAtEpoch).toBeNull();
expect(failed?.cancelledAtEpoch).toBeNull();
await expect(storage.observationGenerationJobs.transitionStatus({
id: job.id,
projectId: project.id,
teamId: project.teamId,
status: 'processing',
lockedBy: 'worker-2'
})).rejects.toThrow(/terminal status failed/);
});
it('allows only one worker to transition a queued generation job to processing', async () => {
const { eventJob } = await createFixtureScopeWithEventJob(storage);
let workerA: PostgresPoolClient | null = null;
let workerB: PostgresPoolClient | null = null;
try {
workerA = await pool.connect();
workerB = await pool.connect();
await workerA.query(`SET search_path TO ${quoteIdentifier(schemaName)}`);
await workerB.query(`SET search_path TO ${quoteIdentifier(schemaName)}`);
const workerAStorage = createPostgresStorageRepositories(workerA);
const workerBStorage = createPostgresStorageRepositories(workerB);
const results = await Promise.allSettled([
workerAStorage.observationGenerationJobs.transitionStatus({
id: eventJob.id,
projectId: eventJob.projectId,
teamId: eventJob.teamId,
status: 'processing',
lockedBy: 'worker-a'
}),
workerBStorage.observationGenerationJobs.transitionStatus({
id: eventJob.id,
projectId: eventJob.projectId,
teamId: eventJob.teamId,
status: 'processing',
lockedBy: 'worker-b'
})
]);
const fulfilled = results.filter(result => result.status === 'fulfilled');
const rejected = results.filter(result => result.status === 'rejected');
const claimed = await storage.observationGenerationJobs.getByIdForScope({
id: eventJob.id,
projectId: eventJob.projectId,
teamId: eventJob.teamId
});
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
expect(claimed?.status).toBe('processing');
expect(claimed?.attempts).toBe(1);
} finally {
workerA?.release();
workerB?.release();
}
});
it('validates server session ownership when creating event generation jobs', async () => {
const scope = await createFixtureScopeWithEventJob(storage);
const other = await createFixtureScope(storage);
const siblingSession = await storage.sessions.create({
projectId: scope.project.id,
teamId: scope.team.id,
externalSessionId: crypto.randomUUID()
});
await expect(storage.observationGenerationJobs.create({
projectId: scope.project.id,
teamId: scope.team.id,
sourceType: 'agent_event',
sourceId: scope.event.id,
agentEventId: scope.event.id,
serverSessionId: other.session.id,
jobType: 'invalid_cross_scope_session'
})).rejects.toThrow(/server_session_id must belong/);
await expect(storage.observationGenerationJobs.create({
projectId: scope.project.id,
teamId: scope.team.id,
sourceType: 'agent_event',
sourceId: scope.event.id,
agentEventId: scope.event.id,
serverSessionId: siblingSession.id,
jobType: 'invalid_event_session'
})).rejects.toThrow(/server_session_id must match/);
});
it('requires linked generation jobs to match observation source models', async () => {
const { project, event, eventJob } = await createFixtureScopeWithEventJob(storage);
const secondEvent = await storage.agentEvents.create({
projectId: project.id,
teamId: project.teamId,
sourceAdapter: 'claude-code',
sourceEventId: crypto.randomUUID(),
eventType: 'assistant_response',
payload: { content: 'second response' },
occurredAt: new Date('2026-05-07T21:30:00.000Z')
});
const secondJob = await storage.observationGenerationJobs.create({
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: secondEvent.id,
agentEventId: secondEvent.id,
jobType: 'generate_observations'
});
const observation = await storage.observations.create({
projectId: project.id,
teamId: project.teamId,
content: 'Observation source model validation'
});
await expect(storage.observationSources.addSource({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
generationJobId: secondJob.id
})).rejects.toThrow(/source model/);
await expect(storage.observationSources.addSource({
observationId: observation.id,
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
agentEventId: secondEvent.id,
generationJobId: eventJob.id
})).rejects.toThrow(/source_id must equal agent_event_id/);
});
it('validates non-agent observation sources that are not linked through generation jobs', async () => {
const scope = await createFixtureScope(storage);
const other = await createFixtureScope(storage);
const targetObservation = await storage.observations.create({
projectId: scope.project.id,
teamId: scope.project.teamId,
content: 'Target observation for non-agent source validation'
});
const sourceObservation = await storage.observations.create({
projectId: scope.project.id,
teamId: scope.project.teamId,
content: 'Source observation for reindex validation'
});
const otherObservation = await storage.observations.create({
projectId: other.project.id,
teamId: other.project.teamId,
content: 'Cross-scope source observation'
});
await expect(storage.observationSources.addSource({
observationId: targetObservation.id,
projectId: scope.project.id,
teamId: scope.project.teamId,
sourceType: 'session_summary',
sourceId: scope.session.id
})).resolves.toMatchObject({ sourceType: 'session_summary', sourceId: scope.session.id });
await expect(storage.observationSources.addSource({
observationId: targetObservation.id,
projectId: scope.project.id,
teamId: scope.project.teamId,
sourceType: 'observation_reindex',
sourceId: sourceObservation.id
})).resolves.toMatchObject({ sourceType: 'observation_reindex', sourceId: sourceObservation.id });
await expect(storage.observationSources.addSource({
observationId: targetObservation.id,
projectId: scope.project.id,
teamId: scope.project.teamId,
sourceType: 'session_summary',
sourceId: other.session.id
})).rejects.toThrow(/server_session_id must belong/);
await expect(storage.observationSources.addSource({
observationId: targetObservation.id,
projectId: scope.project.id,
teamId: scope.project.teamId,
sourceType: 'observation_reindex',
sourceId: otherObservation.id
})).rejects.toThrow(/observation_reindex source_id must belong/);
});
it('scopes generation job source uniqueness to project and team', async () => {
const firstScope = await createFixtureScope(storage);
const secondScope = await createFixtureScope(storage);
const sharedSourceId = 'shared-source-id';
const jobType = 'shared_source_generate';
await client.query(
`
INSERT INTO observation_generation_jobs (
id, project_id, team_id, source_type, source_id, job_type, status, idempotency_key
)
VALUES ($1, $2, $3, 'observation_reindex', $4, $5, 'queued', $6)
`,
[
crypto.randomUUID(),
firstScope.project.id,
firstScope.project.teamId,
sharedSourceId,
jobType,
'first-scope-source-key'
]
);
await client.query(
`
INSERT INTO observation_generation_jobs (
id, project_id, team_id, source_type, source_id, job_type, status, idempotency_key
)
VALUES ($1, $2, $3, 'observation_reindex', $4, $5, 'queued', $6)
`,
[
crypto.randomUUID(),
secondScope.project.id,
secondScope.project.teamId,
sharedSourceId,
jobType,
'second-scope-source-key'
]
);
await expect(client.query(
`
INSERT INTO observation_generation_jobs (
id, project_id, team_id, source_type, source_id, job_type, status, idempotency_key
)
VALUES ($1, $2, $3, 'observation_reindex', $4, $5, 'queued', $6)
`,
[
crypto.randomUUID(),
firstScope.project.id,
firstScope.project.teamId,
sharedSourceId,
jobType,
'duplicate-first-scope-source-key'
]
)).rejects.toThrow();
});
it('deduplicates generation jobs by source model and records lifecycle events', async () => {
const { project, session, event, eventJob } = await createFixtureScopeWithEventJob(storage);
const other = await createFixtureScope(storage);
const duplicateEventJob = await storage.observationGenerationJobs.create({
projectId: project.id,
teamId: project.teamId,
sourceType: 'agent_event',
sourceId: event.id,
agentEventId: event.id,
jobType: 'generate_observations'
});
const summaryJob = await storage.observationGenerationJobs.create({
projectId: project.id,
teamId: project.teamId,
sourceType: 'session_summary',
sourceId: session.id,
serverSessionId: session.id,
jobType: 'generate_session_summary'
});
const observation = await storage.observations.create({
projectId: project.id,
teamId: project.teamId,
content: 'Reindexable observation'
});
const reindexJob = await storage.observationGenerationJobs.create({
projectId: project.id,
teamId: project.teamId,
sourceType: 'observation_reindex',
sourceId: observation.id,
jobType: 'reindex_observation'
});
const processing = await storage.observationGenerationJobs.transitionStatus({
id: eventJob.id,
projectId: project.id,
teamId: project.teamId,
status: 'processing',
lockedBy: 'worker-1'
});
await storage.observationGenerationJobEvents.append({
generationJobId: eventJob.id,
projectId: project.id,
teamId: project.teamId,
eventType: 'queued',
statusAfter: 'queued'
});
await storage.observationGenerationJobEvents.append({
generationJobId: eventJob.id,
projectId: project.id,
teamId: project.teamId,
eventType: 'processing',
statusAfter: 'processing',
attempt: processing?.attempts ?? 1
});
const scopedQueuedJobs = await storage.observationGenerationJobs.listByStatusForScope({
status: 'queued',
projectId: project.id,
teamId: project.teamId
});
const wrongScopeQueuedJobs = await storage.observationGenerationJobs.listByStatusForScope({
status: 'queued',
projectId: other.project.id,
teamId: other.project.teamId
});
const lifecycle = await storage.observationGenerationJobEvents.listByJobForScope({
generationJobId: eventJob.id,
projectId: project.id,
teamId: project.teamId
});
const wrongScopeLifecycle = await storage.observationGenerationJobEvents.listByJobForScope({
generationJobId: eventJob.id,
projectId: other.project.id,
teamId: other.project.teamId
});
expect(duplicateEventJob.id).toBe(eventJob.id);
expect(summaryJob.sourceType).toBe('session_summary');
expect(summaryJob.agentEventId).toBeNull();
expect(summaryJob.serverSessionId).toBe(session.id);
expect(reindexJob.sourceType).toBe('observation_reindex');
expect(reindexJob.agentEventId).toBeNull();
expect(processing?.attempts).toBe(1);
expect(scopedQueuedJobs.map(job => job.id).sort()).toEqual([summaryJob.id, reindexJob.id].sort());
expect(wrongScopeQueuedJobs).toEqual([]);
expect(lifecycle.map(eventRecord => eventRecord.eventType)).toEqual(['queued', 'processing']);
expect(wrongScopeLifecycle).toEqual([]);
});
});
async function createFixtureScope(storage: PostgresStorageRepositories) {
const team = await storage.teams.create({ name: 'Core' });
const project = await storage.projects.create({ teamId: team.id, name: 'Claude Mem' });
const session = await storage.sessions.create({
projectId: project.id,
teamId: team.id,
externalSessionId: crypto.randomUUID(),
platformSource: 'claude-code'
});
return { team, project, session };
}
async function createFixtureScopeWithEventJob(storage: PostgresStorageRepositories) {
const scope = await createFixtureScope(storage);
const event = await storage.agentEvents.create({
projectId: scope.project.id,
teamId: scope.team.id,
serverSessionId: scope.session.id,
sourceAdapter: 'claude-code',
sourceEventId: crypto.randomUUID(),
eventType: 'assistant_response',
payload: { content: 'response' },
occurredAt: new Date('2026-05-07T21:00:00.000Z')
});
const eventJob = await storage.observationGenerationJobs.create({
projectId: scope.project.id,
teamId: scope.team.id,
sourceType: 'agent_event',
sourceId: event.id,
agentEventId: event.id,
serverSessionId: scope.session.id,
jobType: 'generate_observations'
});
return { ...scope, event, eventJob };
}
function quoteIdentifier(identifier: string): string {
return `"${identifier.replaceAll('"', '""')}"`;
}