diff --git a/nanobot/channels/dingtalk.py b/nanobot/channels/dingtalk.py index ab12211e8..39b5818bd 100644 --- a/nanobot/channels/dingtalk.py +++ b/nanobot/channels/dingtalk.py @@ -5,6 +5,8 @@ import json import mimetypes import os import time +import zipfile +from io import BytesIO from pathlib import Path from typing import Any from urllib.parse import unquote, urlparse @@ -171,6 +173,7 @@ class DingTalkChannel(BaseChannel): _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"} _AUDIO_EXTS = {".amr", ".mp3", ".wav", ".ogg", ".m4a", ".aac"} _VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm"} + _ZIP_BEFORE_UPLOAD_EXTS = {".htm", ".html"} @classmethod def default_config(cls) -> dict[str, Any]: @@ -287,6 +290,31 @@ class DingTalkChannel(BaseChannel): name = os.path.basename(urlparse(media_ref).path) return name or {"image": "image.jpg", "voice": "audio.amr", "video": "video.mp4"}.get(upload_type, "file.bin") + @staticmethod + def _zip_bytes(filename: str, data: bytes) -> tuple[bytes, str, str]: + stem = Path(filename).stem or "attachment" + safe_name = filename or "attachment.bin" + zip_name = f"{stem}.zip" + buffer = BytesIO() + with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: + archive.writestr(safe_name, data) + return buffer.getvalue(), zip_name, "application/zip" + + def _normalize_upload_payload( + self, + filename: str, + data: bytes, + content_type: str | None, + ) -> tuple[bytes, str, str | None]: + ext = Path(filename).suffix.lower() + if ext in self._ZIP_BEFORE_UPLOAD_EXTS or content_type == "text/html": + logger.info( + "DingTalk does not accept raw HTML attachments, zipping {} before upload", + filename, + ) + return self._zip_bytes(filename, data) + return data, filename, content_type + async def _read_media_bytes( self, media_ref: str, @@ -444,6 +472,7 @@ class DingTalkChannel(BaseChannel): return False filename = filename or self._guess_filename(media_ref, upload_type) + data, filename, content_type = self._normalize_upload_payload(filename, data, content_type) file_type = Path(filename).suffix.lower().lstrip(".") if not file_type: guessed = mimetypes.guess_extension(content_type or "") diff --git a/tests/channels/test_dingtalk_channel.py b/tests/channels/test_dingtalk_channel.py index 6894c8683..f743c4e62 100644 --- a/tests/channels/test_dingtalk_channel.py +++ b/tests/channels/test_dingtalk_channel.py @@ -1,4 +1,6 @@ import asyncio +import zipfile +from io import BytesIO from types import SimpleNamespace import pytest @@ -221,3 +223,78 @@ async def test_download_dingtalk_file(tmp_path, monkeypatch) -> None: assert "messageFiles/download" in channel._http.calls[0]["url"] assert channel._http.calls[0]["json"]["downloadCode"] == "code123" assert channel._http.calls[1]["method"] == "GET" + + +def test_normalize_upload_payload_zips_html_attachment() -> None: + channel = DingTalkChannel( + DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]), + MessageBus(), + ) + + data, filename, content_type = channel._normalize_upload_payload( + "report.html", + b"Hello", + "text/html", + ) + + assert filename == "report.zip" + assert content_type == "application/zip" + + archive = zipfile.ZipFile(BytesIO(data)) + assert archive.namelist() == ["report.html"] + assert archive.read("report.html") == b"Hello" + + +@pytest.mark.asyncio +async def test_send_media_ref_zips_html_before_upload(tmp_path, monkeypatch) -> None: + channel = DingTalkChannel( + DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]), + MessageBus(), + ) + + html_path = tmp_path / "report.html" + html_path.write_text("Hello", encoding="utf-8") + + captured: dict[str, object] = {} + + async def fake_upload_media(*, token, data, media_type, filename, content_type): + captured.update( + { + "token": token, + "data": data, + "media_type": media_type, + "filename": filename, + "content_type": content_type, + } + ) + return "media-123" + + async def fake_send_batch_message(token, chat_id, msg_key, msg_param): + captured.update( + { + "sent_token": token, + "chat_id": chat_id, + "msg_key": msg_key, + "msg_param": msg_param, + } + ) + return True + + monkeypatch.setattr(channel, "_upload_media", fake_upload_media) + monkeypatch.setattr(channel, "_send_batch_message", fake_send_batch_message) + + ok = await channel._send_media_ref("token-123", "user-1", str(html_path)) + + assert ok is True + assert captured["media_type"] == "file" + assert captured["filename"] == "report.zip" + assert captured["content_type"] == "application/zip" + assert captured["msg_key"] == "sampleFile" + assert captured["msg_param"] == { + "mediaId": "media-123", + "fileName": "report.zip", + "fileType": "zip", + } + + archive = zipfile.ZipFile(BytesIO(captured["data"])) + assert archive.namelist() == ["report.html"]