async def send(self, msg: OutboundMessage) -> None:
    """Deliver an outbound message: every attachment first, then the text.

    An attachment whose upload reports failure is replaced by a short text
    notice naming the file. Any exception raised while sending is logged with
    its traceback and swallowed, so this coroutine does not raise to its
    caller.
    """
    try:
        if not self._client:
            logger.warning("QQ client not initialized")
            return

        reply_to = msg.metadata.get("message_id")
        # A chat with no cached type is treated as a direct ("c2c") chat.
        to_group = self._chat_type_cache.get(msg.chat_id, "c2c") == "group"

        # Attachments go out before the text body.
        for ref in msg.media or []:
            delivered = await self._send_media(
                chat_id=msg.chat_id,
                media_ref=ref,
                msg_id=reply_to,
                is_group=to_group,
            )
            if delivered:
                continue

            # Name the failed file: URL path component first, then the raw
            # reference, then a generic placeholder.
            label = os.path.basename(urlparse(ref).path)
            if not label:
                label = os.path.basename(ref) or "file"
            await self._send_text_only(
                chat_id=msg.chat_id,
                is_group=to_group,
                msg_id=reply_to,
                content=f"[Attachment send failed: {label}]",
            )

        # Text body, skipped when missing or whitespace-only.
        body = msg.content.strip() if msg.content else ""
        if body:
            await self._send_text_only(
                chat_id=msg.chat_id,
                is_group=to_group,
                msg_id=reply_to,
                content=body,
            )
    except Exception:
        logger.exception("Error sending QQ message to chat_id={}", msg.chat_id)
async def _on_message(self, data: C2CMessage | GroupMessage, is_group: bool = False) -> None:
    """Turn one inbound QQ event into a bus message.

    Skips message ids that were already seen, records whether the chat is a
    group or a direct ("c2c") chat, downloads attachments, appends a
    "Received files" listing to the text, optionally sends the configured
    acknowledgement, and forwards the result to ``_handle_message``.
    Any exception is logged with its traceback and swallowed.
    """
    try:
        # De-duplicate on message id.
        if data.id in self._processed_ids:
            return
        self._processed_ids.append(data.id)

        if is_group:
            conversation = data.group_openid
            sender = data.author.member_openid
            kind = "group"
        else:
            conversation = str(
                getattr(data.author, "id", None)
                or getattr(data.author, "user_openid", "unknown")
            )
            sender = conversation
            kind = "c2c"
        # Remembered so the outbound path can pick the matching chat type.
        self._chat_type_cache[conversation] = kind

        text = (data.content or "").strip()

        # Test payloads may not define ``attachments`` at all, so read it
        # defensively and fall back to an empty list.
        incoming = getattr(data, "attachments", None) or []
        saved_paths, receipt_lines, attachment_meta = await self._handle_attachments(incoming)

        # Always surface the saved paths in the text so they are actionable.
        if receipt_lines:
            has_image = any(_is_image_name(Path(p).name) for p in saved_paths)
            tag = "[Image]" if has_image else "[File]"
            listing = "Received files:\n" + "\n".join(receipt_lines)
            if text:
                text = f"{text}\n\n{listing}".strip()
            else:
                text = f"{tag}\n{listing}"

        # Nothing to forward: no text and no downloaded media.
        if not (text or saved_paths):
            return

        if self.config.ack_message:
            # Best-effort acknowledgement; a failure must not block delivery.
            try:
                await self._send_text_only(
                    chat_id=conversation,
                    is_group=is_group,
                    msg_id=data.id,
                    content=self.config.ack_message,
                )
            except Exception:
                logger.debug("QQ ack message failed for chat_id={}", conversation)

        await self._handle_message(
            sender_id=sender,
            chat_id=conversation,
            content=text,
            media=saved_paths or None,
            metadata={
                "message_id": data.id,
                "attachments": attachment_meta,
            },
        )
    except Exception:
        logger.exception("Error handling QQ inbound message id={}", getattr(data, "id", "?"))
async def _upload_media_ws(
    self, client: Any, file_path: str,
) -> "tuple[str, str] | tuple[None, None]":
    """Upload a local file to WeCom via WebSocket 3-step protocol (base64).

    Uses the WeCom WebSocket upload commands directly via
    ``client._ws_manager.send_reply()``:

      ``aibot_upload_media_init``   → upload_id
      ``aibot_upload_media_chunk``  × N  (≤512 KB raw per chunk, base64)
      ``aibot_upload_media_finish`` → media_id

    The media type ("image", "video", "voice" or "file") is chosen from the
    file extension alone.

    Args:
        client: Object exposing ``_ws_manager.send_reply(req_id, body, cmd)``.
        file_path: Path of the local file to upload.

    Returns:
        ``(media_id, media_type)`` on success, ``(None, None)`` on failure.
        Errors raised inside the upload sequence (including file read
        errors) are logged and reported as ``(None, None)``.
    """
    # NOTE(review): this import sits outside the try block, so a missing SDK
    # raises ImportError to the caller instead of returning (None, None).
    from wecom_aibot_sdk.utils import generate_req_id as _gen_req_id

    try:
        fname = os.path.basename(file_path)
        ext = os.path.splitext(fname)[1].lower()

        if ext in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"):
            media_type = "image"
        elif ext in (".mp4", ".avi", ".mov"):
            media_type = "video"
        elif ext in (".amr", ".mp3", ".wav", ".ogg"):
            media_type = "voice"
        else:
            media_type = "file"

        # Read through a context manager so the file descriptor is closed
        # deterministically, even if read() raises.
        with open(file_path, "rb") as fh:
            data = fh.read()
        file_size = len(data)
        # MD5 is sent in the init payload alongside the size and chunk count.
        md5_hash = hashlib.md5(data).hexdigest()  # noqa: S324

        CHUNK_SIZE = 512 * 1024  # 512 KB raw (before base64)
        # Slice chunks lazily by offset instead of building a list of all
        # chunks up front, which would hold a second full copy of the file.
        chunk_starts = range(0, file_size, CHUNK_SIZE)
        n_chunks = len(chunk_starts)

        # Step 1: init
        req_id = _gen_req_id("upload_init")
        resp = await client._ws_manager.send_reply(req_id, {
            "type": media_type,
            "filename": fname,
            "total_size": file_size,
            "total_chunks": n_chunks,
            "md5": md5_hash,
        }, "aibot_upload_media_init")
        if resp.errcode != 0:
            logger.warning("WeCom upload init failed ({}): {}", resp.errcode, resp.errmsg)
            return None, None
        upload_id = resp.body.get("upload_id") if resp.body else None
        if not upload_id:
            logger.warning("WeCom upload init: no upload_id in response")
            return None, None

        # Step 2: send chunks
        for i, start in enumerate(chunk_starts):
            chunk = data[start : start + CHUNK_SIZE]
            req_id = _gen_req_id("upload_chunk")
            resp = await client._ws_manager.send_reply(req_id, {
                "upload_id": upload_id,
                "chunk_index": i,
                "base64_data": base64.b64encode(chunk).decode(),
            }, "aibot_upload_media_chunk")
            if resp.errcode != 0:
                logger.warning("WeCom upload chunk {} failed ({}): {}", i, resp.errcode, resp.errmsg)
                return None, None

        # Step 3: finish
        req_id = _gen_req_id("upload_finish")
        resp = await client._ws_manager.send_reply(req_id, {
            "upload_id": upload_id,
        }, "aibot_upload_media_finish")
        if resp.errcode != 0:
            logger.warning("WeCom upload finish failed ({}): {}", resp.errcode, resp.errmsg)
            return None, None

        media_id = resp.body.get("media_id") if resp.body else None
        if not media_id:
            logger.warning("WeCom upload finish: no media_id in response body={}", resp.body)
            return None, None

        # Only a prefix of the media_id is logged.
        logger.debug("WeCom uploaded {} ({}) → media_id={}", fname, media_type, media_id[:16] + "...")
        return media_id, media_type

    except Exception as e:
        logger.error("WeCom _upload_media_ws error for {}: {}", file_path, e)
        return None, None
self._generate_req_id("stream") - - # Send as streaming message with finish=True - await self._client.reply_stream( - frame, - stream_id, - content, - finish=True, - ) - - logger.debug("WeCom message sent to {}", msg.chat_id) + if frame: + if is_progress: + # Progress messages (thinking text): send as plain reply, no streaming + await self._client.reply(frame, { + "msgtype": "text", + "text": {"content": content}, + }) + logger.debug("WeCom progress sent to {}", msg.chat_id) + else: + # Final response: use streaming reply for better UX + stream_id = self._generate_req_id("stream") + await self._client.reply_stream( + frame, + stream_id, + content, + finish=True, + ) + logger.debug("WeCom message sent to {}", msg.chat_id) + else: + # No frame (e.g. cron push): proactive send only supports markdown + await self._client.send_message(msg.chat_id, { + "msgtype": "markdown", + "markdown": {"content": content}, + }) + logger.info("WeCom proactive send to {}", msg.chat_id) except Exception as e: logger.error("Error sending WeCom message: {}", e)