diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 2e02ffdf4..2e98136bf 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -422,7 +422,7 @@ class AgentLoop: model_preset if model_preset is not None else self.model_preset, ) if publish_update: - self.runtime_events.publish_nowait( + self._runtime_event_bus().publish_nowait( RuntimeModelChanged( model=self.model, model_preset=model_preset if model_preset is not None else self.model_preset, @@ -584,6 +584,29 @@ class AgentLoop: metadata=dict(metadata or {}), ) + def _runtime_event_bus(self) -> RuntimeEventBus: + bus = getattr(self, "runtime_events", None) + if bus is None: + bus = RuntimeEventBus() + self.runtime_events = bus + return bus + + def _pop_pending_turn_latency(self, session_key: str) -> int | None: + pending = getattr(self, "_pending_turn_latency_ms", None) + if not isinstance(pending, dict): + return None + return pending.pop(session_key, None) + + def _pop_pending_turn_runtime(self, session_key: str) -> LLMRuntime | None: + pending = getattr(self, "_pending_turn_runtime", None) + if not isinstance(pending, dict): + return None + return pending.pop(session_key, None) + + def _clear_pending_turn_runtime(self, session_key: str) -> None: + self._pop_pending_turn_latency(session_key) + self._pop_pending_turn_runtime(session_key) + async def _publish_run_status_event( self, msg: InboundMessage, @@ -592,7 +615,7 @@ class AgentLoop: *, started_at: float | None = None, ) -> None: - await self.runtime_events.publish( + await self._runtime_event_bus().publish( TurnRunStatusChanged( context=self._runtime_event_context( channel=msg.channel, @@ -613,7 +636,7 @@ class AgentLoop: session_key: str, metadata: dict[str, Any] | None, ) -> None: - await self.runtime_events.publish( + await self._runtime_event_bus().publish( TurnCompleted( context=self._runtime_event_context( channel=channel, @@ -621,8 +644,8 @@ class AgentLoop: session_key=session_key, metadata=metadata, ), - latency_ms=self._pending_turn_latency_ms.pop(session_key, None), - runtime=self._pending_turn_runtime.pop(session_key, None), + latency_ms=self._pop_pending_turn_latency(session_key), + runtime=self._pop_pending_turn_runtime(session_key), ) ) @@ -1105,13 +1128,11 @@ class AgentLoop: ) if not turn_continuation.internal_continuation_pending(msg.metadata): await self._publish_run_status_event(msg, session_key, "idle") - self._pending_turn_latency_ms.pop(session_key, None) - self._pending_turn_runtime.pop(session_key, None) + self._clear_pending_turn_runtime(session_key) finally: if pending is None: await self._publish_run_status_event(msg, session_key, "idle") - self._pending_turn_latency_ms.pop(session_key, None) - self._pending_turn_runtime.pop(session_key, None) + self._clear_pending_turn_runtime(session_key) async def close_mcp(self) -> None: """Drain pending background archives, then close MCP connections.""" @@ -1373,7 +1394,7 @@ class AgentLoop: # ensure it exists in case this handler is invoked independently. if ctx.session is None: ctx.session = self.sessions.get_or_create(ctx.session_key) - await self.runtime_events.publish( + await self._runtime_event_bus().publish( SessionTurnStarted( context=self._runtime_event_context( channel=msg.channel, @@ -1796,5 +1817,4 @@ class AgentLoop: ) finally: await self._publish_run_status_event(msg, session_key, "idle") - self._pending_turn_latency_ms.pop(session_key, None) - self._pending_turn_runtime.pop(session_key, None) + self._clear_pending_turn_runtime(session_key)