Merge branch 'main' into nightly

This commit is contained in:
chengyongru 2026-03-24 14:34:44 +08:00
commit 54b79ce8b7
31 changed files with 1283 additions and 197 deletions

119
README.md
View File

@ -90,22 +90,38 @@
## Table of Contents
- [News](#-news)
- [Key Features](#key-features-of-nanobot)
- [Architecture](#-architecture)
- [Features](#-features)
- [Install](#-install)
- [Quick Start](#-quick-start)
- [Chat Apps](#-chat-apps)
- [Agent Social Network](#-agent-social-network)
- [Configuration](#-configuration)
- [Multiple Instances](#-multiple-instances)
- [CLI Reference](#-cli-reference)
- [Docker](#-docker)
- [Linux Service](#-linux-service)
- [Project Structure](#-project-structure)
- [Contribute & Roadmap](#-contribute--roadmap)
- [Star History](#-star-history)
- [📢 News](#-news)
- [Key Features of nanobot:](#key-features-of-nanobot)
- [🏗️ Architecture](#-architecture)
- [Table of Contents](#table-of-contents)
- [✨ Features](#-features)
- [📦 Install](#-install)
- [Update to latest version](#update-to-latest-version)
- [🚀 Quick Start](#-quick-start)
- [💬 Chat Apps](#-chat-apps)
- [🌐 Agent Social Network](#-agent-social-network)
- [⚙️ Configuration](#-configuration)
- [Providers](#providers)
- [Web Search](#web-search)
- [MCP (Model Context Protocol)](#mcp-model-context-protocol)
- [Security](#security)
- [🧩 Multiple Instances](#-multiple-instances)
- [Quick Start](#quick-start)
- [Path Resolution](#path-resolution)
- [How It Works](#how-it-works)
- [Minimal Setup](#minimal-setup)
- [Common Use Cases](#common-use-cases)
- [Notes](#notes)
- [💻 CLI Reference](#-cli-reference)
- [🐳 Docker](#-docker)
- [Docker Compose](#docker-compose)
- [Docker](#docker)
- [🐧 Linux Service](#-linux-service)
- [📁 Project Structure](#-project-structure)
- [🤝 Contribute \& Roadmap](#-contribute--roadmap)
- [Branching Strategy](#branching-strategy)
- [Contributors](#contributors)
- [⭐ Star History](#-star-history)
## ✨ Features
@ -172,7 +188,7 @@ nanobot --version
```bash
rm -rf ~/.nanobot/bridge
nanobot channels login
nanobot channels login whatsapp
```
## 🚀 Quick Start
@ -232,21 +248,21 @@ That's it! You have a working AI assistant in 2 minutes.
Connect nanobot to your favorite chat platform. Want to build your own? See the [Channel Plugin Guide](./docs/CHANNEL_PLUGIN_GUIDE.md).
> Channel plugin support is available in the `main` branch; not yet published to PyPI.
| Channel | What you need |
|---------|---------------|
| **Telegram** | Bot token from @BotFather |
| **Discord** | Bot token + Message Content intent |
| **WhatsApp** | QR code scan |
| **WhatsApp** | QR code scan (`nanobot channels login whatsapp`) |
| **WeChat (Weixin)** | QR code scan (`nanobot channels login weixin`) |
| **Feishu** | App ID + App Secret |
| **Mochat** | Claw token (auto-setup available) |
| **DingTalk** | App Key + App Secret |
| **Slack** | Bot token + App-Level token |
| **Matrix** | Homeserver URL + Access token |
| **Email** | IMAP/SMTP credentials |
| **QQ** | App ID + App Secret |
| **Wecom** | Bot ID + Bot Secret |
| **Wecom App** | Corp ID + Agent ID + Secret + Token + AES Key |
| **Mochat** | Claw token (auto-setup available) |
<details>
<summary><b>Telegram</b> (Recommended)</summary>
@ -464,7 +480,7 @@ Requires **Node.js ≥18**.
**1. Link device**
```bash
nanobot channels login
nanobot channels login whatsapp
# Scan QR with WhatsApp → Settings → Linked Devices
```
@ -485,7 +501,7 @@ nanobot channels login
```bash
# Terminal 1
nanobot channels login
nanobot channels login whatsapp
# Terminal 2
nanobot gateway
@ -493,7 +509,7 @@ nanobot gateway
> WhatsApp bridge updates are not applied automatically for existing installations.
> After upgrading nanobot, rebuild the local bridge with:
> `rm -rf ~/.nanobot/bridge && nanobot channels login`
> `rm -rf ~/.nanobot/bridge && nanobot channels login whatsapp`
</details>
@ -721,6 +737,59 @@ nanobot gateway
</details>
<details>
<summary><b>WeChat (微信 / Weixin)</b></summary>
Uses **HTTP long-poll** with QR-code login via the ilinkai personal WeChat API. No local WeChat desktop client is required.
> Weixin support is available from source checkout, but is not included in the current PyPI release yet.
**1. Install from source**
```bash
git clone https://github.com/HKUDS/nanobot.git
cd nanobot
pip install -e ".[weixin]"
```
**2. Configure**
```json
{
"channels": {
"weixin": {
"enabled": true,
"allowFrom": ["YOUR_WECHAT_USER_ID"]
}
}
}
```
> - `allowFrom`: Add the sender ID you see in nanobot logs for your WeChat account. Use `["*"]` to allow all users.
> - `token`: Optional. If omitted, log in interactively and nanobot will save the token for you.
> - `stateDir`: Optional. Defaults to nanobot's runtime directory for Weixin state.
> - `pollTimeout`: Optional long-poll timeout in seconds.
**3. Login**
```bash
nanobot channels login weixin
```
Use `--force` to re-authenticate and ignore any saved token:
```bash
nanobot channels login weixin --force
```
**4. Run**
```bash
nanobot gateway
```
</details>
<details>
<summary><b>Wecom (企业微信)</b></summary>
@ -1491,7 +1560,7 @@ nanobot gateway --config ~/.nanobot-telegram/config.json --workspace /tmp/nanobo
| `nanobot gateway` | Start the gateway |
| `nanobot status` | Show status |
| `nanobot provider login openai-codex` | OAuth login for providers |
| `nanobot channels login` | Link WhatsApp (scan QR) |
| `nanobot channels login <channel>` | Authenticate a channel interactively |
| `nanobot channels status` | Show channel status |
Interactive mode exits: `exit`, `quit`, `/exit`, `/quit`, `:q`, or `Ctrl+D`.

View File

@ -12,6 +12,17 @@ interface SendCommand {
text: string;
}
interface SendMediaCommand {
type: 'send_media';
to: string;
filePath: string;
mimetype: string;
caption?: string;
fileName?: string;
}
type BridgeCommand = SendCommand | SendMediaCommand;
interface BridgeMessage {
type: 'message' | 'status' | 'qr' | 'error';
[key: string]: unknown;
@ -72,7 +83,7 @@ export class BridgeServer {
ws.on('message', async (data) => {
try {
const cmd = JSON.parse(data.toString()) as SendCommand;
const cmd = JSON.parse(data.toString()) as BridgeCommand;
await this.handleCommand(cmd);
ws.send(JSON.stringify({ type: 'sent', to: cmd.to }));
} catch (error) {
@ -92,9 +103,13 @@ export class BridgeServer {
});
}
private async handleCommand(cmd: SendCommand): Promise<void> {
if (cmd.type === 'send' && this.wa) {
private async handleCommand(cmd: BridgeCommand): Promise<void> {
if (!this.wa) return;
if (cmd.type === 'send') {
await this.wa.sendMessage(cmd.to, cmd.text);
} else if (cmd.type === 'send_media') {
await this.wa.sendMedia(cmd.to, cmd.filePath, cmd.mimetype, cmd.caption, cmd.fileName);
}
}

View File

@ -16,8 +16,8 @@ import makeWASocket, {
import { Boom } from '@hapi/boom';
import qrcode from 'qrcode-terminal';
import pino from 'pino';
import { writeFile, mkdir } from 'fs/promises';
import { join } from 'path';
import { readFile, writeFile, mkdir } from 'fs/promises';
import { join, basename } from 'path';
import { randomBytes } from 'crypto';
const VERSION = '0.1.0';
@ -29,6 +29,7 @@ export interface InboundMessage {
content: string;
timestamp: number;
isGroup: boolean;
wasMentioned?: boolean;
media?: string[];
}
@ -48,6 +49,31 @@ export class WhatsAppClient {
this.options = options;
}
private normalizeJid(jid: string | undefined | null): string {
return (jid || '').split(':')[0];
}
private wasMentioned(msg: any): boolean {
if (!msg?.key?.remoteJid?.endsWith('@g.us')) return false;
const candidates = [
msg?.message?.extendedTextMessage?.contextInfo?.mentionedJid,
msg?.message?.imageMessage?.contextInfo?.mentionedJid,
msg?.message?.videoMessage?.contextInfo?.mentionedJid,
msg?.message?.documentMessage?.contextInfo?.mentionedJid,
msg?.message?.audioMessage?.contextInfo?.mentionedJid,
];
const mentioned = candidates.flatMap((items) => (Array.isArray(items) ? items : []));
if (mentioned.length === 0) return false;
const selfIds = new Set(
[this.sock?.user?.id, this.sock?.user?.lid, this.sock?.user?.jid]
.map((jid) => this.normalizeJid(jid))
.filter(Boolean),
);
return mentioned.some((jid: string) => selfIds.has(this.normalizeJid(jid)));
}
async connect(): Promise<void> {
const logger = pino({ level: 'silent' });
const { state, saveCreds } = await useMultiFileAuthState(this.options.authDir);
@ -145,6 +171,7 @@ export class WhatsAppClient {
if (!finalContent && mediaPaths.length === 0) continue;
const isGroup = msg.key.remoteJid?.endsWith('@g.us') || false;
const wasMentioned = this.wasMentioned(msg);
this.options.onMessage({
id: msg.key.id || '',
@ -153,6 +180,7 @@ export class WhatsAppClient {
content: finalContent,
timestamp: msg.messageTimestamp as number,
isGroup,
...(isGroup ? { wasMentioned } : {}),
...(mediaPaths.length > 0 ? { media: mediaPaths } : {}),
});
}
@ -230,6 +258,32 @@ export class WhatsAppClient {
await this.sock.sendMessage(to, { text });
}
async sendMedia(
to: string,
filePath: string,
mimetype: string,
caption?: string,
fileName?: string,
): Promise<void> {
if (!this.sock) {
throw new Error('Not connected');
}
const buffer = await readFile(filePath);
const category = mimetype.split('/')[0];
if (category === 'image') {
await this.sock.sendMessage(to, { image: buffer, caption: caption || undefined, mimetype });
} else if (category === 'video') {
await this.sock.sendMessage(to, { video: buffer, caption: caption || undefined, mimetype });
} else if (category === 'audio') {
await this.sock.sendMessage(to, { audio: buffer, mimetype });
} else {
const name = fileName || basename(filePath);
await this.sock.sendMessage(to, { document: buffer, mimetype, fileName: name });
}
}
async disconnect(): Promise<void> {
if (this.sock) {
this.sock.end(undefined);

View File

@ -15,7 +15,7 @@ root=$(cat nanobot/__init__.py nanobot/__main__.py | wc -l)
printf " %-16s %5s lines\n" "(root)" "$root"
echo ""
total=$(find nanobot -name "*.py" ! -path "*/channels/*" ! -path "*/cli/*" ! -path "*/providers/*" ! -path "*/skills/*" | xargs cat | wc -l)
total=$(find nanobot -name "*.py" ! -path "*/channels/*" ! -path "*/cli/*" ! -path "*/command/*" ! -path "*/providers/*" ! -path "*/skills/*" | xargs cat | wc -l)
echo " Core total: $total lines"
echo ""
echo " (excludes: channels/, cli/, providers/, skills/)"
echo " (excludes: channels/, cli/, command/, providers/, skills/)"

View File

@ -2,6 +2,8 @@
Build a custom nanobot channel in three steps: subclass, package, install.
> **Note:** We recommend developing channel plugins against a source checkout of nanobot (`pip install -e .`) rather than a PyPI release, so you always have access to the latest base-channel features and APIs.
## How It Works
nanobot discovers channel plugins via Python [entry points](https://packaging.python.org/en/latest/specifications/entry-points/). When `nanobot gateway` starts, it scans:

View File

@ -4,17 +4,15 @@ from __future__ import annotations
import asyncio
import json
import os
import re
import sys
import os
import time
from contextlib import AsyncExitStack
from contextlib import AsyncExitStack, nullcontext
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger
from nanobot import __version__
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryConsolidator
from nanobot.agent.subagent import SubagentManager
@ -28,6 +26,7 @@ from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.utils.helpers import build_status_content, trim_history_for_budget
from nanobot.command import CommandContext, CommandRouter, register_builtin_commands
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session, SessionManager
@ -110,7 +109,12 @@ class AgentLoop:
self._mcp_connecting = False
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._background_tasks: list[asyncio.Task] = []
self._processing_lock = asyncio.Lock()
self._session_locks: dict[str, asyncio.Lock] = {}
# NANOBOT_MAX_CONCURRENT_REQUESTS: <=0 means unlimited; default 3.
_max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3"))
self._concurrency_gate: asyncio.Semaphore | None = (
asyncio.Semaphore(_max) if _max > 0 else None
)
self.memory_consolidator = MemoryConsolidator(
workspace=workspace,
provider=provider,
@ -122,6 +126,8 @@ class AgentLoop:
max_completion_tokens=provider.generation.max_tokens,
)
self._register_default_tools()
self.commands = CommandRouter()
register_builtin_commands(self.commands)
def _register_default_tools(self) -> None:
"""Register the default set of tools."""
@ -212,28 +218,6 @@ class AgentLoop:
return ", ".join(_fmt(tc) for tc in tool_calls)
def _status_response(self, msg: InboundMessage, session: Session) -> OutboundMessage:
"""Build an outbound status message for a session."""
ctx_est = 0
try:
ctx_est, _ = self.memory_consolidator.estimate_session_prompt_tokens(session)
except Exception:
pass
if ctx_est <= 0:
ctx_est = self._last_usage.get("prompt_tokens", 0)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=build_status_content(
version=__version__, model=self.model,
start_time=self._start_time, last_usage=self._last_usage,
context_window_tokens=self.context_window_tokens,
session_msg_count=len(session.get_history(max_messages=0)),
context_tokens_estimate=ctx_est,
),
metadata={"render_as": "text"},
)
def _trim_history_for_budget(
self,
messages: list[dict],
@ -256,6 +240,10 @@ class AgentLoop:
on_progress: Callable[..., Awaitable[None]] | None = None,
on_stream: Callable[[str], Awaitable[None]] | None = None,
on_stream_end: Callable[..., Awaitable[None]] | None = None,
*,
channel: str = "cli",
chat_id: str = "direct",
message_id: str | None = None,
) -> tuple[str | None, list[str], list[dict]]:
"""Run the agent iteration loop.
@ -342,6 +330,10 @@ class AgentLoop:
args_str = json.dumps(tc.arguments, ensure_ascii=False)
logger.info("Tool call: {}({})", tc.name, args_str[:200])
# Re-bind tool context right before execution so that
# concurrent sessions don't clobber each other's routing.
self._set_tool_context(channel, chat_id, message_id)
# Execute all tool calls concurrently — the LLM batches
# independent calls in a single response on purpose.
# return_exceptions=True ensures all results are collected
@ -404,52 +396,22 @@ class AgentLoop:
logger.warning("Error consuming inbound message: {}, continuing...", e)
continue
cmd = msg.content.strip().lower()
if cmd == "/stop":
await self._handle_stop(msg)
elif cmd == "/restart":
await self._handle_restart(msg)
elif cmd == "/status":
session = self.sessions.get_or_create(msg.session_key)
await self.bus.publish_outbound(self._status_response(msg, session))
else:
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
async def _handle_stop(self, msg: InboundMessage) -> None:
"""Cancel all active tasks and subagents for the session."""
tasks = self._active_tasks.pop(msg.session_key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
for t in tasks:
try:
await t
except (asyncio.CancelledError, Exception):
pass
sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
total = cancelled + sub_cancelled
content = f"Stopped {total} task(s)." if total else "No active task to stop."
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
))
async def _handle_restart(self, msg: InboundMessage) -> None:
"""Restart the process in-place via os.execv."""
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content="Restarting...",
))
async def _do_restart():
await asyncio.sleep(1)
# Use -m nanobot instead of sys.argv[0] for Windows compatibility
# (sys.argv[0] may be just "nanobot" without full path on Windows)
os.execv(sys.executable, [sys.executable, "-m", "nanobot"] + sys.argv[1:])
asyncio.create_task(_do_restart())
raw = msg.content.strip()
if self.commands.is_priority(raw):
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw=raw, loop=self)
result = await self.commands.dispatch_priority(ctx)
if result:
await self.bus.publish_outbound(result)
continue
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
async def _dispatch(self, msg: InboundMessage) -> None:
"""Process a message under the global lock."""
async with self._processing_lock:
"""Process a message: per-session serial, cross-session concurrent."""
lock = self._session_locks.setdefault(msg.session_key, asyncio.Lock())
gate = self._concurrency_gate or nullcontext()
async with lock, gate:
try:
on_stream = on_stream_end = None
if msg.metadata.get("_wants_stream"):
@ -533,7 +495,10 @@ class AgentLoop:
current_message=msg.content, channel=channel, chat_id=chat_id,
current_role=current_role,
)
final_content, _, all_msgs = await self._run_agent_loop(messages)
final_content, _, all_msgs = await self._run_agent_loop(
messages, channel=channel, chat_id=chat_id,
message_id=msg.metadata.get("message_id"),
)
self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session)
self._schedule_background(self.memory_consolidator.maybe_consolidate_by_tokens(session))
@ -547,35 +512,11 @@ class AgentLoop:
session = self.sessions.get_or_create(key)
# Slash commands
cmd = msg.content.strip().lower()
if cmd == "/new":
snapshot = session.messages[session.last_consolidated:]
session.clear()
self.sessions.save(session)
self.sessions.invalidate(session.key)
raw = msg.content.strip()
ctx = CommandContext(msg=msg, session=session, key=key, raw=raw, loop=self)
if result := await self.commands.dispatch(ctx):
return result
if snapshot:
self._schedule_background(self.memory_consolidator.archive_messages(snapshot))
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="New session started.")
if cmd == "/status":
return self._status_response(msg, session)
if cmd == "/help":
lines = [
"🐈 nanobot commands:",
"/new — Start a new conversation",
"/stop — Stop the current task",
"/restart — Restart the bot",
"/status — Show bot status",
"/help — Show available commands",
]
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content="\n".join(lines),
metadata={"render_as": "text"},
)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
@ -604,6 +545,8 @@ class AgentLoop:
on_progress=on_progress or _bus_progress,
on_stream=on_stream,
on_stream_end=on_stream_end,
channel=msg.channel, chat_id=msg.chat_id,
message_id=msg.metadata.get("message_id"),
)
if final_content is None:

View File

@ -93,8 +93,10 @@ class ReadFileTool(_FsTool):
"required": ["path"],
}
async def execute(self, path: str, offset: int = 1, limit: int | None = None, **kwargs: Any) -> Any:
async def execute(self, path: str | None = None, offset: int = 1, limit: int | None = None, **kwargs: Any) -> Any:
try:
if not path:
return "Error reading file: Unknown path"
fp = self._resolve(path)
if not fp.exists():
return f"Error: File not found: {path}"
@ -174,8 +176,12 @@ class WriteFileTool(_FsTool):
"required": ["path", "content"],
}
async def execute(self, path: str, content: str, **kwargs: Any) -> str:
async def execute(self, path: str | None = None, content: str | None = None, **kwargs: Any) -> str:
try:
if not path:
raise ValueError("Unknown path")
if content is None:
raise ValueError("Unknown content")
fp = self._resolve(path)
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(content, encoding="utf-8")
@ -248,10 +254,18 @@ class EditFileTool(_FsTool):
}
async def execute(
self, path: str, old_text: str, new_text: str,
self, path: str | None = None, old_text: str | None = None,
new_text: str | None = None,
replace_all: bool = False, **kwargs: Any,
) -> str:
try:
if not path:
raise ValueError("Unknown path")
if old_text is None:
raise ValueError("Unknown old_text")
if new_text is None:
raise ValueError("Unknown new_text")
fp = self._resolve(path)
if not fp.exists():
return f"Error: File not found: {path}"
@ -350,10 +364,12 @@ class ListDirTool(_FsTool):
}
async def execute(
self, path: str, recursive: bool = False,
self, path: str | None = None, recursive: bool = False,
max_entries: int | None = None, **kwargs: Any,
) -> str:
try:
if path is None:
raise ValueError("Unknown path")
dp = self._resolve(path)
if not dp.exists():
return f"Error: Directory not found: {path}"

View File

@ -6,6 +6,8 @@ import re
from pathlib import Path
from typing import Any
from loguru import logger
from nanobot.agent.tools.base import Tool
@ -110,6 +112,11 @@ class ExecTool(Tool):
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
finally:
try:
os.waitpid(process.pid, os.WNOHANG)
except (ProcessLookupError, ChildProcessError) as e:
logger.debug("Process already reaped or not found: {}", e)
return f"Error: Command timed out after {effective_timeout} seconds"
output_parts = []

View File

@ -26,9 +26,7 @@ class WhatsAppConfig(Base):
bridge_url: str = "ws://localhost:3001"
bridge_token: str = ""
allow_from: list[str] = Field(default_factory=list)
group_policy: Literal["open", "mention"] = (
"open" # "open" responds to all, "mention" only when @mentioned
)
group_policy: Literal["open", "mention"] = "open" # "open" responds to all, "mention" only when @mentioned
class WhatsAppChannel(BaseChannel):
@ -140,11 +138,28 @@ class WhatsAppChannel(BaseChannel):
logger.warning("WhatsApp bridge not connected")
return
try:
payload = {"type": "send", "to": msg.chat_id, "text": msg.content}
await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e:
logger.error("Error sending WhatsApp message: {}", e)
chat_id = msg.chat_id
if msg.content:
try:
payload = {"type": "send", "to": chat_id, "text": msg.content}
await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e:
logger.error("Error sending WhatsApp message: {}", e)
for media_path in msg.media or []:
try:
mime, _ = mimetypes.guess_type(media_path)
payload = {
"type": "send_media",
"to": chat_id,
"filePath": media_path,
"mimetype": mime or "application/octet-stream",
"fileName": media_path.rsplit("/", 1)[-1],
}
await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e:
logger.error("Error sending WhatsApp media {}: {}", media_path, e)
async def _handle_bridge_message(self, raw: str) -> None:
"""Handle a message from the bridge."""

View File

@ -34,7 +34,7 @@ from rich.text import Text
from nanobot import __logo__, __version__
from nanobot.cli.stream import StreamRenderer, ThinkingSpinner
from nanobot.config.paths import get_workspace_path
from nanobot.config.paths import get_workspace_path, is_default_workspace
from nanobot.config.schema import Config
from nanobot.utils.helpers import sync_workspace_templates
@ -294,7 +294,7 @@ def onboard(
# Run interactive wizard if enabled
if wizard:
from nanobot.cli.onboard_wizard import run_onboard
from nanobot.cli.onboard import run_onboard
try:
result = run_onboard(initial_config=config)
@ -479,6 +479,17 @@ def _warn_deprecated_config_keys(config_path: Path | None) -> None:
)
def _migrate_cron_store(config: "Config") -> None:
"""One-time migration: move legacy global cron store into the workspace."""
from nanobot.config.paths import get_cron_dir
legacy_path = get_cron_dir() / "jobs.json"
new_path = config.workspace_path / "cron" / "jobs.json"
if legacy_path.is_file() and not new_path.exists():
new_path.parent.mkdir(parents=True, exist_ok=True)
import shutil
shutil.move(str(legacy_path), str(new_path))
# ============================================================================
# Gateway / Server
@ -496,7 +507,6 @@ def gateway(
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
from nanobot.channels.manager import ChannelManager
from nanobot.config.paths import get_cron_dir
from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob
from nanobot.heartbeat.service import HeartbeatService
@ -515,8 +525,12 @@ def gateway(
provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)
# Create cron service first (callback set after agent creation)
cron_store_path = get_cron_dir() / "jobs.json"
# Preserve existing single-workspace installs, but keep custom workspaces clean.
if is_default_workspace(config.workspace_path):
_migrate_cron_store(config)
# Create cron service with workspace-scoped store
cron_store_path = config.workspace_path / "cron" / "jobs.json"
cron = CronService(cron_store_path)
# Create agent with cron service
@ -621,12 +635,13 @@ def gateway(
chat_id=chat_id,
on_progress=_silent,
)
# Clear the heartbeat session to prevent token overflow from accumulated tasks
# Keep a small tail of heartbeat history so the loop stays bounded
# without losing all short-term context between runs.
session = agent.sessions.get_or_create("heartbeat")
session.clear()
session.retain_recent_legal_suffix(hb_cfg.keep_recent_messages)
agent.sessions.save(session)
return resp.content if resp else ""
async def on_heartbeat_notify(response: str) -> None:
@ -704,7 +719,6 @@ def agent(
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
from nanobot.config.paths import get_cron_dir
from nanobot.cron.service import CronService
config = _load_runtime_config(config, workspace)
@ -713,8 +727,12 @@ def agent(
bus = MessageBus()
provider = _make_provider(config)
# Create cron service for tool usage (no callback needed for CLI unless running)
cron_store_path = get_cron_dir() / "jobs.json"
# Preserve existing single-workspace installs, but keep custom workspaces clean.
if is_default_workspace(config.workspace_path):
_migrate_cron_store(config)
# Create cron service with workspace-scoped store
cron_store_path = config.workspace_path / "cron" / "jobs.json"
cron = CronService(cron_store_path)
if logs:
@ -1012,7 +1030,7 @@ def channels_login(
force: bool = typer.Option(False, "--force", "-f", help="Force re-authentication even if already logged in"),
):
"""Authenticate with a channel via QR code or other interactive login."""
from nanobot.channels.registry import discover_all, load_channel_class
from nanobot.channels.registry import discover_all
from nanobot.config.loader import load_config
config = load_config()
@ -1027,7 +1045,7 @@ def channels_login(
console.print(f"{__logo__} {all_channels[channel_name].display_name} Login\n")
channel_cls = load_channel_class(channel_name)
channel_cls = all_channels[channel_name]
channel = channel_cls(channel_cfg, bus=None)
success = asyncio.run(channel.login(force=force))

View File

@ -16,7 +16,7 @@ from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from nanobot.cli.model_info import (
from nanobot.cli.models import (
format_token_count,
get_model_context_limit,
get_model_suggestions,

View File

@ -0,0 +1,6 @@
"""Slash command routing and built-in handlers."""
from nanobot.command.builtin import register_builtin_commands
from nanobot.command.router import CommandContext, CommandRouter
__all__ = ["CommandContext", "CommandRouter", "register_builtin_commands"]

110
nanobot/command/builtin.py Normal file
View File

@ -0,0 +1,110 @@
"""Built-in slash command handlers."""
from __future__ import annotations
import asyncio
import os
import sys
from nanobot import __version__
from nanobot.bus.events import OutboundMessage
from nanobot.command.router import CommandContext, CommandRouter
from nanobot.utils.helpers import build_status_content
async def cmd_stop(ctx: CommandContext) -> OutboundMessage:
"""Cancel all active tasks and subagents for the session."""
loop = ctx.loop
msg = ctx.msg
tasks = loop._active_tasks.pop(msg.session_key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
for t in tasks:
try:
await t
except (asyncio.CancelledError, Exception):
pass
sub_cancelled = await loop.subagents.cancel_by_session(msg.session_key)
total = cancelled + sub_cancelled
content = f"Stopped {total} task(s)." if total else "No active task to stop."
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=content)
async def cmd_restart(ctx: CommandContext) -> OutboundMessage:
"""Restart the process in-place via os.execv."""
msg = ctx.msg
async def _do_restart():
await asyncio.sleep(1)
os.execv(sys.executable, [sys.executable, "-m", "nanobot"] + sys.argv[1:])
asyncio.create_task(_do_restart())
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="Restarting...")
async def cmd_status(ctx: CommandContext) -> OutboundMessage:
"""Build an outbound status message for a session."""
loop = ctx.loop
session = ctx.session or loop.sessions.get_or_create(ctx.key)
ctx_est = 0
try:
ctx_est, _ = loop.memory_consolidator.estimate_session_prompt_tokens(session)
except Exception:
pass
if ctx_est <= 0:
ctx_est = loop._last_usage.get("prompt_tokens", 0)
return OutboundMessage(
channel=ctx.msg.channel,
chat_id=ctx.msg.chat_id,
content=build_status_content(
version=__version__, model=loop.model,
start_time=loop._start_time, last_usage=loop._last_usage,
context_window_tokens=loop.context_window_tokens,
session_msg_count=len(session.get_history(max_messages=0)),
context_tokens_estimate=ctx_est,
),
metadata={"render_as": "text"},
)
async def cmd_new(ctx: CommandContext) -> OutboundMessage:
"""Start a fresh session."""
loop = ctx.loop
session = ctx.session or loop.sessions.get_or_create(ctx.key)
snapshot = session.messages[session.last_consolidated:]
session.clear()
loop.sessions.save(session)
loop.sessions.invalidate(session.key)
if snapshot:
loop._schedule_background(loop.memory_consolidator.archive_messages(snapshot))
return OutboundMessage(
channel=ctx.msg.channel, chat_id=ctx.msg.chat_id,
content="New session started.",
)
async def cmd_help(ctx: CommandContext) -> OutboundMessage:
"""Return available slash commands."""
lines = [
"🐈 nanobot commands:",
"/new — Start a new conversation",
"/stop — Stop the current task",
"/restart — Restart the bot",
"/status — Show bot status",
"/help — Show available commands",
]
return OutboundMessage(
channel=ctx.msg.channel,
chat_id=ctx.msg.chat_id,
content="\n".join(lines),
metadata={"render_as": "text"},
)
def register_builtin_commands(router: CommandRouter) -> None:
"""Register the default set of slash commands."""
router.priority("/stop", cmd_stop)
router.priority("/restart", cmd_restart)
router.priority("/status", cmd_status)
router.exact("/new", cmd_new)
router.exact("/status", cmd_status)
router.exact("/help", cmd_help)

84
nanobot/command/router.py Normal file
View File

@ -0,0 +1,84 @@
"""Minimal command routing table for slash commands."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Awaitable, Callable
if TYPE_CHECKING:
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.session.manager import Session
Handler = Callable[["CommandContext"], Awaitable["OutboundMessage | None"]]
@dataclass
class CommandContext:
"""Everything a command handler needs to produce a response."""
msg: InboundMessage
session: Session | None
key: str
raw: str
args: str = ""
loop: Any = None
class CommandRouter:
"""Pure dict-based command dispatch.
Three tiers checked in order:
1. *priority* exact-match commands handled before the dispatch lock
(e.g. /stop, /restart).
2. *exact* exact-match commands handled inside the dispatch lock.
3. *prefix* longest-prefix-first match (e.g. "/team ").
4. *interceptors* fallback predicates (e.g. team-mode active check).
"""
def __init__(self) -> None:
self._priority: dict[str, Handler] = {}
self._exact: dict[str, Handler] = {}
self._prefix: list[tuple[str, Handler]] = []
self._interceptors: list[Handler] = []
def priority(self, cmd: str, handler: Handler) -> None:
self._priority[cmd] = handler
def exact(self, cmd: str, handler: Handler) -> None:
self._exact[cmd] = handler
def prefix(self, pfx: str, handler: Handler) -> None:
self._prefix.append((pfx, handler))
self._prefix.sort(key=lambda p: len(p[0]), reverse=True)
def intercept(self, handler: Handler) -> None:
self._interceptors.append(handler)
def is_priority(self, text: str) -> bool:
return text.strip().lower() in self._priority
async def dispatch_priority(self, ctx: CommandContext) -> OutboundMessage | None:
"""Dispatch a priority command. Called from run() without the lock."""
handler = self._priority.get(ctx.raw.lower())
if handler:
return await handler(ctx)
return None
async def dispatch(self, ctx: CommandContext) -> OutboundMessage | None:
"""Try exact, prefix, then interceptors. Returns None if unhandled."""
cmd = ctx.raw.lower()
if handler := self._exact.get(cmd):
return await handler(ctx)
for pfx, handler in self._prefix:
if cmd.startswith(pfx):
ctx.args = ctx.raw[len(pfx):]
return await handler(ctx)
for interceptor in self._interceptors:
result = await interceptor(ctx)
if result is not None:
return result
return None

View File

@ -7,6 +7,7 @@ from nanobot.config.paths import (
get_cron_dir,
get_data_dir,
get_legacy_sessions_dir,
is_default_workspace,
get_logs_dir,
get_media_dir,
get_runtime_subdir,
@ -24,6 +25,7 @@ __all__ = [
"get_cron_dir",
"get_logs_dir",
"get_workspace_path",
"is_default_workspace",
"get_cli_history_path",
"get_bridge_install_dir",
"get_legacy_sessions_dir",

View File

@ -40,6 +40,13 @@ def get_workspace_path(workspace: str | None = None) -> Path:
return ensure_dir(path)
def is_default_workspace(workspace: str | Path | None) -> bool:
"""Return whether a workspace resolves to nanobot's default workspace path."""
current = Path(workspace).expanduser() if workspace is not None else Path.home() / ".nanobot" / "workspace"
default = Path.home() / ".nanobot" / "workspace"
return current.resolve(strict=False) == default.resolve(strict=False)
def get_cli_history_path() -> Path:
"""Return the shared CLI history file path."""
return Path.home() / ".nanobot" / "history" / "cli_history"

View File

@ -91,6 +91,7 @@ class HeartbeatConfig(Base):
enabled: bool = True
interval_s: int = 30 * 60 # 30 minutes
keep_recent_messages: int = 8
class GatewayConfig(Base):
@ -174,12 +175,15 @@ class Config(BaseSettings):
self, model: str | None = None
) -> tuple["ProviderConfig | None", str | None]:
"""Match provider config and its registry name. Returns (config, spec_name)."""
from nanobot.providers.registry import PROVIDERS
from nanobot.providers.registry import PROVIDERS, find_by_name
forced = self.agents.defaults.provider
if forced != "auto":
p = getattr(self.providers, forced, None)
return (p, forced) if p else (None, None)
spec = find_by_name(forced)
if spec:
p = getattr(self.providers, spec.name, None)
return (p, spec.name) if p else (None, None)
return None, None
model_lower = (model or self.agents.defaults.model).lower()
model_normalized = model_lower.replace("-", "_")

View File

@ -15,6 +15,8 @@ from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from pydantic.alias_generators import to_snake
@dataclass(frozen=True)
class ProviderSpec:
@ -545,7 +547,8 @@ def find_gateway(
def find_by_name(name: str) -> ProviderSpec | None:
"""Find a provider spec by config field name, e.g. "dashscope"."""
normalized = to_snake(name.replace("-", "_"))
for spec in PROVIDERS:
if spec.name == name:
if spec.name == normalized:
return spec
return None

View File

@ -98,6 +98,32 @@ class Session:
self.last_consolidated = 0
self.updated_at = datetime.now()
def retain_recent_legal_suffix(self, max_messages: int) -> None:
"""Keep a legal recent suffix, mirroring get_history boundary rules."""
if max_messages <= 0:
self.clear()
return
if len(self.messages) <= max_messages:
return
start_idx = max(0, len(self.messages) - max_messages)
# If the cutoff lands mid-turn, extend backward to the nearest user turn.
while start_idx > 0 and self.messages[start_idx].get("role") != "user":
start_idx -= 1
retained = self.messages[start_idx:]
# Mirror get_history(): avoid persisting orphan tool results at the front.
start = self._find_legal_start(retained)
if start:
retained = retained[start:]
dropped = len(self.messages) - len(retained)
self.messages = retained
self.last_consolidated = max(0, self.last_consolidated - dropped)
self.updated_at = datetime.now()
class SessionManager:
"""

View File

@ -76,6 +76,7 @@ dev = [
"matrix-nio[e2e]>=0.25.2",
"mistune>=3.0.0,<4.0.0",
"nh3>=0.2.17,<1.0.0",
"mypy>=1.19.1",
]
[project.scripts]

View File

@ -22,6 +22,10 @@ class _FakePlugin(BaseChannel):
name = "fakeplugin"
display_name = "Fake Plugin"
def __init__(self, config, bus):
super().__init__(config, bus)
self.login_calls: list[bool] = []
async def start(self) -> None:
pass
@ -31,6 +35,10 @@ class _FakePlugin(BaseChannel):
async def send(self, msg: OutboundMessage) -> None:
pass
async def login(self, force: bool = False) -> bool:
self.login_calls.append(force)
return True
class _FakeTelegram(BaseChannel):
"""Plugin that tries to shadow built-in telegram."""
@ -183,6 +191,34 @@ async def test_manager_loads_plugin_from_dict_config():
assert isinstance(mgr.channels["fakeplugin"], _FakePlugin)
def test_channels_login_uses_discovered_plugin_class(monkeypatch):
from nanobot.cli.commands import app
from nanobot.config.schema import Config
from typer.testing import CliRunner
runner = CliRunner()
seen: dict[str, object] = {}
class _LoginPlugin(_FakePlugin):
display_name = "Login Plugin"
async def login(self, force: bool = False) -> bool:
seen["force"] = force
seen["config"] = self.config
return True
monkeypatch.setattr("nanobot.config.loader.load_config", lambda: Config())
monkeypatch.setattr(
"nanobot.channels.registry.discover_all",
lambda: {"fakeplugin": _LoginPlugin},
)
result = runner.invoke(app, ["channels", "login", "fakeplugin", "--force"])
assert result.exit_code == 0
assert seen["force"] is True
@pytest.mark.asyncio
async def test_manager_skips_disabled_plugin():
fake_config = SimpleNamespace(

View File

@ -11,7 +11,7 @@ from nanobot.cli.commands import _make_provider, app
from nanobot.config.schema import Config
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.providers.openai_codex_provider import _strip_model_prefix
from nanobot.providers.registry import find_by_model
from nanobot.providers.registry import find_by_model, find_by_name
runner = CliRunner()
@ -138,10 +138,10 @@ def test_onboard_help_shows_workspace_and_config_options():
def test_onboard_interactive_discard_does_not_save_or_create_workspace(mock_paths, monkeypatch):
config_file, workspace_dir, _ = mock_paths
from nanobot.cli.onboard_wizard import OnboardResult
from nanobot.cli.onboard import OnboardResult
monkeypatch.setattr(
"nanobot.cli.onboard_wizard.run_onboard",
"nanobot.cli.onboard.run_onboard",
lambda initial_config: OnboardResult(config=initial_config, should_save=False),
)
@ -179,10 +179,10 @@ def test_onboard_wizard_preserves_explicit_config_in_next_steps(tmp_path, monkey
config_path = tmp_path / "instance" / "config.json"
workspace_path = tmp_path / "workspace"
from nanobot.cli.onboard_wizard import OnboardResult
from nanobot.cli.onboard import OnboardResult
monkeypatch.setattr(
"nanobot.cli.onboard_wizard.run_onboard",
"nanobot.cli.onboard.run_onboard",
lambda initial_config: OnboardResult(config=initial_config, should_save=True),
)
monkeypatch.setattr("nanobot.channels.registry.discover_all", lambda: {})
@ -240,6 +240,34 @@ def test_config_explicit_ollama_provider_uses_default_localhost_api_base():
assert config.get_api_base() == "http://localhost:11434"
def test_config_accepts_camel_case_explicit_provider_name_for_coding_plan():
config = Config.model_validate(
{
"agents": {
"defaults": {
"provider": "volcengineCodingPlan",
"model": "doubao-1-5-pro",
}
},
"providers": {
"volcengineCodingPlan": {
"apiKey": "test-key",
}
},
}
)
assert config.get_provider_name() == "volcengine_coding_plan"
assert config.get_api_base() == "https://ark.cn-beijing.volces.com/api/coding/v3"
def test_find_by_name_accepts_camel_case_and_hyphen_aliases():
assert find_by_name("volcengineCodingPlan") is not None
assert find_by_name("volcengineCodingPlan").name == "volcengine_coding_plan"
assert find_by_name("github-copilot") is not None
assert find_by_name("github-copilot").name == "github_copilot"
def test_config_auto_detects_ollama_from_local_api_base():
config = Config.model_validate(
{
@ -333,10 +361,8 @@ def mock_agent_runtime(tmp_path):
"""Mock agent command dependencies for focused CLI tests."""
config = Config()
config.agents.defaults.workspace = str(tmp_path / "default-workspace")
cron_dir = tmp_path / "data" / "cron"
with patch("nanobot.config.loader.load_config", return_value=config) as mock_load_config, \
patch("nanobot.config.paths.get_cron_dir", return_value=cron_dir), \
patch("nanobot.cli.commands.sync_workspace_templates") as mock_sync_templates, \
patch("nanobot.cli.commands._make_provider", return_value=object()), \
patch("nanobot.cli.commands._print_agent_response") as mock_print_response, \
@ -413,7 +439,6 @@ def test_agent_config_sets_active_path(monkeypatch, tmp_path: Path) -> None:
lambda path: seen.__setitem__("config_path", path),
)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: config_file.parent / "cron")
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
@ -438,6 +463,147 @@ def test_agent_config_sets_active_path(monkeypatch, tmp_path: Path) -> None:
assert seen["config_path"] == config_file.resolve()
def test_agent_uses_workspace_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
config = Config()
config.agents.defaults.workspace = str(tmp_path / "agent-workspace")
seen: dict[str, Path] = {}
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
class _FakeCron:
def __init__(self, store_path: Path) -> None:
seen["cron_store"] = store_path
class _FakeAgentLoop:
def __init__(self, *args, **kwargs) -> None:
pass
async def process_direct(self, *_args, **_kwargs):
return OutboundMessage(channel="cli", chat_id="direct", content="ok")
async def close_mcp(self) -> None:
return None
monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _FakeAgentLoop)
monkeypatch.setattr("nanobot.cli.commands._print_agent_response", lambda *_args, **_kwargs: None)
result = runner.invoke(app, ["agent", "-m", "hello", "-c", str(config_file)])
assert result.exit_code == 0
assert seen["cron_store"] == config.workspace_path / "cron" / "jobs.json"
def test_agent_workspace_override_does_not_migrate_legacy_cron(
monkeypatch, tmp_path: Path
) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
legacy_dir = tmp_path / "global" / "cron"
legacy_dir.mkdir(parents=True)
legacy_file = legacy_dir / "jobs.json"
legacy_file.write_text('{"jobs": []}')
override = tmp_path / "override-workspace"
config = Config()
seen: dict[str, Path] = {}
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: legacy_dir)
class _FakeCron:
def __init__(self, store_path: Path) -> None:
seen["cron_store"] = store_path
class _FakeAgentLoop:
def __init__(self, *args, **kwargs) -> None:
pass
async def process_direct(self, *_args, **_kwargs):
return OutboundMessage(channel="cli", chat_id="direct", content="ok")
async def close_mcp(self) -> None:
return None
monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _FakeAgentLoop)
monkeypatch.setattr("nanobot.cli.commands._print_agent_response", lambda *_args, **_kwargs: None)
result = runner.invoke(
app,
["agent", "-m", "hello", "-c", str(config_file), "-w", str(override)],
)
assert result.exit_code == 0
assert seen["cron_store"] == override / "cron" / "jobs.json"
assert legacy_file.exists()
assert not (override / "cron" / "jobs.json").exists()
def test_agent_custom_config_workspace_does_not_migrate_legacy_cron(
monkeypatch, tmp_path: Path
) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
legacy_dir = tmp_path / "global" / "cron"
legacy_dir.mkdir(parents=True)
legacy_file = legacy_dir / "jobs.json"
legacy_file.write_text('{"jobs": []}')
custom_workspace = tmp_path / "custom-workspace"
config = Config()
config.agents.defaults.workspace = str(custom_workspace)
seen: dict[str, Path] = {}
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: legacy_dir)
class _FakeCron:
def __init__(self, store_path: Path) -> None:
seen["cron_store"] = store_path
class _FakeAgentLoop:
def __init__(self, *args, **kwargs) -> None:
pass
async def process_direct(self, *_args, **_kwargs):
return OutboundMessage(channel="cli", chat_id="direct", content="ok")
async def close_mcp(self) -> None:
return None
monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _FakeAgentLoop)
monkeypatch.setattr("nanobot.cli.commands._print_agent_response", lambda *_args, **_kwargs: None)
result = runner.invoke(app, ["agent", "-m", "hello", "-c", str(config_file)])
assert result.exit_code == 0
assert seen["cron_store"] == custom_workspace / "cron" / "jobs.json"
assert legacy_file.exists()
assert not (custom_workspace / "cron" / "jobs.json").exists()
def test_agent_overrides_workspace_path(mock_agent_runtime):
workspace_path = Path("/tmp/agent-workspace")
@ -477,6 +643,12 @@ def test_agent_hints_about_deprecated_memory_window(mock_agent_runtime, tmp_path
assert "no longer used" in result.stdout
def test_heartbeat_retains_recent_messages_by_default():
config = Config()
assert config.gateway.heartbeat.keep_recent_messages == 8
def test_gateway_uses_workspace_from_config_by_default(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
@ -538,7 +710,7 @@ def test_gateway_workspace_option_overrides_config(monkeypatch, tmp_path: Path)
assert config.workspace_path == override
def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None:
def test_gateway_uses_workspace_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
@ -549,7 +721,6 @@ def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Pat
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: config_file.parent / "cron")
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
@ -565,7 +736,130 @@ def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Pat
result = runner.invoke(app, ["gateway", "--config", str(config_file)])
assert isinstance(result.exception, _StopGatewayError)
assert seen["cron_store"] == config_file.parent / "cron" / "jobs.json"
assert seen["cron_store"] == config.workspace_path / "cron" / "jobs.json"
def test_gateway_workspace_override_does_not_migrate_legacy_cron(
monkeypatch, tmp_path: Path
) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
legacy_dir = tmp_path / "global" / "cron"
legacy_dir.mkdir(parents=True)
legacy_file = legacy_dir / "jobs.json"
legacy_file.write_text('{"jobs": []}')
override = tmp_path / "override-workspace"
config = Config()
seen: dict[str, Path] = {}
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
monkeypatch.setattr("nanobot.session.manager.SessionManager", lambda _workspace: object())
monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: legacy_dir)
class _StopCron:
def __init__(self, store_path: Path) -> None:
seen["cron_store"] = store_path
raise _StopGatewayError("stop")
monkeypatch.setattr("nanobot.cron.service.CronService", _StopCron)
result = runner.invoke(
app,
["gateway", "--config", str(config_file), "--workspace", str(override)],
)
assert isinstance(result.exception, _StopGatewayError)
assert seen["cron_store"] == override / "cron" / "jobs.json"
assert legacy_file.exists()
assert not (override / "cron" / "jobs.json").exists()
def test_gateway_custom_config_workspace_does_not_migrate_legacy_cron(
monkeypatch, tmp_path: Path
) -> None:
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
legacy_dir = tmp_path / "global" / "cron"
legacy_dir.mkdir(parents=True)
legacy_file = legacy_dir / "jobs.json"
legacy_file.write_text('{"jobs": []}')
custom_workspace = tmp_path / "custom-workspace"
config = Config()
config.agents.defaults.workspace = str(custom_workspace)
seen: dict[str, Path] = {}
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
monkeypatch.setattr("nanobot.session.manager.SessionManager", lambda _workspace: object())
monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: legacy_dir)
class _StopCron:
def __init__(self, store_path: Path) -> None:
seen["cron_store"] = store_path
raise _StopGatewayError("stop")
monkeypatch.setattr("nanobot.cron.service.CronService", _StopCron)
result = runner.invoke(app, ["gateway", "--config", str(config_file)])
assert isinstance(result.exception, _StopGatewayError)
assert seen["cron_store"] == custom_workspace / "cron" / "jobs.json"
assert legacy_file.exists()
assert not (custom_workspace / "cron" / "jobs.json").exists()
def test_migrate_cron_store_moves_legacy_file(tmp_path: Path) -> None:
"""Legacy global jobs.json is moved into the workspace on first run."""
from nanobot.cli.commands import _migrate_cron_store
legacy_dir = tmp_path / "global" / "cron"
legacy_dir.mkdir(parents=True)
legacy_file = legacy_dir / "jobs.json"
legacy_file.write_text('{"jobs": []}')
config = Config()
config.agents.defaults.workspace = str(tmp_path / "workspace")
workspace_cron = config.workspace_path / "cron" / "jobs.json"
with patch("nanobot.config.paths.get_cron_dir", return_value=legacy_dir):
_migrate_cron_store(config)
assert workspace_cron.exists()
assert workspace_cron.read_text() == '{"jobs": []}'
assert not legacy_file.exists()
def test_migrate_cron_store_skips_when_workspace_file_exists(tmp_path: Path) -> None:
"""Migration does not overwrite an existing workspace cron store."""
from nanobot.cli.commands import _migrate_cron_store
legacy_dir = tmp_path / "global" / "cron"
legacy_dir.mkdir(parents=True)
(legacy_dir / "jobs.json").write_text('{"old": true}')
config = Config()
config.agents.defaults.workspace = str(tmp_path / "workspace")
workspace_cron = config.workspace_path / "cron" / "jobs.json"
workspace_cron.parent.mkdir(parents=True)
workspace_cron.write_text('{"new": true}')
with patch("nanobot.config.paths.get_cron_dir", return_value=legacy_dir):
_migrate_cron_store(config)
assert workspace_cron.read_text() == '{"new": true}'
def test_gateway_uses_configured_port_when_cli_flag_is_missing(monkeypatch, tmp_path: Path) -> None:
@ -610,3 +904,9 @@ def test_gateway_cli_port_overrides_configured_port(monkeypatch, tmp_path: Path)
assert isinstance(result.exception, _StopGatewayError)
assert "port 18792" in result.stdout
def test_channels_login_requires_channel_name() -> None:
result = runner.invoke(app, ["channels", "login"])
assert result.exit_code == 2

View File

@ -10,6 +10,7 @@ from nanobot.config.paths import (
get_media_dir,
get_runtime_subdir,
get_workspace_path,
is_default_workspace,
)
@ -40,3 +41,9 @@ def test_shared_and_legacy_paths_remain_global() -> None:
def test_workspace_path_is_explicitly_resolved() -> None:
assert get_workspace_path() == Path.home() / ".nanobot" / "workspace"
assert get_workspace_path("~/custom-workspace") == Path.home() / "custom-workspace"
def test_is_default_workspace_distinguishes_default_and_custom_paths() -> None:
assert is_default_workspace(None) is True
assert is_default_workspace(Path.home() / ".nanobot" / "workspace") is True
assert is_default_workspace("~/custom-workspace") is False

View File

@ -77,6 +77,11 @@ class TestReadFileTool:
assert "Error" in result
assert "not found" in result
@pytest.mark.asyncio
async def test_missing_path_returns_clear_error(self, tool):
result = await tool.execute()
assert result == "Error reading file: Unknown path"
@pytest.mark.asyncio
async def test_char_budget_trims(self, tool, tmp_path):
"""When the selected slice exceeds _MAX_CHARS the output is trimmed."""
@ -200,6 +205,13 @@ class TestEditFileTool:
assert "Error" in result
assert "not found" in result
@pytest.mark.asyncio
async def test_missing_new_text_returns_clear_error(self, tool, tmp_path):
f = tmp_path / "a.py"
f.write_text("hello", encoding="utf-8")
result = await tool.execute(path=str(f), old_text="hello")
assert result == "Error editing file: Unknown new_text"
# ---------------------------------------------------------------------------
# ListDirTool
@ -265,6 +277,11 @@ class TestListDirTool:
assert "Error" in result
assert "not found" in result
@pytest.mark.asyncio
async def test_missing_path_returns_clear_error(self, tool):
result = await tool.execute()
assert result == "Error listing directory: Unknown path"
# ---------------------------------------------------------------------------
# Workspace restriction + extra_allowed_dirs

View File

@ -12,11 +12,11 @@ from typing import Any, cast
import pytest
from pydantic import BaseModel, Field
from nanobot.cli import onboard_wizard
from nanobot.cli import onboard as onboard_wizard
# Import functions to test
from nanobot.cli.commands import _merge_missing_defaults
from nanobot.cli.onboard_wizard import (
from nanobot.cli.onboard import (
_BACK_PRESSED,
_configure_pydantic_model,
_format_value,
@ -352,7 +352,7 @@ class TestProviderChannelInfo:
"""Tests for provider and channel info retrieval."""
def test_get_provider_names_returns_dict(self):
from nanobot.cli.onboard_wizard import _get_provider_names
from nanobot.cli.onboard import _get_provider_names
names = _get_provider_names()
assert isinstance(names, dict)
@ -363,7 +363,7 @@ class TestProviderChannelInfo:
assert "github_copilot" not in names
def test_get_channel_names_returns_dict(self):
from nanobot.cli.onboard_wizard import _get_channel_names
from nanobot.cli.onboard import _get_channel_names
names = _get_channel_names()
assert isinstance(names, dict)
@ -371,7 +371,7 @@ class TestProviderChannelInfo:
assert len(names) >= 0
def test_get_provider_info_returns_valid_structure(self):
from nanobot.cli.onboard_wizard import _get_provider_info
from nanobot.cli.onboard import _get_provider_info
info = _get_provider_info()
assert isinstance(info, dict)

View File

@ -34,12 +34,15 @@ class TestRestartCommand:
@pytest.mark.asyncio
async def test_restart_sends_message_and_calls_execv(self):
from nanobot.command.builtin import cmd_restart
from nanobot.command.router import CommandContext
loop, bus = _make_loop()
msg = InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/restart")
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/restart", loop=loop)
with patch("nanobot.agent.loop.os.execv") as mock_execv:
await loop._handle_restart(msg)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
with patch("nanobot.command.builtin.os.execv") as mock_execv:
out = await cmd_restart(ctx)
assert "Restarting" in out.content
await asyncio.sleep(1.5)
@ -51,8 +54,8 @@ class TestRestartCommand:
loop, bus = _make_loop()
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/restart")
with patch.object(loop, "_handle_restart") as mock_handle:
mock_handle.return_value = None
with patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch, \
patch("nanobot.command.builtin.os.execv"):
await bus.publish_inbound(msg)
loop._running = True
@ -65,7 +68,9 @@ class TestRestartCommand:
except asyncio.CancelledError:
pass
mock_handle.assert_called_once()
mock_dispatch.assert_not_called()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "Restarting" in out.content
@pytest.mark.asyncio
async def test_status_intercepted_in_run_loop(self):
@ -73,10 +78,7 @@ class TestRestartCommand:
loop, bus = _make_loop()
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
with patch.object(loop, "_status_response") as mock_status:
mock_status.return_value = OutboundMessage(
channel="telegram", chat_id="c1", content="status ok"
)
with patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch:
await bus.publish_inbound(msg)
loop._running = True
@ -89,9 +91,9 @@ class TestRestartCommand:
except asyncio.CancelledError:
pass
mock_status.assert_called_once()
mock_dispatch.assert_not_called()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert out.content == "status ok"
assert "nanobot" in out.content.lower() or "Model" in out.content
@pytest.mark.asyncio
async def test_run_propagates_external_cancellation(self):

View File

@ -64,6 +64,58 @@ def test_legitimate_tool_pairs_preserved_after_trim():
assert history[0]["role"] == "user"
def test_retain_recent_legal_suffix_keeps_recent_messages():
session = Session(key="test:trim")
for i in range(10):
session.messages.append({"role": "user", "content": f"msg{i}"})
session.retain_recent_legal_suffix(4)
assert len(session.messages) == 4
assert session.messages[0]["content"] == "msg6"
assert session.messages[-1]["content"] == "msg9"
def test_retain_recent_legal_suffix_adjusts_last_consolidated():
session = Session(key="test:trim-cons")
for i in range(10):
session.messages.append({"role": "user", "content": f"msg{i}"})
session.last_consolidated = 7
session.retain_recent_legal_suffix(4)
assert len(session.messages) == 4
assert session.last_consolidated == 1
def test_retain_recent_legal_suffix_zero_clears_session():
session = Session(key="test:trim-zero")
for i in range(10):
session.messages.append({"role": "user", "content": f"msg{i}"})
session.last_consolidated = 5
session.retain_recent_legal_suffix(0)
assert session.messages == []
assert session.last_consolidated == 0
def test_retain_recent_legal_suffix_keeps_legal_tool_boundary():
session = Session(key="test:trim-tools")
session.messages.append({"role": "user", "content": "old"})
session.messages.extend(_tool_turn("old", 0))
session.messages.append({"role": "user", "content": "keep"})
session.messages.extend(_tool_turn("keep", 0))
session.messages.append({"role": "assistant", "content": "done"})
session.retain_recent_legal_suffix(4)
history = session.get_history(max_messages=500)
_assert_no_orphans(history)
assert history[0]["role"] == "user"
assert history[0]["content"] == "keep"
# --- last_consolidated > 0 ---
def test_orphan_trim_with_last_consolidated():

View File

@ -31,16 +31,20 @@ class TestHandleStop:
@pytest.mark.asyncio
async def test_stop_no_active_task(self):
from nanobot.bus.events import InboundMessage
from nanobot.command.builtin import cmd_stop
from nanobot.command.router import CommandContext
loop, bus = _make_loop()
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
await loop._handle_stop(msg)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/stop", loop=loop)
out = await cmd_stop(ctx)
assert "No active task" in out.content
@pytest.mark.asyncio
async def test_stop_cancels_active_task(self):
from nanobot.bus.events import InboundMessage
from nanobot.command.builtin import cmd_stop
from nanobot.command.router import CommandContext
loop, bus = _make_loop()
cancelled = asyncio.Event()
@ -57,15 +61,17 @@ class TestHandleStop:
loop._active_tasks["test:c1"] = [task]
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
await loop._handle_stop(msg)
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/stop", loop=loop)
out = await cmd_stop(ctx)
assert cancelled.is_set()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "stopped" in out.content.lower()
@pytest.mark.asyncio
async def test_stop_cancels_multiple_tasks(self):
from nanobot.bus.events import InboundMessage
from nanobot.command.builtin import cmd_stop
from nanobot.command.router import CommandContext
loop, bus = _make_loop()
events = [asyncio.Event(), asyncio.Event()]
@ -82,10 +88,10 @@ class TestHandleStop:
loop._active_tasks["test:c1"] = tasks
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
await loop._handle_stop(msg)
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/stop", loop=loop)
out = await cmd_stop(ctx)
assert all(e.is_set() for e in events)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "2 task" in out.content

View File

@ -0,0 +1,127 @@
import asyncio
from unittest.mock import AsyncMock
import pytest
from nanobot.bus.queue import MessageBus
from nanobot.channels.weixin import (
ITEM_IMAGE,
ITEM_TEXT,
MESSAGE_TYPE_BOT,
WeixinChannel,
WeixinConfig,
)
def _make_channel() -> tuple[WeixinChannel, MessageBus]:
bus = MessageBus()
channel = WeixinChannel(
WeixinConfig(enabled=True, allow_from=["*"]),
bus,
)
return channel, bus
@pytest.mark.asyncio
async def test_process_message_deduplicates_inbound_ids() -> None:
channel, bus = _make_channel()
msg = {
"message_type": 1,
"message_id": "m1",
"from_user_id": "wx-user",
"context_token": "ctx-1",
"item_list": [
{"type": ITEM_TEXT, "text_item": {"text": "hello"}},
],
}
await channel._process_message(msg)
first = await asyncio.wait_for(bus.consume_inbound(), timeout=1.0)
await channel._process_message(msg)
assert first.sender_id == "wx-user"
assert first.chat_id == "wx-user"
assert first.content == "hello"
assert bus.inbound_size == 0
@pytest.mark.asyncio
async def test_process_message_caches_context_token_and_send_uses_it() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._send_text = AsyncMock()
await channel._process_message(
{
"message_type": 1,
"message_id": "m2",
"from_user_id": "wx-user",
"context_token": "ctx-2",
"item_list": [
{"type": ITEM_TEXT, "text_item": {"text": "ping"}},
],
}
)
await channel.send(
type("Msg", (), {"chat_id": "wx-user", "content": "pong", "media": [], "metadata": {}})()
)
channel._send_text.assert_awaited_once_with("wx-user", "pong", "ctx-2")
@pytest.mark.asyncio
async def test_process_message_extracts_media_and_preserves_paths() -> None:
channel, bus = _make_channel()
channel._download_media_item = AsyncMock(return_value="/tmp/test.jpg")
await channel._process_message(
{
"message_type": 1,
"message_id": "m3",
"from_user_id": "wx-user",
"context_token": "ctx-3",
"item_list": [
{"type": ITEM_IMAGE, "image_item": {"media": {"encrypt_query_param": "x"}}},
],
}
)
inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=1.0)
assert "[image]" in inbound.content
assert "/tmp/test.jpg" in inbound.content
assert inbound.media == ["/tmp/test.jpg"]
@pytest.mark.asyncio
async def test_send_without_context_token_does_not_send_text() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._send_text = AsyncMock()
await channel.send(
type("Msg", (), {"chat_id": "unknown-user", "content": "pong", "media": [], "metadata": {}})()
)
channel._send_text.assert_not_awaited()
@pytest.mark.asyncio
async def test_process_message_skips_bot_messages() -> None:
channel, bus = _make_channel()
await channel._process_message(
{
"message_type": MESSAGE_TYPE_BOT,
"message_id": "m4",
"from_user_id": "wx-user",
"item_list": [
{"type": ITEM_TEXT, "text_item": {"text": "hello"}},
],
}
)
assert bus.inbound_size == 0

View File

@ -0,0 +1,157 @@
"""Tests for WhatsApp channel outbound media support."""
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
from nanobot.bus.events import OutboundMessage
from nanobot.channels.whatsapp import WhatsAppChannel
def _make_channel() -> WhatsAppChannel:
bus = MagicMock()
ch = WhatsAppChannel({"enabled": True}, bus)
ch._ws = AsyncMock()
ch._connected = True
return ch
@pytest.mark.asyncio
async def test_send_text_only():
ch = _make_channel()
msg = OutboundMessage(channel="whatsapp", chat_id="123@s.whatsapp.net", content="hello")
await ch.send(msg)
ch._ws.send.assert_called_once()
payload = json.loads(ch._ws.send.call_args[0][0])
assert payload["type"] == "send"
assert payload["text"] == "hello"
@pytest.mark.asyncio
async def test_send_media_dispatches_send_media_command():
ch = _make_channel()
msg = OutboundMessage(
channel="whatsapp",
chat_id="123@s.whatsapp.net",
content="check this out",
media=["/tmp/photo.jpg"],
)
await ch.send(msg)
assert ch._ws.send.call_count == 2
text_payload = json.loads(ch._ws.send.call_args_list[0][0][0])
media_payload = json.loads(ch._ws.send.call_args_list[1][0][0])
assert text_payload["type"] == "send"
assert text_payload["text"] == "check this out"
assert media_payload["type"] == "send_media"
assert media_payload["filePath"] == "/tmp/photo.jpg"
assert media_payload["mimetype"] == "image/jpeg"
assert media_payload["fileName"] == "photo.jpg"
@pytest.mark.asyncio
async def test_send_media_only_no_text():
ch = _make_channel()
msg = OutboundMessage(
channel="whatsapp",
chat_id="123@s.whatsapp.net",
content="",
media=["/tmp/doc.pdf"],
)
await ch.send(msg)
ch._ws.send.assert_called_once()
payload = json.loads(ch._ws.send.call_args[0][0])
assert payload["type"] == "send_media"
assert payload["mimetype"] == "application/pdf"
@pytest.mark.asyncio
async def test_send_multiple_media():
ch = _make_channel()
msg = OutboundMessage(
channel="whatsapp",
chat_id="123@s.whatsapp.net",
content="",
media=["/tmp/a.png", "/tmp/b.mp4"],
)
await ch.send(msg)
assert ch._ws.send.call_count == 2
p1 = json.loads(ch._ws.send.call_args_list[0][0][0])
p2 = json.loads(ch._ws.send.call_args_list[1][0][0])
assert p1["mimetype"] == "image/png"
assert p2["mimetype"] == "video/mp4"
@pytest.mark.asyncio
async def test_send_when_disconnected_is_noop():
ch = _make_channel()
ch._connected = False
msg = OutboundMessage(
channel="whatsapp",
chat_id="123@s.whatsapp.net",
content="hello",
media=["/tmp/x.jpg"],
)
await ch.send(msg)
ch._ws.send.assert_not_called()
@pytest.mark.asyncio
async def test_group_policy_mention_skips_unmentioned_group_message():
ch = WhatsAppChannel({"enabled": True, "groupPolicy": "mention"}, MagicMock())
ch._handle_message = AsyncMock()
await ch._handle_bridge_message(
json.dumps(
{
"type": "message",
"id": "m1",
"sender": "12345@g.us",
"pn": "user@s.whatsapp.net",
"content": "hello group",
"timestamp": 1,
"isGroup": True,
"wasMentioned": False,
}
)
)
ch._handle_message.assert_not_called()
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_mentioned_group_message():
ch = WhatsAppChannel({"enabled": True, "groupPolicy": "mention"}, MagicMock())
ch._handle_message = AsyncMock()
await ch._handle_bridge_message(
json.dumps(
{
"type": "message",
"id": "m1",
"sender": "12345@g.us",
"pn": "user@s.whatsapp.net",
"content": "hello @bot",
"timestamp": 1,
"isGroup": True,
"wasMentioned": True,
}
)
)
ch._handle_message.assert_awaited_once()
kwargs = ch._handle_message.await_args.kwargs
assert kwargs["chat_id"] == "12345@g.us"
assert kwargs["sender_id"] == "user"