From 64ed99a6e61fbd92858fa66de01259985519b6da Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 2 Jul 2026 17:39:09 -0700 Subject: [PATCH] fix(webhook): close per-delivery session at the true end of the run (#57423) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The merged webhook session-close fix (#57370, salvaging #57322) wrapped handle_message in a try/finally — but BasePlatformAdapter.handle_message is fire-and-forget: it spawns _process_message_background and returns before the agent run starts. The finally-close therefore ran BEFORE get_or_create_session created the session row, found no session_id, and silently no-op'd — the ghost-session leak persisted on the real path. (The shipped test masked this by stubbing handle_message with a fake that created the row synchronously.) Move the close to an on_processing_complete override — the lifecycle hook the base class fires at the TRUE end of the run, on the success, failure, and cancellation paths alike. Empirically verified through the real fire-and-forget pipeline: before, ended_at stayed NULL; after, ended_at is set with end_reason=webhook_complete and the row is prunable. Tests now stub only the runner-side _message_handler (the seam the live gateway injects) so handle_message / _process_message_background / on_processing_complete all run for real; adds an AsyncSessionDB-facade coverage test for the coroutine-await branch. --- gateway/platforms/webhook.py | 49 +++--- tests/gateway/test_webhook_session_close.py | 172 ++++++++++---------- 2 files changed, 107 insertions(+), 114 deletions(-) diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index 8327213a056c..9fb72b6ec81d 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -708,19 +708,12 @@ class WebhookAdapter(BasePlatformAdapter): delivery_id, ) - # Non-blocking — return 202 Accepted immediately. Wrap the agent run - # so that the per-delivery webhook session is marked ended in state.db - # once the run finishes. A webhook delivery uses a unique one-shot - # session (delivery_id is baked into the session key above), so it - # will never receive another turn — it must be closed on completion, - # exactly like a cron run closes its session with "cron_complete" - # (cron/scheduler.py). Without this, webhook sessions keep - # ``ended_at`` NULL forever; ``SessionDB.prune_sessions`` only reaps - # rows with ``ended_at`` set, so unclosed webhook sessions accumulate - # unbounded and drive state.db bloat (the ghost-session leak). - task = asyncio.create_task( - self._run_delivery_and_close(event, session_chat_id) - ) + # Non-blocking — return 202 Accepted immediately. The per-delivery + # session is closed by the ``on_processing_complete`` override below + # once the agent run actually finishes (``handle_message`` itself is + # fire-and-forget: it spawns ``_process_message_background`` and + # returns before the run starts, so nothing can be closed here). + task = asyncio.create_task(self.handle_message(event)) self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) @@ -734,25 +727,29 @@ class WebhookAdapter(BasePlatformAdapter): status=202, ) - async def _run_delivery_and_close( - self, event: "MessageEvent", session_chat_id: str + async def on_processing_complete( + self, event: "MessageEvent", outcome: Any ) -> None: - """Run the agent for one webhook delivery, then close its session. + """Close the per-delivery webhook session once its run finishes. A webhook delivery is one-shot: the ``delivery_id`` is baked into the - session key so the session will never receive a second turn. Mirror + session key, so the session will never receive a second turn. Mirror the cron completion path (``cron/scheduler.py`` → ``end_session(..., "cron_complete")``) by marking the session ended - once the run finishes. ``end_session()`` is first-reason-wins and - no-ops on an already-ended row, so this is safe even when a terminal - path (compression split, ``/new``, ``agent_close``) already closed it. - The close runs in ``finally`` so an agent error still reaps the row — - otherwise the ghost-session leak persists on the failure path too. + when the run completes. Without this, webhook sessions keep + ``ended_at`` NULL forever; ``SessionDB.prune_sessions`` only reaps + rows with ``ended_at`` set, so unclosed webhook sessions accumulate + unbounded and drive state.db bloat (the ghost-session leak). + + This hook is the one seam that runs at the TRUE end of the run: + ``BasePlatformAdapter._process_message_background`` fires it after the + message handler returns, on the success, failure, and cancellation + paths alike — so error runs are reaped too. (``handle_message`` is + fire-and-forget; wrapping IT closes before the run even starts.) + ``end_session()`` is first-reason-wins and no-ops on an already-ended + row, so this never clobbers a ``compression``/``agent_close`` reason. """ - try: - await self.handle_message(event) - finally: - await self._end_webhook_session(event, session_chat_id) + await self._end_webhook_session(event, event.source.chat_id) async def _end_webhook_session( self, event: "MessageEvent", session_chat_id: str diff --git a/tests/gateway/test_webhook_session_close.py b/tests/gateway/test_webhook_session_close.py index 804bed07bf69..7b09f7aaeb10 100644 --- a/tests/gateway/test_webhook_session_close.py +++ b/tests/gateway/test_webhook_session_close.py @@ -10,10 +10,16 @@ was the primary driver of the SQLite lock-contention gateway outage). The invariant asserted here is a *behavior contract*, not a snapshot: once a webhook delivery's agent run completes, the session row for that delivery must have ``ended_at`` set — mirroring how a cron run closes its session with -``end_session(..., "cron_complete")``. We exercise the REAL close path -(``WebhookAdapter._run_delivery_and_close`` → ``_end_webhook_session`` → -``SessionDB.end_session``) against a REAL ``SessionStore`` + ``SessionDB`` on a -temp HERMES_HOME, so an integration regression can't hide behind a mock. +``end_session(..., "cron_complete")``. + +CRITICAL: these tests go through the REAL ``handle_message`` → +``_process_message_background`` → ``on_processing_complete`` pipeline (only the +runner-side ``_message_handler`` is stubbed, exactly the seam the live gateway +injects). ``handle_message`` is fire-and-forget — it spawns the background +task and returns before the run starts — so any close bolted around +``handle_message`` itself runs BEFORE the session row exists and silently +no-ops. A test that fakes ``handle_message`` to create the row synchronously +masks exactly that bug (the first version of this fix shipped that way). """ import asyncio @@ -21,10 +27,9 @@ import asyncio import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig -from gateway.platforms.base import MessageEvent, MessageType, SendResult +from gateway.platforms.base import MessageEvent, MessageType from gateway.platforms.webhook import WebhookAdapter, _INSECURE_NO_AUTH from gateway.session import SessionSource, SessionStore -from hermes_state import SessionDB def _make_adapter(routes, **extra_kw) -> WebhookAdapter: @@ -51,9 +56,7 @@ class _FakeRunner: return self.session_store._generate_session_key(source) -@pytest.mark.asyncio -async def test_completed_webhook_delivery_closes_its_session(tmp_path): - """After a webhook run finishes, its session row has ended_at set.""" +def _make_store(tmp_path) -> SessionStore: sessions_dir = tmp_path / "sessions" sessions_dir.mkdir() config = GatewayConfig( @@ -61,6 +64,40 @@ async def test_completed_webhook_delivery_closes_its_session(tmp_path): ) store = SessionStore(sessions_dir=sessions_dir, config=config) assert store._db is not None, "test requires a real SessionDB" + return store + + +def _make_event(adapter: WebhookAdapter, delivery_id: str, text: str) -> MessageEvent: + session_chat_id = f"webhook:alerts:{delivery_id}" + source = adapter.build_source( + chat_id=session_chat_id, + chat_name="webhook/alerts", + chat_type="webhook", + user_id="webhook:alerts", + user_name="alerts", + ) + return MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + raw_message={"message": text}, + message_id=delivery_id, + ) + + +async def _drain_background_tasks(adapter: WebhookAdapter, timeout: float = 5.0) -> None: + """Wait for the adapter's spawned processing task(s) to finish.""" + deadline = asyncio.get_event_loop().time() + timeout + while adapter._background_tasks and asyncio.get_event_loop().time() < deadline: + await asyncio.sleep(0.02) + # One extra tick for done-callbacks to run. + await asyncio.sleep(0.05) + + +@pytest.mark.asyncio +async def test_completed_webhook_delivery_closes_its_session(tmp_path): + """After a webhook run finishes (REAL dispatch path), ended_at is set.""" + store = _make_store(tmp_path) runner = _FakeRunner(store) adapter = _make_adapter( @@ -74,36 +111,28 @@ async def test_completed_webhook_delivery_closes_its_session(tmp_path): ) adapter.gateway_runner = runner - # The gateway creates the session row when it routes the inbound event to - # the agent. Simulate that inside handle_message so the close path has a - # real row to reap, and capture the session_id for the assertion. + # Stub the RUNNER-side handler (the seam the live gateway injects) — the + # adapter's own handle_message / _process_message_background pipeline runs + # for real, including the fire-and-forget task spawn and the + # on_processing_complete hook. The handler creates the session row, just + # like GatewayRunner._handle_message does at routing time. created = {} - async def _fake_handle_message(event: MessageEvent) -> None: + async def _message_handler(event: MessageEvent): entry = store.get_or_create_session(event.source) created["session_id"] = entry.session_id + return "" # webhook deliver=log — nothing to send back - adapter.handle_message = _fake_handle_message + adapter._message_handler = _message_handler - delivery_id = "alert-close-001" - session_chat_id = f"webhook:alerts:{delivery_id}" - source = adapter.build_source( - chat_id=session_chat_id, - chat_name="webhook/alerts", - chat_type="webhook", - user_id="webhook:alerts", - user_name="alerts", - ) - event = MessageEvent( - text="Alert: server on fire", - message_type=MessageType.TEXT, - source=source, - raw_message={"message": "server on fire"}, - message_id=delivery_id, - ) + event = _make_event(adapter, "alert-close-001", "Alert: server on fire") - # Run the exact wrapper the adapter now schedules on delivery. - await adapter._run_delivery_and_close(event, session_chat_id) + # Exactly what _handle_webhook schedules. + await adapter.handle_message(event) + # handle_message is fire-and-forget: the session must NOT be expected to + # exist yet. (Guards against reintroducing a close wrapped around + # handle_message itself, which ran before the row existed and no-op'd.) + await _drain_background_tasks(adapter) session_id = created["session_id"] row = store._db.get_session(session_id) @@ -124,13 +153,8 @@ async def test_completed_webhook_delivery_closes_its_session(tmp_path): @pytest.mark.asyncio async def test_webhook_session_closed_even_when_agent_run_raises(tmp_path): - """A failing agent run still closes the session (finally-path).""" - sessions_dir = tmp_path / "sessions" - sessions_dir.mkdir() - config = GatewayConfig( - platforms={Platform.WEBHOOK: PlatformConfig(enabled=True)} - ) - store = SessionStore(sessions_dir=sessions_dir, config=config) + """A failing agent run still closes the session (FAILURE hook path).""" + store = _make_store(tmp_path) runner = _FakeRunner(store) adapter = _make_adapter( @@ -140,33 +164,18 @@ async def test_webhook_session_closed_even_when_agent_run_raises(tmp_path): created = {} - async def _boom(event: MessageEvent) -> None: + async def _boom(event: MessageEvent): # Row exists (routing happened) before the run blows up mid-turn. entry = store.get_or_create_session(event.source) created["session_id"] = entry.session_id raise RuntimeError("agent exploded mid-run") - adapter.handle_message = _boom + adapter._message_handler = _boom - delivery_id = "alert-fail-001" - session_chat_id = f"webhook:alerts:{delivery_id}" - source = adapter.build_source( - chat_id=session_chat_id, - chat_name="webhook/alerts", - chat_type="webhook", - user_id="webhook:alerts", - user_name="alerts", - ) - event = MessageEvent( - text="x", - message_type=MessageType.TEXT, - source=source, - raw_message={}, - message_id=delivery_id, - ) + event = _make_event(adapter, "alert-fail-001", "x") - with pytest.raises(RuntimeError): - await adapter._run_delivery_and_close(event, session_chat_id) + await adapter.handle_message(event) + await _drain_background_tasks(adapter) row = store._db.get_session(created["session_id"]) assert row is not None @@ -178,39 +187,26 @@ async def test_webhook_session_closed_even_when_agent_run_raises(tmp_path): store._db.close() -def test_peek_session_id_resolves_bound_key(tmp_path): - """SessionStore.peek_session_id returns the session_id bound to a key. +@pytest.mark.asyncio +async def test_end_webhook_session_awaits_async_session_db(tmp_path): + """The close path handles the gateway's real AsyncSessionDB facade.""" + from hermes_state import AsyncSessionDB - This is the public, lock-held accessor the webhook close path uses to - resolve a session row from its key without reaching into the private - ``_entries`` dict. A missing/unknown key returns None (so the close path - debug-logs and no-ops rather than closing the wrong row). - """ - sessions_dir = tmp_path / "sessions" - sessions_dir.mkdir() - config = GatewayConfig( - platforms={Platform.WEBHOOK: PlatformConfig(enabled=True)} - ) - store = SessionStore(sessions_dir=sessions_dir, config=config) + store = _make_store(tmp_path) + runner = _FakeRunner(store) + runner._session_db = AsyncSessionDB(store._db) adapter = _make_adapter( {"alerts": {"secret": _INSECURE_NO_AUTH, "prompt": "x", "deliver": "log"}} ) - source = adapter.build_source( - chat_id="webhook:alerts:peek-001", - chat_name="webhook/alerts", - chat_type="webhook", - user_id="webhook:alerts", - user_name="alerts", - ) - entry = store.get_or_create_session(source) - key = store._generate_session_key(source) + adapter.gateway_runner = runner - # Known key → the bound session_id. - assert store.peek_session_id(key) == entry.session_id - # Unknown key and empty key → None (never a wrong-row close). - assert store.peek_session_id("no:such:key") is None - assert store.peek_session_id("") is None - if store._db is not None: - store._db.close() + event = _make_event(adapter, "alert-async-001", "x") + entry = store.get_or_create_session(event.source) + await adapter._end_webhook_session(event, event.source.chat_id) + + row = store._db.get_session(entry.session_id) + assert row["ended_at"] is not None + assert row["end_reason"] == "webhook_complete" + store._db.close()