From d0527a8cf4fb85e6a5bb0b427b3325fb615df3d0 Mon Sep 17 00:00:00 2001 From: Ben Lenarts Date: Sun, 5 Apr 2026 08:06:28 +0200 Subject: [PATCH] feat(email): add attachment extraction support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Save inbound email attachments to the media directory with configurable MIME type filtering (glob patterns like "image/*"), per-attachment size limits, and max attachment count. Filenames are sanitized to prevent path traversal. Controlled by allowed_attachment_types — empty (default) means disabled, non-empty enables extraction for matching types. --- nanobot/channels/email.py | 81 ++++++++++ tests/channels/test_email_channel.py | 222 +++++++++++++++++++++++++++ 2 files changed, 303 insertions(+) diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index bee2ceccd..15edff490 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -1,6 +1,7 @@ """Email channel implementation using IMAP polling + SMTP replies.""" import asyncio +from fnmatch import fnmatch import html import imaplib import re @@ -14,13 +15,17 @@ from email.parser import BytesParser from email.utils import parseaddr from typing import Any +from pathlib import Path + from loguru import logger from pydantic import Field from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel +from nanobot.config.paths import get_media_dir from nanobot.config.schema import Base +from nanobot.utils.helpers import safe_filename class EmailConfig(Base): @@ -55,6 +60,11 @@ class EmailConfig(Base): verify_dkim: bool = True # Require Authentication-Results with dkim=pass verify_spf: bool = True # Require Authentication-Results with spf=pass + # Attachment handling — set allowed types to enable (e.g. ["application/pdf", "image/*"], or ["*"] for all) + allowed_attachment_types: list[str] = Field(default_factory=list) + max_attachment_size: int = 2_000_000 # 2MB per attachment + max_attachments_per_email: int = 5 + class EmailChannel(BaseChannel): """ @@ -153,6 +163,7 @@ class EmailChannel(BaseChannel): sender_id=sender, chat_id=sender, content=item["content"], + media=item.get("media") or None, metadata=item.get("metadata", {}), ) except Exception as e: @@ -404,6 +415,20 @@ class EmailChannel(BaseChannel): f"{body}" ) + # --- Attachment extraction --- + attachment_paths: list[str] = [] + if self.config.allowed_attachment_types: + saved = self._extract_attachments( + parsed, + uid or "noid", + allowed_types=self.config.allowed_attachment_types, + max_size=self.config.max_attachment_size, + max_count=self.config.max_attachments_per_email, + ) + for p in saved: + attachment_paths.append(str(p)) + content += f"\n[attachment: {p.name} — saved to {p}]" + metadata = { "message_id": message_id, "subject": subject, @@ -418,6 +443,7 @@ class EmailChannel(BaseChannel): "message_id": message_id, "content": content, "metadata": metadata, + "media": attachment_paths, } ) @@ -537,6 +563,61 @@ class EmailChannel(BaseChannel): dkim_pass = True return spf_pass, dkim_pass + @classmethod + def _extract_attachments( + cls, + msg: Any, + uid: str, + *, + allowed_types: list[str], + max_size: int, + max_count: int, + ) -> list[Path]: + """Extract and save email attachments to the media directory. + + Returns list of saved file paths. + """ + if not msg.is_multipart(): + return [] + + saved: list[Path] = [] + media_dir = get_media_dir("email") + + for part in msg.walk(): + if len(saved) >= max_count: + break + if part.get_content_disposition() != "attachment": + continue + + content_type = part.get_content_type() + if not any(fnmatch(content_type, pat) for pat in allowed_types): + logger.debug("Email attachment skipped (type {}): not in allowed list", content_type) + continue + + payload = part.get_payload(decode=True) + if payload is None: + continue + if len(payload) > max_size: + logger.warning( + "Email attachment skipped: size {} exceeds limit {}", + len(payload), + max_size, + ) + continue + + raw_name = part.get_filename() or "attachment" + sanitized = safe_filename(raw_name) or "attachment" + dest = media_dir / f"{uid}_{sanitized}" + + try: + dest.write_bytes(payload) + saved.append(dest) + logger.info("Email attachment saved: {}", dest) + except Exception as exc: + logger.warning("Failed to save email attachment {}: {}", dest, exc) + + return saved + @staticmethod def _html_to_text(raw_html: str) -> str: text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE) diff --git a/tests/channels/test_email_channel.py b/tests/channels/test_email_channel.py index 2d0e33ce3..6d6d2f74f 100644 --- a/tests/channels/test_email_channel.py +++ b/tests/channels/test_email_channel.py @@ -1,5 +1,6 @@ from email.message import EmailMessage from datetime import date +from pathlib import Path import imaplib import pytest @@ -650,3 +651,224 @@ def test_check_authentication_results_method() -> None: spf, dkim = EmailChannel._check_authentication_results(parsed) assert spf is False assert dkim is True + + +# --------------------------------------------------------------------------- +# Attachment extraction tests +# --------------------------------------------------------------------------- + + +def _make_raw_email_with_attachment( + from_addr: str = "alice@example.com", + subject: str = "With attachment", + body: str = "See attached.", + attachment_name: str = "doc.pdf", + attachment_content: bytes = b"%PDF-1.4 fake pdf content", + attachment_mime: str = "application/pdf", + auth_results: str | None = None, +) -> bytes: + msg = EmailMessage() + msg["From"] = from_addr + msg["To"] = "bot@example.com" + msg["Subject"] = subject + msg["Message-ID"] = "" + if auth_results: + msg["Authentication-Results"] = auth_results + msg.set_content(body) + maintype, subtype = attachment_mime.split("/", 1) + msg.add_attachment( + attachment_content, + maintype=maintype, + subtype=subtype, + filename=attachment_name, + ) + return msg.as_bytes() + + +def test_extract_attachments_saves_pdf(tmp_path, monkeypatch) -> None: + """PDF attachment is saved to media dir and path returned in media list.""" + monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path) + + raw = _make_raw_email_with_attachment() + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(allowed_attachment_types=["application/pdf"], verify_dkim=False, verify_spf=False) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert len(items[0]["media"]) == 1 + saved_path = Path(items[0]["media"][0]) + assert saved_path.exists() + assert saved_path.read_bytes() == b"%PDF-1.4 fake pdf content" + assert "500_doc.pdf" in saved_path.name + assert "[attachment:" in items[0]["content"] + + +def test_extract_attachments_disabled_by_default(monkeypatch) -> None: + """With no allowed_attachment_types (default), no attachments are extracted.""" + raw = _make_raw_email_with_attachment() + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(verify_dkim=False, verify_spf=False) + assert cfg.allowed_attachment_types == [] + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert items[0]["media"] == [] + assert "[attachment:" not in items[0]["content"] + + +def test_extract_attachments_mime_type_filter(tmp_path, monkeypatch) -> None: + """Non-allowed MIME types are skipped.""" + monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path) + + raw = _make_raw_email_with_attachment( + attachment_name="image.png", + attachment_content=b"\x89PNG fake", + attachment_mime="image/png", + ) + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config( + allowed_attachment_types=["application/pdf"], + verify_dkim=False, + verify_spf=False, + ) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert items[0]["media"] == [] + + +def test_extract_attachments_empty_allowed_types_rejects_all(tmp_path, monkeypatch) -> None: + """Empty allowed_attachment_types means no types are accepted.""" + monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path) + + raw = _make_raw_email_with_attachment( + attachment_name="image.png", + attachment_content=b"\x89PNG fake", + attachment_mime="image/png", + ) + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config( + allowed_attachment_types=[], + verify_dkim=False, + verify_spf=False, + ) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert items[0]["media"] == [] + + +def test_extract_attachments_wildcard_pattern(tmp_path, monkeypatch) -> None: + """Glob patterns like 'image/*' match attachment MIME types.""" + monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path) + + raw = _make_raw_email_with_attachment( + attachment_name="photo.jpg", + attachment_content=b"\xff\xd8\xff fake jpeg", + attachment_mime="image/jpeg", + ) + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config( + allowed_attachment_types=["image/*"], + verify_dkim=False, + verify_spf=False, + ) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert len(items[0]["media"]) == 1 + + +def test_extract_attachments_size_limit(tmp_path, monkeypatch) -> None: + """Attachments exceeding max_attachment_size are skipped.""" + monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path) + + raw = _make_raw_email_with_attachment( + attachment_content=b"x" * 1000, + ) + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config( + allowed_attachment_types=["*"], + max_attachment_size=500, + verify_dkim=False, + verify_spf=False, + ) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert items[0]["media"] == [] + + +def test_extract_attachments_max_count(tmp_path, monkeypatch) -> None: + """Only max_attachments_per_email are saved.""" + monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path) + + # Build email with 3 attachments + msg = EmailMessage() + msg["From"] = "alice@example.com" + msg["To"] = "bot@example.com" + msg["Subject"] = "Many attachments" + msg["Message-ID"] = "" + msg.set_content("See attached.") + for i in range(3): + msg.add_attachment( + f"content {i}".encode(), + maintype="application", + subtype="pdf", + filename=f"doc{i}.pdf", + ) + raw = msg.as_bytes() + + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config( + allowed_attachment_types=["*"], + max_attachments_per_email=2, + verify_dkim=False, + verify_spf=False, + ) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert len(items[0]["media"]) == 2 + + +def test_extract_attachments_sanitizes_filename(tmp_path, monkeypatch) -> None: + """Path traversal in filenames is neutralized.""" + monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path) + + raw = _make_raw_email_with_attachment( + attachment_name="../../../etc/passwd", + ) + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(allowed_attachment_types=["*"], verify_dkim=False, verify_spf=False) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert len(items[0]["media"]) == 1 + saved_path = Path(items[0]["media"][0]) + # File must be inside the media dir, not escaped via path traversal + assert saved_path.parent == tmp_path