mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-27 21:35:51 +00:00
Make channel delivery failures raise consistently so retry policy lives in ChannelManager rather than being split across individual channels. Tighten Telegram stream finalization, clarify sendMaxRetries semantics, and align the docs with the behavior the system actually guarantees.
948 lines
37 KiB
Python
948 lines
37 KiB
Python
"""Mochat channel implementation using Socket.IO with HTTP polling fallback."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from collections import deque
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from loguru import logger
|
|
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.channels.base import BaseChannel
|
|
from nanobot.config.paths import get_runtime_subdir
|
|
from nanobot.config.schema import Base
|
|
from pydantic import Field
|
|
|
|
try:
|
|
import socketio
|
|
SOCKETIO_AVAILABLE = True
|
|
except ImportError:
|
|
socketio = None
|
|
SOCKETIO_AVAILABLE = False
|
|
|
|
try:
|
|
import msgpack # noqa: F401
|
|
MSGPACK_AVAILABLE = True
|
|
except ImportError:
|
|
MSGPACK_AVAILABLE = False
|
|
|
|
MAX_SEEN_MESSAGE_IDS = 2000
|
|
CURSOR_SAVE_DEBOUNCE_S = 0.5
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class MochatBufferedEntry:
|
|
"""Buffered inbound entry for delayed dispatch."""
|
|
raw_body: str
|
|
author: str
|
|
sender_name: str = ""
|
|
sender_username: str = ""
|
|
timestamp: int | None = None
|
|
message_id: str = ""
|
|
group_id: str = ""
|
|
|
|
|
|
@dataclass
|
|
class DelayState:
|
|
"""Per-target delayed message state."""
|
|
entries: list[MochatBufferedEntry] = field(default_factory=list)
|
|
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
|
timer: asyncio.Task | None = None
|
|
|
|
|
|
@dataclass
|
|
class MochatTarget:
|
|
"""Outbound target resolution result."""
|
|
id: str
|
|
is_panel: bool
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pure helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _safe_dict(value: Any) -> dict:
|
|
"""Return *value* if it's a dict, else empty dict."""
|
|
return value if isinstance(value, dict) else {}
|
|
|
|
|
|
def _str_field(src: dict, *keys: str) -> str:
|
|
"""Return the first non-empty str value found for *keys*, stripped."""
|
|
for k in keys:
|
|
v = src.get(k)
|
|
if isinstance(v, str) and v.strip():
|
|
return v.strip()
|
|
return ""
|
|
|
|
|
|
def _make_synthetic_event(
|
|
message_id: str, author: str, content: Any,
|
|
meta: Any, group_id: str, converse_id: str,
|
|
timestamp: Any = None, *, author_info: Any = None,
|
|
) -> dict[str, Any]:
|
|
"""Build a synthetic ``message.add`` event dict."""
|
|
payload: dict[str, Any] = {
|
|
"messageId": message_id, "author": author,
|
|
"content": content, "meta": _safe_dict(meta),
|
|
"groupId": group_id, "converseId": converse_id,
|
|
}
|
|
if author_info is not None:
|
|
payload["authorInfo"] = _safe_dict(author_info)
|
|
return {
|
|
"type": "message.add",
|
|
"timestamp": timestamp or datetime.utcnow().isoformat(),
|
|
"payload": payload,
|
|
}
|
|
|
|
|
|
def normalize_mochat_content(content: Any) -> str:
|
|
"""Normalize content payload to text."""
|
|
if isinstance(content, str):
|
|
return content.strip()
|
|
if content is None:
|
|
return ""
|
|
try:
|
|
return json.dumps(content, ensure_ascii=False)
|
|
except TypeError:
|
|
return str(content)
|
|
|
|
|
|
def resolve_mochat_target(raw: str) -> MochatTarget:
|
|
"""Resolve id and target kind from user-provided target string."""
|
|
trimmed = (raw or "").strip()
|
|
if not trimmed:
|
|
return MochatTarget(id="", is_panel=False)
|
|
|
|
lowered = trimmed.lower()
|
|
cleaned, forced_panel = trimmed, False
|
|
for prefix in ("mochat:", "group:", "channel:", "panel:"):
|
|
if lowered.startswith(prefix):
|
|
cleaned = trimmed[len(prefix):].strip()
|
|
forced_panel = prefix in {"group:", "channel:", "panel:"}
|
|
break
|
|
|
|
if not cleaned:
|
|
return MochatTarget(id="", is_panel=False)
|
|
return MochatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_"))
|
|
|
|
|
|
def extract_mention_ids(value: Any) -> list[str]:
|
|
"""Extract mention ids from heterogeneous mention payload."""
|
|
if not isinstance(value, list):
|
|
return []
|
|
ids: list[str] = []
|
|
for item in value:
|
|
if isinstance(item, str):
|
|
if item.strip():
|
|
ids.append(item.strip())
|
|
elif isinstance(item, dict):
|
|
for key in ("id", "userId", "_id"):
|
|
candidate = item.get(key)
|
|
if isinstance(candidate, str) and candidate.strip():
|
|
ids.append(candidate.strip())
|
|
break
|
|
return ids
|
|
|
|
|
|
def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool:
|
|
"""Resolve mention state from payload metadata and text fallback."""
|
|
meta = payload.get("meta")
|
|
if isinstance(meta, dict):
|
|
if meta.get("mentioned") is True or meta.get("wasMentioned") is True:
|
|
return True
|
|
for f in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"):
|
|
if agent_user_id and agent_user_id in extract_mention_ids(meta.get(f)):
|
|
return True
|
|
if not agent_user_id:
|
|
return False
|
|
content = payload.get("content")
|
|
if not isinstance(content, str) or not content:
|
|
return False
|
|
return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content
|
|
|
|
|
|
def resolve_require_mention(config: MochatConfig, session_id: str, group_id: str) -> bool:
|
|
"""Resolve mention requirement for group/panel conversations."""
|
|
groups = config.groups or {}
|
|
for key in (group_id, session_id, "*"):
|
|
if key and key in groups:
|
|
return bool(groups[key].require_mention)
|
|
return bool(config.mention.require_in_groups)
|
|
|
|
|
|
def build_buffered_body(entries: list[MochatBufferedEntry], is_group: bool) -> str:
|
|
"""Build text body from one or more buffered entries."""
|
|
if not entries:
|
|
return ""
|
|
if len(entries) == 1:
|
|
return entries[0].raw_body
|
|
lines: list[str] = []
|
|
for entry in entries:
|
|
if not entry.raw_body:
|
|
continue
|
|
if is_group:
|
|
label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author
|
|
if label:
|
|
lines.append(f"{label}: {entry.raw_body}")
|
|
continue
|
|
lines.append(entry.raw_body)
|
|
return "\n".join(lines).strip()
|
|
|
|
|
|
def parse_timestamp(value: Any) -> int | None:
|
|
"""Parse event timestamp to epoch milliseconds."""
|
|
if not isinstance(value, str) or not value.strip():
|
|
return None
|
|
try:
|
|
return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class MochatMentionConfig(Base):
|
|
"""Mochat mention behavior configuration."""
|
|
|
|
require_in_groups: bool = False
|
|
|
|
|
|
class MochatGroupRule(Base):
|
|
"""Mochat per-group mention requirement."""
|
|
|
|
require_mention: bool = False
|
|
|
|
|
|
class MochatConfig(Base):
|
|
"""Mochat channel configuration."""
|
|
|
|
enabled: bool = False
|
|
base_url: str = "https://mochat.io"
|
|
socket_url: str = ""
|
|
socket_path: str = "/socket.io"
|
|
socket_disable_msgpack: bool = False
|
|
socket_reconnect_delay_ms: int = 1000
|
|
socket_max_reconnect_delay_ms: int = 10000
|
|
socket_connect_timeout_ms: int = 10000
|
|
refresh_interval_ms: int = 30000
|
|
watch_timeout_ms: int = 25000
|
|
watch_limit: int = 100
|
|
retry_delay_ms: int = 500
|
|
max_retry_attempts: int = 0
|
|
claw_token: str = ""
|
|
agent_user_id: str = ""
|
|
sessions: list[str] = Field(default_factory=list)
|
|
panels: list[str] = Field(default_factory=list)
|
|
allow_from: list[str] = Field(default_factory=list)
|
|
mention: MochatMentionConfig = Field(default_factory=MochatMentionConfig)
|
|
groups: dict[str, MochatGroupRule] = Field(default_factory=dict)
|
|
reply_delay_mode: str = "non-mention"
|
|
reply_delay_ms: int = 120000
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Channel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class MochatChannel(BaseChannel):
|
|
"""Mochat channel using socket.io with fallback polling workers."""
|
|
|
|
name = "mochat"
|
|
display_name = "Mochat"
|
|
|
|
@classmethod
|
|
def default_config(cls) -> dict[str, Any]:
|
|
return MochatConfig().model_dump(by_alias=True)
|
|
|
|
def __init__(self, config: Any, bus: MessageBus):
|
|
if isinstance(config, dict):
|
|
config = MochatConfig.model_validate(config)
|
|
super().__init__(config, bus)
|
|
self.config: MochatConfig = config
|
|
self._http: httpx.AsyncClient | None = None
|
|
self._socket: Any = None
|
|
self._ws_connected = self._ws_ready = False
|
|
|
|
self._state_dir = get_runtime_subdir("mochat")
|
|
self._cursor_path = self._state_dir / "session_cursors.json"
|
|
self._session_cursor: dict[str, int] = {}
|
|
self._cursor_save_task: asyncio.Task | None = None
|
|
|
|
self._session_set: set[str] = set()
|
|
self._panel_set: set[str] = set()
|
|
self._auto_discover_sessions = self._auto_discover_panels = False
|
|
|
|
self._cold_sessions: set[str] = set()
|
|
self._session_by_converse: dict[str, str] = {}
|
|
|
|
self._seen_set: dict[str, set[str]] = {}
|
|
self._seen_queue: dict[str, deque[str]] = {}
|
|
self._delay_states: dict[str, DelayState] = {}
|
|
|
|
self._fallback_mode = False
|
|
self._session_fallback_tasks: dict[str, asyncio.Task] = {}
|
|
self._panel_fallback_tasks: dict[str, asyncio.Task] = {}
|
|
self._refresh_task: asyncio.Task | None = None
|
|
self._target_locks: dict[str, asyncio.Lock] = {}
|
|
|
|
# ---- lifecycle ---------------------------------------------------------
|
|
|
|
async def start(self) -> None:
|
|
"""Start Mochat channel workers and websocket connection."""
|
|
if not self.config.claw_token:
|
|
logger.error("Mochat claw_token not configured")
|
|
return
|
|
|
|
self._running = True
|
|
self._http = httpx.AsyncClient(timeout=30.0)
|
|
self._state_dir.mkdir(parents=True, exist_ok=True)
|
|
await self._load_session_cursors()
|
|
self._seed_targets_from_config()
|
|
await self._refresh_targets(subscribe_new=False)
|
|
|
|
if not await self._start_socket_client():
|
|
await self._ensure_fallback_workers()
|
|
|
|
self._refresh_task = asyncio.create_task(self._refresh_loop())
|
|
while self._running:
|
|
await asyncio.sleep(1)
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop all workers and clean up resources."""
|
|
self._running = False
|
|
if self._refresh_task:
|
|
self._refresh_task.cancel()
|
|
self._refresh_task = None
|
|
|
|
await self._stop_fallback_workers()
|
|
await self._cancel_delay_timers()
|
|
|
|
if self._socket:
|
|
try:
|
|
await self._socket.disconnect()
|
|
except Exception:
|
|
pass
|
|
self._socket = None
|
|
|
|
if self._cursor_save_task:
|
|
self._cursor_save_task.cancel()
|
|
self._cursor_save_task = None
|
|
await self._save_session_cursors()
|
|
|
|
if self._http:
|
|
await self._http.aclose()
|
|
self._http = None
|
|
self._ws_connected = self._ws_ready = False
|
|
|
|
async def send(self, msg: OutboundMessage) -> None:
|
|
"""Send outbound message to session or panel."""
|
|
if not self.config.claw_token:
|
|
logger.warning("Mochat claw_token missing, skip send")
|
|
return
|
|
|
|
parts = ([msg.content.strip()] if msg.content and msg.content.strip() else [])
|
|
if msg.media:
|
|
parts.extend(m for m in msg.media if isinstance(m, str) and m.strip())
|
|
content = "\n".join(parts).strip()
|
|
if not content:
|
|
return
|
|
|
|
target = resolve_mochat_target(msg.chat_id)
|
|
if not target.id:
|
|
logger.warning("Mochat outbound target is empty")
|
|
return
|
|
|
|
is_panel = (target.is_panel or target.id in self._panel_set) and not target.id.startswith("session_")
|
|
try:
|
|
if is_panel:
|
|
await self._api_send("/api/claw/groups/panels/send", "panelId", target.id,
|
|
content, msg.reply_to, self._read_group_id(msg.metadata))
|
|
else:
|
|
await self._api_send("/api/claw/sessions/send", "sessionId", target.id,
|
|
content, msg.reply_to)
|
|
except Exception as e:
|
|
logger.error("Failed to send Mochat message: {}", e)
|
|
raise
|
|
|
|
# ---- config / init helpers ---------------------------------------------
|
|
|
|
def _seed_targets_from_config(self) -> None:
|
|
sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions)
|
|
panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels)
|
|
self._session_set.update(sessions)
|
|
self._panel_set.update(panels)
|
|
for sid in sessions:
|
|
if sid not in self._session_cursor:
|
|
self._cold_sessions.add(sid)
|
|
|
|
@staticmethod
|
|
def _normalize_id_list(values: list[str]) -> tuple[list[str], bool]:
|
|
cleaned = [str(v).strip() for v in values if str(v).strip()]
|
|
return sorted({v for v in cleaned if v != "*"}), "*" in cleaned
|
|
|
|
# ---- websocket ---------------------------------------------------------
|
|
|
|
async def _start_socket_client(self) -> bool:
|
|
if not SOCKETIO_AVAILABLE:
|
|
logger.warning("python-socketio not installed, Mochat using polling fallback")
|
|
return False
|
|
|
|
serializer = "default"
|
|
if not self.config.socket_disable_msgpack:
|
|
if MSGPACK_AVAILABLE:
|
|
serializer = "msgpack"
|
|
else:
|
|
logger.warning("msgpack not installed but socket_disable_msgpack=false; using JSON")
|
|
|
|
client = socketio.AsyncClient(
|
|
reconnection=True,
|
|
reconnection_attempts=self.config.max_retry_attempts or None,
|
|
reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0),
|
|
reconnection_delay_max=max(0.1, self.config.socket_max_reconnect_delay_ms / 1000.0),
|
|
logger=False, engineio_logger=False, serializer=serializer,
|
|
)
|
|
|
|
@client.event
|
|
async def connect() -> None:
|
|
self._ws_connected, self._ws_ready = True, False
|
|
logger.info("Mochat websocket connected")
|
|
subscribed = await self._subscribe_all()
|
|
self._ws_ready = subscribed
|
|
await (self._stop_fallback_workers() if subscribed else self._ensure_fallback_workers())
|
|
|
|
@client.event
|
|
async def disconnect() -> None:
|
|
if not self._running:
|
|
return
|
|
self._ws_connected = self._ws_ready = False
|
|
logger.warning("Mochat websocket disconnected")
|
|
await self._ensure_fallback_workers()
|
|
|
|
@client.event
|
|
async def connect_error(data: Any) -> None:
|
|
logger.error("Mochat websocket connect error: {}", data)
|
|
|
|
@client.on("claw.session.events")
|
|
async def on_session_events(payload: dict[str, Any]) -> None:
|
|
await self._handle_watch_payload(payload, "session")
|
|
|
|
@client.on("claw.panel.events")
|
|
async def on_panel_events(payload: dict[str, Any]) -> None:
|
|
await self._handle_watch_payload(payload, "panel")
|
|
|
|
for ev in ("notify:chat.inbox.append", "notify:chat.message.add",
|
|
"notify:chat.message.update", "notify:chat.message.recall",
|
|
"notify:chat.message.delete"):
|
|
client.on(ev, self._build_notify_handler(ev))
|
|
|
|
socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/")
|
|
socket_path = (self.config.socket_path or "/socket.io").strip().lstrip("/")
|
|
|
|
try:
|
|
self._socket = client
|
|
await client.connect(
|
|
socket_url, transports=["websocket"], socketio_path=socket_path,
|
|
auth={"token": self.config.claw_token},
|
|
wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0),
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
logger.error("Failed to connect Mochat websocket: {}", e)
|
|
try:
|
|
await client.disconnect()
|
|
except Exception:
|
|
pass
|
|
self._socket = None
|
|
return False
|
|
|
|
def _build_notify_handler(self, event_name: str):
|
|
async def handler(payload: Any) -> None:
|
|
if event_name == "notify:chat.inbox.append":
|
|
await self._handle_notify_inbox_append(payload)
|
|
elif event_name.startswith("notify:chat.message."):
|
|
await self._handle_notify_chat_message(payload)
|
|
return handler
|
|
|
|
# ---- subscribe ---------------------------------------------------------
|
|
|
|
async def _subscribe_all(self) -> bool:
|
|
ok = await self._subscribe_sessions(sorted(self._session_set))
|
|
ok = await self._subscribe_panels(sorted(self._panel_set)) and ok
|
|
if self._auto_discover_sessions or self._auto_discover_panels:
|
|
await self._refresh_targets(subscribe_new=True)
|
|
return ok
|
|
|
|
async def _subscribe_sessions(self, session_ids: list[str]) -> bool:
|
|
if not session_ids:
|
|
return True
|
|
for sid in session_ids:
|
|
if sid not in self._session_cursor:
|
|
self._cold_sessions.add(sid)
|
|
|
|
ack = await self._socket_call("com.claw.im.subscribeSessions", {
|
|
"sessionIds": session_ids, "cursors": self._session_cursor,
|
|
"limit": self.config.watch_limit,
|
|
})
|
|
if not ack.get("result"):
|
|
logger.error("Mochat subscribeSessions failed: {}", ack.get('message', 'unknown error'))
|
|
return False
|
|
|
|
data = ack.get("data")
|
|
items: list[dict[str, Any]] = []
|
|
if isinstance(data, list):
|
|
items = [i for i in data if isinstance(i, dict)]
|
|
elif isinstance(data, dict):
|
|
sessions = data.get("sessions")
|
|
if isinstance(sessions, list):
|
|
items = [i for i in sessions if isinstance(i, dict)]
|
|
elif "sessionId" in data:
|
|
items = [data]
|
|
for p in items:
|
|
await self._handle_watch_payload(p, "session")
|
|
return True
|
|
|
|
async def _subscribe_panels(self, panel_ids: list[str]) -> bool:
|
|
if not self._auto_discover_panels and not panel_ids:
|
|
return True
|
|
ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids})
|
|
if not ack.get("result"):
|
|
logger.error("Mochat subscribePanels failed: {}", ack.get('message', 'unknown error'))
|
|
return False
|
|
return True
|
|
|
|
async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]:
|
|
if not self._socket:
|
|
return {"result": False, "message": "socket not connected"}
|
|
try:
|
|
raw = await self._socket.call(event_name, payload, timeout=10)
|
|
except Exception as e:
|
|
return {"result": False, "message": str(e)}
|
|
return raw if isinstance(raw, dict) else {"result": True, "data": raw}
|
|
|
|
# ---- refresh / discovery -----------------------------------------------
|
|
|
|
async def _refresh_loop(self) -> None:
|
|
interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
|
|
while self._running:
|
|
await asyncio.sleep(interval_s)
|
|
try:
|
|
await self._refresh_targets(subscribe_new=self._ws_ready)
|
|
except Exception as e:
|
|
logger.warning("Mochat refresh failed: {}", e)
|
|
if self._fallback_mode:
|
|
await self._ensure_fallback_workers()
|
|
|
|
async def _refresh_targets(self, subscribe_new: bool) -> None:
|
|
if self._auto_discover_sessions:
|
|
await self._refresh_sessions_directory(subscribe_new)
|
|
if self._auto_discover_panels:
|
|
await self._refresh_panels(subscribe_new)
|
|
|
|
async def _refresh_sessions_directory(self, subscribe_new: bool) -> None:
|
|
try:
|
|
response = await self._post_json("/api/claw/sessions/list", {})
|
|
except Exception as e:
|
|
logger.warning("Mochat listSessions failed: {}", e)
|
|
return
|
|
|
|
sessions = response.get("sessions")
|
|
if not isinstance(sessions, list):
|
|
return
|
|
|
|
new_ids: list[str] = []
|
|
for s in sessions:
|
|
if not isinstance(s, dict):
|
|
continue
|
|
sid = _str_field(s, "sessionId")
|
|
if not sid:
|
|
continue
|
|
if sid not in self._session_set:
|
|
self._session_set.add(sid)
|
|
new_ids.append(sid)
|
|
if sid not in self._session_cursor:
|
|
self._cold_sessions.add(sid)
|
|
cid = _str_field(s, "converseId")
|
|
if cid:
|
|
self._session_by_converse[cid] = sid
|
|
|
|
if not new_ids:
|
|
return
|
|
if self._ws_ready and subscribe_new:
|
|
await self._subscribe_sessions(new_ids)
|
|
if self._fallback_mode:
|
|
await self._ensure_fallback_workers()
|
|
|
|
async def _refresh_panels(self, subscribe_new: bool) -> None:
|
|
try:
|
|
response = await self._post_json("/api/claw/groups/get", {})
|
|
except Exception as e:
|
|
logger.warning("Mochat getWorkspaceGroup failed: {}", e)
|
|
return
|
|
|
|
raw_panels = response.get("panels")
|
|
if not isinstance(raw_panels, list):
|
|
return
|
|
|
|
new_ids: list[str] = []
|
|
for p in raw_panels:
|
|
if not isinstance(p, dict):
|
|
continue
|
|
pt = p.get("type")
|
|
if isinstance(pt, int) and pt != 0:
|
|
continue
|
|
pid = _str_field(p, "id", "_id")
|
|
if pid and pid not in self._panel_set:
|
|
self._panel_set.add(pid)
|
|
new_ids.append(pid)
|
|
|
|
if not new_ids:
|
|
return
|
|
if self._ws_ready and subscribe_new:
|
|
await self._subscribe_panels(new_ids)
|
|
if self._fallback_mode:
|
|
await self._ensure_fallback_workers()
|
|
|
|
# ---- fallback workers --------------------------------------------------
|
|
|
|
async def _ensure_fallback_workers(self) -> None:
|
|
if not self._running:
|
|
return
|
|
self._fallback_mode = True
|
|
for sid in sorted(self._session_set):
|
|
t = self._session_fallback_tasks.get(sid)
|
|
if not t or t.done():
|
|
self._session_fallback_tasks[sid] = asyncio.create_task(self._session_watch_worker(sid))
|
|
for pid in sorted(self._panel_set):
|
|
t = self._panel_fallback_tasks.get(pid)
|
|
if not t or t.done():
|
|
self._panel_fallback_tasks[pid] = asyncio.create_task(self._panel_poll_worker(pid))
|
|
|
|
async def _stop_fallback_workers(self) -> None:
|
|
self._fallback_mode = False
|
|
tasks = [*self._session_fallback_tasks.values(), *self._panel_fallback_tasks.values()]
|
|
for t in tasks:
|
|
t.cancel()
|
|
if tasks:
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
self._session_fallback_tasks.clear()
|
|
self._panel_fallback_tasks.clear()
|
|
|
|
async def _session_watch_worker(self, session_id: str) -> None:
|
|
while self._running and self._fallback_mode:
|
|
try:
|
|
payload = await self._post_json("/api/claw/sessions/watch", {
|
|
"sessionId": session_id, "cursor": self._session_cursor.get(session_id, 0),
|
|
"timeoutMs": self.config.watch_timeout_ms, "limit": self.config.watch_limit,
|
|
})
|
|
await self._handle_watch_payload(payload, "session")
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.warning("Mochat watch fallback error ({}): {}", session_id, e)
|
|
await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0))
|
|
|
|
async def _panel_poll_worker(self, panel_id: str) -> None:
|
|
sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
|
|
while self._running and self._fallback_mode:
|
|
try:
|
|
resp = await self._post_json("/api/claw/groups/panels/messages", {
|
|
"panelId": panel_id, "limit": min(100, max(1, self.config.watch_limit)),
|
|
})
|
|
msgs = resp.get("messages")
|
|
if isinstance(msgs, list):
|
|
for m in reversed(msgs):
|
|
if not isinstance(m, dict):
|
|
continue
|
|
evt = _make_synthetic_event(
|
|
message_id=str(m.get("messageId") or ""),
|
|
author=str(m.get("author") or ""),
|
|
content=m.get("content"),
|
|
meta=m.get("meta"), group_id=str(resp.get("groupId") or ""),
|
|
converse_id=panel_id, timestamp=m.get("createdAt"),
|
|
author_info=m.get("authorInfo"),
|
|
)
|
|
await self._process_inbound_event(panel_id, evt, "panel")
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.warning("Mochat panel polling error ({}): {}", panel_id, e)
|
|
await asyncio.sleep(sleep_s)
|
|
|
|
# ---- inbound event processing ------------------------------------------
|
|
|
|
async def _handle_watch_payload(self, payload: dict[str, Any], target_kind: str) -> None:
|
|
if not isinstance(payload, dict):
|
|
return
|
|
target_id = _str_field(payload, "sessionId")
|
|
if not target_id:
|
|
return
|
|
|
|
lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock())
|
|
async with lock:
|
|
prev = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0
|
|
pc = payload.get("cursor")
|
|
if target_kind == "session" and isinstance(pc, int) and pc >= 0:
|
|
self._mark_session_cursor(target_id, pc)
|
|
|
|
raw_events = payload.get("events")
|
|
if not isinstance(raw_events, list):
|
|
return
|
|
if target_kind == "session" and target_id in self._cold_sessions:
|
|
self._cold_sessions.discard(target_id)
|
|
return
|
|
|
|
for event in raw_events:
|
|
if not isinstance(event, dict):
|
|
continue
|
|
seq = event.get("seq")
|
|
if target_kind == "session" and isinstance(seq, int) and seq > self._session_cursor.get(target_id, prev):
|
|
self._mark_session_cursor(target_id, seq)
|
|
if event.get("type") == "message.add":
|
|
await self._process_inbound_event(target_id, event, target_kind)
|
|
|
|
async def _process_inbound_event(self, target_id: str, event: dict[str, Any], target_kind: str) -> None:
|
|
payload = event.get("payload")
|
|
if not isinstance(payload, dict):
|
|
return
|
|
|
|
author = _str_field(payload, "author")
|
|
if not author or (self.config.agent_user_id and author == self.config.agent_user_id):
|
|
return
|
|
if not self.is_allowed(author):
|
|
return
|
|
|
|
message_id = _str_field(payload, "messageId")
|
|
seen_key = f"{target_kind}:{target_id}"
|
|
if message_id and self._remember_message_id(seen_key, message_id):
|
|
return
|
|
|
|
raw_body = normalize_mochat_content(payload.get("content")) or "[empty message]"
|
|
ai = _safe_dict(payload.get("authorInfo"))
|
|
sender_name = _str_field(ai, "nickname", "email")
|
|
sender_username = _str_field(ai, "agentId")
|
|
|
|
group_id = _str_field(payload, "groupId")
|
|
is_group = bool(group_id)
|
|
was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id)
|
|
require_mention = target_kind == "panel" and is_group and resolve_require_mention(self.config, target_id, group_id)
|
|
use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention"
|
|
|
|
if require_mention and not was_mentioned and not use_delay:
|
|
return
|
|
|
|
entry = MochatBufferedEntry(
|
|
raw_body=raw_body, author=author, sender_name=sender_name,
|
|
sender_username=sender_username, timestamp=parse_timestamp(event.get("timestamp")),
|
|
message_id=message_id, group_id=group_id,
|
|
)
|
|
|
|
if use_delay:
|
|
delay_key = seen_key
|
|
if was_mentioned:
|
|
await self._flush_delayed_entries(delay_key, target_id, target_kind, "mention", entry)
|
|
else:
|
|
await self._enqueue_delayed_entry(delay_key, target_id, target_kind, entry)
|
|
return
|
|
|
|
await self._dispatch_entries(target_id, target_kind, [entry], was_mentioned)
|
|
|
|
# ---- dedup / buffering -------------------------------------------------
|
|
|
|
def _remember_message_id(self, key: str, message_id: str) -> bool:
|
|
seen_set = self._seen_set.setdefault(key, set())
|
|
seen_queue = self._seen_queue.setdefault(key, deque())
|
|
if message_id in seen_set:
|
|
return True
|
|
seen_set.add(message_id)
|
|
seen_queue.append(message_id)
|
|
while len(seen_queue) > MAX_SEEN_MESSAGE_IDS:
|
|
seen_set.discard(seen_queue.popleft())
|
|
return False
|
|
|
|
async def _enqueue_delayed_entry(self, key: str, target_id: str, target_kind: str, entry: MochatBufferedEntry) -> None:
|
|
state = self._delay_states.setdefault(key, DelayState())
|
|
async with state.lock:
|
|
state.entries.append(entry)
|
|
if state.timer:
|
|
state.timer.cancel()
|
|
state.timer = asyncio.create_task(self._delay_flush_after(key, target_id, target_kind))
|
|
|
|
async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None:
|
|
await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0)
|
|
await self._flush_delayed_entries(key, target_id, target_kind, "timer", None)
|
|
|
|
async def _flush_delayed_entries(self, key: str, target_id: str, target_kind: str, reason: str, entry: MochatBufferedEntry | None) -> None:
|
|
state = self._delay_states.setdefault(key, DelayState())
|
|
async with state.lock:
|
|
if entry:
|
|
state.entries.append(entry)
|
|
current = asyncio.current_task()
|
|
if state.timer and state.timer is not current:
|
|
state.timer.cancel()
|
|
state.timer = None
|
|
entries = state.entries[:]
|
|
state.entries.clear()
|
|
if entries:
|
|
await self._dispatch_entries(target_id, target_kind, entries, reason == "mention")
|
|
|
|
async def _dispatch_entries(self, target_id: str, target_kind: str, entries: list[MochatBufferedEntry], was_mentioned: bool) -> None:
|
|
if not entries:
|
|
return
|
|
last = entries[-1]
|
|
is_group = bool(last.group_id)
|
|
body = build_buffered_body(entries, is_group) or "[empty message]"
|
|
await self._handle_message(
|
|
sender_id=last.author, chat_id=target_id, content=body,
|
|
metadata={
|
|
"message_id": last.message_id, "timestamp": last.timestamp,
|
|
"is_group": is_group, "group_id": last.group_id,
|
|
"sender_name": last.sender_name, "sender_username": last.sender_username,
|
|
"target_kind": target_kind, "was_mentioned": was_mentioned,
|
|
"buffered_count": len(entries),
|
|
},
|
|
)
|
|
|
|
async def _cancel_delay_timers(self) -> None:
|
|
for state in self._delay_states.values():
|
|
if state.timer:
|
|
state.timer.cancel()
|
|
self._delay_states.clear()
|
|
|
|
# ---- notify handlers ---------------------------------------------------
|
|
|
|
async def _handle_notify_chat_message(self, payload: Any) -> None:
|
|
if not isinstance(payload, dict):
|
|
return
|
|
group_id = _str_field(payload, "groupId")
|
|
panel_id = _str_field(payload, "converseId", "panelId")
|
|
if not group_id or not panel_id:
|
|
return
|
|
if self._panel_set and panel_id not in self._panel_set:
|
|
return
|
|
|
|
evt = _make_synthetic_event(
|
|
message_id=str(payload.get("_id") or payload.get("messageId") or ""),
|
|
author=str(payload.get("author") or ""),
|
|
content=payload.get("content"), meta=payload.get("meta"),
|
|
group_id=group_id, converse_id=panel_id,
|
|
timestamp=payload.get("createdAt"), author_info=payload.get("authorInfo"),
|
|
)
|
|
await self._process_inbound_event(panel_id, evt, "panel")
|
|
|
|
async def _handle_notify_inbox_append(self, payload: Any) -> None:
|
|
if not isinstance(payload, dict) or payload.get("type") != "message":
|
|
return
|
|
detail = payload.get("payload")
|
|
if not isinstance(detail, dict):
|
|
return
|
|
if _str_field(detail, "groupId"):
|
|
return
|
|
converse_id = _str_field(detail, "converseId")
|
|
if not converse_id:
|
|
return
|
|
|
|
session_id = self._session_by_converse.get(converse_id)
|
|
if not session_id:
|
|
await self._refresh_sessions_directory(self._ws_ready)
|
|
session_id = self._session_by_converse.get(converse_id)
|
|
if not session_id:
|
|
return
|
|
|
|
evt = _make_synthetic_event(
|
|
message_id=str(detail.get("messageId") or payload.get("_id") or ""),
|
|
author=str(detail.get("messageAuthor") or ""),
|
|
content=str(detail.get("messagePlainContent") or detail.get("messageSnippet") or ""),
|
|
meta={"source": "notify:chat.inbox.append", "converseId": converse_id},
|
|
group_id="", converse_id=converse_id, timestamp=payload.get("createdAt"),
|
|
)
|
|
await self._process_inbound_event(session_id, evt, "session")
|
|
|
|
# ---- cursor persistence ------------------------------------------------
|
|
|
|
def _mark_session_cursor(self, session_id: str, cursor: int) -> None:
|
|
if cursor < 0 or cursor < self._session_cursor.get(session_id, 0):
|
|
return
|
|
self._session_cursor[session_id] = cursor
|
|
if not self._cursor_save_task or self._cursor_save_task.done():
|
|
self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced())
|
|
|
|
async def _save_cursor_debounced(self) -> None:
|
|
await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S)
|
|
await self._save_session_cursors()
|
|
|
|
async def _load_session_cursors(self) -> None:
|
|
if not self._cursor_path.exists():
|
|
return
|
|
try:
|
|
data = json.loads(self._cursor_path.read_text("utf-8"))
|
|
except Exception as e:
|
|
logger.warning("Failed to read Mochat cursor file: {}", e)
|
|
return
|
|
cursors = data.get("cursors") if isinstance(data, dict) else None
|
|
if isinstance(cursors, dict):
|
|
for sid, cur in cursors.items():
|
|
if isinstance(sid, str) and isinstance(cur, int) and cur >= 0:
|
|
self._session_cursor[sid] = cur
|
|
|
|
async def _save_session_cursors(self) -> None:
|
|
try:
|
|
self._state_dir.mkdir(parents=True, exist_ok=True)
|
|
self._cursor_path.write_text(json.dumps({
|
|
"schemaVersion": 1, "updatedAt": datetime.utcnow().isoformat(),
|
|
"cursors": self._session_cursor,
|
|
}, ensure_ascii=False, indent=2) + "\n", "utf-8")
|
|
except Exception as e:
|
|
logger.warning("Failed to save Mochat cursor file: {}", e)
|
|
|
|
# ---- HTTP helpers ------------------------------------------------------
|
|
|
|
async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
|
|
if not self._http:
|
|
raise RuntimeError("Mochat HTTP client not initialized")
|
|
url = f"{self.config.base_url.strip().rstrip('/')}{path}"
|
|
response = await self._http.post(url, headers={
|
|
"Content-Type": "application/json", "X-Claw-Token": self.config.claw_token,
|
|
}, json=payload)
|
|
if not response.is_success:
|
|
raise RuntimeError(f"Mochat HTTP {response.status_code}: {response.text[:200]}")
|
|
try:
|
|
parsed = response.json()
|
|
except Exception:
|
|
parsed = response.text
|
|
if isinstance(parsed, dict) and isinstance(parsed.get("code"), int):
|
|
if parsed["code"] != 200:
|
|
msg = str(parsed.get("message") or parsed.get("name") or "request failed")
|
|
raise RuntimeError(f"Mochat API error: {msg} (code={parsed['code']})")
|
|
data = parsed.get("data")
|
|
return data if isinstance(data, dict) else {}
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
|
|
async def _api_send(self, path: str, id_key: str, id_val: str,
|
|
content: str, reply_to: str | None, group_id: str | None = None) -> dict[str, Any]:
|
|
"""Unified send helper for session and panel messages."""
|
|
body: dict[str, Any] = {id_key: id_val, "content": content}
|
|
if reply_to:
|
|
body["replyTo"] = reply_to
|
|
if group_id:
|
|
body["groupId"] = group_id
|
|
return await self._post_json(path, body)
|
|
|
|
@staticmethod
|
|
def _read_group_id(metadata: dict[str, Any]) -> str | None:
|
|
if not isinstance(metadata, dict):
|
|
return None
|
|
value = metadata.get("group_id") or metadata.get("groupId")
|
|
return value.strip() if isinstance(value, str) and value.strip() else None
|