feat(email): support IMAP MOVE and UID expunge fallbacks

This commit is contained in:
Flávio Veloso Soares 2026-05-28 22:42:19 -07:00 committed by Xubin Ren
parent ec5460d23e
commit 4369eb20fc
2 changed files with 223 additions and 26 deletions

View File

@ -16,6 +16,7 @@ from email.parser import BytesParser
from email.utils import parseaddr
from fnmatch import fnmatch
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Literal
from loguru import logger
@ -70,6 +71,13 @@ class EmailConfig(Base):
max_attachments_per_email: int = 5
@dataclass
class _ServerFeatures:
move: bool
uidplus: bool
uid_store: bool | None = None
class EmailChannel(BaseChannel):
"""
Email channel.
@ -635,34 +643,103 @@ class EmailChannel(BaseChannel):
return
try:
features = self._server_features(client)
# Apply all post-actions in one IMAP session. `features` also carries
# session-learned behavior (e.g. UID STORE support) so later UIDs can
# skip known-broken paths.
for uid in post_actions_uids:
if uid:
self._apply_post_action(client, uid)
self._apply_post_action(client, uid, features)
finally:
self._close_imap_client(client)
def _apply_post_action(self, client: Any, uid: str) -> None:
status, data = client.search(None, "UID", uid)
if status != "OK" or not data or not data[0]:
self.logger.warning("Post-action skipped: UID {} not found", uid)
return
imap_id = data[0].split()[0]
def _apply_post_action(
self,
client: Any,
uid: str,
features: _ServerFeatures,
) -> None:
action = self.config.post_action
if action == "delete":
client.store(imap_id, "+FLAGS", "\\Deleted")
client.expunge()
if not self._uid_store_deleted(client, uid, features):
return
self._uid_expunge_or_fallback(client, uid, features)
return
if action == "move":
target = (self.config.post_action_move_mailbox or "").strip()
status, _ = client.copy(imap_id, target)
if status != "OK":
self.logger.warning("Post-action move failed for UID {} to mailbox {}", uid, target)
if features.move:
status, _ = client.uid("MOVE", uid, target)
if status != "OK":
self.logger.warning("Post-action move failed (UID MOVE) for UID {} to mailbox {}", uid, target)
return
client.store(imap_id, "+FLAGS", "\\Deleted")
client.expunge()
status, _ = client.uid("COPY", uid, target)
if status != "OK":
self.logger.warning("Post-action move failed (UID COPY) for UID {} to mailbox {}", uid, target)
return
if not self._uid_store_deleted(client, uid, features):
return
self._uid_expunge_or_fallback(client, uid, features)
@staticmethod
def _server_features(client: Any) -> _ServerFeatures:
caps: set[str] = set()
with suppress(Exception):
status, data = client.capability()
if status == "OK" and data:
for raw in data:
if isinstance(raw, (bytes, bytearray)):
caps.update(token.upper() for token in raw.decode("utf-8", errors="ignore").split())
elif isinstance(raw, str):
caps.update(token.upper() for token in raw.split())
return _ServerFeatures(move="MOVE" in caps, uidplus="UIDPLUS" in caps)
@staticmethod
def _lookup_imap_id_by_uid(client: Any, uid: str) -> bytes | None:
# IMAP exposes two message identifiers: UID (stable) and sequence number
# (session-local). We target by UID first, but some servers may reject
# UID STORE. In that case we resolve the current sequence number for the
# UID and retry with STORE using that sequence id.
status, data = client.search(None, "UID", uid)
if status != "OK" or not data or not data[0]:
return None
return data[0].split()[0]
def _uid_store_deleted(self, client: Any, uid: str, features: _ServerFeatures) -> bool:
# Optimistic path: try UID STORE first because UID is stable and avoids
# sequence-number lookup. If this fails once for the session, remember it
# and use the sequence STORE fallback directly for remaining UIDs.
if features.uid_store is not False:
status, _ = client.uid("STORE", uid, "+FLAGS", "(\\Deleted)")
if status == "OK":
features.uid_store = True
return True
features.uid_store = False
# Compatibility fallback for servers where UID STORE is unavailable or
# unreliable: resolve the current sequence number from UID and use STORE.
imap_id = self._lookup_imap_id_by_uid(client, uid)
if not imap_id:
self.logger.warning("Post-action skipped: UID {} not found", uid)
return False
status, _ = client.store(imap_id, "+FLAGS", "\\Deleted")
if status != "OK":
self.logger.warning("Post-action failed: could not mark UID {} as deleted", uid)
return False
return True
def _uid_expunge_or_fallback(self, client: Any, uid: str, features: _ServerFeatures) -> None:
# Prefer UID-scoped expunge when supported to avoid expunging unrelated
# messages already marked \Deleted in the selected mailbox.
if features.uidplus:
status, _ = client.uid("EXPUNGE", uid)
if status == "OK":
return
self.logger.warning("UID EXPUNGE failed for UID {}, falling back to EXPUNGE", uid)
client.expunge()
@classmethod
def _is_stale_imap_error(cls, exc: Exception) -> bool:

View File

@ -171,6 +171,7 @@ def test_apply_post_actions_batch_delete_uses_one_connection(monkeypatch) -> Non
class FakeIMAP:
def __init__(self) -> None:
self.search_calls: list[tuple] = []
self.uid_calls: list[tuple] = []
self.store_calls: list[tuple[bytes, str, str]] = []
self.expunge_calls = 0
@ -186,6 +187,17 @@ def test_apply_post_actions_batch_delete_uses_one_connection(monkeypatch) -> Non
return "OK", [b"1"]
return "OK", [b"1"]
def capability(self):
return "OK", [b"IMAP4rev1 UIDPLUS"]
def uid(self, command: str, *args):
self.uid_calls.append((command, *args))
if command == "STORE":
return "OK", [b""]
if command == "EXPUNGE":
return "OK", [b""]
return "BAD", [b""]
def fetch(self, _imap_id: bytes, _parts: str):
return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
@ -206,16 +218,21 @@ def test_apply_post_actions_batch_delete_uses_one_connection(monkeypatch) -> Non
channel = EmailChannel(_make_config(post_action="delete"), MessageBus())
channel._apply_post_actions_batch(["123", "124"])
assert (b"1", "+FLAGS", "\\Deleted") in fake.store_calls
assert fake.expunge_calls == 2
uid_searches = [call for call in fake.search_calls if len(call) >= 3 and call[1] == "UID"]
assert uid_searches == [(None, "UID", "123"), (None, "UID", "124")]
assert fake.store_calls == []
assert fake.expunge_calls == 0
assert fake.search_calls == []
assert fake.uid_calls == [
("STORE", "123", "+FLAGS", "(\\Deleted)"),
("EXPUNGE", "123"),
("STORE", "124", "+FLAGS", "(\\Deleted)"),
("EXPUNGE", "124"),
]
def test_apply_post_actions_batch_move_copies_then_deletes(monkeypatch) -> None:
class FakeIMAP:
def __init__(self) -> None:
self.copy_calls: list[tuple[bytes, str]] = []
self.uid_calls: list[tuple] = []
self.store_calls: list[tuple[bytes, str, str]] = []
self.expunge_calls = 0
@ -228,9 +245,18 @@ def test_apply_post_actions_batch_move_copies_then_deletes(monkeypatch) -> None:
def search(self, *_args):
return "OK", [b"1"]
def copy(self, imap_id: bytes, mailbox: str):
self.copy_calls.append((imap_id, mailbox))
return "OK", [b""]
def capability(self):
return "OK", [b"IMAP4rev1 UIDPLUS"]
def uid(self, command: str, *args):
self.uid_calls.append((command, *args))
if command == "COPY":
return "OK", [b""]
if command == "STORE":
return "OK", [b""]
if command == "EXPUNGE":
return "OK", [b""]
return "BAD", [b""]
def store(self, imap_id: bytes, op: str, flags: str):
self.store_calls.append((imap_id, op, flags))
@ -252,9 +278,103 @@ def test_apply_post_actions_batch_move_copies_then_deletes(monkeypatch) -> None:
)
channel._apply_post_actions_batch(["123"])
assert fake.copy_calls == [(b"1", "Processed")]
assert fake.store_calls == [(b"1", "+FLAGS", "\\Deleted")]
assert fake.expunge_calls == 1
assert fake.uid_calls == [
("COPY", "123", "Processed"),
("STORE", "123", "+FLAGS", "(\\Deleted)"),
("EXPUNGE", "123"),
]
assert fake.store_calls == []
assert fake.expunge_calls == 0
def test_apply_post_actions_batch_move_prefers_uid_move_when_supported(monkeypatch) -> None:
class FakeIMAP:
def __init__(self) -> None:
self.uid_calls: list[tuple] = []
def login(self, _user: str, _pw: str):
return "OK", [b"logged in"]
def select(self, _mailbox: str):
return "OK", [b"1"]
def capability(self):
return "OK", [b"IMAP4rev1 UIDPLUS MOVE"]
def uid(self, command: str, *args):
self.uid_calls.append((command, *args))
if command == "MOVE":
return "OK", [b""]
return "BAD", [b""]
def logout(self):
return "BYE", [b""]
fake = FakeIMAP()
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
channel = EmailChannel(
_make_config(post_action="move", post_action_move_mailbox="Processed"),
MessageBus(),
)
channel._apply_post_actions_batch(["123"])
assert fake.uid_calls == [("MOVE", "123", "Processed")]
def test_apply_post_actions_batch_fallback_caches_uid_store_failure(monkeypatch) -> None:
class FakeIMAP:
def __init__(self) -> None:
self.uid_calls: list[tuple] = []
self.search_calls: list[tuple] = []
self.store_calls: list[tuple[bytes, str, str]] = []
self.expunge_calls = 0
def login(self, _user: str, _pw: str):
return "OK", [b"logged in"]
def select(self, _mailbox: str):
return "OK", [b"2"]
def capability(self):
return "OK", [b"IMAP4rev1"]
def uid(self, command: str, *args):
self.uid_calls.append((command, *args))
if command == "STORE":
return "NO", [b"unsupported"]
return "BAD", [b""]
def search(self, *_args):
self.search_calls.append(_args)
if _args == (None, "UID", "123"):
return "OK", [b"1"]
if _args == (None, "UID", "124"):
return "OK", [b"2"]
return "NO", [b""]
def store(self, imap_id: bytes, op: str, flags: str):
self.store_calls.append((imap_id, op, flags))
return "OK", [b""]
def expunge(self):
self.expunge_calls += 1
return "OK", [b""]
def logout(self):
return "BYE", [b""]
fake = FakeIMAP()
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
channel = EmailChannel(_make_config(post_action="delete"), MessageBus())
channel._apply_post_actions_batch(["123", "124"])
# UID STORE should be attempted only once, then cached as unsupported.
assert [call for call in fake.uid_calls if call[0] == "STORE"] == [("STORE", "123", "+FLAGS", "(\\Deleted)")]
assert fake.search_calls == [(None, "UID", "123"), (None, "UID", "124")]
assert fake.store_calls == [(b"1", "+FLAGS", "\\Deleted"), (b"2", "+FLAGS", "\\Deleted")]
assert fake.expunge_calls == 2
@pytest.mark.asyncio