feat(weixin): add fallback logic for referenced media download

This commit is contained in:
xcosmosbox 2026-03-29 15:19:57 +08:00
parent 0207b541df
commit 2abd990b89
2 changed files with 120 additions and 0 deletions

View File

@ -691,6 +691,52 @@ class WeixinChannel(BaseChannel):
else:
content_parts.append("[video]")
# Fallback: when no top-level media was downloaded, try quoted/referenced media.
# This aligns with the reference plugin behavior that checks ref_msg.message_item
# when main item_list has no downloadable media.
if not media_paths:
ref_media_item: dict[str, Any] | None = None
for item in item_list:
if item.get("type", 0) != ITEM_TEXT:
continue
ref = item.get("ref_msg") or {}
candidate = ref.get("message_item") or {}
if candidate.get("type", 0) in (ITEM_IMAGE, ITEM_VOICE, ITEM_FILE, ITEM_VIDEO):
ref_media_item = candidate
break
if ref_media_item:
ref_type = ref_media_item.get("type", 0)
if ref_type == ITEM_IMAGE:
image_item = ref_media_item.get("image_item") or {}
file_path = await self._download_media_item(image_item, "image")
if file_path:
content_parts.append(f"[image]\n[Image: source: {file_path}]")
media_paths.append(file_path)
elif ref_type == ITEM_VOICE:
voice_item = ref_media_item.get("voice_item") or {}
file_path = await self._download_media_item(voice_item, "voice")
if file_path:
transcription = await self.transcribe_audio(file_path)
if transcription:
content_parts.append(f"[voice] {transcription}")
else:
content_parts.append(f"[voice]\n[Audio: source: {file_path}]")
media_paths.append(file_path)
elif ref_type == ITEM_FILE:
file_item = ref_media_item.get("file_item") or {}
file_name = file_item.get("file_name", "unknown")
file_path = await self._download_media_item(file_item, "file", file_name)
if file_path:
content_parts.append(f"[file: {file_name}]\n[File: source: {file_path}]")
media_paths.append(file_path)
elif ref_type == ITEM_VIDEO:
video_item = ref_media_item.get("video_item") or {}
file_path = await self._download_media_item(video_item, "video")
if file_path:
content_parts.append(f"[video]\n[Video: source: {file_path}]")
media_paths.append(file_path)
content = "\n".join(content_parts)
if not content:
return

View File

@ -176,6 +176,80 @@ async def test_process_message_extracts_media_and_preserves_paths() -> None:
assert inbound.media == ["/tmp/test.jpg"]
@pytest.mark.asyncio
async def test_process_message_falls_back_to_referenced_media_when_no_top_level_media() -> None:
channel, bus = _make_channel()
channel._download_media_item = AsyncMock(return_value="/tmp/ref.jpg")
await channel._process_message(
{
"message_type": 1,
"message_id": "m3-ref-fallback",
"from_user_id": "wx-user",
"context_token": "ctx-3-ref-fallback",
"item_list": [
{
"type": ITEM_TEXT,
"text_item": {"text": "reply to image"},
"ref_msg": {
"message_item": {
"type": ITEM_IMAGE,
"image_item": {"media": {"encrypt_query_param": "ref-enc"}},
},
},
},
],
}
)
inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=1.0)
channel._download_media_item.assert_awaited_once_with(
{"media": {"encrypt_query_param": "ref-enc"}},
"image",
)
assert inbound.media == ["/tmp/ref.jpg"]
assert "reply to image" in inbound.content
assert "[image]" in inbound.content
@pytest.mark.asyncio
async def test_process_message_does_not_use_referenced_fallback_when_top_level_media_exists() -> None:
channel, bus = _make_channel()
channel._download_media_item = AsyncMock(side_effect=["/tmp/top.jpg", "/tmp/ref.jpg"])
await channel._process_message(
{
"message_type": 1,
"message_id": "m3-ref-no-fallback",
"from_user_id": "wx-user",
"context_token": "ctx-3-ref-no-fallback",
"item_list": [
{"type": ITEM_IMAGE, "image_item": {"media": {"encrypt_query_param": "top-enc"}}},
{
"type": ITEM_TEXT,
"text_item": {"text": "has top-level media"},
"ref_msg": {
"message_item": {
"type": ITEM_IMAGE,
"image_item": {"media": {"encrypt_query_param": "ref-enc"}},
},
},
},
],
}
)
inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=1.0)
channel._download_media_item.assert_awaited_once_with(
{"media": {"encrypt_query_param": "top-enc"}},
"image",
)
assert inbound.media == ["/tmp/top.jpg"]
assert "/tmp/ref.jpg" not in inbound.content
@pytest.mark.asyncio
async def test_send_without_context_token_does_not_send_text() -> None:
channel, _bus = _make_channel()