fix(chroma): enforce single chroma-mcp subprocess per worker (#2313)

Root cause: every reconnect path in ChromaMcpManager — connectInternal's
re-entry, the connect-timeout catch, callTool's transport-error retry, and
the transport.onclose handler — used to abandon `this.transport`/`this.client`
by calling at most `transport.close()` and nulling the handles. The MCP SDK's
StdioClientTransport.close() only signals the direct child (uvx); on Linux the
grandchildren (uv -> python -> chroma-mcp) re-parent to init and survive
because the SDK does not put the subprocess in its own process group. Each
reconnect therefore leaked a full chroma-mcp tree, accumulating 20+ instances
per session.

Fix: introduce a private disposeCurrentSubprocess() helper that always tree-
kills via the existing killProcessTree primitive before nulling the transport
reference, and route every "abandon current transport" path (reconnect,
connect-timeout, transport error, onclose, stop) through it. The existing
`connecting: Promise<void> | null` lock continues to serialize concurrent
ensureConnected() callers into a single spawn.

Adds tests/services/sync/chroma-mcp-manager-singleton.test.ts covering:
- 5 parallel ensureConnected() calls produce exactly one spawn
- a transport-error reconnect tree-kills the prior subprocess pid before
  spawning a replacement
- stop() disposes state including any pending connecting promise

Manual verification needed on Linux: after a long session with multiple
tool uses, `ps aux | grep chroma-mcp | wc -l` should return 1, not 20+.

Fixes #2313.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-05-09 01:31:32 -07:00
parent ecd5b80297
commit c79324ea17
2 changed files with 310 additions and 41 deletions

View File

@@ -92,15 +92,14 @@ export class ChromaMcpManager {
}
private async connectInternal(): Promise<void> {
if (this.transport) {
try { await this.transport.close(); } catch { /* already dead */ }
}
if (this.client) {
try { await this.client.close(); } catch { /* already dead */ }
}
this.client = null;
this.transport = null;
this.connected = false;
// Singleton invariant (#2313): kill any pre-existing chroma-mcp subprocess
// tree before spawning a new one. The MCP SDK's transport.close() only
// signals the direct child (uvx); on Linux the grandchildren (uv, python,
// chroma-mcp) get re-parented to init and survive, accumulating 20+
// instances per session if reconnects fire repeatedly. Reuse the same
// tree-kill primitive used by stop() so reconnect can never leave
// orphans behind.
await this.disposeCurrentSubprocess();
const commandArgs = this.buildCommandArgs();
const spawnEnvironment = this.getSpawnEnv();
@@ -141,14 +140,12 @@ export class ChromaMcpManager {
await Promise.race([mcpConnectionPromise, timeoutPromise]);
} catch (connectionError) {
clearTimeout(timeoutId!);
logger.warn('CHROMA_MCP', 'Connection failed, killing subprocess to prevent zombie', {
logger.warn('CHROMA_MCP', 'Connection failed, killing subprocess tree to prevent zombie', {
error: connectionError instanceof Error ? connectionError.message : String(connectionError)
});
try { await this.transport.close(); } catch { /* best effort */ }
try { await this.client.close(); } catch { /* best effort */ }
this.client = null;
this.transport = null;
this.connected = false;
// Tree-kill (not just transport.close) so failed-connect descendants
// can't survive on Linux (#2313).
await this.disposeCurrentSubprocess();
throw connectionError;
}
clearTimeout(timeoutId!);
@@ -159,6 +156,7 @@ export class ChromaMcpManager {
logger.info('CHROMA_MCP', 'Connected to chroma-mcp successfully');
const currentTransport = this.transport;
const currentTrackedPid = (this.transport as unknown as { _process?: ChildProcess })._process?.pid;
this.transport.onclose = () => {
if (this.transport !== currentTransport) {
logger.debug('CHROMA_MCP', 'Ignoring stale onclose from previous transport');
@@ -170,6 +168,20 @@ export class ChromaMcpManager {
this.client = null;
this.transport = null;
this.lastConnectionFailureTimestamp = Date.now();
// Direct child (uvx) emitted close, but on Linux the grandchildren
// (uv/python/chroma-mcp) often outlive their parent because MCP SDK
// does not use process groups. Sweep the descendant tree using the
// captured PID — best-effort; pgrep returns nothing if everything
// already exited (#2313).
if (currentTrackedPid) {
ChromaMcpManager.killProcessTree(currentTrackedPid).catch((error) => {
logger.debug('CHROMA_MCP', 'Background tree-kill after onclose finished (best-effort)', {
pid: currentTrackedPid,
error: error instanceof Error ? error.message : String(error)
});
});
}
};
}
@@ -237,14 +249,15 @@ export class ChromaMcpManager {
arguments: toolArguments
});
} catch (transportError) {
this.connected = false;
this.client = null;
this.transport = null;
logger.warn('CHROMA_MCP', `Transport error during "${toolName}", reconnecting and retrying once`, {
error: transportError instanceof Error ? transportError.message : String(transportError)
});
// Tree-kill the dying subprocess before reconnect. Previously this path
// just nulled the handle, which on Linux leaks the uv/python/chroma-mcp
// descendants every time a transport error happens (#2313).
await this.disposeCurrentSubprocess();
try {
await this.ensureConnected();
result = await this.client!.callTool({
@@ -352,6 +365,53 @@ export class ChromaMcpManager {
}
}
/**
* Singleton enforcement helper (#2313): tree-kill the currently tracked
* chroma-mcp subprocess and reset all state so the next spawn starts clean.
*
* Why this is the singleton invariant: every code path that intends to
* abandon `this.transport` / `this.client` (reconnect, transport error,
* connect-timeout, onclose, stop()) MUST funnel through here. The MCP
* SDK's transport.close() only signals the direct child (uvx); on Linux
* the grandchildren (uv, python, chroma-mcp) re-parent to init and
* accumulate. Calling killProcessTree() against the captured PID before
* we drop the reference is the only way to guarantee at most one
* chroma-mcp subprocess tree exists per worker process.
*
* Idempotent and best-effort — safe to call when there is no active
* subprocess (no-op in that case).
*/
private async disposeCurrentSubprocess(): Promise<void> {
const chromaProcess = (this.transport as unknown as { _process?: ChildProcess })?._process;
const trackedPid = chromaProcess?.pid;
if (trackedPid) {
try {
await ChromaMcpManager.killProcessTree(trackedPid);
} catch (error) {
logger.warn('CHROMA_MCP', 'failed to kill prior chroma-mcp tree (best-effort)', {
pid: trackedPid,
error: error instanceof Error ? error.message : String(error)
});
}
}
if (this.transport) {
try { await this.transport.close(); } catch { /* already dead */ }
}
if (this.client) {
try { await this.client.close(); } catch { /* already dead */ }
}
if (trackedPid) {
getSupervisor().unregisterProcess(CHROMA_SUPERVISOR_ID);
}
this.client = null;
this.transport = null;
this.connected = false;
}
/**
* Gracefully stop the MCP connection and kill the chroma-mcp subprocess tree.
*
@@ -365,34 +425,15 @@ export class ChromaMcpManager {
* pattern from shutdown.ts (Principle 5: OS-supervised teardown).
*/
async stop(): Promise<void> {
if (!this.client) {
if (!this.client && !this.transport) {
logger.debug('CHROMA_MCP', 'No active MCP connection to stop');
this.connecting = null;
return;
}
logger.info('CHROMA_MCP', 'Stopping chroma-mcp MCP connection');
// Kill the entire process tree before closing the MCP client so
// descendants (uv, python, chroma-mcp) don't become orphans.
const chromaProcess = (this.transport as unknown as { _process?: ChildProcess })?._process;
if (chromaProcess?.pid) {
await ChromaMcpManager.killProcessTree(chromaProcess.pid);
}
try {
await this.client.close();
} catch (error) {
if (error instanceof Error) {
logger.debug('CHROMA_MCP', 'Error during client close (subprocess may already be dead)', {}, error);
} else {
logger.debug('CHROMA_MCP', 'Error during client close (subprocess may already be dead)', { error: String(error) });
}
}
getSupervisor().unregisterProcess(CHROMA_SUPERVISOR_ID);
this.client = null;
this.transport = null;
this.connected = false;
await this.disposeCurrentSubprocess();
this.connecting = null;
logger.info('CHROMA_MCP', 'chroma-mcp MCP connection stopped');

View File

@@ -0,0 +1,228 @@
import { describe, it, expect, beforeEach, mock } from 'bun:test';
// Singleton enforcement regression coverage for issue #2313.
//
// Hypothesis under test: prior to the fix, ChromaMcpManager could leak its
// chroma-mcp subprocess tree on every reconnect / transport error, accumulating
// 20+ instances per session on Linux because the MCP SDK's transport.close()
// only signals the direct child (uvx). The fix routes every "abandon current
// transport" path through disposeCurrentSubprocess(), which tree-kills via
// killProcessTree() before nulling the handles.
let transportCount = 0;
const transportInstances: Array<FakeTransport> = [];
interface FakeChildProcess {
pid: number;
once: (event: string, _cb: (...args: unknown[]) => void) => FakeChildProcess;
on: (event: string, _cb: (...args: unknown[]) => void) => FakeChildProcess;
}
class FakeTransport {
static nextPid = 100_000;
onclose: (() => void) | null = null;
closed = false;
// Mimic StdioClientTransport's internal `_process` field that the manager
// pokes into via `(this.transport as unknown as { _process })._process`.
_process: FakeChildProcess;
constructor(_opts: { command: string; args: string[] }) {
transportCount += 1;
const pid = FakeTransport.nextPid++;
const child: FakeChildProcess = {
pid,
once: function (this: FakeChildProcess) { return this; },
on: function (this: FakeChildProcess) { return this; },
};
this._process = child;
transportInstances.push(this);
}
async close(): Promise<void> {
this.closed = true;
}
}
mock.module('@modelcontextprotocol/sdk/client/stdio.js', () => ({
StdioClientTransport: FakeTransport,
}));
let connectImpl: () => Promise<void> = async () => {};
let callToolImpl: () => Promise<unknown> = async () => ({
content: [{ type: 'text', text: '{}' }],
});
class FakeClient {
closed = false;
async connect(): Promise<void> {
await connectImpl();
}
async callTool(): Promise<unknown> {
return await callToolImpl();
}
async close(): Promise<void> {
this.closed = true;
}
}
mock.module('@modelcontextprotocol/sdk/client/index.js', () => ({
Client: FakeClient,
}));
mock.module('../../../src/shared/SettingsDefaultsManager.js', () => ({
SettingsDefaultsManager: {
get: () => '',
getInt: () => 0,
loadFromFile: () => ({}),
},
}));
mock.module('../../../src/shared/paths.js', () => ({
USER_SETTINGS_PATH: '/tmp/fake-settings.json',
paths: {
chroma: () => '/tmp/fake-chroma',
combinedCerts: () => '/tmp/fake-combined-certs.pem',
},
}));
mock.module('../../../src/utils/logger.js', () => ({
logger: {
info: () => {},
debug: () => {},
warn: () => {},
error: () => {},
failure: () => {},
},
}));
// Track tree-kill invocations and the transport whose subprocess was killed.
const killTreeCalls: number[] = [];
mock.module('../../../src/supervisor/index.ts', () => ({
getSupervisor: () => ({
assertCanSpawn: () => {},
registerProcess: () => {},
unregisterProcess: () => {},
}),
}));
mock.module('../../../src/supervisor/env-sanitizer.js', () => ({
sanitizeEnv: (env: NodeJS.ProcessEnv) => env,
}));
// Replace child_process.execFile so the static killProcessTree implementation
// can be observed without actually shelling out. We feed pgrep an empty stdout
// (no descendants) so the only signal target is the root pid.
mock.module('child_process', () => {
const original = require('node:child_process');
return {
...original,
execFile: (
cmd: string,
args: string[],
_opts: unknown,
cb: (err: Error | null, stdout: { stdout: string; stderr: string }) => void
) => {
// Bun's promisify path will call this as if it were a Node-style callback.
if (cmd === 'pgrep') {
cb(null, { stdout: '', stderr: '' } as any);
} else {
cb(null, { stdout: '', stderr: '' } as any);
}
},
execSync: () => '',
};
});
// Stub process.kill so the tree-kill path can record targets without crashing
// the test runner if the synthetic PID happens to collide with a real one.
const realProcessKill = process.kill.bind(process);
const stubbedProcessKill = ((pid: number, _signal?: string | number) => {
killTreeCalls.push(pid);
return true;
}) as typeof process.kill;
process.kill = stubbedProcessKill;
import { ChromaMcpManager } from '../../../src/services/sync/ChromaMcpManager.js';
function resetState(): void {
transportCount = 0;
transportInstances.length = 0;
killTreeCalls.length = 0;
connectImpl = async () => {};
callToolImpl = async () => ({ content: [{ type: 'text', text: '{}' }] });
}
describe('ChromaMcpManager singleton enforcement (#2313)', () => {
beforeEach(async () => {
await ChromaMcpManager.reset();
resetState();
});
it('serializes concurrent ensureConnected() calls into one spawn', async () => {
const mgr = ChromaMcpManager.getInstance();
// Five parallel callers race ensureConnected via callTool — only one
// chroma-mcp subprocess (one transport) should be spawned.
await Promise.all(
Array.from({ length: 5 }, () =>
mgr.callTool('chroma_list_collections', { limit: 1 })
)
);
expect(transportCount).toBe(1);
});
it('kills the prior subprocess tree before a reconnect spawn', async () => {
const mgr = ChromaMcpManager.getInstance();
// First call: opens transport #1.
await mgr.callTool('chroma_list_collections', { limit: 1 });
expect(transportInstances.length).toBe(1);
const firstPid = transportInstances[0]._process.pid;
// Second call: rig callTool to throw a transport error on the FIRST attempt
// so the manager runs its reconnect-and-retry path. The retry should
// dispose the prior subprocess tree (firstPid) before spawning a new one.
let invocations = 0;
callToolImpl = async () => {
invocations += 1;
if (invocations === 1) {
throw new Error('Connection closed');
}
return { content: [{ type: 'text', text: '{}' }] };
};
await mgr.callTool('chroma_list_collections', { limit: 1 });
expect(transportInstances.length).toBe(2);
// The first transport's pid must have been signaled by killProcessTree
// before the second transport spawned.
expect(killTreeCalls).toContain(firstPid);
});
it('stop() disposes state including any pending connecting promise', async () => {
const mgr = ChromaMcpManager.getInstance();
await mgr.callTool('chroma_list_collections', { limit: 1 });
expect(transportInstances.length).toBe(1);
const subprocessPid = transportInstances[0]._process.pid;
await mgr.stop();
// After stop(), every internal handle should be cleared and the prior
// subprocess tree must have been signaled.
expect(killTreeCalls).toContain(subprocessPid);
// A subsequent ensureConnected must spawn a fresh transport (not reuse
// a stale one).
await mgr.callTool('chroma_list_collections', { limit: 1 });
expect(transportInstances.length).toBe(2);
});
});
// Restore the real process.kill once the test module finishes evaluating any
// late-arriving microtasks.
process.on('exit', () => {
process.kill = realProcessKill;
});