From 799db3351783d43b4048d845af83b1b7b8b4e6fc Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 11:46:30 +0000 Subject: [PATCH] fix(heartbeat): record proactive deliveries in channel sessions Route heartbeat, cron, and message-tool deliveries through one gateway helper so user-visible proactive messages are available when the channel replies. Made-with: Cursor --- nanobot/cli/commands.py | 60 ++++++++++++------- nanobot/session/manager.py | 8 ++- tests/cli/test_commands.py | 32 +++++++++- .../test_heartbeat_context_bridge.py | 32 +++++++--- 4 files changed, 100 insertions(+), 32 deletions(-) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 27a7f6ecb..684e7f4c4 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -656,6 +656,8 @@ def _run_gateway( ) -> None: """Shared gateway runtime; ``open_browser_url`` opens a tab once channels are up.""" from nanobot.agent.loop import AgentLoop + from nanobot.agent.tools.cron import CronTool + from nanobot.agent.tools.message import MessageTool from nanobot.bus.queue import MessageBus from nanobot.channels.manager import ChannelManager from nanobot.cron.service import CronService @@ -704,6 +706,34 @@ def _run_gateway( tools_config=config.tools, ) + from nanobot.agent.loop import UNIFIED_SESSION_KEY + from nanobot.bus.events import OutboundMessage + + def _channel_session_key(channel: str, chat_id: str) -> str: + return ( + UNIFIED_SESSION_KEY + if config.agents.defaults.unified_session + else f"{channel}:{chat_id}" + ) + + async def _deliver_to_channel(msg: OutboundMessage, *, record: bool = True) -> None: + """Publish a user-visible message and mirror it into that channel's session.""" + if ( + record + and msg.channel != "cli" + and msg.content.strip() + and hasattr(session_manager, "get_or_create") + and hasattr(session_manager, "save") + ): + session = session_manager.get_or_create(_channel_session_key(msg.channel, msg.chat_id)) + session.add_message("assistant", msg.content, _channel_delivery=True) + session_manager.save(session) + await bus.publish_outbound(msg) + + message_tool = getattr(agent, "tools", {}).get("message") + if isinstance(message_tool, MessageTool): + message_tool.set_send_callback(_deliver_to_channel) + # Set cron callback (needs agent) async def on_cron_job(job: CronJob) -> str | None: """Execute a cron job through the agent.""" @@ -716,8 +746,6 @@ def _run_gateway( logger.exception("Dream cron job failed") return None - from nanobot.agent.tools.cron import CronTool - from nanobot.agent.tools.message import MessageTool from nanobot.utils.evaluator import evaluate_response reminder_note = ( @@ -757,12 +785,13 @@ def _run_gateway( response, reminder_note, provider, agent.model, ) if should_notify: - from nanobot.bus.events import OutboundMessage - await bus.publish_outbound(OutboundMessage( - channel=job.payload.channel or "cli", - chat_id=job.payload.to, - content=response, - )) + await _deliver_to_channel( + OutboundMessage( + channel=job.payload.channel or "cli", + chat_id=job.payload.to, + content=response, + ) + ) return response cron.on_job = on_cron_job @@ -820,24 +849,11 @@ def _run_gateway( lands in a session that has no context about the heartbeat message and the agent cannot follow through. """ - from nanobot.bus.events import OutboundMessage channel, chat_id = _pick_heartbeat_target() if channel == "cli": return # No external channel available to deliver to - # Inject the delivered message into the channel session so that - # user replies have conversational context. - from nanobot.agent.loop import UNIFIED_SESSION_KEY - target_key = ( - UNIFIED_SESSION_KEY - if config.agents.defaults.unified_session - else f"{channel}:{chat_id}" - ) - target_session = agent.sessions.get_or_create(target_key) - target_session.add_message("assistant", response) - agent.sessions.save(target_session) - - await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) + await _deliver_to_channel(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) hb_cfg = config.gateway.heartbeat heartbeat = HeartbeatService( diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 69509a839..ddcfdea14 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -46,10 +46,14 @@ class Session: unconsolidated = self.messages[self.last_consolidated:] sliced = unconsolidated[-max_messages:] - # Avoid starting mid-turn when possible. + # Avoid starting mid-turn when possible, except for proactive + # assistant deliveries that the user may be replying to. for i, message in enumerate(sliced): if message.get("role") == "user": - sliced = sliced[i:] + start = i + if i > 0 and sliced[i - 1].get("_channel_delivery"): + start = i - 1 + sliced = sliced[start:] break # Drop orphan tool results at the front. diff --git a/tests/cli/test_commands.py b/tests/cli/test_commands.py index 2719beed1..403f53fb3 100644 --- a/tests/cli/test_commands.py +++ b/tests/cli/test_commands.py @@ -942,7 +942,27 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None) monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: provider) monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus) - monkeypatch.setattr("nanobot.session.manager.SessionManager", lambda _workspace: object()) + + class _FakeSession: + def __init__(self) -> None: + self.messages = [] + + def add_message(self, role: str, content: str, **kwargs) -> None: + self.messages.append({"role": role, "content": content, **kwargs}) + + class _FakeSessionManager: + def __init__(self, _workspace: Path) -> None: + self.session = _FakeSession() + seen["session_manager"] = self + + def get_or_create(self, key: str) -> _FakeSession: + seen["session_key"] = key + return self.session + + def save(self, session: _FakeSession) -> None: + seen["saved_session"] = session + + monkeypatch.setattr("nanobot.session.manager.SessionManager", _FakeSessionManager) class _FakeCron: def __init__(self, _store_path: Path) -> None: @@ -1030,6 +1050,16 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( content="Time to stretch.", ) ) + assert seen["session_key"] == "telegram:user-1" + saved_session = seen["saved_session"] + assert isinstance(saved_session, _FakeSession) + assert saved_session.messages == [ + { + "role": "assistant", + "content": "Time to stretch.", + "_channel_delivery": True, + } + ] def test_gateway_cron_job_suppresses_intermediate_progress( diff --git a/tests/heartbeat/test_heartbeat_context_bridge.py b/tests/heartbeat/test_heartbeat_context_bridge.py index ced2ddddc..5ec02a8bb 100644 --- a/tests/heartbeat/test_heartbeat_context_bridge.py +++ b/tests/heartbeat/test_heartbeat_context_bridge.py @@ -20,7 +20,11 @@ class TestHeartbeatContextBridge: # Simulate what on_heartbeat_notify does target_session = session_mgr.get_or_create(target_key) - target_session.add_message("assistant", "3 new emails — invoice, meeting, proposal.") + target_session.add_message( + "assistant", + "3 new emails — invoice, meeting, proposal.", + _channel_delivery=True, + ) session_mgr.save(target_session) # Reload and verify @@ -45,7 +49,11 @@ class TestHeartbeatContextBridge: # Step 1: heartbeat injects assistant message session = session_mgr.get_or_create(target_key) - session.add_message("assistant", "If you want, I can mark that email as read.") + session.add_message( + "assistant", + "If you want, I can mark that email as read.", + _channel_delivery=True, + ) session_mgr.save(session) # Step 2: user replies "Sure" @@ -76,7 +84,11 @@ class TestHeartbeatContextBridge: # Heartbeat injects session = session_mgr.get_or_create(target_key) - session.add_message("assistant", "You have a meeting in 30 minutes.") + session.add_message( + "assistant", + "You have a meeting in 30 minutes.", + _channel_delivery=True, + ) session_mgr.save(session) # Verify @@ -86,17 +98,23 @@ class TestHeartbeatContextBridge: assert roles == ["user", "assistant", "user", "assistant"] assert "meeting in 30 minutes" in history[-1]["content"] - def test_injection_to_empty_session(self, tmp_path): - """Injecting into a brand-new session (no prior messages) works.""" + def test_reply_after_injection_to_empty_session_keeps_context(self, tmp_path): + """A user replying to the first delivered message still sees that context.""" session_mgr = SessionManager(tmp_path / "sessions") target_key = "telegram:99999" session = session_mgr.get_or_create(target_key) - session.add_message("assistant", "Weather alert: sandstorm expected at 4pm.") + session.add_message( + "assistant", + "Weather alert: sandstorm expected at 4pm.", + _channel_delivery=True, + ) + session.add_message("user", "Sure") session_mgr.save(session) reloaded = session_mgr.get_or_create(target_key) history = reloaded.get_history(max_messages=0) - assert len(history) == 1 + assert len(history) == 2 assert history[0]["role"] == "assistant" assert "sandstorm" in history[0]["content"] + assert history[1] == {"role": "user", "content": "Sure"}