From e04e1c24ff6e775d306a757542b43f3640974c93 Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Sun, 29 Mar 2026 13:01:44 +0800 Subject: [PATCH 01/10] feat(weixin): 1.align protocol headers with package.json metadata 2.support upload_full_url with fallback to upload_param --- nanobot/channels/weixin.py | 66 +++++++++++++++++++++------ tests/channels/test_weixin_channel.py | 64 +++++++++++++++++++++++++- 2 files changed, 116 insertions(+), 14 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index f09ef95f7..3b62a7260 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -53,7 +53,41 @@ MESSAGE_TYPE_BOT = 2 MESSAGE_STATE_FINISH = 2 WEIXIN_MAX_MESSAGE_LEN = 4000 -WEIXIN_CHANNEL_VERSION = "1.0.3" + + +def _read_reference_package_meta() -> dict[str, str]: + """Best-effort read of reference `package/package.json` metadata.""" + try: + pkg_path = Path(__file__).resolve().parents[2] / "package" / "package.json" + data = json.loads(pkg_path.read_text(encoding="utf-8")) + return { + "version": str(data.get("version", "") or ""), + "ilink_appid": str(data.get("ilink_appid", "") or ""), + } + except Exception: + return {"version": "", "ilink_appid": ""} + + +def _build_client_version(version: str) -> int: + """Encode semantic version as 0x00MMNNPP (major/minor/patch in one uint32).""" + parts = version.split(".") + + def _as_int(idx: int) -> int: + try: + return int(parts[idx]) + except Exception: + return 0 + + major = _as_int(0) + minor = _as_int(1) + patch = _as_int(2) + return ((major & 0xFF) << 16) | ((minor & 0xFF) << 8) | (patch & 0xFF) + + +_PKG_META = _read_reference_package_meta() +WEIXIN_CHANNEL_VERSION = _PKG_META["version"] or "unknown" +ILINK_APP_ID = _PKG_META["ilink_appid"] +ILINK_APP_CLIENT_VERSION = _build_client_version(_PKG_META["version"] or "0.0.0") BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION} # Session-expired error code @@ -199,6 +233,8 @@ class WeixinChannel(BaseChannel): "X-WECHAT-UIN": self._random_wechat_uin(), "Content-Type": "application/json", "AuthorizationType": "ilink_bot_token", + "iLink-App-Id": ILINK_APP_ID, + "iLink-App-ClientVersion": str(ILINK_APP_CLIENT_VERSION), } if auth and self._token: headers["Authorization"] = f"Bearer {self._token}" @@ -267,13 +303,10 @@ class WeixinChannel(BaseChannel): logger.info("Waiting for QR code scan...") while self._running: try: - # Reference plugin sends iLink-App-ClientVersion header for - # QR status polling (login-qr.ts:81). status_data = await self._api_get( "ilink/bot/get_qrcode_status", params={"qrcode": qrcode_id}, auth=False, - extra_headers={"iLink-App-ClientVersion": "1"}, ) except httpx.TimeoutException: continue @@ -838,7 +871,7 @@ class WeixinChannel(BaseChannel): # Matches aesEcbPaddedSize: Math.ceil((size + 1) / 16) * 16 padded_size = ((raw_size + 1 + 15) // 16) * 16 - # Step 1: Get upload URL (upload_param) from server + # Step 1: Get upload URL from server (prefer upload_full_url, fallback to upload_param) file_key = os.urandom(16).hex() upload_body: dict[str, Any] = { "filekey": file_key, @@ -855,19 +888,26 @@ class WeixinChannel(BaseChannel): upload_resp = await self._api_post("ilink/bot/getuploadurl", upload_body) logger.debug("WeChat getuploadurl response: {}", upload_resp) - upload_param = upload_resp.get("upload_param", "") - if not upload_param: - raise RuntimeError(f"getuploadurl returned no upload_param: {upload_resp}") + upload_full_url = str(upload_resp.get("upload_full_url", "") or "").strip() + upload_param = str(upload_resp.get("upload_param", "") or "") + if not upload_full_url and not upload_param: + raise RuntimeError( + "getuploadurl returned no upload URL " + f"(need upload_full_url or upload_param): {upload_resp}" + ) # Step 2: AES-128-ECB encrypt and POST to CDN aes_key_b64 = base64.b64encode(aes_key_raw).decode() encrypted_data = _encrypt_aes_ecb(raw_data, aes_key_b64) - cdn_upload_url = ( - f"{self.config.cdn_base_url}/upload" - f"?encrypted_query_param={quote(upload_param)}" - f"&filekey={quote(file_key)}" - ) + if upload_full_url: + cdn_upload_url = upload_full_url + else: + cdn_upload_url = ( + f"{self.config.cdn_base_url}/upload" + f"?encrypted_query_param={quote(upload_param)}" + f"&filekey={quote(file_key)}" + ) logger.debug("WeChat CDN POST url={} ciphertextSize={}", cdn_upload_url[:80], len(encrypted_data)) cdn_resp = await self._client.post( diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 54d9bd93f..498e49e94 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -1,6 +1,7 @@ import asyncio import json import tempfile +from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock @@ -42,10 +43,13 @@ def test_make_headers_includes_route_tag_when_configured() -> None: assert headers["Authorization"] == "Bearer token" assert headers["SKRouteTag"] == "123" + assert headers["iLink-App-Id"] == "bot" + assert headers["iLink-App-ClientVersion"] == str((2 << 16) | (1 << 8) | 1) def test_channel_version_matches_reference_plugin_version() -> None: - assert WEIXIN_CHANNEL_VERSION == "1.0.3" + pkg = json.loads(Path("package/package.json").read_text()) + assert WEIXIN_CHANNEL_VERSION == pkg["version"] def test_save_and_load_state_persists_context_tokens(tmp_path) -> None: @@ -278,3 +282,61 @@ async def test_process_message_skips_bot_messages() -> None: ) assert bus.inbound_size == 0 + + +class _DummyHttpResponse: + def __init__(self, *, headers: dict[str, str] | None = None, status_code: int = 200) -> None: + self.headers = headers or {} + self.status_code = status_code + + def raise_for_status(self) -> None: + return None + + +@pytest.mark.asyncio +async def test_send_media_uses_upload_full_url_when_present(tmp_path) -> None: + channel, _bus = _make_channel() + + media_file = tmp_path / "photo.jpg" + media_file.write_bytes(b"hello-weixin") + + cdn_post = AsyncMock(return_value=_DummyHttpResponse(headers={"x-encrypted-param": "dl-param"})) + channel._client = SimpleNamespace(post=cdn_post) + channel._api_post = AsyncMock( + side_effect=[ + { + "upload_full_url": "https://upload-full.example.test/path?foo=bar", + "upload_param": "should-not-be-used", + }, + {"ret": 0}, + ] + ) + + await channel._send_media_file("wx-user", str(media_file), "ctx-1") + + # first POST call is CDN upload + cdn_url = cdn_post.await_args_list[0].args[0] + assert cdn_url == "https://upload-full.example.test/path?foo=bar" + + +@pytest.mark.asyncio +async def test_send_media_falls_back_to_upload_param_url(tmp_path) -> None: + channel, _bus = _make_channel() + + media_file = tmp_path / "photo.jpg" + media_file.write_bytes(b"hello-weixin") + + cdn_post = AsyncMock(return_value=_DummyHttpResponse(headers={"x-encrypted-param": "dl-param"})) + channel._client = SimpleNamespace(post=cdn_post) + channel._api_post = AsyncMock( + side_effect=[ + {"upload_param": "enc-need-fallback"}, + {"ret": 0}, + ] + ) + + await channel._send_media_file("wx-user", str(media_file), "ctx-1") + + cdn_url = cdn_post.await_args_list[0].args[0] + assert cdn_url.startswith(f"{channel.config.cdn_base_url}/upload?encrypted_query_param=enc-need-fallback") + assert "&filekey=" in cdn_url From b1d547568114750b856bb64f5ff0678707d09f5a Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Sun, 29 Mar 2026 13:14:22 +0800 Subject: [PATCH 02/10] fix(weixin): correct PKCS7 unpadding for AES-ECB; support full_url for media download --- nanobot/channels/weixin.py | 56 +++++++++++++++++------- tests/channels/test_weixin_channel.py | 63 +++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 16 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 3b62a7260..c829512b9 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -685,9 +685,10 @@ class WeixinChannel(BaseChannel): """Download + AES-decrypt a media item. Returns local path or None.""" try: media = typed_item.get("media") or {} - encrypt_query_param = media.get("encrypt_query_param", "") + encrypt_query_param = str(media.get("encrypt_query_param", "") or "") + full_url = str(media.get("full_url", "") or "").strip() - if not encrypt_query_param: + if not encrypt_query_param and not full_url: return None # Resolve AES key (media-download.ts:43-45, pic-decrypt.ts:40-52) @@ -704,11 +705,14 @@ class WeixinChannel(BaseChannel): elif media_aes_key_b64: aes_key_b64 = media_aes_key_b64 - # Build CDN download URL with proper URL-encoding (cdn-url.ts:7) - cdn_url = ( - f"{self.config.cdn_base_url}/download" - f"?encrypted_query_param={quote(encrypt_query_param)}" - ) + # Prefer server-provided full_url, fallback to encrypted_query_param URL construction. + if full_url: + cdn_url = full_url + else: + cdn_url = ( + f"{self.config.cdn_base_url}/download" + f"?encrypted_query_param={quote(encrypt_query_param)}" + ) assert self._client is not None resp = await self._client.get(cdn_url) @@ -727,7 +731,8 @@ class WeixinChannel(BaseChannel): ext = _ext_for_type(media_type) if not filename: ts = int(time.time()) - h = abs(hash(encrypt_query_param)) % 100000 + hash_seed = encrypt_query_param or full_url + h = abs(hash(hash_seed)) % 100000 filename = f"{media_type}_{ts}_{h}{ext}" safe_name = os.path.basename(filename) file_path = media_dir / safe_name @@ -1045,23 +1050,42 @@ def _decrypt_aes_ecb(data: bytes, aes_key_b64: str) -> bytes: logger.warning("Failed to parse AES key, returning raw data: {}", e) return data + decrypted: bytes | None = None + try: from Crypto.Cipher import AES cipher = AES.new(key, AES.MODE_ECB) - return cipher.decrypt(data) # pycryptodome auto-strips PKCS7 with unpad + decrypted = cipher.decrypt(data) except ImportError: pass - try: - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + if decrypted is None: + try: + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - cipher_obj = Cipher(algorithms.AES(key), modes.ECB()) - decryptor = cipher_obj.decryptor() - return decryptor.update(data) + decryptor.finalize() - except ImportError: - logger.warning("Cannot decrypt media: install 'pycryptodome' or 'cryptography'") + cipher_obj = Cipher(algorithms.AES(key), modes.ECB()) + decryptor = cipher_obj.decryptor() + decrypted = decryptor.update(data) + decryptor.finalize() + except ImportError: + logger.warning("Cannot decrypt media: install 'pycryptodome' or 'cryptography'") + return data + + return _pkcs7_unpad_safe(decrypted) + + +def _pkcs7_unpad_safe(data: bytes, block_size: int = 16) -> bytes: + """Safely remove PKCS7 padding when valid; otherwise return original bytes.""" + if not data: return data + if len(data) % block_size != 0: + return data + pad_len = data[-1] + if pad_len < 1 or pad_len > block_size: + return data + if data[-pad_len:] != bytes([pad_len]) * pad_len: + return data + return data[:-pad_len] def _ext_for_type(media_type: str) -> str: diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 498e49e94..a52aaa804 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -7,12 +7,15 @@ from unittest.mock import AsyncMock import pytest +import nanobot.channels.weixin as weixin_mod from nanobot.bus.queue import MessageBus from nanobot.channels.weixin import ( ITEM_IMAGE, ITEM_TEXT, MESSAGE_TYPE_BOT, WEIXIN_CHANNEL_VERSION, + _decrypt_aes_ecb, + _encrypt_aes_ecb, WeixinChannel, WeixinConfig, ) @@ -340,3 +343,63 @@ async def test_send_media_falls_back_to_upload_param_url(tmp_path) -> None: cdn_url = cdn_post.await_args_list[0].args[0] assert cdn_url.startswith(f"{channel.config.cdn_base_url}/upload?encrypted_query_param=enc-need-fallback") assert "&filekey=" in cdn_url + + +def test_decrypt_aes_ecb_strips_valid_pkcs7_padding() -> None: + key_b64 = "MDEyMzQ1Njc4OWFiY2RlZg==" # base64("0123456789abcdef") + plaintext = b"hello-weixin-padding" + + ciphertext = _encrypt_aes_ecb(plaintext, key_b64) + decrypted = _decrypt_aes_ecb(ciphertext, key_b64) + + assert decrypted == plaintext + + +class _DummyDownloadResponse: + def __init__(self, content: bytes, status_code: int = 200) -> None: + self.content = content + self.status_code = status_code + + def raise_for_status(self) -> None: + return None + + +@pytest.mark.asyncio +async def test_download_media_item_uses_full_url_when_present(tmp_path) -> None: + channel, _bus = _make_channel() + weixin_mod.get_media_dir = lambda _name: tmp_path + + full_url = "https://cdn.example.test/download/full" + channel._client = SimpleNamespace( + get=AsyncMock(return_value=_DummyDownloadResponse(content=b"raw-image-bytes")) + ) + + item = { + "media": { + "full_url": full_url, + "encrypt_query_param": "enc-fallback-should-not-be-used", + }, + } + saved_path = await channel._download_media_item(item, "image") + + assert saved_path is not None + assert Path(saved_path).read_bytes() == b"raw-image-bytes" + channel._client.get.assert_awaited_once_with(full_url) + + +@pytest.mark.asyncio +async def test_download_media_item_falls_back_to_encrypt_query_param(tmp_path) -> None: + channel, _bus = _make_channel() + weixin_mod.get_media_dir = lambda _name: tmp_path + + channel._client = SimpleNamespace( + get=AsyncMock(return_value=_DummyDownloadResponse(content=b"fallback-bytes")) + ) + + item = {"media": {"encrypt_query_param": "enc-fallback"}} + saved_path = await channel._download_media_item(item, "image") + + assert saved_path is not None + assert Path(saved_path).read_bytes() == b"fallback-bytes" + called_url = channel._client.get.await_args_list[0].args[0] + assert called_url.startswith(f"{channel.config.cdn_base_url}/download?encrypted_query_param=enc-fallback") From 0207b541df85caab2ac2aabc9fbe30b9ba68a672 Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Sun, 29 Mar 2026 13:37:22 +0800 Subject: [PATCH 03/10] feat(weixin): implement QR redirect handling --- nanobot/channels/weixin.py | 42 +++++++++++++- tests/channels/test_weixin_channel.py | 80 +++++++++++++++++++++++++-- 2 files changed, 116 insertions(+), 6 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index c829512b9..51cef15ee 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -259,6 +259,25 @@ class WeixinChannel(BaseChannel): resp.raise_for_status() return resp.json() + async def _api_get_with_base( + self, + *, + base_url: str, + endpoint: str, + params: dict | None = None, + auth: bool = True, + extra_headers: dict[str, str] | None = None, + ) -> dict: + """GET helper that allows overriding base_url for QR redirect polling.""" + assert self._client is not None + url = f"{base_url.rstrip('/')}/{endpoint}" + hdrs = self._make_headers(auth=auth) + if extra_headers: + hdrs.update(extra_headers) + resp = await self._client.get(url, params=params, headers=hdrs) + resp.raise_for_status() + return resp.json() + async def _api_post( self, endpoint: str, @@ -299,12 +318,14 @@ class WeixinChannel(BaseChannel): refresh_count = 0 qrcode_id, scan_url = await self._fetch_qr_code() self._print_qr_code(scan_url) + current_poll_base_url = self.config.base_url logger.info("Waiting for QR code scan...") while self._running: try: - status_data = await self._api_get( - "ilink/bot/get_qrcode_status", + status_data = await self._api_get_with_base( + base_url=current_poll_base_url, + endpoint="ilink/bot/get_qrcode_status", params={"qrcode": qrcode_id}, auth=False, ) @@ -333,6 +354,23 @@ class WeixinChannel(BaseChannel): return False elif status == "scaned": logger.info("QR code scanned, waiting for confirmation...") + elif status == "scaned_but_redirect": + redirect_host = str(status_data.get("redirect_host", "") or "").strip() + if redirect_host: + if redirect_host.startswith("http://") or redirect_host.startswith("https://"): + redirected_base = redirect_host + else: + redirected_base = f"https://{redirect_host}" + if redirected_base != current_poll_base_url: + logger.info( + "QR status redirect: switching polling host to {}", + redirected_base, + ) + current_poll_base_url = redirected_base + else: + logger.warning( + "QR status returned scaned_but_redirect but redirect_host is missing", + ) elif status == "expired": refresh_count += 1 if refresh_count > MAX_QR_REFRESH_COUNT: diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index a52aaa804..076be610c 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -227,8 +227,12 @@ async def test_qr_login_refreshes_expired_qr_and_then_succeeds() -> None: channel._api_get = AsyncMock( side_effect=[ {"qrcode": "qr-1", "qrcode_img_content": "url-1"}, - {"status": "expired"}, {"qrcode": "qr-2", "qrcode_img_content": "url-2"}, + ] + ) + channel._api_get_with_base = AsyncMock( + side_effect=[ + {"status": "expired"}, { "status": "confirmed", "bot_token": "token-2", @@ -254,12 +258,16 @@ async def test_qr_login_returns_false_after_too_many_expired_qr_codes() -> None: channel._api_get = AsyncMock( side_effect=[ {"qrcode": "qr-1", "qrcode_img_content": "url-1"}, - {"status": "expired"}, {"qrcode": "qr-2", "qrcode_img_content": "url-2"}, - {"status": "expired"}, {"qrcode": "qr-3", "qrcode_img_content": "url-3"}, - {"status": "expired"}, {"qrcode": "qr-4", "qrcode_img_content": "url-4"}, + ] + ) + channel._api_get_with_base = AsyncMock( + side_effect=[ + {"status": "expired"}, + {"status": "expired"}, + {"status": "expired"}, {"status": "expired"}, ] ) @@ -269,6 +277,70 @@ async def test_qr_login_returns_false_after_too_many_expired_qr_codes() -> None: assert ok is False +@pytest.mark.asyncio +async def test_qr_login_switches_polling_base_url_on_redirect_status() -> None: + channel, _bus = _make_channel() + channel._running = True + channel._save_state = lambda: None + channel._print_qr_code = lambda url: None + channel._fetch_qr_code = AsyncMock(return_value=("qr-1", "url-1")) + + status_side_effect = [ + {"status": "scaned_but_redirect", "redirect_host": "idc.redirect.test"}, + { + "status": "confirmed", + "bot_token": "token-3", + "ilink_bot_id": "bot-3", + "baseurl": "https://example.test", + "ilink_user_id": "wx-user", + }, + ] + channel._api_get = AsyncMock(side_effect=list(status_side_effect)) + channel._api_get_with_base = AsyncMock(side_effect=list(status_side_effect)) + + ok = await channel._qr_login() + + assert ok is True + assert channel._token == "token-3" + assert channel._api_get_with_base.await_count == 2 + first_call = channel._api_get_with_base.await_args_list[0] + second_call = channel._api_get_with_base.await_args_list[1] + assert first_call.kwargs["base_url"] == "https://ilinkai.weixin.qq.com" + assert second_call.kwargs["base_url"] == "https://idc.redirect.test" + + +@pytest.mark.asyncio +async def test_qr_login_redirect_without_host_keeps_current_polling_base_url() -> None: + channel, _bus = _make_channel() + channel._running = True + channel._save_state = lambda: None + channel._print_qr_code = lambda url: None + channel._fetch_qr_code = AsyncMock(return_value=("qr-1", "url-1")) + + status_side_effect = [ + {"status": "scaned_but_redirect"}, + { + "status": "confirmed", + "bot_token": "token-4", + "ilink_bot_id": "bot-4", + "baseurl": "https://example.test", + "ilink_user_id": "wx-user", + }, + ] + channel._api_get = AsyncMock(side_effect=list(status_side_effect)) + channel._api_get_with_base = AsyncMock(side_effect=list(status_side_effect)) + + ok = await channel._qr_login() + + assert ok is True + assert channel._token == "token-4" + assert channel._api_get_with_base.await_count == 2 + first_call = channel._api_get_with_base.await_args_list[0] + second_call = channel._api_get_with_base.await_args_list[1] + assert first_call.kwargs["base_url"] == "https://ilinkai.weixin.qq.com" + assert second_call.kwargs["base_url"] == "https://ilinkai.weixin.qq.com" + + @pytest.mark.asyncio async def test_process_message_skips_bot_messages() -> None: channel, bus = _make_channel() From 2abd990b893edc0da8464f59b53373b5f870d883 Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Sun, 29 Mar 2026 15:19:57 +0800 Subject: [PATCH 04/10] feat(weixin): add fallback logic for referenced media download --- nanobot/channels/weixin.py | 46 +++++++++++++++++ tests/channels/test_weixin_channel.py | 74 +++++++++++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 51cef15ee..6324290f3 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -691,6 +691,52 @@ class WeixinChannel(BaseChannel): else: content_parts.append("[video]") + # Fallback: when no top-level media was downloaded, try quoted/referenced media. + # This aligns with the reference plugin behavior that checks ref_msg.message_item + # when main item_list has no downloadable media. + if not media_paths: + ref_media_item: dict[str, Any] | None = None + for item in item_list: + if item.get("type", 0) != ITEM_TEXT: + continue + ref = item.get("ref_msg") or {} + candidate = ref.get("message_item") or {} + if candidate.get("type", 0) in (ITEM_IMAGE, ITEM_VOICE, ITEM_FILE, ITEM_VIDEO): + ref_media_item = candidate + break + + if ref_media_item: + ref_type = ref_media_item.get("type", 0) + if ref_type == ITEM_IMAGE: + image_item = ref_media_item.get("image_item") or {} + file_path = await self._download_media_item(image_item, "image") + if file_path: + content_parts.append(f"[image]\n[Image: source: {file_path}]") + media_paths.append(file_path) + elif ref_type == ITEM_VOICE: + voice_item = ref_media_item.get("voice_item") or {} + file_path = await self._download_media_item(voice_item, "voice") + if file_path: + transcription = await self.transcribe_audio(file_path) + if transcription: + content_parts.append(f"[voice] {transcription}") + else: + content_parts.append(f"[voice]\n[Audio: source: {file_path}]") + media_paths.append(file_path) + elif ref_type == ITEM_FILE: + file_item = ref_media_item.get("file_item") or {} + file_name = file_item.get("file_name", "unknown") + file_path = await self._download_media_item(file_item, "file", file_name) + if file_path: + content_parts.append(f"[file: {file_name}]\n[File: source: {file_path}]") + media_paths.append(file_path) + elif ref_type == ITEM_VIDEO: + video_item = ref_media_item.get("video_item") or {} + file_path = await self._download_media_item(video_item, "video") + if file_path: + content_parts.append(f"[video]\n[Video: source: {file_path}]") + media_paths.append(file_path) + content = "\n".join(content_parts) if not content: return diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 076be610c..565b08b01 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -176,6 +176,80 @@ async def test_process_message_extracts_media_and_preserves_paths() -> None: assert inbound.media == ["/tmp/test.jpg"] +@pytest.mark.asyncio +async def test_process_message_falls_back_to_referenced_media_when_no_top_level_media() -> None: + channel, bus = _make_channel() + channel._download_media_item = AsyncMock(return_value="/tmp/ref.jpg") + + await channel._process_message( + { + "message_type": 1, + "message_id": "m3-ref-fallback", + "from_user_id": "wx-user", + "context_token": "ctx-3-ref-fallback", + "item_list": [ + { + "type": ITEM_TEXT, + "text_item": {"text": "reply to image"}, + "ref_msg": { + "message_item": { + "type": ITEM_IMAGE, + "image_item": {"media": {"encrypt_query_param": "ref-enc"}}, + }, + }, + }, + ], + } + ) + + inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=1.0) + + channel._download_media_item.assert_awaited_once_with( + {"media": {"encrypt_query_param": "ref-enc"}}, + "image", + ) + assert inbound.media == ["/tmp/ref.jpg"] + assert "reply to image" in inbound.content + assert "[image]" in inbound.content + + +@pytest.mark.asyncio +async def test_process_message_does_not_use_referenced_fallback_when_top_level_media_exists() -> None: + channel, bus = _make_channel() + channel._download_media_item = AsyncMock(side_effect=["/tmp/top.jpg", "/tmp/ref.jpg"]) + + await channel._process_message( + { + "message_type": 1, + "message_id": "m3-ref-no-fallback", + "from_user_id": "wx-user", + "context_token": "ctx-3-ref-no-fallback", + "item_list": [ + {"type": ITEM_IMAGE, "image_item": {"media": {"encrypt_query_param": "top-enc"}}}, + { + "type": ITEM_TEXT, + "text_item": {"text": "has top-level media"}, + "ref_msg": { + "message_item": { + "type": ITEM_IMAGE, + "image_item": {"media": {"encrypt_query_param": "ref-enc"}}, + }, + }, + }, + ], + } + ) + + inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=1.0) + + channel._download_media_item.assert_awaited_once_with( + {"media": {"encrypt_query_param": "top-enc"}}, + "image", + ) + assert inbound.media == ["/tmp/top.jpg"] + assert "/tmp/ref.jpg" not in inbound.content + + @pytest.mark.asyncio async def test_send_without_context_token_does_not_send_text() -> None: channel, _bus = _make_channel() From 79a915307ce4423e8d1daf7f6221a827b26e4478 Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Sun, 29 Mar 2026 16:25:25 +0800 Subject: [PATCH 05/10] feat(weixin): implement getConfig and sendTyping --- nanobot/channels/weixin.py | 85 ++++++++++++++++++++++----- tests/channels/test_weixin_channel.py | 64 ++++++++++++++++++++ 2 files changed, 135 insertions(+), 14 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 6324290f3..eb7d218da 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -99,6 +99,9 @@ MAX_CONSECUTIVE_FAILURES = 3 BACKOFF_DELAY_S = 30 RETRY_DELAY_S = 2 MAX_QR_REFRESH_COUNT = 3 +TYPING_STATUS_TYPING = 1 +TYPING_STATUS_CANCEL = 2 +TYPING_TICKET_TTL_S = 24 * 60 * 60 # Default long-poll timeout; overridden by server via longpolling_timeout_ms. DEFAULT_LONG_POLL_TIMEOUT_S = 35 @@ -158,6 +161,7 @@ class WeixinChannel(BaseChannel): self._poll_task: asyncio.Task | None = None self._next_poll_timeout_s: int = DEFAULT_LONG_POLL_TIMEOUT_S self._session_pause_until: float = 0.0 + self._typing_tickets: dict[str, tuple[str, float]] = {} # ------------------------------------------------------------------ # State persistence @@ -832,6 +836,40 @@ class WeixinChannel(BaseChannel): # Outbound (matches send.ts buildTextMessageReq + sendMessageWeixin) # ------------------------------------------------------------------ + async def _get_typing_ticket(self, user_id: str, context_token: str = "") -> str: + """Get typing ticket for a user with simple per-user TTL cache.""" + now = time.time() + cached = self._typing_tickets.get(user_id) + if cached: + ticket, expires_at = cached + if ticket and now < expires_at: + return ticket + + body: dict[str, Any] = { + "ilink_user_id": user_id, + "context_token": context_token or None, + "base_info": BASE_INFO, + } + data = await self._api_post("ilink/bot/getconfig", body) + if data.get("ret", 0) == 0: + ticket = str(data.get("typing_ticket", "") or "") + if ticket: + self._typing_tickets[user_id] = (ticket, now + TYPING_TICKET_TTL_S) + return ticket + return "" + + async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None: + """Best-effort sendtyping wrapper.""" + if not typing_ticket: + return + body: dict[str, Any] = { + "ilink_user_id": user_id, + "typing_ticket": typing_ticket, + "status": status, + "base_info": BASE_INFO, + } + await self._api_post("ilink/bot/sendtyping", body) + async def send(self, msg: OutboundMessage) -> None: if not self._client or not self._token: logger.warning("WeChat client not initialized or not authenticated") @@ -851,29 +889,48 @@ class WeixinChannel(BaseChannel): ) return - # --- Send media files first (following Telegram channel pattern) --- - for media_path in (msg.media or []): - try: - await self._send_media_file(msg.chat_id, media_path, ctx_token) - except Exception as e: - filename = Path(media_path).name - logger.error("Failed to send WeChat media {}: {}", media_path, e) - # Notify user about failure via text - await self._send_text( - msg.chat_id, f"[Failed to send: {filename}]", ctx_token, - ) + typing_ticket = "" + try: + typing_ticket = await self._get_typing_ticket(msg.chat_id, ctx_token) + except Exception as e: + logger.warning("WeChat getconfig failed for {}: {}", msg.chat_id, e) + typing_ticket = "" - # --- Send text content --- - if not content: - return + if typing_ticket: + try: + await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_TYPING) + except Exception as e: + logger.debug("WeChat sendtyping(start) failed for {}: {}", msg.chat_id, e) try: + # --- Send media files first (following Telegram channel pattern) --- + for media_path in (msg.media or []): + try: + await self._send_media_file(msg.chat_id, media_path, ctx_token) + except Exception as e: + filename = Path(media_path).name + logger.error("Failed to send WeChat media {}: {}", media_path, e) + # Notify user about failure via text + await self._send_text( + msg.chat_id, f"[Failed to send: {filename}]", ctx_token, + ) + + # --- Send text content --- + if not content: + return + chunks = split_message(content, WEIXIN_MAX_MESSAGE_LEN) for chunk in chunks: await self._send_text(msg.chat_id, chunk, ctx_token) except Exception as e: logger.error("Error sending WeChat message: {}", e) raise + finally: + if typing_ticket: + try: + await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL) + except Exception as e: + logger.debug("WeChat sendtyping(cancel) failed for {}: {}", msg.chat_id, e) async def _send_text( self, diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 565b08b01..64ea0b370 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -280,6 +280,70 @@ async def test_send_does_not_send_when_session_is_paused() -> None: channel._send_text.assert_not_awaited() +@pytest.mark.asyncio +async def test_get_typing_ticket_fetches_and_caches_per_user() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._api_post = AsyncMock(return_value={"ret": 0, "typing_ticket": "ticket-1"}) + + first = await channel._get_typing_ticket("wx-user", "ctx-1") + second = await channel._get_typing_ticket("wx-user", "ctx-2") + + assert first == "ticket-1" + assert second == "ticket-1" + channel._api_post.assert_awaited_once_with( + "ilink/bot/getconfig", + {"ilink_user_id": "wx-user", "context_token": "ctx-1", "base_info": weixin_mod.BASE_INFO}, + ) + + +@pytest.mark.asyncio +async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-typing" + channel._send_text = AsyncMock() + channel._api_post = AsyncMock( + side_effect=[ + {"ret": 0, "typing_ticket": "ticket-typing"}, + {"ret": 0}, + {"ret": 0}, + ] + ) + + await channel.send( + type("Msg", (), {"chat_id": "wx-user", "content": "pong", "media": [], "metadata": {}})() + ) + + channel._send_text.assert_awaited_once_with("wx-user", "pong", "ctx-typing") + assert channel._api_post.await_count == 3 + assert channel._api_post.await_args_list[0].args[0] == "ilink/bot/getconfig" + assert channel._api_post.await_args_list[1].args[0] == "ilink/bot/sendtyping" + assert channel._api_post.await_args_list[1].args[1]["status"] == 1 + assert channel._api_post.await_args_list[2].args[0] == "ilink/bot/sendtyping" + assert channel._api_post.await_args_list[2].args[1]["status"] == 2 + + +@pytest.mark.asyncio +async def test_send_still_sends_text_when_typing_ticket_missing() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-no-ticket" + channel._send_text = AsyncMock() + channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"}) + + await channel.send( + type("Msg", (), {"chat_id": "wx-user", "content": "pong", "media": [], "metadata": {}})() + ) + + channel._send_text.assert_awaited_once_with("wx-user", "pong", "ctx-no-ticket") + channel._api_post.assert_awaited_once() + assert channel._api_post.await_args_list[0].args[0] == "ilink/bot/getconfig" + + @pytest.mark.asyncio async def test_poll_once_pauses_session_on_expired_errcode() -> None: channel, _bus = _make_channel() From ed2ca759e7b2b0c54247fb5485fcbf87c725abee Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Sun, 29 Mar 2026 20:27:23 +0800 Subject: [PATCH 06/10] fix(weixin): align full_url AES key handling and quoted media fallback logic with reference 1. Fix full_url path for non-image media to require AES key and skip download when missing, instead of persisting encrypted bytes as valid media. 2. Restrict quoted media fallback trigger to only when no top-level media item exists, not when top-level media download/decryption fails. --- nanobot/channels/weixin.py | 23 +++++++++- tests/channels/test_weixin_channel.py | 61 +++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index eb7d218da..74d3a4736 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -116,6 +116,12 @@ _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".tiff", ".ico" _VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv"} +def _has_downloadable_media_locator(media: dict[str, Any] | None) -> bool: + if not isinstance(media, dict): + return False + return bool(str(media.get("encrypt_query_param", "") or "") or str(media.get("full_url", "") or "").strip()) + + class WeixinConfig(Base): """Personal WeChat channel configuration.""" @@ -611,6 +617,7 @@ class WeixinChannel(BaseChannel): item_list: list[dict] = msg.get("item_list") or [] content_parts: list[str] = [] media_paths: list[str] = [] + has_top_level_downloadable_media = False for item in item_list: item_type = item.get("type", 0) @@ -647,6 +654,8 @@ class WeixinChannel(BaseChannel): elif item_type == ITEM_IMAGE: image_item = item.get("image_item") or {} + if _has_downloadable_media_locator(image_item.get("media")): + has_top_level_downloadable_media = True file_path = await self._download_media_item(image_item, "image") if file_path: content_parts.append(f"[image]\n[Image: source: {file_path}]") @@ -661,6 +670,8 @@ class WeixinChannel(BaseChannel): if voice_text: content_parts.append(f"[voice] {voice_text}") else: + if _has_downloadable_media_locator(voice_item.get("media")): + has_top_level_downloadable_media = True file_path = await self._download_media_item(voice_item, "voice") if file_path: transcription = await self.transcribe_audio(file_path) @@ -674,6 +685,8 @@ class WeixinChannel(BaseChannel): elif item_type == ITEM_FILE: file_item = item.get("file_item") or {} + if _has_downloadable_media_locator(file_item.get("media")): + has_top_level_downloadable_media = True file_name = file_item.get("file_name", "unknown") file_path = await self._download_media_item( file_item, @@ -688,6 +701,8 @@ class WeixinChannel(BaseChannel): elif item_type == ITEM_VIDEO: video_item = item.get("video_item") or {} + if _has_downloadable_media_locator(video_item.get("media")): + has_top_level_downloadable_media = True file_path = await self._download_media_item(video_item, "video") if file_path: content_parts.append(f"[video]\n[Video: source: {file_path}]") @@ -698,7 +713,7 @@ class WeixinChannel(BaseChannel): # Fallback: when no top-level media was downloaded, try quoted/referenced media. # This aligns with the reference plugin behavior that checks ref_msg.message_item # when main item_list has no downloadable media. - if not media_paths: + if not media_paths and not has_top_level_downloadable_media: ref_media_item: dict[str, Any] | None = None for item in item_list: if item.get("type", 0) != ITEM_TEXT: @@ -793,6 +808,12 @@ class WeixinChannel(BaseChannel): elif media_aes_key_b64: aes_key_b64 = media_aes_key_b64 + # Reference protocol behavior: VOICE/FILE/VIDEO require aes_key; + # only IMAGE may be downloaded as plain bytes when key is missing. + if media_type != "image" and not aes_key_b64: + logger.debug("Missing AES key for {} item, skip media download", media_type) + return None + # Prefer server-provided full_url, fallback to encrypted_query_param URL construction. if full_url: cdn_url = full_url diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 64ea0b370..7701ad597 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -250,6 +250,46 @@ async def test_process_message_does_not_use_referenced_fallback_when_top_level_m assert "/tmp/ref.jpg" not in inbound.content +@pytest.mark.asyncio +async def test_process_message_does_not_fallback_when_top_level_media_exists_but_download_fails() -> None: + channel, bus = _make_channel() + # Top-level image download fails (None), referenced image would succeed if fallback were triggered. + channel._download_media_item = AsyncMock(side_effect=[None, "/tmp/ref.jpg"]) + + await channel._process_message( + { + "message_type": 1, + "message_id": "m3-ref-no-fallback-on-failure", + "from_user_id": "wx-user", + "context_token": "ctx-3-ref-no-fallback-on-failure", + "item_list": [ + {"type": ITEM_IMAGE, "image_item": {"media": {"encrypt_query_param": "top-enc"}}}, + { + "type": ITEM_TEXT, + "text_item": {"text": "quoted has media"}, + "ref_msg": { + "message_item": { + "type": ITEM_IMAGE, + "image_item": {"media": {"encrypt_query_param": "ref-enc"}}, + }, + }, + }, + ], + } + ) + + inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=1.0) + + # Should only attempt top-level media item; reference fallback must not activate. + channel._download_media_item.assert_awaited_once_with( + {"media": {"encrypt_query_param": "top-enc"}}, + "image", + ) + assert inbound.media == [] + assert "[image]" in inbound.content + assert "/tmp/ref.jpg" not in inbound.content + + @pytest.mark.asyncio async def test_send_without_context_token_does_not_send_text() -> None: channel, _bus = _make_channel() @@ -613,3 +653,24 @@ async def test_download_media_item_falls_back_to_encrypt_query_param(tmp_path) - assert Path(saved_path).read_bytes() == b"fallback-bytes" called_url = channel._client.get.await_args_list[0].args[0] assert called_url.startswith(f"{channel.config.cdn_base_url}/download?encrypted_query_param=enc-fallback") + + +@pytest.mark.asyncio +async def test_download_media_item_non_image_requires_aes_key_even_with_full_url(tmp_path) -> None: + channel, _bus = _make_channel() + weixin_mod.get_media_dir = lambda _name: tmp_path + + full_url = "https://cdn.example.test/download/voice" + channel._client = SimpleNamespace( + get=AsyncMock(return_value=_DummyDownloadResponse(content=b"ciphertext-or-unknown")) + ) + + item = { + "media": { + "full_url": full_url, + }, + } + saved_path = await channel._download_media_item(item, "voice") + + assert saved_path is None + channel._client.get.assert_not_awaited() From 1a4ad676285366a8e74ba22839421b61061c7039 Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Sun, 29 Mar 2026 21:28:58 +0800 Subject: [PATCH 07/10] feat(weixin): add voice message, typing keepalive, getConfig cache, and QR polling resilience --- nanobot/channels/weixin.py | 94 ++++++++++++++-- tests/channels/test_weixin_channel.py | 153 ++++++++++++++++++++++++++ 2 files changed, 235 insertions(+), 12 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 74d3a4736..4341f21d1 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -15,6 +15,7 @@ import hashlib import json import mimetypes import os +import random import re import time import uuid @@ -102,18 +103,23 @@ MAX_QR_REFRESH_COUNT = 3 TYPING_STATUS_TYPING = 1 TYPING_STATUS_CANCEL = 2 TYPING_TICKET_TTL_S = 24 * 60 * 60 +TYPING_KEEPALIVE_INTERVAL_S = 5 +CONFIG_CACHE_INITIAL_RETRY_S = 2 +CONFIG_CACHE_MAX_RETRY_S = 60 * 60 # Default long-poll timeout; overridden by server via longpolling_timeout_ms. DEFAULT_LONG_POLL_TIMEOUT_S = 35 -# Media-type codes for getuploadurl (1=image, 2=video, 3=file) +# Media-type codes for getuploadurl (1=image, 2=video, 3=file, 4=voice) UPLOAD_MEDIA_IMAGE = 1 UPLOAD_MEDIA_VIDEO = 2 UPLOAD_MEDIA_FILE = 3 +UPLOAD_MEDIA_VOICE = 4 # File extensions considered as images / videos for outbound media _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".tiff", ".ico", ".svg"} _VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv"} +_VOICE_EXTS = {".mp3", ".wav", ".amr", ".silk", ".ogg", ".m4a", ".aac", ".flac"} def _has_downloadable_media_locator(media: dict[str, Any] | None) -> bool: @@ -167,7 +173,7 @@ class WeixinChannel(BaseChannel): self._poll_task: asyncio.Task | None = None self._next_poll_timeout_s: int = DEFAULT_LONG_POLL_TIMEOUT_S self._session_pause_until: float = 0.0 - self._typing_tickets: dict[str, tuple[str, float]] = {} + self._typing_tickets: dict[str, dict[str, Any]] = {} # ------------------------------------------------------------------ # State persistence @@ -339,7 +345,16 @@ class WeixinChannel(BaseChannel): params={"qrcode": qrcode_id}, auth=False, ) - except httpx.TimeoutException: + except Exception as e: + if self._is_retryable_qr_poll_error(e): + logger.warning("QR polling temporary error, will retry: {}", e) + await asyncio.sleep(1) + continue + raise + + if not isinstance(status_data, dict): + logger.warning("QR polling got non-object response, continue waiting") + await asyncio.sleep(1) continue status = status_data.get("status", "") @@ -408,6 +423,16 @@ class WeixinChannel(BaseChannel): return False + @staticmethod + def _is_retryable_qr_poll_error(err: Exception) -> bool: + if isinstance(err, httpx.TimeoutException | httpx.TransportError): + return True + if isinstance(err, httpx.HTTPStatusError): + status_code = err.response.status_code if err.response is not None else 0 + if status_code >= 500: + return True + return False + @staticmethod def _print_qr_code(url: str) -> None: try: @@ -858,13 +883,11 @@ class WeixinChannel(BaseChannel): # ------------------------------------------------------------------ async def _get_typing_ticket(self, user_id: str, context_token: str = "") -> str: - """Get typing ticket for a user with simple per-user TTL cache.""" + """Get typing ticket with per-user refresh + failure backoff cache.""" now = time.time() - cached = self._typing_tickets.get(user_id) - if cached: - ticket, expires_at = cached - if ticket and now < expires_at: - return ticket + entry = self._typing_tickets.get(user_id) + if entry and now < float(entry.get("next_fetch_at", 0)): + return str(entry.get("ticket", "") or "") body: dict[str, Any] = { "ilink_user_id": user_id, @@ -874,9 +897,27 @@ class WeixinChannel(BaseChannel): data = await self._api_post("ilink/bot/getconfig", body) if data.get("ret", 0) == 0: ticket = str(data.get("typing_ticket", "") or "") - if ticket: - self._typing_tickets[user_id] = (ticket, now + TYPING_TICKET_TTL_S) - return ticket + self._typing_tickets[user_id] = { + "ticket": ticket, + "ever_succeeded": True, + "next_fetch_at": now + (random.random() * TYPING_TICKET_TTL_S), + "retry_delay_s": CONFIG_CACHE_INITIAL_RETRY_S, + } + return ticket + + prev_delay = float(entry.get("retry_delay_s", CONFIG_CACHE_INITIAL_RETRY_S)) if entry else CONFIG_CACHE_INITIAL_RETRY_S + next_delay = min(prev_delay * 2, CONFIG_CACHE_MAX_RETRY_S) + if entry: + entry["next_fetch_at"] = now + next_delay + entry["retry_delay_s"] = next_delay + return str(entry.get("ticket", "") or "") + + self._typing_tickets[user_id] = { + "ticket": "", + "ever_succeeded": False, + "next_fetch_at": now + CONFIG_CACHE_INITIAL_RETRY_S, + "retry_delay_s": CONFIG_CACHE_INITIAL_RETRY_S, + } return "" async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None: @@ -891,6 +932,16 @@ class WeixinChannel(BaseChannel): } await self._api_post("ilink/bot/sendtyping", body) + async def _typing_keepalive_loop(self, user_id: str, typing_ticket: str, stop_event: asyncio.Event) -> None: + while not stop_event.is_set(): + await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_S) + if stop_event.is_set(): + break + try: + await self._send_typing(user_id, typing_ticket, TYPING_STATUS_TYPING) + except Exception as e: + logger.debug("WeChat sendtyping(keepalive) failed for {}: {}", user_id, e) + async def send(self, msg: OutboundMessage) -> None: if not self._client or not self._token: logger.warning("WeChat client not initialized or not authenticated") @@ -923,6 +974,13 @@ class WeixinChannel(BaseChannel): except Exception as e: logger.debug("WeChat sendtyping(start) failed for {}: {}", msg.chat_id, e) + typing_keepalive_stop = asyncio.Event() + typing_keepalive_task: asyncio.Task | None = None + if typing_ticket: + typing_keepalive_task = asyncio.create_task( + self._typing_keepalive_loop(msg.chat_id, typing_ticket, typing_keepalive_stop) + ) + try: # --- Send media files first (following Telegram channel pattern) --- for media_path in (msg.media or []): @@ -947,6 +1005,14 @@ class WeixinChannel(BaseChannel): logger.error("Error sending WeChat message: {}", e) raise finally: + if typing_keepalive_task: + typing_keepalive_stop.set() + typing_keepalive_task.cancel() + try: + await typing_keepalive_task + except asyncio.CancelledError: + pass + if typing_ticket: try: await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL) @@ -1025,6 +1091,10 @@ class WeixinChannel(BaseChannel): upload_type = UPLOAD_MEDIA_VIDEO item_type = ITEM_VIDEO item_key = "video_item" + elif ext in _VOICE_EXTS: + upload_type = UPLOAD_MEDIA_VOICE + item_type = ITEM_VOICE + item_key = "voice_item" else: upload_type = UPLOAD_MEDIA_FILE item_type = ITEM_FILE diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 7701ad597..c4e5cf552 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -6,6 +6,7 @@ from types import SimpleNamespace from unittest.mock import AsyncMock import pytest +import httpx import nanobot.channels.weixin as weixin_mod from nanobot.bus.queue import MessageBus @@ -595,6 +596,158 @@ async def test_send_media_falls_back_to_upload_param_url(tmp_path) -> None: assert "&filekey=" in cdn_url +@pytest.mark.asyncio +async def test_send_media_voice_file_uses_voice_item_and_voice_upload_type(tmp_path) -> None: + channel, _bus = _make_channel() + + media_file = tmp_path / "voice.mp3" + media_file.write_bytes(b"voice-bytes") + + cdn_post = AsyncMock(return_value=_DummyHttpResponse(headers={"x-encrypted-param": "voice-dl-param"})) + channel._client = SimpleNamespace(post=cdn_post) + channel._api_post = AsyncMock( + side_effect=[ + {"upload_full_url": "https://upload-full.example.test/voice?foo=bar"}, + {"ret": 0}, + ] + ) + + await channel._send_media_file("wx-user", str(media_file), "ctx-voice") + + getupload_body = channel._api_post.await_args_list[0].args[1] + assert getupload_body["media_type"] == 4 + + sendmessage_body = channel._api_post.await_args_list[1].args[1] + item = sendmessage_body["msg"]["item_list"][0] + assert item["type"] == 3 + assert "voice_item" in item + assert "file_item" not in item + assert item["voice_item"]["media"]["encrypt_query_param"] == "voice-dl-param" + + +@pytest.mark.asyncio +async def test_send_typing_uses_keepalive_until_send_finishes() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-typing-loop" + async def _api_post_side_effect(endpoint: str, _body: dict | None = None, *, auth: bool = True): + if endpoint == "ilink/bot/getconfig": + return {"ret": 0, "typing_ticket": "ticket-keepalive"} + return {"ret": 0} + + channel._api_post = AsyncMock(side_effect=_api_post_side_effect) + + async def _slow_send_text(*_args, **_kwargs) -> None: + await asyncio.sleep(0.03) + + channel._send_text = AsyncMock(side_effect=_slow_send_text) + + old_interval = weixin_mod.TYPING_KEEPALIVE_INTERVAL_S + weixin_mod.TYPING_KEEPALIVE_INTERVAL_S = 0.01 + try: + await channel.send( + type("Msg", (), {"chat_id": "wx-user", "content": "pong", "media": [], "metadata": {}})() + ) + finally: + weixin_mod.TYPING_KEEPALIVE_INTERVAL_S = old_interval + + status_calls = [ + c.args[1]["status"] + for c in channel._api_post.await_args_list + if c.args and c.args[0] == "ilink/bot/sendtyping" + ] + assert status_calls.count(1) >= 2 + assert status_calls[-1] == 2 + + +@pytest.mark.asyncio +async def test_get_typing_ticket_failure_uses_backoff_and_cached_ticket(monkeypatch) -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + + now = {"value": 1000.0} + monkeypatch.setattr(weixin_mod.time, "time", lambda: now["value"]) + monkeypatch.setattr(weixin_mod.random, "random", lambda: 0.5) + + channel._api_post = AsyncMock(return_value={"ret": 0, "typing_ticket": "ticket-ok"}) + first = await channel._get_typing_ticket("wx-user", "ctx-1") + assert first == "ticket-ok" + + # force refresh window reached + now["value"] = now["value"] + (12 * 60 * 60) + 1 + channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "temporary failure"}) + + # On refresh failure, should still return cached ticket and apply backoff. + second = await channel._get_typing_ticket("wx-user", "ctx-2") + assert second == "ticket-ok" + assert channel._api_post.await_count == 1 + + # Before backoff expiry, no extra fetch should happen. + now["value"] += 1 + third = await channel._get_typing_ticket("wx-user", "ctx-3") + assert third == "ticket-ok" + assert channel._api_post.await_count == 1 + + +@pytest.mark.asyncio +async def test_qr_login_treats_temporary_connect_error_as_wait_and_recovers() -> None: + channel, _bus = _make_channel() + channel._running = True + channel._save_state = lambda: None + channel._print_qr_code = lambda url: None + channel._fetch_qr_code = AsyncMock(return_value=("qr-1", "url-1")) + + request = httpx.Request("GET", "https://ilinkai.weixin.qq.com/ilink/bot/get_qrcode_status") + channel._api_get_with_base = AsyncMock( + side_effect=[ + httpx.ConnectError("temporary network", request=request), + { + "status": "confirmed", + "bot_token": "token-net-ok", + "ilink_bot_id": "bot-id", + "baseurl": "https://example.test", + "ilink_user_id": "wx-user", + }, + ] + ) + + ok = await channel._qr_login() + + assert ok is True + assert channel._token == "token-net-ok" + + +@pytest.mark.asyncio +async def test_qr_login_treats_5xx_gateway_response_error_as_wait_and_recovers() -> None: + channel, _bus = _make_channel() + channel._running = True + channel._save_state = lambda: None + channel._print_qr_code = lambda url: None + channel._fetch_qr_code = AsyncMock(return_value=("qr-1", "url-1")) + + request = httpx.Request("GET", "https://ilinkai.weixin.qq.com/ilink/bot/get_qrcode_status") + response = httpx.Response(status_code=524, request=request) + channel._api_get_with_base = AsyncMock( + side_effect=[ + httpx.HTTPStatusError("gateway timeout", request=request, response=response), + { + "status": "confirmed", + "bot_token": "token-5xx-ok", + "ilink_bot_id": "bot-id", + "baseurl": "https://example.test", + "ilink_user_id": "wx-user", + }, + ] + ) + + ok = await channel._qr_login() + + assert ok is True + assert channel._token == "token-5xx-ok" + + def test_decrypt_aes_ecb_strips_valid_pkcs7_padding() -> None: key_b64 = "MDEyMzQ1Njc4OWFiY2RlZg==" # base64("0123456789abcdef") plaintext = b"hello-weixin-padding" From 2dce5e07c1db40e28260ec148fefeb1162e025a8 Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Mon, 30 Mar 2026 09:06:49 +0800 Subject: [PATCH 08/10] fix(weixin): fix test file version reader --- nanobot/channels/weixin.py | 21 +++------------------ tests/channels/test_weixin_channel.py | 3 +-- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 4341f21d1..7f6c6abab 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -54,19 +54,8 @@ MESSAGE_TYPE_BOT = 2 MESSAGE_STATE_FINISH = 2 WEIXIN_MAX_MESSAGE_LEN = 4000 - - -def _read_reference_package_meta() -> dict[str, str]: - """Best-effort read of reference `package/package.json` metadata.""" - try: - pkg_path = Path(__file__).resolve().parents[2] / "package" / "package.json" - data = json.loads(pkg_path.read_text(encoding="utf-8")) - return { - "version": str(data.get("version", "") or ""), - "ilink_appid": str(data.get("ilink_appid", "") or ""), - } - except Exception: - return {"version": "", "ilink_appid": ""} +WEIXIN_CHANNEL_VERSION = "2.1.1" +ILINK_APP_ID = "bot" def _build_client_version(version: str) -> int: @@ -84,11 +73,7 @@ def _build_client_version(version: str) -> int: patch = _as_int(2) return ((major & 0xFF) << 16) | ((minor & 0xFF) << 8) | (patch & 0xFF) - -_PKG_META = _read_reference_package_meta() -WEIXIN_CHANNEL_VERSION = _PKG_META["version"] or "unknown" -ILINK_APP_ID = _PKG_META["ilink_appid"] -ILINK_APP_CLIENT_VERSION = _build_client_version(_PKG_META["version"] or "0.0.0") +ILINK_APP_CLIENT_VERSION = _build_client_version(WEIXIN_CHANNEL_VERSION) BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION} # Session-expired error code diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index c4e5cf552..f4d57a8b0 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -52,8 +52,7 @@ def test_make_headers_includes_route_tag_when_configured() -> None: def test_channel_version_matches_reference_plugin_version() -> None: - pkg = json.loads(Path("package/package.json").read_text()) - assert WEIXIN_CHANNEL_VERSION == pkg["version"] + assert WEIXIN_CHANNEL_VERSION == "2.1.1" def test_save_and_load_state_persists_context_tokens(tmp_path) -> None: From d0c68157b11a470144b96e5a0afdb5ce0a846ebd Mon Sep 17 00:00:00 2001 From: xcosmosbox <2162381070@qq.com> Date: Tue, 31 Mar 2026 12:55:29 +0800 Subject: [PATCH 09/10] fix(WeiXin): fix full_url download error --- nanobot/channels/weixin.py | 142 ++++++++++++-------------- tests/channels/test_weixin_channel.py | 63 ++++++++++++ 2 files changed, 126 insertions(+), 79 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 7f6c6abab..c6c1603ae 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -197,8 +197,7 @@ class WeixinChannel(BaseChannel): if base_url: self.config.base_url = base_url return bool(self._token) - except Exception as e: - logger.warning("Failed to load WeChat state: {}", e) + except Exception: return False def _save_state(self) -> None: @@ -211,8 +210,8 @@ class WeixinChannel(BaseChannel): "base_url": self.config.base_url, } state_file.write_text(json.dumps(data, ensure_ascii=False)) - except Exception as e: - logger.warning("Failed to save WeChat state: {}", e) + except Exception: + pass # ------------------------------------------------------------------ # HTTP helpers (matches api.ts buildHeaders / apiFetch) @@ -243,6 +242,15 @@ class WeixinChannel(BaseChannel): headers["SKRouteTag"] = str(self.config.route_tag).strip() return headers + @staticmethod + def _is_retryable_media_download_error(err: Exception) -> bool: + if isinstance(err, httpx.TimeoutException | httpx.TransportError): + return True + if isinstance(err, httpx.HTTPStatusError): + status_code = err.response.status_code if err.response is not None else 0 + return status_code >= 500 + return False + async def _api_get( self, endpoint: str, @@ -315,13 +323,11 @@ class WeixinChannel(BaseChannel): async def _qr_login(self) -> bool: """Perform QR code login flow. Returns True on success.""" try: - logger.info("Starting WeChat QR code login...") refresh_count = 0 qrcode_id, scan_url = await self._fetch_qr_code() self._print_qr_code(scan_url) current_poll_base_url = self.config.base_url - logger.info("Waiting for QR code scan...") while self._running: try: status_data = await self._api_get_with_base( @@ -332,13 +338,11 @@ class WeixinChannel(BaseChannel): ) except Exception as e: if self._is_retryable_qr_poll_error(e): - logger.warning("QR polling temporary error, will retry: {}", e) await asyncio.sleep(1) continue raise if not isinstance(status_data, dict): - logger.warning("QR polling got non-object response, continue waiting") await asyncio.sleep(1) continue @@ -362,8 +366,6 @@ class WeixinChannel(BaseChannel): else: logger.error("Login confirmed but no bot_token in response") return False - elif status == "scaned": - logger.info("QR code scanned, waiting for confirmation...") elif status == "scaned_but_redirect": redirect_host = str(status_data.get("redirect_host", "") or "").strip() if redirect_host: @@ -372,15 +374,7 @@ class WeixinChannel(BaseChannel): else: redirected_base = f"https://{redirect_host}" if redirected_base != current_poll_base_url: - logger.info( - "QR status redirect: switching polling host to {}", - redirected_base, - ) current_poll_base_url = redirected_base - else: - logger.warning( - "QR status returned scaned_but_redirect but redirect_host is missing", - ) elif status == "expired": refresh_count += 1 if refresh_count > MAX_QR_REFRESH_COUNT: @@ -390,14 +384,8 @@ class WeixinChannel(BaseChannel): MAX_QR_REFRESH_COUNT, ) return False - logger.warning( - "QR code expired, refreshing... ({}/{})", - refresh_count, - MAX_QR_REFRESH_COUNT, - ) qrcode_id, scan_url = await self._fetch_qr_code() self._print_qr_code(scan_url) - logger.info("New QR code generated, waiting for scan...") continue # status == "wait" — keep polling @@ -428,7 +416,6 @@ class WeixinChannel(BaseChannel): qr.make(fit=True) qr.print_ascii(invert=True) except ImportError: - logger.info("QR code URL (install 'qrcode' for terminal display): {}", url) print(f"\nLogin URL: {url}\n") # ------------------------------------------------------------------ @@ -490,12 +477,6 @@ class WeixinChannel(BaseChannel): if not self._running: break consecutive_failures += 1 - logger.error( - "WeChat poll error ({}/{}): {}", - consecutive_failures, - MAX_CONSECUTIVE_FAILURES, - e, - ) if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: consecutive_failures = 0 await asyncio.sleep(BACKOFF_DELAY_S) @@ -510,8 +491,6 @@ class WeixinChannel(BaseChannel): await self._client.aclose() self._client = None self._save_state() - logger.info("WeChat channel stopped") - # ------------------------------------------------------------------ # Polling (matches monitor.ts monitorWeixinProvider) # ------------------------------------------------------------------ @@ -537,10 +516,6 @@ class WeixinChannel(BaseChannel): async def _poll_once(self) -> None: remaining = self._session_pause_remaining_s() if remaining > 0: - logger.warning( - "WeChat session paused, waiting {} min before next poll.", - max((remaining + 59) // 60, 1), - ) await asyncio.sleep(remaining) return @@ -590,8 +565,8 @@ class WeixinChannel(BaseChannel): for msg in msgs: try: await self._process_message(msg) - except Exception as e: - logger.error("Error processing WeChat message: {}", e) + except Exception: + pass # ------------------------------------------------------------------ # Inbound message processing (matches inbound.ts + process-message.ts) @@ -770,13 +745,6 @@ class WeixinChannel(BaseChannel): if not content: return - logger.info( - "WeChat inbound: from={} items={} bodyLen={}", - from_user_id, - ",".join(str(i.get("type", 0)) for i in item_list), - len(content), - ) - await self._handle_message( sender_id=from_user_id, chat_id=from_user_id, @@ -821,27 +789,47 @@ class WeixinChannel(BaseChannel): # Reference protocol behavior: VOICE/FILE/VIDEO require aes_key; # only IMAGE may be downloaded as plain bytes when key is missing. if media_type != "image" and not aes_key_b64: - logger.debug("Missing AES key for {} item, skip media download", media_type) return None - # Prefer server-provided full_url, fallback to encrypted_query_param URL construction. - if full_url: - cdn_url = full_url - else: - cdn_url = ( + assert self._client is not None + fallback_url = "" + if encrypt_query_param: + fallback_url = ( f"{self.config.cdn_base_url}/download" f"?encrypted_query_param={quote(encrypt_query_param)}" ) - assert self._client is not None - resp = await self._client.get(cdn_url) - resp.raise_for_status() - data = resp.content + download_candidates: list[tuple[str, str]] = [] + if full_url: + download_candidates.append(("full_url", full_url)) + if fallback_url and (not full_url or fallback_url != full_url): + download_candidates.append(("encrypt_query_param", fallback_url)) + + data = b"" + for idx, (download_source, cdn_url) in enumerate(download_candidates): + try: + resp = await self._client.get(cdn_url) + resp.raise_for_status() + data = resp.content + break + except Exception as e: + has_more_candidates = idx + 1 < len(download_candidates) + should_fallback = ( + download_source == "full_url" + and has_more_candidates + and self._is_retryable_media_download_error(e) + ) + if should_fallback: + logger.warning( + "WeChat media download failed via full_url, falling back to encrypt_query_param: type={} err={}", + media_type, + e, + ) + continue + raise if aes_key_b64 and data: data = _decrypt_aes_ecb(data, aes_key_b64) - elif not aes_key_b64: - logger.debug("No AES key for {} item, using raw bytes", media_type) if not data: return None @@ -856,7 +844,6 @@ class WeixinChannel(BaseChannel): safe_name = os.path.basename(filename) file_path = media_dir / safe_name file_path.write_bytes(data) - logger.debug("Downloaded WeChat {} to {}", media_type, file_path) return str(file_path) except Exception as e: @@ -918,14 +905,17 @@ class WeixinChannel(BaseChannel): await self._api_post("ilink/bot/sendtyping", body) async def _typing_keepalive_loop(self, user_id: str, typing_ticket: str, stop_event: asyncio.Event) -> None: - while not stop_event.is_set(): - await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_S) - if stop_event.is_set(): - break - try: - await self._send_typing(user_id, typing_ticket, TYPING_STATUS_TYPING) - except Exception as e: - logger.debug("WeChat sendtyping(keepalive) failed for {}: {}", user_id, e) + try: + while not stop_event.is_set(): + await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_S) + if stop_event.is_set(): + break + try: + await self._send_typing(user_id, typing_ticket, TYPING_STATUS_TYPING) + except Exception: + pass + finally: + pass async def send(self, msg: OutboundMessage) -> None: if not self._client or not self._token: @@ -933,8 +923,7 @@ class WeixinChannel(BaseChannel): return try: self._assert_session_active() - except RuntimeError as e: - logger.warning("WeChat send blocked: {}", e) + except RuntimeError: return content = msg.content.strip() @@ -949,15 +938,14 @@ class WeixinChannel(BaseChannel): typing_ticket = "" try: typing_ticket = await self._get_typing_ticket(msg.chat_id, ctx_token) - except Exception as e: - logger.warning("WeChat getconfig failed for {}: {}", msg.chat_id, e) + except Exception: typing_ticket = "" if typing_ticket: try: await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_TYPING) - except Exception as e: - logger.debug("WeChat sendtyping(start) failed for {}: {}", msg.chat_id, e) + except Exception: + pass typing_keepalive_stop = asyncio.Event() typing_keepalive_task: asyncio.Task | None = None @@ -1001,8 +989,8 @@ class WeixinChannel(BaseChannel): if typing_ticket: try: await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL) - except Exception as e: - logger.debug("WeChat sendtyping(cancel) failed for {}: {}", msg.chat_id, e) + except Exception: + pass async def _send_text( self, @@ -1108,7 +1096,6 @@ class WeixinChannel(BaseChannel): assert self._client is not None upload_resp = await self._api_post("ilink/bot/getuploadurl", upload_body) - logger.debug("WeChat getuploadurl response: {}", upload_resp) upload_full_url = str(upload_resp.get("upload_full_url", "") or "").strip() upload_param = str(upload_resp.get("upload_param", "") or "") @@ -1130,7 +1117,6 @@ class WeixinChannel(BaseChannel): f"?encrypted_query_param={quote(upload_param)}" f"&filekey={quote(file_key)}" ) - logger.debug("WeChat CDN POST url={} ciphertextSize={}", cdn_upload_url[:80], len(encrypted_data)) cdn_resp = await self._client.post( cdn_upload_url, @@ -1146,7 +1132,6 @@ class WeixinChannel(BaseChannel): "CDN upload response missing x-encrypted-param header; " f"status={cdn_resp.status_code} headers={dict(cdn_resp.headers)}" ) - logger.debug("WeChat CDN upload success for {}, got download_param", p.name) # Step 3: Send message with the media item # aes_key for CDNMedia is the hex key encoded as base64 @@ -1195,7 +1180,6 @@ class WeixinChannel(BaseChannel): raise RuntimeError( f"WeChat send media error (code {errcode}): {data.get('errmsg', '')}" ) - logger.info("WeChat media sent: {} (type={})", p.name, item_key) # --------------------------------------------------------------------------- diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index f4d57a8b0..515eaa28b 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -766,6 +766,21 @@ class _DummyDownloadResponse: return None +class _DummyErrorDownloadResponse(_DummyDownloadResponse): + def __init__(self, url: str, status_code: int) -> None: + super().__init__(content=b"", status_code=status_code) + self._url = url + + def raise_for_status(self) -> None: + request = httpx.Request("GET", self._url) + response = httpx.Response(self.status_code, request=request) + raise httpx.HTTPStatusError( + f"download failed with status {self.status_code}", + request=request, + response=response, + ) + + @pytest.mark.asyncio async def test_download_media_item_uses_full_url_when_present(tmp_path) -> None: channel, _bus = _make_channel() @@ -789,6 +804,37 @@ async def test_download_media_item_uses_full_url_when_present(tmp_path) -> None: channel._client.get.assert_awaited_once_with(full_url) +@pytest.mark.asyncio +async def test_download_media_item_falls_back_when_full_url_returns_retryable_error(tmp_path) -> None: + channel, _bus = _make_channel() + weixin_mod.get_media_dir = lambda _name: tmp_path + + full_url = "https://cdn.example.test/download/full?taskid=123" + channel._client = SimpleNamespace( + get=AsyncMock( + side_effect=[ + _DummyErrorDownloadResponse(full_url, 500), + _DummyDownloadResponse(content=b"fallback-bytes"), + ] + ) + ) + + item = { + "media": { + "full_url": full_url, + "encrypt_query_param": "enc-fallback", + }, + } + saved_path = await channel._download_media_item(item, "image") + + assert saved_path is not None + assert Path(saved_path).read_bytes() == b"fallback-bytes" + assert channel._client.get.await_count == 2 + assert channel._client.get.await_args_list[0].args[0] == full_url + fallback_url = channel._client.get.await_args_list[1].args[0] + assert fallback_url.startswith(f"{channel.config.cdn_base_url}/download?encrypted_query_param=enc-fallback") + + @pytest.mark.asyncio async def test_download_media_item_falls_back_to_encrypt_query_param(tmp_path) -> None: channel, _bus = _make_channel() @@ -807,6 +853,23 @@ async def test_download_media_item_falls_back_to_encrypt_query_param(tmp_path) - assert called_url.startswith(f"{channel.config.cdn_base_url}/download?encrypted_query_param=enc-fallback") +@pytest.mark.asyncio +async def test_download_media_item_does_not_retry_when_full_url_fails_without_fallback(tmp_path) -> None: + channel, _bus = _make_channel() + weixin_mod.get_media_dir = lambda _name: tmp_path + + full_url = "https://cdn.example.test/download/full" + channel._client = SimpleNamespace( + get=AsyncMock(return_value=_DummyErrorDownloadResponse(full_url, 500)) + ) + + item = {"media": {"full_url": full_url}} + saved_path = await channel._download_media_item(item, "image") + + assert saved_path is None + channel._client.get.assert_awaited_once_with(full_url) + + @pytest.mark.asyncio async def test_download_media_item_non_image_requires_aes_key_even_with_full_url(tmp_path) -> None: channel, _bus = _make_channel() From bc8fbd1ce4496b87860f6a6d334a116a1b4fb6ce Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Tue, 31 Mar 2026 11:34:33 +0000 Subject: [PATCH 10/10] fix(weixin): reset QR poll host after refresh --- nanobot/channels/weixin.py | 1 + tests/channels/test_weixin_channel.py | 35 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index c6c1603ae..891cfd099 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -385,6 +385,7 @@ class WeixinChannel(BaseChannel): ) return False qrcode_id, scan_url = await self._fetch_qr_code() + current_poll_base_url = self.config.base_url self._print_qr_code(scan_url) continue # status == "wait" — keep polling diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 515eaa28b..58fc30865 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -519,6 +519,41 @@ async def test_qr_login_redirect_without_host_keeps_current_polling_base_url() - assert second_call.kwargs["base_url"] == "https://ilinkai.weixin.qq.com" +@pytest.mark.asyncio +async def test_qr_login_resets_redirect_base_url_after_qr_refresh() -> None: + channel, _bus = _make_channel() + channel._running = True + channel._save_state = lambda: None + channel._print_qr_code = lambda url: None + channel._fetch_qr_code = AsyncMock(side_effect=[("qr-1", "url-1"), ("qr-2", "url-2")]) + + channel._api_get_with_base = AsyncMock( + side_effect=[ + {"status": "scaned_but_redirect", "redirect_host": "idc.redirect.test"}, + {"status": "expired"}, + { + "status": "confirmed", + "bot_token": "token-5", + "ilink_bot_id": "bot-5", + "baseurl": "https://example.test", + "ilink_user_id": "wx-user", + }, + ] + ) + + ok = await channel._qr_login() + + assert ok is True + assert channel._token == "token-5" + assert channel._api_get_with_base.await_count == 3 + first_call = channel._api_get_with_base.await_args_list[0] + second_call = channel._api_get_with_base.await_args_list[1] + third_call = channel._api_get_with_base.await_args_list[2] + assert first_call.kwargs["base_url"] == "https://ilinkai.weixin.qq.com" + assert second_call.kwargs["base_url"] == "https://idc.redirect.test" + assert third_call.kwargs["base_url"] == "https://ilinkai.weixin.qq.com" + + @pytest.mark.asyncio async def test_process_message_skips_bot_messages() -> None: channel, bus = _make_channel()