refactor(heartbeat): migrate heartbeat service to cron-based auto-registration

Remove standalone nanobot/heartbeat/ service and replace it with an
auto-registered system cron job on gateway startup. Key behaviors preserved:

- HeartbeatConfig (enabled, interval_s, keep_recent_messages) remains in
  GatewayConfig for backward compatibility.
- On startup, if enabled, a system cron job "heartbeat" is registered with
  schedule derived from interval_s.
- HEARTBEAT.md is checked on each tick; empty/template-identical files skip
  to avoid wasting LLM calls.
- Post-run evaluate_response and session history truncation
  (keep_recent_messages) are retained.
- Delivery target selection, deliverable filtering, and preamble guidance
  are preserved.

Files removed:
- nanobot/heartbeat/__init__.py
- nanobot/heartbeat/service.py
- tests/heartbeat/*
- tests/agent/test_heartbeat_service.py

Templates and docs updated to reflect cron-based usage.
This commit is contained in:
chengyongru 2026-05-27 21:28:59 +08:00 committed by Xubin Ren
parent 7d09f1cd9e
commit fe2af64e04
19 changed files with 122 additions and 1048 deletions

View File

@ -31,10 +31,6 @@ Tool descriptions, skills, and replayed session history also shape model behavio
Anything written into memory, session history, or prompt inputs can be replayed into future LLM calls. Metadata such as timestamps, local media paths, tool-call echoes, and raw fallback dumps must be bounded and sanitized before they become examples for the model to imitate. Anything written into memory, session history, or prompt inputs can be replayed into future LLM calls. Metadata such as timestamps, local media paths, tool-call echoes, and raw fallback dumps must be bounded and sanitized before they become examples for the model to imitate.
## Heartbeat Virtual Tool Call
The heartbeat service (`heartbeat/service.py`) does not parse free-text LLM output. Instead, it injects a virtual `heartbeat` tool with `action: skip | run` into the conversation. Phase 1 is a structured decision; Phase 2 executes only on `run`. When adding new periodic background checks, follow this virtual-tool-call pattern rather than string matching.
## Skills as Extension Point ## Skills as Extension Point
Built-in skills live in `nanobot/skills/` (markdown + YAML frontmatter format). Agent capabilities that are "know-how" rather than code should be added as skills, not hardcoded into the agent loop. External skills can be published to and installed from ClawHub. Built-in skills live in `nanobot/skills/` (markdown + YAML frontmatter format). Agent capabilities that are "know-how" rather than code should be added as skills, not hardcoded into the agent loop. External skills can be published to and installed from ClawHub.

View File

@ -47,7 +47,7 @@ Messages flow through an async `MessageBus` (`nanobot/bus/queue.py`) that decoup
- **WebUI** (`webui/`): Vite-based React SPA that talks to the gateway over a WebSocket multiplex protocol. The dev server proxies `/api`, `/webui`, `/auth`, and WebSocket traffic to the gateway. - **WebUI** (`webui/`): Vite-based React SPA that talks to the gateway over a WebSocket multiplex protocol. The dev server proxies `/api`, `/webui`, `/auth`, and WebSocket traffic to the gateway.
- **API Server** (`nanobot/api/server.py`): OpenAI-compatible HTTP API (`/v1/chat/completions`, `/v1/models`) for programmatic access. - **API Server** (`nanobot/api/server.py`): OpenAI-compatible HTTP API (`/v1/chat/completions`, `/v1/models`) for programmatic access.
- **Command Router** (`nanobot/command/`): Slash command routing and built-in command handlers. - **Command Router** (`nanobot/command/`): Slash command routing and built-in command handlers.
- **Heartbeat** (`nanobot/heartbeat/`): Periodic agent wake-up service for scheduled task checking. - **Heartbeat** (`nanobot/templates/HEARTBEAT.md`): Periodic task list checked via `cron` jobs (legacy dedicated service removed).
- **Pairing** (`nanobot/pairing/`): DM sender approval store with persistent pairing codes per channel. - **Pairing** (`nanobot/pairing/`): DM sender approval store with persistent pairing codes per channel.
- **Skills** (`nanobot/skills/`): Built-in skill definitions (long-goal, cron, github, image-generation, etc.) loaded into agent context. - **Skills** (`nanobot/skills/`): Built-in skill definitions (long-goal, cron, github, image-generation, etc.) loaded into agent context.
- **Security** (`nanobot/security/`): PTH file guard and other security measures activated at CLI entry. - **Security** (`nanobot/security/`): PTH file guard and other security measures activated at CLI entry.

View File

@ -46,17 +46,15 @@ core_agent=$(count_top_level_py_lines "nanobot/agent")
core_bus=$(count_top_level_py_lines "nanobot/bus") core_bus=$(count_top_level_py_lines "nanobot/bus")
core_config=$(count_top_level_py_lines "nanobot/config") core_config=$(count_top_level_py_lines "nanobot/config")
core_cron=$(count_top_level_py_lines "nanobot/cron") core_cron=$(count_top_level_py_lines "nanobot/cron")
core_heartbeat=$(count_top_level_py_lines "nanobot/heartbeat")
core_session=$(count_top_level_py_lines "nanobot/session") core_session=$(count_top_level_py_lines "nanobot/session")
print_row "agent/" "$core_agent" print_row "agent/" "$core_agent"
print_row "bus/" "$core_bus" print_row "bus/" "$core_bus"
print_row "config/" "$core_config" print_row "config/" "$core_config"
print_row "cron/" "$core_cron" print_row "cron/" "$core_cron"
print_row "heartbeat/" "$core_heartbeat"
print_row "session/" "$core_session" print_row "session/" "$core_session"
core_total=$((core_agent + core_bus + core_config + core_cron + core_heartbeat + core_session)) core_total=$((core_agent + core_bus + core_config + core_cron + core_session))
echo "" echo ""
echo "Separate buckets" echo "Separate buckets"

View File

@ -1532,7 +1532,7 @@ By default, nanobot uses `UTC` for runtime time context. If you want the agent t
} }
``` ```
This affects runtime time strings shown to the model, such as runtime context and heartbeat prompts. It also becomes the default timezone for cron schedules when a cron expression omits `tz`, and for one-shot `at` times when the ISO datetime has no explicit offset. This affects runtime time strings shown to the model, such as runtime context. It also becomes the default timezone for cron schedules when a cron expression omits `tz`, and for one-shot `at` times when the ISO datetime has no explicit offset.
Common examples: `UTC`, `America/New_York`, `America/Los_Angeles`, `Europe/London`, `Europe/Berlin`, `Asia/Tokyo`, `Asia/Shanghai`, `Asia/Singapore`, `Australia/Sydney`. Common examples: `UTC`, `America/New_York`, `America/Los_Angeles`, `Europe/London`, `Europe/Berlin`, `Asia/Tokyo`, `Asia/Shanghai`, `Asia/Singapore`, `Australia/Sydney`.

View File

@ -3,8 +3,6 @@
import base64 import base64
import mimetypes import mimetypes
import platform import platform
from contextlib import suppress
from importlib.resources import files as pkg_files
from pathlib import Path from pathlib import Path
from typing import Any, Mapping, Sequence from typing import Any, Mapping, Sequence
@ -12,12 +10,13 @@ from nanobot.agent.memory import MemoryStore
from nanobot.agent.skills import SkillsLoader from nanobot.agent.skills import SkillsLoader
from nanobot.agent.tools import mcp as mcp_tools from nanobot.agent.tools import mcp as mcp_tools
from nanobot.agent.tools.registry import ToolRegistry from nanobot.agent.tools.registry import ToolRegistry
from nanobot.bus.events import InboundMessage
from nanobot.apps.cli import utils as cli_app_utils from nanobot.apps.cli import utils as cli_app_utils
from nanobot.bus.events import InboundMessage
from nanobot.session.goal_state import goal_state_runtime_lines from nanobot.session.goal_state import goal_state_runtime_lines
from nanobot.utils.helpers import ( from nanobot.utils.helpers import (
current_time_str, current_time_str,
detect_image_mime, detect_image_mime,
load_bundled_template,
truncate_text, truncate_text,
) )
from nanobot.utils.prompt_templates import render_template from nanobot.utils.prompt_templates import render_template
@ -168,10 +167,9 @@ class ContextBuilder:
@staticmethod @staticmethod
def _is_template_content(content: str, template_path: str) -> bool: def _is_template_content(content: str, template_path: str) -> bool:
"""Check if *content* is identical to the bundled template (user hasn't customized it).""" """Check if *content* is identical to the bundled template (user hasn't customized it)."""
with suppress(Exception): tpl = load_bundled_template(template_path)
tpl = pkg_files("nanobot") / "templates" / template_path if tpl is not None:
if tpl.is_file(): return content.strip() == tpl.strip()
return content.strip() == tpl.read_text(encoding="utf-8").strip()
return False return False
def build_messages( def build_messages(

View File

@ -1,6 +1,7 @@
"""CLI commands for nanobot.""" """CLI commands for nanobot."""
import asyncio import asyncio
import functools
import os import os
import select import select
import signal import signal
@ -75,6 +76,7 @@ class SafeFileHistory(FileHistory):
from nanobot.cli.stream import StreamRenderer, ThinkingSpinner from nanobot.cli.stream import StreamRenderer, ThinkingSpinner
from nanobot.config.paths import get_workspace_path, is_default_workspace from nanobot.config.paths import get_workspace_path, is_default_workspace
from nanobot.config.schema import Config from nanobot.config.schema import Config
from nanobot.utils.evaluator import evaluate_response
from nanobot.utils.helpers import sync_workspace_templates from nanobot.utils.helpers import sync_workspace_templates
from nanobot.utils.restart import ( from nanobot.utils.restart import (
consume_restart_notice_from_env, consume_restart_notice_from_env,
@ -94,6 +96,20 @@ EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
_REASONING_SENTENCE_ENDINGS = (".", "!", "?", "", "", "") _REASONING_SENTENCE_ENDINGS = (".", "!", "?", "", "", "")
_REASONING_FLUSH_CHARS = 60 _REASONING_FLUSH_CHARS = 60
_HEARTBEAT_PREAMBLE = (
"[Your response will be delivered directly to the user's messaging app. "
"Output ONLY the final user-facing message. Never reference internal "
"files (HEARTBEAT.md, AWARENESS.md, etc.), your instructions, or your "
"decision process. If nothing needs reporting, respond with just "
"'All clear.' and nothing else.]\n\n"
)
@functools.lru_cache(maxsize=None)
def _heartbeat_template() -> str | None:
from nanobot.utils.helpers import load_bundled_template
return load_bundled_template("HEARTBEAT.md")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# CLI input: prompt_toolkit for editing, paste, history, and display # CLI input: prompt_toolkit for editing, paste, history, and display
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -718,7 +734,6 @@ def _run_gateway(
from nanobot.channels.websocket import publish_runtime_model_update from nanobot.channels.websocket import publish_runtime_model_update
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob from nanobot.cron.types import CronJob
from nanobot.heartbeat.service import HeartbeatService
from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot
from nanobot.providers.image_generation import image_gen_provider_configs from nanobot.providers.image_generation import image_gen_provider_configs
from nanobot.session.manager import SessionManager from nanobot.session.manager import SessionManager
@ -810,6 +825,9 @@ def _run_gateway(
# Set cron callback (needs agent) # Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None: async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent.""" """Execute a cron job through the agent."""
async def _silent(*_args, **_kwargs):
pass
# Dream is an internal job — run directly, not through the agent loop. # Dream is an internal job — run directly, not through the agent loop.
if job.name == "dream": if job.name == "dream":
try: try:
@ -819,7 +837,56 @@ def _run_gateway(
logger.exception("Dream cron job failed") logger.exception("Dream cron job failed")
return None return None
from nanobot.utils.evaluator import evaluate_response # Heartbeat is a system job that checks HEARTBEAT.md for active tasks.
if job.name == "heartbeat":
heartbeat_file = config.workspace_path / "HEARTBEAT.md"
try:
content = heartbeat_file.read_text(encoding="utf-8")
except OSError:
logger.debug("Heartbeat: HEARTBEAT.md missing")
return None
if not content or content == _heartbeat_template():
logger.debug("Heartbeat: HEARTBEAT.md empty or identical to template")
return None
channel, chat_id = _pick_heartbeat_target()
if channel == "cli":
return None
prompt = (
_HEARTBEAT_PREAMBLE
+ f"Review the following HEARTBEAT.md and report any active tasks:\n\n{content}"
)
resp = await agent.process_direct(
prompt,
session_key="heartbeat",
channel=channel,
chat_id=chat_id,
on_progress=_silent,
)
response = resp.content if resp else ""
# Keep a small tail of heartbeat history so the loop stays bounded.
session = agent.sessions.get_or_create("heartbeat")
session.retain_recent_legal_suffix(hb_cfg.keep_recent_messages)
agent.sessions.save(session)
if not response:
return None
should_notify = await evaluate_response(
response, prompt, agent.provider, agent.model,
)
if should_notify:
logger.info("Heartbeat: completed, delivering response")
await _deliver_to_channel(
OutboundMessage(channel=channel, chat_id=chat_id, content=response),
record=True,
)
else:
logger.info("Heartbeat: silenced by post-run evaluation")
return response
reminder_note = ( reminder_note = (
"The scheduled time has arrived. Deliver this reminder to the user now, " "The scheduled time has arrived. Deliver this reminder to the user now, "
@ -834,9 +901,6 @@ def _run_gateway(
if isinstance(cron_tool, CronTool): if isinstance(cron_tool, CronTool):
cron_token = cron_tool.set_cron_context(True) cron_token = cron_tool.set_cron_context(True)
async def _silent(*_args, **_kwargs):
pass
message_record_token = None message_record_token = None
if isinstance(message_tool, MessageTool): if isinstance(message_tool, MessageTool):
message_record_token = message_tool.set_record_channel_delivery(True) message_record_token = message_tool.set_record_channel_delivery(True)
@ -898,7 +962,6 @@ def _run_gateway(
def _pick_heartbeat_target() -> tuple[str, str]: def _pick_heartbeat_target() -> tuple[str, str]:
"""Pick a routable channel/chat target for heartbeat-triggered messages.""" """Pick a routable channel/chat target for heartbeat-triggered messages."""
enabled = set(channels.enabled_channels) enabled = set(channels.enabled_channels)
# Prefer the most recently updated non-internal session on an enabled channel.
for item in session_manager.list_sessions(): for item in session_manager.list_sessions():
key = item.get("key") or "" key = item.get("key") or ""
if ":" not in key: if ":" not in key:
@ -908,70 +971,8 @@ def _run_gateway(
continue continue
if channel in enabled and chat_id: if channel in enabled and chat_id:
return channel, chat_id return channel, chat_id
# Fallback keeps prior behavior but remains explicit.
return "cli", "direct" return "cli", "direct"
# Create heartbeat service
heartbeat_preamble = (
"[Your response will be delivered directly to the user's messaging app. "
"Output ONLY the final user-facing message. Never reference internal "
"files (HEARTBEAT.md, AWARENESS.md, etc.), your instructions, or your "
"decision process. If nothing needs reporting, respond with just "
"'All clear.' and nothing else.]\n\n"
)
async def on_heartbeat_execute(tasks: str) -> str:
"""Phase 2: execute heartbeat tasks through the full agent loop."""
channel, chat_id = _pick_heartbeat_target()
async def _silent(*_args, **_kwargs):
pass
resp = await agent.process_direct(
heartbeat_preamble + tasks,
session_key="heartbeat",
channel=channel,
chat_id=chat_id,
on_progress=_silent,
)
# Keep a small tail of heartbeat history so the loop stays bounded
# without losing all short-term context between runs.
session = agent.sessions.get_or_create("heartbeat")
session.retain_recent_legal_suffix(hb_cfg.keep_recent_messages)
agent.sessions.save(session)
return resp.content if resp else ""
async def on_heartbeat_notify(response: str) -> None:
"""Deliver a heartbeat response to the user's channel.
In addition to publishing the outbound message, this injects the
delivered text as an assistant turn into the *target channel's*
session. Without this, a user reply on the channel (e.g. "Sure")
lands in a session that has no context about the heartbeat message
and the agent cannot follow through.
"""
channel, chat_id = _pick_heartbeat_target()
if channel == "cli":
return # No external channel available to deliver to
await _deliver_to_channel(
OutboundMessage(channel=channel, chat_id=chat_id, content=response),
record=True,
)
hb_cfg = config.gateway.heartbeat
heartbeat = HeartbeatService(
workspace=config.workspace_path,
llm_runtime=agent.llm_runtime,
on_execute=on_heartbeat_execute,
on_notify=on_heartbeat_notify,
interval_s=hb_cfg.interval_s,
enabled=hb_cfg.enabled,
timezone=config.agents.defaults.timezone,
)
if channels.enabled_channels: if channels.enabled_channels:
console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}") console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
else: else:
@ -981,7 +982,11 @@ def _run_gateway(
if cron_status["jobs"] > 0: if cron_status["jobs"] > 0:
console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs") console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs")
console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s") hb_cfg = config.gateway.heartbeat
if hb_cfg.enabled:
console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s")
else:
console.print("[yellow]✗[/yellow] Heartbeat: disabled")
async def _health_server(host: str, health_port: int): async def _health_server(host: str, health_port: int):
"""Lightweight HTTP health endpoint on the gateway port.""" """Lightweight HTTP health endpoint on the gateway port."""
@ -1032,7 +1037,7 @@ def _run_gateway(
agent.dream.max_batch_size = dream_cfg.max_batch_size agent.dream.max_batch_size = dream_cfg.max_batch_size
agent.dream.max_iterations = dream_cfg.max_iterations agent.dream.max_iterations = dream_cfg.max_iterations
agent.dream.annotate_line_ages = dream_cfg.annotate_line_ages agent.dream.annotate_line_ages = dream_cfg.annotate_line_ages
from nanobot.cron.types import CronJob, CronPayload from nanobot.cron.types import CronJob, CronPayload, CronSchedule
cron.register_system_job(CronJob( cron.register_system_job(CronJob(
id="dream", id="dream",
name="dream", name="dream",
@ -1041,6 +1046,19 @@ def _run_gateway(
)) ))
console.print(f"[green]✓[/green] Dream: {dream_cfg.describe_schedule()}") console.print(f"[green]✓[/green] Dream: {dream_cfg.describe_schedule()}")
# Register Heartbeat system job (idempotent on restart)
if hb_cfg.enabled:
cron.register_system_job(CronJob(
id="heartbeat",
name="heartbeat",
schedule=CronSchedule(
kind="every",
every_ms=hb_cfg.interval_s * 1000,
tz=config.agents.defaults.timezone,
),
payload=CronPayload(kind="system_event"),
))
async def _open_browser_when_ready() -> None: async def _open_browser_when_ready() -> None:
"""Wait for the gateway to bind, then point the user's browser at the webui.""" """Wait for the gateway to bind, then point the user's browser at the webui."""
if not open_browser_url: if not open_browser_url:
@ -1067,7 +1085,6 @@ def _run_gateway(
async def run(): async def run():
try: try:
await cron.start() await cron.start()
await heartbeat.start()
tasks = [ tasks = [
agent.run(), agent.run(),
channels.start_all(), channels.start_all(),
@ -1085,7 +1102,6 @@ def _run_gateway(
console.print(traceback.format_exc()) console.print(traceback.format_exc())
finally: finally:
await agent.close_mcp() await agent.close_mcp()
heartbeat.stop()
cron.stop() cron.stop()
agent.stop() agent.stop()
await channels.stop_all() await channels.stop_all()

View File

@ -1155,7 +1155,7 @@ _SETTINGS_SECTIONS: dict[str, tuple[str, str, set[str] | None]] = {
"Agent Settings": ("Agent Defaults", "Configure default model, temperature, and behavior", None), "Agent Settings": ("Agent Defaults", "Configure default model, temperature, and behavior", None),
"Channel Common": ("Channel Common", "Configure cross-channel behavior: progress, tool hints, retries", None), "Channel Common": ("Channel Common", "Configure cross-channel behavior: progress, tool hints, retries", None),
"API Server": ("API Server", "Configure OpenAI-compatible API endpoint", None), "API Server": ("API Server", "Configure OpenAI-compatible API endpoint", None),
"Gateway": ("Gateway Settings", "Configure server host, port, and heartbeat", None), "Gateway": ("Gateway Settings", "Configure server host, port", None),
"Tools": ("Tools Settings", "Configure web search, shell exec, and other tools", {"mcp_servers"}), "Tools": ("Tools Settings", "Configure web search, shell exec, and other tools", {"mcp_servers"}),
} }

View File

@ -236,7 +236,7 @@ class ProvidersConfig(Base):
class HeartbeatConfig(Base): class HeartbeatConfig(Base):
"""Heartbeat service configuration.""" """Heartbeat service configuration (now backed by cron)."""
enabled: bool = True enabled: bool = True
interval_s: int = 30 * 60 # 30 minutes interval_s: int = 30 * 60 # 30 minutes

View File

@ -1,5 +0,0 @@
"""Heartbeat service for periodic agent wake-ups."""
from nanobot.heartbeat.service import HeartbeatService
__all__ = ["HeartbeatService"]

View File

@ -1,243 +0,0 @@
"""Heartbeat service - periodic agent wake-up to check for tasks."""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, Callable, Coroutine
from loguru import logger
from nanobot.providers.base import LLMProvider
from nanobot.utils.llm_runtime import LLMRuntimeResolver, static_llm_runtime
_HEARTBEAT_TOOL = [
{
"type": "function",
"function": {
"name": "heartbeat",
"description": "Report heartbeat decision after reviewing tasks.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["skip", "run"],
"description": "skip = nothing to do, run = has active tasks",
},
"tasks": {
"type": "string",
"description": "Natural-language summary of active tasks (required for run)",
},
},
"required": ["action"],
},
},
}
]
class HeartbeatService:
"""
Periodic heartbeat service that wakes the agent to check for tasks.
Phase 1 (decision): reads HEARTBEAT.md and asks the LLM via a virtual
tool call whether there are active tasks. This avoids free-text parsing
and the unreliable HEARTBEAT_OK token.
Phase 2 (execution): only triggered when Phase 1 returns ``run``. The
``on_execute`` callback runs the task through the full agent loop and
returns the result to deliver.
"""
def __init__(
self,
workspace: Path,
provider: LLMProvider | None = None,
model: str | None = None,
on_execute: Callable[[str], Coroutine[Any, Any, str]] | None = None,
on_notify: Callable[[str], Coroutine[Any, Any, None]] | None = None,
interval_s: int = 30 * 60,
enabled: bool = True,
timezone: str | None = None,
llm_runtime: LLMRuntimeResolver | None = None,
):
self.workspace = workspace
if llm_runtime is None:
if provider is None or model is None:
raise ValueError("HeartbeatService requires either llm_runtime or provider/model")
llm_runtime = static_llm_runtime(provider, model)
self._llm_runtime = llm_runtime
self.on_execute = on_execute
self.on_notify = on_notify
self.interval_s = interval_s
self.enabled = enabled
self.timezone = timezone
self._running = False
self._task: asyncio.Task | None = None
@property
def heartbeat_file(self) -> Path:
return self.workspace / "HEARTBEAT.md"
def _read_heartbeat_file(self) -> str | None:
if self.heartbeat_file.exists():
try:
return self.heartbeat_file.read_text(encoding="utf-8")
except Exception:
return None
return None
async def _decide(self, content: str) -> tuple[str, str]:
"""Phase 1: ask LLM to decide skip/run via virtual tool call.
Returns (action, tasks) where action is 'skip' or 'run'.
"""
from nanobot.utils.helpers import current_time_str
llm = self._llm_runtime()
response = await llm.provider.chat_with_retry(
messages=[
{"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."},
{"role": "user", "content": (
f"Current Time: {current_time_str(self.timezone)}\n\n"
"Review the following HEARTBEAT.md and decide whether there are active tasks.\n\n"
f"{content}"
)},
],
tools=_HEARTBEAT_TOOL,
model=llm.model,
)
if not response.should_execute_tools:
if response.has_tool_calls:
logger.warning(
"Ignoring heartbeat tool calls under finish_reason='{}'",
response.finish_reason,
)
return "skip", ""
args = response.tool_calls[0].arguments
return args.get("action", "skip"), args.get("tasks", "")
async def start(self) -> None:
"""Start the heartbeat service."""
if not self.enabled:
logger.info("Heartbeat disabled")
return
if self._running:
logger.warning("Heartbeat already running")
return
self._running = True
self._task = asyncio.create_task(self._run_loop())
logger.info("Heartbeat started (every {}s)", self.interval_s)
def stop(self) -> None:
"""Stop the heartbeat service."""
self._running = False
if self._task:
self._task.cancel()
self._task = None
async def _run_loop(self) -> None:
"""Main heartbeat loop."""
while self._running:
try:
await asyncio.sleep(self.interval_s)
if self._running:
await self._tick()
except asyncio.CancelledError:
break
except Exception:
logger.exception("Heartbeat error")
@staticmethod
def _is_deliverable(response: str) -> bool:
"""Check if a heartbeat response is suitable for user delivery.
Filters out two classes of bad output before the evaluator runs:
1. **Finalization fallback** the runner hit empty-response retries
and produced a canned error message. For heartbeat, empty output
is a valid "nothing to report" outcome, not a failure.
2. **Leaked reasoning** the model reflected internal file names,
decision logic, or meta-commentary instead of a user-facing report.
"""
text = response.lower()
# Runner finalization fallback
if "couldn't produce a final answer" in text:
return False
# Leaked internal reasoning patterns
leaked_patterns = [
"heartbeat.md",
"awareness.md",
"judgment call:",
"decision logic",
"valid options are",
"my instructions",
"i am supposed to",
"strict heartbeat interpretation",
]
if any(pattern in text for pattern in leaked_patterns):
return False
return True
async def _tick(self) -> None:
"""Execute a single heartbeat tick."""
from nanobot.utils.evaluator import evaluate_response
content = self._read_heartbeat_file()
if not content:
logger.debug("Heartbeat: HEARTBEAT.md missing or empty")
return
logger.info("Heartbeat: checking for tasks...")
try:
action, tasks = await self._decide(content)
if action != "run":
logger.info("Heartbeat: OK (nothing to report)")
return
logger.info("Heartbeat: tasks found, executing...")
if self.on_execute:
response = await self.on_execute(tasks)
if not response:
logger.info("Heartbeat: no response from execution")
return
if not self._is_deliverable(response):
logger.info(
"Heartbeat: suppressed non-deliverable response ({})",
response[:80],
)
return
llm = self._llm_runtime()
should_notify = await evaluate_response(
response, tasks, llm.provider, llm.model,
)
if should_notify and self.on_notify:
logger.info("Heartbeat: completed, delivering response")
await self.on_notify(response)
else:
logger.info("Heartbeat: silenced by post-run evaluation")
except Exception:
logger.exception("Heartbeat execution failed")
async def trigger_now(self) -> str | None:
"""Manually trigger a heartbeat."""
content = self._read_heartbeat_file()
if not content:
return None
action, tasks = await self._decide(content)
if action != "run" or not self.on_execute:
return None
return await self.on_execute(tasks)

View File

@ -14,10 +14,10 @@ Get USER_ID and CHANNEL from the current session (e.g., `8281248569` and `telegr
## Heartbeat Tasks ## Heartbeat Tasks
`HEARTBEAT.md` is checked on the configured heartbeat interval. Use file tools to manage periodic tasks. `HEARTBEAT.md` is checked periodically when registered as a cron job. Use the built-in `cron` tool to schedule it (e.g. `cron add --name heartbeat --schedule "every 30m" --message "Check HEARTBEAT.md"`).
- Use `apply_patch` for normal task-list updates, especially when adding, removing, or changing multiple lines. - Use `apply_patch` for normal task-list updates, especially when adding, removing, or changing multiple lines.
- Use `edit_file` only for small exact replacements copied from the current `HEARTBEAT.md`. - Use `edit_file` only for small exact replacements copied from the current `HEARTBEAT.md`.
- Use `write_file` for first creation or intentional full-file rewrites. - Use `write_file` for first creation or intentional full-file rewrites.
When the user asks for a recurring/periodic task, update `HEARTBEAT.md` instead of creating a one-time cron reminder. When the user asks for a recurring/periodic task, update `HEARTBEAT.md` and register it via `cron` instead of creating a one-time reminder.

View File

@ -1,9 +1,9 @@
# Heartbeat Tasks # Heartbeat Tasks
This file is checked every 30 minutes by your nanobot agent. This file is checked periodically by your nanobot agent.
Add tasks below that you want the agent to work on periodically. Register it as a cron job (e.g. `cron add --name heartbeat --schedule "every 30m" --message "Check HEARTBEAT.md"`) to get the same behavior as the legacy heartbeat service.
If this file has no tasks (only headers and comments), the agent will skip the heartbeat. If this file has no tasks (only headers and comments), the agent will skip it.
## Active Tasks ## Active Tasks

View File

@ -63,5 +63,5 @@ documents the general tool contract and non-obvious usage patterns.
## Scheduling and Background Work ## Scheduling and Background Work
- Use `cron` for scheduled reminders or recurring jobs; do not run `nanobot cron` through `exec`. - Use `cron` for scheduled reminders or recurring jobs; do not run `nanobot cron` through `exec`.
- For heartbeat tasks, update `HEARTBEAT.md` according to the agent instructions. - For heartbeat tasks, register `HEARTBEAT.md` as a cron job according to the agent instructions.
- Do not write reminders only to memory files when the user expects an actual notification. - Do not write reminders only to memory files when the user expects an actual notification.

View File

@ -626,3 +626,14 @@ def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]
logger.exception("Failed to initialize git store for {}", workspace) logger.exception("Failed to initialize git store for {}", workspace)
return added return added
def load_bundled_template(template_name: str) -> str | None:
"""Read a bundled template file from the nanobot package."""
from importlib.resources import files as pkg_files
with suppress(Exception):
tpl = pkg_files("nanobot") / "templates" / template_name
if tpl.is_file():
return tpl.read_text(encoding="utf-8")
return None

View File

@ -1,336 +0,0 @@
import asyncio
import pytest
from nanobot.heartbeat.service import HeartbeatService
from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
from nanobot.utils.llm_runtime import LLMRuntime
class DummyProvider(LLMProvider):
def __init__(self, responses: list[LLMResponse]):
super().__init__()
self._responses = list(responses)
self.calls = 0
self.models: list[str | None] = []
async def chat(self, *args, **kwargs) -> LLMResponse:
self.calls += 1
self.models.append(kwargs.get("model"))
if self._responses:
return self._responses.pop(0)
return LLMResponse(content="", tool_calls=[])
def get_default_model(self) -> str:
return "test-model"
@pytest.mark.asyncio
async def test_start_is_idempotent(tmp_path) -> None:
provider = DummyProvider([])
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
interval_s=9999,
enabled=True,
)
await service.start()
first_task = service._task
await service.start()
assert service._task is first_task
service.stop()
await asyncio.sleep(0)
@pytest.mark.asyncio
async def test_decide_returns_skip_when_no_tool_call(tmp_path) -> None:
provider = DummyProvider([LLMResponse(content="no tool call", tool_calls=[])])
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
)
action, tasks = await service._decide("heartbeat content")
assert action == "skip"
assert tasks == ""
@pytest.mark.asyncio
async def test_trigger_now_executes_when_decision_is_run(tmp_path) -> None:
(tmp_path / "HEARTBEAT.md").write_text("- [ ] do thing", encoding="utf-8")
provider = DummyProvider([
LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1",
name="heartbeat",
arguments={"action": "run", "tasks": "check open tasks"},
)
],
)
])
called_with: list[str] = []
async def _on_execute(tasks: str) -> str:
called_with.append(tasks)
return "done"
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
on_execute=_on_execute,
)
result = await service.trigger_now()
assert result == "done"
assert called_with == ["check open tasks"]
@pytest.mark.asyncio
async def test_trigger_now_returns_none_when_decision_is_skip(tmp_path) -> None:
(tmp_path / "HEARTBEAT.md").write_text("- [ ] do thing", encoding="utf-8")
provider = DummyProvider([
LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1",
name="heartbeat",
arguments={"action": "skip"},
)
],
)
])
async def _on_execute(tasks: str) -> str:
return tasks
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
on_execute=_on_execute,
)
assert await service.trigger_now() is None
@pytest.mark.asyncio
async def test_tick_notifies_when_evaluator_says_yes(tmp_path, monkeypatch) -> None:
"""Phase 1 run -> Phase 2 execute -> Phase 3 evaluate=notify -> on_notify called."""
(tmp_path / "HEARTBEAT.md").write_text("- [ ] check deployments", encoding="utf-8")
provider = DummyProvider([
LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1",
name="heartbeat",
arguments={"action": "run", "tasks": "check deployments"},
)
],
),
])
executed: list[str] = []
notified: list[str] = []
async def _on_execute(tasks: str) -> str:
executed.append(tasks)
return "deployment failed on staging"
async def _on_notify(response: str) -> None:
notified.append(response)
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
on_execute=_on_execute,
on_notify=_on_notify,
)
async def _eval_notify(*a, **kw):
return True
monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_notify)
await service._tick()
assert executed == ["check deployments"]
assert notified == ["deployment failed on staging"]
@pytest.mark.asyncio
async def test_tick_suppresses_when_evaluator_says_no(tmp_path, monkeypatch) -> None:
"""Phase 1 run -> Phase 2 execute -> Phase 3 evaluate=silent -> on_notify NOT called."""
(tmp_path / "HEARTBEAT.md").write_text("- [ ] check status", encoding="utf-8")
provider = DummyProvider([
LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1",
name="heartbeat",
arguments={"action": "run", "tasks": "check status"},
)
],
),
])
executed: list[str] = []
notified: list[str] = []
async def _on_execute(tasks: str) -> str:
executed.append(tasks)
return "everything is fine, no issues"
async def _on_notify(response: str) -> None:
notified.append(response)
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
on_execute=_on_execute,
on_notify=_on_notify,
)
async def _eval_silent(*a, **kw):
return False
monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_silent)
await service._tick()
assert executed == ["check status"]
assert notified == []
def test_tick_uses_runtime_provider_and_model(tmp_path, monkeypatch) -> None:
"""Preset changes must apply to heartbeat decision and post-run evaluation."""
(tmp_path / "HEARTBEAT.md").write_text("- [ ] check runtime model", encoding="utf-8")
runtime_provider = DummyProvider([
LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1",
name="heartbeat",
arguments={"action": "run", "tasks": "check runtime model"},
)
],
),
])
runtime_model = "openai/gpt-4.1"
executed: list[str] = []
evaluated: list[tuple[LLMProvider, str]] = []
async def _on_execute(tasks: str) -> str:
executed.append(tasks)
return "runtime model produced a user-facing update"
async def _eval_capture(response, tasks, provider, model):
evaluated.append((provider, model))
return False
service = HeartbeatService(
workspace=tmp_path,
llm_runtime=lambda: LLMRuntime(runtime_provider, runtime_model),
on_execute=_on_execute,
)
monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_capture)
asyncio.run(service._tick())
assert runtime_provider.calls == 1
assert runtime_provider.models == [runtime_model]
assert executed == ["check runtime model"]
assert evaluated == [(runtime_provider, runtime_model)]
@pytest.mark.asyncio
async def test_decide_retries_transient_error_then_succeeds(tmp_path, monkeypatch) -> None:
provider = DummyProvider([
LLMResponse(content="429 rate limit", finish_reason="error"),
LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1",
name="heartbeat",
arguments={"action": "run", "tasks": "check open tasks"},
)
],
),
])
delays: list[int] = []
async def _fake_sleep(delay: int) -> None:
delays.append(delay)
monkeypatch.setattr(asyncio, "sleep", _fake_sleep)
service = HeartbeatService(
workspace=tmp_path,
provider=provider,
model="openai/gpt-4o-mini",
)
action, tasks = await service._decide("heartbeat content")
assert action == "run"
assert tasks == "check open tasks"
assert provider.calls == 2
assert delays == [1]
@pytest.mark.asyncio
async def test_decide_prompt_includes_current_time(tmp_path) -> None:
"""Phase 1 user prompt must contain current time so the LLM can judge task urgency."""
captured_messages: list[dict] = []
class CapturingProvider(LLMProvider):
async def chat(self, *, messages=None, **kwargs) -> LLMResponse:
if messages:
captured_messages.extend(messages)
return LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1", name="heartbeat",
arguments={"action": "skip"},
)
],
)
def get_default_model(self) -> str:
return "test-model"
service = HeartbeatService(
workspace=tmp_path,
provider=CapturingProvider(),
model="test-model",
)
await service._decide("- [ ] check servers at 10:00 UTC")
user_msg = captured_messages[1]
assert user_msg["role"] == "user"
assert "Current Time:" in user_msg["content"]

View File

@ -602,17 +602,17 @@ async def test_process_message_uses_explicit_session_metadata_for_goal_context(
chat_session = loop.sessions.get_or_create("websocket:chat-with-goal") chat_session = loop.sessions.get_or_create("websocket:chat-with-goal")
chat_session.metadata[GOAL_STATE_KEY] = { chat_session.metadata[GOAL_STATE_KEY] = {
"status": "active", "status": "active",
"objective": "This chat goal must not leak into heartbeat.", "objective": "This chat goal must not leak into system.",
} }
loop.sessions.save(chat_session) loop.sessions.save(chat_session)
system_session = loop.sessions.get_or_create("heartbeat") system_session = loop.sessions.get_or_create("system")
system_session.metadata = {} system_session.metadata = {}
loop.sessions.save(system_session) loop.sessions.save(system_session)
loop.context.build_messages = MagicMock( # type: ignore[method-assign] loop.context.build_messages = MagicMock( # type: ignore[method-assign]
return_value=[ return_value=[
{"role": "system", "content": "system"}, {"role": "system", "content": "system"},
{"role": "user", "content": "runtime + heartbeat"}, {"role": "user", "content": "runtime + system"},
] ]
) )
loop._run_agent_loop = AsyncMock(return_value=( # type: ignore[method-assign] loop._run_agent_loop = AsyncMock(return_value=( # type: ignore[method-assign]
@ -620,7 +620,7 @@ async def test_process_message_uses_explicit_session_metadata_for_goal_context(
[], [],
[ [
{"role": "system", "content": "system"}, {"role": "system", "content": "system"},
{"role": "user", "content": "runtime + heartbeat"}, {"role": "user", "content": "runtime + system"},
{"role": "assistant", "content": "ok"}, {"role": "assistant", "content": "ok"},
], ],
"stop", "stop",
@ -630,11 +630,11 @@ async def test_process_message_uses_explicit_session_metadata_for_goal_context(
result = await loop._process_message( result = await loop._process_message(
InboundMessage( InboundMessage(
channel="websocket", channel="websocket",
sender_id="heartbeat", sender_id="system",
chat_id="chat-with-goal", chat_id="chat-with-goal",
content="heartbeat work", content="system work",
), ),
session_key="heartbeat", session_key="system",
) )
assert result is not None assert result is not None

View File

@ -1589,16 +1589,6 @@ def test_gateway_health_endpoint_binds_and_serves_expected_responses(
def register_system_job(self, _job) -> None: def register_system_job(self, _job) -> None:
return None return None
class _FakeHeartbeatService:
def __init__(self, **_kwargs) -> None:
return None
async def start(self) -> None:
return None
def stop(self) -> None:
return None
class _FakeServer: class _FakeServer:
async def __aenter__(self): async def __aenter__(self):
return self return self
@ -1645,7 +1635,6 @@ def test_gateway_health_endpoint_binds_and_serves_expected_responses(
monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop) monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop)
monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _FakeChannelManager) monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _FakeChannelManager)
monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCronService) monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCronService)
monkeypatch.setattr("nanobot.heartbeat.service.HeartbeatService", _FakeHeartbeatService)
monkeypatch.setattr("asyncio.start_server", _fake_start_server) monkeypatch.setattr("asyncio.start_server", _fake_start_server)
result = runner.invoke(app, ["gateway", "--config", str(config_file)]) result = runner.invoke(app, ["gateway", "--config", str(config_file)])

View File

@ -1,120 +0,0 @@
"""Tests for heartbeat context bridge — injecting delivered messages into channel session."""
from nanobot.session.manager import SessionManager
class TestHeartbeatContextBridge:
"""Verify that on_heartbeat_notify injects the assistant message into the
channel session so user replies have conversational context."""
def test_notify_injects_into_channel_session(self, tmp_path):
"""After notify, the target channel session should contain the
heartbeat response as an assistant turn."""
session_mgr = SessionManager(tmp_path / "sessions")
target_key = "telegram:12345"
# Simulate: session exists with one user message
target_session = session_mgr.get_or_create(target_key)
target_session.add_message("user", "hello earlier")
session_mgr.save(target_session)
# Simulate what on_heartbeat_notify does
target_session = session_mgr.get_or_create(target_key)
target_session.add_message(
"assistant",
"3 new emails — invoice, meeting, proposal.",
_channel_delivery=True,
)
session_mgr.save(target_session)
# Reload and verify
reloaded = session_mgr.get_or_create(target_key)
messages = reloaded.get_history(max_messages=0)
roles = [m["role"] for m in messages]
assert roles == ["user", "assistant"]
assert "3 new emails" in messages[-1]["content"]
def test_reply_after_injection_has_context(self, tmp_path):
"""Simulates the full flow: prior conversation exists, heartbeat
injects, then user replies. The session should have the heartbeat
message visible in get_history so the model sees the context."""
session_mgr = SessionManager(tmp_path / "sessions")
target_key = "telegram:12345"
# Pre-existing conversation (user has chatted before)
session = session_mgr.get_or_create(target_key)
session.add_message("user", "Hey")
session.add_message("assistant", "Hi there!")
session_mgr.save(session)
# Step 1: heartbeat injects assistant message
session = session_mgr.get_or_create(target_key)
session.add_message(
"assistant",
"If you want, I can mark that email as read.",
_channel_delivery=True,
)
session_mgr.save(session)
# Step 2: user replies "Sure"
session = session_mgr.get_or_create(target_key)
session.add_message("user", "Sure")
session_mgr.save(session)
# Verify: get_history includes the heartbeat injection
reloaded = session_mgr.get_or_create(target_key)
history = reloaded.get_history(max_messages=0)
roles = [m["role"] for m in history]
assert roles == ["user", "assistant", "assistant", "user"]
assert "mark that email" in history[2]["content"]
assert history[3]["content"] == "Sure"
def test_injection_does_not_duplicate_on_existing_history(self, tmp_path):
"""If the channel session already has messages, the injection
appends cleanly without corruption."""
session_mgr = SessionManager(tmp_path / "sessions")
target_key = "telegram:12345"
# Pre-existing conversation
session = session_mgr.get_or_create(target_key)
session.add_message("user", "What time is it?")
session.add_message("assistant", "It's 2pm.")
session.add_message("user", "Thanks")
session_mgr.save(session)
# Heartbeat injects
session = session_mgr.get_or_create(target_key)
session.add_message(
"assistant",
"You have a meeting in 30 minutes.",
_channel_delivery=True,
)
session_mgr.save(session)
# Verify
reloaded = session_mgr.get_or_create(target_key)
history = reloaded.get_history(max_messages=0)
roles = [m["role"] for m in history]
assert roles == ["user", "assistant", "user", "assistant"]
assert "meeting in 30 minutes" in history[-1]["content"]
def test_reply_after_injection_to_empty_session_keeps_context(self, tmp_path):
"""A user replying to the first delivered message still sees that context."""
session_mgr = SessionManager(tmp_path / "sessions")
target_key = "telegram:99999"
session = session_mgr.get_or_create(target_key)
session.add_message(
"assistant",
"Weather alert: sandstorm expected at 4pm.",
_channel_delivery=True,
)
session.add_message("user", "Sure")
session_mgr.save(session)
reloaded = session_mgr.get_or_create(target_key)
history = reloaded.get_history(max_messages=0)
assert len(history) == 2
assert history[0]["role"] == "assistant"
assert "sandstorm" in history[0]["content"]
assert history[1] == {"role": "user", "content": "Sure"}

View File

@ -1,230 +0,0 @@
"""Tests for HeartbeatService._is_deliverable and _tick suppression."""
import pytest
from nanobot.heartbeat.service import HeartbeatService
from nanobot.providers.base import LLMResponse, ToolCallRequest
# ---------------------------------------------------------------------------
# _is_deliverable unit tests
# ---------------------------------------------------------------------------
class TestIsDeliverable:
"""Verify the pre-evaluator deliverability filter."""
def test_normal_report_is_deliverable(self):
assert HeartbeatService._is_deliverable(
"2 new emails — invoice from Zain, meeting rescheduled to 3pm."
)
def test_short_dismissal_is_deliverable(self):
assert HeartbeatService._is_deliverable("All clear.")
def test_finalization_fallback_blocked(self):
assert not HeartbeatService._is_deliverable(
"I completed the tool steps but couldn't produce a final answer. "
"Please try again or narrow the task."
)
def test_leaked_heartbeat_md_reference_blocked(self):
assert not HeartbeatService._is_deliverable(
"Yes — HEARTBEAT.md has active tasks listed. They are: "
"Check Gmail for important messages, Check Calendar."
)
def test_leaked_awareness_md_reference_blocked(self):
assert not HeartbeatService._is_deliverable(
"I reviewed AWARENESS.md and found no new signals."
)
def test_leaked_judgment_call_blocked(self):
assert not HeartbeatService._is_deliverable(
"Best judgment call: stay quiet."
)
def test_leaked_decision_logic_blocked(self):
assert not HeartbeatService._is_deliverable(
"Strict HEARTBEAT interpretation. Decision logic says SHORT UPDATE."
)
def test_leaked_valid_options_blocked(self):
assert not HeartbeatService._is_deliverable(
"The valid options are FULL REPORT, SHORT UPDATE, or SILENT."
)
def test_leaked_my_instructions_blocked(self):
assert not HeartbeatService._is_deliverable(
"My instructions say to check Gmail and Calendar."
)
def test_leaked_supposed_to_blocked(self):
assert not HeartbeatService._is_deliverable(
"I am supposed to scan for urgent emails."
)
def test_case_insensitive(self):
assert not HeartbeatService._is_deliverable(
"HEARTBEAT.MD has tasks listed."
)
def test_empty_string_is_deliverable(self):
"""Empty string won't reach _is_deliverable in practice (caught earlier),
but should not crash."""
assert HeartbeatService._is_deliverable("")
# ---------------------------------------------------------------------------
# _tick integration: non-deliverable responses never reach evaluator/notify
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_tick_suppresses_finalization_fallback(tmp_path, monkeypatch) -> None:
"""Finalization fallback should be caught before the evaluator runs."""
(tmp_path / "HEARTBEAT.md").write_text("- [ ] check inbox", encoding="utf-8")
from nanobot.providers.base import LLMProvider
class StubProvider(LLMProvider):
async def chat(self, **kwargs) -> LLMResponse:
return LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1", name="heartbeat",
arguments={"action": "run", "tasks": "check inbox"},
)
],
)
def get_default_model(self) -> str:
return "test-model"
notified: list[str] = []
evaluator_called = False
async def _on_execute(tasks: str) -> str:
return (
"I completed the tool steps but couldn't produce a final answer. "
"Please try again or narrow the task."
)
async def _on_notify(response: str) -> None:
notified.append(response)
async def _eval_always_notify(*a, **kw):
nonlocal evaluator_called
evaluator_called = True
return True
monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_always_notify)
service = HeartbeatService(
workspace=tmp_path,
provider=StubProvider(),
model="test-model",
on_execute=_on_execute,
on_notify=_on_notify,
)
await service._tick()
assert notified == [], "Finalization fallback should not reach the user"
assert not evaluator_called, "Evaluator should not be called for non-deliverable responses"
@pytest.mark.asyncio
async def test_tick_suppresses_leaked_reasoning(tmp_path, monkeypatch) -> None:
"""Leaked internal reasoning should be caught before the evaluator runs."""
(tmp_path / "HEARTBEAT.md").write_text("- [ ] check status", encoding="utf-8")
from nanobot.providers.base import LLMProvider
class StubProvider(LLMProvider):
async def chat(self, **kwargs) -> LLMResponse:
return LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1", name="heartbeat",
arguments={"action": "run", "tasks": "check status"},
)
],
)
def get_default_model(self) -> str:
return "test-model"
notified: list[str] = []
async def _on_execute(tasks: str) -> str:
return "HEARTBEAT.md has active tasks listed. They are: Check Gmail."
async def _on_notify(response: str) -> None:
notified.append(response)
async def _eval_always_notify(*a, **kw):
return True
monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_always_notify)
service = HeartbeatService(
workspace=tmp_path,
provider=StubProvider(),
model="test-model",
on_execute=_on_execute,
on_notify=_on_notify,
)
await service._tick()
assert notified == [], "Leaked reasoning should not reach the user"
@pytest.mark.asyncio
async def test_tick_delivers_normal_report(tmp_path, monkeypatch) -> None:
"""Normal reports should pass through deliverability and evaluator."""
(tmp_path / "HEARTBEAT.md").write_text("- [ ] check inbox", encoding="utf-8")
from nanobot.providers.base import LLMProvider
class StubProvider(LLMProvider):
async def chat(self, **kwargs) -> LLMResponse:
return LLMResponse(
content="",
tool_calls=[
ToolCallRequest(
id="hb_1", name="heartbeat",
arguments={"action": "run", "tasks": "check inbox"},
)
],
)
def get_default_model(self) -> str:
return "test-model"
notified: list[str] = []
async def _on_execute(tasks: str) -> str:
return "3 new emails — client proposal from Zain, invoice, meeting reminder."
async def _on_notify(response: str) -> None:
notified.append(response)
async def _eval_always_notify(*a, **kw):
return True
monkeypatch.setattr("nanobot.utils.evaluator.evaluate_response", _eval_always_notify)
service = HeartbeatService(
workspace=tmp_path,
provider=StubProvider(),
model="test-model",
on_execute=_on_execute,
on_notify=_on_notify,
)
await service._tick()
assert notified == ["3 new emails — client proposal from Zain, invoice, meeting reminder."]