fix(gateway): suppress NO_REPLY/[SILENT] markers on the streaming path

The agent emits a bare control marker (NO_REPLY / [SILENT] / …) when it
intentionally chooses not to reply.  The gateway's whole-response filter
(is_intentional_silence_agent_result) suppresses this on the non-streaming
delivery path, but the streaming path (GatewayStreamConsumer) had no silence
awareness: it edited the raw marker onto the screen delta-by-delta and
finalized it BEFORE the whole-response filter could run.  On any
streaming-capable adapter (Slack, Telegram, Discord, …) users saw a literal
'NO_REPLY' message leak into chat.

Fix (contained in the stream consumer + a shared predicate; no new config,
no platform-specific code):

- gateway/response_filters.py: add is_partial_silence_marker() — the
  streaming counterpart to is_intentional_silence_response(), sharing the
  same marker set and canonicalization so the two never drift.
- gateway/stream_consumer.py:
  - Mid-stream hold-back: defer edits while the accumulated buffer is still a
    prefix of a silence marker, so a partial marker never flashes on an
    interval tick.
  - On stream end (got_done): if the final buffer is exactly a marker, retract
    any preview already shown (best-effort delete_message, reusing the
    _try_fresh_final cleanup path) and leave the delivery flags False so the
    gateway's own filter turns the marker into '' and no fallback send fires.

Substantive prose that merely mentions a marker is still delivered normally.

Tests: tests/gateway/test_stream_consumer_silence.py — predicate truth table
+ end-to-end run() suppression (single-shot + token-by-token), preview
retraction, no-delete-support best-effort, [SILENT] parity, and
prose-passthrough. Prove-fail verified by reverting only the consumer change
(the 4 behavioral tests fail: 'NO_REPLY'/'[SILENT]' leaks).
This commit is contained in:
Ben
2026-07-01 13:45:38 +10:00
parent 44ddc552f5
commit b561815eb3
3 changed files with 347 additions and 0 deletions

View File

@@ -51,3 +51,30 @@ def is_intentional_silence_agent_result(agent_result: dict | None, response: Any
if agent_result.get("failed"):
return False
return is_intentional_silence_response(response)
def is_partial_silence_marker(text: Any) -> bool:
"""Return True while ``text`` could still resolve to a silence marker.
The streaming path accumulates the reply delta-by-delta and must decide,
before the whole response is known, whether to show what it has so far.
A buffer whose canonical form is a non-empty *prefix* of a silence marker
(e.g. ``"NO"`` on the way to ``"NO_REPLY"``, or an exact marker that has
not yet been terminated by stream-end) is held back so a raw marker is
never edited onto the screen and then belatedly retracted.
Anything that has already diverged from every marker (ordinary prose) —
and anything longer than the marker cap — returns False so normal
streaming resumes immediately. This is the streaming counterpart to
:func:`is_intentional_silence_response`, sharing the same marker set and
canonicalization so the two never drift.
"""
if not isinstance(text, str):
return False
stripped = text.strip()
if not stripped or len(stripped) > 64:
return False
candidate = _canonical_silence_candidate(stripped)
if not candidate:
return False
return any(marker.startswith(candidate) for marker in LIVE_GATEWAY_SILENT_MARKERS)

View File

@@ -32,6 +32,10 @@ from gateway.config import (
DEFAULT_STREAMING_BUFFER_THRESHOLD as _DEFAULT_STREAMING_BUFFER_THRESHOLD,
DEFAULT_STREAMING_CURSOR as _DEFAULT_STREAMING_CURSOR,
)
from gateway.response_filters import (
is_intentional_silence_response as _is_intentional_silence_response,
is_partial_silence_marker as _is_partial_silence_marker,
)
logger = logging.getLogger("gateway.stream_consumer")
@@ -542,6 +546,22 @@ class GatewayStreamConsumer:
if got_done:
self._flush_think_buffer()
# Intentional-silence suppression. When the agent chose
# not to reply it emits a bare control marker (NO_REPLY /
# [SILENT] / …). The gateway's whole-response filter
# (gateway/run.py) suppresses this on the non-streaming
# path, but by the time it runs the stream consumer has
# already edited the raw marker onto the screen. Detect
# the exact-marker final buffer here and retract any
# preview instead of finalizing it, so the marker never
# reaches the chat. Substantive prose that merely mentions
# a marker is NOT suppressed (see is_intentional_silence_response).
if _is_intentional_silence_response(
self._clean_for_display(self._accumulated)
):
await self._suppress_silence_marker()
return
# Decide whether to flush an edit
now = time.monotonic()
elapsed = now - self._last_edit_time
@@ -562,6 +582,24 @@ class GatewayStreamConsumer:
)
current_update_visible = False
# Hold back mid-stream edits while the buffer so far could
# still resolve to an intentional-silence marker. Without
# this, a partial marker (e.g. "NO_REPLY" streamed as
# "NO"→"NO_REPLY") would flash onto the screen on an interval
# tick before got_done can suppress it. Only defers display —
# got_done above always resolves the buffer (suppress if it's
# an exact marker, otherwise fall through and flush normally),
# so genuine prose that merely starts marker-like is never lost.
if (
should_edit
and not got_done
and not got_segment_break
and commentary_text is None
and _is_partial_silence_marker(
self._clean_for_display(self._accumulated)
)
):
should_edit = False
if should_edit and self._accumulated:
# Split overflow: if accumulated text exceeds the platform
# limit, split into properly sized chunks.
@@ -1359,6 +1397,49 @@ class GatewayStreamConsumer:
self._final_response_sent = True
return True
async def _suppress_silence_marker(self) -> None:
"""Retract any streamed preview when the final reply is a silence marker.
The agent chose not to respond and emitted a bare control marker. Any
preview message the consumer already put on screen (a partial marker
flushed on an interval tick, or a preamble before a tool boundary) must
be removed so the raw marker is never left visible. Deletion reuses the
same best-effort ``delete_message`` path as :meth:`_try_fresh_final`.
Crucially, the delivery flags (``_final_response_sent`` /
``_final_content_delivered``) are left **False**: nothing was delivered.
The gateway then does not mistake the marker for a delivered reply, and
its own whole-response filter turns the marker into "" so no fallback
send happens either. ``_already_sent`` is likewise cleared so the
gateway's ``already_sent`` short-circuits do not fire.
"""
stale_ids = set(self._preview_message_ids)
if self._message_id and self._message_id != "__no_edit__":
stale_ids.add(self._message_id)
delete_fn = getattr(self.adapter, "delete_message", None)
if delete_fn is not None:
for stale_id in stale_ids:
if not stale_id or stale_id == "__no_edit__":
continue
try:
await delete_fn(self.chat_id, stale_id)
except Exception as e:
logger.debug(
"Silence-marker preview cleanup failed (%s): %s",
stale_id, e,
)
self._preview_message_ids = set()
self._message_id = None
self._accumulated = ""
self._last_sent_text = ""
self._already_sent = False
self._final_response_sent = False
self._final_content_delivered = False
logger.info(
"Suppressed streamed intentional-silence marker (chat=%s)",
self.chat_id,
)
async def _send_or_edit(
self, text: str, *, finalize: bool = False, is_turn_final: bool = True,
) -> bool:

View File

@@ -0,0 +1,239 @@
"""Streaming intentional-silence suppression.
When the agent chooses not to reply it emits a bare control marker
(``NO_REPLY`` / ``[SILENT]`` / …). The gateway's whole-response filter
(``gateway/response_filters.is_intentional_silence_agent_result``) suppresses
this on the non-streaming delivery path, but the *streaming* path
(``GatewayStreamConsumer``) previously had no silence awareness: it edited the
raw marker onto the screen delta-by-delta and finalized it *before* the
whole-response filter could run. On any streaming-capable adapter (Slack,
Telegram, Discord, …) users saw a literal ``NO_REPLY`` bubble.
These tests pin the two halves of the fix:
* ``is_partial_silence_marker`` — the mid-stream hold-back predicate.
* ``GatewayStreamConsumer`` — an exact-marker final buffer is suppressed and
any already-shown preview is retracted, while substantive prose that merely
mentions a marker is delivered normally.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.response_filters import (
is_intentional_silence_response,
is_partial_silence_marker,
)
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
# --------------------------------------------------------------------------
# is_partial_silence_marker — mid-stream hold-back predicate
# --------------------------------------------------------------------------
# Buffers that could still resolve to a marker → held back while streaming.
PARTIAL_POSITIVE = [
"N",
"NO",
"NO_",
"NO_REP",
"NO_REPLY", # exact marker, not yet terminated by stream-end
"NO REPLY",
"no reply", # canonicalized (case/space-insensitive)
" no_reply ", # surrounding whitespace stripped
"[",
"[SIL",
"[SILENT]",
"SILENT",
"sil",
]
# Buffers that have already diverged from every marker → stream normally.
PARTIAL_NEGATIVE = [
"",
" ",
"No reply needed — here is the plan", # diverged past the marker
"NO_REPLYING", # superset, not a prefix
"Nope",
"Hello there",
"The NO_REPLY token means silence", # marker mentioned mid-prose
"x" * 65, # over the 64-char cap
"silence is golden", # 'SILENCE...' is not a marker prefix
]
@pytest.mark.parametrize("text", PARTIAL_POSITIVE)
def test_partial_silence_marker_positive(text):
assert is_partial_silence_marker(text) is True
@pytest.mark.parametrize("text", PARTIAL_NEGATIVE)
def test_partial_silence_marker_negative(text):
assert is_partial_silence_marker(text) is False
def test_partial_silence_marker_none_safe():
assert is_partial_silence_marker(None) is False
def test_partial_predicate_agrees_with_exact_on_full_markers():
"""Every exact silence marker is also a (trivial) partial of itself."""
from gateway.response_filters import LIVE_GATEWAY_SILENT_MARKERS
for marker in LIVE_GATEWAY_SILENT_MARKERS:
assert is_partial_silence_marker(marker) is True
assert is_intentional_silence_response(marker) is True
# --------------------------------------------------------------------------
# GatewayStreamConsumer — end-to-end suppression through run()
# --------------------------------------------------------------------------
def _make_adapter(*, supports_delete: bool = True) -> MagicMock:
"""Minimal MagicMock adapter wired for send/edit/delete."""
adapter = MagicMock()
adapter.REQUIRES_EDIT_FINALIZE = False
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(return_value=SimpleNamespace(
success=True, message_id="preview_1",
))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(
success=True, message_id="preview_1",
))
if supports_delete:
adapter.delete_message = AsyncMock(return_value=True)
else:
del adapter.delete_message # type: ignore[attr-defined]
return adapter
def _sent_and_edited(adapter):
texts = []
for call in adapter.send.call_args_list:
texts.append(call.kwargs.get("content", ""))
if getattr(adapter, "edit_message", None) is not None:
for call in adapter.edit_message.call_args_list:
texts.append(call.kwargs.get("content", ""))
return texts
class TestStreamedSilenceSuppression:
@pytest.mark.asyncio
async def test_no_reply_only_stream_is_fully_suppressed(self):
"""A stream whose entire content is NO_REPLY sends nothing visible."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("NO_REPLY")
consumer.finish()
await consumer.run()
# No marker text ever reached the platform.
for text in _sent_and_edited(adapter):
assert "NO_REPLY" not in text, f"marker leaked: {text!r}"
# Delivery flags stay False so the gateway does not treat the marker
# as a delivered reply (its whole-response filter then drops it too).
assert consumer.final_response_sent is False
assert consumer.final_content_delivered is False
assert consumer.already_sent is False
@pytest.mark.asyncio
async def test_partial_marker_preview_is_retracted(self):
"""A marker flushed mid-stream as a preview is deleted on completion."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
# Force a mid-stream preview: pretend "NO_REPLY" was already put on
# screen (the pre-fix behaviour) before got_done runs.
consumer._message_id = "preview_1"
consumer._preview_message_ids = {"preview_1"}
consumer._already_sent = True
consumer.on_delta("NO_REPLY")
consumer.finish()
await consumer.run()
# The stale preview was best-effort deleted.
adapter.delete_message.assert_awaited_once_with("chat_1", "preview_1")
assert consumer.final_content_delivered is False
assert consumer.already_sent is False
@pytest.mark.asyncio
async def test_suppression_without_delete_support_is_best_effort(self):
"""Adapter lacking delete_message still suppresses (leaves no new send)."""
adapter = _make_adapter(supports_delete=False)
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("NO_REPLY")
consumer.finish()
await consumer.run()
for text in _sent_and_edited(adapter):
assert "NO_REPLY" not in text
assert consumer.final_content_delivered is False
@pytest.mark.asyncio
async def test_bracket_silent_marker_suppressed(self):
"""The [SILENT] marker is suppressed just like NO_REPLY."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("[SILENT]")
consumer.finish()
await consumer.run()
for text in _sent_and_edited(adapter):
assert "[SILENT]" not in text
assert consumer.final_content_delivered is False
@pytest.mark.asyncio
async def test_prose_mentioning_marker_is_delivered(self):
"""Substantive prose that merely mentions NO_REPLY is NOT suppressed."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
)
body = "The NO_REPLY token tells the gateway to stay silent."
consumer.on_delta(body)
consumer.finish()
await consumer.run()
delivered = "".join(_sent_and_edited(adapter))
assert "NO_REPLY" in delivered
assert consumer.final_content_delivered is True
@pytest.mark.asyncio
async def test_marker_prefix_then_prose_is_delivered(self):
"""A reply that starts marker-like but continues is delivered whole.
"NO REPLY needed …" passes through the mid-stream hold-back while the
buffer is still a marker prefix, then flushes normally once it diverges.
The final text is NOT an exact marker, so got_done does not suppress it.
"""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter, "chat_1",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1),
)
consumer.on_delta("NO REPLY")
consumer.on_delta(" needed — the build is already green.")
consumer.finish()
await consumer.run()
delivered = "".join(_sent_and_edited(adapter))
assert "the build is already green" in delivered
assert consumer.final_content_delivered is True