test(channels): add media support tests for QQ and WeCom channels

Cover helpers (sanitize_filename, guess media type), outbound send
(exception handling, media-then-text order, fallback), inbound message
processing (attachments, dedup, empty content), _post_base64file
payload filtering, and WeCom upload/download flows.
This commit is contained in:
chengyongru 2026-04-10 17:02:08 +08:00 committed by chengyongru
parent 4b0fdffe39
commit 4a33c1392b
2 changed files with 887 additions and 0 deletions

View File

@ -0,0 +1,304 @@
"""Tests for QQ channel media support: helpers, send, inbound, and upload."""
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
try:
from nanobot.channels import qq
QQ_AVAILABLE = getattr(qq, "QQ_AVAILABLE", False)
except ImportError:
QQ_AVAILABLE = False
if not QQ_AVAILABLE:
pytest.skip("QQ dependencies not installed (qq-botpy)", allow_module_level=True)
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.qq import (
QQ_FILE_TYPE_FILE,
QQ_FILE_TYPE_IMAGE,
QQChannel,
QQConfig,
_guess_send_file_type,
_is_image_name,
_sanitize_filename,
)
class _FakeApi:
def __init__(self) -> None:
self.c2c_calls: list[dict] = []
self.group_calls: list[dict] = []
async def post_c2c_message(self, **kwargs) -> None:
self.c2c_calls.append(kwargs)
async def post_group_message(self, **kwargs) -> None:
self.group_calls.append(kwargs)
class _FakeHttp:
"""Fake _http for _post_base64file tests."""
def __init__(self, return_value: dict | None = None) -> None:
self.return_value = return_value or {}
self.calls: list[tuple] = []
async def request(self, route, **kwargs):
self.calls.append((route, kwargs))
return self.return_value
class _FakeClient:
def __init__(self, http_return: dict | None = None) -> None:
self.api = _FakeApi()
self.api._http = _FakeHttp(http_return)
# ── Helper function tests (pure, no async) ──────────────────────────
def test_sanitize_filename_strips_path_traversal() -> None:
assert _sanitize_filename("../../etc/passwd") == "passwd"
def test_sanitize_filename_keeps_chinese_chars() -> None:
assert _sanitize_filename("文件1.jpg") == "文件1.jpg"
def test_sanitize_filename_strips_unsafe_chars() -> None:
result = _sanitize_filename('file<>:"|?*.txt')
# All unsafe chars replaced with "_", but * is replaced too
assert result.startswith("file")
assert result.endswith(".txt")
assert "<" not in result
assert ">" not in result
assert '"' not in result
assert "|" not in result
assert "?" not in result
def test_sanitize_filename_empty_input() -> None:
assert _sanitize_filename("") == ""
def test_is_image_name_with_known_extensions() -> None:
for ext in (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".tif", ".tiff", ".ico", ".svg"):
assert _is_image_name(f"photo{ext}") is True
def test_is_image_name_with_unknown_extension() -> None:
for ext in (".pdf", ".txt", ".mp3", ".mp4"):
assert _is_image_name(f"doc{ext}") is False
def test_guess_send_file_type_image() -> None:
assert _guess_send_file_type("photo.png") == QQ_FILE_TYPE_IMAGE
assert _guess_send_file_type("pic.jpg") == QQ_FILE_TYPE_IMAGE
def test_guess_send_file_type_file() -> None:
assert _guess_send_file_type("doc.pdf") == QQ_FILE_TYPE_FILE
def test_guess_send_file_type_by_mime() -> None:
# A filename with no known extension but whose mime type is image/*
assert _guess_send_file_type("photo.xyz_image_test") == QQ_FILE_TYPE_FILE
# ── send() exception handling ───────────────────────────────────────
@pytest.mark.asyncio
async def test_send_exception_caught_not_raised() -> None:
"""Exceptions inside send() must not propagate."""
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
channel._client = _FakeClient()
with patch.object(channel, "_send_text_only", new_callable=AsyncMock, side_effect=RuntimeError("boom")):
await channel.send(
OutboundMessage(channel="qq", chat_id="user1", content="hello")
)
# No exception raised — test passes if we get here.
@pytest.mark.asyncio
async def test_send_media_then_text() -> None:
"""Media is sent before text when both are present."""
import tempfile
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
channel._client = _FakeClient()
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
tmp = f.name
try:
with patch.object(channel, "_post_base64file", new_callable=AsyncMock, return_value={"file_info": "1"}) as mock_upload:
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user1",
content="text after image",
media=[tmp],
metadata={"message_id": "m1"},
)
)
assert mock_upload.called
# Text should have been sent via c2c (default chat type)
text_calls = [c for c in channel._client.api.c2c_calls if c.get("msg_type") == 0]
assert len(text_calls) >= 1
assert text_calls[-1]["content"] == "text after image"
finally:
import os
os.unlink(tmp)
@pytest.mark.asyncio
async def test_send_media_failure_falls_back_to_text() -> None:
"""When _send_media returns False, a failure notice is appended."""
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
channel._client = _FakeClient()
with patch.object(channel, "_send_media", new_callable=AsyncMock, return_value=False):
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user1",
content="hello",
media=["https://example.com/bad.png"],
metadata={"message_id": "m1"},
)
)
# Should have the failure text among the c2c calls
failure_calls = [c for c in channel._client.api.c2c_calls if "Attachment send failed" in c.get("content", "")]
assert len(failure_calls) == 1
assert "bad.png" in failure_calls[0]["content"]
# ── _on_message() exception handling ────────────────────────────────
@pytest.mark.asyncio
async def test_on_message_exception_caught_not_raised() -> None:
"""Missing required attributes should not crash _on_message."""
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
channel._client = _FakeClient()
# Construct a message-like object that lacks 'author' — triggers AttributeError
bad_data = SimpleNamespace(id="x1", content="hi")
# Should not raise
await channel._on_message(bad_data, is_group=False)
@pytest.mark.asyncio
async def test_on_message_with_attachments() -> None:
"""Messages with attachments produce media_paths and formatted content."""
import tempfile
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus())
channel._client = _FakeClient()
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
saved_path = f.name
att = SimpleNamespace(url="", filename="screenshot.png", content_type="image/png")
# Patch _download_to_media_dir_chunked to return the temp file path
async def fake_download(url, filename_hint=""):
return saved_path
try:
with patch.object(channel, "_download_to_media_dir_chunked", side_effect=fake_download):
data = SimpleNamespace(
id="att1",
content="look at this",
author=SimpleNamespace(user_openid="u1"),
attachments=[att],
)
await channel._on_message(data, is_group=False)
msg = await channel.bus.consume_inbound()
assert "look at this" in msg.content
assert "screenshot.png" in msg.content
assert "Received files:" in msg.content
assert len(msg.media) == 1
assert msg.media[0] == saved_path
finally:
import os
os.unlink(saved_path)
# ── _post_base64file() ─────────────────────────────────────────────
@pytest.mark.asyncio
async def test_post_base64file_omits_file_name_for_images() -> None:
"""file_type=1 (image) → payload must not contain file_name."""
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
channel._client = _FakeClient(http_return={"file_info": "img_abc"})
await channel._post_base64file(
chat_id="user1",
is_group=False,
file_type=QQ_FILE_TYPE_IMAGE,
file_data="ZmFrZQ==",
file_name="photo.png",
)
http = channel._client.api._http
assert len(http.calls) == 1
payload = http.calls[0][1]["json"]
assert "file_name" not in payload
assert payload["file_type"] == QQ_FILE_TYPE_IMAGE
@pytest.mark.asyncio
async def test_post_base64file_includes_file_name_for_files() -> None:
"""file_type=4 (file) → payload must contain file_name."""
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
channel._client = _FakeClient(http_return={"file_info": "file_abc"})
await channel._post_base64file(
chat_id="user1",
is_group=False,
file_type=QQ_FILE_TYPE_FILE,
file_data="ZmFrZQ==",
file_name="report.pdf",
)
http = channel._client.api._http
assert len(http.calls) == 1
payload = http.calls[0][1]["json"]
assert payload["file_name"] == "report.pdf"
assert payload["file_type"] == QQ_FILE_TYPE_FILE
@pytest.mark.asyncio
async def test_post_base64file_filters_response_to_file_info() -> None:
"""Response with file_info + extra fields must be filtered to only file_info."""
channel = QQChannel(QQConfig(app_id="app", secret="secret"), MessageBus())
channel._client = _FakeClient(http_return={
"file_info": "fi_123",
"file_uuid": "uuid_xxx",
"ttl": 3600,
})
result = await channel._post_base64file(
chat_id="user1",
is_group=False,
file_type=QQ_FILE_TYPE_FILE,
file_data="ZmFrZQ==",
file_name="doc.pdf",
)
assert result == {"file_info": "fi_123"}
assert "file_uuid" not in result
assert "ttl" not in result

View File

@ -0,0 +1,583 @@
"""Tests for WeCom channel: helpers, download, upload, send, and message processing."""
import os
import tempfile
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
try:
import importlib.util
WECOM_AVAILABLE = importlib.util.find_spec("wecom_aibot_sdk") is not None
except ImportError:
WECOM_AVAILABLE = False
if not WECOM_AVAILABLE:
pytest.skip("WeCom dependencies not installed (wecom_aibot_sdk)", allow_module_level=True)
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.wecom import (
WecomChannel,
WecomConfig,
_guess_wecom_media_type,
_sanitize_filename,
)
# Try to import the real response class; fall back to a stub if unavailable.
try:
from wecom_aibot_sdk.utils import WsResponse
_RealWsResponse = WsResponse
except ImportError:
_RealWsResponse = None
class _FakeResponse:
"""Minimal stand-in for wecom_aibot_sdk WsResponse."""
def __init__(self, errcode: int = 0, body: dict | None = None, errmsg: str = "ok"):
self.errcode = errcode
self.errmsg = errmsg
self.body = body or {}
class _FakeWsManager:
"""Tracks send_reply calls and returns configurable responses."""
def __init__(self, responses: list[_FakeResponse] | None = None):
self.responses = responses or []
self.calls: list[tuple[str, dict, str]] = []
self._idx = 0
async def send_reply(self, req_id: str, data: dict, cmd: str) -> _FakeResponse:
self.calls.append((req_id, data, cmd))
if self._idx < len(self.responses):
resp = self.responses[self._idx]
self._idx += 1
return resp
return _FakeResponse()
class _FakeFrame:
"""Minimal frame object with a body dict."""
def __init__(self, body: dict | None = None):
self.body = body or {}
class _FakeWeComClient:
"""Fake WeCom client with mock methods."""
def __init__(self, ws_responses: list[_FakeResponse] | None = None):
self._ws_manager = _FakeWsManager(ws_responses)
self.download_file = AsyncMock(return_value=(None, None))
self.reply = AsyncMock()
self.reply_stream = AsyncMock()
self.send_message = AsyncMock()
self.reply_welcome = AsyncMock()
# ── Helper function tests (pure, no async) ──────────────────────────
def test_sanitize_filename_strips_path_traversal() -> None:
assert _sanitize_filename("../../etc/passwd") == "passwd"
def test_sanitize_filename_keeps_chinese_chars() -> None:
assert _sanitize_filename("文件1.jpg") == "文件1.jpg"
def test_sanitize_filename_empty_input() -> None:
assert _sanitize_filename("") == ""
def test_guess_wecom_media_type_image() -> None:
for ext in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"):
assert _guess_wecom_media_type(f"photo{ext}") == "image"
def test_guess_wecom_media_type_video() -> None:
for ext in (".mp4", ".avi", ".mov"):
assert _guess_wecom_media_type(f"video{ext}") == "video"
def test_guess_wecom_media_type_voice() -> None:
for ext in (".amr", ".mp3", ".wav", ".ogg"):
assert _guess_wecom_media_type(f"audio{ext}") == "voice"
def test_guess_wecom_media_type_file_fallback() -> None:
for ext in (".pdf", ".doc", ".xlsx", ".zip"):
assert _guess_wecom_media_type(f"doc{ext}") == "file"
def test_guess_wecom_media_type_case_insensitive() -> None:
assert _guess_wecom_media_type("photo.PNG") == "image"
assert _guess_wecom_media_type("photo.Jpg") == "image"
# ── _download_and_save_media() ──────────────────────────────────────
@pytest.mark.asyncio
async def test_download_and_save_success() -> None:
"""Successful download writes file and returns sanitized path."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
fake_data = b"\x89PNG\r\nfake image"
client.download_file.return_value = (fake_data, "raw_photo.png")
with patch("nanobot.channels.wecom.get_media_dir", return_value=Path(tempfile.gettempdir())):
path = await channel._download_and_save_media("https://example.com/img.png", "aes_key", "image", "photo.png")
assert path is not None
assert os.path.isfile(path)
assert os.path.basename(path) == "photo.png"
# Cleanup
os.unlink(path)
@pytest.mark.asyncio
async def test_download_and_save_oversized_rejected() -> None:
"""Data exceeding 200MB is rejected → returns None."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
big_data = b"\x00" * (200 * 1024 * 1024 + 1) # 200MB + 1 byte
client.download_file.return_value = (big_data, "big.bin")
with patch("nanobot.channels.wecom.get_media_dir", return_value=Path(tempfile.gettempdir())):
result = await channel._download_and_save_media("https://example.com/big.bin", "key", "file", "big.bin")
assert result is None
@pytest.mark.asyncio
async def test_download_and_save_failure() -> None:
"""SDK returns None data → returns None."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
client.download_file.return_value = (None, None)
with patch("nanobot.channels.wecom.get_media_dir", return_value=Path(tempfile.gettempdir())):
result = await channel._download_and_save_media("https://example.com/fail.png", "key", "image")
assert result is None
# ── _upload_media_ws() ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_upload_media_ws_success() -> None:
"""Happy path: init → chunk → finish → returns (media_id, media_type)."""
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
tmp = f.name
try:
responses = [
_FakeResponse(errcode=0, body={"upload_id": "up_1"}),
_FakeResponse(errcode=0, body={}),
_FakeResponse(errcode=0, body={"media_id": "media_abc"}),
]
client = _FakeWeComClient(responses)
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
channel._client = client
with patch("wecom_aibot_sdk.utils.generate_req_id", side_effect=lambda x: f"req_{x}"):
media_id, media_type = await channel._upload_media_ws(client, tmp)
assert media_id == "media_abc"
assert media_type == "image"
finally:
os.unlink(tmp)
@pytest.mark.asyncio
async def test_upload_media_ws_oversized_file() -> None:
"""File >200MB triggers ValueError → returns (None, None)."""
# Instead of creating a real 200MB+ file, mock os.path.getsize and open
with patch("os.path.getsize", return_value=200 * 1024 * 1024 + 1), \
patch("builtins.open", MagicMock()):
client = _FakeWeComClient()
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
channel._client = client
result = await channel._upload_media_ws(client, "/fake/large.bin")
assert result == (None, None)
@pytest.mark.asyncio
async def test_upload_media_ws_init_failure() -> None:
"""Init step returns errcode != 0 → returns (None, None)."""
with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
f.write(b"hello")
tmp = f.name
try:
responses = [
_FakeResponse(errcode=50001, errmsg="invalid"),
]
client = _FakeWeComClient(responses)
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
channel._client = client
with patch("wecom_aibot_sdk.utils.generate_req_id", side_effect=lambda x: f"req_{x}"):
result = await channel._upload_media_ws(client, tmp)
assert result == (None, None)
finally:
os.unlink(tmp)
@pytest.mark.asyncio
async def test_upload_media_ws_chunk_failure() -> None:
"""Chunk step returns errcode != 0 → returns (None, None)."""
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
tmp = f.name
try:
responses = [
_FakeResponse(errcode=0, body={"upload_id": "up_1"}),
_FakeResponse(errcode=50002, errmsg="chunk fail"),
]
client = _FakeWeComClient(responses)
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
channel._client = client
with patch("wecom_aibot_sdk.utils.generate_req_id", side_effect=lambda x: f"req_{x}"):
result = await channel._upload_media_ws(client, tmp)
assert result == (None, None)
finally:
os.unlink(tmp)
@pytest.mark.asyncio
async def test_upload_media_ws_finish_no_media_id() -> None:
"""Finish step returns empty media_id → returns (None, None)."""
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
tmp = f.name
try:
responses = [
_FakeResponse(errcode=0, body={"upload_id": "up_1"}),
_FakeResponse(errcode=0, body={}),
_FakeResponse(errcode=0, body={}), # no media_id
]
client = _FakeWeComClient(responses)
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
channel._client = client
with patch("wecom_aibot_sdk.utils.generate_req_id", side_effect=lambda x: f"req_{x}"):
result = await channel._upload_media_ws(client, tmp)
assert result == (None, None)
finally:
os.unlink(tmp)
# ── send() ──────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_send_text_with_frame() -> None:
"""When frame is stored, send uses reply_stream for final text."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
channel._generate_req_id = lambda x: f"req_{x}"
channel._chat_frames["chat1"] = _FakeFrame()
await channel.send(
OutboundMessage(channel="wecom", chat_id="chat1", content="hello")
)
client.reply_stream.assert_called_once()
call_args = client.reply_stream.call_args
assert call_args[0][2] == "hello" # content arg
@pytest.mark.asyncio
async def test_send_progress_with_frame() -> None:
"""When metadata has _progress, send uses reply (not reply_stream)."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
channel._chat_frames["chat1"] = _FakeFrame()
await channel.send(
OutboundMessage(channel="wecom", chat_id="chat1", content="thinking...", metadata={"_progress": True})
)
client.reply.assert_called_once()
client.reply_stream.assert_not_called()
call_args = client.reply.call_args
assert call_args[0][1]["text"]["content"] == "thinking..."
@pytest.mark.asyncio
async def test_send_proactive_without_frame() -> None:
"""Without stored frame, send uses send_message with markdown."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
await channel.send(
OutboundMessage(channel="wecom", chat_id="chat1", content="proactive msg")
)
client.send_message.assert_called_once()
call_args = client.send_message.call_args
assert call_args[0][0] == "chat1"
assert call_args[0][1]["msgtype"] == "markdown"
@pytest.mark.asyncio
async def test_send_media_then_text() -> None:
"""Media files are uploaded and sent before text content."""
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
tmp = f.name
try:
responses = [
_FakeResponse(errcode=0, body={"upload_id": "up_1"}),
_FakeResponse(errcode=0, body={}),
_FakeResponse(errcode=0, body={"media_id": "media_123"}),
]
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient(responses)
channel._client = client
channel._generate_req_id = lambda x: f"req_{x}"
channel._chat_frames["chat1"] = _FakeFrame()
await channel.send(
OutboundMessage(channel="wecom", chat_id="chat1", content="see image", media=[tmp])
)
# Media should have been sent via reply
media_calls = [c for c in client.reply.call_args_list if c[0][1].get("msgtype") == "image"]
assert len(media_calls) == 1
assert media_calls[0][0][1]["image"]["media_id"] == "media_123"
# Text should have been sent via reply_stream
client.reply_stream.assert_called_once()
finally:
os.unlink(tmp)
@pytest.mark.asyncio
async def test_send_media_file_not_found() -> None:
"""Non-existent media path is skipped with a warning."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
channel._generate_req_id = lambda x: f"req_{x}"
channel._chat_frames["chat1"] = _FakeFrame()
await channel.send(
OutboundMessage(channel="wecom", chat_id="chat1", content="hello", media=["/nonexistent/file.png"])
)
# reply_stream should still be called for the text part
client.reply_stream.assert_called_once()
# No media reply should happen
media_calls = [c for c in client.reply.call_args_list if c[0][1].get("msgtype") in ("image", "file", "video")]
assert len(media_calls) == 0
@pytest.mark.asyncio
async def test_send_exception_caught_not_raised() -> None:
"""Exceptions inside send() must not propagate."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["*"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
channel._generate_req_id = lambda x: f"req_{x}"
channel._chat_frames["chat1"] = _FakeFrame()
# Make reply_stream raise
client.reply_stream.side_effect = RuntimeError("boom")
await channel.send(
OutboundMessage(channel="wecom", chat_id="chat1", content="fail test")
)
# No exception — test passes if we reach here.
# ── _process_message() ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_process_text_message() -> None:
"""Text message is routed to bus with correct fields."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["user1"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
frame = _FakeFrame(body={
"msgid": "msg_text_1",
"chatid": "chat1",
"chattype": "single",
"from": {"userid": "user1"},
"text": {"content": "hello wecom"},
})
await channel._process_message(frame, "text")
msg = await channel.bus.consume_inbound()
assert msg.sender_id == "user1"
assert msg.chat_id == "chat1"
assert msg.content == "hello wecom"
assert msg.metadata["msg_type"] == "text"
@pytest.mark.asyncio
async def test_process_image_message() -> None:
"""Image message: download success → media_paths non-empty."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["user1"]), MessageBus())
client = _FakeWeComClient()
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
f.write(b"\x89PNG\r\n")
saved = f.name
client.download_file.return_value = (b"\x89PNG\r\n", "photo.png")
channel._client = client
try:
with patch("nanobot.channels.wecom.get_media_dir", return_value=Path(os.path.dirname(saved))):
frame = _FakeFrame(body={
"msgid": "msg_img_1",
"chatid": "chat1",
"from": {"userid": "user1"},
"image": {"url": "https://example.com/img.png", "aeskey": "key123"},
})
await channel._process_message(frame, "image")
msg = await channel.bus.consume_inbound()
assert len(msg.media) == 1
assert msg.media[0].endswith("photo.png")
assert "[image:" in msg.content
finally:
if os.path.exists(saved):
pass # may have been overwritten; clean up if exists
# Clean up any photo.png in tempdir
p = os.path.join(os.path.dirname(saved), "photo.png")
if os.path.exists(p):
os.unlink(p)
@pytest.mark.asyncio
async def test_process_file_message() -> None:
"""File message: download success → media_paths non-empty (critical fix verification)."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["user1"]), MessageBus())
client = _FakeWeComClient()
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f:
f.write(b"%PDF-1.4 fake")
saved = f.name
client.download_file.return_value = (b"%PDF-1.4 fake", "report.pdf")
channel._client = client
try:
with patch("nanobot.channels.wecom.get_media_dir", return_value=Path(os.path.dirname(saved))):
frame = _FakeFrame(body={
"msgid": "msg_file_1",
"chatid": "chat1",
"from": {"userid": "user1"},
"file": {"url": "https://example.com/report.pdf", "aeskey": "key456", "name": "report.pdf"},
})
await channel._process_message(frame, "file")
msg = await channel.bus.consume_inbound()
assert len(msg.media) == 1
assert msg.media[0].endswith("report.pdf")
assert "[file: report.pdf]" in msg.content
finally:
p = os.path.join(os.path.dirname(saved), "report.pdf")
if os.path.exists(p):
os.unlink(p)
@pytest.mark.asyncio
async def test_process_voice_message() -> None:
"""Voice message: transcribed text is included in content."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["user1"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
frame = _FakeFrame(body={
"msgid": "msg_voice_1",
"chatid": "chat1",
"from": {"userid": "user1"},
"voice": {"content": "transcribed text here"},
})
await channel._process_message(frame, "voice")
msg = await channel.bus.consume_inbound()
assert "transcribed text here" in msg.content
assert "[voice]" in msg.content
@pytest.mark.asyncio
async def test_process_message_deduplication() -> None:
"""Same msg_id is not processed twice."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["user1"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
frame = _FakeFrame(body={
"msgid": "msg_dup_1",
"chatid": "chat1",
"from": {"userid": "user1"},
"text": {"content": "once"},
})
await channel._process_message(frame, "text")
await channel._process_message(frame, "text")
msg = await channel.bus.consume_inbound()
assert msg.content == "once"
# Second message should not appear on the bus
assert channel.bus.inbound.empty()
@pytest.mark.asyncio
async def test_process_message_empty_content_skipped() -> None:
"""Message with empty content produces no bus message."""
channel = WecomChannel(WecomConfig(bot_id="b", secret="s", allow_from=["user1"]), MessageBus())
client = _FakeWeComClient()
channel._client = client
frame = _FakeFrame(body={
"msgid": "msg_empty_1",
"chatid": "chat1",
"from": {"userid": "user1"},
"text": {"content": ""},
})
await channel._process_message(frame, "text")
assert channel.bus.inbound.empty()