feat(channels): Add full media support for QQ and WeCom channels

QQ channel improvements (on top of nightly):
- Add top-level try/except in _on_message and send() for resilience
- Use defensive getattr() for attachment attributes (botpy version compat)
- Skip file_name for image uploads to avoid QQ rendering as file attachment
- Extract only file_info from upload response to avoid extra fields
- Handle protocol-relative URLs (//...) in attachment downloads

WeCom channel improvements:
- Add _upload_media_ws() for WebSocket 3-step media upload protocol
- Send media files (image/video/voice/file) via WeCom rich media API
- Support progress messages (plain reply) vs final response (streaming)
- Support proactive send when no frame available (cron push)
- Pass media_paths to message bus for downstream processing
This commit is contained in:
gem12 2026-03-21 21:28:41 +08:00 committed by chengyongru
parent 7b1ce24600
commit 973b888d39
2 changed files with 255 additions and 102 deletions

View File

@ -242,43 +242,46 @@ class QQChannel(BaseChannel):
async def send(self, msg: OutboundMessage) -> None:
"""Send attachments first, then text."""
if not self._client:
logger.warning("QQ client not initialized")
return
try:
if not self._client:
logger.warning("QQ client not initialized")
return
msg_id = msg.metadata.get("message_id")
chat_type = self._chat_type_cache.get(msg.chat_id, "c2c")
is_group = chat_type == "group"
msg_id = msg.metadata.get("message_id")
chat_type = self._chat_type_cache.get(msg.chat_id, "c2c")
is_group = chat_type == "group"
# 1) Send media
for media_ref in msg.media or []:
ok = await self._send_media(
chat_id=msg.chat_id,
media_ref=media_ref,
msg_id=msg_id,
is_group=is_group,
)
if not ok:
filename = (
os.path.basename(urlparse(media_ref).path)
or os.path.basename(media_ref)
or "file"
# 1) Send media
for media_ref in msg.media or []:
ok = await self._send_media(
chat_id=msg.chat_id,
media_ref=media_ref,
msg_id=msg_id,
is_group=is_group,
)
if not ok:
filename = (
os.path.basename(urlparse(media_ref).path)
or os.path.basename(media_ref)
or "file"
)
await self._send_text_only(
chat_id=msg.chat_id,
is_group=is_group,
msg_id=msg_id,
content=f"[Attachment send failed: {filename}]",
)
# 2) Send text
if msg.content and msg.content.strip():
await self._send_text_only(
chat_id=msg.chat_id,
is_group=is_group,
msg_id=msg_id,
content=f"[Attachment send failed: {filename}]",
content=msg.content.strip(),
)
# 2) Send text
if msg.content and msg.content.strip():
await self._send_text_only(
chat_id=msg.chat_id,
is_group=is_group,
msg_id=msg_id,
content=msg.content.strip(),
)
except Exception:
logger.exception("Error sending QQ message to chat_id={}", msg.chat_id)
async def _send_text_only(
self,
@ -438,15 +441,26 @@ class QQChannel(BaseChannel):
endpoint = "/v2/users/{openid}/files"
id_key = "openid"
payload = {
payload: dict[str, Any] = {
id_key: chat_id,
"file_type": file_type,
"file_data": file_data,
"file_name": file_name,
"srv_send_msg": srv_send_msg,
}
# Only pass file_name for non-image types (file_type=4).
# Passing file_name for images causes QQ client to render them as
# file attachments instead of inline images.
if file_type != QQ_FILE_TYPE_IMAGE and file_name:
payload["file_name"] = file_name
route = Route("POST", endpoint, **{id_key: chat_id})
return await self._client.api._http.request(route, json=payload)
result = await self._client.api._http.request(route, json=payload)
# Extract only the file_info field to avoid extra fields (file_uuid, ttl, etc.)
# that may confuse QQ client when sending the media object.
if isinstance(result, dict) and "file_info" in result:
return {"file_info": result["file_info"]}
return result
# ---------------------------
# Inbound (receive)
@ -454,58 +468,68 @@ class QQChannel(BaseChannel):
async def _on_message(self, data: C2CMessage | GroupMessage, is_group: bool = False) -> None:
"""Parse inbound message, download attachments, and publish to the bus."""
if data.id in self._processed_ids:
return
self._processed_ids.append(data.id)
try:
if data.id in self._processed_ids:
return
self._processed_ids.append(data.id)
if is_group:
chat_id = data.group_openid
user_id = data.author.member_openid
self._chat_type_cache[chat_id] = "group"
else:
chat_id = str(
getattr(data.author, "id", None) or getattr(data.author, "user_openid", "unknown")
)
user_id = chat_id
self._chat_type_cache[chat_id] = "c2c"
content = (data.content or "").strip()
# the data used by tests don't contain attachments property
# so we use getattr with a default of [] to avoid AttributeError in tests
attachments = getattr(data, "attachments", None) or []
media_paths, recv_lines, att_meta = await self._handle_attachments(attachments)
# Compose content that always contains actionable saved paths
if recv_lines:
tag = "[Image]" if any(_is_image_name(Path(p).name) for p in media_paths) else "[File]"
file_block = "Received files:\n" + "\n".join(recv_lines)
content = f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}"
if not content and not media_paths:
return
if self.config.ack_message:
try:
await self._send_text_only(
chat_id=chat_id,
is_group=is_group,
msg_id=data.id,
content=self.config.ack_message,
if is_group:
chat_id = data.group_openid
user_id = data.author.member_openid
self._chat_type_cache[chat_id] = "group"
else:
chat_id = str(
getattr(data.author, "id", None)
or getattr(data.author, "user_openid", "unknown")
)
except Exception:
logger.debug("QQ ack message failed for chat_id={}", chat_id)
user_id = chat_id
self._chat_type_cache[chat_id] = "c2c"
await self._handle_message(
sender_id=user_id,
chat_id=chat_id,
content=content,
media=media_paths if media_paths else None,
metadata={
"message_id": data.id,
"attachments": att_meta,
},
)
content = (data.content or "").strip()
# the data used by tests don't contain attachments property
# so we use getattr with a default of [] to avoid AttributeError in tests
attachments = getattr(data, "attachments", None) or []
media_paths, recv_lines, att_meta = await self._handle_attachments(attachments)
# Compose content that always contains actionable saved paths
if recv_lines:
tag = (
"[Image]"
if any(_is_image_name(Path(p).name) for p in media_paths)
else "[File]"
)
file_block = "Received files:\n" + "\n".join(recv_lines)
content = (
f"{content}\n\n{file_block}".strip() if content else f"{tag}\n{file_block}"
)
if not content and not media_paths:
return
if self.config.ack_message:
try:
await self._send_text_only(
chat_id=chat_id,
is_group=is_group,
msg_id=data.id,
content=self.config.ack_message,
)
except Exception:
logger.debug("QQ ack message failed for chat_id={}", chat_id)
await self._handle_message(
sender_id=user_id,
chat_id=chat_id,
content=content,
media=media_paths if media_paths else None,
metadata={
"message_id": data.id,
"attachments": att_meta,
},
)
except Exception:
logger.exception("Error handling QQ inbound message id={}", getattr(data, "id", "?"))
async def _handle_attachments(
self,
@ -520,7 +544,9 @@ class QQChannel(BaseChannel):
return media_paths, recv_lines, att_meta
for att in attachments:
url, filename, ctype = att.url, att.filename, att.content_type
url = getattr(att, "url", None) or ""
filename = getattr(att, "filename", None) or ""
ctype = getattr(att, "content_type", None) or ""
logger.info("Downloading file from QQ: {}", filename or url)
local_path = await self._download_to_media_dir_chunked(url, filename_hint=filename)
@ -555,6 +581,10 @@ class QQChannel(BaseChannel):
Enforces a max download size and writes to a .part temp file
that is atomically renamed on success.
"""
# Handle protocol-relative URLs (e.g. "//multimedia.nt.qq.com/...")
if url.startswith("//"):
url = f"https:{url}"
if not self._http:
self._http = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120))

View File

@ -1,6 +1,8 @@
"""WeCom (Enterprise WeChat) channel implementation using wecom_aibot_sdk."""
import asyncio
import base64
import hashlib
import importlib.util
import os
from collections import OrderedDict
@ -217,6 +219,7 @@ class WecomChannel(BaseChannel):
chat_id = body.get("chatid", sender_id)
content_parts = []
media_paths: list[str] = []
if msg_type == "text":
text = body.get("text", {}).get("content", "")
@ -232,7 +235,8 @@ class WecomChannel(BaseChannel):
file_path = await self._download_and_save_media(file_url, aes_key, "image")
if file_path:
filename = os.path.basename(file_path)
content_parts.append(f"[image: {filename}]\n[Image: source: {file_path}]")
content_parts.append(f"[image: {filename}]")
media_paths.append(file_path)
else:
content_parts.append("[image: download failed]")
else:
@ -286,12 +290,11 @@ class WecomChannel(BaseChannel):
self._chat_frames[chat_id] = frame
# Forward to message bus
# Note: media paths are included in content for broader model compatibility
await self._handle_message(
sender_id=sender_id,
chat_id=chat_id,
content=content,
media=None,
media=media_paths or None,
metadata={
"message_id": msg_id,
"msg_type": msg_type,
@ -336,6 +339,93 @@ class WecomChannel(BaseChannel):
logger.error("Error downloading media: {}", e)
return None
async def _upload_media_ws(
self, client: Any, file_path: str,
) -> "tuple[str, str] | tuple[None, None]":
"""Upload a local file to WeCom via WebSocket 3-step protocol (base64).
Uses the WeCom WebSocket upload commands directly via
``client._ws_manager.send_reply()``:
``aibot_upload_media_init`` upload_id
``aibot_upload_media_chunk`` × N (512 KB raw per chunk, base64)
``aibot_upload_media_finish`` media_id
Returns (media_id, media_type) on success, (None, None) on failure.
"""
from wecom_aibot_sdk.utils import generate_req_id as _gen_req_id
try:
fname = os.path.basename(file_path)
ext = os.path.splitext(fname)[1].lower()
if ext in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"):
media_type = "image"
elif ext in (".mp4", ".avi", ".mov"):
media_type = "video"
elif ext in (".amr", ".mp3", ".wav", ".ogg"):
media_type = "voice"
else:
media_type = "file"
data = open(file_path, "rb").read() # noqa: SIM115
file_size = len(data)
md5_hash = hashlib.md5(data).hexdigest() # noqa: S324
CHUNK_SIZE = 512 * 1024 # 512 KB raw (before base64)
chunk_list = [data[i : i + CHUNK_SIZE] for i in range(0, file_size, CHUNK_SIZE)]
n_chunks = len(chunk_list)
# Step 1: init
req_id = _gen_req_id("upload_init")
resp = await client._ws_manager.send_reply(req_id, {
"type": media_type,
"filename": fname,
"total_size": file_size,
"total_chunks": n_chunks,
"md5": md5_hash,
}, "aibot_upload_media_init")
if resp.errcode != 0:
logger.warning("WeCom upload init failed ({}): {}", resp.errcode, resp.errmsg)
return None, None
upload_id = resp.body.get("upload_id") if resp.body else None
if not upload_id:
logger.warning("WeCom upload init: no upload_id in response")
return None, None
# Step 2: send chunks
for i, chunk in enumerate(chunk_list):
req_id = _gen_req_id("upload_chunk")
resp = await client._ws_manager.send_reply(req_id, {
"upload_id": upload_id,
"chunk_index": i,
"base64_data": base64.b64encode(chunk).decode(),
}, "aibot_upload_media_chunk")
if resp.errcode != 0:
logger.warning("WeCom upload chunk {} failed ({}): {}", i, resp.errcode, resp.errmsg)
return None, None
# Step 3: finish
req_id = _gen_req_id("upload_finish")
resp = await client._ws_manager.send_reply(req_id, {
"upload_id": upload_id,
}, "aibot_upload_media_finish")
if resp.errcode != 0:
logger.warning("WeCom upload finish failed ({}): {}", resp.errcode, resp.errmsg)
return None, None
media_id = resp.body.get("media_id") if resp.body else None
if not media_id:
logger.warning("WeCom upload finish: no media_id in response body={}", resp.body)
return None, None
logger.debug("WeCom uploaded {} ({}) → media_id={}", fname, media_type, media_id[:16] + "...")
return media_id, media_type
except Exception as e:
logger.error("WeCom _upload_media_ws error for {}: {}", file_path, e)
return None, None
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through WeCom."""
if not self._client:
@ -343,28 +433,61 @@ class WecomChannel(BaseChannel):
return
try:
content = msg.content.strip()
if not content:
return
content = (msg.content or "").strip()
is_progress = bool(msg.metadata.get("_progress"))
# Get the stored frame for this chat
frame = self._chat_frames.get(msg.chat_id)
if not frame:
logger.warning("No frame found for chat {}, cannot reply", msg.chat_id)
# Send media files via WebSocket upload
for file_path in msg.media or []:
if not os.path.isfile(file_path):
logger.warning("WeCom media file not found: {}", file_path)
continue
media_id, media_type = await self._upload_media_ws(self._client, file_path)
if media_id:
if frame:
await self._client.reply(frame, {
"msgtype": media_type,
media_type: {"media_id": media_id},
})
else:
await self._client.send_message(msg.chat_id, {
"msgtype": media_type,
media_type: {"media_id": media_id},
})
logger.debug("WeCom sent {}{}", media_type, msg.chat_id)
else:
content += f"\n[file upload failed: {os.path.basename(file_path)}]"
if not content:
return
# Use streaming reply for better UX
stream_id = self._generate_req_id("stream")
# Send as streaming message with finish=True
await self._client.reply_stream(
frame,
stream_id,
content,
finish=True,
)
logger.debug("WeCom message sent to {}", msg.chat_id)
if frame:
if is_progress:
# Progress messages (thinking text): send as plain reply, no streaming
await self._client.reply(frame, {
"msgtype": "text",
"text": {"content": content},
})
logger.debug("WeCom progress sent to {}", msg.chat_id)
else:
# Final response: use streaming reply for better UX
stream_id = self._generate_req_id("stream")
await self._client.reply_stream(
frame,
stream_id,
content,
finish=True,
)
logger.debug("WeCom message sent to {}", msg.chat_id)
else:
# No frame (e.g. cron push): proactive send only supports markdown
await self._client.send_message(msg.chat_id, {
"msgtype": "markdown",
"markdown": {"content": content},
})
logger.info("WeCom proactive send to {}", msg.chat_id)
except Exception as e:
logger.error("Error sending WeCom message: {}", e)