diff --git a/tests/channels/test_email_channel.py b/tests/channels/test_email_channel.py index cb5aed45e..b6cea9263 100644 --- a/tests/channels/test_email_channel.py +++ b/tests/channels/test_email_channel.py @@ -1001,3 +1001,286 @@ def test_extract_attachments_sanitizes_filename(tmp_path, monkeypatch) -> None: saved_path = Path(items[0]["media"][0]) # File must be inside the media dir, not escaped via path traversal assert saved_path.parent == tmp_path + + +# --------------------------------------------------------------------------- +# Agent-initiated file attachment tests (send with media) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_send_with_single_file_attachment(tmp_path, monkeypatch) -> None: + """Agent sends an email with a single file attached.""" + sent_messages: list[EmailMessage] = [] + + class FakeSMTP: + def __init__(self, _host: str, _port: int, timeout: int = 30) -> None: + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def starttls(self, context=None): + return None + + def login(self, _user: str, _pw: str): + return None + + def send_message(self, msg: EmailMessage): + sent_messages.append(msg) + + monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout)) + + # Create a real temp file to attach + attachment = tmp_path / "report.pdf" + attachment.write_bytes(b"%PDF-1.4 fake pdf content") + + channel = EmailChannel(_make_config(), MessageBus()) + await channel.send( + OutboundMessage( + channel="email", + chat_id="alice@example.com", + content="Please find the report attached.", + media=[str(attachment)], + ) + ) + + assert len(sent_messages) == 1 + sent = sent_messages[0] + assert sent["To"] == "alice@example.com" + assert sent.is_multipart(), "Email with attachment should be multipart" + + # Walk parts to find the attachment + attachment_parts = [] + for part in sent.walk(): + if part.get_content_disposition() == "attachment": + attachment_parts.append(part) + assert len(attachment_parts) == 1 + att = attachment_parts[0] + assert att.get_filename() == "report.pdf" + assert att.get_content_type() == "application/pdf" + assert att.get_payload(decode=True) == b"%PDF-1.4 fake pdf content" + + +@pytest.mark.asyncio +async def test_send_with_multiple_file_attachments(tmp_path, monkeypatch) -> None: + """Agent sends an email with multiple files attached.""" + sent_messages: list[EmailMessage] = [] + + class FakeSMTP: + def __init__(self, _host: str, _port: int, timeout: int = 30) -> None: + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def starttls(self, context=None): + return None + + def login(self, _user: str, _pw: str): + return None + + def send_message(self, msg: EmailMessage): + sent_messages.append(msg) + + monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout)) + + file1 = tmp_path / "doc.pdf" + file1.write_bytes(b"%PDF-1.4 doc") + file2 = tmp_path / "image.png" + file2.write_bytes(b"\x89PNG fake image") + file3 = tmp_path / "notes.txt" + file3.write_bytes(b"Hello, this is a text note.") + + channel = EmailChannel(_make_config(), MessageBus()) + await channel.send( + OutboundMessage( + channel="email", + chat_id="bob@example.com", + content="Multiple files attached.", + media=[str(file1), str(file2), str(file3)], + ) + ) + + assert len(sent_messages) == 1 + sent = sent_messages[0] + assert sent.is_multipart() + + attachment_parts = [] + for part in sent.walk(): + if part.get_content_disposition() == "attachment": + attachment_parts.append(part) + assert len(attachment_parts) == 3 + + filenames = {p.get_filename() for p in attachment_parts} + assert filenames == {"doc.pdf", "image.png", "notes.txt"} + + +@pytest.mark.asyncio +async def test_send_skips_missing_attachment_file(tmp_path, monkeypatch) -> None: + """Non-existent attachment file is skipped without breaking the send.""" + sent_messages: list[EmailMessage] = [] + + class FakeSMTP: + def __init__(self, _host: str, _port: int, timeout: int = 30) -> None: + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def starttls(self, context=None): + return None + + def login(self, _user: str, _pw: str): + return None + + def send_message(self, msg: EmailMessage): + sent_messages.append(msg) + + monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout)) + + existing = tmp_path / "real.txt" + existing.write_text("I exist") + + channel = EmailChannel(_make_config(), MessageBus()) + await channel.send( + OutboundMessage( + channel="email", + chat_id="alice@example.com", + content="One attachment is missing.", + media=[ + str(existing), + str(tmp_path / "nonexistent.pdf"), + ], + ) + ) + + assert len(sent_messages) == 1 + sent = sent_messages[0] + assert sent.is_multipart() + + attachment_parts = [] + for part in sent.walk(): + if part.get_content_disposition() == "attachment": + attachment_parts.append(part) + # Only the existing file should be attached + assert len(attachment_parts) == 1 + assert attachment_parts[0].get_filename() == "real.txt" + + +@pytest.mark.asyncio +async def test_send_with_unknown_mime_type_attachment(tmp_path, monkeypatch) -> None: + """File with unknown extension gets application/octet-stream MIME type.""" + sent_messages: list[EmailMessage] = [] + + class FakeSMTP: + def __init__(self, _host: str, _port: int, timeout: int = 30) -> None: + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def starttls(self, context=None): + return None + + def login(self, _user: str, _pw: str): + return None + + def send_message(self, msg: EmailMessage): + sent_messages.append(msg) + + monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout)) + + attachment = tmp_path / "data.unknown_ext_xyz" + attachment.write_bytes(b"some binary data") + + channel = EmailChannel(_make_config(), MessageBus()) + await channel.send( + OutboundMessage( + channel="email", + chat_id="alice@example.com", + content="Unknown MIME type.", + media=[str(attachment)], + ) + ) + + assert len(sent_messages) == 1 + sent = sent_messages[0] + assert sent.is_multipart() + + attachment_parts = [] + for part in sent.walk(): + if part.get_content_disposition() == "attachment": + attachment_parts.append(part) + assert len(attachment_parts) == 1 + att = attachment_parts[0] + assert att.get_content_type() == "application/octet-stream" + assert att.get_filename() == "data.unknown_ext_xyz" + + +@pytest.mark.asyncio +async def test_send_with_media_and_reply_subject_and_in_reply_to(tmp_path, monkeypatch) -> None: + """Attachments work together with reply subject and In-Reply-To headers.""" + sent_messages: list[EmailMessage] = [] + + class FakeSMTP: + def __init__(self, _host: str, _port: int, timeout: int = 30) -> None: + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def starttls(self, context=None): + return None + + def login(self, _user: str, _pw: str): + return None + + def send_message(self, msg: EmailMessage): + sent_messages.append(msg) + + monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", lambda h, p, timeout=30: FakeSMTP(h, p, timeout=timeout)) + + attachment = tmp_path / "summary.pdf" + attachment.write_bytes(b"%PDF-1.4 summary") + + channel = EmailChannel(_make_config(), MessageBus()) + channel._last_subject_by_chat["alice@example.com"] = "Original subject" + channel._last_message_id_by_chat["alice@example.com"] = "" + + await channel.send( + OutboundMessage( + channel="email", + chat_id="alice@example.com", + content="Reply with attachment.", + media=[str(attachment)], + ) + ) + + assert len(sent_messages) == 1 + sent = sent_messages[0] + assert sent["Subject"] == "Re: Original subject" + assert sent["In-Reply-To"] == "" + assert sent["References"] == "" + + attachment_parts = [] + for part in sent.walk(): + if part.get_content_disposition() == "attachment": + attachment_parts.append(part) + assert len(attachment_parts) == 1 + assert attachment_parts[0].get_filename() == "summary.pdf"