diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index 21d175bfa..611fdb357 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -16,6 +16,7 @@ from email.parser import BytesParser from email.utils import parseaddr from fnmatch import fnmatch from pathlib import Path +from dataclasses import dataclass from typing import Any, Literal from loguru import logger @@ -70,6 +71,13 @@ class EmailConfig(Base): max_attachments_per_email: int = 5 +@dataclass +class _ServerFeatures: + move: bool + uidplus: bool + uid_store: bool | None = None + + class EmailChannel(BaseChannel): """ Email channel. @@ -635,34 +643,103 @@ class EmailChannel(BaseChannel): return try: + features = self._server_features(client) + # Apply all post-actions in one IMAP session. `features` also carries + # session-learned behavior (e.g. UID STORE support) so later UIDs can + # skip known-broken paths. for uid in post_actions_uids: if uid: - self._apply_post_action(client, uid) + self._apply_post_action(client, uid, features) finally: self._close_imap_client(client) - def _apply_post_action(self, client: Any, uid: str) -> None: - status, data = client.search(None, "UID", uid) - if status != "OK" or not data or not data[0]: - self.logger.warning("Post-action skipped: UID {} not found", uid) - return - - imap_id = data[0].split()[0] + def _apply_post_action( + self, + client: Any, + uid: str, + features: _ServerFeatures, + ) -> None: action = self.config.post_action if action == "delete": - client.store(imap_id, "+FLAGS", "\\Deleted") - client.expunge() + if not self._uid_store_deleted(client, uid, features): + return + self._uid_expunge_or_fallback(client, uid, features) return if action == "move": target = (self.config.post_action_move_mailbox or "").strip() - status, _ = client.copy(imap_id, target) - if status != "OK": - self.logger.warning("Post-action move failed for UID {} to mailbox {}", uid, target) + if features.move: + status, _ = client.uid("MOVE", uid, target) + if status != "OK": + self.logger.warning("Post-action move failed (UID MOVE) for UID {} to mailbox {}", uid, target) return - client.store(imap_id, "+FLAGS", "\\Deleted") - client.expunge() + + status, _ = client.uid("COPY", uid, target) + if status != "OK": + self.logger.warning("Post-action move failed (UID COPY) for UID {} to mailbox {}", uid, target) + return + if not self._uid_store_deleted(client, uid, features): + return + self._uid_expunge_or_fallback(client, uid, features) + + @staticmethod + def _server_features(client: Any) -> _ServerFeatures: + caps: set[str] = set() + with suppress(Exception): + status, data = client.capability() + if status == "OK" and data: + for raw in data: + if isinstance(raw, (bytes, bytearray)): + caps.update(token.upper() for token in raw.decode("utf-8", errors="ignore").split()) + elif isinstance(raw, str): + caps.update(token.upper() for token in raw.split()) + return _ServerFeatures(move="MOVE" in caps, uidplus="UIDPLUS" in caps) + + @staticmethod + def _lookup_imap_id_by_uid(client: Any, uid: str) -> bytes | None: + # IMAP exposes two message identifiers: UID (stable) and sequence number + # (session-local). We target by UID first, but some servers may reject + # UID STORE. In that case we resolve the current sequence number for the + # UID and retry with STORE using that sequence id. + status, data = client.search(None, "UID", uid) + if status != "OK" or not data or not data[0]: + return None + return data[0].split()[0] + + def _uid_store_deleted(self, client: Any, uid: str, features: _ServerFeatures) -> bool: + # Optimistic path: try UID STORE first because UID is stable and avoids + # sequence-number lookup. If this fails once for the session, remember it + # and use the sequence STORE fallback directly for remaining UIDs. + if features.uid_store is not False: + status, _ = client.uid("STORE", uid, "+FLAGS", "(\\Deleted)") + if status == "OK": + features.uid_store = True + return True + features.uid_store = False + + # Compatibility fallback for servers where UID STORE is unavailable or + # unreliable: resolve the current sequence number from UID and use STORE. + imap_id = self._lookup_imap_id_by_uid(client, uid) + if not imap_id: + self.logger.warning("Post-action skipped: UID {} not found", uid) + return False + + status, _ = client.store(imap_id, "+FLAGS", "\\Deleted") + if status != "OK": + self.logger.warning("Post-action failed: could not mark UID {} as deleted", uid) + return False + return True + + def _uid_expunge_or_fallback(self, client: Any, uid: str, features: _ServerFeatures) -> None: + # Prefer UID-scoped expunge when supported to avoid expunging unrelated + # messages already marked \Deleted in the selected mailbox. + if features.uidplus: + status, _ = client.uid("EXPUNGE", uid) + if status == "OK": + return + self.logger.warning("UID EXPUNGE failed for UID {}, falling back to EXPUNGE", uid) + client.expunge() @classmethod def _is_stale_imap_error(cls, exc: Exception) -> bool: diff --git a/tests/channels/test_email_channel.py b/tests/channels/test_email_channel.py index c1fa1f8e5..e6ffff5a5 100644 --- a/tests/channels/test_email_channel.py +++ b/tests/channels/test_email_channel.py @@ -171,6 +171,7 @@ def test_apply_post_actions_batch_delete_uses_one_connection(monkeypatch) -> Non class FakeIMAP: def __init__(self) -> None: self.search_calls: list[tuple] = [] + self.uid_calls: list[tuple] = [] self.store_calls: list[tuple[bytes, str, str]] = [] self.expunge_calls = 0 @@ -186,6 +187,17 @@ def test_apply_post_actions_batch_delete_uses_one_connection(monkeypatch) -> Non return "OK", [b"1"] return "OK", [b"1"] + def capability(self): + return "OK", [b"IMAP4rev1 UIDPLUS"] + + def uid(self, command: str, *args): + self.uid_calls.append((command, *args)) + if command == "STORE": + return "OK", [b""] + if command == "EXPUNGE": + return "OK", [b""] + return "BAD", [b""] + def fetch(self, _imap_id: bytes, _parts: str): return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"] @@ -206,16 +218,21 @@ def test_apply_post_actions_batch_delete_uses_one_connection(monkeypatch) -> Non channel = EmailChannel(_make_config(post_action="delete"), MessageBus()) channel._apply_post_actions_batch(["123", "124"]) - assert (b"1", "+FLAGS", "\\Deleted") in fake.store_calls - assert fake.expunge_calls == 2 - uid_searches = [call for call in fake.search_calls if len(call) >= 3 and call[1] == "UID"] - assert uid_searches == [(None, "UID", "123"), (None, "UID", "124")] + assert fake.store_calls == [] + assert fake.expunge_calls == 0 + assert fake.search_calls == [] + assert fake.uid_calls == [ + ("STORE", "123", "+FLAGS", "(\\Deleted)"), + ("EXPUNGE", "123"), + ("STORE", "124", "+FLAGS", "(\\Deleted)"), + ("EXPUNGE", "124"), + ] def test_apply_post_actions_batch_move_copies_then_deletes(monkeypatch) -> None: class FakeIMAP: def __init__(self) -> None: - self.copy_calls: list[tuple[bytes, str]] = [] + self.uid_calls: list[tuple] = [] self.store_calls: list[tuple[bytes, str, str]] = [] self.expunge_calls = 0 @@ -228,9 +245,18 @@ def test_apply_post_actions_batch_move_copies_then_deletes(monkeypatch) -> None: def search(self, *_args): return "OK", [b"1"] - def copy(self, imap_id: bytes, mailbox: str): - self.copy_calls.append((imap_id, mailbox)) - return "OK", [b""] + def capability(self): + return "OK", [b"IMAP4rev1 UIDPLUS"] + + def uid(self, command: str, *args): + self.uid_calls.append((command, *args)) + if command == "COPY": + return "OK", [b""] + if command == "STORE": + return "OK", [b""] + if command == "EXPUNGE": + return "OK", [b""] + return "BAD", [b""] def store(self, imap_id: bytes, op: str, flags: str): self.store_calls.append((imap_id, op, flags)) @@ -252,9 +278,103 @@ def test_apply_post_actions_batch_move_copies_then_deletes(monkeypatch) -> None: ) channel._apply_post_actions_batch(["123"]) - assert fake.copy_calls == [(b"1", "Processed")] - assert fake.store_calls == [(b"1", "+FLAGS", "\\Deleted")] - assert fake.expunge_calls == 1 + assert fake.uid_calls == [ + ("COPY", "123", "Processed"), + ("STORE", "123", "+FLAGS", "(\\Deleted)"), + ("EXPUNGE", "123"), + ] + assert fake.store_calls == [] + assert fake.expunge_calls == 0 + + +def test_apply_post_actions_batch_move_prefers_uid_move_when_supported(monkeypatch) -> None: + class FakeIMAP: + def __init__(self) -> None: + self.uid_calls: list[tuple] = [] + + def login(self, _user: str, _pw: str): + return "OK", [b"logged in"] + + def select(self, _mailbox: str): + return "OK", [b"1"] + + def capability(self): + return "OK", [b"IMAP4rev1 UIDPLUS MOVE"] + + def uid(self, command: str, *args): + self.uid_calls.append((command, *args)) + if command == "MOVE": + return "OK", [b""] + return "BAD", [b""] + + def logout(self): + return "BYE", [b""] + + fake = FakeIMAP() + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + channel = EmailChannel( + _make_config(post_action="move", post_action_move_mailbox="Processed"), + MessageBus(), + ) + channel._apply_post_actions_batch(["123"]) + + assert fake.uid_calls == [("MOVE", "123", "Processed")] + + +def test_apply_post_actions_batch_fallback_caches_uid_store_failure(monkeypatch) -> None: + class FakeIMAP: + def __init__(self) -> None: + self.uid_calls: list[tuple] = [] + self.search_calls: list[tuple] = [] + self.store_calls: list[tuple[bytes, str, str]] = [] + self.expunge_calls = 0 + + def login(self, _user: str, _pw: str): + return "OK", [b"logged in"] + + def select(self, _mailbox: str): + return "OK", [b"2"] + + def capability(self): + return "OK", [b"IMAP4rev1"] + + def uid(self, command: str, *args): + self.uid_calls.append((command, *args)) + if command == "STORE": + return "NO", [b"unsupported"] + return "BAD", [b""] + + def search(self, *_args): + self.search_calls.append(_args) + if _args == (None, "UID", "123"): + return "OK", [b"1"] + if _args == (None, "UID", "124"): + return "OK", [b"2"] + return "NO", [b""] + + def store(self, imap_id: bytes, op: str, flags: str): + self.store_calls.append((imap_id, op, flags)) + return "OK", [b""] + + def expunge(self): + self.expunge_calls += 1 + return "OK", [b""] + + def logout(self): + return "BYE", [b""] + + fake = FakeIMAP() + monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake) + + channel = EmailChannel(_make_config(post_action="delete"), MessageBus()) + channel._apply_post_actions_batch(["123", "124"]) + + # UID STORE should be attempted only once, then cached as unsupported. + assert [call for call in fake.uid_calls if call[0] == "STORE"] == [("STORE", "123", "+FLAGS", "(\\Deleted)")] + assert fake.search_calls == [(None, "UID", "123"), (None, "UID", "124")] + assert fake.store_calls == [(b"1", "+FLAGS", "\\Deleted"), (b"2", "+FLAGS", "\\Deleted")] + assert fake.expunge_calls == 2 @pytest.mark.asyncio