From fe2af64e048deb94cbf49539636eb763eea6f93c Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Wed, 27 May 2026 21:28:59 +0800 Subject: [PATCH] refactor(heartbeat): migrate heartbeat service to cron-based auto-registration Remove standalone nanobot/heartbeat/ service and replace it with an auto-registered system cron job on gateway startup. Key behaviors preserved: - HeartbeatConfig (enabled, interval_s, keep_recent_messages) remains in GatewayConfig for backward compatibility. - On startup, if enabled, a system cron job "heartbeat" is registered with schedule derived from interval_s. - HEARTBEAT.md is checked on each tick; empty/template-identical files skip to avoid wasting LLM calls. - Post-run evaluate_response and session history truncation (keep_recent_messages) are retained. - Delivery target selection, deliverable filtering, and preamble guidance are preserved. Files removed: - nanobot/heartbeat/__init__.py - nanobot/heartbeat/service.py - tests/heartbeat/* - tests/agent/test_heartbeat_service.py Templates and docs updated to reflect cron-based usage. --- .agent/gotchas.md | 4 - CLAUDE.md | 2 +- core_agent_lines.sh | 4 +- docs/configuration.md | 2 +- nanobot/agent/context.py | 12 +- nanobot/cli/commands.py | 160 +++++---- nanobot/cli/onboard.py | 2 +- nanobot/config/schema.py | 2 +- nanobot/heartbeat/__init__.py | 5 - nanobot/heartbeat/service.py | 243 ------------- nanobot/templates/AGENTS.md | 4 +- nanobot/templates/HEARTBEAT.md | 6 +- nanobot/templates/agent/tool_contract.md | 2 +- nanobot/utils/helpers.py | 11 + tests/agent/test_heartbeat_service.py | 336 ------------------ tests/agent/test_loop_save_turn.py | 14 +- tests/cli/test_commands.py | 11 - .../test_heartbeat_context_bridge.py | 120 ------- .../test_heartbeat_deliverability.py | 230 ------------ 19 files changed, 122 insertions(+), 1048 deletions(-) delete mode 100644 nanobot/heartbeat/__init__.py delete mode 100644 nanobot/heartbeat/service.py delete mode 100644 tests/agent/test_heartbeat_service.py delete mode 100644 tests/heartbeat/test_heartbeat_context_bridge.py delete mode 100644 tests/heartbeat/test_heartbeat_deliverability.py diff --git a/.agent/gotchas.md b/.agent/gotchas.md index 4ad462169..61765b11f 100644 --- a/.agent/gotchas.md +++ b/.agent/gotchas.md @@ -31,10 +31,6 @@ Tool descriptions, skills, and replayed session history also shape model behavio Anything written into memory, session history, or prompt inputs can be replayed into future LLM calls. Metadata such as timestamps, local media paths, tool-call echoes, and raw fallback dumps must be bounded and sanitized before they become examples for the model to imitate. -## Heartbeat Virtual Tool Call - -The heartbeat service (`heartbeat/service.py`) does not parse free-text LLM output. Instead, it injects a virtual `heartbeat` tool with `action: skip | run` into the conversation. Phase 1 is a structured decision; Phase 2 executes only on `run`. When adding new periodic background checks, follow this virtual-tool-call pattern rather than string matching. - ## Skills as Extension Point Built-in skills live in `nanobot/skills/` (markdown + YAML frontmatter format). Agent capabilities that are "know-how" rather than code should be added as skills, not hardcoded into the agent loop. External skills can be published to and installed from ClawHub. diff --git a/CLAUDE.md b/CLAUDE.md index d63dd593b..4408c18ce 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -47,7 +47,7 @@ Messages flow through an async `MessageBus` (`nanobot/bus/queue.py`) that decoup - **WebUI** (`webui/`): Vite-based React SPA that talks to the gateway over a WebSocket multiplex protocol. The dev server proxies `/api`, `/webui`, `/auth`, and WebSocket traffic to the gateway. - **API Server** (`nanobot/api/server.py`): OpenAI-compatible HTTP API (`/v1/chat/completions`, `/v1/models`) for programmatic access. - **Command Router** (`nanobot/command/`): Slash command routing and built-in command handlers. -- **Heartbeat** (`nanobot/heartbeat/`): Periodic agent wake-up service for scheduled task checking. +- **Heartbeat** (`nanobot/templates/HEARTBEAT.md`): Periodic task list checked via `cron` jobs (legacy dedicated service removed). - **Pairing** (`nanobot/pairing/`): DM sender approval store with persistent pairing codes per channel. - **Skills** (`nanobot/skills/`): Built-in skill definitions (long-goal, cron, github, image-generation, etc.) loaded into agent context. - **Security** (`nanobot/security/`): PTH file guard and other security measures activated at CLI entry. diff --git a/core_agent_lines.sh b/core_agent_lines.sh index 94cc854bd..fbff18363 100755 --- a/core_agent_lines.sh +++ b/core_agent_lines.sh @@ -46,17 +46,15 @@ core_agent=$(count_top_level_py_lines "nanobot/agent") core_bus=$(count_top_level_py_lines "nanobot/bus") core_config=$(count_top_level_py_lines "nanobot/config") core_cron=$(count_top_level_py_lines "nanobot/cron") -core_heartbeat=$(count_top_level_py_lines "nanobot/heartbeat") core_session=$(count_top_level_py_lines "nanobot/session") print_row "agent/" "$core_agent" print_row "bus/" "$core_bus" print_row "config/" "$core_config" print_row "cron/" "$core_cron" -print_row "heartbeat/" "$core_heartbeat" print_row "session/" "$core_session" -core_total=$((core_agent + core_bus + core_config + core_cron + core_heartbeat + core_session)) +core_total=$((core_agent + core_bus + core_config + core_cron + core_session)) echo "" echo "Separate buckets" diff --git a/docs/configuration.md b/docs/configuration.md index 2bad993f6..78f5e1df7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1532,7 +1532,7 @@ By default, nanobot uses `UTC` for runtime time context. If you want the agent t } ``` -This affects runtime time strings shown to the model, such as runtime context and heartbeat prompts. It also becomes the default timezone for cron schedules when a cron expression omits `tz`, and for one-shot `at` times when the ISO datetime has no explicit offset. +This affects runtime time strings shown to the model, such as runtime context. It also becomes the default timezone for cron schedules when a cron expression omits `tz`, and for one-shot `at` times when the ISO datetime has no explicit offset. Common examples: `UTC`, `America/New_York`, `America/Los_Angeles`, `Europe/London`, `Europe/Berlin`, `Asia/Tokyo`, `Asia/Shanghai`, `Asia/Singapore`, `Australia/Sydney`. diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 7c1779499..68ac5f324 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -3,8 +3,6 @@ import base64 import mimetypes import platform -from contextlib import suppress -from importlib.resources import files as pkg_files from pathlib import Path from typing import Any, Mapping, Sequence @@ -12,12 +10,13 @@ from nanobot.agent.memory import MemoryStore from nanobot.agent.skills import SkillsLoader from nanobot.agent.tools import mcp as mcp_tools from nanobot.agent.tools.registry import ToolRegistry -from nanobot.bus.events import InboundMessage from nanobot.apps.cli import utils as cli_app_utils +from nanobot.bus.events import InboundMessage from nanobot.session.goal_state import goal_state_runtime_lines from nanobot.utils.helpers import ( current_time_str, detect_image_mime, + load_bundled_template, truncate_text, ) from nanobot.utils.prompt_templates import render_template @@ -168,10 +167,9 @@ class ContextBuilder: @staticmethod def _is_template_content(content: str, template_path: str) -> bool: """Check if *content* is identical to the bundled template (user hasn't customized it).""" - with suppress(Exception): - tpl = pkg_files("nanobot") / "templates" / template_path - if tpl.is_file(): - return content.strip() == tpl.read_text(encoding="utf-8").strip() + tpl = load_bundled_template(template_path) + if tpl is not None: + return content.strip() == tpl.strip() return False def build_messages( diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index f7bf043a4..1c0cbbdfc 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -1,6 +1,7 @@ """CLI commands for nanobot.""" import asyncio +import functools import os import select import signal @@ -75,6 +76,7 @@ class SafeFileHistory(FileHistory): from nanobot.cli.stream import StreamRenderer, ThinkingSpinner from nanobot.config.paths import get_workspace_path, is_default_workspace from nanobot.config.schema import Config +from nanobot.utils.evaluator import evaluate_response from nanobot.utils.helpers import sync_workspace_templates from nanobot.utils.restart import ( consume_restart_notice_from_env, @@ -94,6 +96,20 @@ EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"} _REASONING_SENTENCE_ENDINGS = (".", "!", "?", "。", "!", "?") _REASONING_FLUSH_CHARS = 60 +_HEARTBEAT_PREAMBLE = ( + "[Your response will be delivered directly to the user's messaging app. " + "Output ONLY the final user-facing message. Never reference internal " + "files (HEARTBEAT.md, AWARENESS.md, etc.), your instructions, or your " + "decision process. If nothing needs reporting, respond with just " + "'All clear.' and nothing else.]\n\n" +) + + +@functools.lru_cache(maxsize=None) +def _heartbeat_template() -> str | None: + from nanobot.utils.helpers import load_bundled_template + return load_bundled_template("HEARTBEAT.md") + # --------------------------------------------------------------------------- # CLI input: prompt_toolkit for editing, paste, history, and display # --------------------------------------------------------------------------- @@ -718,7 +734,6 @@ def _run_gateway( from nanobot.channels.websocket import publish_runtime_model_update from nanobot.cron.service import CronService from nanobot.cron.types import CronJob - from nanobot.heartbeat.service import HeartbeatService from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot from nanobot.providers.image_generation import image_gen_provider_configs from nanobot.session.manager import SessionManager @@ -810,6 +825,9 @@ def _run_gateway( # Set cron callback (needs agent) async def on_cron_job(job: CronJob) -> str | None: """Execute a cron job through the agent.""" + async def _silent(*_args, **_kwargs): + pass + # Dream is an internal job — run directly, not through the agent loop. if job.name == "dream": try: @@ -819,7 +837,56 @@ def _run_gateway( logger.exception("Dream cron job failed") return None - from nanobot.utils.evaluator import evaluate_response + # Heartbeat is a system job that checks HEARTBEAT.md for active tasks. + if job.name == "heartbeat": + heartbeat_file = config.workspace_path / "HEARTBEAT.md" + try: + content = heartbeat_file.read_text(encoding="utf-8") + except OSError: + logger.debug("Heartbeat: HEARTBEAT.md missing") + return None + if not content or content == _heartbeat_template(): + logger.debug("Heartbeat: HEARTBEAT.md empty or identical to template") + return None + + channel, chat_id = _pick_heartbeat_target() + if channel == "cli": + return None + + prompt = ( + _HEARTBEAT_PREAMBLE + + f"Review the following HEARTBEAT.md and report any active tasks:\n\n{content}" + ) + + resp = await agent.process_direct( + prompt, + session_key="heartbeat", + channel=channel, + chat_id=chat_id, + on_progress=_silent, + ) + response = resp.content if resp else "" + + # Keep a small tail of heartbeat history so the loop stays bounded. + session = agent.sessions.get_or_create("heartbeat") + session.retain_recent_legal_suffix(hb_cfg.keep_recent_messages) + agent.sessions.save(session) + + if not response: + return None + + should_notify = await evaluate_response( + response, prompt, agent.provider, agent.model, + ) + if should_notify: + logger.info("Heartbeat: completed, delivering response") + await _deliver_to_channel( + OutboundMessage(channel=channel, chat_id=chat_id, content=response), + record=True, + ) + else: + logger.info("Heartbeat: silenced by post-run evaluation") + return response reminder_note = ( "The scheduled time has arrived. Deliver this reminder to the user now, " @@ -834,9 +901,6 @@ def _run_gateway( if isinstance(cron_tool, CronTool): cron_token = cron_tool.set_cron_context(True) - async def _silent(*_args, **_kwargs): - pass - message_record_token = None if isinstance(message_tool, MessageTool): message_record_token = message_tool.set_record_channel_delivery(True) @@ -898,7 +962,6 @@ def _run_gateway( def _pick_heartbeat_target() -> tuple[str, str]: """Pick a routable channel/chat target for heartbeat-triggered messages.""" enabled = set(channels.enabled_channels) - # Prefer the most recently updated non-internal session on an enabled channel. for item in session_manager.list_sessions(): key = item.get("key") or "" if ":" not in key: @@ -908,70 +971,8 @@ def _run_gateway( continue if channel in enabled and chat_id: return channel, chat_id - # Fallback keeps prior behavior but remains explicit. return "cli", "direct" - # Create heartbeat service - heartbeat_preamble = ( - "[Your response will be delivered directly to the user's messaging app. " - "Output ONLY the final user-facing message. Never reference internal " - "files (HEARTBEAT.md, AWARENESS.md, etc.), your instructions, or your " - "decision process. If nothing needs reporting, respond with just " - "'All clear.' and nothing else.]\n\n" - ) - - async def on_heartbeat_execute(tasks: str) -> str: - """Phase 2: execute heartbeat tasks through the full agent loop.""" - channel, chat_id = _pick_heartbeat_target() - - async def _silent(*_args, **_kwargs): - pass - - resp = await agent.process_direct( - heartbeat_preamble + tasks, - session_key="heartbeat", - channel=channel, - chat_id=chat_id, - on_progress=_silent, - ) - - # Keep a small tail of heartbeat history so the loop stays bounded - # without losing all short-term context between runs. - session = agent.sessions.get_or_create("heartbeat") - session.retain_recent_legal_suffix(hb_cfg.keep_recent_messages) - agent.sessions.save(session) - - return resp.content if resp else "" - - async def on_heartbeat_notify(response: str) -> None: - """Deliver a heartbeat response to the user's channel. - - In addition to publishing the outbound message, this injects the - delivered text as an assistant turn into the *target channel's* - session. Without this, a user reply on the channel (e.g. "Sure") - lands in a session that has no context about the heartbeat message - and the agent cannot follow through. - """ - channel, chat_id = _pick_heartbeat_target() - if channel == "cli": - return # No external channel available to deliver to - - await _deliver_to_channel( - OutboundMessage(channel=channel, chat_id=chat_id, content=response), - record=True, - ) - - hb_cfg = config.gateway.heartbeat - heartbeat = HeartbeatService( - workspace=config.workspace_path, - llm_runtime=agent.llm_runtime, - on_execute=on_heartbeat_execute, - on_notify=on_heartbeat_notify, - interval_s=hb_cfg.interval_s, - enabled=hb_cfg.enabled, - timezone=config.agents.defaults.timezone, - ) - if channels.enabled_channels: console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}") else: @@ -981,7 +982,11 @@ def _run_gateway( if cron_status["jobs"] > 0: console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs") - console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s") + hb_cfg = config.gateway.heartbeat + if hb_cfg.enabled: + console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s") + else: + console.print("[yellow]✗[/yellow] Heartbeat: disabled") async def _health_server(host: str, health_port: int): """Lightweight HTTP health endpoint on the gateway port.""" @@ -1032,7 +1037,7 @@ def _run_gateway( agent.dream.max_batch_size = dream_cfg.max_batch_size agent.dream.max_iterations = dream_cfg.max_iterations agent.dream.annotate_line_ages = dream_cfg.annotate_line_ages - from nanobot.cron.types import CronJob, CronPayload + from nanobot.cron.types import CronJob, CronPayload, CronSchedule cron.register_system_job(CronJob( id="dream", name="dream", @@ -1041,6 +1046,19 @@ def _run_gateway( )) console.print(f"[green]✓[/green] Dream: {dream_cfg.describe_schedule()}") + # Register Heartbeat system job (idempotent on restart) + if hb_cfg.enabled: + cron.register_system_job(CronJob( + id="heartbeat", + name="heartbeat", + schedule=CronSchedule( + kind="every", + every_ms=hb_cfg.interval_s * 1000, + tz=config.agents.defaults.timezone, + ), + payload=CronPayload(kind="system_event"), + )) + async def _open_browser_when_ready() -> None: """Wait for the gateway to bind, then point the user's browser at the webui.""" if not open_browser_url: @@ -1067,7 +1085,6 @@ def _run_gateway( async def run(): try: await cron.start() - await heartbeat.start() tasks = [ agent.run(), channels.start_all(), @@ -1085,7 +1102,6 @@ def _run_gateway( console.print(traceback.format_exc()) finally: await agent.close_mcp() - heartbeat.stop() cron.stop() agent.stop() await channels.stop_all() diff --git a/nanobot/cli/onboard.py b/nanobot/cli/onboard.py index 9f5fc0a88..98c061731 100644 --- a/nanobot/cli/onboard.py +++ b/nanobot/cli/onboard.py @@ -1155,7 +1155,7 @@ _SETTINGS_SECTIONS: dict[str, tuple[str, str, set[str] | None]] = { "Agent Settings": ("Agent Defaults", "Configure default model, temperature, and behavior", None), "Channel Common": ("Channel Common", "Configure cross-channel behavior: progress, tool hints, retries", None), "API Server": ("API Server", "Configure OpenAI-compatible API endpoint", None), - "Gateway": ("Gateway Settings", "Configure server host, port, and heartbeat", None), + "Gateway": ("Gateway Settings", "Configure server host, port", None), "Tools": ("Tools Settings", "Configure web search, shell exec, and other tools", {"mcp_servers"}), } diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index f194820cc..db6650d91 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -236,7 +236,7 @@ class ProvidersConfig(Base): class HeartbeatConfig(Base): - """Heartbeat service configuration.""" + """Heartbeat service configuration (now backed by cron).""" enabled: bool = True interval_s: int = 30 * 60 # 30 minutes diff --git a/nanobot/heartbeat/__init__.py b/nanobot/heartbeat/__init__.py deleted file mode 100644 index 2ecd87952..000000000 --- a/nanobot/heartbeat/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Heartbeat service for periodic agent wake-ups.""" - -from nanobot.heartbeat.service import HeartbeatService - -__all__ = ["HeartbeatService"] diff --git a/nanobot/heartbeat/service.py b/nanobot/heartbeat/service.py deleted file mode 100644 index 55d26cf11..000000000 --- a/nanobot/heartbeat/service.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Heartbeat service - periodic agent wake-up to check for tasks.""" - -from __future__ import annotations - -import asyncio -from pathlib import Path -from typing import Any, Callable, Coroutine - -from loguru import logger - -from nanobot.providers.base import LLMProvider -from nanobot.utils.llm_runtime import LLMRuntimeResolver, static_llm_runtime - -_HEARTBEAT_TOOL = [ - { - "type": "function", - "function": { - "name": "heartbeat", - "description": "Report heartbeat decision after reviewing tasks.", - "parameters": { - "type": "object", - "properties": { - "action": { - "type": "string", - "enum": ["skip", "run"], - "description": "skip = nothing to do, run = has active tasks", - }, - "tasks": { - "type": "string", - "description": "Natural-language summary of active tasks (required for run)", - }, - }, - "required": ["action"], - }, - }, - } -] - - -class HeartbeatService: - """ - Periodic heartbeat service that wakes the agent to check for tasks. - - Phase 1 (decision): reads HEARTBEAT.md and asks the LLM — via a virtual - tool call — whether there are active tasks. This avoids free-text parsing - and the unreliable HEARTBEAT_OK token. - - Phase 2 (execution): only triggered when Phase 1 returns ``run``. The - ``on_execute`` callback runs the task through the full agent loop and - returns the result to deliver. - """ - - def __init__( - self, - workspace: Path, - provider: LLMProvider | None = None, - model: str | None = None, - on_execute: Callable[[str], Coroutine[Any, Any, str]] | None = None, - on_notify: Callable[[str], Coroutine[Any, Any, None]] | None = None, - interval_s: int = 30 * 60, - enabled: bool = True, - timezone: str | None = None, - llm_runtime: LLMRuntimeResolver | None = None, - ): - self.workspace = workspace - if llm_runtime is None: - if provider is None or model is None: - raise ValueError("HeartbeatService requires either llm_runtime or provider/model") - llm_runtime = static_llm_runtime(provider, model) - self._llm_runtime = llm_runtime - self.on_execute = on_execute - self.on_notify = on_notify - self.interval_s = interval_s - self.enabled = enabled - self.timezone = timezone - self._running = False - self._task: asyncio.Task | None = None - - @property - def heartbeat_file(self) -> Path: - return self.workspace / "HEARTBEAT.md" - - def _read_heartbeat_file(self) -> str | None: - if self.heartbeat_file.exists(): - try: - return self.heartbeat_file.read_text(encoding="utf-8") - except Exception: - return None - return None - - async def _decide(self, content: str) -> tuple[str, str]: - """Phase 1: ask LLM to decide skip/run via virtual tool call. - - Returns (action, tasks) where action is 'skip' or 'run'. - """ - from nanobot.utils.helpers import current_time_str - - llm = self._llm_runtime() - - response = await llm.provider.chat_with_retry( - messages=[ - {"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."}, - {"role": "user", "content": ( - f"Current Time: {current_time_str(self.timezone)}\n\n" - "Review the following HEARTBEAT.md and decide whether there are active tasks.\n\n" - f"{content}" - )}, - ], - tools=_HEARTBEAT_TOOL, - model=llm.model, - ) - - if not response.should_execute_tools: - if response.has_tool_calls: - logger.warning( - "Ignoring heartbeat tool calls under finish_reason='{}'", - response.finish_reason, - ) - return "skip", "" - - args = response.tool_calls[0].arguments - return args.get("action", "skip"), args.get("tasks", "") - - async def start(self) -> None: - """Start the heartbeat service.""" - if not self.enabled: - logger.info("Heartbeat disabled") - return - if self._running: - logger.warning("Heartbeat already running") - return - - self._running = True - self._task = asyncio.create_task(self._run_loop()) - logger.info("Heartbeat started (every {}s)", self.interval_s) - - def stop(self) -> None: - """Stop the heartbeat service.""" - self._running = False - if self._task: - self._task.cancel() - self._task = None - - async def _run_loop(self) -> None: - """Main heartbeat loop.""" - while self._running: - try: - await asyncio.sleep(self.interval_s) - if self._running: - await self._tick() - except asyncio.CancelledError: - break - except Exception: - logger.exception("Heartbeat error") - - @staticmethod - def _is_deliverable(response: str) -> bool: - """Check if a heartbeat response is suitable for user delivery. - - Filters out two classes of bad output before the evaluator runs: - - 1. **Finalization fallback** — the runner hit empty-response retries - and produced a canned error message. For heartbeat, empty output - is a valid "nothing to report" outcome, not a failure. - 2. **Leaked reasoning** — the model reflected internal file names, - decision logic, or meta-commentary instead of a user-facing report. - """ - text = response.lower() - - # Runner finalization fallback - if "couldn't produce a final answer" in text: - return False - - # Leaked internal reasoning patterns - leaked_patterns = [ - "heartbeat.md", - "awareness.md", - "judgment call:", - "decision logic", - "valid options are", - "my instructions", - "i am supposed to", - "strict heartbeat interpretation", - ] - if any(pattern in text for pattern in leaked_patterns): - return False - - return True - - async def _tick(self) -> None: - """Execute a single heartbeat tick.""" - from nanobot.utils.evaluator import evaluate_response - - content = self._read_heartbeat_file() - if not content: - logger.debug("Heartbeat: HEARTBEAT.md missing or empty") - return - - logger.info("Heartbeat: checking for tasks...") - - try: - action, tasks = await self._decide(content) - - if action != "run": - logger.info("Heartbeat: OK (nothing to report)") - return - - logger.info("Heartbeat: tasks found, executing...") - if self.on_execute: - response = await self.on_execute(tasks) - - if not response: - logger.info("Heartbeat: no response from execution") - return - - if not self._is_deliverable(response): - logger.info( - "Heartbeat: suppressed non-deliverable response ({})", - response[:80], - ) - return - - llm = self._llm_runtime() - should_notify = await evaluate_response( - response, tasks, llm.provider, llm.model, - ) - if should_notify and self.on_notify: - logger.info("Heartbeat: completed, delivering response") - await self.on_notify(response) - else: - logger.info("Heartbeat: silenced by post-run evaluation") - except Exception: - logger.exception("Heartbeat execution failed") - - async def trigger_now(self) -> str | None: - """Manually trigger a heartbeat.""" - content = self._read_heartbeat_file() - if not content: - return None - action, tasks = await self._decide(content) - if action != "run" or not self.on_execute: - return None - return await self.on_execute(tasks) diff --git a/nanobot/templates/AGENTS.md b/nanobot/templates/AGENTS.md index 46cfc08c3..a6c046de4 100644 --- a/nanobot/templates/AGENTS.md +++ b/nanobot/templates/AGENTS.md @@ -14,10 +14,10 @@ Get USER_ID and CHANNEL from the current session (e.g., `8281248569` and `telegr ## Heartbeat Tasks -`HEARTBEAT.md` is checked on the configured heartbeat interval. Use file tools to manage periodic tasks. +`HEARTBEAT.md` is checked periodically when registered as a cron job. Use the built-in `cron` tool to schedule it (e.g. `cron add --name heartbeat --schedule "every 30m" --message "Check HEARTBEAT.md"`). - Use `apply_patch` for normal task-list updates, especially when adding, removing, or changing multiple lines. - Use `edit_file` only for small exact replacements copied from the current `HEARTBEAT.md`. - Use `write_file` for first creation or intentional full-file rewrites. -When the user asks for a recurring/periodic task, update `HEARTBEAT.md` instead of creating a one-time cron reminder. +When the user asks for a recurring/periodic task, update `HEARTBEAT.md` and register it via `cron` instead of creating a one-time reminder. diff --git a/nanobot/templates/HEARTBEAT.md b/nanobot/templates/HEARTBEAT.md index 322dbeb14..ea36a9650 100644 --- a/nanobot/templates/HEARTBEAT.md +++ b/nanobot/templates/HEARTBEAT.md @@ -1,9 +1,9 @@ # Heartbeat Tasks -This file is checked every 30 minutes by your nanobot agent. -Add tasks below that you want the agent to work on periodically. +This file is checked periodically by your nanobot agent. +Register it as a cron job (e.g. `cron add --name heartbeat --schedule "every 30m" --message "Check HEARTBEAT.md"`) to get the same behavior as the legacy heartbeat service. -If this file has no tasks (only headers and comments), the agent will skip the heartbeat. +If this file has no tasks (only headers and comments), the agent will skip it. ## Active Tasks diff --git a/nanobot/templates/agent/tool_contract.md b/nanobot/templates/agent/tool_contract.md index 8d34ad861..ba65cfc79 100644 --- a/nanobot/templates/agent/tool_contract.md +++ b/nanobot/templates/agent/tool_contract.md @@ -63,5 +63,5 @@ documents the general tool contract and non-obvious usage patterns. ## Scheduling and Background Work - Use `cron` for scheduled reminders or recurring jobs; do not run `nanobot cron` through `exec`. -- For heartbeat tasks, update `HEARTBEAT.md` according to the agent instructions. +- For heartbeat tasks, register `HEARTBEAT.md` as a cron job according to the agent instructions. - Do not write reminders only to memory files when the user expects an actual notification. diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py index ae91bf394..6341bc2bc 100644 --- a/nanobot/utils/helpers.py +++ b/nanobot/utils/helpers.py @@ -626,3 +626,14 @@ def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str] logger.exception("Failed to initialize git store for {}", workspace) return added + + +def load_bundled_template(template_name: str) -> str | None: + """Read a bundled template file from the nanobot package.""" + from importlib.resources import files as pkg_files + + with suppress(Exception): + tpl = pkg_files("nanobot") / "templates" / template_name + if tpl.is_file(): + return tpl.read_text(encoding="utf-8") + return None diff --git a/tests/agent/test_heartbeat_service.py b/tests/agent/test_heartbeat_service.py deleted file mode 100644 index fe7b54256..000000000 --- a/tests/agent/test_heartbeat_service.py +++ /dev/null @@ -1,336 +0,0 @@ -import asyncio - -import pytest - -from nanobot.heartbeat.service import HeartbeatService -from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest -from nanobot.utils.llm_runtime import LLMRuntime - - -class DummyProvider(LLMProvider): - def __init__(self, responses: list[LLMResponse]): - super().__init__() - self._responses = list(responses) - self.calls = 0 - self.models: list[str | None] = [] - - async def chat(self, *args, **kwargs) -> LLMResponse: - self.calls += 1 - self.models.append(kwargs.get("model")) - if self._responses: - return self._responses.pop(0) - return LLMResponse(content="", tool_calls=[]) - - def get_default_model(self) -> str: - return "test-model" - - -@pytest.mark.asyncio -async def test_start_is_idempotent(tmp_path) -> None: - provider = DummyProvider([]) - - service = HeartbeatService( - workspace=tmp_path, - provider=provider, - model="openai/gpt-4o-mini", - interval_s=9999, - enabled=True, - ) - - await service.start() - first_task = service._task - await service.start() - - assert service._task is first_task - - service.stop() - await asyncio.sleep(0) - - -@pytest.mark.asyncio -async def test_decide_returns_skip_when_no_tool_call(tmp_path) -> None: - provider = DummyProvider([LLMResponse(content="no tool call", tool_calls=[])]) - service = HeartbeatService( - workspace=tmp_path, - provider=provider, - model="openai/gpt-4o-mini", - ) - - action, tasks = await service._decide("heartbeat content") - assert action == "skip" - assert tasks == "" - - -@pytest.mark.asyncio -async def test_trigger_now_executes_when_decision_is_run(tmp_path) -> None: - (tmp_path / "HEARTBEAT.md").write_text("- [ ] do thing", encoding="utf-8") - - provider = DummyProvider([ - LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", - name="heartbeat", - arguments={"action": "run", "tasks": "check open tasks"}, - ) - ], - ) - ]) - - called_with: list[str] = [] - - async def _on_execute(tasks: str) -> str: - called_with.append(tasks) - return "done" - - service = HeartbeatService( - workspace=tmp_path, - provider=provider, - model="openai/gpt-4o-mini", - on_execute=_on_execute, - ) - - result = await service.trigger_now() - assert result == "done" - assert called_with == ["check open tasks"] - - -@pytest.mark.asyncio -async def test_trigger_now_returns_none_when_decision_is_skip(tmp_path) -> None: - (tmp_path / "HEARTBEAT.md").write_text("- [ ] do thing", encoding="utf-8") - - provider = DummyProvider([ - LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", - name="heartbeat", - arguments={"action": "skip"}, - ) - ], - ) - ]) - - async def _on_execute(tasks: str) -> str: - return tasks - - service = HeartbeatService( - workspace=tmp_path, - provider=provider, - model="openai/gpt-4o-mini", - on_execute=_on_execute, - ) - - assert await service.trigger_now() is None - - -@pytest.mark.asyncio -async def test_tick_notifies_when_evaluator_says_yes(tmp_path, monkeypatch) -> None: - """Phase 1 run -> Phase 2 execute -> Phase 3 evaluate=notify -> on_notify called.""" - (tmp_path / "HEARTBEAT.md").write_text("- [ ] check deployments", encoding="utf-8") - - provider = DummyProvider([ - LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", - name="heartbeat", - arguments={"action": "run", "tasks": "check deployments"}, - ) - ], - ), - ]) - - executed: list[str] = [] - notified: list[str] = [] - - async def _on_execute(tasks: str) -> str: - executed.append(tasks) - return "deployment failed on staging" - - async def _on_notify(response: str) -> None: - notified.append(response) - - service = HeartbeatService( - workspace=tmp_path, - provider=provider, - model="openai/gpt-4o-mini", - on_execute=_on_execute, - on_notify=_on_notify, - ) - - async def _eval_notify(*a, **kw): - return True - - monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_notify) - - await service._tick() - assert executed == ["check deployments"] - assert notified == ["deployment failed on staging"] - - -@pytest.mark.asyncio -async def test_tick_suppresses_when_evaluator_says_no(tmp_path, monkeypatch) -> None: - """Phase 1 run -> Phase 2 execute -> Phase 3 evaluate=silent -> on_notify NOT called.""" - (tmp_path / "HEARTBEAT.md").write_text("- [ ] check status", encoding="utf-8") - - provider = DummyProvider([ - LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", - name="heartbeat", - arguments={"action": "run", "tasks": "check status"}, - ) - ], - ), - ]) - - executed: list[str] = [] - notified: list[str] = [] - - async def _on_execute(tasks: str) -> str: - executed.append(tasks) - return "everything is fine, no issues" - - async def _on_notify(response: str) -> None: - notified.append(response) - - service = HeartbeatService( - workspace=tmp_path, - provider=provider, - model="openai/gpt-4o-mini", - on_execute=_on_execute, - on_notify=_on_notify, - ) - - async def _eval_silent(*a, **kw): - return False - - monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_silent) - - await service._tick() - assert executed == ["check status"] - assert notified == [] - - -def test_tick_uses_runtime_provider_and_model(tmp_path, monkeypatch) -> None: - """Preset changes must apply to heartbeat decision and post-run evaluation.""" - (tmp_path / "HEARTBEAT.md").write_text("- [ ] check runtime model", encoding="utf-8") - - runtime_provider = DummyProvider([ - LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", - name="heartbeat", - arguments={"action": "run", "tasks": "check runtime model"}, - ) - ], - ), - ]) - runtime_model = "openai/gpt-4.1" - - executed: list[str] = [] - evaluated: list[tuple[LLMProvider, str]] = [] - - async def _on_execute(tasks: str) -> str: - executed.append(tasks) - return "runtime model produced a user-facing update" - - async def _eval_capture(response, tasks, provider, model): - evaluated.append((provider, model)) - return False - - service = HeartbeatService( - workspace=tmp_path, - llm_runtime=lambda: LLMRuntime(runtime_provider, runtime_model), - on_execute=_on_execute, - ) - - monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_capture) - - asyncio.run(service._tick()) - - assert runtime_provider.calls == 1 - assert runtime_provider.models == [runtime_model] - assert executed == ["check runtime model"] - assert evaluated == [(runtime_provider, runtime_model)] - - -@pytest.mark.asyncio -async def test_decide_retries_transient_error_then_succeeds(tmp_path, monkeypatch) -> None: - provider = DummyProvider([ - LLMResponse(content="429 rate limit", finish_reason="error"), - LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", - name="heartbeat", - arguments={"action": "run", "tasks": "check open tasks"}, - ) - ], - ), - ]) - - delays: list[int] = [] - - async def _fake_sleep(delay: int) -> None: - delays.append(delay) - - monkeypatch.setattr(asyncio, "sleep", _fake_sleep) - - service = HeartbeatService( - workspace=tmp_path, - provider=provider, - model="openai/gpt-4o-mini", - ) - - action, tasks = await service._decide("heartbeat content") - - assert action == "run" - assert tasks == "check open tasks" - assert provider.calls == 2 - assert delays == [1] - - -@pytest.mark.asyncio -async def test_decide_prompt_includes_current_time(tmp_path) -> None: - """Phase 1 user prompt must contain current time so the LLM can judge task urgency.""" - - captured_messages: list[dict] = [] - - class CapturingProvider(LLMProvider): - async def chat(self, *, messages=None, **kwargs) -> LLMResponse: - if messages: - captured_messages.extend(messages) - return LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", name="heartbeat", - arguments={"action": "skip"}, - ) - ], - ) - - def get_default_model(self) -> str: - return "test-model" - - service = HeartbeatService( - workspace=tmp_path, - provider=CapturingProvider(), - model="test-model", - ) - - await service._decide("- [ ] check servers at 10:00 UTC") - - user_msg = captured_messages[1] - assert user_msg["role"] == "user" - assert "Current Time:" in user_msg["content"] diff --git a/tests/agent/test_loop_save_turn.py b/tests/agent/test_loop_save_turn.py index 06ed8a0c4..cc8bba804 100644 --- a/tests/agent/test_loop_save_turn.py +++ b/tests/agent/test_loop_save_turn.py @@ -602,17 +602,17 @@ async def test_process_message_uses_explicit_session_metadata_for_goal_context( chat_session = loop.sessions.get_or_create("websocket:chat-with-goal") chat_session.metadata[GOAL_STATE_KEY] = { "status": "active", - "objective": "This chat goal must not leak into heartbeat.", + "objective": "This chat goal must not leak into system.", } loop.sessions.save(chat_session) - system_session = loop.sessions.get_or_create("heartbeat") + system_session = loop.sessions.get_or_create("system") system_session.metadata = {} loop.sessions.save(system_session) loop.context.build_messages = MagicMock( # type: ignore[method-assign] return_value=[ {"role": "system", "content": "system"}, - {"role": "user", "content": "runtime + heartbeat"}, + {"role": "user", "content": "runtime + system"}, ] ) loop._run_agent_loop = AsyncMock(return_value=( # type: ignore[method-assign] @@ -620,7 +620,7 @@ async def test_process_message_uses_explicit_session_metadata_for_goal_context( [], [ {"role": "system", "content": "system"}, - {"role": "user", "content": "runtime + heartbeat"}, + {"role": "user", "content": "runtime + system"}, {"role": "assistant", "content": "ok"}, ], "stop", @@ -630,11 +630,11 @@ async def test_process_message_uses_explicit_session_metadata_for_goal_context( result = await loop._process_message( InboundMessage( channel="websocket", - sender_id="heartbeat", + sender_id="system", chat_id="chat-with-goal", - content="heartbeat work", + content="system work", ), - session_key="heartbeat", + session_key="system", ) assert result is not None diff --git a/tests/cli/test_commands.py b/tests/cli/test_commands.py index 1dede1e13..ba2a89a3d 100644 --- a/tests/cli/test_commands.py +++ b/tests/cli/test_commands.py @@ -1589,16 +1589,6 @@ def test_gateway_health_endpoint_binds_and_serves_expected_responses( def register_system_job(self, _job) -> None: return None - class _FakeHeartbeatService: - def __init__(self, **_kwargs) -> None: - return None - - async def start(self) -> None: - return None - - def stop(self) -> None: - return None - class _FakeServer: async def __aenter__(self): return self @@ -1645,7 +1635,6 @@ def test_gateway_health_endpoint_binds_and_serves_expected_responses( monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop) monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _FakeChannelManager) monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCronService) - monkeypatch.setattr("nanobot.heartbeat.service.HeartbeatService", _FakeHeartbeatService) monkeypatch.setattr("asyncio.start_server", _fake_start_server) result = runner.invoke(app, ["gateway", "--config", str(config_file)]) diff --git a/tests/heartbeat/test_heartbeat_context_bridge.py b/tests/heartbeat/test_heartbeat_context_bridge.py deleted file mode 100644 index 5ec02a8bb..000000000 --- a/tests/heartbeat/test_heartbeat_context_bridge.py +++ /dev/null @@ -1,120 +0,0 @@ -"""Tests for heartbeat context bridge — injecting delivered messages into channel session.""" - -from nanobot.session.manager import SessionManager - - -class TestHeartbeatContextBridge: - """Verify that on_heartbeat_notify injects the assistant message into the - channel session so user replies have conversational context.""" - - def test_notify_injects_into_channel_session(self, tmp_path): - """After notify, the target channel session should contain the - heartbeat response as an assistant turn.""" - session_mgr = SessionManager(tmp_path / "sessions") - target_key = "telegram:12345" - - # Simulate: session exists with one user message - target_session = session_mgr.get_or_create(target_key) - target_session.add_message("user", "hello earlier") - session_mgr.save(target_session) - - # Simulate what on_heartbeat_notify does - target_session = session_mgr.get_or_create(target_key) - target_session.add_message( - "assistant", - "3 new emails — invoice, meeting, proposal.", - _channel_delivery=True, - ) - session_mgr.save(target_session) - - # Reload and verify - reloaded = session_mgr.get_or_create(target_key) - messages = reloaded.get_history(max_messages=0) - roles = [m["role"] for m in messages] - assert roles == ["user", "assistant"] - assert "3 new emails" in messages[-1]["content"] - - def test_reply_after_injection_has_context(self, tmp_path): - """Simulates the full flow: prior conversation exists, heartbeat - injects, then user replies. The session should have the heartbeat - message visible in get_history so the model sees the context.""" - session_mgr = SessionManager(tmp_path / "sessions") - target_key = "telegram:12345" - - # Pre-existing conversation (user has chatted before) - session = session_mgr.get_or_create(target_key) - session.add_message("user", "Hey") - session.add_message("assistant", "Hi there!") - session_mgr.save(session) - - # Step 1: heartbeat injects assistant message - session = session_mgr.get_or_create(target_key) - session.add_message( - "assistant", - "If you want, I can mark that email as read.", - _channel_delivery=True, - ) - session_mgr.save(session) - - # Step 2: user replies "Sure" - session = session_mgr.get_or_create(target_key) - session.add_message("user", "Sure") - session_mgr.save(session) - - # Verify: get_history includes the heartbeat injection - reloaded = session_mgr.get_or_create(target_key) - history = reloaded.get_history(max_messages=0) - roles = [m["role"] for m in history] - assert roles == ["user", "assistant", "assistant", "user"] - assert "mark that email" in history[2]["content"] - assert history[3]["content"] == "Sure" - - def test_injection_does_not_duplicate_on_existing_history(self, tmp_path): - """If the channel session already has messages, the injection - appends cleanly without corruption.""" - session_mgr = SessionManager(tmp_path / "sessions") - target_key = "telegram:12345" - - # Pre-existing conversation - session = session_mgr.get_or_create(target_key) - session.add_message("user", "What time is it?") - session.add_message("assistant", "It's 2pm.") - session.add_message("user", "Thanks") - session_mgr.save(session) - - # Heartbeat injects - session = session_mgr.get_or_create(target_key) - session.add_message( - "assistant", - "You have a meeting in 30 minutes.", - _channel_delivery=True, - ) - session_mgr.save(session) - - # Verify - reloaded = session_mgr.get_or_create(target_key) - history = reloaded.get_history(max_messages=0) - roles = [m["role"] for m in history] - assert roles == ["user", "assistant", "user", "assistant"] - assert "meeting in 30 minutes" in history[-1]["content"] - - def test_reply_after_injection_to_empty_session_keeps_context(self, tmp_path): - """A user replying to the first delivered message still sees that context.""" - session_mgr = SessionManager(tmp_path / "sessions") - target_key = "telegram:99999" - - session = session_mgr.get_or_create(target_key) - session.add_message( - "assistant", - "Weather alert: sandstorm expected at 4pm.", - _channel_delivery=True, - ) - session.add_message("user", "Sure") - session_mgr.save(session) - - reloaded = session_mgr.get_or_create(target_key) - history = reloaded.get_history(max_messages=0) - assert len(history) == 2 - assert history[0]["role"] == "assistant" - assert "sandstorm" in history[0]["content"] - assert history[1] == {"role": "user", "content": "Sure"} diff --git a/tests/heartbeat/test_heartbeat_deliverability.py b/tests/heartbeat/test_heartbeat_deliverability.py deleted file mode 100644 index 77ae5146e..000000000 --- a/tests/heartbeat/test_heartbeat_deliverability.py +++ /dev/null @@ -1,230 +0,0 @@ -"""Tests for HeartbeatService._is_deliverable and _tick suppression.""" - -import pytest - -from nanobot.heartbeat.service import HeartbeatService -from nanobot.providers.base import LLMResponse, ToolCallRequest - -# --------------------------------------------------------------------------- -# _is_deliverable unit tests -# --------------------------------------------------------------------------- - - -class TestIsDeliverable: - """Verify the pre-evaluator deliverability filter.""" - - def test_normal_report_is_deliverable(self): - assert HeartbeatService._is_deliverable( - "2 new emails — invoice from Zain, meeting rescheduled to 3pm." - ) - - def test_short_dismissal_is_deliverable(self): - assert HeartbeatService._is_deliverable("All clear.") - - def test_finalization_fallback_blocked(self): - assert not HeartbeatService._is_deliverable( - "I completed the tool steps but couldn't produce a final answer. " - "Please try again or narrow the task." - ) - - def test_leaked_heartbeat_md_reference_blocked(self): - assert not HeartbeatService._is_deliverable( - "Yes — HEARTBEAT.md has active tasks listed. They are: " - "Check Gmail for important messages, Check Calendar." - ) - - def test_leaked_awareness_md_reference_blocked(self): - assert not HeartbeatService._is_deliverable( - "I reviewed AWARENESS.md and found no new signals." - ) - - def test_leaked_judgment_call_blocked(self): - assert not HeartbeatService._is_deliverable( - "Best judgment call: stay quiet." - ) - - def test_leaked_decision_logic_blocked(self): - assert not HeartbeatService._is_deliverable( - "Strict HEARTBEAT interpretation. Decision logic says SHORT UPDATE." - ) - - def test_leaked_valid_options_blocked(self): - assert not HeartbeatService._is_deliverable( - "The valid options are FULL REPORT, SHORT UPDATE, or SILENT." - ) - - def test_leaked_my_instructions_blocked(self): - assert not HeartbeatService._is_deliverable( - "My instructions say to check Gmail and Calendar." - ) - - def test_leaked_supposed_to_blocked(self): - assert not HeartbeatService._is_deliverable( - "I am supposed to scan for urgent emails." - ) - - def test_case_insensitive(self): - assert not HeartbeatService._is_deliverable( - "HEARTBEAT.MD has tasks listed." - ) - - def test_empty_string_is_deliverable(self): - """Empty string won't reach _is_deliverable in practice (caught earlier), - but should not crash.""" - assert HeartbeatService._is_deliverable("") - - -# --------------------------------------------------------------------------- -# _tick integration: non-deliverable responses never reach evaluator/notify -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_tick_suppresses_finalization_fallback(tmp_path, monkeypatch) -> None: - """Finalization fallback should be caught before the evaluator runs.""" - (tmp_path / "HEARTBEAT.md").write_text("- [ ] check inbox", encoding="utf-8") - - from nanobot.providers.base import LLMProvider - - class StubProvider(LLMProvider): - async def chat(self, **kwargs) -> LLMResponse: - return LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", name="heartbeat", - arguments={"action": "run", "tasks": "check inbox"}, - ) - ], - ) - - def get_default_model(self) -> str: - return "test-model" - - notified: list[str] = [] - evaluator_called = False - - async def _on_execute(tasks: str) -> str: - return ( - "I completed the tool steps but couldn't produce a final answer. " - "Please try again or narrow the task." - ) - - async def _on_notify(response: str) -> None: - notified.append(response) - - async def _eval_always_notify(*a, **kw): - nonlocal evaluator_called - evaluator_called = True - return True - - monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_always_notify) - - service = HeartbeatService( - workspace=tmp_path, - provider=StubProvider(), - model="test-model", - on_execute=_on_execute, - on_notify=_on_notify, - ) - - await service._tick() - - assert notified == [], "Finalization fallback should not reach the user" - assert not evaluator_called, "Evaluator should not be called for non-deliverable responses" - - -@pytest.mark.asyncio -async def test_tick_suppresses_leaked_reasoning(tmp_path, monkeypatch) -> None: - """Leaked internal reasoning should be caught before the evaluator runs.""" - (tmp_path / "HEARTBEAT.md").write_text("- [ ] check status", encoding="utf-8") - - from nanobot.providers.base import LLMProvider - - class StubProvider(LLMProvider): - async def chat(self, **kwargs) -> LLMResponse: - return LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", name="heartbeat", - arguments={"action": "run", "tasks": "check status"}, - ) - ], - ) - - def get_default_model(self) -> str: - return "test-model" - - notified: list[str] = [] - - async def _on_execute(tasks: str) -> str: - return "HEARTBEAT.md has active tasks listed. They are: Check Gmail." - - async def _on_notify(response: str) -> None: - notified.append(response) - - async def _eval_always_notify(*a, **kw): - return True - - monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_always_notify) - - service = HeartbeatService( - workspace=tmp_path, - provider=StubProvider(), - model="test-model", - on_execute=_on_execute, - on_notify=_on_notify, - ) - - await service._tick() - - assert notified == [], "Leaked reasoning should not reach the user" - - -@pytest.mark.asyncio -async def test_tick_delivers_normal_report(tmp_path, monkeypatch) -> None: - """Normal reports should pass through deliverability and evaluator.""" - (tmp_path / "HEARTBEAT.md").write_text("- [ ] check inbox", encoding="utf-8") - - from nanobot.providers.base import LLMProvider - - class StubProvider(LLMProvider): - async def chat(self, **kwargs) -> LLMResponse: - return LLMResponse( - content="", - tool_calls=[ - ToolCallRequest( - id="hb_1", name="heartbeat", - arguments={"action": "run", "tasks": "check inbox"}, - ) - ], - ) - - def get_default_model(self) -> str: - return "test-model" - - notified: list[str] = [] - - async def _on_execute(tasks: str) -> str: - return "3 new emails — client proposal from Zain, invoice, meeting reminder." - - async def _on_notify(response: str) -> None: - notified.append(response) - - async def _eval_always_notify(*a, **kw): - return True - - monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_always_notify) - - service = HeartbeatService( - workspace=tmp_path, - provider=StubProvider(), - model="test-model", - on_execute=_on_execute, - on_notify=_on_notify, - ) - - await service._tick() - - assert notified == ["3 new emails — client proposal from Zain, invoice, meeting reminder."]