From 7c34910fd1798c35c4254bfba846094a6ab383e9 Mon Sep 17 00:00:00 2001
From: VanditKumar <112763701+KumarVandit@users.noreply.github.com>
Date: Mon, 30 Mar 2026 12:43:23 +0530
Subject: [PATCH] feat(channel): add iMessage integration (#2539)
* feat(channel): add iMessage integration
Add iMessage as a built-in channel via Photon's iMessage platform.
Local mode (macOS) reads ~/Library/Messages/chat.db via sqlite3 with
attachment support, sends via AppleScript, and handles voice
transcription. Remote mode connects to a Photon endpoint over pure
HTTP with proxy support, implementing send, attachments, tapback
reactions, typing indicators, mark-as-read, polls, groups, contacts,
and health checks.
Paragraph splitting sends each \n\n-separated block as a separate
iMessage bubble. Startup seeds existing message IDs to avoid
replaying old messages on restart.
Tested end-to-end on both modes.
* fix(imessage): address review feedback
- Add retry with exponential backoff for SQLite lock contention
- Escape newlines/tabs in AppleScript to prevent parse failures
- Lower _MAX_MESSAGE_LEN to 6000 (paragraph splitting handles UX)
- Add privacy note in README for Photon remote mode
- Merge poll interval constants; make local mode configurable too
---
README.md | 77 +++
nanobot/channels/imessage.py | 941 +++++++++++++++++++++++++++++++++++
2 files changed, 1018 insertions(+)
create mode 100644 nanobot/channels/imessage.py
diff --git a/README.md b/README.md
index 828b56477..563a2091e 100644
--- a/README.md
+++ b/README.md
@@ -259,6 +259,7 @@ Connect nanobot to your favorite chat platform. Want to build your own? See the
| **Email** | IMAP/SMTP credentials |
| **QQ** | App ID + App Secret |
| **Wecom** | Bot ID + Bot Secret |
+| **iMessage** | macOS (local) or Photon server credentials (remote) |
| **Mochat** | Claw token (auto-setup available) |
@@ -829,6 +830,82 @@ nanobot gateway
+
+iMessage
+
+Supports two modes via [Photon](https://photon.codes):
+
+- **Local mode**: macOS only. Reads the on-device iMessage database and sends via AppleScript. No external server needed.
+- **Remote mode**: Get your endpoint and API key from [Photon](https://photon.codes) and connect from any platform. Supports tapback reactions, typing indicators, mark-as-read, attachments, and inline replies.
+
+**Local mode (macOS)**
+
+1. Grant **Full Disk Access** to your terminal in **System Settings → Privacy & Security → Full Disk Access**
+2. Ensure iMessage is signed in and working on the Mac
+
+```json
+{
+ "channels": {
+ "imessage": {
+ "enabled": true,
+ "local": true,
+ "allowFrom": ["+1234567890"]
+ }
+ }
+}
+```
+
+```bash
+nanobot gateway
+```
+
+> Local mode supports sending/receiving text, images, and files. For reactions, typing indicators, and inline replies, use remote mode.
+
+**Remote mode**
+
+1. Get your **endpoint URL** and **API key** from [Photon](https://photon.codes)
+2. Configure:
+
+```json
+{
+ "channels": {
+ "imessage": {
+ "enabled": true,
+ "local": false,
+ "serverUrl": "https://xxxxx.imsgd.photon.codes",
+ "apiKey": "your-api-key",
+ "allowFrom": ["+1234567890"]
+ }
+ }
+}
+```
+
+```bash
+nanobot gateway
+```
+
+> `allowFrom`: Add phone numbers or email addresses. Use `["*"]` to allow all senders.
+> `groupPolicy`: `"open"` (default — respond to all messages) or `"ignore"` (skip group chats entirely).
+> `proxy`: Optional HTTP proxy URL (e.g. `"http://127.0.0.1:7890"`).
+> `pollInterval`: Polling interval in seconds (default `2.0`).
+
+> **Note:** Remote mode routes messages through Photon's [advanced-imessage-http-proxy](https://github.com/photon-hq/advanced-imessage-http-proxy). Your messages and attachments transit Photon's infrastructure — the same provider that hosts your iMessage Kit server. If you need full on-device privacy, use local mode instead.
+
+**Feature comparison:**
+
+| Feature | Local | Remote |
+|---------|-------|--------|
+| Send/receive messages | ✅ | ✅ |
+| Images & files | ✅ | ✅ |
+| Message history | ✅ | ✅ |
+| Reactions (tapbacks) | ❌ | ✅ |
+| Typing indicators | ❌ | ✅ |
+| Mark as read | ❌ | ✅ |
+| Inline replies | ❌ | ✅ (`replyToMessage: true`) |
+| Runs on any platform | ❌ | ✅ |
+
+
+
## 🌐 Agent Social Network
🐈 nanobot is capable of linking to the agent social network (agent community). **Just send one message and your nanobot joins automatically!**
diff --git a/nanobot/channels/imessage.py b/nanobot/channels/imessage.py
new file mode 100644
index 000000000..e77a15c09
--- /dev/null
+++ b/nanobot/channels/imessage.py
@@ -0,0 +1,941 @@
+"""iMessage channel using local macOS database or Photon advanced-imessage-http-proxy."""
+
+from __future__ import annotations
+
+import asyncio
+import base64
+import mimetypes
+import platform
+import sqlite3
+import subprocess
+from collections import OrderedDict
+from pathlib import Path
+from typing import Any, Literal
+
+import httpx
+from loguru import logger
+from pydantic import Field
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.base import BaseChannel
+from nanobot.config.paths import get_media_dir
+from nanobot.config.schema import Base
+from nanobot.utils.helpers import split_message
+
+_DEFAULT_DB_PATH = str(Path.home() / "Library" / "Messages" / "chat.db")
+_DEFAULT_POLL_INTERVAL = 2.0
+
+_AUDIO_EXTENSIONS = frozenset({".m4a", ".mp3", ".wav", ".aac", ".ogg", ".caf", ".opus"})
+_MAX_MESSAGE_LEN = 6000
+
+
+def _split_paragraphs(text: str) -> list[str]:
+ """Split text on ``\\n\\n`` boundaries, then apply length limits to each chunk."""
+ parts: list[str] = []
+ for para in text.split("\n\n"):
+ stripped = para.strip()
+ if stripped:
+ parts.extend(split_message(stripped, _MAX_MESSAGE_LEN))
+ return parts or [text]
+
+
+# ---------------------------------------------------------------------------
+# Config
+# ---------------------------------------------------------------------------
+
+
+class IMessageConfig(Base):
+ """iMessage channel configuration."""
+
+ enabled: bool = False
+ local: bool = True
+ server_url: str = ""
+ api_key: str = ""
+ proxy: str | None = None
+ poll_interval: float = _DEFAULT_POLL_INTERVAL
+ database_path: str = _DEFAULT_DB_PATH
+ allow_from: list[str] = Field(default_factory=list)
+ group_policy: Literal["open", "ignore"] = "open"
+ reply_to_message: bool = False
+ react_tapback: str = "love"
+ done_tapback: str = ""
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+_PHOTON_PROXY_URL = "https://imessage-swagger.photon.codes"
+_PHOTON_KIT_PATTERN = ".imsgd.photon.codes"
+
+
+def _is_photon_kit_url(url: str) -> bool:
+ """Return True when *url* points to a Photon iMessage Kit server (upstream)."""
+ from urllib.parse import urlparse
+
+ return _PHOTON_KIT_PATTERN in (urlparse(url).hostname or "")
+
+
+def _make_bearer_token(server_url: str, api_key: str) -> str:
+ """Build the Bearer token expected by advanced-imessage-http-proxy.
+
+ If the key already decodes to a ``url|key`` pair it is used as-is.
+ """
+ try:
+ decoded = base64.b64decode(api_key, validate=True).decode()
+ if "|" in decoded:
+ return api_key
+ except Exception as e:
+ logger.debug("API key not pre-encoded, will encode: {}", type(e).__name__)
+ raw = f"{server_url}|{api_key}"
+ return base64.b64encode(raw.encode()).decode()
+
+
+def _resolve_proxy_url(server_url: str) -> str:
+ """Return the actual HTTP proxy base URL to use for API calls.
+
+ When the user provides a Photon iMessage Kit server URL (e.g.
+ ``https://xxxxx.imsgd.photon.codes``), requests must go through
+ the shared ``advanced-imessage-http-proxy`` at a fixed endpoint.
+ The original Kit URL is only used inside the Bearer token.
+ """
+ if _is_photon_kit_url(server_url):
+ logger.info(
+ "Photon Kit URL detected — routing through proxy at {} "
+ "(hosted by Photon, the same provider as your iMessage Kit server).",
+ _PHOTON_PROXY_URL,
+ )
+ return _PHOTON_PROXY_URL
+ return server_url
+
+
+def _extract_address(chat_id: str) -> str:
+ """Convert a chatGuid to the proxy's address format.
+
+ ``iMessage;-;+1234567890`` → ``+1234567890``
+ ``iMessage;+;chat123`` → ``group:chat123``
+ ``+1234567890`` → ``+1234567890`` (passthrough)
+ """
+ if ";-;" in chat_id:
+ return chat_id.split(";-;", 1)[1]
+ if ";+;" in chat_id:
+ return "group:" + chat_id.split(";+;", 1)[1]
+ return chat_id
+
+
+# ---------------------------------------------------------------------------
+# Channel
+# ---------------------------------------------------------------------------
+
+
+class IMessageChannel(BaseChannel):
+ """iMessage channel with local (macOS) and remote (Photon) modes.
+
+ Local mode reads from the native iMessage SQLite database and sends
+ via AppleScript — pure Python, no external dependencies.
+
+ Remote mode talks to a Photon ``advanced-imessage-http-proxy`` server.
+ See https://github.com/photon-hq/advanced-imessage-http-proxy for the
+ full API reference.
+ """
+
+ name = "imessage"
+ display_name = "iMessage"
+
+ @classmethod
+ def default_config(cls) -> dict[str, Any]:
+ return IMessageConfig().model_dump(by_alias=True)
+
+ def __init__(self, config: Any, bus: MessageBus):
+ if isinstance(config, dict):
+ config = IMessageConfig.model_validate(config)
+ super().__init__(config, bus)
+ self.config: IMessageConfig = config
+ self._processed_ids: OrderedDict[str, None] = OrderedDict()
+ self._http: httpx.AsyncClient | None = None
+ self._last_rowid: int = 0
+
+ # ---- lifecycle ---------------------------------------------------------
+
+ async def start(self) -> None:
+ if self.config.local:
+ await self._start_local()
+ else:
+ await self._start_remote()
+
+ async def stop(self) -> None:
+ self._running = False
+ if self._http:
+ await self._http.aclose()
+ self._http = None
+
+ async def send(self, msg: OutboundMessage) -> None:
+ if self.config.local:
+ await self._send_local(msg)
+ else:
+ await self._send_remote(msg)
+
+ # ======================================================================
+ # LOCAL MODE (macOS — sqlite3 + AppleScript)
+ # ======================================================================
+
+ async def _start_local(self) -> None:
+ if platform.system() != "Darwin":
+ logger.error("iMessage local mode requires macOS")
+ return
+
+ db_path = self.config.database_path
+ if not Path(db_path).exists():
+ logger.error(
+ "iMessage database not found at {}. "
+ "Ensure Full Disk Access is granted to your terminal.",
+ db_path,
+ )
+ return
+
+ self._running = True
+ self._last_rowid = self._get_max_rowid(db_path)
+ logger.info("iMessage local watcher started (polling {})", db_path)
+
+ while self._running:
+ try:
+ await self._poll_local_db(db_path)
+ except Exception as e:
+ logger.warning("iMessage local poll error: {}", e)
+ await asyncio.sleep(max(0.5, self.config.poll_interval))
+
+ def _get_max_rowid(self, db_path: str) -> int:
+ try:
+ with sqlite3.connect(db_path, uri=True) as conn:
+ cur = conn.execute("SELECT MAX(ROWID) FROM message")
+ row = cur.fetchone()
+ return row[0] or 0
+ except Exception:
+ return 0
+
+ async def _poll_local_db(self, db_path: str) -> None:
+ loop = asyncio.get_running_loop()
+ rows = await loop.run_in_executor(None, self._fetch_new_messages, db_path)
+ for row in rows:
+ await self._handle_local_message(row)
+ self._last_rowid = max(self._last_rowid, int(row["ROWID"]))
+
+ def _fetch_new_messages(self, db_path: str) -> list[dict[str, Any]]:
+ for attempt in range(3):
+ try:
+ return self._query_new_messages(db_path)
+ except sqlite3.OperationalError as e:
+ if "locked" in str(e) and attempt < 2:
+ import time
+
+ time.sleep(0.5 * (attempt + 1))
+ continue
+ raise
+ return []
+
+ def _query_new_messages(self, db_path: str) -> list[dict[str, Any]]:
+ with sqlite3.connect(db_path, uri=True, timeout=10) as conn:
+ conn.row_factory = sqlite3.Row
+ cur = conn.execute(
+ """
+ SELECT
+ m.ROWID,
+ m.guid,
+ m.text,
+ m.is_from_me,
+ m.date AS msg_date,
+ m.service,
+ h.id AS sender,
+ c.chat_identifier,
+ c.style AS chat_style,
+ a.ROWID AS att_rowid,
+ a.filename AS att_filename,
+ a.mime_type AS att_mime,
+ a.transfer_name AS att_transfer_name
+ FROM message m
+ LEFT JOIN handle h ON m.handle_id = h.ROWID
+ LEFT JOIN chat_message_join cmj ON m.ROWID = cmj.message_id
+ LEFT JOIN chat c ON cmj.chat_id = c.ROWID
+ LEFT JOIN message_attachment_join maj ON m.ROWID = maj.message_id
+ LEFT JOIN attachment a ON maj.attachment_id = a.ROWID
+ WHERE m.ROWID > ?
+ ORDER BY m.ROWID ASC
+ """,
+ (self._last_rowid,),
+ )
+ msg_map: dict[int, dict[str, Any]] = {}
+ for row in cur:
+ d = dict(row)
+ rowid = d["ROWID"]
+ if rowid not in msg_map:
+ msg_map[rowid] = {**d, "attachments": []}
+ if d.get("att_rowid"):
+ raw_path = d.get("att_filename") or ""
+ resolved = (
+ raw_path.replace("~", str(Path.home()), 1)
+ if raw_path.startswith("~")
+ else raw_path
+ )
+ msg_map[rowid]["attachments"].append(
+ {
+ "filename": resolved,
+ "mime_type": d.get("att_mime") or "",
+ "transfer_name": d.get("att_transfer_name") or "",
+ }
+ )
+ return list(msg_map.values())
+
+ async def _handle_local_message(self, row: dict[str, Any]) -> None:
+ if row.get("is_from_me"):
+ return
+
+ message_id = row.get("guid", "")
+ if self._is_seen(message_id):
+ return
+
+ sender = row.get("sender") or ""
+ chat_id = row.get("chat_identifier") or sender
+ content = row.get("text") or ""
+ is_group = (row.get("chat_style") or 0) == 43
+
+ if is_group and self.config.group_policy == "ignore":
+ return
+
+ media_paths: list[str] = []
+ for att in row.get("attachments") or []:
+ file_path = att.get("filename", "")
+ if not file_path or not Path(file_path).exists():
+ continue
+
+ mime = att.get("mime_type") or ""
+ ext = Path(file_path).suffix.lower()
+
+ if ext in _AUDIO_EXTENSIONS or mime.startswith("audio/"):
+ transcription = await self.transcribe_audio(file_path)
+ if transcription:
+ voice_tag = f"[Voice Message: {transcription}]"
+ content = f"{content}\n{voice_tag}" if content else voice_tag
+ continue
+
+ media_paths.append(file_path)
+ tag = "image" if mime.startswith("image/") else "file"
+ media_tag = f"[{tag}: {file_path}]"
+ content = f"{content}\n{media_tag}" if content else media_tag
+
+ await self._handle_message(
+ sender_id=sender,
+ chat_id=chat_id,
+ content=content,
+ media=media_paths,
+ metadata={
+ "message_id": message_id,
+ "service": row.get("service", "iMessage"),
+ "is_group": is_group,
+ "source": "local",
+ },
+ )
+ self._mark_seen(message_id)
+
+ async def _send_local(self, msg: OutboundMessage) -> None:
+ if (msg.metadata or {}).get("_progress"):
+ return
+ recipient = msg.chat_id
+ if msg.content:
+ for chunk in _split_paragraphs(msg.content):
+ await self._applescript_send_text(recipient, chunk)
+
+ for media_path in msg.media or []:
+ await self._applescript_send_file(recipient, media_path)
+
+ @staticmethod
+ def _escape_applescript(s: str) -> str:
+ return (
+ s.replace("\\", "\\\\")
+ .replace('"', '\\"')
+ .replace("\n", "\\n")
+ .replace("\r", "\\r")
+ .replace("\t", "\\t")
+ )
+
+ async def _applescript_send_text(self, recipient: str, text: str) -> None:
+ escaped_recipient = self._escape_applescript(recipient)
+ escaped_text = self._escape_applescript(text)
+ script = (
+ f'tell application "Messages"\n'
+ f" set targetService to 1st account whose service type = iMessage\n"
+ f' set targetBuddy to participant "{escaped_recipient}" of targetService\n'
+ f' send "{escaped_text}" to targetBuddy\n'
+ f"end tell"
+ )
+ await self._run_osascript(script)
+
+ async def _applescript_send_file(self, recipient: str, file_path: str) -> None:
+ escaped_recipient = self._escape_applescript(recipient)
+ escaped_path = self._escape_applescript(file_path)
+ script = (
+ f'tell application "Messages"\n'
+ f" set targetService to 1st account whose service type = iMessage\n"
+ f' set targetBuddy to participant "{escaped_recipient}" of targetService\n'
+ f' send POSIX file "{escaped_path}" to targetBuddy\n'
+ f"end tell"
+ )
+ await self._run_osascript(script)
+
+ async def _run_osascript(self, script: str) -> None:
+ loop = asyncio.get_running_loop()
+ try:
+ await loop.run_in_executor(
+ None,
+ lambda: subprocess.run(
+ ["osascript", "-e", script],
+ check=True,
+ capture_output=True,
+ timeout=15,
+ ),
+ )
+ except subprocess.CalledProcessError as e:
+ logger.error(
+ "AppleScript send failed: {}", e.stderr.decode()[:200] if e.stderr else str(e)
+ )
+ raise
+ except subprocess.TimeoutExpired:
+ logger.error("AppleScript send timed out")
+ raise
+
+ # ======================================================================
+ # REMOTE MODE (advanced-imessage-http-proxy)
+ # https://github.com/photon-hq/advanced-imessage-http-proxy
+ # ======================================================================
+
+ def _build_http_client(self) -> httpx.AsyncClient:
+ token = _make_bearer_token(self.config.server_url, self.config.api_key)
+ base_url = _resolve_proxy_url(self.config.server_url)
+ return httpx.AsyncClient(
+ base_url=base_url.rstrip("/"),
+ headers={"Authorization": f"Bearer {token}"},
+ proxy=self.config.proxy or None,
+ timeout=30.0,
+ )
+
+ async def _start_remote(self) -> None:
+ if not self.config.server_url:
+ logger.error("iMessage remote mode requires serverUrl")
+ return
+ if not self.config.api_key:
+ logger.error("iMessage remote mode requires apiKey")
+ return
+
+ self._running = True
+ self._http = self._build_http_client()
+
+ if not await self._api_health():
+ logger.error("iMessage server health check failed — will retry in poll loop")
+
+ await self._seed_existing_message_ids()
+ poll_interval = max(0.5, self.config.poll_interval)
+ logger.info(
+ "iMessage remote polling started ({}s interval, proxy={})",
+ poll_interval,
+ self.config.proxy or "none",
+ )
+
+ while self._running:
+ try:
+ await self._poll_remote()
+ except Exception as e:
+ logger.warning("iMessage remote poll error: {}", e)
+ await asyncio.sleep(poll_interval)
+
+ async def _seed_existing_message_ids(self) -> None:
+ """Mark all existing messages as seen so we only process new ones after startup."""
+ try:
+ resp = await self._api_get_messages(limit=100)
+ if resp and isinstance(resp, list):
+ for msg in resp:
+ msg_id = msg.get("id") or msg.get("guid", "")
+ if msg_id:
+ self._mark_seen(msg_id)
+ logger.info("Seeded {} existing message IDs", len(self._processed_ids))
+ except Exception as e:
+ logger.debug("Could not seed existing message IDs: {}", e)
+
+ # ---- inbound -----------------------------------------------------------
+
+ async def _poll_remote(self) -> None:
+ if not self._http:
+ return
+
+ messages = await self._api_get_messages(limit=50)
+ if not messages:
+ return
+
+ for msg in reversed(messages):
+ await self._handle_remote_message(msg)
+
+ async def _handle_remote_message(self, data: dict[str, Any]) -> None:
+ sender_raw = data.get("from") or ""
+ if sender_raw == "me" or data.get("isFromMe"):
+ return
+
+ message_id = data.get("id") or data.get("guid", "")
+ if self._is_seen(message_id):
+ return
+ self._mark_seen(message_id)
+
+ sender = sender_raw
+ if not sender:
+ handle = data.get("handle")
+ if isinstance(handle, dict):
+ sender = handle.get("address", "")
+
+ address = data.get("chat") or sender
+ if not address:
+ chats = data.get("chats") or []
+ chat_guid = chats[0].get("guid", "") if chats else ""
+ address = _extract_address(chat_guid) if chat_guid else sender
+
+ content = data.get("text") or ""
+ is_group = address.startswith("group:") or (";+;" in address)
+
+ if is_group and self.config.group_policy == "ignore":
+ return
+
+ await self._api_react(address, message_id, self.config.react_tapback)
+ await self._api_mark_read(address)
+
+ media_paths: list[str] = []
+ for att in data.get("attachments") or []:
+ att_guid = att.get("guid", "")
+ name = att.get("transferName") or att.get("filename") or ""
+ if not att_guid or not self._http:
+ continue
+
+ local_path = await self._api_download_attachment(att_guid, name)
+ if not local_path:
+ continue
+
+ mime, _ = mimetypes.guess_type(local_path)
+ ext = Path(local_path).suffix.lower()
+
+ if ext in _AUDIO_EXTENSIONS or (mime and mime.startswith("audio/")):
+ transcription = await self.transcribe_audio(local_path)
+ if transcription:
+ voice_tag = f"[Voice Message: {transcription}]"
+ content = f"{content}\n{voice_tag}" if content else voice_tag
+ continue
+
+ media_paths.append(local_path)
+ tag = "image" if mime and mime.startswith("image/") else "file"
+ media_tag = f"[{tag}: {local_path}]"
+ content = f"{content}\n{media_tag}" if content else media_tag
+
+ await self._handle_message(
+ sender_id=sender,
+ chat_id=address,
+ content=content,
+ media=media_paths,
+ metadata={
+ "message_id": message_id,
+ "is_group": is_group,
+ "source": "remote",
+ "timestamp": data.get("sentAt") or data.get("dateCreated"),
+ },
+ )
+
+ # ---- outbound ----------------------------------------------------------
+
+ async def _send_remote(self, msg: OutboundMessage) -> None:
+ if not self._http:
+ raise RuntimeError("iMessage remote HTTP client not initialised")
+
+ meta = msg.metadata or {}
+ if meta.get("_progress"):
+ return
+
+ to = msg.chat_id
+
+ await self._api_start_typing(to)
+
+ try:
+ if msg.content:
+ chunks = _split_paragraphs(msg.content)
+ for i, chunk in enumerate(chunks):
+ body: dict[str, Any] = {"to": to, "text": chunk, "service": "iMessage"}
+ if i == 0 and self.config.reply_to_message and msg.reply_to:
+ body["replyTo"] = msg.reply_to
+ if await self._api_send(body) is None:
+ raise RuntimeError(f"iMessage text delivery failed for {to}")
+
+ for media_path in msg.media or []:
+ if await self._api_send_file(to, media_path) is None:
+ raise RuntimeError(f"iMessage media delivery failed: {media_path}")
+ finally:
+ await self._api_stop_typing(to)
+
+ message_id = meta.get("message_id")
+ if message_id and self.config.react_tapback:
+ await self._api_remove_react(to, message_id, self.config.react_tapback)
+ if self.config.done_tapback:
+ await self._api_react(to, message_id, self.config.done_tapback)
+
+ # ======================================================================
+ # PROXY API METHODS
+ # https://github.com/photon-hq/advanced-imessage-http-proxy
+ # ======================================================================
+
+ # ---- messaging ---------------------------------------------------------
+
+ async def _api_send(self, body: dict[str, Any]) -> dict[str, Any] | None:
+ """``POST /send`` — send text message with optional effect / reply."""
+ return await self._post("/send", body)
+
+ async def _api_send_file(
+ self, to: str, file_path: str, audio: bool | None = None
+ ) -> dict[str, Any] | None:
+ """``POST /send/file`` — send attachment (image, file, audio message)."""
+ if not self._http:
+ return None
+ if not Path(file_path).exists():
+ logger.warning("iMessage attachment not found: {}", file_path)
+ return None
+ mime, _ = mimetypes.guess_type(file_path)
+ ext = Path(file_path).suffix.lower()
+ is_audio = (
+ audio
+ if audio is not None
+ else (ext in _AUDIO_EXTENSIONS or (mime or "").startswith("audio/"))
+ )
+ data: dict[str, str] = {"to": to}
+ if is_audio:
+ data["audio"] = "true"
+ with open(file_path, "rb") as f:
+ resp = await self._http.post(
+ "/send/file",
+ data=data,
+ files={"file": (Path(file_path).name, f, mime or "application/octet-stream")},
+ )
+ return self._unwrap(resp)
+
+ async def _api_send_sticker(
+ self,
+ to: str,
+ file_path: str,
+ reply_to: str | None = None,
+ **kwargs: Any,
+ ) -> dict[str, Any] | None:
+ """``POST /send/sticker`` — send standalone or reply sticker."""
+ if not self._http:
+ return None
+ data: dict[str, str] = {"to": to}
+ if reply_to:
+ data["replyTo"] = reply_to
+ for k in ("stickerX", "stickerY", "stickerScale", "stickerRotation", "stickerWidth"):
+ if k in kwargs:
+ data[k] = str(kwargs[k])
+ with open(file_path, "rb") as f:
+ resp = await self._http.post(
+ "/send/sticker",
+ data=data,
+ files={"file": (Path(file_path).name, f, "image/png")},
+ )
+ return self._unwrap(resp)
+
+ async def _api_unsend(self, message_id: str) -> dict[str, Any] | None:
+ """``DELETE /messages/:id`` — retract a sent message."""
+ return await self._delete(f"/messages/{message_id}")
+
+ # ---- reactions ---------------------------------------------------------
+
+ async def _api_react(self, chat: str, message_id: str, tapback: str) -> None:
+ """``POST /messages/:id/react`` — add tapback (best-effort)."""
+ if not self._http or not tapback:
+ return
+ try:
+ await self._http.post(
+ f"/messages/{message_id}/react",
+ json={"chat": chat, "type": tapback},
+ )
+ except Exception as e:
+ logger.debug("iMessage tapback failed: {}", e)
+
+ async def _api_remove_react(self, chat: str, message_id: str, tapback: str) -> None:
+ """``DELETE /messages/:id/react`` — remove tapback (best-effort)."""
+ if not self._http or not tapback:
+ return
+ try:
+ await self._http.request(
+ "DELETE",
+ f"/messages/{message_id}/react",
+ json={"chat": chat, "type": tapback},
+ )
+ except Exception as e:
+ logger.debug("iMessage remove tapback failed: {}", e)
+
+ # ---- messages ----------------------------------------------------------
+
+ async def _api_get_messages(
+ self,
+ limit: int = 50,
+ chat: str | None = None,
+ ) -> list[dict[str, Any]]:
+ """``GET /messages`` — query messages."""
+ params: dict[str, Any] = {"limit": limit}
+ if chat:
+ params["chat"] = chat
+ data = await self._get("/messages", params=params)
+ return data if isinstance(data, list) else []
+
+ async def _api_search_messages(
+ self, query: str, chat: str | None = None
+ ) -> list[dict[str, Any]]:
+ """``GET /messages/search`` — search messages by text."""
+ params: dict[str, Any] = {"q": query}
+ if chat:
+ params["chat"] = chat
+ data = await self._get("/messages/search", params=params)
+ return data if isinstance(data, list) else []
+
+ async def _api_get_message(self, message_id: str) -> dict[str, Any] | None:
+ """``GET /messages/:id`` — get single message details."""
+ data = await self._get(f"/messages/{message_id}")
+ return data if isinstance(data, dict) else None
+
+ # ---- chats -------------------------------------------------------------
+
+ async def _api_get_chats(self) -> list[dict[str, Any]]:
+ """``GET /chats`` — list all conversations."""
+ data = await self._get("/chats")
+ return data if isinstance(data, list) else []
+
+ async def _api_get_chat(self, address: str) -> dict[str, Any] | None:
+ """``GET /chats/:id`` — get chat details."""
+ data = await self._get(f"/chats/{address}")
+ return data if isinstance(data, dict) else None
+
+ async def _api_get_chat_messages(
+ self,
+ address: str,
+ limit: int = 50,
+ ) -> list[dict[str, Any]]:
+ """``GET /chats/:id/messages`` — get message history for a chat."""
+ data = await self._get(f"/chats/{address}/messages", params={"limit": limit})
+ return data if isinstance(data, list) else []
+
+ async def _api_get_chat_participants(self, address: str) -> list[dict[str, Any]]:
+ """``GET /chats/:id/participants`` — get group participants."""
+ data = await self._get(f"/chats/{address}/participants")
+ return data if isinstance(data, list) else []
+
+ async def _api_mark_read(self, address: str) -> None:
+ """``POST /chats/:id/read`` — clear unread badge."""
+ if not self._http:
+ return
+ try:
+ await self._http.post(f"/chats/{address}/read")
+ except Exception as e:
+ logger.debug("iMessage mark-read failed: {}", e)
+
+ async def _api_start_typing(self, address: str) -> None:
+ """``POST /chats/:id/typing`` — show typing indicator."""
+ if not self._http:
+ return
+ try:
+ await self._http.post(f"/chats/{address}/typing")
+ except Exception as e:
+ logger.debug("iMessage typing start failed: {}", e)
+
+ async def _api_stop_typing(self, address: str) -> None:
+ """``DELETE /chats/:id/typing`` — stop typing indicator."""
+ if not self._http:
+ return
+ try:
+ await self._http.request("DELETE", f"/chats/{address}/typing")
+ except Exception as e:
+ logger.debug("iMessage typing stop failed: {}", e)
+
+ # ---- groups ------------------------------------------------------------
+
+ async def _api_create_group(
+ self, members: list[str], name: str | None = None
+ ) -> dict[str, Any] | None:
+ """``POST /groups`` — create a group chat."""
+ body: dict[str, Any] = {"members": members}
+ if name:
+ body["name"] = name
+ return await self._post("/groups", body)
+
+ async def _api_update_group(self, group_id: str, name: str) -> dict[str, Any] | None:
+ """``PATCH /groups/:id`` — rename a group."""
+ if not self._http:
+ return None
+ resp = await self._http.patch(f"/groups/{group_id}", json={"name": name})
+ return self._unwrap(resp)
+
+ # ---- polls -------------------------------------------------------------
+
+ async def _api_create_poll(
+ self,
+ to: str,
+ question: str,
+ options: list[str],
+ ) -> dict[str, Any] | None:
+ """``POST /polls`` — create an interactive poll."""
+ return await self._post("/polls", {"to": to, "question": question, "options": options})
+
+ async def _api_get_poll(self, poll_id: str) -> dict[str, Any] | None:
+ """``GET /polls/:id`` — get poll details."""
+ data = await self._get(f"/polls/{poll_id}")
+ return data if isinstance(data, dict) else None
+
+ async def _api_vote_poll(
+ self, poll_id: str, chat: str, option_id: str
+ ) -> dict[str, Any] | None:
+ """``POST /polls/:id/vote`` — vote on a poll option."""
+ return await self._post(f"/polls/{poll_id}/vote", {"chat": chat, "optionId": option_id})
+
+ async def _api_unvote_poll(
+ self, poll_id: str, chat: str, option_id: str
+ ) -> dict[str, Any] | None:
+ """``POST /polls/:id/unvote`` — remove vote from poll."""
+ return await self._post(f"/polls/{poll_id}/unvote", {"chat": chat, "optionId": option_id})
+
+ async def _api_add_poll_option(
+ self, poll_id: str, chat: str, text: str
+ ) -> dict[str, Any] | None:
+ """``POST /polls/:id/options`` — add option to existing poll."""
+ return await self._post(f"/polls/{poll_id}/options", {"chat": chat, "text": text})
+
+ # ---- attachments -------------------------------------------------------
+
+ async def _api_download_attachment(self, att_guid: str, filename: str) -> str | None:
+ """``GET /attachments/:id`` — download to local media dir."""
+ if not self._http:
+ return None
+ try:
+ resp = await self._http.get(f"/attachments/{att_guid}")
+ if not resp.is_success:
+ return None
+ media_dir = get_media_dir("imessage")
+ sanitized_guid = att_guid.replace("/", "_").replace("\\", "_").replace("\x00", "")
+ safe_name = Path(filename).name.replace("\x00", "") if filename else ""
+ if not safe_name:
+ safe_name = f"{sanitized_guid}.bin"
+ dest = (media_dir / safe_name).resolve()
+ if not dest.is_relative_to(media_dir.resolve()):
+ dest = (media_dir / f"{sanitized_guid}.bin").resolve()
+ dest.write_bytes(resp.content)
+ return str(dest)
+ except Exception as e:
+ logger.warning("Failed to download iMessage attachment {}: {}", att_guid, e)
+ return None
+
+ async def _api_attachment_info(self, att_guid: str) -> dict[str, Any] | None:
+ """``GET /attachments/:id/info`` — get attachment metadata."""
+ data = await self._get(f"/attachments/{att_guid}/info")
+ return data if isinstance(data, dict) else None
+
+ # ---- contacts & handles ------------------------------------------------
+
+ async def _api_check_imessage(self, address: str) -> bool:
+ """``GET /check/:address`` — check if address uses iMessage."""
+ data = await self._get(f"/check/{address}")
+ if isinstance(data, dict):
+ return bool(data.get("available") or data.get("imessage"))
+ return False
+
+ async def _api_get_contacts(self) -> list[dict[str, Any]]:
+ """``GET /contacts`` — list device contacts."""
+ data = await self._get("/contacts")
+ return data if isinstance(data, list) else []
+
+ async def _api_get_handles(self) -> list[dict[str, Any]]:
+ """``GET /handles`` — list known handles."""
+ data = await self._get("/handles")
+ return data if isinstance(data, list) else []
+
+ # ---- server ------------------------------------------------------------
+
+ async def _api_server_info(self) -> dict[str, Any] | None:
+ """``GET /server`` — get server info."""
+ data = await self._get("/server")
+ return data if isinstance(data, dict) else None
+
+ async def _api_health(self) -> bool:
+ """``GET /health`` — basic health check."""
+ if not self._http:
+ return False
+ try:
+ resp = await self._http.get("/health")
+ if resp.is_success:
+ logger.info("iMessage server health check passed")
+ return True
+ logger.warning("iMessage health check HTTP {}", resp.status_code)
+ except Exception as e:
+ logger.warning("iMessage health check failed: {}", e)
+ return False
+
+ # ---- HTTP helpers ------------------------------------------------------
+
+ async def _get(self, path: str, params: dict[str, Any] | None = None) -> Any:
+ if not self._http:
+ return None
+ try:
+ resp = await self._http.get(path, params=params)
+ return self._unwrap(resp)
+ except Exception as e:
+ logger.warning("iMessage GET {} failed: {}", path, e)
+ return None
+
+ async def _post(self, path: str, body: dict[str, Any]) -> dict[str, Any] | None:
+ if not self._http:
+ return None
+ try:
+ resp = await self._http.post(path, json=body)
+ if not resp.is_success:
+ logger.warning(
+ "iMessage POST {} HTTP {}: {}", path, resp.status_code, resp.text[:200]
+ )
+ return self._unwrap(resp)
+ except Exception as e:
+ logger.warning("iMessage POST {} failed: {}", path, e)
+ raise
+
+ async def _delete(self, path: str, body: dict[str, Any] | None = None) -> dict[str, Any] | None:
+ if not self._http:
+ return None
+ try:
+ resp = await self._http.request("DELETE", path, json=body)
+ return self._unwrap(resp)
+ except Exception as e:
+ logger.warning("iMessage DELETE {} failed: {}", path, e)
+ return None
+
+ @staticmethod
+ def _unwrap(resp: httpx.Response) -> Any:
+ """Unwrap the proxy's ``{"ok": true, "data": ...}`` envelope."""
+ if not resp.is_success:
+ return None
+ try:
+ body = resp.json()
+ except Exception:
+ logger.debug("iMessage server returned non-JSON response")
+ return None
+ if isinstance(body, dict) and "data" in body:
+ return body["data"]
+ return body
+
+ # ---- dedup helper ------------------------------------------------------
+
+ def _is_seen(self, message_id: str) -> bool:
+ if not message_id:
+ return False
+ return message_id in self._processed_ids
+
+ def _mark_seen(self, message_id: str) -> None:
+ if not message_id:
+ return
+ self._processed_ids[message_id] = None
+ while len(self._processed_ids) > 1000:
+ self._processed_ids.popitem(last=False)