mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-10 21:23:39 +00:00
Added bug fix to Dingtalk by zipping html to prevent raw failure
This commit is contained in:
parent
42624f5bf3
commit
3cc2ebeef7
@ -5,6 +5,8 @@ import json
|
||||
import mimetypes
|
||||
import os
|
||||
import time
|
||||
import zipfile
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import unquote, urlparse
|
||||
@ -171,6 +173,7 @@ class DingTalkChannel(BaseChannel):
|
||||
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"}
|
||||
_AUDIO_EXTS = {".amr", ".mp3", ".wav", ".ogg", ".m4a", ".aac"}
|
||||
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
|
||||
_ZIP_BEFORE_UPLOAD_EXTS = {".htm", ".html"}
|
||||
|
||||
@classmethod
|
||||
def default_config(cls) -> dict[str, Any]:
|
||||
@ -287,6 +290,31 @@ class DingTalkChannel(BaseChannel):
|
||||
name = os.path.basename(urlparse(media_ref).path)
|
||||
return name or {"image": "image.jpg", "voice": "audio.amr", "video": "video.mp4"}.get(upload_type, "file.bin")
|
||||
|
||||
@staticmethod
|
||||
def _zip_bytes(filename: str, data: bytes) -> tuple[bytes, str, str]:
|
||||
stem = Path(filename).stem or "attachment"
|
||||
safe_name = filename or "attachment.bin"
|
||||
zip_name = f"{stem}.zip"
|
||||
buffer = BytesIO()
|
||||
with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
|
||||
archive.writestr(safe_name, data)
|
||||
return buffer.getvalue(), zip_name, "application/zip"
|
||||
|
||||
def _normalize_upload_payload(
|
||||
self,
|
||||
filename: str,
|
||||
data: bytes,
|
||||
content_type: str | None,
|
||||
) -> tuple[bytes, str, str | None]:
|
||||
ext = Path(filename).suffix.lower()
|
||||
if ext in self._ZIP_BEFORE_UPLOAD_EXTS or content_type == "text/html":
|
||||
logger.info(
|
||||
"DingTalk does not accept raw HTML attachments, zipping {} before upload",
|
||||
filename,
|
||||
)
|
||||
return self._zip_bytes(filename, data)
|
||||
return data, filename, content_type
|
||||
|
||||
async def _read_media_bytes(
|
||||
self,
|
||||
media_ref: str,
|
||||
@ -444,6 +472,7 @@ class DingTalkChannel(BaseChannel):
|
||||
return False
|
||||
|
||||
filename = filename or self._guess_filename(media_ref, upload_type)
|
||||
data, filename, content_type = self._normalize_upload_payload(filename, data, content_type)
|
||||
file_type = Path(filename).suffix.lower().lstrip(".")
|
||||
if not file_type:
|
||||
guessed = mimetypes.guess_extension(content_type or "")
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
import asyncio
|
||||
import zipfile
|
||||
from io import BytesIO
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
@ -221,3 +223,78 @@ async def test_download_dingtalk_file(tmp_path, monkeypatch) -> None:
|
||||
assert "messageFiles/download" in channel._http.calls[0]["url"]
|
||||
assert channel._http.calls[0]["json"]["downloadCode"] == "code123"
|
||||
assert channel._http.calls[1]["method"] == "GET"
|
||||
|
||||
|
||||
def test_normalize_upload_payload_zips_html_attachment() -> None:
|
||||
channel = DingTalkChannel(
|
||||
DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]),
|
||||
MessageBus(),
|
||||
)
|
||||
|
||||
data, filename, content_type = channel._normalize_upload_payload(
|
||||
"report.html",
|
||||
b"<html><body>Hello</body></html>",
|
||||
"text/html",
|
||||
)
|
||||
|
||||
assert filename == "report.zip"
|
||||
assert content_type == "application/zip"
|
||||
|
||||
archive = zipfile.ZipFile(BytesIO(data))
|
||||
assert archive.namelist() == ["report.html"]
|
||||
assert archive.read("report.html") == b"<html><body>Hello</body></html>"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_media_ref_zips_html_before_upload(tmp_path, monkeypatch) -> None:
|
||||
channel = DingTalkChannel(
|
||||
DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]),
|
||||
MessageBus(),
|
||||
)
|
||||
|
||||
html_path = tmp_path / "report.html"
|
||||
html_path.write_text("<html><body>Hello</body></html>", encoding="utf-8")
|
||||
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
async def fake_upload_media(*, token, data, media_type, filename, content_type):
|
||||
captured.update(
|
||||
{
|
||||
"token": token,
|
||||
"data": data,
|
||||
"media_type": media_type,
|
||||
"filename": filename,
|
||||
"content_type": content_type,
|
||||
}
|
||||
)
|
||||
return "media-123"
|
||||
|
||||
async def fake_send_batch_message(token, chat_id, msg_key, msg_param):
|
||||
captured.update(
|
||||
{
|
||||
"sent_token": token,
|
||||
"chat_id": chat_id,
|
||||
"msg_key": msg_key,
|
||||
"msg_param": msg_param,
|
||||
}
|
||||
)
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(channel, "_upload_media", fake_upload_media)
|
||||
monkeypatch.setattr(channel, "_send_batch_message", fake_send_batch_message)
|
||||
|
||||
ok = await channel._send_media_ref("token-123", "user-1", str(html_path))
|
||||
|
||||
assert ok is True
|
||||
assert captured["media_type"] == "file"
|
||||
assert captured["filename"] == "report.zip"
|
||||
assert captured["content_type"] == "application/zip"
|
||||
assert captured["msg_key"] == "sampleFile"
|
||||
assert captured["msg_param"] == {
|
||||
"mediaId": "media-123",
|
||||
"fileName": "report.zip",
|
||||
"fileType": "zip",
|
||||
}
|
||||
|
||||
archive = zipfile.ZipFile(BytesIO(captured["data"]))
|
||||
assert archive.namelist() == ["report.html"]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user