feat(email): add attachment extraction support

Save inbound email attachments to the media directory with configurable
MIME type filtering (glob patterns like "image/*"), per-attachment size
limits, and max attachment count. Filenames are sanitized to prevent
path traversal. Controlled by allowed_attachment_types — empty (default)
means disabled, non-empty enables extraction for matching types.
This commit is contained in:
Ben Lenarts 2026-04-05 08:06:28 +02:00 committed by Xubin Ren
parent 9174a85b4e
commit d0527a8cf4
2 changed files with 303 additions and 0 deletions

View File

@ -1,6 +1,7 @@
"""Email channel implementation using IMAP polling + SMTP replies."""
import asyncio
from fnmatch import fnmatch
import html
import imaplib
import re
@ -14,13 +15,17 @@ from email.parser import BytesParser
from email.utils import parseaddr
from typing import Any
from pathlib import Path
from loguru import logger
from pydantic import Field
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_media_dir
from nanobot.config.schema import Base
from nanobot.utils.helpers import safe_filename
class EmailConfig(Base):
@ -55,6 +60,11 @@ class EmailConfig(Base):
verify_dkim: bool = True # Require Authentication-Results with dkim=pass
verify_spf: bool = True # Require Authentication-Results with spf=pass
# Attachment handling — set allowed types to enable (e.g. ["application/pdf", "image/*"], or ["*"] for all)
allowed_attachment_types: list[str] = Field(default_factory=list)
max_attachment_size: int = 2_000_000 # 2MB per attachment
max_attachments_per_email: int = 5
class EmailChannel(BaseChannel):
"""
@ -153,6 +163,7 @@ class EmailChannel(BaseChannel):
sender_id=sender,
chat_id=sender,
content=item["content"],
media=item.get("media") or None,
metadata=item.get("metadata", {}),
)
except Exception as e:
@ -404,6 +415,20 @@ class EmailChannel(BaseChannel):
f"{body}"
)
# --- Attachment extraction ---
attachment_paths: list[str] = []
if self.config.allowed_attachment_types:
saved = self._extract_attachments(
parsed,
uid or "noid",
allowed_types=self.config.allowed_attachment_types,
max_size=self.config.max_attachment_size,
max_count=self.config.max_attachments_per_email,
)
for p in saved:
attachment_paths.append(str(p))
content += f"\n[attachment: {p.name} — saved to {p}]"
metadata = {
"message_id": message_id,
"subject": subject,
@ -418,6 +443,7 @@ class EmailChannel(BaseChannel):
"message_id": message_id,
"content": content,
"metadata": metadata,
"media": attachment_paths,
}
)
@ -537,6 +563,61 @@ class EmailChannel(BaseChannel):
dkim_pass = True
return spf_pass, dkim_pass
@classmethod
def _extract_attachments(
    cls,
    msg: Any,
    uid: str,
    *,
    allowed_types: list[str],
    max_size: int,
    max_count: int,
) -> list[Path]:
    """Extract and save email attachments to the media directory.

    Walks the parts of a multipart message and writes every part whose
    content disposition is ``attachment`` to the "email" media directory,
    subject to the filters below. Files are named ``{uid}_{sanitized name}``;
    when several attachments in the same message share a filename, a numeric
    suffix is inserted before the extension so none of them overwrites
    another.

    Args:
        msg: Parsed email message (must offer ``is_multipart()`` and ``walk()``).
        uid: Identifier used as the filename prefix.
        allowed_types: Glob patterns matched against each part's MIME type
            (e.g. ``"image/*"``). Matching is case-insensitive.
        max_size: Largest decoded payload, in bytes, that will be saved.
        max_count: Maximum number of attachments saved per message.

    Returns:
        Paths of the files that were written, in message order.
    """
    if not msg.is_multipart():
        return []

    saved: list[Path] = []
    # Names already handed out for this message, so duplicates can be detected.
    used_names: set[str] = set()
    media_dir = get_media_dir("email")
    # get_content_type() reports a lowercased type, so lowercase the patterns
    # as well; otherwise "Image/*" would never match on case-sensitive platforms.
    patterns = [pat.lower() for pat in allowed_types]

    for part in msg.walk():
        if len(saved) >= max_count:
            break
        if part.get_content_disposition() != "attachment":
            continue

        content_type = part.get_content_type()
        if not any(fnmatch(content_type, pat) for pat in patterns):
            logger.debug("Email attachment skipped (type {}): not in allowed list", content_type)
            continue

        payload = part.get_payload(decode=True)
        if payload is None:
            # No decodable body for this part (e.g. it is itself a container).
            continue
        if len(payload) > max_size:
            logger.warning(
                "Email attachment skipped: size {} exceeds limit {}",
                len(payload),
                max_size,
            )
            continue

        raw_name = part.get_filename() or "attachment"
        sanitized = safe_filename(raw_name) or "attachment"

        # Two attachments with the same filename would otherwise map to the
        # same destination and the later one would silently replace the
        # earlier one; disambiguate with a counter before the extension.
        file_name = f"{uid}_{sanitized}"
        if file_name in used_names:
            name_parts = Path(sanitized)
            counter = 1
            while file_name in used_names:
                file_name = f"{uid}_{name_parts.stem}_{counter}{name_parts.suffix}"
                counter += 1
        used_names.add(file_name)

        dest = media_dir / file_name
        try:
            dest.write_bytes(payload)
            saved.append(dest)
            logger.info("Email attachment saved: {}", dest)
        except Exception as exc:
            # Best effort: a failed write must not abort the remaining attachments.
            logger.warning("Failed to save email attachment {}: {}", dest, exc)

    return saved
@staticmethod
def _html_to_text(raw_html: str) -> str:
text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE)

View File

@ -1,5 +1,6 @@
from email.message import EmailMessage
from datetime import date
from pathlib import Path
import imaplib
import pytest
@ -650,3 +651,224 @@ def test_check_authentication_results_method() -> None:
spf, dkim = EmailChannel._check_authentication_results(parsed)
assert spf is False
assert dkim is True
# ---------------------------------------------------------------------------
# Attachment extraction tests
# ---------------------------------------------------------------------------
def _make_raw_email_with_attachment(
from_addr: str = "alice@example.com",
subject: str = "With attachment",
body: str = "See attached.",
attachment_name: str = "doc.pdf",
attachment_content: bytes = b"%PDF-1.4 fake pdf content",
attachment_mime: str = "application/pdf",
auth_results: str | None = None,
) -> bytes:
msg = EmailMessage()
msg["From"] = from_addr
msg["To"] = "bot@example.com"
msg["Subject"] = subject
msg["Message-ID"] = "<m1@example.com>"
if auth_results:
msg["Authentication-Results"] = auth_results
msg.set_content(body)
maintype, subtype = attachment_mime.split("/", 1)
msg.add_attachment(
attachment_content,
maintype=maintype,
subtype=subtype,
filename=attachment_name,
)
return msg.as_bytes()
def test_extract_attachments_saves_pdf(tmp_path, monkeypatch) -> None:
    """An allowed PDF attachment is written to the media dir and reported in ``media``."""
    monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path)
    imap_stub = _make_fake_imap(_make_raw_email_with_attachment())
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    config = _make_config(
        allowed_attachment_types=["application/pdf"],
        verify_dkim=False,
        verify_spf=False,
    )
    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    message = fetched[0]
    assert len(message["media"]) == 1

    # The reported path must point at a real file holding the original bytes.
    stored = Path(message["media"][0])
    assert stored.exists()
    assert stored.read_bytes() == b"%PDF-1.4 fake pdf content"
    assert "500_doc.pdf" in stored.name
    assert "[attachment:" in message["content"]
def test_extract_attachments_disabled_by_default(monkeypatch) -> None:
    """The default config has no allowed types, so nothing is extracted."""
    imap_stub = _make_fake_imap(_make_raw_email_with_attachment())
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    config = _make_config(verify_dkim=False, verify_spf=False)
    # Guard the premise of this test: extraction is off unless types are configured.
    assert config.allowed_attachment_types == []

    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    message = fetched[0]
    assert message["media"] == []
    assert "[attachment:" not in message["content"]
def test_extract_attachments_mime_type_filter(tmp_path, monkeypatch) -> None:
    """An attachment whose MIME type is not in the allow-list is not saved."""
    monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path)
    png_email = _make_raw_email_with_attachment(
        attachment_name="image.png",
        attachment_content=b"\x89PNG fake",
        attachment_mime="image/png",
    )
    imap_stub = _make_fake_imap(png_email)
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    # Only PDFs are allowed, so the PNG must be filtered out.
    config = _make_config(
        allowed_attachment_types=["application/pdf"],
        verify_dkim=False,
        verify_spf=False,
    )
    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    assert fetched[0]["media"] == []
def test_extract_attachments_empty_allowed_types_rejects_all(tmp_path, monkeypatch) -> None:
    """An explicitly empty allow-list accepts no attachment types."""
    monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path)
    png_email = _make_raw_email_with_attachment(
        attachment_name="image.png",
        attachment_content=b"\x89PNG fake",
        attachment_mime="image/png",
    )
    imap_stub = _make_fake_imap(png_email)
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    config = _make_config(
        allowed_attachment_types=[],
        verify_dkim=False,
        verify_spf=False,
    )
    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    assert fetched[0]["media"] == []
def test_extract_attachments_wildcard_pattern(tmp_path, monkeypatch) -> None:
    """A glob such as 'image/*' accepts a concrete subtype like image/jpeg."""
    monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path)
    jpeg_email = _make_raw_email_with_attachment(
        attachment_name="photo.jpg",
        attachment_content=b"\xff\xd8\xff fake jpeg",
        attachment_mime="image/jpeg",
    )
    imap_stub = _make_fake_imap(jpeg_email)
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    config = _make_config(
        allowed_attachment_types=["image/*"],
        verify_dkim=False,
        verify_spf=False,
    )
    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    assert len(fetched[0]["media"]) == 1
def test_extract_attachments_size_limit(tmp_path, monkeypatch) -> None:
    """An attachment larger than max_attachment_size is not saved."""
    monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path)
    # 1000-byte payload against a 500-byte limit.
    oversized_email = _make_raw_email_with_attachment(attachment_content=b"x" * 1000)
    imap_stub = _make_fake_imap(oversized_email)
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    config = _make_config(
        allowed_attachment_types=["*"],
        max_attachment_size=500,
        verify_dkim=False,
        verify_spf=False,
    )
    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    assert fetched[0]["media"] == []
def test_extract_attachments_max_count(tmp_path, monkeypatch) -> None:
    """No more than max_attachments_per_email attachments are saved."""
    monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path)

    # Three attachments in the message, but the config below allows only two.
    email_msg = EmailMessage()
    for header_name, header_value in (
        ("From", "alice@example.com"),
        ("To", "bot@example.com"),
        ("Subject", "Many attachments"),
        ("Message-ID", "<m1@example.com>"),
    ):
        email_msg[header_name] = header_value
    email_msg.set_content("See attached.")
    for index in range(3):
        email_msg.add_attachment(
            f"content {index}".encode(),
            maintype="application",
            subtype="pdf",
            filename=f"doc{index}.pdf",
        )

    imap_stub = _make_fake_imap(email_msg.as_bytes())
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    config = _make_config(
        allowed_attachment_types=["*"],
        max_attachments_per_email=2,
        verify_dkim=False,
        verify_spf=False,
    )
    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    assert len(fetched[0]["media"]) == 2
def test_extract_attachments_sanitizes_filename(tmp_path, monkeypatch) -> None:
    """A filename containing '../' segments cannot escape the media directory."""
    monkeypatch.setattr("nanobot.channels.email.get_media_dir", lambda ch: tmp_path)
    hostile_email = _make_raw_email_with_attachment(attachment_name="../../../etc/passwd")
    imap_stub = _make_fake_imap(hostile_email)
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: imap_stub)

    config = _make_config(
        allowed_attachment_types=["*"],
        verify_dkim=False,
        verify_spf=False,
    )
    fetched = EmailChannel(config, MessageBus())._fetch_new_messages()

    assert len(fetched) == 1
    media = fetched[0]["media"]
    assert len(media) == 1
    # The saved file must sit directly inside the media dir, not above it.
    assert Path(media[0]).parent == tmp_path