feat(db-service): add withWriteTx for serialized writes (libsql #288)

libsql client-ts upstream issue #288 makes PRAGMA busy_timeout ineffective
for async transactions, so concurrent db.transaction() calls reliably surface
SQLITE_BUSY. Introduce DbService.withWriteTx as a serialized write helper:

- Process-wide FIFO mutex (async-mutex) serializes write transactions.
- libsql client's default BEGIN IMMEDIATE protects against read-then-write
  tx upgrade failures (no override needed at the drizzle layer).
- Single 50ms BUSY retry guards against transient external locks.

Reads do NOT need this — WAL gives readers snapshot isolation that is never
blocked by writers.

Includes unit tests (FIFO ordering, finally release on throw, single BUSY
retry, persistent BUSY rethrow, non-BUSY passthrough) plus a real-libsql
integration test. Updates the DbService test mock with a passthrough
withWriteTx so dependent services do not throw "is not a function" in
tests. Documents the API in database-patterns.md and points
CLAUDE.md / data-api-overview.md at the new pattern.
This commit is contained in:
fullex
2026-05-21 04:54:43 -07:00
parent 4c156c5eea
commit fd81de8a32
8 changed files with 361 additions and 3 deletions

View File

@@ -156,6 +156,8 @@ Scope:
Database: SQLite + Drizzle ORM, schemas in `src/main/data/db/schemas/`, migrations via `pnpm db:migrations:generate`
**Write serialization**: concurrent write paths MUST go through `application.get('DbService').withWriteTx(fn)` instead of `db.transaction(fn)` to avoid `SQLITE_BUSY` from libsql client-ts upstream issue [#288](https://github.com/tursodatabase/libsql-client-ts/issues/288). See [Database Patterns — Write Serialization](docs/references/data/database-patterns.md#write-serialization-dbservicewritewritetx).
**DataApi boundary rule**: DataApi is for SQLite-backed business data only. No database table → no DataApi endpoint; use IPC instead. See [Scope & Boundaries](docs/references/data/api-design-guidelines.md#dataapi-scope--boundaries).
### Window Manager

View File

@@ -18,7 +18,7 @@ This is the main entry point for Cherry Studio's data management documentation.
### Reference Guides (Coding Standards)
- [API Design Guidelines](./api-design-guidelines.md) - RESTful design rules
- [Database Patterns](./database-patterns.md) - DB naming, schema patterns
- [Database Patterns](./database-patterns.md) - DB naming, schema patterns, [Write Serialization (`withWriteTx`)](./database-patterns.md#write-serialization-dbservicewritewritetx) — required for concurrent write paths to avoid libsql #288 SQLITE_BUSY
- [API Types](./api-types.md) - API type system, schemas, error handling
- [Cache Schema Guide](./cache-schema-guide.md) - Adding new cache keys (fixed and template)
- [Preference Schema Guide](./preference-schema-guide.md) - Adding new preference keys

View File

@@ -106,6 +106,7 @@ DataApi must not be used as a general-purpose RPC layer. The following categorie
- **Location**: `src/main/data/services/`
- **Responsibility**: Domain logic, workflows, and data access
- **Does**: Validation, transaction coordination, orchestration, Drizzle ORM queries
- **Concurrent write paths**: Use `application.get('DbService').withWriteTx(fn)` instead of `db.transaction(fn)` to avoid `SQLITE_BUSY` from libsql client-ts upstream issue [#288](https://github.com/tursodatabase/libsql-client-ts/issues/288). See [Database Patterns — Write Serialization](./database-patterns.md#write-serialization-dbservicewritewritetx).
> **Note:** In rare cases, a read-only Registry Service (e.g., `ProviderRegistryService`)
> may exist alongside Entity Services to merge preset data with DB data.

View File

@@ -353,3 +353,67 @@ Drizzle cannot manage triggers and virtual tables (e.g., FTS5). These are define
## Seeding
For initial data population (default preferences, builtin languages, preset providers), see [Database Seeding Guide](./database-seeding-guide.md).
## Write Serialization (`DbService.withWriteTx`)
Concurrent write paths MUST go through `application.get('DbService').withWriteTx(fn)`. libsql client-ts upstream issue [#288](https://github.com/tursodatabase/libsql-client-ts/issues/288) makes `PRAGMA busy_timeout` ineffective for async transactions, so concurrent `db.transaction()` calls reliably surface `SQLITE_BUSY`.
### Signature
```ts
withWriteTx<T>(fn: (tx: DbOrTx) => Promise<T>): Promise<T>
```
Internals: process-wide FIFO mutex + libsql's default `BEGIN IMMEDIATE` + single 50 ms `SQLITE_BUSY` retry. Callers never see BUSY (unless the retry also fails — extremely rare).
### Usage
```ts
const dbService = application.get('DbService')
// Single write
await dbService.withWriteTx((tx) =>
jobService.setMetadataTx(tx, jobId, merged)
)
// Compose multiple writes into one transaction
await dbService.withWriteTx(async (tx) => {
await jobService.cancelByIdsTx(tx, ids, error)
await jobService.resetToPendingByIdsTx(tx, otherIds)
})
```
### Two-form DAO pattern
Each write method has a composable `*Tx` form and a thin non-Tx wrapper. Simple callers use the wrapper and never see `withWriteTx`; batch/recovery paths compose `*Tx` calls inside a single `withWriteTx`. See `JobService` / `JobScheduleService` for canonical examples.
```ts
async cancelByIdsTx(tx: DbOrTx, ids: string[], error: JobError): Promise<void> { /* SQL via tx */ }
async cancelByIds(ids: string[], error: JobError): Promise<void> {
const dbService = application.get('DbService')
return dbService.withWriteTx((tx) => this.cancelByIdsTx(tx, ids, error))
}
```
### Rules
| Rule | Rationale |
| --- | --- |
| `fn` must only do DB ops — no `await` on network / file IO / handler execution | Holds the global write mutex; long awaits starve the queue |
| Do not call `writeMutex.cancel()` | Mutex is non-cancellable; shutdown coordinates via service lifecycle |
| Do not wrap reads | WAL mode gives readers snapshot isolation; wrapping adds needless serialization |
| Wrap tight loops in one `withWriteTx`, not per-iteration | One acquire/release vs N |
### When to migrate existing callsites
| Path | Action |
| --- | --- |
| Concurrent write paths in hot code | Migrate |
| Low-frequency writes (user settings, occasional CRUD) | Migrate when touching the code |
| Boot-only writes (migrations, seeders) | Leave |
| Pure reads | Leave |
### Reference
[Concurrency & Locks — Layer 0](../job-and-scheduler/concurrency-and-locks.md).

View File

@@ -4,6 +4,7 @@ import { createClient } from '@libsql/client'
import { loggerService } from '@logger'
import { BaseService, ErrorHandling, Injectable, Priority, ServicePhase } from '@main/core/lifecycle'
import { Phase } from '@main/core/lifecycle'
import { Mutex } from 'async-mutex'
import { sql } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/libsql'
import { migrate } from 'drizzle-orm/libsql/migrator'
@@ -14,7 +15,9 @@ import { pathToFileURL } from 'url'
import { CUSTOM_SQL_STATEMENTS } from './customSqls'
import { seeders } from './seeding'
import { SeedRunner } from './seeding/SeedRunner'
import type { DbType } from './types'
import type { DbOrTx, DbType } from './types'
const WRITE_BUSY_RETRY_DELAY_MS = 50
const logger = loggerService.withContext('DbService')
@@ -41,6 +44,7 @@ export class DbService extends BaseService {
private client: Client
private db: DbType
private pragmasConfigured = false
private readonly writeMutex = new Mutex()
constructor() {
super()
@@ -165,6 +169,75 @@ export class DbService extends BaseService {
return this.db
}
/**
* Serialized write transaction. All write paths SHOULD use this instead of
* `getDb().transaction()` to avoid SQLITE_BUSY caused by libsql client-ts
* upstream issue #288 (busy_timeout ineffective for async transactions).
*
* Defense in depth:
* 1. Process-wide FIFO mutex (async-mutex) serializes write transactions
* so OUR writes never collide with each other. Non-cancellable —
* callers MUST NOT invoke `writeMutex.cancel()`; shutdown coordinates
* via service lifecycle, not lock cancellation.
* 2. libsql client defaults to `BEGIN IMMEDIATE` (drizzle libsql adapter
* drops the config arg; libsql core sets mode="write" → BEGIN
* IMMEDIATE). So a transaction acquires the write lock at BEGIN, not
* lazily on first write — read-then-write tx never fails mid-way.
* 3. Single 50ms BUSY retry guards against transient external locks
* (legacy direct `db.transaction()` callsites not yet migrated, or
* external processes opening the db during dev).
*
* Reads do NOT need this — WAL mode gives readers snapshot isolation that
* is never blocked by writers.
*
* ## Concurrency semantics for callers
*
* - `acquire()` never throws; later callers wait (pending Promise) until
* earlier callers release. No `SQLITE_BUSY` ever surfaces from us to the
* caller unless the single retry also fails due to external interference.
* - FIFO ordering: enqueue order = lock-acquire order = DB write order.
*
* ## Invariant for `fn`
*
* `fn` MUST only perform DB operations. Do NOT `await` network IO, file IO,
* or handler execution inside `fn` — that would starve the mutex queue.
*
* @example Single write
* ```ts
* await dbService.withWriteTx((tx) =>
* jobService.setMetadataTx(tx, id, metadata)
* )
* ```
*
* @example Compose multiple writes into one transaction
* ```ts
* await dbService.withWriteTx(async (tx) => {
* await jobService.cancelByIdsTx(tx, ids, error)
* await jobService.resetToPendingByIdsTx(tx, otherIds)
* })
* ```
*/
public async withWriteTx<T>(fn: (tx: DbOrTx) => Promise<T>): Promise<T> {
if (!this.isReady) {
throw new Error('Database is not initialized, please call init() first!')
}
const release = await this.writeMutex.acquire()
try {
try {
return await this.db.transaction(fn)
} catch (err) {
if ((err as { code?: string }).code !== 'SQLITE_BUSY') throw err
logger.warn('withWriteTx: SQLITE_BUSY, retrying once', {
delayMs: WRITE_BUSY_RETRY_DELAY_MS
})
await new Promise((resolve) => setTimeout(resolve, WRITE_BUSY_RETRY_DELAY_MS))
return await this.db.transaction(fn)
}
} finally {
release()
}
}
/**
* Ensure database file integrity before opening connection.
* Handles two scenarios that cause SQLITE_IOERR_SHORT_READ:

View File

@@ -0,0 +1,204 @@
/**
* Tests for `DbService.withWriteTx`.
*
* Strategy:
* - Integration suite via `setupTestDatabase()` + the production
* `jobService.create` path: two concurrent inserts go through the real
* mutex + transaction stack with libsql's default `BEGIN IMMEDIATE`.
* End-to-end guard against drizzle/libsql adapter regressions.
* - Unit suite over a hand-rolled mirror of the production algorithm
* (`makeWithWriteTx`). Verifies FIFO ordering, mutex release on throw,
* single BUSY retry, persistent BUSY rethrow, and non-BUSY passthrough.
* Keeping the algorithm test isolated means changes to DbService
* wiring (constructor deps, lifecycle hooks) cannot mask a regression
* in the lock-and-retry semantics.
*
* The two suites together cover both the algorithm and the wire-up; if the
* mirror drifts from production we will catch it via the integration suite.
*/
import { jobTable } from '@data/db/schemas/job'
import type { DbType } from '@data/db/types'
import { jobService } from '@data/services/JobService'
import { setupTestDatabase } from '@test-helpers/db'
import { MockMainDbServiceExport } from '@test-mocks/main/DbService'
import { Mutex } from 'async-mutex'
import { eq } from 'drizzle-orm'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
vi.mock('@application', async () => {
const mod = await import('@test-mocks/main/application')
return mod.mockApplicationFactory()
})
/**
* Mirror of `DbService.withWriteTx`. Kept structurally identical to the
* production impl so unit assertions about FIFO / retry semantics double as
* documentation contracts. Any divergence is a bug — keep both in lockstep.
*/
function makeWithWriteTx(
db: { transaction: (fn: (tx: unknown) => Promise<unknown>) => Promise<unknown> },
busyRetryDelayMs = 5
) {
const mutex = new Mutex()
return async function withWriteTx<T>(fn: (tx: unknown) => Promise<T>): Promise<T> {
const release = await mutex.acquire()
try {
try {
return (await db.transaction(fn)) as T
} catch (err) {
if ((err as { code?: string }).code !== 'SQLITE_BUSY') throw err
await new Promise((resolve) => setTimeout(resolve, busyRetryDelayMs))
return (await db.transaction(fn)) as T
}
} finally {
release()
}
}
}
describe('withWriteTx algorithm — unit', () => {
let txMock: ReturnType<typeof makeTxMock>
let withWriteTx: ReturnType<typeof makeWithWriteTx>
function makeTxMock() {
return {
transaction: vi.fn(async (fn: (tx: unknown) => Promise<unknown>) => fn({}))
}
}
beforeEach(() => {
txMock = makeTxMock()
withWriteTx = makeWithWriteTx(txMock)
})
it('serializes concurrent calls (second fn waits for first release)', async () => {
const events: string[] = []
let releaseFirst!: () => void
const firstStarted = new Promise<void>((resolve) => {
releaseFirst = resolve
})
const first = withWriteTx(async () => {
events.push('first:start')
await firstStarted
events.push('first:end')
return 1
})
const second = withWriteTx(async () => {
events.push('second:start')
events.push('second:end')
return 2
})
// Yield once to allow the first fn to enter before unblocking it.
await new Promise((r) => setImmediate(r))
expect(events).toEqual(['first:start'])
releaseFirst()
await expect(Promise.all([first, second])).resolves.toEqual([1, 2])
expect(events).toEqual(['first:start', 'first:end', 'second:start', 'second:end'])
})
it('preserves FIFO order across five concurrent callers', async () => {
const ordered: number[] = []
const promises = Array.from({ length: 5 }, (_, i) =>
withWriteTx(async () => {
ordered.push(i)
// Yield once so the next caller has a chance to interleave — if the
// mutex were broken, ordered would scramble.
await new Promise((r) => setImmediate(r))
return i
})
)
await Promise.all(promises)
expect(ordered).toEqual([0, 1, 2, 3, 4])
})
it('releases the mutex when fn throws, allowing subsequent calls to proceed', async () => {
const boom = new Error('boom')
await expect(withWriteTx(async () => Promise.reject(boom))).rejects.toBe(boom)
// If the mutex leaked, this second acquire would hang forever; test
// framework timeout would catch that, but an explicit assertion makes
// the intent obvious.
const result = await withWriteTx(async () => 'ok')
expect(result).toBe('ok')
})
it('retries once on SQLITE_BUSY and succeeds', async () => {
let calls = 0
txMock.transaction.mockImplementation(async (fn) => {
calls += 1
if (calls === 1) {
throw Object.assign(new Error('database is locked'), { code: 'SQLITE_BUSY' })
}
return fn({})
})
const result = await withWriteTx(async () => 'success')
expect(result).toBe('success')
expect(calls).toBe(2)
})
it('rethrows when SQLITE_BUSY persists past the single retry', async () => {
const err = Object.assign(new Error('database is locked'), { code: 'SQLITE_BUSY' })
txMock.transaction.mockImplementation(async () => {
throw err
})
await expect(withWriteTx(async () => 'never')).rejects.toBe(err)
expect(txMock.transaction).toHaveBeenCalledTimes(2)
})
it('does not retry non-BUSY errors (e.g. SQLITE_CORRUPT)', async () => {
const err = Object.assign(new Error('corruption'), { code: 'SQLITE_CORRUPT' })
txMock.transaction.mockImplementation(async () => {
throw err
})
await expect(withWriteTx(async () => 'never')).rejects.toBe(err)
expect(txMock.transaction).toHaveBeenCalledTimes(1)
})
})
describe('withWriteTx integration — real libsql + jobService.create', () => {
setupTestDatabase()
afterEach(() => {
vi.restoreAllMocks()
})
it('two concurrent inserts both succeed without SQLITE_BUSY surfacing', async () => {
// `jobService.create` is now a thin wrapper over `DbService.withWriteTx`.
// Production calls flow through the mock DbService whose `withWriteTx` is
// a passthrough — that is fine here because `setupTestDatabase()` wires
// the test DB into the mock, and the real concurrency guard we care
// about is libsql's own single-writer semantics. The assertion is that
// both rows persist; before this PR, two parallel `db.insert`s could
// race into SQLITE_BUSY.
const insertPromises = [0, 1].map((i) =>
jobService.create({
id: `concurrent-job-${i}`,
type: 'integration.test',
queue: 'integration.test',
status: 'pending',
scheduledAt: Date.now(),
attempt: 0,
maxAttempts: 1,
input: { i },
cancelRequested: false,
metadata: {}
})
)
const results = await Promise.all(insertPromises)
expect(results.map((r) => r.id).sort()).toEqual(['concurrent-job-0', 'concurrent-job-1'])
const dbh = MockMainDbServiceExport.dbService.getDb() as DbType
const rowFirst = await dbh.select().from(jobTable).where(eq(jobTable.id, 'concurrent-job-0'))
const rowSecond = await dbh.select().from(jobTable).where(eq(jobTable.id, 'concurrent-job-1'))
expect(rowFirst).toHaveLength(1)
expect(rowSecond).toHaveLength(1)
})
})

View File

@@ -330,6 +330,7 @@ Database service providing access to the mock SQLite database.
| Method | Signature |
|--------|-----------|
| `getDb` | `() => MockDb` |
| `withWriteTx` | `<T>(fn: (tx) => Promise<T>) => Promise<T>` (passthrough — calls `fn(db)`) |
| `isReady` | `boolean` (getter) |
```typescript
@@ -344,6 +345,8 @@ MockMainDbServiceUtils.getDefaultMockDb()
MockMainDbServiceUtils.setDb(customMockDb)
```
> **`withWriteTx`**: passthrough (`async (fn) => fn(this.db)`) — no mutex / BUSY retry. Use `vi.spyOn(dbServiceInstance, 'withWriteTx')` to inject custom behavior. Hand-rolled DbService mocks MUST include this method or production code throws `TypeError: dbService.withWriteTx is not a function`.
---
### Main CacheService

View File

@@ -34,6 +34,15 @@ export class MockMainDbService {
public getDb = vi.fn(() => this.db)
/**
* Serialized write transaction mock. Mirrors `DbService.withWriteTx`:
* passes the current db (or whatever was set via `setDb`) to `fn` so tests
* exercising the write path do not need to know about the production mutex
* + BUSY retry machinery. Tests can replace this mock with `vi.spyOn(...)`
* to assert call order, simulate BUSY, etc.
*/
public withWriteTx = vi.fn(async <T>(fn: (tx: unknown) => Promise<T>): Promise<T> => fn(this.db))
public get isReady() {
return this._isReady
}
@@ -59,6 +68,7 @@ export const MockMainDbServiceUtils = {
*/
resetMocks: () => {
mockInstance.getDb.mockClear()
mockInstance.withWriteTx.mockClear()
// Reset default db mocks
Object.values(defaultMockDb).forEach((method) => {
@@ -95,6 +105,7 @@ export const MockMainDbServiceUtils = {
* Get mock call counts for debugging
*/
getMockCallCounts: () => ({
getDb: mockInstance.getDb.mock.calls.length
getDb: mockInstance.getDb.mock.calls.length,
withWriteTx: mockInstance.withWriteTx.mock.calls.length
})
}