mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-06-13 14:23:58 +00:00
test(email): cover agent-initiated file attachments in outbound messages
This commit is contained in:
parent
25bb053206
commit
82a3fd03b1
@ -1001,3 +1001,286 @@ def test_extract_attachments_sanitizes_filename(tmp_path, monkeypatch) -> None:
|
||||
saved_path = Path(items[0]["media"][0])
|
||||
# File must be inside the media dir, not escaped via path traversal
|
||||
assert saved_path.parent == tmp_path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent-initiated file attachment tests (send with media)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_with_single_file_attachment(tmp_path, monkeypatch) -> None:
|
||||
"""Agent sends an email with a single file attached."""
|
||||
sent_messages: list[EmailMessage] = []
|
||||
|
||||
class FakeSMTP:
|
||||
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
|
||||
self.timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def starttls(self, context=None):
|
||||
return None
|
||||
|
||||
def login(self, _user: str, _pw: str):
|
||||
return None
|
||||
|
||||
def send_message(self, msg: EmailMessage):
|
||||
sent_messages.append(msg)
|
||||
|
||||
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout))
|
||||
|
||||
# Create a real temp file to attach
|
||||
attachment = tmp_path / "report.pdf"
|
||||
attachment.write_bytes(b"%PDF-1.4 fake pdf content")
|
||||
|
||||
channel = EmailChannel(_make_config(), MessageBus())
|
||||
await channel.send(
|
||||
OutboundMessage(
|
||||
channel="email",
|
||||
chat_id="alice@example.com",
|
||||
content="Please find the report attached.",
|
||||
media=[str(attachment)],
|
||||
)
|
||||
)
|
||||
|
||||
assert len(sent_messages) == 1
|
||||
sent = sent_messages[0]
|
||||
assert sent["To"] == "alice@example.com"
|
||||
assert sent.is_multipart(), "Email with attachment should be multipart"
|
||||
|
||||
# Walk parts to find the attachment
|
||||
attachment_parts = []
|
||||
for part in sent.walk():
|
||||
if part.get_content_disposition() == "attachment":
|
||||
attachment_parts.append(part)
|
||||
assert len(attachment_parts) == 1
|
||||
att = attachment_parts[0]
|
||||
assert att.get_filename() == "report.pdf"
|
||||
assert att.get_content_type() == "application/pdf"
|
||||
assert att.get_payload(decode=True) == b"%PDF-1.4 fake pdf content"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_with_multiple_file_attachments(tmp_path, monkeypatch) -> None:
|
||||
"""Agent sends an email with multiple files attached."""
|
||||
sent_messages: list[EmailMessage] = []
|
||||
|
||||
class FakeSMTP:
|
||||
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
|
||||
self.timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def starttls(self, context=None):
|
||||
return None
|
||||
|
||||
def login(self, _user: str, _pw: str):
|
||||
return None
|
||||
|
||||
def send_message(self, msg: EmailMessage):
|
||||
sent_messages.append(msg)
|
||||
|
||||
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout))
|
||||
|
||||
file1 = tmp_path / "doc.pdf"
|
||||
file1.write_bytes(b"%PDF-1.4 doc")
|
||||
file2 = tmp_path / "image.png"
|
||||
file2.write_bytes(b"\x89PNG fake image")
|
||||
file3 = tmp_path / "notes.txt"
|
||||
file3.write_bytes(b"Hello, this is a text note.")
|
||||
|
||||
channel = EmailChannel(_make_config(), MessageBus())
|
||||
await channel.send(
|
||||
OutboundMessage(
|
||||
channel="email",
|
||||
chat_id="bob@example.com",
|
||||
content="Multiple files attached.",
|
||||
media=[str(file1), str(file2), str(file3)],
|
||||
)
|
||||
)
|
||||
|
||||
assert len(sent_messages) == 1
|
||||
sent = sent_messages[0]
|
||||
assert sent.is_multipart()
|
||||
|
||||
attachment_parts = []
|
||||
for part in sent.walk():
|
||||
if part.get_content_disposition() == "attachment":
|
||||
attachment_parts.append(part)
|
||||
assert len(attachment_parts) == 3
|
||||
|
||||
filenames = {p.get_filename() for p in attachment_parts}
|
||||
assert filenames == {"doc.pdf", "image.png", "notes.txt"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_skips_missing_attachment_file(tmp_path, monkeypatch) -> None:
|
||||
"""Non-existent attachment file is skipped without breaking the send."""
|
||||
sent_messages: list[EmailMessage] = []
|
||||
|
||||
class FakeSMTP:
|
||||
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
|
||||
self.timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def starttls(self, context=None):
|
||||
return None
|
||||
|
||||
def login(self, _user: str, _pw: str):
|
||||
return None
|
||||
|
||||
def send_message(self, msg: EmailMessage):
|
||||
sent_messages.append(msg)
|
||||
|
||||
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout))
|
||||
|
||||
existing = tmp_path / "real.txt"
|
||||
existing.write_text("I exist")
|
||||
|
||||
channel = EmailChannel(_make_config(), MessageBus())
|
||||
await channel.send(
|
||||
OutboundMessage(
|
||||
channel="email",
|
||||
chat_id="alice@example.com",
|
||||
content="One attachment is missing.",
|
||||
media=[
|
||||
str(existing),
|
||||
str(tmp_path / "nonexistent.pdf"),
|
||||
],
|
||||
)
|
||||
)
|
||||
|
||||
assert len(sent_messages) == 1
|
||||
sent = sent_messages[0]
|
||||
assert sent.is_multipart()
|
||||
|
||||
attachment_parts = []
|
||||
for part in sent.walk():
|
||||
if part.get_content_disposition() == "attachment":
|
||||
attachment_parts.append(part)
|
||||
# Only the existing file should be attached
|
||||
assert len(attachment_parts) == 1
|
||||
assert attachment_parts[0].get_filename() == "real.txt"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_with_unknown_mime_type_attachment(tmp_path, monkeypatch) -> None:
|
||||
"""File with unknown extension gets application/octet-stream MIME type."""
|
||||
sent_messages: list[EmailMessage] = []
|
||||
|
||||
class FakeSMTP:
|
||||
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
|
||||
self.timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def starttls(self, context=None):
|
||||
return None
|
||||
|
||||
def login(self, _user: str, _pw: str):
|
||||
return None
|
||||
|
||||
def send_message(self, msg: EmailMessage):
|
||||
sent_messages.append(msg)
|
||||
|
||||
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout))
|
||||
|
||||
attachment = tmp_path / "data.unknown_ext_xyz"
|
||||
attachment.write_bytes(b"some binary data")
|
||||
|
||||
channel = EmailChannel(_make_config(), MessageBus())
|
||||
await channel.send(
|
||||
OutboundMessage(
|
||||
channel="email",
|
||||
chat_id="alice@example.com",
|
||||
content="Unknown MIME type.",
|
||||
media=[str(attachment)],
|
||||
)
|
||||
)
|
||||
|
||||
assert len(sent_messages) == 1
|
||||
sent = sent_messages[0]
|
||||
assert sent.is_multipart()
|
||||
|
||||
attachment_parts = []
|
||||
for part in sent.walk():
|
||||
if part.get_content_disposition() == "attachment":
|
||||
attachment_parts.append(part)
|
||||
assert len(attachment_parts) == 1
|
||||
att = attachment_parts[0]
|
||||
assert att.get_content_type() == "application/octet-stream"
|
||||
assert att.get_filename() == "data.unknown_ext_xyz"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_with_media_and_reply_subject_and_in_reply_to(tmp_path, monkeypatch) -> None:
|
||||
"""Attachments work together with reply subject and In-Reply-To headers."""
|
||||
sent_messages: list[EmailMessage] = []
|
||||
|
||||
class FakeSMTP:
|
||||
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
|
||||
self.timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def starttls(self, context=None):
|
||||
return None
|
||||
|
||||
def login(self, _user: str, _pw: str):
|
||||
return None
|
||||
|
||||
def send_message(self, msg: EmailMessage):
|
||||
sent_messages.append(msg)
|
||||
|
||||
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout))
|
||||
|
||||
attachment = tmp_path / "summary.pdf"
|
||||
attachment.write_bytes(b"%PDF-1.4 summary")
|
||||
|
||||
channel = EmailChannel(_make_config(), MessageBus())
|
||||
channel._last_subject_by_chat["alice@example.com"] = "Original subject"
|
||||
channel._last_message_id_by_chat["alice@example.com"] = "<orig@example.com>"
|
||||
|
||||
await channel.send(
|
||||
OutboundMessage(
|
||||
channel="email",
|
||||
chat_id="alice@example.com",
|
||||
content="Reply with attachment.",
|
||||
media=[str(attachment)],
|
||||
)
|
||||
)
|
||||
|
||||
assert len(sent_messages) == 1
|
||||
sent = sent_messages[0]
|
||||
assert sent["Subject"] == "Re: Original subject"
|
||||
assert sent["In-Reply-To"] == "<orig@example.com>"
|
||||
assert sent["References"] == "<orig@example.com>"
|
||||
|
||||
attachment_parts = []
|
||||
for part in sent.walk():
|
||||
if part.get_content_disposition() == "attachment":
|
||||
attachment_parts.append(part)
|
||||
assert len(attachment_parts) == 1
|
||||
assert attachment_parts[0].get_filename() == "summary.pdf"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user