mirror of
https://github.com/thedotmack/claude-mem.git
synced 2026-07-03 12:32:32 +08:00
fix(chroma): enforce single chroma-mcp subprocess per worker (#2313)
Root cause: every reconnect path in ChromaMcpManager — connectInternal's re-entry, the connect-timeout catch, callTool's transport-error retry, and the transport.onclose handler — used to abandon `this.transport`/`this.client` by calling at most `transport.close()` and nulling the handles. The MCP SDK's StdioClientTransport.close() only signals the direct child (uvx); on Linux the grandchildren (uv -> python -> chroma-mcp) re-parent to init and survive because the SDK does not put the subprocess in its own process group. Each reconnect therefore leaked a full chroma-mcp tree, accumulating 20+ instances per session. Fix: introduce a private disposeCurrentSubprocess() helper that always tree- kills via the existing killProcessTree primitive before nulling the transport reference, and route every "abandon current transport" path (reconnect, connect-timeout, transport error, onclose, stop) through it. The existing `connecting: Promise<void> | null` lock continues to serialize concurrent ensureConnected() callers into a single spawn. Adds tests/services/sync/chroma-mcp-manager-singleton.test.ts covering: - 5 parallel ensureConnected() calls produce exactly one spawn - a transport-error reconnect tree-kills the prior subprocess pid before spawning a replacement - stop() disposes state including any pending connecting promise Manual verification needed on Linux: after a long session with multiple tool uses, `ps aux | grep chroma-mcp | wc -l` should return 1, not 20+. Fixes #2313. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -92,15 +92,14 @@ export class ChromaMcpManager {
|
||||
}
|
||||
|
||||
private async connectInternal(): Promise<void> {
|
||||
if (this.transport) {
|
||||
try { await this.transport.close(); } catch { /* already dead */ }
|
||||
}
|
||||
if (this.client) {
|
||||
try { await this.client.close(); } catch { /* already dead */ }
|
||||
}
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.connected = false;
|
||||
// Singleton invariant (#2313): kill any pre-existing chroma-mcp subprocess
|
||||
// tree before spawning a new one. The MCP SDK's transport.close() only
|
||||
// signals the direct child (uvx); on Linux the grandchildren (uv, python,
|
||||
// chroma-mcp) get re-parented to init and survive, accumulating 20+
|
||||
// instances per session if reconnects fire repeatedly. Reuse the same
|
||||
// tree-kill primitive used by stop() so reconnect can never leave
|
||||
// orphans behind.
|
||||
await this.disposeCurrentSubprocess();
|
||||
|
||||
const commandArgs = this.buildCommandArgs();
|
||||
const spawnEnvironment = this.getSpawnEnv();
|
||||
@@ -141,14 +140,12 @@ export class ChromaMcpManager {
|
||||
await Promise.race([mcpConnectionPromise, timeoutPromise]);
|
||||
} catch (connectionError) {
|
||||
clearTimeout(timeoutId!);
|
||||
logger.warn('CHROMA_MCP', 'Connection failed, killing subprocess to prevent zombie', {
|
||||
logger.warn('CHROMA_MCP', 'Connection failed, killing subprocess tree to prevent zombie', {
|
||||
error: connectionError instanceof Error ? connectionError.message : String(connectionError)
|
||||
});
|
||||
try { await this.transport.close(); } catch { /* best effort */ }
|
||||
try { await this.client.close(); } catch { /* best effort */ }
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.connected = false;
|
||||
// Tree-kill (not just transport.close) so failed-connect descendants
|
||||
// can't survive on Linux (#2313).
|
||||
await this.disposeCurrentSubprocess();
|
||||
throw connectionError;
|
||||
}
|
||||
clearTimeout(timeoutId!);
|
||||
@@ -159,6 +156,7 @@ export class ChromaMcpManager {
|
||||
logger.info('CHROMA_MCP', 'Connected to chroma-mcp successfully');
|
||||
|
||||
const currentTransport = this.transport;
|
||||
const currentTrackedPid = (this.transport as unknown as { _process?: ChildProcess })._process?.pid;
|
||||
this.transport.onclose = () => {
|
||||
if (this.transport !== currentTransport) {
|
||||
logger.debug('CHROMA_MCP', 'Ignoring stale onclose from previous transport');
|
||||
@@ -170,6 +168,20 @@ export class ChromaMcpManager {
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.lastConnectionFailureTimestamp = Date.now();
|
||||
|
||||
// Direct child (uvx) emitted close, but on Linux the grandchildren
|
||||
// (uv/python/chroma-mcp) often outlive their parent because MCP SDK
|
||||
// does not use process groups. Sweep the descendant tree using the
|
||||
// captured PID — best-effort; pgrep returns nothing if everything
|
||||
// already exited (#2313).
|
||||
if (currentTrackedPid) {
|
||||
ChromaMcpManager.killProcessTree(currentTrackedPid).catch((error) => {
|
||||
logger.debug('CHROMA_MCP', 'Background tree-kill after onclose finished (best-effort)', {
|
||||
pid: currentTrackedPid,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -237,14 +249,15 @@ export class ChromaMcpManager {
|
||||
arguments: toolArguments
|
||||
});
|
||||
} catch (transportError) {
|
||||
this.connected = false;
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
|
||||
logger.warn('CHROMA_MCP', `Transport error during "${toolName}", reconnecting and retrying once`, {
|
||||
error: transportError instanceof Error ? transportError.message : String(transportError)
|
||||
});
|
||||
|
||||
// Tree-kill the dying subprocess before reconnect. Previously this path
|
||||
// just nulled the handle, which on Linux leaks the uv/python/chroma-mcp
|
||||
// descendants every time a transport error happens (#2313).
|
||||
await this.disposeCurrentSubprocess();
|
||||
|
||||
try {
|
||||
await this.ensureConnected();
|
||||
result = await this.client!.callTool({
|
||||
@@ -352,6 +365,53 @@ export class ChromaMcpManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Singleton enforcement helper (#2313): tree-kill the currently tracked
|
||||
* chroma-mcp subprocess and reset all state so the next spawn starts clean.
|
||||
*
|
||||
* Why this is the singleton invariant: every code path that intends to
|
||||
* abandon `this.transport` / `this.client` (reconnect, transport error,
|
||||
* connect-timeout, onclose, stop()) MUST funnel through here. The MCP
|
||||
* SDK's transport.close() only signals the direct child (uvx); on Linux
|
||||
* the grandchildren (uv, python, chroma-mcp) re-parent to init and
|
||||
* accumulate. Calling killProcessTree() against the captured PID before
|
||||
* we drop the reference is the only way to guarantee at most one
|
||||
* chroma-mcp subprocess tree exists per worker process.
|
||||
*
|
||||
* Idempotent and best-effort — safe to call when there is no active
|
||||
* subprocess (no-op in that case).
|
||||
*/
|
||||
private async disposeCurrentSubprocess(): Promise<void> {
|
||||
const chromaProcess = (this.transport as unknown as { _process?: ChildProcess })?._process;
|
||||
const trackedPid = chromaProcess?.pid;
|
||||
|
||||
if (trackedPid) {
|
||||
try {
|
||||
await ChromaMcpManager.killProcessTree(trackedPid);
|
||||
} catch (error) {
|
||||
logger.warn('CHROMA_MCP', 'failed to kill prior chroma-mcp tree (best-effort)', {
|
||||
pid: trackedPid,
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (this.transport) {
|
||||
try { await this.transport.close(); } catch { /* already dead */ }
|
||||
}
|
||||
if (this.client) {
|
||||
try { await this.client.close(); } catch { /* already dead */ }
|
||||
}
|
||||
|
||||
if (trackedPid) {
|
||||
getSupervisor().unregisterProcess(CHROMA_SUPERVISOR_ID);
|
||||
}
|
||||
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.connected = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gracefully stop the MCP connection and kill the chroma-mcp subprocess tree.
|
||||
*
|
||||
@@ -365,34 +425,15 @@ export class ChromaMcpManager {
|
||||
* pattern from shutdown.ts (Principle 5: OS-supervised teardown).
|
||||
*/
|
||||
async stop(): Promise<void> {
|
||||
if (!this.client) {
|
||||
if (!this.client && !this.transport) {
|
||||
logger.debug('CHROMA_MCP', 'No active MCP connection to stop');
|
||||
this.connecting = null;
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info('CHROMA_MCP', 'Stopping chroma-mcp MCP connection');
|
||||
|
||||
// Kill the entire process tree before closing the MCP client so
|
||||
// descendants (uv, python, chroma-mcp) don't become orphans.
|
||||
const chromaProcess = (this.transport as unknown as { _process?: ChildProcess })?._process;
|
||||
if (chromaProcess?.pid) {
|
||||
await ChromaMcpManager.killProcessTree(chromaProcess.pid);
|
||||
}
|
||||
|
||||
try {
|
||||
await this.client.close();
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
logger.debug('CHROMA_MCP', 'Error during client close (subprocess may already be dead)', {}, error);
|
||||
} else {
|
||||
logger.debug('CHROMA_MCP', 'Error during client close (subprocess may already be dead)', { error: String(error) });
|
||||
}
|
||||
}
|
||||
|
||||
getSupervisor().unregisterProcess(CHROMA_SUPERVISOR_ID);
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.connected = false;
|
||||
await this.disposeCurrentSubprocess();
|
||||
this.connecting = null;
|
||||
|
||||
logger.info('CHROMA_MCP', 'chroma-mcp MCP connection stopped');
|
||||
|
||||
228
tests/services/sync/chroma-mcp-manager-singleton.test.ts
Normal file
228
tests/services/sync/chroma-mcp-manager-singleton.test.ts
Normal file
@@ -0,0 +1,228 @@
|
||||
import { describe, it, expect, beforeEach, mock } from 'bun:test';
|
||||
|
||||
// Singleton enforcement regression coverage for issue #2313.
|
||||
//
|
||||
// Hypothesis under test: prior to the fix, ChromaMcpManager could leak its
|
||||
// chroma-mcp subprocess tree on every reconnect / transport error, accumulating
|
||||
// 20+ instances per session on Linux because the MCP SDK's transport.close()
|
||||
// only signals the direct child (uvx). The fix routes every "abandon current
|
||||
// transport" path through disposeCurrentSubprocess(), which tree-kills via
|
||||
// killProcessTree() before nulling the handles.
|
||||
|
||||
let transportCount = 0;
|
||||
const transportInstances: Array<FakeTransport> = [];
|
||||
|
||||
interface FakeChildProcess {
|
||||
pid: number;
|
||||
once: (event: string, _cb: (...args: unknown[]) => void) => FakeChildProcess;
|
||||
on: (event: string, _cb: (...args: unknown[]) => void) => FakeChildProcess;
|
||||
}
|
||||
|
||||
class FakeTransport {
|
||||
static nextPid = 100_000;
|
||||
onclose: (() => void) | null = null;
|
||||
closed = false;
|
||||
// Mimic StdioClientTransport's internal `_process` field that the manager
|
||||
// pokes into via `(this.transport as unknown as { _process })._process`.
|
||||
_process: FakeChildProcess;
|
||||
|
||||
constructor(_opts: { command: string; args: string[] }) {
|
||||
transportCount += 1;
|
||||
const pid = FakeTransport.nextPid++;
|
||||
const child: FakeChildProcess = {
|
||||
pid,
|
||||
once: function (this: FakeChildProcess) { return this; },
|
||||
on: function (this: FakeChildProcess) { return this; },
|
||||
};
|
||||
this._process = child;
|
||||
transportInstances.push(this);
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
this.closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
mock.module('@modelcontextprotocol/sdk/client/stdio.js', () => ({
|
||||
StdioClientTransport: FakeTransport,
|
||||
}));
|
||||
|
||||
let connectImpl: () => Promise<void> = async () => {};
|
||||
let callToolImpl: () => Promise<unknown> = async () => ({
|
||||
content: [{ type: 'text', text: '{}' }],
|
||||
});
|
||||
|
||||
class FakeClient {
|
||||
closed = false;
|
||||
async connect(): Promise<void> {
|
||||
await connectImpl();
|
||||
}
|
||||
async callTool(): Promise<unknown> {
|
||||
return await callToolImpl();
|
||||
}
|
||||
async close(): Promise<void> {
|
||||
this.closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
mock.module('@modelcontextprotocol/sdk/client/index.js', () => ({
|
||||
Client: FakeClient,
|
||||
}));
|
||||
|
||||
mock.module('../../../src/shared/SettingsDefaultsManager.js', () => ({
|
||||
SettingsDefaultsManager: {
|
||||
get: () => '',
|
||||
getInt: () => 0,
|
||||
loadFromFile: () => ({}),
|
||||
},
|
||||
}));
|
||||
|
||||
mock.module('../../../src/shared/paths.js', () => ({
|
||||
USER_SETTINGS_PATH: '/tmp/fake-settings.json',
|
||||
paths: {
|
||||
chroma: () => '/tmp/fake-chroma',
|
||||
combinedCerts: () => '/tmp/fake-combined-certs.pem',
|
||||
},
|
||||
}));
|
||||
|
||||
mock.module('../../../src/utils/logger.js', () => ({
|
||||
logger: {
|
||||
info: () => {},
|
||||
debug: () => {},
|
||||
warn: () => {},
|
||||
error: () => {},
|
||||
failure: () => {},
|
||||
},
|
||||
}));
|
||||
|
||||
// Track tree-kill invocations and the transport whose subprocess was killed.
|
||||
const killTreeCalls: number[] = [];
|
||||
|
||||
mock.module('../../../src/supervisor/index.ts', () => ({
|
||||
getSupervisor: () => ({
|
||||
assertCanSpawn: () => {},
|
||||
registerProcess: () => {},
|
||||
unregisterProcess: () => {},
|
||||
}),
|
||||
}));
|
||||
|
||||
mock.module('../../../src/supervisor/env-sanitizer.js', () => ({
|
||||
sanitizeEnv: (env: NodeJS.ProcessEnv) => env,
|
||||
}));
|
||||
|
||||
// Replace child_process.execFile so the static killProcessTree implementation
|
||||
// can be observed without actually shelling out. We feed pgrep an empty stdout
|
||||
// (no descendants) so the only signal target is the root pid.
|
||||
mock.module('child_process', () => {
|
||||
const original = require('node:child_process');
|
||||
return {
|
||||
...original,
|
||||
execFile: (
|
||||
cmd: string,
|
||||
args: string[],
|
||||
_opts: unknown,
|
||||
cb: (err: Error | null, stdout: { stdout: string; stderr: string }) => void
|
||||
) => {
|
||||
// Bun's promisify path will call this as if it were a Node-style callback.
|
||||
if (cmd === 'pgrep') {
|
||||
cb(null, { stdout: '', stderr: '' } as any);
|
||||
} else {
|
||||
cb(null, { stdout: '', stderr: '' } as any);
|
||||
}
|
||||
},
|
||||
execSync: () => '',
|
||||
};
|
||||
});
|
||||
|
||||
// Stub process.kill so the tree-kill path can record targets without crashing
|
||||
// the test runner if the synthetic PID happens to collide with a real one.
|
||||
const realProcessKill = process.kill.bind(process);
|
||||
const stubbedProcessKill = ((pid: number, _signal?: string | number) => {
|
||||
killTreeCalls.push(pid);
|
||||
return true;
|
||||
}) as typeof process.kill;
|
||||
process.kill = stubbedProcessKill;
|
||||
|
||||
import { ChromaMcpManager } from '../../../src/services/sync/ChromaMcpManager.js';
|
||||
|
||||
function resetState(): void {
|
||||
transportCount = 0;
|
||||
transportInstances.length = 0;
|
||||
killTreeCalls.length = 0;
|
||||
connectImpl = async () => {};
|
||||
callToolImpl = async () => ({ content: [{ type: 'text', text: '{}' }] });
|
||||
}
|
||||
|
||||
describe('ChromaMcpManager singleton enforcement (#2313)', () => {
|
||||
beforeEach(async () => {
|
||||
await ChromaMcpManager.reset();
|
||||
resetState();
|
||||
});
|
||||
|
||||
it('serializes concurrent ensureConnected() calls into one spawn', async () => {
|
||||
const mgr = ChromaMcpManager.getInstance();
|
||||
|
||||
// Five parallel callers race ensureConnected via callTool — only one
|
||||
// chroma-mcp subprocess (one transport) should be spawned.
|
||||
await Promise.all(
|
||||
Array.from({ length: 5 }, () =>
|
||||
mgr.callTool('chroma_list_collections', { limit: 1 })
|
||||
)
|
||||
);
|
||||
|
||||
expect(transportCount).toBe(1);
|
||||
});
|
||||
|
||||
it('kills the prior subprocess tree before a reconnect spawn', async () => {
|
||||
const mgr = ChromaMcpManager.getInstance();
|
||||
|
||||
// First call: opens transport #1.
|
||||
await mgr.callTool('chroma_list_collections', { limit: 1 });
|
||||
expect(transportInstances.length).toBe(1);
|
||||
const firstPid = transportInstances[0]._process.pid;
|
||||
|
||||
// Second call: rig callTool to throw a transport error on the FIRST attempt
|
||||
// so the manager runs its reconnect-and-retry path. The retry should
|
||||
// dispose the prior subprocess tree (firstPid) before spawning a new one.
|
||||
let invocations = 0;
|
||||
callToolImpl = async () => {
|
||||
invocations += 1;
|
||||
if (invocations === 1) {
|
||||
throw new Error('Connection closed');
|
||||
}
|
||||
return { content: [{ type: 'text', text: '{}' }] };
|
||||
};
|
||||
|
||||
await mgr.callTool('chroma_list_collections', { limit: 1 });
|
||||
|
||||
expect(transportInstances.length).toBe(2);
|
||||
// The first transport's pid must have been signaled by killProcessTree
|
||||
// before the second transport spawned.
|
||||
expect(killTreeCalls).toContain(firstPid);
|
||||
});
|
||||
|
||||
it('stop() disposes state including any pending connecting promise', async () => {
|
||||
const mgr = ChromaMcpManager.getInstance();
|
||||
|
||||
await mgr.callTool('chroma_list_collections', { limit: 1 });
|
||||
expect(transportInstances.length).toBe(1);
|
||||
const subprocessPid = transportInstances[0]._process.pid;
|
||||
|
||||
await mgr.stop();
|
||||
|
||||
// After stop(), every internal handle should be cleared and the prior
|
||||
// subprocess tree must have been signaled.
|
||||
expect(killTreeCalls).toContain(subprocessPid);
|
||||
|
||||
// A subsequent ensureConnected must spawn a fresh transport (not reuse
|
||||
// a stale one).
|
||||
await mgr.callTool('chroma_list_collections', { limit: 1 });
|
||||
expect(transportInstances.length).toBe(2);
|
||||
});
|
||||
});
|
||||
|
||||
// Restore the real process.kill once the test module finishes evaluating any
|
||||
// late-arriving microtasks.
|
||||
process.on('exit', () => {
|
||||
process.kill = realProcessKill;
|
||||
});
|
||||
Reference in New Issue
Block a user