mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-06-15 07:14:08 +00:00
feat(email): add configurable post-action handling
This commit is contained in:
parent
85ab55aeee
commit
ec5460d23e
@ -577,6 +577,10 @@ Give nanobot its own email account. It polls **IMAP** for incoming mail and repl
|
|||||||
> - `allowFrom`: Add your email address. Use `["*"]` to accept emails from anyone.
|
> - `allowFrom`: Add your email address. Use `["*"]` to accept emails from anyone.
|
||||||
> - `smtpUseTls` and `smtpUseSsl` default to `true` / `false` respectively, which is correct for Gmail (port 587 + STARTTLS). No need to set them explicitly.
|
> - `smtpUseTls` and `smtpUseSsl` default to `true` / `false` respectively, which is correct for Gmail (port 587 + STARTTLS). No need to set them explicitly.
|
||||||
> - Set `"autoReplyEnabled": false` if you only want to read/analyze emails without sending automatic replies.
|
> - Set `"autoReplyEnabled": false` if you only want to read/analyze emails without sending automatic replies.
|
||||||
|
> - `postAction`: Optional post-processing for processed emails: `"delete"` or `"move"` (default `null`).
|
||||||
|
> This runs only after an accepted email is successfully delivered to the AI pipeline.
|
||||||
|
> - `postActionMoveMailbox`: Destination mailbox used when `postAction` is `"move"` (for example `"Processed"` or `"[Gmail]/Trash"`).
|
||||||
|
> - `postActionIgnoreSkipped`: If `true` (default), skipped emails are ignored for post-action and not moved/deleted.
|
||||||
> - `allowedAttachmentTypes`: Save inbound attachments matching these MIME types — `["*"]` for all, e.g. `["application/pdf", "image/*"]` (default `[]` = disabled).
|
> - `allowedAttachmentTypes`: Save inbound attachments matching these MIME types — `["*"]` for all, e.g. `["application/pdf", "image/*"]` (default `[]` = disabled).
|
||||||
> - `maxAttachmentSize`: Max size per attachment in bytes (default `2000000` / 2MB).
|
> - `maxAttachmentSize`: Max size per attachment in bytes (default `2000000` / 2MB).
|
||||||
> - `maxAttachmentsPerEmail`: Max attachments to save per email (default `5`).
|
> - `maxAttachmentsPerEmail`: Max attachments to save per email (default `5`).
|
||||||
@ -597,6 +601,9 @@ Give nanobot its own email account. It polls **IMAP** for incoming mail and repl
|
|||||||
"smtpPassword": "your-app-password",
|
"smtpPassword": "your-app-password",
|
||||||
"fromAddress": "my-nanobot@gmail.com",
|
"fromAddress": "my-nanobot@gmail.com",
|
||||||
"allowFrom": ["your-real-email@gmail.com"],
|
"allowFrom": ["your-real-email@gmail.com"],
|
||||||
|
"postAction": "move",
|
||||||
|
"postActionMoveMailbox": "[Gmail]/Trash",
|
||||||
|
"postActionIgnoreSkipped": true,
|
||||||
"allowedAttachmentTypes": ["application/pdf", "image/*"]
|
"allowedAttachmentTypes": ["application/pdf", "image/*"]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,7 @@ from email.parser import BytesParser
|
|||||||
from email.utils import parseaddr
|
from email.utils import parseaddr
|
||||||
from fnmatch import fnmatch
|
from fnmatch import fnmatch
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any, Literal
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
@ -53,6 +53,9 @@ class EmailConfig(Base):
|
|||||||
auto_reply_enabled: bool = True
|
auto_reply_enabled: bool = True
|
||||||
poll_interval_seconds: int = 30
|
poll_interval_seconds: int = 30
|
||||||
mark_seen: bool = True
|
mark_seen: bool = True
|
||||||
|
post_action: Literal["delete", "move"] | None = None
|
||||||
|
post_action_move_mailbox: str | None = None
|
||||||
|
post_action_ignore_skipped: bool = True
|
||||||
max_body_chars: int = 12000
|
max_body_chars: int = 12000
|
||||||
subject_prefix: str = "Re: "
|
subject_prefix: str = "Re: "
|
||||||
allow_from: list[str] = Field(default_factory=list)
|
allow_from: list[str] = Field(default_factory=list)
|
||||||
@ -150,7 +153,9 @@ class EmailChannel(BaseChannel):
|
|||||||
poll_seconds = max(5, int(self.config.poll_interval_seconds))
|
poll_seconds = max(5, int(self.config.poll_interval_seconds))
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
inbound_items = await asyncio.to_thread(self._fetch_new_messages)
|
inbound_items, skipped_uids = await asyncio.to_thread(self._fetch_new_messages)
|
||||||
|
should_apply_post_action = self._should_apply_post_action()
|
||||||
|
post_actions_uids: set[str] = set()
|
||||||
for item in inbound_items:
|
for item in inbound_items:
|
||||||
sender = item["sender"]
|
sender = item["sender"]
|
||||||
subject = item.get("subject", "")
|
subject = item.get("subject", "")
|
||||||
@ -161,13 +166,27 @@ class EmailChannel(BaseChannel):
|
|||||||
if message_id:
|
if message_id:
|
||||||
self._last_message_id_by_chat[sender] = message_id
|
self._last_message_id_by_chat[sender] = message_id
|
||||||
|
|
||||||
await self._handle_message(
|
try:
|
||||||
sender_id=sender,
|
await self._handle_message(
|
||||||
chat_id=sender,
|
sender_id=sender,
|
||||||
content=item["content"],
|
chat_id=sender,
|
||||||
media=item.get("media") or None,
|
content=item["content"],
|
||||||
metadata=item.get("metadata", {}),
|
media=item.get("media") or None,
|
||||||
)
|
metadata=item.get("metadata", {}),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
self.logger.exception("Error delivering email from {}", sender)
|
||||||
|
continue
|
||||||
|
|
||||||
|
uid = str((item.get("metadata") or {}).get("uid") or "")
|
||||||
|
if uid and should_apply_post_action:
|
||||||
|
post_actions_uids.add(uid)
|
||||||
|
|
||||||
|
if should_apply_post_action and not self.config.post_action_ignore_skipped:
|
||||||
|
post_actions_uids.update(skipped_uids)
|
||||||
|
|
||||||
|
if post_actions_uids:
|
||||||
|
await asyncio.to_thread(self._apply_post_actions_batch, sorted(post_actions_uids))
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception("Polling error")
|
self.logger.exception("Polling error")
|
||||||
|
|
||||||
@ -295,6 +314,9 @@ class EmailChannel(BaseChannel):
|
|||||||
if not self.config.smtp_password:
|
if not self.config.smtp_password:
|
||||||
missing.append("smtp_password")
|
missing.append("smtp_password")
|
||||||
|
|
||||||
|
if self.config.post_action == "move" and not (self.config.post_action_move_mailbox or "").strip():
|
||||||
|
missing.append("post_action_move_mailbox")
|
||||||
|
|
||||||
if missing:
|
if missing:
|
||||||
self.logger.error("Channel not configured, missing: {}", ', '.join(missing))
|
self.logger.error("Channel not configured, missing: {}", ', '.join(missing))
|
||||||
return False
|
return False
|
||||||
@ -318,7 +340,7 @@ class EmailChannel(BaseChannel):
|
|||||||
smtp.login(self.config.smtp_username, self.config.smtp_password)
|
smtp.login(self.config.smtp_username, self.config.smtp_password)
|
||||||
smtp.send_message(msg)
|
smtp.send_message(msg)
|
||||||
|
|
||||||
def _fetch_new_messages(self) -> list[dict[str, Any]]:
|
def _fetch_new_messages(self) -> tuple[list[dict[str, Any]], set[str]]:
|
||||||
"""Poll IMAP and return parsed unread messages."""
|
"""Poll IMAP and return parsed unread messages."""
|
||||||
return self._fetch_messages(
|
return self._fetch_messages(
|
||||||
search_criteria=("UNSEEN",),
|
search_criteria=("UNSEEN",),
|
||||||
@ -341,7 +363,7 @@ class EmailChannel(BaseChannel):
|
|||||||
if end_date <= start_date:
|
if end_date <= start_date:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
return self._fetch_messages(
|
messages, _ = self._fetch_messages(
|
||||||
search_criteria=(
|
search_criteria=(
|
||||||
"SINCE",
|
"SINCE",
|
||||||
self._format_imap_date(start_date),
|
self._format_imap_date(start_date),
|
||||||
@ -352,6 +374,7 @@ class EmailChannel(BaseChannel):
|
|||||||
dedupe=False,
|
dedupe=False,
|
||||||
limit=max(1, int(limit)),
|
limit=max(1, int(limit)),
|
||||||
)
|
)
|
||||||
|
return messages
|
||||||
|
|
||||||
def _fetch_messages(
|
def _fetch_messages(
|
||||||
self,
|
self,
|
||||||
@ -359,8 +382,9 @@ class EmailChannel(BaseChannel):
|
|||||||
mark_seen: bool,
|
mark_seen: bool,
|
||||||
dedupe: bool,
|
dedupe: bool,
|
||||||
limit: int,
|
limit: int,
|
||||||
) -> list[dict[str, Any]]:
|
) -> tuple[list[dict[str, Any]], set[str]]:
|
||||||
messages: list[dict[str, Any]] = []
|
messages: list[dict[str, Any]] = []
|
||||||
|
skipped_uids: set[str] = set()
|
||||||
cycle_uids: set[str] = set()
|
cycle_uids: set[str] = set()
|
||||||
|
|
||||||
for attempt in range(2):
|
for attempt in range(2):
|
||||||
@ -371,15 +395,16 @@ class EmailChannel(BaseChannel):
|
|||||||
dedupe,
|
dedupe,
|
||||||
limit,
|
limit,
|
||||||
messages,
|
messages,
|
||||||
|
skipped_uids,
|
||||||
cycle_uids,
|
cycle_uids,
|
||||||
)
|
)
|
||||||
return messages
|
return messages, skipped_uids
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
if attempt == 1 or not self._is_stale_imap_error(exc):
|
if attempt == 1 or not self._is_stale_imap_error(exc):
|
||||||
raise
|
raise
|
||||||
self.logger.warning("IMAP connection went stale, retrying once: {}", exc)
|
self.logger.warning("IMAP connection went stale, retrying once: {}", exc)
|
||||||
|
|
||||||
return messages
|
return messages, skipped_uids
|
||||||
|
|
||||||
def _fetch_messages_once(
|
def _fetch_messages_once(
|
||||||
self,
|
self,
|
||||||
@ -388,6 +413,7 @@ class EmailChannel(BaseChannel):
|
|||||||
dedupe: bool,
|
dedupe: bool,
|
||||||
limit: int,
|
limit: int,
|
||||||
messages: list[dict[str, Any]],
|
messages: list[dict[str, Any]],
|
||||||
|
skipped_uids: set[str],
|
||||||
cycle_uids: set[str],
|
cycle_uids: set[str],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Fetch messages by arbitrary IMAP search criteria."""
|
"""Fetch messages by arbitrary IMAP search criteria."""
|
||||||
@ -429,6 +455,8 @@ class EmailChannel(BaseChannel):
|
|||||||
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
||||||
if mark_seen:
|
if mark_seen:
|
||||||
client.store(imap_id, "+FLAGS", "\\Seen")
|
client.store(imap_id, "+FLAGS", "\\Seen")
|
||||||
|
if uid:
|
||||||
|
skipped_uids.add(uid)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# --- Anti-spoofing: verify Authentication-Results ---
|
# --- Anti-spoofing: verify Authentication-Results ---
|
||||||
@ -440,6 +468,8 @@ class EmailChannel(BaseChannel):
|
|||||||
sender,
|
sender,
|
||||||
)
|
)
|
||||||
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
||||||
|
if uid:
|
||||||
|
skipped_uids.add(uid)
|
||||||
continue
|
continue
|
||||||
if self.config.verify_dkim and not dkim_pass:
|
if self.config.verify_dkim and not dkim_pass:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
@ -448,12 +478,16 @@ class EmailChannel(BaseChannel):
|
|||||||
sender,
|
sender,
|
||||||
)
|
)
|
||||||
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
||||||
|
if uid:
|
||||||
|
skipped_uids.add(uid)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not self.is_allowed(sender):
|
if not self.is_allowed(sender):
|
||||||
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
self._remember_processed_uid(uid, dedupe, cycle_uids)
|
||||||
if mark_seen:
|
if mark_seen:
|
||||||
client.store(imap_id, "+FLAGS", "\\Seen")
|
client.store(imap_id, "+FLAGS", "\\Seen")
|
||||||
|
if uid:
|
||||||
|
skipped_uids.add(uid)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
subject = self._decode_header_value(parsed.get("Subject", ""))
|
subject = self._decode_header_value(parsed.get("Subject", ""))
|
||||||
@ -588,6 +622,48 @@ class EmailChannel(BaseChannel):
|
|||||||
# Evict a random half to cap memory; mark_seen is the primary dedup
|
# Evict a random half to cap memory; mark_seen is the primary dedup
|
||||||
self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:])
|
self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:])
|
||||||
|
|
||||||
|
def _should_apply_post_action(self) -> bool:
|
||||||
|
return self.config.post_action in {"delete", "move"}
|
||||||
|
|
||||||
|
def _apply_post_actions_batch(self, post_actions_uids: list[str]) -> None:
|
||||||
|
if not self._should_apply_post_action() or not post_actions_uids:
|
||||||
|
return
|
||||||
|
|
||||||
|
mailbox = self.config.imap_mailbox or "INBOX"
|
||||||
|
client = self._open_imap_client(mailbox=mailbox)
|
||||||
|
if client is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
for uid in post_actions_uids:
|
||||||
|
if uid:
|
||||||
|
self._apply_post_action(client, uid)
|
||||||
|
finally:
|
||||||
|
self._close_imap_client(client)
|
||||||
|
|
||||||
|
def _apply_post_action(self, client: Any, uid: str) -> None:
|
||||||
|
status, data = client.search(None, "UID", uid)
|
||||||
|
if status != "OK" or not data or not data[0]:
|
||||||
|
self.logger.warning("Post-action skipped: UID {} not found", uid)
|
||||||
|
return
|
||||||
|
|
||||||
|
imap_id = data[0].split()[0]
|
||||||
|
action = self.config.post_action
|
||||||
|
|
||||||
|
if action == "delete":
|
||||||
|
client.store(imap_id, "+FLAGS", "\\Deleted")
|
||||||
|
client.expunge()
|
||||||
|
return
|
||||||
|
|
||||||
|
if action == "move":
|
||||||
|
target = (self.config.post_action_move_mailbox or "").strip()
|
||||||
|
status, _ = client.copy(imap_id, target)
|
||||||
|
if status != "OK":
|
||||||
|
self.logger.warning("Post-action move failed for UID {} to mailbox {}", uid, target)
|
||||||
|
return
|
||||||
|
client.store(imap_id, "+FLAGS", "\\Deleted")
|
||||||
|
client.expunge()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _is_stale_imap_error(cls, exc: Exception) -> bool:
|
def _is_stale_imap_error(cls, exc: Exception) -> bool:
|
||||||
message = str(exc).lower()
|
message = str(exc).lower()
|
||||||
|
|||||||
@ -79,17 +79,294 @@ def test_fetch_new_messages_parses_unseen_and_marks_seen(monkeypatch) -> None:
|
|||||||
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
||||||
|
|
||||||
channel = EmailChannel(_make_config(), MessageBus())
|
channel = EmailChannel(_make_config(), MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, skipped_uids = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert items[0]["sender"] == "alice@example.com"
|
assert items[0]["sender"] == "alice@example.com"
|
||||||
assert items[0]["subject"] == "Invoice"
|
assert items[0]["subject"] == "Invoice"
|
||||||
assert "Please pay" in items[0]["content"]
|
assert "Please pay" in items[0]["content"]
|
||||||
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
||||||
|
assert skipped_uids == set()
|
||||||
|
|
||||||
# Same UID should be deduped in-process.
|
# Same UID should be deduped in-process.
|
||||||
items_again = channel._fetch_new_messages()
|
items_again, skipped_again = channel._fetch_new_messages()
|
||||||
assert items_again == []
|
assert items_again == []
|
||||||
|
assert skipped_again == set()
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_new_messages_returns_accepted_and_skipped_uids(monkeypatch) -> None:
|
||||||
|
raw = _make_raw_email(subject="Invoice", body="Please pay")
|
||||||
|
|
||||||
|
class FakeIMAP:
|
||||||
|
def login(self, _user: str, _pw: str):
|
||||||
|
return "OK", [b"logged in"]
|
||||||
|
|
||||||
|
def select(self, _mailbox: str):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def search(self, *_args):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def fetch(self, _imap_id: bytes, _parts: str):
|
||||||
|
return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
|
||||||
|
|
||||||
|
def store(self, _imap_id: bytes, _op: str, _flags: str):
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
return "BYE", [b""]
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: FakeIMAP())
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(post_action="delete"), MessageBus())
|
||||||
|
items, skipped_uids = channel._fetch_new_messages()
|
||||||
|
|
||||||
|
assert len(items) == 1
|
||||||
|
assert items[0]["metadata"]["uid"] == "123"
|
||||||
|
assert skipped_uids == set()
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_new_messages_rejected_returns_skipped_uid(monkeypatch) -> None:
|
||||||
|
raw = _make_raw_email(from_addr="Nanobot <bot@example.com>", subject="Loop test")
|
||||||
|
|
||||||
|
class FakeIMAP:
|
||||||
|
def login(self, _user: str, _pw: str):
|
||||||
|
return "OK", [b"logged in"]
|
||||||
|
|
||||||
|
def select(self, _mailbox: str):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def search(self, *_args):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def fetch(self, _imap_id: bytes, _parts: str):
|
||||||
|
return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
|
||||||
|
|
||||||
|
def store(self, _imap_id: bytes, _op: str, _flags: str):
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
return "BYE", [b""]
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: FakeIMAP())
|
||||||
|
|
||||||
|
channel_skip = EmailChannel(
|
||||||
|
_make_config(from_address="bot@example.com", post_action="delete", post_action_ignore_skipped=True),
|
||||||
|
MessageBus(),
|
||||||
|
)
|
||||||
|
assert channel_skip._fetch_new_messages() == ([], {"123"})
|
||||||
|
|
||||||
|
channel_apply = EmailChannel(
|
||||||
|
_make_config(from_address="bot@example.com", post_action="delete", post_action_ignore_skipped=False),
|
||||||
|
MessageBus(),
|
||||||
|
)
|
||||||
|
items, skipped_uids = channel_apply._fetch_new_messages()
|
||||||
|
assert items == []
|
||||||
|
assert skipped_uids == {"123"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_post_actions_batch_delete_uses_one_connection(monkeypatch) -> None:
|
||||||
|
raw = _make_raw_email(subject="Invoice", body="Please pay")
|
||||||
|
|
||||||
|
class FakeIMAP:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.search_calls: list[tuple] = []
|
||||||
|
self.store_calls: list[tuple[bytes, str, str]] = []
|
||||||
|
self.expunge_calls = 0
|
||||||
|
|
||||||
|
def login(self, _user: str, _pw: str):
|
||||||
|
return "OK", [b"logged in"]
|
||||||
|
|
||||||
|
def select(self, _mailbox: str):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def search(self, *_args):
|
||||||
|
self.search_calls.append(_args)
|
||||||
|
if len(_args) >= 3 and _args[1] == "UID":
|
||||||
|
return "OK", [b"1"]
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def fetch(self, _imap_id: bytes, _parts: str):
|
||||||
|
return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
|
||||||
|
|
||||||
|
def store(self, imap_id: bytes, op: str, flags: str):
|
||||||
|
self.store_calls.append((imap_id, op, flags))
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def expunge(self):
|
||||||
|
self.expunge_calls += 1
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
return "BYE", [b""]
|
||||||
|
|
||||||
|
fake = FakeIMAP()
|
||||||
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(post_action="delete"), MessageBus())
|
||||||
|
channel._apply_post_actions_batch(["123", "124"])
|
||||||
|
|
||||||
|
assert (b"1", "+FLAGS", "\\Deleted") in fake.store_calls
|
||||||
|
assert fake.expunge_calls == 2
|
||||||
|
uid_searches = [call for call in fake.search_calls if len(call) >= 3 and call[1] == "UID"]
|
||||||
|
assert uid_searches == [(None, "UID", "123"), (None, "UID", "124")]
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_post_actions_batch_move_copies_then_deletes(monkeypatch) -> None:
|
||||||
|
class FakeIMAP:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.copy_calls: list[tuple[bytes, str]] = []
|
||||||
|
self.store_calls: list[tuple[bytes, str, str]] = []
|
||||||
|
self.expunge_calls = 0
|
||||||
|
|
||||||
|
def login(self, _user: str, _pw: str):
|
||||||
|
return "OK", [b"logged in"]
|
||||||
|
|
||||||
|
def select(self, _mailbox: str):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def search(self, *_args):
|
||||||
|
return "OK", [b"1"]
|
||||||
|
|
||||||
|
def copy(self, imap_id: bytes, mailbox: str):
|
||||||
|
self.copy_calls.append((imap_id, mailbox))
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def store(self, imap_id: bytes, op: str, flags: str):
|
||||||
|
self.store_calls.append((imap_id, op, flags))
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def expunge(self):
|
||||||
|
self.expunge_calls += 1
|
||||||
|
return "OK", [b""]
|
||||||
|
|
||||||
|
def logout(self):
|
||||||
|
return "BYE", [b""]
|
||||||
|
|
||||||
|
fake = FakeIMAP()
|
||||||
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
||||||
|
|
||||||
|
channel = EmailChannel(
|
||||||
|
_make_config(post_action="move", post_action_move_mailbox="Processed"),
|
||||||
|
MessageBus(),
|
||||||
|
)
|
||||||
|
channel._apply_post_actions_batch(["123"])
|
||||||
|
|
||||||
|
assert fake.copy_calls == [(b"1", "Processed")]
|
||||||
|
assert fake.store_calls == [(b"1", "+FLAGS", "\\Deleted")]
|
||||||
|
assert fake.expunge_calls == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_start_applies_post_action_only_after_delivery(monkeypatch) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(post_action="delete"), MessageBus())
|
||||||
|
|
||||||
|
fetched = ([
|
||||||
|
{
|
||||||
|
"sender": "alice@example.com",
|
||||||
|
"subject": "Hi",
|
||||||
|
"message_id": "<m1@example.com>",
|
||||||
|
"content": "hello",
|
||||||
|
"metadata": {"uid": "123"},
|
||||||
|
}
|
||||||
|
], [])
|
||||||
|
|
||||||
|
def _fake_fetch():
|
||||||
|
channel._running = False
|
||||||
|
return fetched
|
||||||
|
|
||||||
|
async def _fake_handle_message(**_kwargs):
|
||||||
|
calls.append("delivered")
|
||||||
|
|
||||||
|
def _fake_batch(actions):
|
||||||
|
assert calls == ["delivered"]
|
||||||
|
assert actions == ["123"]
|
||||||
|
calls.append("post_action")
|
||||||
|
|
||||||
|
monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)
|
||||||
|
monkeypatch.setattr(channel, "_handle_message", _fake_handle_message)
|
||||||
|
monkeypatch.setattr(channel, "_apply_post_actions_batch", _fake_batch)
|
||||||
|
|
||||||
|
await channel.start()
|
||||||
|
assert calls == ["delivered", "post_action"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_start_skips_post_action_when_delivery_fails(monkeypatch) -> None:
|
||||||
|
called = {"post_action": False}
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(post_action="delete"), MessageBus())
|
||||||
|
|
||||||
|
fetched = ([
|
||||||
|
{
|
||||||
|
"sender": "alice@example.com",
|
||||||
|
"subject": "Hi",
|
||||||
|
"message_id": "<m1@example.com>",
|
||||||
|
"content": "hello",
|
||||||
|
"metadata": {"uid": "123"},
|
||||||
|
}
|
||||||
|
], [])
|
||||||
|
|
||||||
|
def _fake_fetch():
|
||||||
|
channel._running = False
|
||||||
|
return fetched
|
||||||
|
|
||||||
|
async def _fake_handle_message(**_kwargs):
|
||||||
|
raise RuntimeError("delivery failed")
|
||||||
|
|
||||||
|
def _fake_batch(_actions):
|
||||||
|
called["post_action"] = True
|
||||||
|
|
||||||
|
monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)
|
||||||
|
monkeypatch.setattr(channel, "_handle_message", _fake_handle_message)
|
||||||
|
monkeypatch.setattr(channel, "_apply_post_actions_batch", _fake_batch)
|
||||||
|
|
||||||
|
await channel.start()
|
||||||
|
assert called["post_action"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_start_keeps_post_actions_for_successful_emails_when_later_delivery_fails(monkeypatch) -> None:
|
||||||
|
called_actions: list[str] = []
|
||||||
|
|
||||||
|
channel = EmailChannel(_make_config(post_action="delete"), MessageBus())
|
||||||
|
|
||||||
|
fetched = ([
|
||||||
|
{
|
||||||
|
"sender": "alice@example.com",
|
||||||
|
"subject": "First",
|
||||||
|
"message_id": "<m1@example.com>",
|
||||||
|
"content": "ok",
|
||||||
|
"metadata": {"uid": "123"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"sender": "bob@example.com",
|
||||||
|
"subject": "Second",
|
||||||
|
"message_id": "<m2@example.com>",
|
||||||
|
"content": "fail",
|
||||||
|
"metadata": {"uid": "124"},
|
||||||
|
},
|
||||||
|
], [])
|
||||||
|
|
||||||
|
def _fake_fetch():
|
||||||
|
channel._running = False
|
||||||
|
return fetched
|
||||||
|
|
||||||
|
async def _fake_handle_message(**kwargs):
|
||||||
|
if kwargs["chat_id"] == "bob@example.com":
|
||||||
|
raise RuntimeError("delivery failed")
|
||||||
|
|
||||||
|
def _fake_batch(actions):
|
||||||
|
called_actions.extend(actions)
|
||||||
|
|
||||||
|
monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)
|
||||||
|
monkeypatch.setattr(channel, "_handle_message", _fake_handle_message)
|
||||||
|
monkeypatch.setattr(channel, "_apply_post_actions_batch", _fake_batch)
|
||||||
|
|
||||||
|
await channel.start()
|
||||||
|
assert called_actions == ["123"]
|
||||||
|
|
||||||
|
|
||||||
def test_fetch_new_messages_skips_self_sent_email_and_marks_seen(monkeypatch) -> None:
|
def test_fetch_new_messages_skips_self_sent_email_and_marks_seen(monkeypatch) -> None:
|
||||||
@ -122,14 +399,16 @@ def test_fetch_new_messages_skips_self_sent_email_and_marks_seen(monkeypatch) ->
|
|||||||
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
||||||
|
|
||||||
channel = EmailChannel(_make_config(from_address="bot@example.com"), MessageBus())
|
channel = EmailChannel(_make_config(from_address="bot@example.com"), MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, skipped_uids = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert items == []
|
assert items == []
|
||||||
|
assert skipped_uids == {"123"}
|
||||||
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
||||||
|
|
||||||
# Same UID should still be deduped after being ignored.
|
# Same UID should still be deduped after being ignored.
|
||||||
items_again = channel._fetch_new_messages()
|
items_again, skipped_again = channel._fetch_new_messages()
|
||||||
assert items_again == []
|
assert items_again == []
|
||||||
|
assert skipped_again == set()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
@ -189,7 +468,7 @@ def test_fetch_new_messages_skips_self_sent_across_identity_sources(
|
|||||||
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
|
||||||
|
|
||||||
channel = EmailChannel(_make_config(**config_override), MessageBus())
|
channel = EmailChannel(_make_config(**config_override), MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert items == []
|
assert items == []
|
||||||
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
||||||
@ -237,7 +516,7 @@ def test_fetch_new_messages_retries_once_when_imap_connection_goes_stale(monkeyp
|
|||||||
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", _factory)
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", _factory)
|
||||||
|
|
||||||
channel = EmailChannel(_make_config(), MessageBus())
|
channel = EmailChannel(_make_config(), MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert len(fake_instances) == 2
|
assert len(fake_instances) == 2
|
||||||
@ -283,7 +562,7 @@ def test_fetch_new_messages_keeps_messages_collected_before_stale_retry(monkeypa
|
|||||||
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: FlakyIMAP())
|
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: FlakyIMAP())
|
||||||
|
|
||||||
channel = EmailChannel(_make_config(), MessageBus())
|
channel = EmailChannel(_make_config(), MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert [item["subject"] for item in items] == ["First", "Second"]
|
assert [item["subject"] for item in items] == ["First", "Second"]
|
||||||
|
|
||||||
@ -306,7 +585,12 @@ def test_fetch_new_messages_skips_missing_mailbox(monkeypatch) -> None:
|
|||||||
|
|
||||||
channel = EmailChannel(_make_config(), MessageBus())
|
channel = EmailChannel(_make_config(), MessageBus())
|
||||||
|
|
||||||
assert channel._fetch_new_messages() == []
|
assert channel._fetch_new_messages() == ([], set())
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_config_requires_move_mailbox_for_move_post_action() -> None:
|
||||||
|
channel = EmailChannel(_make_config(post_action="move", post_action_move_mailbox=None), MessageBus())
|
||||||
|
assert channel._validate_config() is False
|
||||||
|
|
||||||
|
|
||||||
def test_extract_text_body_falls_back_to_html() -> None:
|
def test_extract_text_body_falls_back_to_html() -> None:
|
||||||
@ -662,7 +946,7 @@ def test_spoofed_email_rejected_when_verify_enabled(monkeypatch) -> None:
|
|||||||
|
|
||||||
cfg = _make_config(verify_dkim=True, verify_spf=True)
|
cfg = _make_config(verify_dkim=True, verify_spf=True)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 0, "Spoofed email without auth headers should be rejected"
|
assert len(items) == 0, "Spoofed email without auth headers should be rejected"
|
||||||
|
|
||||||
@ -679,7 +963,7 @@ def test_email_with_valid_auth_results_accepted(monkeypatch) -> None:
|
|||||||
|
|
||||||
cfg = _make_config(verify_dkim=True, verify_spf=True)
|
cfg = _make_config(verify_dkim=True, verify_spf=True)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert items[0]["sender"] == "alice@example.com"
|
assert items[0]["sender"] == "alice@example.com"
|
||||||
@ -698,7 +982,7 @@ def test_email_with_partial_auth_rejected(monkeypatch) -> None:
|
|||||||
|
|
||||||
cfg = _make_config(verify_dkim=True, verify_spf=True)
|
cfg = _make_config(verify_dkim=True, verify_spf=True)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 0, "Email with dkim=fail should be rejected"
|
assert len(items) == 0, "Email with dkim=fail should be rejected"
|
||||||
|
|
||||||
@ -711,7 +995,7 @@ def test_backward_compat_verify_disabled(monkeypatch) -> None:
|
|||||||
|
|
||||||
cfg = _make_config(verify_dkim=False, verify_spf=False)
|
cfg = _make_config(verify_dkim=False, verify_spf=False)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1, "With verification disabled, emails should be accepted as before"
|
assert len(items) == 1, "With verification disabled, emails should be accepted as before"
|
||||||
|
|
||||||
@ -724,7 +1008,7 @@ def test_email_content_tagged_with_email_context(monkeypatch) -> None:
|
|||||||
|
|
||||||
cfg = _make_config(verify_dkim=False, verify_spf=False)
|
cfg = _make_config(verify_dkim=False, verify_spf=False)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert items[0]["content"].startswith("[EMAIL-CONTEXT]"), (
|
assert items[0]["content"].startswith("[EMAIL-CONTEXT]"), (
|
||||||
@ -836,7 +1120,7 @@ def test_fetch_new_messages_ignores_unauthorized_sender_before_attachments(monke
|
|||||||
)
|
)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
|
|
||||||
assert channel._fetch_new_messages() == []
|
assert channel._fetch_new_messages() == ([], {"500"})
|
||||||
assert called["attachments"] is False
|
assert called["attachments"] is False
|
||||||
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
|
||||||
|
|
||||||
@ -851,7 +1135,7 @@ def test_extract_attachments_saves_pdf(tmp_path, monkeypatch) -> None:
|
|||||||
|
|
||||||
cfg = _make_config(allowed_attachment_types=["application/pdf"], verify_dkim=False, verify_spf=False)
|
cfg = _make_config(allowed_attachment_types=["application/pdf"], verify_dkim=False, verify_spf=False)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert len(items[0]["media"]) == 1
|
assert len(items[0]["media"]) == 1
|
||||||
@ -871,7 +1155,7 @@ def test_extract_attachments_disabled_by_default(monkeypatch) -> None:
|
|||||||
cfg = _make_config(verify_dkim=False, verify_spf=False)
|
cfg = _make_config(verify_dkim=False, verify_spf=False)
|
||||||
assert cfg.allowed_attachment_types == []
|
assert cfg.allowed_attachment_types == []
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert items[0]["media"] == []
|
assert items[0]["media"] == []
|
||||||
@ -896,7 +1180,7 @@ def test_extract_attachments_mime_type_filter(tmp_path, monkeypatch) -> None:
|
|||||||
verify_spf=False,
|
verify_spf=False,
|
||||||
)
|
)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert items[0]["media"] == []
|
assert items[0]["media"] == []
|
||||||
@ -920,7 +1204,7 @@ def test_extract_attachments_empty_allowed_types_rejects_all(tmp_path, monkeypat
|
|||||||
verify_spf=False,
|
verify_spf=False,
|
||||||
)
|
)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert items[0]["media"] == []
|
assert items[0]["media"] == []
|
||||||
@ -944,7 +1228,7 @@ def test_extract_attachments_wildcard_pattern(tmp_path, monkeypatch) -> None:
|
|||||||
verify_spf=False,
|
verify_spf=False,
|
||||||
)
|
)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert len(items[0]["media"]) == 1
|
assert len(items[0]["media"]) == 1
|
||||||
@ -967,7 +1251,7 @@ def test_extract_attachments_size_limit(tmp_path, monkeypatch) -> None:
|
|||||||
verify_spf=False,
|
verify_spf=False,
|
||||||
)
|
)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert items[0]["media"] == []
|
assert items[0]["media"] == []
|
||||||
@ -1003,7 +1287,7 @@ def test_extract_attachments_max_count(tmp_path, monkeypatch) -> None:
|
|||||||
verify_spf=False,
|
verify_spf=False,
|
||||||
)
|
)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert len(items[0]["media"]) == 2
|
assert len(items[0]["media"]) == 2
|
||||||
@ -1021,7 +1305,7 @@ def test_extract_attachments_sanitizes_filename(tmp_path, monkeypatch) -> None:
|
|||||||
|
|
||||||
cfg = _make_config(allowed_attachment_types=["*"], verify_dkim=False, verify_spf=False)
|
cfg = _make_config(allowed_attachment_types=["*"], verify_dkim=False, verify_spf=False)
|
||||||
channel = EmailChannel(cfg, MessageBus())
|
channel = EmailChannel(cfg, MessageBus())
|
||||||
items = channel._fetch_new_messages()
|
items, _ = channel._fetch_new_messages()
|
||||||
|
|
||||||
assert len(items) == 1
|
assert len(items) == 1
|
||||||
assert len(items[0]["media"]) == 1
|
assert len(items[0]["media"]) == 1
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user