diff --git a/docs/chat-apps.md b/docs/chat-apps.md index fe14a78cb..2e3bbd750 100644 --- a/docs/chat-apps.md +++ b/docs/chat-apps.md @@ -14,6 +14,7 @@ Connect nanobot to your favorite chat platform. Want to build your own? See the | **Matrix** | Homeserver URL + Access token | | **Email** | IMAP/SMTP credentials | | **QQ** | App ID + App Secret | +| **Napcat (QQ)** | Napcat Forward WebSocket URL + access token | | **Wecom** | Bot ID + Bot Secret | | **Microsoft Teams** | App ID + App Password + public HTTPS endpoint | | **Mochat** | Claw token (auto-setup available) | @@ -424,6 +425,50 @@ Now send a message to the bot from QQ — it should respond! +
+Napcat (QQ via OneBot v11 支持群聊等功能) + +Connects to a [Napcat](https://github.com/NapNeko/NapCatQQ) instance over its **forward WebSocket** (OneBot v11). Use this when you have your own QQ account running through Napcat and want full private + group chat support. + +**1. Set up Napcat** + +- Install and log into Napcat, then enable a **Forward WebSocket** server. Recommends: [official napcat docker tutorial](https://github.com/NapNeko/NapCat-Docker) +- In the webui, follow "网络配置" -> "新建" -> "Websocket 服务器" to create a forward websocket server. By default, the URL is `ws://127.0.0.1:3001` +- Copy the forward websocket server's token +- (Optional) In the webui, follow "系统配置" -> "登陆配置" -> "快速登录QQ" to automatically login after restarts + +**2. Configure** + +```json +{ + "channels": { + "napcat": { + "enabled": true, + "wsUrl": "ws://127.0.0.1:3001", + "accessToken": "YOUR_WEBSOCKET_TOKEN", + "allowFrom": ["*"], + "groupPolicy": "mention", + "groupPolicyOverrides": { + "123456789": "open", + "987654321": 0.2 + }, + "welcomeNewMembers": true + } + } +} +``` + +| Option | What it does | +|--------|--------------| +| `wsUrl` | Napcat forward-WebSocket endpoint. Bearer auth via `accessToken` is sent in the `Authorization` header. | +| `allowFrom` | QQ numbers permitted to talk to the bot. `["*"]` = anyone. Required `["*"]` (or include the joining user) for `welcomeNewMembers` to fire. | +| `groupPolicy` | `"mention"` (default) — reply only when @-mentioned or replying to the bot's own message. `"open"` — reply to every group message. A float `p` in `[0.0, 1.0]` — @mentions and replies-to-bot always reply; every other group message replies with probability `p` (so `0.0` ≡ `"mention"`, `1.0` ≡ `"open"`). Private chats always reply. | +| `groupPolicyOverrides` | Optional per-group overrides for `groupPolicy`, keyed by group id (as a string). Each value takes the same shape as `groupPolicy` (`"mention"`, `"open"`, or a float). Groups not listed fall back to `groupPolicy`. | +| `welcomeNewMembers` | When true, `notice.group_increase` events are pushed to the bus as a synthetic message so the agent can greet new joiners. | +| `maxImageBytes` | Hard cap (in bytes) for inbound image downloads. Defaults to 20 MB. Larger images are dropped with a warning. | + +
+
DingTalk (钉钉) diff --git a/nanobot/channels/napcat.py b/nanobot/channels/napcat.py new file mode 100644 index 000000000..0caf4063c --- /dev/null +++ b/nanobot/channels/napcat.py @@ -0,0 +1,540 @@ +"""Napcat (OneBot v11) channel for QQ, over WebSocket.""" + +from __future__ import annotations + +import asyncio +import base64 +import json +import os +import random +import time +import uuid +from collections import deque +from pathlib import Path +from typing import Annotated, Any, Literal +import aiohttp +from loguru import logger +from pydantic import Field + +from websockets.asyncio.client import ClientConnection, connect as ws_connect + +from nanobot.bus.events import OutboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.channels.base import BaseChannel +from nanobot.config.paths import get_media_dir +from nanobot.config.schema import Base +from nanobot.security.network import validate_url_target +from nanobot.utils.helpers import safe_filename + +_DOWNLOAD_TIMEOUT = aiohttp.ClientTimeout(total=60) +_ACTION_TIMEOUT = 20.0 + + +# `"mention"` (only @mentions / replies) | `"open"` (every message) | float p +# in [0, 1]: mentions/replies always reply; other messages reply with probability +# p. 0.0 ≡ "mention", 1.0 ≡ "open". +GroupPolicy = Literal["mention", "open"] | Annotated[float, Field(ge=0.0, le=1.0)] + + +class NapcatConfig(Base): + """Napcat (OneBot v11) channel configuration.""" + + enabled: bool = False + ws_url: str = "ws://127.0.0.1:3001" + access_token: str = "" + allow_from: list[str] = Field(default_factory=list) + group_policy: GroupPolicy = "mention" + # Per-group overrides keyed by stringified group_id, e.g. {"123456": "open"}. + # Falls back to `group_policy` when a group_id isn't listed. + group_policy_overrides: dict[str, GroupPolicy] = Field(default_factory=dict) + welcome_new_members: bool = True + # Hard cap for inbound image downloads. Bigger images are dropped. + max_image_bytes: int = Field(default=20 * 1024 * 1024, ge=1) + + +class NapcatChannel(BaseChannel): + """Napcat / OneBot v11 channel.""" + + name = "napcat" + display_name = "Napcat (QQ)" + + @classmethod + def default_config(cls) -> dict[str, Any]: + return NapcatConfig().model_dump(by_alias=True) + + def __init__(self, config: Any, bus: MessageBus): + if isinstance(config, dict): + config = NapcatConfig.model_validate(config) + super().__init__(config, bus) + self.config: NapcatConfig = config + + self._ws: ClientConnection | None = None + self._http: aiohttp.ClientSession | None = None + self._media_root: Path = get_media_dir("napcat") + self._self_id: int | None = None + self._pending: dict[str, asyncio.Future[dict[str, Any]]] = {} + self._processed_ids: deque[int] = deque(maxlen=2000) + self._bot_outbound_ids: deque[int] = deque(maxlen=2000) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + if not self.config.ws_url: + logger.error("napcat: ws_url not configured") + return + + self._running = True + self._http = aiohttp.ClientSession(timeout=_DOWNLOAD_TIMEOUT) + + backoff = iter((5, 10)) # then 30s forever + while self._running: + try: + await self._run_once() + backoff = iter((5, 10)) # reset after a clean session + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning("napcat: connection lost: {}", e) + if self._running: + await asyncio.sleep(next(backoff, 30)) + + async def _run_once(self) -> None: + headers = [] + if self.config.access_token: + headers.append(("Authorization", f"Bearer {self.config.access_token}")) + + logger.info("napcat: connecting to {}", self.config.ws_url) + async with ws_connect(self.config.ws_url, additional_headers=headers) as ws: + self._ws = ws + logger.info("napcat: connected") + try: + # Validate the connection before entering the dispatch loop. + # Napcat may interleave meta_event frames before our echo + # response, so dispatch any non-matching frames as we go. + echo = uuid.uuid4().hex + await ws.send( + json.dumps( + {"action": "get_login_info", "params": {}, "echo": echo}, + ensure_ascii=False, + ) + ) + deadline = asyncio.get_running_loop().time() + _ACTION_TIMEOUT + while True: + remaining = deadline - asyncio.get_running_loop().time() + if remaining <= 0: + raise asyncio.TimeoutError("get_login_info timed out") + raw = await asyncio.wait_for(ws.recv(), timeout=remaining) + try: + payload = json.loads(raw) + except json.JSONDecodeError: + continue + if isinstance(payload, dict) and payload.get("echo") == echo: + data = payload.get("data") or {} + logger.info( + "napcat: logged in as {} (user_id={})", + data.get("nickname"), + data.get("user_id"), + ) + break + await self._dispatch_frame(raw) + + async for raw in ws: + await self._dispatch_frame(raw) + finally: + self._ws = None + self._fail_pending(RuntimeError("napcat: websocket disconnected")) + + async def stop(self) -> None: + self._running = False + if self._ws is not None: + try: + await self._ws.close() + except Exception: + pass + self._ws = None + if self._http is not None: + try: + await self._http.close() + except Exception: + pass + self._http = None + self._fail_pending(RuntimeError("napcat: stopped")) + + def _fail_pending(self, err: BaseException) -> None: + for fut in self._pending.values(): + if not fut.done(): + fut.set_exception(err) + self._pending.clear() + + # ------------------------------------------------------------------ + # Frame dispatch + # ------------------------------------------------------------------ + + async def _dispatch_frame(self, raw: str | bytes) -> None: + # logger.debug("dispatch frame {}", raw) + try: + payload = json.loads(raw) + except json.JSONDecodeError: + logger.debug("napcat: dropping non-JSON frame") + return + if not isinstance(payload, dict): + return + + # Action response: identified by `echo` and absence of post_type. + if "echo" in payload and payload.get("post_type") is None: + echo = payload.get("echo") + fut = self._pending.pop(echo, None) if isinstance(echo, str) else None + if fut and not fut.done(): + fut.set_result(payload) + return + + if (sid := payload.get("self_id")) is not None: + try: + self._self_id = int(sid) + except (TypeError, ValueError): + pass + + post_type = payload.get("post_type") + if post_type == "message": + await self._on_message(payload) + elif post_type == "notice": + await self._on_notice(payload) + + # ------------------------------------------------------------------ + # Inbound: messages + # ------------------------------------------------------------------ + + async def _on_message(self, ev: dict[str, Any]) -> None: + msg_id = ev.get("message_id") + if isinstance(msg_id, int): + if msg_id in self._processed_ids: + return + self._processed_ids.append(msg_id) + + message_type = ev.get("message_type") + user_id = ev.get("user_id") + if user_id is None or message_type not in ("group", "private"): + return + + segments = self._normalize_segments(ev.get("message")) + text, images, mentioned_self, reply_to_id = self._parse_segments(segments) + + media_paths: list[str] = [] + for info in images: + if local := await self._download_image(info): + media_paths.append(local) + + sender = ev.get("sender") or {} + nickname = sender.get("card") or sender.get("nickname") + + if message_type == "group": + group_id = ev.get("group_id") + if group_id is None: + return + + replying_to_bot = ( + isinstance(reply_to_id, int) and reply_to_id in self._bot_outbound_ids + ) + if not self._should_reply_in_group( + group_id=group_id, + mentioned_self=mentioned_self, + replying_to_bot=replying_to_bot, + ): + return + + chat_id = f"group:{group_id}" + label = nickname or str(user_id) + content = f"{label}: {text}" + content = self._format_group_content( + text=text, + nickname=nickname, + user_id=user_id, + ) + else: + chat_id = f"private:{user_id}" + content = text + + if not content and not media_paths: + return + + await self._handle_message( + sender_id=str(user_id), + chat_id=chat_id, + content=content, + media=media_paths or None, + metadata={ + "message_id": msg_id, + "is_group": message_type == "group", + "nickname": nickname, + "reply_to": reply_to_id, + }, + ) + + @staticmethod + def _normalize_segments(message: Any) -> list[dict[str, Any]]: + # Napcat defaults to array format. Treat raw strings as a single text + # segment rather than parsing CQ codes — that path is fragile and + # users can configure napcat to emit arrays. + if isinstance(message, list): + return [seg for seg in message if isinstance(seg, dict)] + if isinstance(message, str) and message: + return [{"type": "text", "data": {"text": message}}] + return [] + + def _parse_segments( + self, segments: list[dict[str, Any]] + ) -> tuple[str, list[dict[str, Any]], bool, int | None]: + parts: list[str] = [] + images: list[dict[str, Any]] = [] + mentioned_self = False + reply_to: int | None = None + self_id_str = str(self._self_id) if self._self_id is not None else None + + for seg in segments: + stype = seg.get("type") + data = seg.get("data") or {} + if stype == "text": + if txt := data.get("text"): + parts.append(str(txt)) + elif stype == "image": + # OneBot exposes the downloadable image at `url`. Napcat + # additionally provides `file` (e.g. .png) and + # `file_size` (bytes, sometimes a string). + url = data.get("url") + if isinstance(url, str) and url.startswith(("http://", "https://")): + images.append( + { + "url": url, + "file": data.get("file"), + "file_size": data.get("file_size"), + } + ) + else: + logger.warning("napcat: received invalid image url: {}", url) + elif stype == "at": + qq = str(data.get("qq", "")) + if self_id_str and qq == self_id_str: + mentioned_self = True + else: + parts.append(f"@{qq}") + elif stype == "reply": + rid = data.get("id") + try: + reply_to = int(rid) if rid is not None else None + except (TypeError, ValueError): + pass + elif stype == "face": + parts.append(f"[face:{data.get('id', '')}]") + + text = " ".join(p.strip() for p in parts if p.strip()).strip() + return text, images, mentioned_self, reply_to + + def _should_reply_in_group( + self, *, group_id: Any, mentioned_self: bool, replying_to_bot: bool + ) -> bool: + if mentioned_self or replying_to_bot: + return True + policy = self.config.group_policy_overrides.get(str(group_id), self.config.group_policy) + if policy == "open": + return True + if policy == "mention": + return False + # Probability case: float in [0.0, 1.0]. + return random.random() < float(policy) + + @staticmethod + def _format_group_content( + *, + text: str, + nickname: str, + user_id: Any, + ) -> str: + label = nickname or str(user_id) + return f"{label}: {text}" + + # ------------------------------------------------------------------ + # Inbound: notices (member joined etc.) + # ------------------------------------------------------------------ + + async def _on_notice(self, ev: dict[str, Any]) -> None: + if ev.get("notice_type") != "group_increase" or not self.config.welcome_new_members: + return + + group_id = ev.get("group_id") + user_id = ev.get("user_id") + if group_id is None or user_id is None: + return + + nickname = await self._lookup_member_name(int(group_id), int(user_id)) + + # Note: this routes through is_allowed(). For group bots set + # `allow_from: ["*"]` (or include the joining user's id) for welcomes + # to fire — same trust model as a regular inbound message. + await self._handle_message( + sender_id=str(user_id), + chat_id=f"group:{group_id}", + content=f"[group event] new member {nickname} joined group {group_id}", + metadata={ + "is_group": True, + "event": "group_increase", + }, + ) + + async def _lookup_member_name(self, group_id: int, user_id: int) -> str: + """Lookup group member nickname. Fallback to user id.""" + try: + resp = await self._call_action( + "get_group_member_info", + {"group_id": group_id, "user_id": user_id, "no_cache": True}, + ) + data = resp.get("data", {}) + return data.get("card") or data.get("nickname") or str(user_id) + except Exception as e: + logger.warning("napcat: get_group_member_info failed: {}", e) + return str(user_id) + + # ------------------------------------------------------------------ + # Outbound + # ------------------------------------------------------------------ + + async def send(self, msg: OutboundMessage) -> None: + if self._ws is None: + logger.warning("napcat: not connected, dropping outbound message") + return + + kind, _, target = msg.chat_id.partition(":") + if kind not in ("private", "group") or not target: + logger.error("napcat: invalid chat_id '{}'", msg.chat_id) + return + + segments: list[dict[str, Any]] = [] + for ref in msg.media or []: + if seg := await self._build_image_segment(ref): + segments.append(seg) + if text := (msg.content or "").strip(): + segments.append({"type": "text", "data": {"text": text}}) + if not segments: + return + + params: dict[str, Any] = {"message": segments} + if kind == "group": + params["message_type"] = "group" + params["group_id"] = int(target) + else: + params["message_type"] = "private" + params["user_id"] = int(target) + + resp = await self._call_action("send_msg", params) + data = resp.get("data") or {} + if (mid := data.get("message_id")) is not None: + self._bot_outbound_ids.append(int(mid)) + + async def _build_image_segment(self, ref: str) -> dict[str, Any] | None: + ref = (ref or "").strip() + if not ref: + return None + if ref.startswith(("http://", "https://")): + ok, err = validate_url_target(ref) + if not ok: + logger.warning("napcat: rejected remote image '{}': {}", ref, err) + return None + return {"type": "image", "data": {"file": ref}} + # Local path → base64 so it works even when napcat runs on a + # different host/container than nanobot. + path = Path(os.path.expanduser(ref)).resolve() + if not path.is_file(): + logger.warning("napcat: local image not found: {}", path) + return None + data = await asyncio.to_thread(path.read_bytes) + return {"type": "image", "data": {"file": "base64://" + base64.b64encode(data).decode()}} + + async def _call_action( + self, + action: str, + params: dict[str, Any], + timeout: float = _ACTION_TIMEOUT, + ) -> dict[str, Any]: + if self._ws is None: + raise RuntimeError("napcat: not connected") + echo = uuid.uuid4().hex + loop = asyncio.get_running_loop() + fut: asyncio.Future[dict[str, Any]] = loop.create_future() + self._pending[echo] = fut + try: + await self._ws.send( + json.dumps({"action": action, "params": params, "echo": echo}, ensure_ascii=False) + ) + return await asyncio.wait_for(fut, timeout=timeout) + finally: + self._pending.pop(echo, None) + + # ------------------------------------------------------------------ + # Image download + # ------------------------------------------------------------------ + + async def _download_image(self, info: dict[str, Any]) -> str | None: + url = info.get("url") + if not isinstance(url, str): + return None + # logger.debug("napcat: downloading image from {}", url) + if self._http is None: + return None + ok, err = validate_url_target(url) + if not ok: + logger.warning("napcat: skip image '{}': {}", url, err) + return None + max_bytes = self.config.max_image_bytes + + # Reject upfront when napcat tells us the size and it's too big. + try: + declared_size = int(info["file_size"]) + if declared_size > max_bytes: + logger.warning( + "napcat: image declared size={} exceeds max_image_bytes={} url={}", + declared_size, + max_bytes, + url, + ) + return None + except (TypeError, KeyError): + pass + + try: + async with self._http.get(url, allow_redirects=True) as resp: + if resp.status >= 400: + logger.warning("napcat: image download status={} url={}", resp.status, url) + return None + # Stream until EOF, capping memory at max_bytes. Don't use + # content.read(max_bytes+1) — it returns only what's currently + # buffered, which truncates chunked responses mid-image. + buf = bytearray() + truncated = False + async for chunk in resp.content.iter_chunked(64 * 1024): + buf.extend(chunk) + if len(buf) > max_bytes: + truncated = True + break + if truncated: + logger.warning( + "napcat: image exceeds max_image_bytes={} url={}", max_bytes, url + ) + return None + data = bytes(buf) + except Exception as e: + logger.warning("napcat: image download error url={} err={}", url, e) + return None + + filename_hint = info.get("file") + if filename_hint: + name = safe_filename(filename_hint) + else: + name = f"{int(time.time() * 1000)}.jpg" + path = self._media_root / name + try: + await asyncio.to_thread(path.write_bytes, data) + except OSError as e: + logger.warning("napcat: failed to save image: {}", e) + return None + return str(path)