mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-07-03 16:31:58 +08:00
fix(tui): close busy-flag race that stuck queue-mode back-to-back sends
Under display.busy_input_mode: queue, sending two messages back-to-back
hung the session on 'Analyzing…' until a manual Ctrl+C.
The submit path only marked the session busy inside the .then of an
async input.detect_drop RPC. dispatchSubmission routes queue-vs-send on
getUiState().busy, so a second Enter inside that RPC window read
busy===false and raced a second prompt.submit down the send path
instead of enqueuing locally. The gateway accepts the mid-turn submit
as a success ({status:'queued'}, not an error), and the client's only
re-queue recovery is gated on catching a 'session busy' error — which
never fires — so the message became invisible to the client-side drain
effect and the UI stayed busy forever.
Extract the ready-prompt submit into a pure submissionCore module and
mark the session busy synchronously at the choke point, before the
detect_drop round-trip, closing the gap for every caller (mainline
submit, queue-edit picks, drain, interpolation). Verified the real
gateway already queues+drains both turns correctly, so the fix is
purely client-side. Adds submissionCore.test.ts whose regression
assertions fail without the synchronous busy and pass with it.
This commit is contained in:
131
ui-tui/src/__tests__/submissionCore.test.ts
Normal file
131
ui-tui/src/__tests__/submissionCore.test.ts
Normal file
@@ -0,0 +1,131 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import { isSessionBusyError, markSubmitting, submitPrompt, type SubmitPromptDeps } from '../app/submissionCore.js'
|
||||
import { getUiState, patchUiState, resetUiState } from '../app/uiStore.js'
|
||||
import type { GatewayClient } from '../gatewayClient.js'
|
||||
|
||||
// A gateway double whose `input.detect_drop` resolution we control, so we can
|
||||
// observe UI state DURING the async gap — the exact window the queue-mode race
|
||||
// lived in.
|
||||
function makeDeferredGateway() {
|
||||
let resolveDrop: (v: unknown) => void = () => {}
|
||||
|
||||
const dropPromise = new Promise(res => {
|
||||
resolveDrop = res
|
||||
})
|
||||
|
||||
const calls: string[] = []
|
||||
|
||||
const gw = {
|
||||
request: vi.fn((method: string) => {
|
||||
calls.push(method)
|
||||
|
||||
if (method === 'input.detect_drop') {
|
||||
return dropPromise
|
||||
}
|
||||
|
||||
// prompt.submit et al: resolve immediately with a success shape.
|
||||
return Promise.resolve({ status: 'streaming' })
|
||||
})
|
||||
} as unknown as GatewayClient
|
||||
|
||||
return { calls, gw, resolveDrop: (v: unknown = { matched: false }) => resolveDrop(v) }
|
||||
}
|
||||
|
||||
function makeDeps(gw: GatewayClient, over: Partial<SubmitPromptDeps> = {}): SubmitPromptDeps {
|
||||
return {
|
||||
appendMessage: vi.fn(),
|
||||
enqueue: vi.fn(),
|
||||
expand: (t: string) => t,
|
||||
gw,
|
||||
maybeGoodVibes: vi.fn(),
|
||||
setLastUserMsg: vi.fn(),
|
||||
sys: vi.fn(),
|
||||
...over
|
||||
}
|
||||
}
|
||||
|
||||
describe('submissionCore.submitPrompt — synchronous busy (queue-race fix)', () => {
|
||||
beforeEach(() => {
|
||||
resetUiState()
|
||||
patchUiState({ sid: 'sess-1' })
|
||||
})
|
||||
|
||||
it('flips busy=true SYNCHRONOUSLY, before input.detect_drop resolves', () => {
|
||||
const { gw, resolveDrop } = makeDeferredGateway()
|
||||
|
||||
expect(getUiState().busy).toBe(false)
|
||||
|
||||
submitPrompt('hello', makeDeps(gw))
|
||||
|
||||
// The critical invariant: busy is already true even though the
|
||||
// detect_drop RPC has NOT resolved yet. This is what makes a second,
|
||||
// rapid submit take the local-enqueue branch instead of racing a second
|
||||
// prompt.submit onto the backend.
|
||||
expect(getUiState().busy).toBe(true)
|
||||
expect(getUiState().status).toBe('running…')
|
||||
|
||||
resolveDrop()
|
||||
})
|
||||
|
||||
it('regression: two back-to-back sends — the SECOND sees busy=true in the gap', async () => {
|
||||
const { gw, resolveDrop } = makeDeferredGateway()
|
||||
|
||||
// Emulate dispatchSubmission's routing decision: it sends only when
|
||||
// busy===false, otherwise it would enqueue. We assert the state the
|
||||
// router reads, which is the real regression.
|
||||
submitPrompt('first message', makeDeps(gw))
|
||||
|
||||
// Before the fix, busy was still false here (set only inside detect_drop's
|
||||
// .then), so a second Enter would wrongly route into send() again.
|
||||
const busyWhenSecondArrives = getUiState().busy
|
||||
expect(busyWhenSecondArrives).toBe(true)
|
||||
|
||||
resolveDrop()
|
||||
await Promise.resolve()
|
||||
})
|
||||
|
||||
it('does not submit when there is no session, and does not mark busy', () => {
|
||||
resetUiState() // sid: null
|
||||
const { gw, calls } = makeDeferredGateway()
|
||||
const sys = vi.fn()
|
||||
|
||||
submitPrompt('hello', makeDeps(gw, { sys }))
|
||||
|
||||
expect(getUiState().busy).toBe(false)
|
||||
expect(sys).toHaveBeenCalledWith('session not ready yet')
|
||||
expect(calls).not.toContain('input.detect_drop')
|
||||
})
|
||||
|
||||
it('after detect_drop resolves (no file), it issues prompt.submit', async () => {
|
||||
const { calls, gw, resolveDrop } = makeDeferredGateway()
|
||||
|
||||
submitPrompt('hi there', makeDeps(gw))
|
||||
expect(calls).toEqual(['input.detect_drop'])
|
||||
|
||||
resolveDrop({ matched: false })
|
||||
await Promise.resolve()
|
||||
await Promise.resolve()
|
||||
|
||||
expect(calls).toContain('prompt.submit')
|
||||
})
|
||||
})
|
||||
|
||||
describe('submissionCore.markSubmitting', () => {
|
||||
beforeEach(() => resetUiState())
|
||||
|
||||
it('sets busy + running status', () => {
|
||||
markSubmitting()
|
||||
expect(getUiState().busy).toBe(true)
|
||||
expect(getUiState().status).toBe('running…')
|
||||
})
|
||||
})
|
||||
|
||||
describe('submissionCore.isSessionBusyError', () => {
|
||||
it('matches the legacy busy rejections but not arbitrary errors', () => {
|
||||
expect(isSessionBusyError(new Error('session busy'))).toBe(true)
|
||||
expect(isSessionBusyError(new Error('waiting for model response'))).toBe(true)
|
||||
expect(isSessionBusyError(new Error('some other failure'))).toBe(false)
|
||||
expect(isSessionBusyError('not an error')).toBe(false)
|
||||
})
|
||||
})
|
||||
111
ui-tui/src/app/submissionCore.ts
Normal file
111
ui-tui/src/app/submissionCore.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
import { attachedImageNotice } from '../domain/messages.js'
|
||||
import type { GatewayClient } from '../gatewayClient.js'
|
||||
import type { InputDetectDropResponse, PromptSubmitResponse } from '../gatewayTypes.js'
|
||||
import type { Msg } from '../types.js'
|
||||
|
||||
import { turnController } from './turnController.js'
|
||||
import { getUiState, patchUiState } from './uiStore.js'
|
||||
|
||||
const SESSION_BUSY_RE = /session busy|waiting for model response/i
|
||||
|
||||
export const isSessionBusyError = (e: unknown) => e instanceof Error && SESSION_BUSY_RE.test(e.message)
|
||||
|
||||
export interface SubmitPromptDeps {
|
||||
appendMessage: (msg: Msg) => void
|
||||
enqueue: (text: string) => void
|
||||
expand: (text: string) => string
|
||||
gw: GatewayClient
|
||||
maybeGoodVibes: (text: string) => void
|
||||
setLastUserMsg: (value: string) => void
|
||||
sys: (text: string) => void
|
||||
}
|
||||
|
||||
// Optimistically flip the session to busy the INSTANT a prompt is accepted for
|
||||
// submission — synchronously, before we await anything.
|
||||
//
|
||||
// This is the fix for the queue-mode race (display.busy_input_mode: queue):
|
||||
// the submit path first fires an async `input.detect_drop` RPC and only marked
|
||||
// the session busy inside that RPC's `.then`. A second Enter pressed inside
|
||||
// that round-trip window read `busy === false` in dispatchSubmission and raced
|
||||
// a second `prompt.submit` onto the backend instead of landing in the local
|
||||
// queue. That produced the reported symptom: the second message "waited for
|
||||
// the first to respond, then went to the queue", and the client lost track of
|
||||
// it (the backend accepts a mid-turn submit as {status:"queued"} — a success,
|
||||
// not an error — so the local drain effect that watches the client-side queue
|
||||
// never fires, leaving the UI stuck on "analyzing…" until Ctrl+C).
|
||||
//
|
||||
// Marking busy at the choke point closes the gap for every caller: the mainline
|
||||
// submit, queue-edit picks, and the drain effect all funnel through here.
|
||||
export function markSubmitting(): void {
|
||||
patchUiState({ busy: true, status: 'running…' })
|
||||
}
|
||||
|
||||
// Submit a ready prompt (already resolved to be neither a slash command nor a
|
||||
// shell escape, with a live session). Pulled out of useSubmission so the
|
||||
// synchronous-busy invariant above is unit-testable without React test infra.
|
||||
export function submitPrompt(text: string, deps: SubmitPromptDeps, showUserMessage = true): void {
|
||||
const sid = getUiState().sid
|
||||
|
||||
if (!sid) {
|
||||
return deps.sys('session not ready yet')
|
||||
}
|
||||
|
||||
// Close the async-busy gap up front, before the detect_drop round-trip.
|
||||
markSubmitting()
|
||||
|
||||
const startSubmit = (displayText: string, submitText: string, show = true) => {
|
||||
const liveSid = getUiState().sid
|
||||
|
||||
if (!liveSid) {
|
||||
return deps.sys('session not ready yet')
|
||||
}
|
||||
|
||||
turnController.clearStatusTimer()
|
||||
deps.maybeGoodVibes(submitText)
|
||||
deps.setLastUserMsg(text)
|
||||
|
||||
if (show) {
|
||||
deps.appendMessage({ role: 'user', text: displayText })
|
||||
}
|
||||
|
||||
patchUiState({ busy: true, status: 'running…' })
|
||||
turnController.bufRef = ''
|
||||
turnController.interrupted = false
|
||||
|
||||
deps.gw.request<PromptSubmitResponse>('prompt.submit', { session_id: liveSid, text: submitText }).catch((e: Error) => {
|
||||
// Defensive: prompt.submit no longer rejects a mid-turn send with
|
||||
// "session busy" (the gateway queues it and returns success), but keep
|
||||
// the re-queue path as a safety net for any future/legacy gateway that
|
||||
// still errors, so a message is never silently dropped.
|
||||
if (isSessionBusyError(e)) {
|
||||
deps.enqueue(submitText)
|
||||
patchUiState({ busy: true, status: 'queued for next turn' })
|
||||
|
||||
return deps.sys(`queued: "${submitText.slice(0, 50)}${submitText.length > 50 ? '…' : ''}"`)
|
||||
}
|
||||
|
||||
deps.sys(`error: ${e.message}`)
|
||||
patchUiState({ busy: false, status: 'ready' })
|
||||
})
|
||||
}
|
||||
|
||||
// Always ask the backend whether this looks like a file drop. The backend's
|
||||
// _detect_file_drop handles paths with spaces, quotes, Windows drive letters,
|
||||
// and escaped characters correctly.
|
||||
deps.gw
|
||||
.request<InputDetectDropResponse>('input.detect_drop', { session_id: sid, text })
|
||||
.then(r => {
|
||||
if (!r?.matched) {
|
||||
return startSubmit(text, deps.expand(text), showUserMessage)
|
||||
}
|
||||
|
||||
if (r.is_image) {
|
||||
turnController.pushActivity(attachedImageNotice(r))
|
||||
} else {
|
||||
turnController.pushActivity(`detected file: ${r.name}`)
|
||||
}
|
||||
|
||||
startSubmit(r.text || text, deps.expand(r.text || text), showUserMessage)
|
||||
})
|
||||
.catch(() => startSubmit(text, deps.expand(text), showUserMessage))
|
||||
}
|
||||
@@ -1,28 +1,20 @@
|
||||
import { type MutableRefObject, useCallback, useEffect, useRef } from 'react'
|
||||
|
||||
import { TYPING_IDLE_MS } from '../config/timing.js'
|
||||
import { attachedImageNotice } from '../domain/messages.js'
|
||||
import { completionToApplyOnSubmit, looksLikeSlashCommand } from '../domain/slash.js'
|
||||
import type { GatewayClient } from '../gatewayClient.js'
|
||||
import type {
|
||||
InputDetectDropResponse,
|
||||
PromptSubmitResponse,
|
||||
SessionSteerResponse,
|
||||
ShellExecResponse
|
||||
} from '../gatewayTypes.js'
|
||||
import type { SessionSteerResponse, ShellExecResponse } from '../gatewayTypes.js'
|
||||
import { asRpcResult } from '../lib/rpc.js'
|
||||
import { hasInterpolation, INTERPOLATION_RE } from '../protocol/interpolation.js'
|
||||
import { PASTE_SNIPPET_RE } from '../protocol/paste.js'
|
||||
import type { Msg } from '../types.js'
|
||||
|
||||
import type { ComposerActions, ComposerRefs, ComposerState, PasteSnippet } from './interfaces.js'
|
||||
import { submitPrompt } from './submissionCore.js'
|
||||
import { turnController } from './turnController.js'
|
||||
import { getUiState, patchUiState } from './uiStore.js'
|
||||
|
||||
const DOUBLE_ENTER_MS = 450
|
||||
const SESSION_BUSY_RE = /session busy|waiting for model response/i
|
||||
|
||||
const isSessionBusyError = (e: unknown) => e instanceof Error && SESSION_BUSY_RE.test(e.message)
|
||||
|
||||
const expandSnips = (snips: PasteSnippet[]) => {
|
||||
const byLabel = new Map<string, string[]>()
|
||||
@@ -88,62 +80,19 @@ export function useSubmission(opts: UseSubmissionOptions) {
|
||||
(text: string, showUserMessage = true) => {
|
||||
const expand = expandSnips(composerState.pasteSnips)
|
||||
|
||||
const startSubmit = (displayText: string, submitText: string, showUserMessage = true) => {
|
||||
const sid = getUiState().sid
|
||||
|
||||
if (!sid) {
|
||||
return sys('session not ready yet')
|
||||
}
|
||||
|
||||
turnController.clearStatusTimer()
|
||||
maybeGoodVibes(submitText)
|
||||
setLastUserMsg(text)
|
||||
|
||||
if (showUserMessage) {
|
||||
appendMessage({ role: 'user', text: displayText })
|
||||
}
|
||||
|
||||
patchUiState({ busy: true, status: 'running…' })
|
||||
turnController.bufRef = ''
|
||||
turnController.interrupted = false
|
||||
|
||||
gw.request<PromptSubmitResponse>('prompt.submit', { session_id: sid, text: submitText }).catch((e: Error) => {
|
||||
if (isSessionBusyError(e)) {
|
||||
composerActions.enqueue(submitText)
|
||||
patchUiState({ busy: true, status: 'queued for next turn' })
|
||||
|
||||
return sys(`queued: "${submitText.slice(0, 50)}${submitText.length > 50 ? '…' : ''}"`)
|
||||
}
|
||||
|
||||
sys(`error: ${e.message}`)
|
||||
patchUiState({ busy: false, status: 'ready' })
|
||||
})
|
||||
}
|
||||
|
||||
const sid = getUiState().sid
|
||||
|
||||
if (!sid) {
|
||||
return sys('session not ready yet')
|
||||
}
|
||||
|
||||
// Always ask the backend whether this looks like a file drop.
|
||||
// The backend's _detect_file_drop handles paths with spaces, quotes,
|
||||
// Windows drive letters, and escaped characters correctly.
|
||||
gw.request<InputDetectDropResponse>('input.detect_drop', { session_id: sid, text })
|
||||
.then(r => {
|
||||
if (!r?.matched) {
|
||||
return startSubmit(text, expand(text), showUserMessage)
|
||||
}
|
||||
|
||||
if (r.is_image) {
|
||||
turnController.pushActivity(attachedImageNotice(r))
|
||||
} else {
|
||||
turnController.pushActivity(`detected file: ${r.name}`)
|
||||
}
|
||||
|
||||
startSubmit(r.text || text, expand(r.text || text), showUserMessage)
|
||||
})
|
||||
.catch(() => startSubmit(text, expand(text), showUserMessage))
|
||||
submitPrompt(
|
||||
text,
|
||||
{
|
||||
appendMessage,
|
||||
enqueue: composerActions.enqueue,
|
||||
expand,
|
||||
gw,
|
||||
maybeGoodVibes,
|
||||
setLastUserMsg,
|
||||
sys
|
||||
},
|
||||
showUserMessage
|
||||
)
|
||||
},
|
||||
[appendMessage, composerActions, composerState.pasteSnips, gw, maybeGoodVibes, setLastUserMsg, sys]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user