From b1a3053ceb1f58f330b92b391f7fc2cbe179a57e Mon Sep 17 00:00:00 2001
From: LZDQ <1486701401@qq.com>
Date: Tue, 28 Apr 2026 22:46:47 +0800
Subject: [PATCH] Channel napcat by Claude
---
docs/chat-apps.md | 45 ++++
nanobot/channels/napcat.py | 540 +++++++++++++++++++++++++++++++++++++
2 files changed, 585 insertions(+)
create mode 100644 nanobot/channels/napcat.py
diff --git a/docs/chat-apps.md b/docs/chat-apps.md
index fe14a78cb..2e3bbd750 100644
--- a/docs/chat-apps.md
+++ b/docs/chat-apps.md
@@ -14,6 +14,7 @@ Connect nanobot to your favorite chat platform. Want to build your own? See the
| **Matrix** | Homeserver URL + Access token |
| **Email** | IMAP/SMTP credentials |
| **QQ** | App ID + App Secret |
+| **Napcat (QQ)** | Napcat Forward WebSocket URL + access token |
| **Wecom** | Bot ID + Bot Secret |
| **Microsoft Teams** | App ID + App Password + public HTTPS endpoint |
| **Mochat** | Claw token (auto-setup available) |
@@ -424,6 +425,50 @@ Now send a message to the bot from QQ — it should respond!
+
+Napcat (QQ via OneBot v11 支持群聊等功能)
+
+Connects to a [Napcat](https://github.com/NapNeko/NapCatQQ) instance over its **forward WebSocket** (OneBot v11). Use this when you have your own QQ account running through Napcat and want full private + group chat support.
+
+**1. Set up Napcat**
+
+- Install and log into Napcat, then enable a **Forward WebSocket** server. Recommends: [official napcat docker tutorial](https://github.com/NapNeko/NapCat-Docker)
+- In the webui, follow "网络配置" -> "新建" -> "Websocket 服务器" to create a forward websocket server. By default, the URL is `ws://127.0.0.1:3001`
+- Copy the forward websocket server's token
+- (Optional) In the webui, follow "系统配置" -> "登陆配置" -> "快速登录QQ" to automatically login after restarts
+
+**2. Configure**
+
+```json
+{
+ "channels": {
+ "napcat": {
+ "enabled": true,
+ "wsUrl": "ws://127.0.0.1:3001",
+ "accessToken": "YOUR_WEBSOCKET_TOKEN",
+ "allowFrom": ["*"],
+ "groupPolicy": "mention",
+ "groupPolicyOverrides": {
+ "123456789": "open",
+ "987654321": 0.2
+ },
+ "welcomeNewMembers": true
+ }
+ }
+}
+```
+
+| Option | What it does |
+|--------|--------------|
+| `wsUrl` | Napcat forward-WebSocket endpoint. Bearer auth via `accessToken` is sent in the `Authorization` header. |
+| `allowFrom` | QQ numbers permitted to talk to the bot. `["*"]` = anyone. Required `["*"]` (or include the joining user) for `welcomeNewMembers` to fire. |
+| `groupPolicy` | `"mention"` (default) — reply only when @-mentioned or replying to the bot's own message. `"open"` — reply to every group message. A float `p` in `[0.0, 1.0]` — @mentions and replies-to-bot always reply; every other group message replies with probability `p` (so `0.0` ≡ `"mention"`, `1.0` ≡ `"open"`). Private chats always reply. |
+| `groupPolicyOverrides` | Optional per-group overrides for `groupPolicy`, keyed by group id (as a string). Each value takes the same shape as `groupPolicy` (`"mention"`, `"open"`, or a float). Groups not listed fall back to `groupPolicy`. |
+| `welcomeNewMembers` | When true, `notice.group_increase` events are pushed to the bus as a synthetic message so the agent can greet new joiners. |
+| `maxImageBytes` | Hard cap (in bytes) for inbound image downloads. Defaults to 20 MB. Larger images are dropped with a warning. |
+
+
+
DingTalk (钉钉)
diff --git a/nanobot/channels/napcat.py b/nanobot/channels/napcat.py
new file mode 100644
index 000000000..0caf4063c
--- /dev/null
+++ b/nanobot/channels/napcat.py
@@ -0,0 +1,540 @@
+"""Napcat (OneBot v11) channel for QQ, over WebSocket."""
+
+from __future__ import annotations
+
+import asyncio
+import base64
+import json
+import os
+import random
+import time
+import uuid
+from collections import deque
+from pathlib import Path
+from typing import Annotated, Any, Literal
+import aiohttp
+from loguru import logger
+from pydantic import Field
+
+from websockets.asyncio.client import ClientConnection, connect as ws_connect
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.base import BaseChannel
+from nanobot.config.paths import get_media_dir
+from nanobot.config.schema import Base
+from nanobot.security.network import validate_url_target
+from nanobot.utils.helpers import safe_filename
+
+_DOWNLOAD_TIMEOUT = aiohttp.ClientTimeout(total=60)
+_ACTION_TIMEOUT = 20.0
+
+
+# `"mention"` (only @mentions / replies) | `"open"` (every message) | float p
+# in [0, 1]: mentions/replies always reply; other messages reply with probability
+# p. 0.0 ≡ "mention", 1.0 ≡ "open".
+GroupPolicy = Literal["mention", "open"] | Annotated[float, Field(ge=0.0, le=1.0)]
+
+
+class NapcatConfig(Base):
+ """Napcat (OneBot v11) channel configuration."""
+
+ enabled: bool = False
+ ws_url: str = "ws://127.0.0.1:3001"
+ access_token: str = ""
+ allow_from: list[str] = Field(default_factory=list)
+ group_policy: GroupPolicy = "mention"
+ # Per-group overrides keyed by stringified group_id, e.g. {"123456": "open"}.
+ # Falls back to `group_policy` when a group_id isn't listed.
+ group_policy_overrides: dict[str, GroupPolicy] = Field(default_factory=dict)
+ welcome_new_members: bool = True
+ # Hard cap for inbound image downloads. Bigger images are dropped.
+ max_image_bytes: int = Field(default=20 * 1024 * 1024, ge=1)
+
+
+class NapcatChannel(BaseChannel):
+ """Napcat / OneBot v11 channel."""
+
+ name = "napcat"
+ display_name = "Napcat (QQ)"
+
+ @classmethod
+ def default_config(cls) -> dict[str, Any]:
+ return NapcatConfig().model_dump(by_alias=True)
+
+ def __init__(self, config: Any, bus: MessageBus):
+ if isinstance(config, dict):
+ config = NapcatConfig.model_validate(config)
+ super().__init__(config, bus)
+ self.config: NapcatConfig = config
+
+ self._ws: ClientConnection | None = None
+ self._http: aiohttp.ClientSession | None = None
+ self._media_root: Path = get_media_dir("napcat")
+ self._self_id: int | None = None
+ self._pending: dict[str, asyncio.Future[dict[str, Any]]] = {}
+ self._processed_ids: deque[int] = deque(maxlen=2000)
+ self._bot_outbound_ids: deque[int] = deque(maxlen=2000)
+
+ # ------------------------------------------------------------------
+ # Lifecycle
+ # ------------------------------------------------------------------
+
+ async def start(self) -> None:
+ if not self.config.ws_url:
+ logger.error("napcat: ws_url not configured")
+ return
+
+ self._running = True
+ self._http = aiohttp.ClientSession(timeout=_DOWNLOAD_TIMEOUT)
+
+ backoff = iter((5, 10)) # then 30s forever
+ while self._running:
+ try:
+ await self._run_once()
+ backoff = iter((5, 10)) # reset after a clean session
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ logger.warning("napcat: connection lost: {}", e)
+ if self._running:
+ await asyncio.sleep(next(backoff, 30))
+
+ async def _run_once(self) -> None:
+ headers = []
+ if self.config.access_token:
+ headers.append(("Authorization", f"Bearer {self.config.access_token}"))
+
+ logger.info("napcat: connecting to {}", self.config.ws_url)
+ async with ws_connect(self.config.ws_url, additional_headers=headers) as ws:
+ self._ws = ws
+ logger.info("napcat: connected")
+ try:
+ # Validate the connection before entering the dispatch loop.
+ # Napcat may interleave meta_event frames before our echo
+ # response, so dispatch any non-matching frames as we go.
+ echo = uuid.uuid4().hex
+ await ws.send(
+ json.dumps(
+ {"action": "get_login_info", "params": {}, "echo": echo},
+ ensure_ascii=False,
+ )
+ )
+ deadline = asyncio.get_running_loop().time() + _ACTION_TIMEOUT
+ while True:
+ remaining = deadline - asyncio.get_running_loop().time()
+ if remaining <= 0:
+ raise asyncio.TimeoutError("get_login_info timed out")
+ raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
+ try:
+ payload = json.loads(raw)
+ except json.JSONDecodeError:
+ continue
+ if isinstance(payload, dict) and payload.get("echo") == echo:
+ data = payload.get("data") or {}
+ logger.info(
+ "napcat: logged in as {} (user_id={})",
+ data.get("nickname"),
+ data.get("user_id"),
+ )
+ break
+ await self._dispatch_frame(raw)
+
+ async for raw in ws:
+ await self._dispatch_frame(raw)
+ finally:
+ self._ws = None
+ self._fail_pending(RuntimeError("napcat: websocket disconnected"))
+
+ async def stop(self) -> None:
+ self._running = False
+ if self._ws is not None:
+ try:
+ await self._ws.close()
+ except Exception:
+ pass
+ self._ws = None
+ if self._http is not None:
+ try:
+ await self._http.close()
+ except Exception:
+ pass
+ self._http = None
+ self._fail_pending(RuntimeError("napcat: stopped"))
+
+ def _fail_pending(self, err: BaseException) -> None:
+ for fut in self._pending.values():
+ if not fut.done():
+ fut.set_exception(err)
+ self._pending.clear()
+
+ # ------------------------------------------------------------------
+ # Frame dispatch
+ # ------------------------------------------------------------------
+
+ async def _dispatch_frame(self, raw: str | bytes) -> None:
+ # logger.debug("dispatch frame {}", raw)
+ try:
+ payload = json.loads(raw)
+ except json.JSONDecodeError:
+ logger.debug("napcat: dropping non-JSON frame")
+ return
+ if not isinstance(payload, dict):
+ return
+
+ # Action response: identified by `echo` and absence of post_type.
+ if "echo" in payload and payload.get("post_type") is None:
+ echo = payload.get("echo")
+ fut = self._pending.pop(echo, None) if isinstance(echo, str) else None
+ if fut and not fut.done():
+ fut.set_result(payload)
+ return
+
+ if (sid := payload.get("self_id")) is not None:
+ try:
+ self._self_id = int(sid)
+ except (TypeError, ValueError):
+ pass
+
+ post_type = payload.get("post_type")
+ if post_type == "message":
+ await self._on_message(payload)
+ elif post_type == "notice":
+ await self._on_notice(payload)
+
+ # ------------------------------------------------------------------
+ # Inbound: messages
+ # ------------------------------------------------------------------
+
+ async def _on_message(self, ev: dict[str, Any]) -> None:
+ msg_id = ev.get("message_id")
+ if isinstance(msg_id, int):
+ if msg_id in self._processed_ids:
+ return
+ self._processed_ids.append(msg_id)
+
+ message_type = ev.get("message_type")
+ user_id = ev.get("user_id")
+ if user_id is None or message_type not in ("group", "private"):
+ return
+
+ segments = self._normalize_segments(ev.get("message"))
+ text, images, mentioned_self, reply_to_id = self._parse_segments(segments)
+
+ media_paths: list[str] = []
+ for info in images:
+ if local := await self._download_image(info):
+ media_paths.append(local)
+
+ sender = ev.get("sender") or {}
+ nickname = sender.get("card") or sender.get("nickname")
+
+ if message_type == "group":
+ group_id = ev.get("group_id")
+ if group_id is None:
+ return
+
+ replying_to_bot = (
+ isinstance(reply_to_id, int) and reply_to_id in self._bot_outbound_ids
+ )
+ if not self._should_reply_in_group(
+ group_id=group_id,
+ mentioned_self=mentioned_self,
+ replying_to_bot=replying_to_bot,
+ ):
+ return
+
+ chat_id = f"group:{group_id}"
+ label = nickname or str(user_id)
+ content = f"{label}: {text}"
+ content = self._format_group_content(
+ text=text,
+ nickname=nickname,
+ user_id=user_id,
+ )
+ else:
+ chat_id = f"private:{user_id}"
+ content = text
+
+ if not content and not media_paths:
+ return
+
+ await self._handle_message(
+ sender_id=str(user_id),
+ chat_id=chat_id,
+ content=content,
+ media=media_paths or None,
+ metadata={
+ "message_id": msg_id,
+ "is_group": message_type == "group",
+ "nickname": nickname,
+ "reply_to": reply_to_id,
+ },
+ )
+
+ @staticmethod
+ def _normalize_segments(message: Any) -> list[dict[str, Any]]:
+ # Napcat defaults to array format. Treat raw strings as a single text
+ # segment rather than parsing CQ codes — that path is fragile and
+ # users can configure napcat to emit arrays.
+ if isinstance(message, list):
+ return [seg for seg in message if isinstance(seg, dict)]
+ if isinstance(message, str) and message:
+ return [{"type": "text", "data": {"text": message}}]
+ return []
+
+ def _parse_segments(
+ self, segments: list[dict[str, Any]]
+ ) -> tuple[str, list[dict[str, Any]], bool, int | None]:
+ parts: list[str] = []
+ images: list[dict[str, Any]] = []
+ mentioned_self = False
+ reply_to: int | None = None
+ self_id_str = str(self._self_id) if self._self_id is not None else None
+
+ for seg in segments:
+ stype = seg.get("type")
+ data = seg.get("data") or {}
+ if stype == "text":
+ if txt := data.get("text"):
+ parts.append(str(txt))
+ elif stype == "image":
+ # OneBot exposes the downloadable image at `url`. Napcat
+ # additionally provides `file` (e.g. .png) and
+ # `file_size` (bytes, sometimes a string).
+ url = data.get("url")
+ if isinstance(url, str) and url.startswith(("http://", "https://")):
+ images.append(
+ {
+ "url": url,
+ "file": data.get("file"),
+ "file_size": data.get("file_size"),
+ }
+ )
+ else:
+ logger.warning("napcat: received invalid image url: {}", url)
+ elif stype == "at":
+ qq = str(data.get("qq", ""))
+ if self_id_str and qq == self_id_str:
+ mentioned_self = True
+ else:
+ parts.append(f"@{qq}")
+ elif stype == "reply":
+ rid = data.get("id")
+ try:
+ reply_to = int(rid) if rid is not None else None
+ except (TypeError, ValueError):
+ pass
+ elif stype == "face":
+ parts.append(f"[face:{data.get('id', '')}]")
+
+ text = " ".join(p.strip() for p in parts if p.strip()).strip()
+ return text, images, mentioned_self, reply_to
+
+ def _should_reply_in_group(
+ self, *, group_id: Any, mentioned_self: bool, replying_to_bot: bool
+ ) -> bool:
+ if mentioned_self or replying_to_bot:
+ return True
+ policy = self.config.group_policy_overrides.get(str(group_id), self.config.group_policy)
+ if policy == "open":
+ return True
+ if policy == "mention":
+ return False
+ # Probability case: float in [0.0, 1.0].
+ return random.random() < float(policy)
+
+ @staticmethod
+ def _format_group_content(
+ *,
+ text: str,
+ nickname: str,
+ user_id: Any,
+ ) -> str:
+ label = nickname or str(user_id)
+ return f"{label}: {text}"
+
+ # ------------------------------------------------------------------
+ # Inbound: notices (member joined etc.)
+ # ------------------------------------------------------------------
+
+ async def _on_notice(self, ev: dict[str, Any]) -> None:
+ if ev.get("notice_type") != "group_increase" or not self.config.welcome_new_members:
+ return
+
+ group_id = ev.get("group_id")
+ user_id = ev.get("user_id")
+ if group_id is None or user_id is None:
+ return
+
+ nickname = await self._lookup_member_name(int(group_id), int(user_id))
+
+ # Note: this routes through is_allowed(). For group bots set
+ # `allow_from: ["*"]` (or include the joining user's id) for welcomes
+ # to fire — same trust model as a regular inbound message.
+ await self._handle_message(
+ sender_id=str(user_id),
+ chat_id=f"group:{group_id}",
+ content=f"[group event] new member {nickname} joined group {group_id}",
+ metadata={
+ "is_group": True,
+ "event": "group_increase",
+ },
+ )
+
+ async def _lookup_member_name(self, group_id: int, user_id: int) -> str:
+ """Lookup group member nickname. Fallback to user id."""
+ try:
+ resp = await self._call_action(
+ "get_group_member_info",
+ {"group_id": group_id, "user_id": user_id, "no_cache": True},
+ )
+ data = resp.get("data", {})
+ return data.get("card") or data.get("nickname") or str(user_id)
+ except Exception as e:
+ logger.warning("napcat: get_group_member_info failed: {}", e)
+ return str(user_id)
+
+ # ------------------------------------------------------------------
+ # Outbound
+ # ------------------------------------------------------------------
+
+ async def send(self, msg: OutboundMessage) -> None:
+ if self._ws is None:
+ logger.warning("napcat: not connected, dropping outbound message")
+ return
+
+ kind, _, target = msg.chat_id.partition(":")
+ if kind not in ("private", "group") or not target:
+ logger.error("napcat: invalid chat_id '{}'", msg.chat_id)
+ return
+
+ segments: list[dict[str, Any]] = []
+ for ref in msg.media or []:
+ if seg := await self._build_image_segment(ref):
+ segments.append(seg)
+ if text := (msg.content or "").strip():
+ segments.append({"type": "text", "data": {"text": text}})
+ if not segments:
+ return
+
+ params: dict[str, Any] = {"message": segments}
+ if kind == "group":
+ params["message_type"] = "group"
+ params["group_id"] = int(target)
+ else:
+ params["message_type"] = "private"
+ params["user_id"] = int(target)
+
+ resp = await self._call_action("send_msg", params)
+ data = resp.get("data") or {}
+ if (mid := data.get("message_id")) is not None:
+ self._bot_outbound_ids.append(int(mid))
+
+ async def _build_image_segment(self, ref: str) -> dict[str, Any] | None:
+ ref = (ref or "").strip()
+ if not ref:
+ return None
+ if ref.startswith(("http://", "https://")):
+ ok, err = validate_url_target(ref)
+ if not ok:
+ logger.warning("napcat: rejected remote image '{}': {}", ref, err)
+ return None
+ return {"type": "image", "data": {"file": ref}}
+ # Local path → base64 so it works even when napcat runs on a
+ # different host/container than nanobot.
+ path = Path(os.path.expanduser(ref)).resolve()
+ if not path.is_file():
+ logger.warning("napcat: local image not found: {}", path)
+ return None
+ data = await asyncio.to_thread(path.read_bytes)
+ return {"type": "image", "data": {"file": "base64://" + base64.b64encode(data).decode()}}
+
+ async def _call_action(
+ self,
+ action: str,
+ params: dict[str, Any],
+ timeout: float = _ACTION_TIMEOUT,
+ ) -> dict[str, Any]:
+ if self._ws is None:
+ raise RuntimeError("napcat: not connected")
+ echo = uuid.uuid4().hex
+ loop = asyncio.get_running_loop()
+ fut: asyncio.Future[dict[str, Any]] = loop.create_future()
+ self._pending[echo] = fut
+ try:
+ await self._ws.send(
+ json.dumps({"action": action, "params": params, "echo": echo}, ensure_ascii=False)
+ )
+ return await asyncio.wait_for(fut, timeout=timeout)
+ finally:
+ self._pending.pop(echo, None)
+
+ # ------------------------------------------------------------------
+ # Image download
+ # ------------------------------------------------------------------
+
+ async def _download_image(self, info: dict[str, Any]) -> str | None:
+ url = info.get("url")
+ if not isinstance(url, str):
+ return None
+ # logger.debug("napcat: downloading image from {}", url)
+ if self._http is None:
+ return None
+ ok, err = validate_url_target(url)
+ if not ok:
+ logger.warning("napcat: skip image '{}': {}", url, err)
+ return None
+ max_bytes = self.config.max_image_bytes
+
+ # Reject upfront when napcat tells us the size and it's too big.
+ try:
+ declared_size = int(info["file_size"])
+ if declared_size > max_bytes:
+ logger.warning(
+ "napcat: image declared size={} exceeds max_image_bytes={} url={}",
+ declared_size,
+ max_bytes,
+ url,
+ )
+ return None
+ except (TypeError, KeyError):
+ pass
+
+ try:
+ async with self._http.get(url, allow_redirects=True) as resp:
+ if resp.status >= 400:
+ logger.warning("napcat: image download status={} url={}", resp.status, url)
+ return None
+ # Stream until EOF, capping memory at max_bytes. Don't use
+ # content.read(max_bytes+1) — it returns only what's currently
+ # buffered, which truncates chunked responses mid-image.
+ buf = bytearray()
+ truncated = False
+ async for chunk in resp.content.iter_chunked(64 * 1024):
+ buf.extend(chunk)
+ if len(buf) > max_bytes:
+ truncated = True
+ break
+ if truncated:
+ logger.warning(
+ "napcat: image exceeds max_image_bytes={} url={}", max_bytes, url
+ )
+ return None
+ data = bytes(buf)
+ except Exception as e:
+ logger.warning("napcat: image download error url={} err={}", url, e)
+ return None
+
+ filename_hint = info.get("file")
+ if filename_hint:
+ name = safe_filename(filename_hint)
+ else:
+ name = f"{int(time.time() * 1000)}.jpg"
+ path = self._media_root / name
+ try:
+ await asyncio.to_thread(path.write_bytes, data)
+ except OSError as e:
+ logger.warning("napcat: failed to save image: {}", e)
+ return None
+ return str(path)