fix: tolerate missing runtime event state in direct loop tests

This commit is contained in:
chengyongru 2026-06-01 14:31:15 +08:00 committed by Xubin Ren
parent 628b250e9a
commit 8129c16b7d

View File

@ -422,7 +422,7 @@ class AgentLoop:
model_preset if model_preset is not None else self.model_preset,
)
if publish_update:
self.runtime_events.publish_nowait(
self._runtime_event_bus().publish_nowait(
RuntimeModelChanged(
model=self.model,
model_preset=model_preset if model_preset is not None else self.model_preset,
@ -584,6 +584,29 @@ class AgentLoop:
metadata=dict(metadata or {}),
)
def _runtime_event_bus(self) -> RuntimeEventBus:
bus = getattr(self, "runtime_events", None)
if bus is None:
bus = RuntimeEventBus()
self.runtime_events = bus
return bus
def _pop_pending_turn_latency(self, session_key: str) -> int | None:
pending = getattr(self, "_pending_turn_latency_ms", None)
if not isinstance(pending, dict):
return None
return pending.pop(session_key, None)
def _pop_pending_turn_runtime(self, session_key: str) -> LLMRuntime | None:
pending = getattr(self, "_pending_turn_runtime", None)
if not isinstance(pending, dict):
return None
return pending.pop(session_key, None)
def _clear_pending_turn_runtime(self, session_key: str) -> None:
self._pop_pending_turn_latency(session_key)
self._pop_pending_turn_runtime(session_key)
async def _publish_run_status_event(
self,
msg: InboundMessage,
@ -592,7 +615,7 @@ class AgentLoop:
*,
started_at: float | None = None,
) -> None:
await self.runtime_events.publish(
await self._runtime_event_bus().publish(
TurnRunStatusChanged(
context=self._runtime_event_context(
channel=msg.channel,
@ -613,7 +636,7 @@ class AgentLoop:
session_key: str,
metadata: dict[str, Any] | None,
) -> None:
await self.runtime_events.publish(
await self._runtime_event_bus().publish(
TurnCompleted(
context=self._runtime_event_context(
channel=channel,
@ -621,8 +644,8 @@ class AgentLoop:
session_key=session_key,
metadata=metadata,
),
latency_ms=self._pending_turn_latency_ms.pop(session_key, None),
runtime=self._pending_turn_runtime.pop(session_key, None),
latency_ms=self._pop_pending_turn_latency(session_key),
runtime=self._pop_pending_turn_runtime(session_key),
)
)
@ -1105,13 +1128,11 @@ class AgentLoop:
)
if not turn_continuation.internal_continuation_pending(msg.metadata):
await self._publish_run_status_event(msg, session_key, "idle")
self._pending_turn_latency_ms.pop(session_key, None)
self._pending_turn_runtime.pop(session_key, None)
self._clear_pending_turn_runtime(session_key)
finally:
if pending is None:
await self._publish_run_status_event(msg, session_key, "idle")
self._pending_turn_latency_ms.pop(session_key, None)
self._pending_turn_runtime.pop(session_key, None)
self._clear_pending_turn_runtime(session_key)
async def close_mcp(self) -> None:
"""Drain pending background archives, then close MCP connections."""
@ -1373,7 +1394,7 @@ class AgentLoop:
# ensure it exists in case this handler is invoked independently.
if ctx.session is None:
ctx.session = self.sessions.get_or_create(ctx.session_key)
await self.runtime_events.publish(
await self._runtime_event_bus().publish(
SessionTurnStarted(
context=self._runtime_event_context(
channel=msg.channel,
@ -1796,5 +1817,4 @@ class AgentLoop:
)
finally:
await self._publish_run_status_event(msg, session_key, "idle")
self._pending_turn_latency_ms.pop(session_key, None)
self._pending_turn_runtime.pop(session_key, None)
self._clear_pending_turn_runtime(session_key)