mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-21 10:29:54 +00:00
Make channel delivery failures raise consistently so retry policy lives in ChannelManager rather than being split across individual channels. Tighten Telegram stream finalization, clarify sendMaxRetries semantics, and align the docs with the behavior the system actually guarantees.
302 lines
10 KiB
Python
302 lines
10 KiB
Python
"""WhatsApp channel implementation using Node.js bridge."""
|
|
|
|
import asyncio
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
from collections import OrderedDict
|
|
from pathlib import Path
|
|
from typing import Any, Literal
|
|
|
|
from loguru import logger
|
|
from pydantic import Field
|
|
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.channels.base import BaseChannel
|
|
from nanobot.config.schema import Base
|
|
|
|
|
|
class WhatsAppConfig(Base):
    """Settings for the WhatsApp channel and its WebSocket bridge."""

    # Whether the channel is enabled; off by default.
    enabled: bool = False
    # WebSocket URL of the bridge the channel connects to.
    bridge_url: str = "ws://localhost:3001"
    # Token sent to the bridge in an "auth" frame; empty means no auth frame.
    bridge_token: str = ""
    # Sender allow-list; defaults to an empty list.
    allow_from: list[str] = Field(default_factory=list)
    # "open" responds to all, "mention" only when @mentioned
    group_policy: Literal["open", "mention"] = "open"
|
|
|
|
|
|
class WhatsAppChannel(BaseChannel):
    """
    WhatsApp channel that connects to a Node.js bridge.

    The bridge uses @whiskeysockets/baileys to handle the WhatsApp Web protocol.
    Communication between Python and Node.js is via WebSocket.
    """

    name = "whatsapp"
    display_name = "WhatsApp"

    @classmethod
    def default_config(cls) -> dict[str, Any]:
        """Return the default ``WhatsAppConfig`` dumped to a dict keyed by alias."""
        return WhatsAppConfig().model_dump(by_alias=True)

    def __init__(self, config: Any, bus: MessageBus):
        """
        Create the channel.

        Args:
            config: A ``WhatsAppConfig`` instance, or a plain dict that is
                validated into one.
            bus: Message bus passed through to the base channel.
        """
        if isinstance(config, dict):
            config = WhatsAppConfig.model_validate(config)
        super().__init__(config, bus)
        self._ws = None
        self._connected = False
        # Insertion-ordered record of recently seen message ids (de-duplication).
        self._processed_message_ids: OrderedDict[str, None] = OrderedDict()

    async def login(self, force: bool = False) -> bool:
        """
        Set up and run the WhatsApp bridge for QR code login.

        This spawns the Node.js bridge process which handles the WhatsApp
        authentication flow. The process blocks until the user scans the QR code
        or interrupts with Ctrl+C.

        Args:
            force: Currently unused by this implementation.

        Returns:
            True if the bridge process exited successfully; False if the
            bridge could not be set up, npm is not available, or the bridge
            process exited with a non-zero status.
        """
        from nanobot.config.paths import get_runtime_subdir

        try:
            bridge_dir = _ensure_bridge_setup()
        except (RuntimeError, subprocess.CalledProcessError) as e:
            # Setup failures (missing npm/source, failed install or build) are
            # reported as a failed login instead of crashing the caller.
            logger.error("{}", e)
            return False

        # _ensure_bridge_setup() returns early, without looking for npm, when
        # the bridge is already built, so npm must be resolved (and checked) here.
        npm_path = shutil.which("npm")
        if not npm_path:
            logger.error("npm not found. Please install Node.js >= 18.")
            return False

        env = {**os.environ}
        if self.config.bridge_token:
            env["BRIDGE_TOKEN"] = self.config.bridge_token
        env["AUTH_DIR"] = str(get_runtime_subdir("whatsapp-auth"))

        logger.info("Starting WhatsApp bridge for QR login...")
        try:
            subprocess.run([npm_path, "start"], cwd=bridge_dir, check=True, env=env)
        except subprocess.CalledProcessError:
            return False

        return True

    async def start(self) -> None:
        """
        Start the WhatsApp channel by connecting to the bridge.

        Runs until the channel is stopped or the task is cancelled. Each
        iteration opens a WebSocket to ``bridge_url``, optionally sends the
        auth token, and dispatches every incoming frame to
        ``_handle_bridge_message``. When the connection ends for any reason the
        connection state is cleared and, if still running, a new attempt is
        made after 5 seconds.
        """
        import websockets

        bridge_url = self.config.bridge_url

        logger.info("Connecting to WhatsApp bridge at {}...", bridge_url)

        self._running = True

        while self._running:
            try:
                async with websockets.connect(bridge_url) as ws:
                    self._ws = ws
                    # Send auth token if configured
                    if self.config.bridge_token:
                        await ws.send(
                            json.dumps({"type": "auth", "token": self.config.bridge_token})
                        )
                    self._connected = True
                    logger.info("Connected to WhatsApp bridge")

                    # Listen for messages
                    async for message in ws:
                        try:
                            await self._handle_bridge_message(message)
                        except Exception as e:
                            # One bad frame must not tear down the connection.
                            logger.error("Error handling bridge message: {}", e)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning("WhatsApp bridge connection error: {}", e)
            finally:
                # Clear state on every exit path, including a clean close by
                # the bridge, so send() never sees a stale socket.
                self._connected = False
                self._ws = None

            # Back off before reconnecting, whether the connection failed or
            # simply ended, to avoid a tight reconnect loop.
            if self._running:
                logger.info("Reconnecting in 5 seconds...")
                await asyncio.sleep(5)

    async def stop(self) -> None:
        """Stop the WhatsApp channel and close the bridge connection, if any."""
        self._running = False
        self._connected = False

        if self._ws:
            await self._ws.close()
            self._ws = None

    async def send(self, msg: OutboundMessage) -> None:
        """
        Send a message through WhatsApp.

        Sends ``msg.content`` (if non-empty) as a "send" frame, then one
        "send_media" frame per entry in ``msg.media``.

        Raises:
            Exception: Any error raised while writing to the bridge socket is
                logged and re-raised to the caller.
        """
        # NOTE(review): a disconnected bridge drops the message with a warning
        # rather than raising; confirm this is the intended delivery contract.
        if not self._ws or not self._connected:
            logger.warning("WhatsApp bridge not connected")
            return

        chat_id = msg.chat_id

        if msg.content:
            try:
                payload = {"type": "send", "to": chat_id, "text": msg.content}
                await self._ws.send(json.dumps(payload, ensure_ascii=False))
            except Exception as e:
                logger.error("Error sending WhatsApp message: {}", e)
                raise

        for media_path in msg.media or []:
            try:
                mime, _ = mimetypes.guess_type(media_path)
                payload = {
                    "type": "send_media",
                    "to": chat_id,
                    "filePath": media_path,
                    "mimetype": mime or "application/octet-stream",
                    # basename() honours the platform's path separators.
                    "fileName": os.path.basename(media_path),
                }
                await self._ws.send(json.dumps(payload, ensure_ascii=False))
            except Exception as e:
                logger.error("Error sending WhatsApp media {}: {}", media_path, e)
                raise

    async def _handle_bridge_message(self, raw: str) -> None:
        """
        Handle a message from the bridge.

        ``raw`` is expected to be a JSON object with a ``type`` field of
        "message", "status", "qr" or "error"; other types are ignored.
        Invalid JSON and non-object payloads are logged and dropped.
        """
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            logger.warning("Invalid JSON from bridge: {}", raw[:100])
            return

        if not isinstance(data, dict):
            # Valid JSON but not an object: there is no "type" to dispatch on.
            logger.warning("Unexpected non-object JSON from bridge: {}", raw[:100])
            return

        msg_type = data.get("type")

        if msg_type == "message":
            # Incoming message from WhatsApp
            # Deprecated by WhatsApp: old phone number style, typically <phone>@s.whatsapp.net
            pn = data.get("pn", "")
            # New LID style identifier
            sender = data.get("sender", "")
            content = data.get("content", "")
            message_id = data.get("id", "")

            if message_id:
                # Drop duplicates; remember at most the 1000 most recent ids.
                if message_id in self._processed_message_ids:
                    return
                self._processed_message_ids[message_id] = None
                while len(self._processed_message_ids) > 1000:
                    self._processed_message_ids.popitem(last=False)

            is_group = data.get("isGroup", False)
            was_mentioned = data.get("wasMentioned", False)

            # Under the "mention" policy, group messages are ignored unless
            # the bridge reports a mention.
            if is_group and getattr(self.config, "group_policy", "open") == "mention":
                if not was_mentioned:
                    return

            # Extract just the phone number or LID (the part before "@") as sender_id
            user_id = pn if pn else sender
            sender_id = user_id.split("@")[0] if "@" in user_id else user_id
            logger.info("Sender {}", sender)

            # Voice messages cannot be transcribed yet; substitute a placeholder.
            if content == "[Voice Message]":
                logger.info(
                    "Voice message received from {}, but direct download from bridge is not yet supported.",
                    sender_id,
                )
                content = "[Voice Message: Transcription not available for WhatsApp yet]"

            # Extract media paths (images/documents/videos downloaded by the bridge)
            media_paths = data.get("media") or []

            # Build content tags matching Telegram's pattern: [image: /path] or [file: /path]
            for p in media_paths:
                mime, _ = mimetypes.guess_type(p)
                media_type = "image" if mime and mime.startswith("image/") else "file"
                media_tag = f"[{media_type}: {p}]"
                content = f"{content}\n{media_tag}" if content else media_tag

            await self._handle_message(
                sender_id=sender_id,
                chat_id=sender,  # Use full LID for replies
                content=content,
                media=media_paths,
                metadata={
                    "message_id": message_id,
                    "timestamp": data.get("timestamp"),
                    "is_group": is_group,
                },
            )

        elif msg_type == "status":
            # Connection status update
            status = data.get("status")
            logger.info("WhatsApp status: {}", status)

            if status == "connected":
                self._connected = True
            elif status == "disconnected":
                self._connected = False

        elif msg_type == "qr":
            # QR code for authentication
            logger.info("Scan QR code in the bridge terminal to connect WhatsApp")

        elif msg_type == "error":
            logger.error("WhatsApp bridge error: {}", data.get("error"))
|
|
|
|
|
|
def _ensure_bridge_setup() -> Path:
    """
    Ensure the WhatsApp bridge is set up and built.

    If the install directory already contains ``dist/index.js`` it is returned
    as-is. Otherwise the bridge source is copied into the install directory
    (replacing any existing copy) and ``npm install`` / ``npm run build`` are
    run there.

    Returns:
        The bridge install directory.

    Raises:
        RuntimeError: If npm is not found, the bridge source cannot be
            located, or an npm step fails (the original
            ``CalledProcessError`` is chained as the cause).
    """
    from nanobot.config.paths import get_bridge_install_dir

    user_bridge = get_bridge_install_dir()

    # A previous build is present: nothing to do.
    if (user_bridge / "dist" / "index.js").exists():
        return user_bridge

    npm_path = shutil.which("npm")
    if not npm_path:
        raise RuntimeError("npm not found. Please install Node.js >= 18.")

    # Find source bridge: try two locations relative to this file, in order.
    current_file = Path(__file__)
    candidates = (
        current_file.parent.parent / "bridge",
        current_file.parent.parent.parent / "bridge",
    )
    source = next((c for c in candidates if (c / "package.json").exists()), None)
    if source is None:
        raise RuntimeError(
            "WhatsApp bridge source not found. "
            "Try reinstalling: pip install --force-reinstall nanobot"
        )

    logger.info("Setting up WhatsApp bridge...")
    user_bridge.parent.mkdir(parents=True, exist_ok=True)
    if user_bridge.exists():
        shutil.rmtree(user_bridge)
    # Copy sources only; dependencies and build output are regenerated below.
    shutil.copytree(source, user_bridge, ignore=shutil.ignore_patterns("node_modules", "dist"))

    steps = (
        (" Installing dependencies...", [npm_path, "install"]),
        (" Building...", [npm_path, "run", "build"]),
    )
    for label, cmd in steps:
        logger.info(label)
        try:
            subprocess.run(cmd, cwd=user_bridge, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            # Output is captured, so surface the tail of stderr in the error;
            # otherwise the reason for the failure would be lost.
            stderr = (e.stderr or b"").decode(errors="replace").strip()
            raise RuntimeError(
                f"WhatsApp bridge setup failed: `npm {' '.join(cmd[1:])}` "
                f"exited with status {e.returncode}. {stderr[-500:]}"
            ) from e

    logger.info("Bridge ready")
    return user_bridge
|