Move the renderer-side AI-streaming runtime (IpcChatTransport, TopicStreamSubscription, streamDispatchCoordinator) out of the top-level src/renderer/transport/ directory into the shared services/aiTransport/ bucket. By shape these are stateful runtime singletons/classes, and the runtime is cross-surface (consumed by chat, quick-assistant, selection), so per the renderer architecture it routes into services/, not its own top-level directory. - Add a curated index.ts barrel exposing only the externally consumed symbols (ipcChatTransport, TopicStreamSubscription, ExecutionTerminal); the class, dispatch coordinator and helpers stay private. - Update the 6 consumer sites (5 imports + 1 vi.mock) to the barrel. - Sync architecture and AI docs to the new path; drop the now-resolved transport/ deviation from the renderer-architecture pending table.
11 KiB
Execution Overlay
The renderer-side counterpart of Main's pipeStreamLoop. Both sides
use the same pure assembler —
AI SDK's readUIMessageStream —
to turn the chunk stream into a CherryUIMessage. Main writes the
result to disk; the renderer paints it onto the chat surface as an
overlay above the SWR-backed history.
Why the same merge function on both sides
UIMessageChunk assembly is non-trivial: text deltas merge by id,
reasoning blocks have their own start/delta/end, tool calls go through
tool-input-start / tool-input-delta / tool-input-available /
tool-output-available, dynamic data parts merge by key, multi-step
turns carry step boundaries. Re-implementing any of this on the
renderer would mean a second source of truth that had to track AI SDK
upstream, with two ways to disagree about partial state.
Running the same readUIMessageStream on the same UIMessageChunk
stream — once on Main (writing to exec.finalMessage), once on the
renderer (driving the overlay) — guarantees structural agreement.
What persists is exactly what the user saw streaming.
Main: pipeStreamLoop(stream)
tee()
├─ branch A → broadcast to listeners → WebContentsListener → IPC chunks
└─ branch B → readUIMessageStream → exec.finalMessage (writes to DB)
▲
│ (DB write)
│
Renderer: TopicStreamSubscription ┌──── readUIMessageStream → snapshot
│ │ │ ▲
│ ▼ │ │
│ routes chunks by │ fed by branch stream
│ executionId + anchor │
│ into stream branches ───────┘
▼
branch ReadableStream → useExecutionOverlay (per execution)
TopicStreamSubscription
src/renderer/services/aiTransport/TopicStreamSubscription.ts. A renderer
class that owns:
- One IPC attach per topic.
attachis ref-counted — every execution that callsregister(executionId, anchorMessageId)increments the count; the lastunregistertriggersdetach(deferred one microtask so a transientactiveExecutionsflicker doesn't detach-then-reattach). - Execution + anchor demux. Each
register(executionId, anchorMessageId)returns aReadableStream<UIMessageChunk>for that model writing to that assistant row. Multi-model parallel responses get separate branches byexecutionId; same-model steer continuations get separate branches byanchorMessageId. - Anchor is part of stream identity.
executionIdnames the model, not the assistant row. During a steer continuation, Main can close A1a and immediately open A2 with the same model id. Chunks for A2 can arrive before React registers A2's reader, so the transport must buffer them underexecutionId + anchorMessageIdinstead of routing them to the closed A1a branch. - Synchronous controller creation. The branch's
ReadableStreamDefaultControlleris created during thenew ReadableStream({ start })call (synchronous), so chunks that arrived betweenregisterand the reader's firstread()are already buffered in the stream's internal queue — late readers never miss replayed chunks. - Terminal demux.
Ai_StreamDone/Ai_StreamErrorclose the matching branch and fan out anExecutionTerminal({ isAbort, isError }) to listeners; if the payload carriesisTopicDoneor noexecutionId, every branch terminates together.
Cancellation layering — do not conflate
| Layer | Owner | Action |
|---|---|---|
| Renderer-local subscription | TopicStreamSubscription.unregister / dispose |
Closes the branch reader, drops listener ref count; Main keeps generating |
| Generation abort | Main (via useChatWithHistory.stop → Chat → Ai_Stream_Abort) |
Stops the LLM |
TopicStreamSubscription NEVER aborts the LLM. Closing all branches
is the renderer equivalent of streamDetach — Main keeps streaming,
other windows keep observing.
Defensive routing
A chunk without executionId is unexpected — Main always tags chat
chunks. As a defensive fallback, if exactly one branch is registered
the chunk routes there; otherwise it's dropped with a warning.
useExecutionOverlay
src/renderer/hooks/useExecutionOverlay.ts. The per-execution
overlay, built on useTopicStreamSubscription.
const { overlay, liveAssistants, disposeOverlay, reset } = useExecutionOverlay(
topicId,
activeExecutions, // ActiveExecution[] from useTopicStreamStatus
uiMessages, // current DB snapshot
{ onFinish }
)
One reader per turn, zero cross-turn state
Each execution gets a one-shot readUIMessageStream reader per
turn, not a stateful AI SDK Chat. A Chat carries
state.messages across turns; reusing it made a new turn resume from
the previous turn's finished assistant ("previous answer + new
stream"). A fresh reader per turn structurally cannot pollute.
The seed rule (continue-safe)
function pickSeed(uiMessages, anchorMessageId): CherryUIMessage | undefined {
if (!anchorMessageId) return undefined
const found = uiMessages.find((m) => m.id === anchorMessageId)
if (!found) return { id: anchorMessageId, role: 'assistant', parts: [] }
// `readUIMessageStream` mutates `message.parts` in place, and `found` is the live
// SWR-derived row — clone the parts so the reader only ever writes to a throwaway.
return { ...found, parts: structuredClone(found.parts ?? []) }
}
The reader is seeded with the message whose id is the execution's
anchorMessageId, taken from the current DB truth at reader-start
time. Two cases:
- Fresh placeholder — the SQLite row has empty parts; the seed is effectively empty and the reader builds the message from scratch.
- Tool-approval / continue-conversation — the row already carries
the prior assistant parts (including the unresolved
tool-inputpart the approval was on). A streamedtool-outputchunk then merges cleanly onto its matchingtool-inputbecause they share the sametoolCallId.
The seed is re-derived from DB on every reader start; it never carries
across turns, and its parts are cloned so the reader's in-place mutation
never touches the SWR row. Combined with the fresh reader, this is the
structural anti-pollution guarantee — not "force empty parts" or "diff
against last frame".
Lifecycle
- Topic switch — every reader is cancelled, every branch
unregistered,
snapshotscleared.prevTopicRefis checked in the render body so the cleanup runs synchronously before the new topic's readers start. activeExecutionschange — diff against the current reader map: cancel + unregister executions no longer in the active list; for newly-active executions, register a branch, clear any retained prior snapshot, kick a new reader.- Terminal — the branch is closed by
TopicStreamSubscription; the reader'sfor awaitexits. TheonFinish(executionId, event)callback fires with the final snapshot +{ isAbort, isError }. - Unmount — every reader is cancelled, every branch unregistered.
Overlay teardown is monotonic
disposeOverlay(messageId) drops exactly one snapshot entry. The chat
shell wires this so the overlay is released only after the DB
refresh promise resolves (see .finally(() => disposeOverlay(...)) in
V2ChatContent). That ordering eliminates the visible flash between
"streaming overlay" and "persisted parts": the SWR cache holds the
authoritative row before the overlay disappears.
The renderer never writes streamed parts to SWR — writing them would race the DB-authoritative refresh and cause flicker.
Why retained snapshots after terminal
The hook keeps the final snapshot in snapshots until one of:
- the same execution restarts (next turn clears it),
- the caller calls
disposeOverlay(messageId)(post-persist handoff), - the caller calls
reset()(e.g. quick-assistant clear), - the topic switches (effect clears all snapshots).
That retention lets consumers read the final frame for the brief window between stream-end and DB-refresh-complete without going through SWR.
React binding
useTopicStreamSubscription(topicId) is the React wrapper:
- Lazy-init per
topicId(idiom mirrorsuseState(() => ...)). - Disposed on unmount or topic switch — drops the Main listener and closes every branch.
Each mounted topic gets one TopicStreamSubscription instance, shared
by every consumer in that React tree (today: useExecutionOverlay).
Code map
src/renderer/services/aiTransport/TopicStreamSubscription.ts ← class
src/renderer/hooks/useTopicStreamSubscription.ts ← React binding
src/renderer/hooks/useExecutionOverlay.ts ← per-execution readers + overlay
src/renderer/pages/home/V2ChatContent.tsx ← consumer + dispose-after-refresh
Invariants reviewers should check
- Same merge function on both sides. Any code that re-implements
chunk → message assembly on the renderer (instead of feeding
readUIMessageStream) is wrong — that's where Main and renderer will diverge first. - One reader per branch identity. No reader should be reused
across
activeExecutionstransitions where eitherexecutionIdoranchorMessageIdchanges. Reusing one is what the v1Chatbug was; keying only by model id reintroduces the same problem for same-model steer continuations. - Seed from current DB.
pickSeedreadsuiMessagesRef.currentat reader-start time. Stashing the seed on first mount and reusing it across turns would defeat the continue-conversation case. - Overlay disposed after DB refresh. Any
disposeOverlay(messageId)call that runs before the DB revalidation promise resolves is a flicker bug. TopicStreamSubscriptionnever aborts. It only detaches. Anything in this layer that callsAi_Stream_Abortis in the wrong place — abort belongs touseChatWithHistory.stop.- Ref-counted attach. A new attach must NOT fire when another execution is already registered for the same topic. A new detach must NOT fire while any execution still has a branch.
Where to read more
- Main-side accumulator: Stream Manager —
pipeStreamLoop - IPC envelope: IPC Transport
- Topic status / approval-anchor surfacing: Tool Approval
- AI SDK upstream:
readUIMessageStreamreference