From 6e428b7939473ff7628303e35c52de8d0aabc51c Mon Sep 17 00:00:00 2001 From: idealist17 <1062142957@qq.com> Date: Tue, 10 Mar 2026 16:45:06 +0800 Subject: [PATCH] fix: verify Authentication-Results (SPF/DKIM) for inbound emails --- nanobot/channels/email.py | 42 ++++++++- nanobot/config/schema.py | 4 + tests/test_email_channel.py | 173 +++++++++++++++++++++++++++++++++++- 3 files changed, 216 insertions(+), 3 deletions(-) diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index 16771fb64..9e2ff4487 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -71,6 +71,12 @@ class EmailChannel(BaseChannel): return self._running = True + if not self.config.verify_dkim and not self.config.verify_spf: + logger.warning( + "Email channel: DKIM and SPF verification are both DISABLED. " + "Emails with spoofed From headers will be accepted. " + "Set verify_dkim=true and verify_spf=true for anti-spoofing protection." + ) logger.info("Starting Email channel (IMAP polling mode)...") poll_seconds = max(5, int(self.config.poll_interval_seconds)) @@ -270,6 +276,23 @@ class EmailChannel(BaseChannel): if not sender: continue + # --- Anti-spoofing: verify Authentication-Results --- + spf_pass, dkim_pass = self._check_authentication_results(parsed) + if self.config.verify_spf and not spf_pass: + logger.warning( + "Email from {} rejected: SPF verification failed " + "(no 'spf=pass' in Authentication-Results header)", + sender, + ) + continue + if self.config.verify_dkim and not dkim_pass: + logger.warning( + "Email from {} rejected: DKIM verification failed " + "(no 'dkim=pass' in Authentication-Results header)", + sender, + ) + continue + subject = self._decode_header_value(parsed.get("Subject", "")) date_value = parsed.get("Date", "") message_id = parsed.get("Message-ID", "").strip() @@ -280,7 +303,7 @@ class EmailChannel(BaseChannel): body = body[: self.config.max_body_chars] content = ( - f"Email received.\n" + f"[EMAIL-CONTEXT] Email received.\n" f"From: {sender}\n" f"Subject: {subject}\n" f"Date: {date_value}\n\n" @@ -393,6 +416,23 @@ class EmailChannel(BaseChannel): return cls._html_to_text(payload).strip() return payload.strip() + @staticmethod + def _check_authentication_results(parsed_msg: Any) -> tuple[bool, bool]: + """Parse Authentication-Results headers for SPF and DKIM verdicts. + + Returns: + A tuple of (spf_pass, dkim_pass) booleans. + """ + spf_pass = False + dkim_pass = False + for ar_header in parsed_msg.get_all("Authentication-Results") or []: + ar_lower = ar_header.lower() + if re.search(r"\bspf\s*=\s*pass\b", ar_lower): + spf_pass = True + if re.search(r"\bdkim\s*=\s*pass\b", ar_lower): + dkim_pass = True + return spf_pass, dkim_pass + @staticmethod def _html_to_text(raw_html: str) -> str: text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 8cfcad672..e3953b91c 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -124,6 +124,10 @@ class EmailConfig(Base): subject_prefix: str = "Re: " allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses + # Email authentication verification (anti-spoofing) + verify_dkim: bool = True # Require Authentication-Results with dkim=pass + verify_spf: bool = True # Require Authentication-Results with spf=pass + class MochatMentionConfig(Base): """Mochat mention behavior configuration.""" diff --git a/tests/test_email_channel.py b/tests/test_email_channel.py index adf35a850..808c8f6fd 100644 --- a/tests/test_email_channel.py +++ b/tests/test_email_channel.py @@ -9,8 +9,8 @@ from nanobot.channels.email import EmailChannel from nanobot.config.schema import EmailConfig -def _make_config() -> EmailConfig: - return EmailConfig( +def _make_config(**overrides) -> EmailConfig: + defaults = dict( enabled=True, consent_granted=True, imap_host="imap.example.com", @@ -22,19 +22,27 @@ def _make_config() -> EmailConfig: smtp_username="bot@example.com", smtp_password="secret", mark_seen=True, + # Disable auth verification by default so existing tests are unaffected + verify_dkim=False, + verify_spf=False, ) + defaults.update(overrides) + return EmailConfig(**defaults) def _make_raw_email( from_addr: str = "alice@example.com", subject: str = "Hello", body: str = "This is the body.", + auth_results: str | None = None, ) -> bytes: msg = EmailMessage() msg["From"] = from_addr msg["To"] = "bot@example.com" msg["Subject"] = subject msg["Message-ID"] = "" + if auth_results: + msg["Authentication-Results"] = auth_results msg.set_content(body) return msg.as_bytes() @@ -366,3 +374,164 @@ def test_fetch_messages_between_dates_uses_imap_since_before_without_mark_seen(m assert fake.search_args is not None assert fake.search_args[1:] == ("SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026") assert fake.store_calls == [] + + +# --------------------------------------------------------------------------- +# Security: Anti-spoofing tests for Authentication-Results verification +# --------------------------------------------------------------------------- + +def _make_fake_imap(raw: bytes): + """Return a FakeIMAP class pre-loaded with the given raw email.""" + class FakeIMAP: + def __init__(self) -> None: + self.store_calls: list[tuple[bytes, str, str]] = [] + + def login(self, _user: str, _pw: str): + return "OK", [b"logged in"] + + def select(self, _mailbox: str): + return "OK", [b"1"] + + def search(self, *_args): + return "OK", [b"1"] + + def fetch(self, _imap_id: bytes, _parts: str): + return "OK", [(b"1 (UID 500 BODY[] {200})", raw), b")"] + + def store(self, imap_id: bytes, op: str, flags: str): + self.store_calls.append((imap_id, op, flags)) + return "OK", [b""] + + def logout(self): + return "BYE", [b""] + + return FakeIMAP() + + +def test_spoofed_email_rejected_when_verify_enabled(monkeypatch) -> None: + """An email without Authentication-Results should be rejected when verify_dkim=True.""" + raw = _make_raw_email(subject="Spoofed", body="Malicious payload") + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(verify_dkim=True, verify_spf=True) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 0, "Spoofed email without auth headers should be rejected" + + +def test_email_with_valid_auth_results_accepted(monkeypatch) -> None: + """An email with spf=pass and dkim=pass should be accepted.""" + raw = _make_raw_email( + subject="Legit", + body="Hello from verified sender", + auth_results="mx.example.com; spf=pass smtp.mailfrom=alice@example.com; dkim=pass header.d=example.com", + ) + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(verify_dkim=True, verify_spf=True) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert items[0]["sender"] == "alice@example.com" + assert items[0]["subject"] == "Legit" + + +def test_email_with_partial_auth_rejected(monkeypatch) -> None: + """An email with only spf=pass but no dkim=pass should be rejected when verify_dkim=True.""" + raw = _make_raw_email( + subject="Partial", + body="Only SPF passes", + auth_results="mx.example.com; spf=pass smtp.mailfrom=alice@example.com; dkim=fail", + ) + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(verify_dkim=True, verify_spf=True) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 0, "Email with dkim=fail should be rejected" + + +def test_backward_compat_verify_disabled(monkeypatch) -> None: + """When verify_dkim=False and verify_spf=False, emails without auth headers are accepted.""" + raw = _make_raw_email(subject="NoAuth", body="No auth headers present") + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(verify_dkim=False, verify_spf=False) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1, "With verification disabled, emails should be accepted as before" + + +def test_email_content_tagged_with_email_context(monkeypatch) -> None: + """Email content should be prefixed with [EMAIL-CONTEXT] for LLM isolation.""" + raw = _make_raw_email(subject="Tagged", body="Check the tag") + fake = _make_fake_imap(raw) + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + cfg = _make_config(verify_dkim=False, verify_spf=False) + channel = EmailChannel(cfg, MessageBus()) + items = channel._fetch_new_messages() + + assert len(items) == 1 + assert items[0]["content"].startswith("[EMAIL-CONTEXT]"), ( + "Email content must be tagged with [EMAIL-CONTEXT]" + ) + + +def test_check_authentication_results_method() -> None: + """Unit test for the _check_authentication_results static method.""" + from email.parser import BytesParser + from email import policy + + # No Authentication-Results header + msg_no_auth = EmailMessage() + msg_no_auth["From"] = "alice@example.com" + msg_no_auth.set_content("test") + parsed = BytesParser(policy=policy.default).parsebytes(msg_no_auth.as_bytes()) + spf, dkim = EmailChannel._check_authentication_results(parsed) + assert spf is False + assert dkim is False + + # Both pass + msg_both = EmailMessage() + msg_both["From"] = "alice@example.com" + msg_both["Authentication-Results"] = ( + "mx.google.com; spf=pass smtp.mailfrom=example.com; dkim=pass header.d=example.com" + ) + msg_both.set_content("test") + parsed = BytesParser(policy=policy.default).parsebytes(msg_both.as_bytes()) + spf, dkim = EmailChannel._check_authentication_results(parsed) + assert spf is True + assert dkim is True + + # SPF pass, DKIM fail + msg_spf_only = EmailMessage() + msg_spf_only["From"] = "alice@example.com" + msg_spf_only["Authentication-Results"] = ( + "mx.google.com; spf=pass smtp.mailfrom=example.com; dkim=fail" + ) + msg_spf_only.set_content("test") + parsed = BytesParser(policy=policy.default).parsebytes(msg_spf_only.as_bytes()) + spf, dkim = EmailChannel._check_authentication_results(parsed) + assert spf is True + assert dkim is False + + # DKIM pass, SPF fail + msg_dkim_only = EmailMessage() + msg_dkim_only["From"] = "alice@example.com" + msg_dkim_only["Authentication-Results"] = ( + "mx.google.com; spf=fail smtp.mailfrom=example.com; dkim=pass header.d=example.com" + ) + msg_dkim_only.set_content("test") + parsed = BytesParser(policy=policy.default).parsebytes(msg_dkim_only.as_bytes()) + spf, dkim = EmailChannel._check_authentication_results(parsed) + assert spf is False + assert dkim is True