fix(webhook): close per-delivery session at the true end of the run (#57423)

The merged webhook session-close fix (#57370, salvaging #57322) wrapped
handle_message in a try/finally — but BasePlatformAdapter.handle_message
is fire-and-forget: it spawns _process_message_background and returns
before the agent run starts. The finally-close therefore ran BEFORE
get_or_create_session created the session row, found no session_id, and
silently no-op'd — the ghost-session leak persisted on the real path.
(The shipped test masked this by stubbing handle_message with a fake
that created the row synchronously.)

Move the close to an on_processing_complete override — the lifecycle
hook the base class fires at the TRUE end of the run, on the success,
failure, and cancellation paths alike. Empirically verified through the
real fire-and-forget pipeline: before, ended_at stayed NULL; after,
ended_at is set with end_reason=webhook_complete and the row is
prunable.

Tests now stub only the runner-side _message_handler (the seam the live
gateway injects) so handle_message / _process_message_background /
on_processing_complete all run for real; adds an AsyncSessionDB-facade
coverage test for the coroutine-await branch.
This commit is contained in:
Teknium
2026-07-02 17:39:09 -07:00
committed by GitHub
parent 4aad27b751
commit 64ed99a6e6
2 changed files with 107 additions and 114 deletions

View File

@@ -708,19 +708,12 @@ class WebhookAdapter(BasePlatformAdapter):
delivery_id,
)
# Non-blocking — return 202 Accepted immediately. Wrap the agent run
# so that the per-delivery webhook session is marked ended in state.db
# once the run finishes. A webhook delivery uses a unique one-shot
# session (delivery_id is baked into the session key above), so it
# will never receive another turn — it must be closed on completion,
# exactly like a cron run closes its session with "cron_complete"
# (cron/scheduler.py). Without this, webhook sessions keep
# ``ended_at`` NULL forever; ``SessionDB.prune_sessions`` only reaps
# rows with ``ended_at`` set, so unclosed webhook sessions accumulate
# unbounded and drive state.db bloat (the ghost-session leak).
task = asyncio.create_task(
self._run_delivery_and_close(event, session_chat_id)
)
# Non-blocking — return 202 Accepted immediately. The per-delivery
# session is closed by the ``on_processing_complete`` override below
# once the agent run actually finishes (``handle_message`` itself is
# fire-and-forget: it spawns ``_process_message_background`` and
# returns before the run starts, so nothing can be closed here).
task = asyncio.create_task(self.handle_message(event))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
@@ -734,25 +727,29 @@ class WebhookAdapter(BasePlatformAdapter):
status=202,
)
async def _run_delivery_and_close(
self, event: "MessageEvent", session_chat_id: str
async def on_processing_complete(
self, event: "MessageEvent", outcome: Any
) -> None:
"""Run the agent for one webhook delivery, then close its session.
"""Close the per-delivery webhook session once its run finishes.
A webhook delivery is one-shot: the ``delivery_id`` is baked into the
session key so the session will never receive a second turn. Mirror
session key, so the session will never receive a second turn. Mirror
the cron completion path (``cron/scheduler.py`` →
``end_session(..., "cron_complete")``) by marking the session ended
once the run finishes. ``end_session()`` is first-reason-wins and
no-ops on an already-ended row, so this is safe even when a terminal
path (compression split, ``/new``, ``agent_close``) already closed it.
The close runs in ``finally`` so an agent error still reaps the row —
otherwise the ghost-session leak persists on the failure path too.
when the run completes. Without this, webhook sessions keep
``ended_at`` NULL forever; ``SessionDB.prune_sessions`` only reaps
rows with ``ended_at`` set, so unclosed webhook sessions accumulate
unbounded and drive state.db bloat (the ghost-session leak).
This hook is the one seam that runs at the TRUE end of the run:
``BasePlatformAdapter._process_message_background`` fires it after the
message handler returns, on the success, failure, and cancellation
paths alike — so error runs are reaped too. (``handle_message`` is
fire-and-forget; wrapping IT closes before the run even starts.)
``end_session()`` is first-reason-wins and no-ops on an already-ended
row, so this never clobbers a ``compression``/``agent_close`` reason.
"""
try:
await self.handle_message(event)
finally:
await self._end_webhook_session(event, session_chat_id)
await self._end_webhook_session(event, event.source.chat_id)
async def _end_webhook_session(
self, event: "MessageEvent", session_chat_id: str

View File

@@ -10,10 +10,16 @@ was the primary driver of the SQLite lock-contention gateway outage).
The invariant asserted here is a *behavior contract*, not a snapshot: once a
webhook delivery's agent run completes, the session row for that delivery must
have ``ended_at`` set — mirroring how a cron run closes its session with
``end_session(..., "cron_complete")``. We exercise the REAL close path
(``WebhookAdapter._run_delivery_and_close`` → ``_end_webhook_session`` →
``SessionDB.end_session``) against a REAL ``SessionStore`` + ``SessionDB`` on a
temp HERMES_HOME, so an integration regression can't hide behind a mock.
``end_session(..., "cron_complete")``.
CRITICAL: these tests go through the REAL ``handle_message`` →
``_process_message_background`` → ``on_processing_complete`` pipeline (only the
runner-side ``_message_handler`` is stubbed, exactly the seam the live gateway
injects). ``handle_message`` is fire-and-forget — it spawns the background
task and returns before the run starts — so any close bolted around
``handle_message`` itself runs BEFORE the session row exists and silently
no-ops. A test that fakes ``handle_message`` to create the row synchronously
masks exactly that bug (the first version of this fix shipped that way).
"""
import asyncio
@@ -21,10 +27,9 @@ import asyncio
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SendResult
from gateway.platforms.base import MessageEvent, MessageType
from gateway.platforms.webhook import WebhookAdapter, _INSECURE_NO_AUTH
from gateway.session import SessionSource, SessionStore
from hermes_state import SessionDB
def _make_adapter(routes, **extra_kw) -> WebhookAdapter:
@@ -51,9 +56,7 @@ class _FakeRunner:
return self.session_store._generate_session_key(source)
@pytest.mark.asyncio
async def test_completed_webhook_delivery_closes_its_session(tmp_path):
"""After a webhook run finishes, its session row has ended_at set."""
def _make_store(tmp_path) -> SessionStore:
sessions_dir = tmp_path / "sessions"
sessions_dir.mkdir()
config = GatewayConfig(
@@ -61,6 +64,40 @@ async def test_completed_webhook_delivery_closes_its_session(tmp_path):
)
store = SessionStore(sessions_dir=sessions_dir, config=config)
assert store._db is not None, "test requires a real SessionDB"
return store
def _make_event(adapter: WebhookAdapter, delivery_id: str, text: str) -> MessageEvent:
session_chat_id = f"webhook:alerts:{delivery_id}"
source = adapter.build_source(
chat_id=session_chat_id,
chat_name="webhook/alerts",
chat_type="webhook",
user_id="webhook:alerts",
user_name="alerts",
)
return MessageEvent(
text=text,
message_type=MessageType.TEXT,
source=source,
raw_message={"message": text},
message_id=delivery_id,
)
async def _drain_background_tasks(adapter: WebhookAdapter, timeout: float = 5.0) -> None:
"""Wait for the adapter's spawned processing task(s) to finish."""
deadline = asyncio.get_event_loop().time() + timeout
while adapter._background_tasks and asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(0.02)
# One extra tick for done-callbacks to run.
await asyncio.sleep(0.05)
@pytest.mark.asyncio
async def test_completed_webhook_delivery_closes_its_session(tmp_path):
"""After a webhook run finishes (REAL dispatch path), ended_at is set."""
store = _make_store(tmp_path)
runner = _FakeRunner(store)
adapter = _make_adapter(
@@ -74,36 +111,28 @@ async def test_completed_webhook_delivery_closes_its_session(tmp_path):
)
adapter.gateway_runner = runner
# The gateway creates the session row when it routes the inbound event to
# the agent. Simulate that inside handle_message so the close path has a
# real row to reap, and capture the session_id for the assertion.
# Stub the RUNNER-side handler (the seam the live gateway injects) — the
# adapter's own handle_message / _process_message_background pipeline runs
# for real, including the fire-and-forget task spawn and the
# on_processing_complete hook. The handler creates the session row, just
# like GatewayRunner._handle_message does at routing time.
created = {}
async def _fake_handle_message(event: MessageEvent) -> None:
async def _message_handler(event: MessageEvent):
entry = store.get_or_create_session(event.source)
created["session_id"] = entry.session_id
return "" # webhook deliver=log — nothing to send back
adapter.handle_message = _fake_handle_message
adapter._message_handler = _message_handler
delivery_id = "alert-close-001"
session_chat_id = f"webhook:alerts:{delivery_id}"
source = adapter.build_source(
chat_id=session_chat_id,
chat_name="webhook/alerts",
chat_type="webhook",
user_id="webhook:alerts",
user_name="alerts",
)
event = MessageEvent(
text="Alert: server on fire",
message_type=MessageType.TEXT,
source=source,
raw_message={"message": "server on fire"},
message_id=delivery_id,
)
event = _make_event(adapter, "alert-close-001", "Alert: server on fire")
# Run the exact wrapper the adapter now schedules on delivery.
await adapter._run_delivery_and_close(event, session_chat_id)
# Exactly what _handle_webhook schedules.
await adapter.handle_message(event)
# handle_message is fire-and-forget: the session must NOT be expected to
# exist yet. (Guards against reintroducing a close wrapped around
# handle_message itself, which ran before the row existed and no-op'd.)
await _drain_background_tasks(adapter)
session_id = created["session_id"]
row = store._db.get_session(session_id)
@@ -124,13 +153,8 @@ async def test_completed_webhook_delivery_closes_its_session(tmp_path):
@pytest.mark.asyncio
async def test_webhook_session_closed_even_when_agent_run_raises(tmp_path):
"""A failing agent run still closes the session (finally-path)."""
sessions_dir = tmp_path / "sessions"
sessions_dir.mkdir()
config = GatewayConfig(
platforms={Platform.WEBHOOK: PlatformConfig(enabled=True)}
)
store = SessionStore(sessions_dir=sessions_dir, config=config)
"""A failing agent run still closes the session (FAILURE hook path)."""
store = _make_store(tmp_path)
runner = _FakeRunner(store)
adapter = _make_adapter(
@@ -140,33 +164,18 @@ async def test_webhook_session_closed_even_when_agent_run_raises(tmp_path):
created = {}
async def _boom(event: MessageEvent) -> None:
async def _boom(event: MessageEvent):
# Row exists (routing happened) before the run blows up mid-turn.
entry = store.get_or_create_session(event.source)
created["session_id"] = entry.session_id
raise RuntimeError("agent exploded mid-run")
adapter.handle_message = _boom
adapter._message_handler = _boom
delivery_id = "alert-fail-001"
session_chat_id = f"webhook:alerts:{delivery_id}"
source = adapter.build_source(
chat_id=session_chat_id,
chat_name="webhook/alerts",
chat_type="webhook",
user_id="webhook:alerts",
user_name="alerts",
)
event = MessageEvent(
text="x",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id=delivery_id,
)
event = _make_event(adapter, "alert-fail-001", "x")
with pytest.raises(RuntimeError):
await adapter._run_delivery_and_close(event, session_chat_id)
await adapter.handle_message(event)
await _drain_background_tasks(adapter)
row = store._db.get_session(created["session_id"])
assert row is not None
@@ -178,39 +187,26 @@ async def test_webhook_session_closed_even_when_agent_run_raises(tmp_path):
store._db.close()
def test_peek_session_id_resolves_bound_key(tmp_path):
"""SessionStore.peek_session_id returns the session_id bound to a key.
@pytest.mark.asyncio
async def test_end_webhook_session_awaits_async_session_db(tmp_path):
"""The close path handles the gateway's real AsyncSessionDB facade."""
from hermes_state import AsyncSessionDB
This is the public, lock-held accessor the webhook close path uses to
resolve a session row from its key without reaching into the private
``_entries`` dict. A missing/unknown key returns None (so the close path
debug-logs and no-ops rather than closing the wrong row).
"""
sessions_dir = tmp_path / "sessions"
sessions_dir.mkdir()
config = GatewayConfig(
platforms={Platform.WEBHOOK: PlatformConfig(enabled=True)}
)
store = SessionStore(sessions_dir=sessions_dir, config=config)
store = _make_store(tmp_path)
runner = _FakeRunner(store)
runner._session_db = AsyncSessionDB(store._db)
adapter = _make_adapter(
{"alerts": {"secret": _INSECURE_NO_AUTH, "prompt": "x", "deliver": "log"}}
)
source = adapter.build_source(
chat_id="webhook:alerts:peek-001",
chat_name="webhook/alerts",
chat_type="webhook",
user_id="webhook:alerts",
user_name="alerts",
)
entry = store.get_or_create_session(source)
key = store._generate_session_key(source)
adapter.gateway_runner = runner
# Known key → the bound session_id.
assert store.peek_session_id(key) == entry.session_id
# Unknown key and empty key → None (never a wrong-row close).
assert store.peek_session_id("no:such:key") is None
assert store.peek_session_id("") is None
if store._db is not None:
event = _make_event(adapter, "alert-async-001", "x")
entry = store.get_or_create_session(event.source)
await adapter._end_webhook_session(event, event.source.chat_id)
row = store._db.get_session(entry.session_id)
assert row["ended_at"] is not None
assert row["end_reason"] == "webhook_complete"
store._db.close()