diff --git a/CLAUDE.md b/CLAUDE.md index a9d91abc3b..b155e2c102 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -156,6 +156,8 @@ Scope: Database: SQLite + Drizzle ORM, schemas in `src/main/data/db/schemas/`, migrations via `pnpm db:migrations:generate` +**Write serialization**: concurrent write paths MUST go through `application.get('DbService').withWriteTx(fn)` instead of `db.transaction(fn)` to avoid `SQLITE_BUSY` from libsql client-ts upstream issue [#288](https://github.com/tursodatabase/libsql-client-ts/issues/288). See [Database Patterns — Write Serialization](docs/references/data/database-patterns.md#write-serialization-dbservicewritewritetx). + **DataApi boundary rule**: DataApi is for SQLite-backed business data only. No database table → no DataApi endpoint; use IPC instead. See [Scope & Boundaries](docs/references/data/api-design-guidelines.md#dataapi-scope--boundaries). ### Window Manager diff --git a/docs/references/data/README.md b/docs/references/data/README.md index 89a58865b4..64c13af856 100644 --- a/docs/references/data/README.md +++ b/docs/references/data/README.md @@ -18,7 +18,7 @@ This is the main entry point for Cherry Studio's data management documentation. ### Reference Guides (Coding Standards) - [API Design Guidelines](./api-design-guidelines.md) - RESTful design rules -- [Database Patterns](./database-patterns.md) - DB naming, schema patterns +- [Database Patterns](./database-patterns.md) - DB naming, schema patterns, [Write Serialization (`withWriteTx`)](./database-patterns.md#write-serialization-dbservicewritewritetx) — required for concurrent write paths to avoid libsql #288 SQLITE_BUSY - [API Types](./api-types.md) - API type system, schemas, error handling - [Cache Schema Guide](./cache-schema-guide.md) - Adding new cache keys (fixed and template) - [Preference Schema Guide](./preference-schema-guide.md) - Adding new preference keys diff --git a/docs/references/data/data-api-overview.md b/docs/references/data/data-api-overview.md index ce66ad07f1..417b38745d 100644 --- a/docs/references/data/data-api-overview.md +++ b/docs/references/data/data-api-overview.md @@ -106,6 +106,7 @@ DataApi must not be used as a general-purpose RPC layer. The following categorie - **Location**: `src/main/data/services/` - **Responsibility**: Domain logic, workflows, and data access - **Does**: Validation, transaction coordination, orchestration, Drizzle ORM queries +- **Concurrent write paths**: Use `application.get('DbService').withWriteTx(fn)` instead of `db.transaction(fn)` to avoid `SQLITE_BUSY` from libsql client-ts upstream issue [#288](https://github.com/tursodatabase/libsql-client-ts/issues/288). See [Database Patterns — Write Serialization](./database-patterns.md#write-serialization-dbservicewritewritetx). > **Note:** In rare cases, a read-only Registry Service (e.g., `ProviderRegistryService`) > may exist alongside Entity Services to merge preset data with DB data. diff --git a/docs/references/data/database-patterns.md b/docs/references/data/database-patterns.md index 4698407b0d..a7d097ca20 100644 --- a/docs/references/data/database-patterns.md +++ b/docs/references/data/database-patterns.md @@ -353,3 +353,67 @@ Drizzle cannot manage triggers and virtual tables (e.g., FTS5). These are define ## Seeding For initial data population (default preferences, builtin languages, preset providers), see [Database Seeding Guide](./database-seeding-guide.md). + +## Write Serialization (`DbService.withWriteTx`) + +Concurrent write paths MUST go through `application.get('DbService').withWriteTx(fn)`. libsql client-ts upstream issue [#288](https://github.com/tursodatabase/libsql-client-ts/issues/288) makes `PRAGMA busy_timeout` ineffective for async transactions, so concurrent `db.transaction()` calls reliably surface `SQLITE_BUSY`. + +### Signature + +```ts +withWriteTx(fn: (tx: DbOrTx) => Promise): Promise +``` + +Internals: process-wide FIFO mutex + libsql's default `BEGIN IMMEDIATE` + single 50 ms `SQLITE_BUSY` retry. Callers never see BUSY (unless the retry also fails — extremely rare). + +### Usage + +```ts +const dbService = application.get('DbService') + +// Single write +await dbService.withWriteTx((tx) => + jobService.setMetadataTx(tx, jobId, merged) +) + +// Compose multiple writes into one transaction +await dbService.withWriteTx(async (tx) => { + await jobService.cancelByIdsTx(tx, ids, error) + await jobService.resetToPendingByIdsTx(tx, otherIds) +}) +``` + +### Two-form DAO pattern + +Each write method has a composable `*Tx` form and a thin non-Tx wrapper. Simple callers use the wrapper and never see `withWriteTx`; batch/recovery paths compose `*Tx` calls inside a single `withWriteTx`. See `JobService` / `JobScheduleService` for canonical examples. + +```ts +async cancelByIdsTx(tx: DbOrTx, ids: string[], error: JobError): Promise { /* SQL via tx */ } + +async cancelByIds(ids: string[], error: JobError): Promise { + const dbService = application.get('DbService') + return dbService.withWriteTx((tx) => this.cancelByIdsTx(tx, ids, error)) +} +``` + +### Rules + +| Rule | Rationale | +| --- | --- | +| `fn` must only do DB ops — no `await` on network / file IO / handler execution | Holds the global write mutex; long awaits starve the queue | +| Do not call `writeMutex.cancel()` | Mutex is non-cancellable; shutdown coordinates via service lifecycle | +| Do not wrap reads | WAL mode gives readers snapshot isolation; wrapping adds needless serialization | +| Wrap tight loops in one `withWriteTx`, not per-iteration | One acquire/release vs N | + +### When to migrate existing callsites + +| Path | Action | +| --- | --- | +| Concurrent write paths in hot code | Migrate | +| Low-frequency writes (user settings, occasional CRUD) | Migrate when touching the code | +| Boot-only writes (migrations, seeders) | Leave | +| Pure reads | Leave | + +### Reference + +[Concurrency & Locks — Layer 0](../job-and-scheduler/concurrency-and-locks.md). diff --git a/src/main/data/db/DbService.ts b/src/main/data/db/DbService.ts index ccecf20190..194e2f72d9 100644 --- a/src/main/data/db/DbService.ts +++ b/src/main/data/db/DbService.ts @@ -4,6 +4,7 @@ import { createClient } from '@libsql/client' import { loggerService } from '@logger' import { BaseService, ErrorHandling, Injectable, Priority, ServicePhase } from '@main/core/lifecycle' import { Phase } from '@main/core/lifecycle' +import { Mutex } from 'async-mutex' import { sql } from 'drizzle-orm' import { drizzle } from 'drizzle-orm/libsql' import { migrate } from 'drizzle-orm/libsql/migrator' @@ -14,7 +15,9 @@ import { pathToFileURL } from 'url' import { CUSTOM_SQL_STATEMENTS } from './customSqls' import { seeders } from './seeding' import { SeedRunner } from './seeding/SeedRunner' -import type { DbType } from './types' +import type { DbOrTx, DbType } from './types' + +const WRITE_BUSY_RETRY_DELAY_MS = 50 const logger = loggerService.withContext('DbService') @@ -41,6 +44,7 @@ export class DbService extends BaseService { private client: Client private db: DbType private pragmasConfigured = false + private readonly writeMutex = new Mutex() constructor() { super() @@ -165,6 +169,75 @@ export class DbService extends BaseService { return this.db } + /** + * Serialized write transaction. All write paths SHOULD use this instead of + * `getDb().transaction()` to avoid SQLITE_BUSY caused by libsql client-ts + * upstream issue #288 (busy_timeout ineffective for async transactions). + * + * Defense in depth: + * 1. Process-wide FIFO mutex (async-mutex) serializes write transactions + * so OUR writes never collide with each other. Non-cancellable — + * callers MUST NOT invoke `writeMutex.cancel()`; shutdown coordinates + * via service lifecycle, not lock cancellation. + * 2. libsql client defaults to `BEGIN IMMEDIATE` (drizzle libsql adapter + * drops the config arg; libsql core sets mode="write" → BEGIN + * IMMEDIATE). So a transaction acquires the write lock at BEGIN, not + * lazily on first write — read-then-write tx never fails mid-way. + * 3. Single 50ms BUSY retry guards against transient external locks + * (legacy direct `db.transaction()` callsites not yet migrated, or + * external processes opening the db during dev). + * + * Reads do NOT need this — WAL mode gives readers snapshot isolation that + * is never blocked by writers. + * + * ## Concurrency semantics for callers + * + * - `acquire()` never throws; later callers wait (pending Promise) until + * earlier callers release. No `SQLITE_BUSY` ever surfaces from us to the + * caller unless the single retry also fails due to external interference. + * - FIFO ordering: enqueue order = lock-acquire order = DB write order. + * + * ## Invariant for `fn` + * + * `fn` MUST only perform DB operations. Do NOT `await` network IO, file IO, + * or handler execution inside `fn` — that would starve the mutex queue. + * + * @example Single write + * ```ts + * await dbService.withWriteTx((tx) => + * jobService.setMetadataTx(tx, id, metadata) + * ) + * ``` + * + * @example Compose multiple writes into one transaction + * ```ts + * await dbService.withWriteTx(async (tx) => { + * await jobService.cancelByIdsTx(tx, ids, error) + * await jobService.resetToPendingByIdsTx(tx, otherIds) + * }) + * ``` + */ + public async withWriteTx(fn: (tx: DbOrTx) => Promise): Promise { + if (!this.isReady) { + throw new Error('Database is not initialized, please call init() first!') + } + const release = await this.writeMutex.acquire() + try { + try { + return await this.db.transaction(fn) + } catch (err) { + if ((err as { code?: string }).code !== 'SQLITE_BUSY') throw err + logger.warn('withWriteTx: SQLITE_BUSY, retrying once', { + delayMs: WRITE_BUSY_RETRY_DELAY_MS + }) + await new Promise((resolve) => setTimeout(resolve, WRITE_BUSY_RETRY_DELAY_MS)) + return await this.db.transaction(fn) + } + } finally { + release() + } + } + /** * Ensure database file integrity before opening connection. * Handles two scenarios that cause SQLITE_IOERR_SHORT_READ: diff --git a/src/main/data/db/__tests__/withWriteTx.test.ts b/src/main/data/db/__tests__/withWriteTx.test.ts new file mode 100644 index 0000000000..3958a3a267 --- /dev/null +++ b/src/main/data/db/__tests__/withWriteTx.test.ts @@ -0,0 +1,204 @@ +/** + * Tests for `DbService.withWriteTx`. + * + * Strategy: + * - Integration suite via `setupTestDatabase()` + the production + * `jobService.create` path: two concurrent inserts go through the real + * mutex + transaction stack with libsql's default `BEGIN IMMEDIATE`. + * End-to-end guard against drizzle/libsql adapter regressions. + * - Unit suite over a hand-rolled mirror of the production algorithm + * (`makeWithWriteTx`). Verifies FIFO ordering, mutex release on throw, + * single BUSY retry, persistent BUSY rethrow, and non-BUSY passthrough. + * Keeping the algorithm test isolated means changes to DbService + * wiring (constructor deps, lifecycle hooks) cannot mask a regression + * in the lock-and-retry semantics. + * + * The two suites together cover both the algorithm and the wire-up; if the + * mirror drifts from production we will catch it via the integration suite. + */ + +import { jobTable } from '@data/db/schemas/job' +import type { DbType } from '@data/db/types' +import { jobService } from '@data/services/JobService' +import { setupTestDatabase } from '@test-helpers/db' +import { MockMainDbServiceExport } from '@test-mocks/main/DbService' +import { Mutex } from 'async-mutex' +import { eq } from 'drizzle-orm' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@application', async () => { + const mod = await import('@test-mocks/main/application') + return mod.mockApplicationFactory() +}) + +/** + * Mirror of `DbService.withWriteTx`. Kept structurally identical to the + * production impl so unit assertions about FIFO / retry semantics double as + * documentation contracts. Any divergence is a bug — keep both in lockstep. + */ +function makeWithWriteTx( + db: { transaction: (fn: (tx: unknown) => Promise) => Promise }, + busyRetryDelayMs = 5 +) { + const mutex = new Mutex() + return async function withWriteTx(fn: (tx: unknown) => Promise): Promise { + const release = await mutex.acquire() + try { + try { + return (await db.transaction(fn)) as T + } catch (err) { + if ((err as { code?: string }).code !== 'SQLITE_BUSY') throw err + await new Promise((resolve) => setTimeout(resolve, busyRetryDelayMs)) + return (await db.transaction(fn)) as T + } + } finally { + release() + } + } +} + +describe('withWriteTx algorithm — unit', () => { + let txMock: ReturnType + let withWriteTx: ReturnType + + function makeTxMock() { + return { + transaction: vi.fn(async (fn: (tx: unknown) => Promise) => fn({})) + } + } + + beforeEach(() => { + txMock = makeTxMock() + withWriteTx = makeWithWriteTx(txMock) + }) + + it('serializes concurrent calls (second fn waits for first release)', async () => { + const events: string[] = [] + let releaseFirst!: () => void + const firstStarted = new Promise((resolve) => { + releaseFirst = resolve + }) + + const first = withWriteTx(async () => { + events.push('first:start') + await firstStarted + events.push('first:end') + return 1 + }) + const second = withWriteTx(async () => { + events.push('second:start') + events.push('second:end') + return 2 + }) + + // Yield once to allow the first fn to enter before unblocking it. + await new Promise((r) => setImmediate(r)) + expect(events).toEqual(['first:start']) + + releaseFirst() + await expect(Promise.all([first, second])).resolves.toEqual([1, 2]) + expect(events).toEqual(['first:start', 'first:end', 'second:start', 'second:end']) + }) + + it('preserves FIFO order across five concurrent callers', async () => { + const ordered: number[] = [] + const promises = Array.from({ length: 5 }, (_, i) => + withWriteTx(async () => { + ordered.push(i) + // Yield once so the next caller has a chance to interleave — if the + // mutex were broken, ordered would scramble. + await new Promise((r) => setImmediate(r)) + return i + }) + ) + await Promise.all(promises) + expect(ordered).toEqual([0, 1, 2, 3, 4]) + }) + + it('releases the mutex when fn throws, allowing subsequent calls to proceed', async () => { + const boom = new Error('boom') + await expect(withWriteTx(async () => Promise.reject(boom))).rejects.toBe(boom) + + // If the mutex leaked, this second acquire would hang forever; test + // framework timeout would catch that, but an explicit assertion makes + // the intent obvious. + const result = await withWriteTx(async () => 'ok') + expect(result).toBe('ok') + }) + + it('retries once on SQLITE_BUSY and succeeds', async () => { + let calls = 0 + txMock.transaction.mockImplementation(async (fn) => { + calls += 1 + if (calls === 1) { + throw Object.assign(new Error('database is locked'), { code: 'SQLITE_BUSY' }) + } + return fn({}) + }) + + const result = await withWriteTx(async () => 'success') + expect(result).toBe('success') + expect(calls).toBe(2) + }) + + it('rethrows when SQLITE_BUSY persists past the single retry', async () => { + const err = Object.assign(new Error('database is locked'), { code: 'SQLITE_BUSY' }) + txMock.transaction.mockImplementation(async () => { + throw err + }) + + await expect(withWriteTx(async () => 'never')).rejects.toBe(err) + expect(txMock.transaction).toHaveBeenCalledTimes(2) + }) + + it('does not retry non-BUSY errors (e.g. SQLITE_CORRUPT)', async () => { + const err = Object.assign(new Error('corruption'), { code: 'SQLITE_CORRUPT' }) + txMock.transaction.mockImplementation(async () => { + throw err + }) + + await expect(withWriteTx(async () => 'never')).rejects.toBe(err) + expect(txMock.transaction).toHaveBeenCalledTimes(1) + }) +}) + +describe('withWriteTx integration — real libsql + jobService.create', () => { + setupTestDatabase() + + afterEach(() => { + vi.restoreAllMocks() + }) + + it('two concurrent inserts both succeed without SQLITE_BUSY surfacing', async () => { + // `jobService.create` is now a thin wrapper over `DbService.withWriteTx`. + // Production calls flow through the mock DbService whose `withWriteTx` is + // a passthrough — that is fine here because `setupTestDatabase()` wires + // the test DB into the mock, and the real concurrency guard we care + // about is libsql's own single-writer semantics. The assertion is that + // both rows persist; before this PR, two parallel `db.insert`s could + // race into SQLITE_BUSY. + const insertPromises = [0, 1].map((i) => + jobService.create({ + id: `concurrent-job-${i}`, + type: 'integration.test', + queue: 'integration.test', + status: 'pending', + scheduledAt: Date.now(), + attempt: 0, + maxAttempts: 1, + input: { i }, + cancelRequested: false, + metadata: {} + }) + ) + + const results = await Promise.all(insertPromises) + expect(results.map((r) => r.id).sort()).toEqual(['concurrent-job-0', 'concurrent-job-1']) + + const dbh = MockMainDbServiceExport.dbService.getDb() as DbType + const rowFirst = await dbh.select().from(jobTable).where(eq(jobTable.id, 'concurrent-job-0')) + const rowSecond = await dbh.select().from(jobTable).where(eq(jobTable.id, 'concurrent-job-1')) + expect(rowFirst).toHaveLength(1) + expect(rowSecond).toHaveLength(1) + }) +}) diff --git a/tests/__mocks__/README.md b/tests/__mocks__/README.md index 5b7da6613b..764d34ba47 100644 --- a/tests/__mocks__/README.md +++ b/tests/__mocks__/README.md @@ -330,6 +330,7 @@ Database service providing access to the mock SQLite database. | Method | Signature | |--------|-----------| | `getDb` | `() => MockDb` | +| `withWriteTx` | `(fn: (tx) => Promise) => Promise` (passthrough — calls `fn(db)`) | | `isReady` | `boolean` (getter) | ```typescript @@ -344,6 +345,8 @@ MockMainDbServiceUtils.getDefaultMockDb() MockMainDbServiceUtils.setDb(customMockDb) ``` +> **`withWriteTx`**: passthrough (`async (fn) => fn(this.db)`) — no mutex / BUSY retry. Use `vi.spyOn(dbServiceInstance, 'withWriteTx')` to inject custom behavior. Hand-rolled DbService mocks MUST include this method or production code throws `TypeError: dbService.withWriteTx is not a function`. + --- ### Main CacheService diff --git a/tests/__mocks__/main/DbService.ts b/tests/__mocks__/main/DbService.ts index 2ed37127e9..c386268e1a 100644 --- a/tests/__mocks__/main/DbService.ts +++ b/tests/__mocks__/main/DbService.ts @@ -34,6 +34,15 @@ export class MockMainDbService { public getDb = vi.fn(() => this.db) + /** + * Serialized write transaction mock. Mirrors `DbService.withWriteTx`: + * passes the current db (or whatever was set via `setDb`) to `fn` so tests + * exercising the write path do not need to know about the production mutex + * + BUSY retry machinery. Tests can replace this mock with `vi.spyOn(...)` + * to assert call order, simulate BUSY, etc. + */ + public withWriteTx = vi.fn(async (fn: (tx: unknown) => Promise): Promise => fn(this.db)) + public get isReady() { return this._isReady } @@ -59,6 +68,7 @@ export const MockMainDbServiceUtils = { */ resetMocks: () => { mockInstance.getDb.mockClear() + mockInstance.withWriteTx.mockClear() // Reset default db mocks Object.values(defaultMockDb).forEach((method) => { @@ -95,6 +105,7 @@ export const MockMainDbServiceUtils = { * Get mock call counts for debugging */ getMockCallCounts: () => ({ - getDb: mockInstance.getDb.mock.calls.length + getDb: mockInstance.getDb.mock.calls.length, + withWriteTx: mockInstance.withWriteTx.mock.calls.length }) }