mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-23 11:25:55 +00:00
Make channel delivery failures raise consistently so retry policy lives in ChannelManager rather than being split across individual channels. Tighten Telegram stream finalization, clarify sendMaxRetries semantics, and align the docs with the behavior the system actually guarantees.
372 lines
13 KiB
Python
372 lines
13 KiB
Python
"""WeCom (Enterprise WeChat) channel implementation using wecom_aibot_sdk."""
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import os
|
|
from collections import OrderedDict
|
|
from typing import Any
|
|
|
|
from loguru import logger
|
|
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.channels.base import BaseChannel
|
|
from nanobot.config.paths import get_media_dir
|
|
from nanobot.config.schema import Base
|
|
from pydantic import Field
|
|
|
|
WECOM_AVAILABLE = importlib.util.find_spec("wecom_aibot_sdk") is not None
|
|
|
|
class WecomConfig(Base):
|
|
"""WeCom (Enterprise WeChat) AI Bot channel configuration."""
|
|
|
|
enabled: bool = False
|
|
bot_id: str = ""
|
|
secret: str = ""
|
|
allow_from: list[str] = Field(default_factory=list)
|
|
welcome_message: str = ""
|
|
|
|
|
|
# Message type display mapping
|
|
MSG_TYPE_MAP = {
|
|
"image": "[image]",
|
|
"voice": "[voice]",
|
|
"file": "[file]",
|
|
"mixed": "[mixed content]",
|
|
}
|
|
|
|
|
|
class WecomChannel(BaseChannel):
|
|
"""
|
|
WeCom (Enterprise WeChat) channel using WebSocket long connection.
|
|
|
|
Uses WebSocket to receive events - no public IP or webhook required.
|
|
|
|
Requires:
|
|
- Bot ID and Secret from WeCom AI Bot platform
|
|
"""
|
|
|
|
name = "wecom"
|
|
display_name = "WeCom"
|
|
|
|
@classmethod
|
|
def default_config(cls) -> dict[str, Any]:
|
|
return WecomConfig().model_dump(by_alias=True)
|
|
|
|
def __init__(self, config: Any, bus: MessageBus):
|
|
if isinstance(config, dict):
|
|
config = WecomConfig.model_validate(config)
|
|
super().__init__(config, bus)
|
|
self.config: WecomConfig = config
|
|
self._client: Any = None
|
|
self._processed_message_ids: OrderedDict[str, None] = OrderedDict()
|
|
self._loop: asyncio.AbstractEventLoop | None = None
|
|
self._generate_req_id = None
|
|
# Store frame headers for each chat to enable replies
|
|
self._chat_frames: dict[str, Any] = {}
|
|
|
|
async def start(self) -> None:
|
|
"""Start the WeCom bot with WebSocket long connection."""
|
|
if not WECOM_AVAILABLE:
|
|
logger.error("WeCom SDK not installed. Run: pip install nanobot-ai[wecom]")
|
|
return
|
|
|
|
if not self.config.bot_id or not self.config.secret:
|
|
logger.error("WeCom bot_id and secret not configured")
|
|
return
|
|
|
|
from wecom_aibot_sdk import WSClient, generate_req_id
|
|
|
|
self._running = True
|
|
self._loop = asyncio.get_running_loop()
|
|
self._generate_req_id = generate_req_id
|
|
|
|
# Create WebSocket client
|
|
self._client = WSClient({
|
|
"bot_id": self.config.bot_id,
|
|
"secret": self.config.secret,
|
|
"reconnect_interval": 1000,
|
|
"max_reconnect_attempts": -1, # Infinite reconnect
|
|
"heartbeat_interval": 30000,
|
|
})
|
|
|
|
# Register event handlers
|
|
self._client.on("connected", self._on_connected)
|
|
self._client.on("authenticated", self._on_authenticated)
|
|
self._client.on("disconnected", self._on_disconnected)
|
|
self._client.on("error", self._on_error)
|
|
self._client.on("message.text", self._on_text_message)
|
|
self._client.on("message.image", self._on_image_message)
|
|
self._client.on("message.voice", self._on_voice_message)
|
|
self._client.on("message.file", self._on_file_message)
|
|
self._client.on("message.mixed", self._on_mixed_message)
|
|
self._client.on("event.enter_chat", self._on_enter_chat)
|
|
|
|
logger.info("WeCom bot starting with WebSocket long connection")
|
|
logger.info("No public IP required - using WebSocket to receive events")
|
|
|
|
# Connect
|
|
await self._client.connect_async()
|
|
|
|
# Keep running until stopped
|
|
while self._running:
|
|
await asyncio.sleep(1)
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the WeCom bot."""
|
|
self._running = False
|
|
if self._client:
|
|
await self._client.disconnect()
|
|
logger.info("WeCom bot stopped")
|
|
|
|
async def _on_connected(self, frame: Any) -> None:
|
|
"""Handle WebSocket connected event."""
|
|
logger.info("WeCom WebSocket connected")
|
|
|
|
async def _on_authenticated(self, frame: Any) -> None:
|
|
"""Handle authentication success event."""
|
|
logger.info("WeCom authenticated successfully")
|
|
|
|
async def _on_disconnected(self, frame: Any) -> None:
|
|
"""Handle WebSocket disconnected event."""
|
|
reason = frame.body if hasattr(frame, 'body') else str(frame)
|
|
logger.warning("WeCom WebSocket disconnected: {}", reason)
|
|
|
|
async def _on_error(self, frame: Any) -> None:
|
|
"""Handle error event."""
|
|
logger.error("WeCom error: {}", frame)
|
|
|
|
async def _on_text_message(self, frame: Any) -> None:
|
|
"""Handle text message."""
|
|
await self._process_message(frame, "text")
|
|
|
|
async def _on_image_message(self, frame: Any) -> None:
|
|
"""Handle image message."""
|
|
await self._process_message(frame, "image")
|
|
|
|
async def _on_voice_message(self, frame: Any) -> None:
|
|
"""Handle voice message."""
|
|
await self._process_message(frame, "voice")
|
|
|
|
async def _on_file_message(self, frame: Any) -> None:
|
|
"""Handle file message."""
|
|
await self._process_message(frame, "file")
|
|
|
|
async def _on_mixed_message(self, frame: Any) -> None:
|
|
"""Handle mixed content message."""
|
|
await self._process_message(frame, "mixed")
|
|
|
|
async def _on_enter_chat(self, frame: Any) -> None:
|
|
"""Handle enter_chat event (user opens chat with bot)."""
|
|
try:
|
|
# Extract body from WsFrame dataclass or dict
|
|
if hasattr(frame, 'body'):
|
|
body = frame.body or {}
|
|
elif isinstance(frame, dict):
|
|
body = frame.get("body", frame)
|
|
else:
|
|
body = {}
|
|
|
|
chat_id = body.get("chatid", "") if isinstance(body, dict) else ""
|
|
|
|
if chat_id and self.config.welcome_message:
|
|
await self._client.reply_welcome(frame, {
|
|
"msgtype": "text",
|
|
"text": {"content": self.config.welcome_message},
|
|
})
|
|
except Exception as e:
|
|
logger.error("Error handling enter_chat: {}", e)
|
|
|
|
async def _process_message(self, frame: Any, msg_type: str) -> None:
|
|
"""Process incoming message and forward to bus."""
|
|
try:
|
|
# Extract body from WsFrame dataclass or dict
|
|
if hasattr(frame, 'body'):
|
|
body = frame.body or {}
|
|
elif isinstance(frame, dict):
|
|
body = frame.get("body", frame)
|
|
else:
|
|
body = {}
|
|
|
|
# Ensure body is a dict
|
|
if not isinstance(body, dict):
|
|
logger.warning("Invalid body type: {}", type(body))
|
|
return
|
|
|
|
# Extract message info
|
|
msg_id = body.get("msgid", "")
|
|
if not msg_id:
|
|
msg_id = f"{body.get('chatid', '')}_{body.get('sendertime', '')}"
|
|
|
|
# Deduplication check
|
|
if msg_id in self._processed_message_ids:
|
|
return
|
|
self._processed_message_ids[msg_id] = None
|
|
|
|
# Trim cache
|
|
while len(self._processed_message_ids) > 1000:
|
|
self._processed_message_ids.popitem(last=False)
|
|
|
|
# Extract sender info from "from" field (SDK format)
|
|
from_info = body.get("from", {})
|
|
sender_id = from_info.get("userid", "unknown") if isinstance(from_info, dict) else "unknown"
|
|
|
|
# For single chat, chatid is the sender's userid
|
|
# For group chat, chatid is provided in body
|
|
chat_type = body.get("chattype", "single")
|
|
chat_id = body.get("chatid", sender_id)
|
|
|
|
content_parts = []
|
|
|
|
if msg_type == "text":
|
|
text = body.get("text", {}).get("content", "")
|
|
if text:
|
|
content_parts.append(text)
|
|
|
|
elif msg_type == "image":
|
|
image_info = body.get("image", {})
|
|
file_url = image_info.get("url", "")
|
|
aes_key = image_info.get("aeskey", "")
|
|
|
|
if file_url and aes_key:
|
|
file_path = await self._download_and_save_media(file_url, aes_key, "image")
|
|
if file_path:
|
|
filename = os.path.basename(file_path)
|
|
content_parts.append(f"[image: {filename}]\n[Image: source: {file_path}]")
|
|
else:
|
|
content_parts.append("[image: download failed]")
|
|
else:
|
|
content_parts.append("[image: download failed]")
|
|
|
|
elif msg_type == "voice":
|
|
voice_info = body.get("voice", {})
|
|
# Voice message already contains transcribed content from WeCom
|
|
voice_content = voice_info.get("content", "")
|
|
if voice_content:
|
|
content_parts.append(f"[voice] {voice_content}")
|
|
else:
|
|
content_parts.append("[voice]")
|
|
|
|
elif msg_type == "file":
|
|
file_info = body.get("file", {})
|
|
file_url = file_info.get("url", "")
|
|
aes_key = file_info.get("aeskey", "")
|
|
file_name = file_info.get("name", "unknown")
|
|
|
|
if file_url and aes_key:
|
|
file_path = await self._download_and_save_media(file_url, aes_key, "file", file_name)
|
|
if file_path:
|
|
content_parts.append(f"[file: {file_name}]\n[File: source: {file_path}]")
|
|
else:
|
|
content_parts.append(f"[file: {file_name}: download failed]")
|
|
else:
|
|
content_parts.append(f"[file: {file_name}: download failed]")
|
|
|
|
elif msg_type == "mixed":
|
|
# Mixed content contains multiple message items
|
|
msg_items = body.get("mixed", {}).get("item", [])
|
|
for item in msg_items:
|
|
item_type = item.get("type", "")
|
|
if item_type == "text":
|
|
text = item.get("text", {}).get("content", "")
|
|
if text:
|
|
content_parts.append(text)
|
|
else:
|
|
content_parts.append(MSG_TYPE_MAP.get(item_type, f"[{item_type}]"))
|
|
|
|
else:
|
|
content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]"))
|
|
|
|
content = "\n".join(content_parts) if content_parts else ""
|
|
|
|
if not content:
|
|
return
|
|
|
|
# Store frame for this chat to enable replies
|
|
self._chat_frames[chat_id] = frame
|
|
|
|
# Forward to message bus
|
|
# Note: media paths are included in content for broader model compatibility
|
|
await self._handle_message(
|
|
sender_id=sender_id,
|
|
chat_id=chat_id,
|
|
content=content,
|
|
media=None,
|
|
metadata={
|
|
"message_id": msg_id,
|
|
"msg_type": msg_type,
|
|
"chat_type": chat_type,
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error("Error processing WeCom message: {}", e)
|
|
|
|
async def _download_and_save_media(
|
|
self,
|
|
file_url: str,
|
|
aes_key: str,
|
|
media_type: str,
|
|
filename: str | None = None,
|
|
) -> str | None:
|
|
"""
|
|
Download and decrypt media from WeCom.
|
|
|
|
Returns:
|
|
file_path or None if download failed
|
|
"""
|
|
try:
|
|
data, fname = await self._client.download_file(file_url, aes_key)
|
|
|
|
if not data:
|
|
logger.warning("Failed to download media from WeCom")
|
|
return None
|
|
|
|
media_dir = get_media_dir("wecom")
|
|
if not filename:
|
|
filename = fname or f"{media_type}_{hash(file_url) % 100000}"
|
|
filename = os.path.basename(filename)
|
|
|
|
file_path = media_dir / filename
|
|
file_path.write_bytes(data)
|
|
logger.debug("Downloaded {} to {}", media_type, file_path)
|
|
return str(file_path)
|
|
|
|
except Exception as e:
|
|
logger.error("Error downloading media: {}", e)
|
|
return None
|
|
|
|
async def send(self, msg: OutboundMessage) -> None:
|
|
"""Send a message through WeCom."""
|
|
if not self._client:
|
|
logger.warning("WeCom client not initialized")
|
|
return
|
|
|
|
try:
|
|
content = msg.content.strip()
|
|
if not content:
|
|
return
|
|
|
|
# Get the stored frame for this chat
|
|
frame = self._chat_frames.get(msg.chat_id)
|
|
if not frame:
|
|
logger.warning("No frame found for chat {}, cannot reply", msg.chat_id)
|
|
return
|
|
|
|
# Use streaming reply for better UX
|
|
stream_id = self._generate_req_id("stream")
|
|
|
|
# Send as streaming message with finish=True
|
|
await self._client.reply_stream(
|
|
frame,
|
|
stream_id,
|
|
content,
|
|
finish=True,
|
|
)
|
|
|
|
logger.debug("WeCom message sent to {}", msg.chat_id)
|
|
|
|
except Exception as e:
|
|
logger.error("Error sending WeCom message: {}", e)
|
|
raise
|