fix(heartbeat): record proactive deliveries in channel sessions

Route heartbeat, cron, and message-tool deliveries through one gateway helper so user-visible proactive messages are available when the channel replies.

Made-with: Cursor
This commit is contained in:
Xubin Ren 2026-04-26 11:46:30 +00:00 committed by Xubin Ren
parent 1572626100
commit 799db33517
4 changed files with 100 additions and 32 deletions

View File

@ -656,6 +656,8 @@ def _run_gateway(
) -> None:
"""Shared gateway runtime; ``open_browser_url`` opens a tab once channels are up."""
from nanobot.agent.loop import AgentLoop
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.message import MessageTool
from nanobot.bus.queue import MessageBus
from nanobot.channels.manager import ChannelManager
from nanobot.cron.service import CronService
@ -704,6 +706,34 @@ def _run_gateway(
tools_config=config.tools,
)
from nanobot.agent.loop import UNIFIED_SESSION_KEY
from nanobot.bus.events import OutboundMessage
def _channel_session_key(channel: str, chat_id: str) -> str:
return (
UNIFIED_SESSION_KEY
if config.agents.defaults.unified_session
else f"{channel}:{chat_id}"
)
async def _deliver_to_channel(msg: OutboundMessage, *, record: bool = True) -> None:
"""Publish a user-visible message and mirror it into that channel's session."""
if (
record
and msg.channel != "cli"
and msg.content.strip()
and hasattr(session_manager, "get_or_create")
and hasattr(session_manager, "save")
):
session = session_manager.get_or_create(_channel_session_key(msg.channel, msg.chat_id))
session.add_message("assistant", msg.content, _channel_delivery=True)
session_manager.save(session)
await bus.publish_outbound(msg)
message_tool = getattr(agent, "tools", {}).get("message")
if isinstance(message_tool, MessageTool):
message_tool.set_send_callback(_deliver_to_channel)
# Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent."""
@ -716,8 +746,6 @@ def _run_gateway(
logger.exception("Dream cron job failed")
return None
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.message import MessageTool
from nanobot.utils.evaluator import evaluate_response
reminder_note = (
@ -757,12 +785,13 @@ def _run_gateway(
response, reminder_note, provider, agent.model,
)
if should_notify:
from nanobot.bus.events import OutboundMessage
await bus.publish_outbound(OutboundMessage(
channel=job.payload.channel or "cli",
chat_id=job.payload.to,
content=response,
))
await _deliver_to_channel(
OutboundMessage(
channel=job.payload.channel or "cli",
chat_id=job.payload.to,
content=response,
)
)
return response
cron.on_job = on_cron_job
@ -820,24 +849,11 @@ def _run_gateway(
lands in a session that has no context about the heartbeat message
and the agent cannot follow through.
"""
from nanobot.bus.events import OutboundMessage
channel, chat_id = _pick_heartbeat_target()
if channel == "cli":
return # No external channel available to deliver to
# Inject the delivered message into the channel session so that
# user replies have conversational context.
from nanobot.agent.loop import UNIFIED_SESSION_KEY
target_key = (
UNIFIED_SESSION_KEY
if config.agents.defaults.unified_session
else f"{channel}:{chat_id}"
)
target_session = agent.sessions.get_or_create(target_key)
target_session.add_message("assistant", response)
agent.sessions.save(target_session)
await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response))
await _deliver_to_channel(OutboundMessage(channel=channel, chat_id=chat_id, content=response))
hb_cfg = config.gateway.heartbeat
heartbeat = HeartbeatService(

View File

@ -46,10 +46,14 @@ class Session:
unconsolidated = self.messages[self.last_consolidated:]
sliced = unconsolidated[-max_messages:]
# Avoid starting mid-turn when possible.
# Avoid starting mid-turn when possible, except for proactive
# assistant deliveries that the user may be replying to.
for i, message in enumerate(sliced):
if message.get("role") == "user":
sliced = sliced[i:]
start = i
if i > 0 and sliced[i - 1].get("_channel_delivery"):
start = i - 1
sliced = sliced[start:]
break
# Drop orphan tool results at the front.

View File

@ -942,7 +942,27 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: provider)
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus)
monkeypatch.setattr("nanobot.session.manager.SessionManager", lambda _workspace: object())
class _FakeSession:
def __init__(self) -> None:
self.messages = []
def add_message(self, role: str, content: str, **kwargs) -> None:
self.messages.append({"role": role, "content": content, **kwargs})
class _FakeSessionManager:
def __init__(self, _workspace: Path) -> None:
self.session = _FakeSession()
seen["session_manager"] = self
def get_or_create(self, key: str) -> _FakeSession:
seen["session_key"] = key
return self.session
def save(self, session: _FakeSession) -> None:
seen["saved_session"] = session
monkeypatch.setattr("nanobot.session.manager.SessionManager", _FakeSessionManager)
class _FakeCron:
def __init__(self, _store_path: Path) -> None:
@ -1030,6 +1050,16 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
content="Time to stretch.",
)
)
assert seen["session_key"] == "telegram:user-1"
saved_session = seen["saved_session"]
assert isinstance(saved_session, _FakeSession)
assert saved_session.messages == [
{
"role": "assistant",
"content": "Time to stretch.",
"_channel_delivery": True,
}
]
def test_gateway_cron_job_suppresses_intermediate_progress(

View File

@ -20,7 +20,11 @@ class TestHeartbeatContextBridge:
# Simulate what on_heartbeat_notify does
target_session = session_mgr.get_or_create(target_key)
target_session.add_message("assistant", "3 new emails — invoice, meeting, proposal.")
target_session.add_message(
"assistant",
"3 new emails — invoice, meeting, proposal.",
_channel_delivery=True,
)
session_mgr.save(target_session)
# Reload and verify
@ -45,7 +49,11 @@ class TestHeartbeatContextBridge:
# Step 1: heartbeat injects assistant message
session = session_mgr.get_or_create(target_key)
session.add_message("assistant", "If you want, I can mark that email as read.")
session.add_message(
"assistant",
"If you want, I can mark that email as read.",
_channel_delivery=True,
)
session_mgr.save(session)
# Step 2: user replies "Sure"
@ -76,7 +84,11 @@ class TestHeartbeatContextBridge:
# Heartbeat injects
session = session_mgr.get_or_create(target_key)
session.add_message("assistant", "You have a meeting in 30 minutes.")
session.add_message(
"assistant",
"You have a meeting in 30 minutes.",
_channel_delivery=True,
)
session_mgr.save(session)
# Verify
@ -86,17 +98,23 @@ class TestHeartbeatContextBridge:
assert roles == ["user", "assistant", "user", "assistant"]
assert "meeting in 30 minutes" in history[-1]["content"]
def test_injection_to_empty_session(self, tmp_path):
"""Injecting into a brand-new session (no prior messages) works."""
def test_reply_after_injection_to_empty_session_keeps_context(self, tmp_path):
"""A user replying to the first delivered message still sees that context."""
session_mgr = SessionManager(tmp_path / "sessions")
target_key = "telegram:99999"
session = session_mgr.get_or_create(target_key)
session.add_message("assistant", "Weather alert: sandstorm expected at 4pm.")
session.add_message(
"assistant",
"Weather alert: sandstorm expected at 4pm.",
_channel_delivery=True,
)
session.add_message("user", "Sure")
session_mgr.save(session)
reloaded = session_mgr.get_or_create(target_key)
history = reloaded.get_history(max_messages=0)
assert len(history) == 1
assert len(history) == 2
assert history[0]["role"] == "assistant"
assert "sandstorm" in history[0]["content"]
assert history[1] == {"role": "user", "content": "Sure"}