fix: verify Authentication-Results (SPF/DKIM) for inbound emails

This commit is contained in:
idealist17 2026-03-10 16:45:06 +08:00
parent a1b5f21b8b
commit 6e428b7939
3 changed files with 216 additions and 3 deletions

View File

@ -71,6 +71,12 @@ class EmailChannel(BaseChannel):
return
self._running = True
if not self.config.verify_dkim and not self.config.verify_spf:
logger.warning(
"Email channel: DKIM and SPF verification are both DISABLED. "
"Emails with spoofed From headers will be accepted. "
"Set verify_dkim=true and verify_spf=true for anti-spoofing protection."
)
logger.info("Starting Email channel (IMAP polling mode)...")
poll_seconds = max(5, int(self.config.poll_interval_seconds))
@ -270,6 +276,23 @@ class EmailChannel(BaseChannel):
if not sender:
continue
# --- Anti-spoofing: verify Authentication-Results ---
spf_pass, dkim_pass = self._check_authentication_results(parsed)
if self.config.verify_spf and not spf_pass:
logger.warning(
"Email from {} rejected: SPF verification failed "
"(no 'spf=pass' in Authentication-Results header)",
sender,
)
continue
if self.config.verify_dkim and not dkim_pass:
logger.warning(
"Email from {} rejected: DKIM verification failed "
"(no 'dkim=pass' in Authentication-Results header)",
sender,
)
continue
subject = self._decode_header_value(parsed.get("Subject", ""))
date_value = parsed.get("Date", "")
message_id = parsed.get("Message-ID", "").strip()
@ -280,7 +303,7 @@ class EmailChannel(BaseChannel):
body = body[: self.config.max_body_chars]
content = (
f"Email received.\n"
f"[EMAIL-CONTEXT] Email received.\n"
f"From: {sender}\n"
f"Subject: {subject}\n"
f"Date: {date_value}\n\n"
@ -393,6 +416,23 @@ class EmailChannel(BaseChannel):
return cls._html_to_text(payload).strip()
return payload.strip()
@staticmethod
def _check_authentication_results(parsed_msg: Any) -> tuple[bool, bool]:
"""Parse Authentication-Results headers for SPF and DKIM verdicts.
Returns:
A tuple of (spf_pass, dkim_pass) booleans.
"""
spf_pass = False
dkim_pass = False
for ar_header in parsed_msg.get_all("Authentication-Results") or []:
ar_lower = ar_header.lower()
if re.search(r"\bspf\s*=\s*pass\b", ar_lower):
spf_pass = True
if re.search(r"\bdkim\s*=\s*pass\b", ar_lower):
dkim_pass = True
return spf_pass, dkim_pass
@staticmethod
def _html_to_text(raw_html: str) -> str:
text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE)

View File

@ -124,6 +124,10 @@ class EmailConfig(Base):
subject_prefix: str = "Re: "
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
# Email authentication verification (anti-spoofing)
verify_dkim: bool = True # Require Authentication-Results with dkim=pass
verify_spf: bool = True # Require Authentication-Results with spf=pass
class MochatMentionConfig(Base):
"""Mochat mention behavior configuration."""

View File

@ -9,8 +9,8 @@ from nanobot.channels.email import EmailChannel
from nanobot.config.schema import EmailConfig
def _make_config() -> EmailConfig:
return EmailConfig(
def _make_config(**overrides) -> EmailConfig:
defaults = dict(
enabled=True,
consent_granted=True,
imap_host="imap.example.com",
@ -22,19 +22,27 @@ def _make_config() -> EmailConfig:
smtp_username="bot@example.com",
smtp_password="secret",
mark_seen=True,
# Disable auth verification by default so existing tests are unaffected
verify_dkim=False,
verify_spf=False,
)
defaults.update(overrides)
return EmailConfig(**defaults)
def _make_raw_email(
from_addr: str = "alice@example.com",
subject: str = "Hello",
body: str = "This is the body.",
auth_results: str | None = None,
) -> bytes:
msg = EmailMessage()
msg["From"] = from_addr
msg["To"] = "bot@example.com"
msg["Subject"] = subject
msg["Message-ID"] = "<m1@example.com>"
if auth_results:
msg["Authentication-Results"] = auth_results
msg.set_content(body)
return msg.as_bytes()
@ -366,3 +374,164 @@ def test_fetch_messages_between_dates_uses_imap_since_before_without_mark_seen(m
assert fake.search_args is not None
assert fake.search_args[1:] == ("SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
assert fake.store_calls == []
# ---------------------------------------------------------------------------
# Security: Anti-spoofing tests for Authentication-Results verification
# ---------------------------------------------------------------------------
def _make_fake_imap(raw: bytes):
"""Return a FakeIMAP class pre-loaded with the given raw email."""
class FakeIMAP:
def __init__(self) -> None:
self.store_calls: list[tuple[bytes, str, str]] = []
def login(self, _user: str, _pw: str):
return "OK", [b"logged in"]
def select(self, _mailbox: str):
return "OK", [b"1"]
def search(self, *_args):
return "OK", [b"1"]
def fetch(self, _imap_id: bytes, _parts: str):
return "OK", [(b"1 (UID 500 BODY[] {200})", raw), b")"]
def store(self, imap_id: bytes, op: str, flags: str):
self.store_calls.append((imap_id, op, flags))
return "OK", [b""]
def logout(self):
return "BYE", [b""]
return FakeIMAP()
def test_spoofed_email_rejected_when_verify_enabled(monkeypatch) -> None:
"""An email without Authentication-Results should be rejected when verify_dkim=True."""
raw = _make_raw_email(subject="Spoofed", body="Malicious payload")
fake = _make_fake_imap(raw)
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
cfg = _make_config(verify_dkim=True, verify_spf=True)
channel = EmailChannel(cfg, MessageBus())
items = channel._fetch_new_messages()
assert len(items) == 0, "Spoofed email without auth headers should be rejected"
def test_email_with_valid_auth_results_accepted(monkeypatch) -> None:
"""An email with spf=pass and dkim=pass should be accepted."""
raw = _make_raw_email(
subject="Legit",
body="Hello from verified sender",
auth_results="mx.example.com; spf=pass smtp.mailfrom=alice@example.com; dkim=pass header.d=example.com",
)
fake = _make_fake_imap(raw)
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
cfg = _make_config(verify_dkim=True, verify_spf=True)
channel = EmailChannel(cfg, MessageBus())
items = channel._fetch_new_messages()
assert len(items) == 1
assert items[0]["sender"] == "alice@example.com"
assert items[0]["subject"] == "Legit"
def test_email_with_partial_auth_rejected(monkeypatch) -> None:
"""An email with only spf=pass but no dkim=pass should be rejected when verify_dkim=True."""
raw = _make_raw_email(
subject="Partial",
body="Only SPF passes",
auth_results="mx.example.com; spf=pass smtp.mailfrom=alice@example.com; dkim=fail",
)
fake = _make_fake_imap(raw)
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
cfg = _make_config(verify_dkim=True, verify_spf=True)
channel = EmailChannel(cfg, MessageBus())
items = channel._fetch_new_messages()
assert len(items) == 0, "Email with dkim=fail should be rejected"
def test_backward_compat_verify_disabled(monkeypatch) -> None:
"""When verify_dkim=False and verify_spf=False, emails without auth headers are accepted."""
raw = _make_raw_email(subject="NoAuth", body="No auth headers present")
fake = _make_fake_imap(raw)
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
cfg = _make_config(verify_dkim=False, verify_spf=False)
channel = EmailChannel(cfg, MessageBus())
items = channel._fetch_new_messages()
assert len(items) == 1, "With verification disabled, emails should be accepted as before"
def test_email_content_tagged_with_email_context(monkeypatch) -> None:
"""Email content should be prefixed with [EMAIL-CONTEXT] for LLM isolation."""
raw = _make_raw_email(subject="Tagged", body="Check the tag")
fake = _make_fake_imap(raw)
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
cfg = _make_config(verify_dkim=False, verify_spf=False)
channel = EmailChannel(cfg, MessageBus())
items = channel._fetch_new_messages()
assert len(items) == 1
assert items[0]["content"].startswith("[EMAIL-CONTEXT]"), (
"Email content must be tagged with [EMAIL-CONTEXT]"
)
def test_check_authentication_results_method() -> None:
"""Unit test for the _check_authentication_results static method."""
from email.parser import BytesParser
from email import policy
# No Authentication-Results header
msg_no_auth = EmailMessage()
msg_no_auth["From"] = "alice@example.com"
msg_no_auth.set_content("test")
parsed = BytesParser(policy=policy.default).parsebytes(msg_no_auth.as_bytes())
spf, dkim = EmailChannel._check_authentication_results(parsed)
assert spf is False
assert dkim is False
# Both pass
msg_both = EmailMessage()
msg_both["From"] = "alice@example.com"
msg_both["Authentication-Results"] = (
"mx.google.com; spf=pass smtp.mailfrom=example.com; dkim=pass header.d=example.com"
)
msg_both.set_content("test")
parsed = BytesParser(policy=policy.default).parsebytes(msg_both.as_bytes())
spf, dkim = EmailChannel._check_authentication_results(parsed)
assert spf is True
assert dkim is True
# SPF pass, DKIM fail
msg_spf_only = EmailMessage()
msg_spf_only["From"] = "alice@example.com"
msg_spf_only["Authentication-Results"] = (
"mx.google.com; spf=pass smtp.mailfrom=example.com; dkim=fail"
)
msg_spf_only.set_content("test")
parsed = BytesParser(policy=policy.default).parsebytes(msg_spf_only.as_bytes())
spf, dkim = EmailChannel._check_authentication_results(parsed)
assert spf is True
assert dkim is False
# DKIM pass, SPF fail
msg_dkim_only = EmailMessage()
msg_dkim_only["From"] = "alice@example.com"
msg_dkim_only["Authentication-Results"] = (
"mx.google.com; spf=fail smtp.mailfrom=example.com; dkim=pass header.d=example.com"
)
msg_dkim_only.set_content("test")
parsed = BytesParser(policy=policy.default).parsebytes(msg_dkim_only.as_bytes())
spf, dkim = EmailChannel._check_authentication_results(parsed)
assert spf is False
assert dkim is True