"""Feishu/Lark channel implementation using lark-oapi SDK with WebSocket long connection.""" import asyncio import importlib.util import json import os import re import threading import time import uuid from collections import OrderedDict from dataclasses import dataclass from typing import Any, Literal from lark_oapi.api.im.v1.model import MentionEvent, P2ImMessageReceiveV1 from loguru import logger from pydantic import Field from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel from nanobot.config.paths import get_media_dir from nanobot.config.schema import Base from lark_oapi.core.const import FEISHU_DOMAIN, LARK_DOMAIN FEISHU_AVAILABLE = importlib.util.find_spec("lark_oapi") is not None # Message type display mapping MSG_TYPE_MAP = { "image": "[image]", "audio": "[audio]", "file": "[file]", "sticker": "[sticker]", } def _extract_share_card_content(content_json: dict, msg_type: str) -> str: """Extract text representation from share cards and interactive messages.""" parts = [] if msg_type == "share_chat": parts.append(f"[shared chat: {content_json.get('chat_id', '')}]") elif msg_type == "share_user": parts.append(f"[shared user: {content_json.get('user_id', '')}]") elif msg_type == "interactive": parts.extend(_extract_interactive_content(content_json)) elif msg_type == "share_calendar_event": parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]") elif msg_type == "system": parts.append("[system message]") elif msg_type == "merge_forward": parts.append("[merged forward messages]") return "\n".join(parts) if parts else f"[{msg_type}]" def _extract_interactive_content(content: dict) -> list[str]: """Recursively extract text and links from interactive card content.""" parts = [] if isinstance(content, str): try: content = json.loads(content) except (json.JSONDecodeError, TypeError): return [content] if content.strip() else [] if not isinstance(content, dict): return parts if "title" in content: title = content["title"] if isinstance(title, dict): title_content = title.get("content", "") or title.get("text", "") if title_content: parts.append(f"title: {title_content}") elif isinstance(title, str): parts.append(f"title: {title}") for elements in ( content.get("elements", []) if isinstance(content.get("elements"), list) else [] ): for element in elements: parts.extend(_extract_element_content(element)) card = content.get("card", {}) if card: parts.extend(_extract_interactive_content(card)) header = content.get("header", {}) if header: header_title = header.get("title", {}) if isinstance(header_title, dict): header_text = header_title.get("content", "") or header_title.get("text", "") if header_text: parts.append(f"title: {header_text}") return parts def _extract_element_content(element: dict) -> list[str]: """Extract content from a single card element.""" parts = [] if not isinstance(element, dict): return parts tag = element.get("tag", "") if tag in ("markdown", "lark_md"): content = element.get("content", "") if content: parts.append(content) elif tag == "div": text = element.get("text", {}) if isinstance(text, dict): text_content = text.get("content", "") or text.get("text", "") if text_content: parts.append(text_content) elif isinstance(text, str): parts.append(text) for field in element.get("fields", []): if isinstance(field, dict): field_text = field.get("text", {}) if isinstance(field_text, dict): c = field_text.get("content", "") if c: parts.append(c) elif tag == "a": href = element.get("href", "") text = element.get("text", "") if href: parts.append(f"link: {href}") if text: parts.append(text) elif tag == "button": text = element.get("text", {}) if isinstance(text, dict): c = text.get("content", "") if c: parts.append(c) url = element.get("url", "") or element.get("multi_url", {}).get("url", "") if url: parts.append(f"link: {url}") elif tag == "img": alt = element.get("alt", {}) parts.append(alt.get("content", "[image]") if isinstance(alt, dict) else "[image]") elif tag == "note": for ne in element.get("elements", []): parts.extend(_extract_element_content(ne)) elif tag == "column_set": for col in element.get("columns", []): for ce in col.get("elements", []): parts.extend(_extract_element_content(ce)) elif tag == "plain_text": content = element.get("content", "") if content: parts.append(content) else: for ne in element.get("elements", []): parts.extend(_extract_element_content(ne)) return parts def _extract_post_content(content_json: dict) -> tuple[str, list[str]]: """Extract text and image keys from Feishu post (rich text) message. Handles three payload shapes: - Direct: {"title": "...", "content": [[...]]} - Localized: {"zh_cn": {"title": "...", "content": [...]}} - Wrapped: {"post": {"zh_cn": {"title": "...", "content": [...]}}} """ def _parse_block(block: dict) -> tuple[str | None, list[str]]: if not isinstance(block, dict) or not isinstance(block.get("content"), list): return None, [] texts, images = [], [] if title := block.get("title"): texts.append(title) for row in block["content"]: if not isinstance(row, list): continue for el in row: if not isinstance(el, dict): continue tag = el.get("tag") if tag in ("text", "a"): texts.append(el.get("text", "")) elif tag == "at": texts.append(f"@{el.get('user_name', 'user')}") elif tag == "code_block": lang = el.get("language", "") code_text = el.get("text", "") texts.append(f"\n```{lang}\n{code_text}\n```\n") elif tag == "img" and (key := el.get("image_key")): images.append(key) return (" ".join(texts).strip() or None), images # Unwrap optional {"post": ...} envelope root = content_json if isinstance(root, dict) and isinstance(root.get("post"), dict): root = root["post"] if not isinstance(root, dict): return "", [] # Direct format if "content" in root: text, imgs = _parse_block(root) if text or imgs: return text or "", imgs # Localized: prefer known locales, then fall back to any dict child for key in ("zh_cn", "en_us", "ja_jp"): if key in root: text, imgs = _parse_block(root[key]) if text or imgs: return text or "", imgs for val in root.values(): if isinstance(val, dict): text, imgs = _parse_block(val) if text or imgs: return text or "", imgs return "", [] def _extract_post_text(content_json: dict) -> str: """Extract plain text from Feishu post (rich text) message content. Legacy wrapper for _extract_post_content, returns only text. """ text, _ = _extract_post_content(content_json) return text class FeishuConfig(Base): """Feishu/Lark channel configuration using WebSocket long connection.""" enabled: bool = False app_id: str = "" app_secret: str = "" encrypt_key: str = "" verification_token: str = "" allow_from: list[str] = Field(default_factory=list) react_emoji: str = "THUMBSUP" done_emoji: str | None = None # Emoji to show when task is completed (e.g., "DONE", "OK") tool_hint_prefix: str = "\U0001f527" # Prefix for inline tool hints (default: πŸ”§) group_policy: Literal["open", "mention"] = "mention" reply_to_message: bool = False # If True, bot replies quote the user's original message streaming: bool = True domain: Literal["feishu", "lark"] = "feishu" # Set to "lark" for international Lark _STREAM_ELEMENT_ID = "streaming_md" @dataclass class _FeishuStreamBuf: """Per-chat streaming accumulator using CardKit streaming API.""" text: str = "" card_id: str | None = None sequence: int = 0 last_edit: float = 0.0 class FeishuChannel(BaseChannel): """ Feishu/Lark channel using WebSocket long connection. Uses WebSocket to receive events - no public IP or webhook required. Requires: - App ID and App Secret from Feishu Open Platform - Bot capability enabled - Event subscription enabled (im.message.receive_v1) """ name = "feishu" display_name = "Feishu" _STREAM_EDIT_INTERVAL = 0.5 # throttle between CardKit streaming updates @classmethod def default_config(cls) -> dict[str, Any]: return FeishuConfig().model_dump(by_alias=True) def __init__(self, config: Any, bus: MessageBus): import lark_oapi as lark if isinstance(config, dict): config = FeishuConfig.model_validate(config) super().__init__(config, bus) self.config: FeishuConfig = config self._client: lark.Client = None self._ws_client: Any = None self._ws_thread: threading.Thread | None = None self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache self._loop: asyncio.AbstractEventLoop | None = None self._stream_bufs: dict[str, _FeishuStreamBuf] = {} self._bot_open_id: str | None = None @staticmethod def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any: """Register an event handler only when the SDK supports it.""" method = getattr(builder, method_name, None) return method(handler) if callable(method) else builder async def start(self) -> None: """Start the Feishu bot with WebSocket long connection.""" if not FEISHU_AVAILABLE: logger.error("Feishu SDK not installed. Run: pip install lark-oapi") return if not self.config.app_id or not self.config.app_secret: logger.error("Feishu app_id and app_secret not configured") return import lark_oapi as lark self._running = True self._loop = asyncio.get_running_loop() # Create Lark client for sending messages domain = LARK_DOMAIN if self.config.domain == "lark" else FEISHU_DOMAIN self._client = ( lark.Client.builder() .app_id(self.config.app_id) .app_secret(self.config.app_secret) .domain(domain) .log_level(lark.LogLevel.INFO) .build() ) builder = lark.EventDispatcherHandler.builder( self.config.encrypt_key or "", self.config.verification_token or "", ).register_p2_im_message_receive_v1(self._on_message_sync) builder = self._register_optional_event( builder, "register_p2_im_message_reaction_created_v1", self._on_reaction_created ) builder = self._register_optional_event( builder, "register_p2_im_message_reaction_deleted_v1", self._on_reaction_deleted ) builder = self._register_optional_event( builder, "register_p2_im_message_message_read_v1", self._on_message_read ) builder = self._register_optional_event( builder, "register_p2_im_chat_access_event_bot_p2p_chat_entered_v1", self._on_bot_p2p_chat_entered, ) event_handler = builder.build() # Create WebSocket client for long connection self._ws_client = lark.ws.Client( self.config.app_id, self.config.app_secret, domain=domain, event_handler=event_handler, log_level=lark.LogLevel.INFO, ) # Start WebSocket client in a separate thread with reconnect loop. # A dedicated event loop is created for this thread so that lark_oapi's # module-level `loop = asyncio.get_event_loop()` picks up an idle loop # instead of the already-running main asyncio loop, which would cause # "This event loop is already running" errors. def run_ws(): import time import lark_oapi.ws.client as _lark_ws_client ws_loop = asyncio.new_event_loop() asyncio.set_event_loop(ws_loop) # Patch the module-level loop used by lark's ws Client.start() _lark_ws_client.loop = ws_loop try: while self._running: try: self._ws_client.start() except Exception as e: logger.warning("Feishu WebSocket error: {}", e) if self._running: time.sleep(5) finally: ws_loop.close() self._ws_thread = threading.Thread(target=run_ws, daemon=True) self._ws_thread.start() # Fetch bot's own open_id for accurate @mention matching self._bot_open_id = await asyncio.get_running_loop().run_in_executor( None, self._fetch_bot_open_id ) if self._bot_open_id: logger.info("Feishu bot open_id: {}", self._bot_open_id) else: logger.warning("Could not fetch bot open_id; @mention matching may be inaccurate") logger.info("Feishu bot started with WebSocket long connection") logger.info("No public IP required - using WebSocket to receive events") # Keep running until stopped while self._running: await asyncio.sleep(1) async def stop(self) -> None: """ Stop the Feishu bot. Notice: lark.ws.Client does not expose stop method, simply exiting the program will close the client. Reference: https://github.com/larksuite/oapi-sdk-python/blob/v2_main/lark_oapi/ws/client.py#L86 """ self._running = False logger.info("Feishu bot stopped") def _fetch_bot_open_id(self) -> str | None: """Fetch the bot's own open_id via GET /open-apis/bot/v3/info.""" try: import lark_oapi as lark request = ( lark.BaseRequest.builder() .http_method(lark.HttpMethod.GET) .uri("/open-apis/bot/v3/info") .token_types({lark.AccessTokenType.APP}) .build() ) response = self._client.request(request) if response.success(): import json data = json.loads(response.raw.content) bot = (data.get("data") or data).get("bot") or data.get("bot") or {} return bot.get("open_id") logger.warning("Failed to get bot info: code={}, msg={}", response.code, response.msg) return None except Exception as e: logger.warning("Error fetching bot info: {}", e) return None @staticmethod def _resolve_mentions(text: str, mentions: list[MentionEvent] | None) -> str: """Replace @_user_n placeholders with actual user info from mentions. Args: text: The message text containing @_user_n placeholders mentions: List of mention objects from Feishu message Returns: Text with placeholders replaced by @姓名 (open_id) """ if not mentions or not text: return text for mention in mentions: key = mention.key or None if not key or key not in text: continue user_id_obj = mention.id or None if not user_id_obj: continue open_id = user_id_obj.open_id user_id = user_id_obj.user_id name = mention.name or key # Format: @姓名 (open_id, user_id: xxx) if open_id and user_id: replacement = f"@{name} ({open_id}, user id: {user_id})" elif open_id: replacement = f"@{name} ({open_id})" else: replacement = f"@{name}" text = text.replace(key, replacement) return text def _is_bot_mentioned(self, message: Any) -> bool: """Check if the bot is @mentioned in the message.""" raw_content = message.content or "" if "@_all" in raw_content: return True for mention in getattr(message, "mentions", None) or []: mid = getattr(mention, "id", None) if not mid: continue mention_open_id = getattr(mid, "open_id", None) or "" if self._bot_open_id: if mention_open_id == self._bot_open_id: return True else: # Fallback heuristic when bot open_id is unavailable if not getattr(mid, "user_id", None) and mention_open_id.startswith("ou_"): return True return False def _is_group_message_for_bot(self, message: Any) -> bool: """Allow group messages when policy is open or bot is @mentioned.""" if self.config.group_policy == "open": return True return self._is_bot_mentioned(message) def _add_reaction_sync(self, message_id: str, emoji_type: str) -> str | None: """Sync helper for adding reaction (runs in thread pool).""" from lark_oapi.api.im.v1 import ( CreateMessageReactionRequest, CreateMessageReactionRequestBody, Emoji, ) try: request = ( CreateMessageReactionRequest.builder() .message_id(message_id) .request_body( CreateMessageReactionRequestBody.builder() .reaction_type(Emoji.builder().emoji_type(emoji_type).build()) .build() ) .build() ) response = self._client.im.v1.message_reaction.create(request) if not response.success(): logger.warning( "Failed to add reaction: code={}, msg={}", response.code, response.msg ) return None else: logger.debug("Added {} reaction to message {}", emoji_type, message_id) return response.data.reaction_id if response.data else None except Exception as e: logger.warning("Error adding reaction: {}", e) return None async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> str | None: """ Add a reaction emoji to a message (non-blocking). Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART """ if not self._client: return None loop = asyncio.get_running_loop() return await loop.run_in_executor(None, self._add_reaction_sync, message_id, emoji_type) def _remove_reaction_sync(self, message_id: str, reaction_id: str) -> None: """Sync helper for removing reaction (runs in thread pool).""" from lark_oapi.api.im.v1 import DeleteMessageReactionRequest try: request = ( DeleteMessageReactionRequest.builder() .message_id(message_id) .reaction_id(reaction_id) .build() ) response = self._client.im.v1.message_reaction.delete(request) if response.success(): logger.debug("Removed reaction {} from message {}", reaction_id, message_id) else: logger.debug( "Failed to remove reaction: code={}, msg={}", response.code, response.msg ) except Exception as e: logger.debug("Error removing reaction: {}", e) async def _remove_reaction(self, message_id: str, reaction_id: str) -> None: """ Remove a reaction emoji from a message (non-blocking). Used to clear the "processing" indicator after bot replies. """ if not self._client or not reaction_id: return loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._remove_reaction_sync, message_id, reaction_id) # Regex to match markdown tables (header + separator + data rows) _TABLE_RE = re.compile( r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)", re.MULTILINE, ) _HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE) _CODE_BLOCK_RE = re.compile(r"(```[\s\S]*?```)", re.MULTILINE) # Markdown formatting patterns that should be stripped from plain-text # surfaces like table cells and heading text. _MD_BOLD_RE = re.compile(r"\*\*(.+?)\*\*") _MD_BOLD_UNDERSCORE_RE = re.compile(r"__(.+?)__") _MD_ITALIC_RE = re.compile(r"(? str: """Strip markdown formatting markers from text for plain display. Feishu table cells do not support markdown rendering, so we remove the formatting markers to keep the text readable. """ # Remove bold markers text = cls._MD_BOLD_RE.sub(r"\1", text) text = cls._MD_BOLD_UNDERSCORE_RE.sub(r"\1", text) # Remove italic markers text = cls._MD_ITALIC_RE.sub(r"\1", text) # Remove strikethrough markers text = cls._MD_STRIKE_RE.sub(r"\1", text) return text @classmethod def _parse_md_table(cls, table_text: str) -> dict | None: """Parse a markdown table into a Feishu table element.""" lines = [_line.strip() for _line in table_text.strip().split("\n") if _line.strip()] if len(lines) < 3: return None def split(_line: str) -> list[str]: return [c.strip() for c in _line.strip("|").split("|")] headers = [cls._strip_md_formatting(h) for h in split(lines[0])] rows = [[cls._strip_md_formatting(c) for c in split(_line)] for _line in lines[2:]] columns = [ {"tag": "column", "name": f"c{i}", "display_name": h, "width": "auto"} for i, h in enumerate(headers) ] return { "tag": "table", "page_size": len(rows) + 1, "columns": columns, "rows": [ {f"c{i}": r[i] if i < len(r) else "" for i in range(len(headers))} for r in rows ], } def _build_card_elements(self, content: str) -> list[dict]: """Split content into div/markdown + table elements for Feishu card.""" elements, last_end = [], 0 for m in self._TABLE_RE.finditer(content): before = content[last_end : m.start()] if before.strip(): elements.extend(self._split_headings(before)) elements.append( self._parse_md_table(m.group(1)) or {"tag": "markdown", "content": m.group(1)} ) last_end = m.end() remaining = content[last_end:] if remaining.strip(): elements.extend(self._split_headings(remaining)) return elements or [{"tag": "markdown", "content": content}] @staticmethod def _split_elements_by_table_limit( elements: list[dict], max_tables: int = 1 ) -> list[list[dict]]: """Split card elements into groups with at most *max_tables* table elements each. Feishu cards have a hard limit of one table per card (API error 11310). When the rendered content contains multiple markdown tables each table is placed in a separate card message so every table reaches the user. """ if not elements: return [[]] groups: list[list[dict]] = [] current: list[dict] = [] table_count = 0 for el in elements: if el.get("tag") == "table": if table_count >= max_tables: if current: groups.append(current) current = [] table_count = 0 current.append(el) table_count += 1 else: current.append(el) if current: groups.append(current) return groups or [[]] def _split_headings(self, content: str) -> list[dict]: """Split content by headings, converting headings to div elements.""" protected = content code_blocks = [] for m in self._CODE_BLOCK_RE.finditer(content): code_blocks.append(m.group(1)) protected = protected.replace(m.group(1), f"\x00CODE{len(code_blocks) - 1}\x00", 1) elements = [] last_end = 0 for m in self._HEADING_RE.finditer(protected): before = protected[last_end : m.start()].strip() if before: elements.append({"tag": "markdown", "content": before}) text = self._strip_md_formatting(m.group(2).strip()) display_text = f"**{text}**" if text else "" elements.append( { "tag": "div", "text": { "tag": "lark_md", "content": display_text, }, } ) last_end = m.end() remaining = protected[last_end:].strip() if remaining: elements.append({"tag": "markdown", "content": remaining}) for i, cb in enumerate(code_blocks): for el in elements: if el.get("tag") == "markdown": el["content"] = el["content"].replace(f"\x00CODE{i}\x00", cb) return elements or [{"tag": "markdown", "content": content}] # ── Smart format detection ────────────────────────────────────────── # Patterns that indicate "complex" markdown needing card rendering _COMPLEX_MD_RE = re.compile( r"```" # fenced code block r"|^\|.+\|.*\n\s*\|[-:\s|]+\|" # markdown table (header + separator) r"|^#{1,6}\s+", # headings re.MULTILINE, ) # Simple markdown patterns (bold, italic, strikethrough) _SIMPLE_MD_RE = re.compile( r"\*\*.+?\*\*" # **bold** r"|__.+?__" # __bold__ r"|(? str: """Determine the optimal Feishu message format for *content*. Returns one of: - ``"text"`` – plain text, short and no markdown - ``"post"`` – rich text (links only, moderate length) - ``"interactive"`` – card with full markdown rendering """ stripped = content.strip() # Complex markdown (code blocks, tables, headings) β†’ always card if cls._COMPLEX_MD_RE.search(stripped): return "interactive" # Long content β†’ card (better readability with card layout) if len(stripped) > cls._POST_MAX_LEN: return "interactive" # Has bold/italic/strikethrough β†’ card (post format can't render these) if cls._SIMPLE_MD_RE.search(stripped): return "interactive" # Has list items β†’ card (post format can't render list bullets well) if cls._LIST_RE.search(stripped) or cls._OLIST_RE.search(stripped): return "interactive" # Has links β†’ post format (supports tags) if cls._MD_LINK_RE.search(stripped): return "post" # Short plain text β†’ text format if len(stripped) <= cls._TEXT_MAX_LEN: return "text" # Medium plain text without any formatting β†’ post format return "post" @classmethod def _markdown_to_post(cls, content: str) -> str: """Convert markdown content to Feishu post message JSON. Handles links ``[text](url)`` as ``a`` tags; everything else as ``text`` tags. Each line becomes a paragraph (row) in the post body. """ lines = content.strip().split("\n") paragraphs: list[list[dict]] = [] for line in lines: elements: list[dict] = [] last_end = 0 for m in cls._MD_LINK_RE.finditer(line): # Text before this link before = line[last_end : m.start()] if before: elements.append({"tag": "text", "text": before}) elements.append( { "tag": "a", "text": m.group(1), "href": m.group(2), } ) last_end = m.end() # Remaining text after last link remaining = line[last_end:] if remaining: elements.append({"tag": "text", "text": remaining}) # Empty line β†’ empty paragraph for spacing if not elements: elements.append({"tag": "text", "text": ""}) paragraphs.append(elements) post_body = { "zh_cn": { "content": paragraphs, } } return json.dumps(post_body, ensure_ascii=False) _IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".ico", ".tiff", ".tif"} _AUDIO_EXTS = {".opus"} _VIDEO_EXTS = {".mp4", ".mov", ".avi"} _FILE_TYPE_MAP = { ".opus": "opus", ".mp4": "mp4", ".pdf": "pdf", ".doc": "doc", ".docx": "doc", ".xls": "xls", ".xlsx": "xls", ".ppt": "ppt", ".pptx": "ppt", } def _upload_image_sync(self, file_path: str) -> str | None: """Upload an image to Feishu and return the image_key.""" from lark_oapi.api.im.v1 import CreateImageRequest, CreateImageRequestBody try: with open(file_path, "rb") as f: request = ( CreateImageRequest.builder() .request_body( CreateImageRequestBody.builder().image_type("message").image(f).build() ) .build() ) response = self._client.im.v1.image.create(request) if response.success(): image_key = response.data.image_key logger.debug("Uploaded image {}: {}", os.path.basename(file_path), image_key) return image_key else: logger.error( "Failed to upload image: code={}, msg={}", response.code, response.msg ) return None except Exception as e: logger.error("Error uploading image {}: {}", file_path, e) return None def _upload_file_sync(self, file_path: str) -> str | None: """Upload a file to Feishu and return the file_key.""" from lark_oapi.api.im.v1 import CreateFileRequest, CreateFileRequestBody ext = os.path.splitext(file_path)[1].lower() file_type = self._FILE_TYPE_MAP.get(ext, "stream") file_name = os.path.basename(file_path) try: with open(file_path, "rb") as f: request = ( CreateFileRequest.builder() .request_body( CreateFileRequestBody.builder() .file_type(file_type) .file_name(file_name) .file(f) .build() ) .build() ) response = self._client.im.v1.file.create(request) if response.success(): file_key = response.data.file_key logger.debug("Uploaded file {}: {}", file_name, file_key) return file_key else: logger.error( "Failed to upload file: code={}, msg={}", response.code, response.msg ) return None except Exception as e: logger.error("Error uploading file {}: {}", file_path, e) return None def _download_image_sync( self, message_id: str, image_key: str ) -> tuple[bytes | None, str | None]: """Download an image from Feishu message by message_id and image_key.""" from lark_oapi.api.im.v1 import GetMessageResourceRequest try: request = ( GetMessageResourceRequest.builder() .message_id(message_id) .file_key(image_key) .type("image") .build() ) response = self._client.im.v1.message_resource.get(request) if response.success(): file_data = response.file # GetMessageResourceRequest returns BytesIO, need to read bytes if hasattr(file_data, "read"): file_data = file_data.read() return file_data, response.file_name else: logger.error( "Failed to download image: code={}, msg={}", response.code, response.msg ) return None, None except Exception as e: logger.error("Error downloading image {}: {}", image_key, e) return None, None def _download_file_sync( self, message_id: str, file_key: str, resource_type: str = "file" ) -> tuple[bytes | None, str | None]: """Download a file/audio/media from a Feishu message by message_id and file_key.""" from lark_oapi.api.im.v1 import GetMessageResourceRequest # Feishu resource download API only accepts 'image' or 'file' as type. # Both 'audio' and 'media' (video) messages use type='file' for download. if resource_type in ("audio", "media"): resource_type = "file" try: request = ( GetMessageResourceRequest.builder() .message_id(message_id) .file_key(file_key) .type(resource_type) .build() ) response = self._client.im.v1.message_resource.get(request) if response.success(): file_data = response.file if hasattr(file_data, "read"): file_data = file_data.read() return file_data, response.file_name else: logger.error( "Failed to download {}: code={}, msg={}", resource_type, response.code, response.msg, ) return None, None except Exception: logger.exception("Error downloading {} {}", resource_type, file_key) return None, None async def _download_and_save_media( self, msg_type: str, content_json: dict, message_id: str | None = None ) -> tuple[str | None, str]: """ Download media from Feishu and save to local disk. Returns: (file_path, content_text) - file_path is None if download failed """ loop = asyncio.get_running_loop() media_dir = get_media_dir("feishu") data, filename = None, None if msg_type == "image": image_key = content_json.get("image_key") if image_key and message_id: data, filename = await loop.run_in_executor( None, self._download_image_sync, message_id, image_key ) if not filename: filename = f"{image_key[:16]}.jpg" elif msg_type in ("audio", "file", "media"): file_key = content_json.get("file_key") if not file_key: logger.warning("Feishu {} message missing file_key: {}", msg_type, content_json) return None, f"[{msg_type}: missing file_key]" if not message_id: logger.warning("Feishu {} message missing message_id", msg_type) return None, f"[{msg_type}: missing message_id]" data, filename = await loop.run_in_executor( None, self._download_file_sync, message_id, file_key, msg_type ) if not data: logger.warning("Feishu {} download failed: file_key={}", msg_type, file_key) return None, f"[{msg_type}: download failed]" if not filename: filename = file_key[:16] # Feishu voice messages are opus in OGG container. # Use .ogg extension for better Whisper compatibility. if msg_type == "audio": if not any(filename.endswith(ext) for ext in (".opus", ".ogg", ".oga")): filename = f"{filename}.ogg" if data and filename: file_path = media_dir / filename file_path.write_bytes(data) logger.debug("Downloaded {} to {}", msg_type, file_path) return str(file_path), f"[{msg_type}: {filename}]" return None, f"[{msg_type}: download failed]" _REPLY_CONTEXT_MAX_LEN = 200 def _get_message_content_sync(self, message_id: str) -> str | None: """Fetch the text content of a Feishu message by ID (synchronous). Returns a "[Reply to: ...]" context string, or None on failure. """ from lark_oapi.api.im.v1 import GetMessageRequest try: request = GetMessageRequest.builder().message_id(message_id).build() response = self._client.im.v1.message.get(request) if not response.success(): logger.debug( "Feishu: could not fetch parent message {}: code={}, msg={}", message_id, response.code, response.msg, ) return None items = getattr(response.data, "items", None) if not items: return None msg_obj = items[0] raw_content = getattr(msg_obj, "body", None) raw_content = getattr(raw_content, "content", None) if raw_content else None if not raw_content: return None try: content_json = json.loads(raw_content) except (json.JSONDecodeError, TypeError): return None msg_type = getattr(msg_obj, "msg_type", "") if msg_type == "text": text = content_json.get("text", "").strip() elif msg_type == "post": text, _ = _extract_post_content(content_json) text = text.strip() else: text = "" if not text: return None if len(text) > self._REPLY_CONTEXT_MAX_LEN: text = text[: self._REPLY_CONTEXT_MAX_LEN] + "..." return f"[Reply to: {text}]" except Exception as e: logger.debug("Feishu: error fetching parent message {}: {}", message_id, e) return None def _reply_message_sync(self, parent_message_id: str, msg_type: str, content: str) -> bool: """Reply to an existing Feishu message using the Reply API (synchronous).""" from lark_oapi.api.im.v1 import ReplyMessageRequest, ReplyMessageRequestBody try: request = ( ReplyMessageRequest.builder() .message_id(parent_message_id) .request_body( ReplyMessageRequestBody.builder().msg_type(msg_type).content(content).build() ) .build() ) response = self._client.im.v1.message.reply(request) if not response.success(): logger.error( "Failed to reply to Feishu message {}: code={}, msg={}, log_id={}", parent_message_id, response.code, response.msg, response.get_log_id(), ) return False logger.debug("Feishu reply sent to message {}", parent_message_id) return True except Exception as e: logger.error("Error replying to Feishu message {}: {}", parent_message_id, e) return False def _send_message_sync( self, receive_id_type: str, receive_id: str, msg_type: str, content: str ) -> str | None: """Send a single message and return the message_id on success.""" from lark_oapi.api.im.v1 import CreateMessageRequest, CreateMessageRequestBody try: request = ( CreateMessageRequest.builder() .receive_id_type(receive_id_type) .request_body( CreateMessageRequestBody.builder() .receive_id(receive_id) .msg_type(msg_type) .content(content) .build() ) .build() ) response = self._client.im.v1.message.create(request) if not response.success(): logger.error( "Failed to send Feishu {} message: code={}, msg={}, log_id={}", msg_type, response.code, response.msg, response.get_log_id(), ) return None msg_id = getattr(response.data, "message_id", None) logger.debug("Feishu {} message sent to {}: {}", msg_type, receive_id, msg_id) return msg_id except Exception as e: logger.error("Error sending Feishu {} message: {}", msg_type, e) return None def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None: """Create a CardKit streaming card, send it to chat, return card_id.""" from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody card_json = { "schema": "2.0", "config": {"wide_screen_mode": True, "update_multi": True, "streaming_mode": True}, "body": { "elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}] }, } try: request = ( CreateCardRequest.builder() .request_body( CreateCardRequestBody.builder() .type("card_json") .data(json.dumps(card_json, ensure_ascii=False)) .build() ) .build() ) response = self._client.cardkit.v1.card.create(request) if not response.success(): logger.warning( "Failed to create streaming card: code={}, msg={}", response.code, response.msg ) return None card_id = getattr(response.data, "card_id", None) if card_id: message_id = self._send_message_sync( receive_id_type, chat_id, "interactive", json.dumps({"type": "card", "data": {"card_id": card_id}}), ) if message_id: return card_id logger.warning( "Created streaming card {} but failed to send it to {}", card_id, chat_id ) return None except Exception as e: logger.warning("Error creating streaming card: {}", e) return None def _stream_update_text_sync(self, card_id: str, content: str, sequence: int) -> bool: """Stream-update the markdown element on a CardKit card (typewriter effect).""" from lark_oapi.api.cardkit.v1 import ( ContentCardElementRequest, ContentCardElementRequestBody, ) try: request = ( ContentCardElementRequest.builder() .card_id(card_id) .element_id(_STREAM_ELEMENT_ID) .request_body( ContentCardElementRequestBody.builder() .content(content) .sequence(sequence) .build() ) .build() ) response = self._client.cardkit.v1.card_element.content(request) if not response.success(): logger.warning( "Failed to stream-update card {}: code={}, msg={}", card_id, response.code, response.msg, ) return False return True except Exception as e: logger.warning("Error stream-updating card {}: {}", card_id, e) return False def _close_streaming_mode_sync(self, card_id: str, sequence: int) -> bool: """Turn off CardKit streaming_mode so the chat list preview exits the streaming placeholder. Per Feishu docs, streaming cards keep a generating-style summary in the session list until streaming_mode is set to false via card settings (after final content update). Sequence must strictly exceed the previous card OpenAPI operation on this entity. """ from lark_oapi.api.cardkit.v1 import SettingsCardRequest, SettingsCardRequestBody settings_payload = json.dumps({"config": {"streaming_mode": False}}, ensure_ascii=False) try: request = ( SettingsCardRequest.builder() .card_id(card_id) .request_body( SettingsCardRequestBody.builder() .settings(settings_payload) .sequence(sequence) .uuid(str(uuid.uuid4())) .build() ) .build() ) response = self._client.cardkit.v1.card.settings(request) if not response.success(): logger.warning( "Failed to close streaming on card {}: code={}, msg={}", card_id, response.code, response.msg, ) return False return True except Exception as e: logger.warning("Error closing streaming on card {}: {}", card_id, e) return False async def send_delta( self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None ) -> None: """Progressive streaming via CardKit: create card on first delta, stream-update on subsequent. Supported metadata keys: _stream_end: Finalize the streaming card. _tool_hint: Delta is a formatted tool hint (for display only). message_id: Original message id (used with _stream_end for reaction cleanup). reaction_id: Reaction id to remove on stream end. """ if not self._client: return meta = metadata or {} loop = asyncio.get_running_loop() rid_type = "chat_id" if chat_id.startswith("oc_") else "open_id" # --- stream end: final update or fallback --- if meta.get("_stream_end"): if (message_id := meta.get("message_id")) and (reaction_id := meta.get("reaction_id")): await self._remove_reaction(message_id, reaction_id) # Add completion emoji if configured if self.config.done_emoji and message_id: await self._add_reaction(message_id, self.config.done_emoji) buf = self._stream_bufs.pop(chat_id, None) if not buf or not buf.text: return # Try to finalize via streaming card; if that fails (e.g. # streaming mode was closed by Feishu due to timeout), fall # back to sending a regular interactive card. if buf.card_id: buf.sequence += 1 ok = await loop.run_in_executor( None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence, ) if ok: buf.sequence += 1 await loop.run_in_executor( None, self._close_streaming_mode_sync, buf.card_id, buf.sequence, ) return logger.warning( "Streaming card {} final update failed, falling back to regular card", buf.card_id, ) for chunk in self._split_elements_by_table_limit( self._build_card_elements(buf.text) ): card = json.dumps( {"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False, ) await loop.run_in_executor( None, self._send_message_sync, rid_type, chat_id, "interactive", card ) return # --- accumulate delta --- buf = self._stream_bufs.get(chat_id) if buf is None: buf = _FeishuStreamBuf() self._stream_bufs[chat_id] = buf buf.text += delta if not buf.text.strip(): return now = time.monotonic() if buf.card_id is None: card_id = await loop.run_in_executor( None, self._create_streaming_card_sync, rid_type, chat_id ) if card_id: buf.card_id = card_id buf.sequence = 1 await loop.run_in_executor( None, self._stream_update_text_sync, card_id, buf.text, 1 ) buf.last_edit = now elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: buf.sequence += 1 await loop.run_in_executor( None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence ) buf.last_edit = now async def send(self, msg: OutboundMessage) -> None: """Send a message through Feishu, including media (images/files) if present.""" if not self._client: logger.warning("Feishu client not initialized") return try: receive_id_type = "chat_id" if msg.chat_id.startswith("oc_") else "open_id" loop = asyncio.get_running_loop() # Handle tool hint messages. When a streaming card is active for # this chat, inline the hint into the card instead of sending a # separate message so the user experience stays cohesive. if msg.metadata.get("_tool_hint"): hint = (msg.content or "").strip() if not hint: return buf = self._stream_bufs.get(msg.chat_id) if buf and buf.card_id: # Delegate to send_delta so tool hints get the same # throttling (and card creation) as regular text deltas. await self.send_delta( msg.chat_id, "\n\n" + self._format_tool_hint_delta(hint) + "\n\n", ) return # No active streaming card β€” send as a regular # interactive card with the same πŸ”§ prefix style. card = json.dumps( {"config": {"wide_screen_mode": True}, "elements": [ {"tag": "markdown", "content": self._format_tool_hint_delta(hint)}, ]}, ensure_ascii=False, ) await loop.run_in_executor( None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card ) return # Determine whether the first message should quote the user's message. # Only the very first send (media or text) in this call uses reply; subsequent # chunks/media fall back to plain create to avoid redundant quote bubbles. reply_message_id: str | None = None if self.config.reply_to_message and not msg.metadata.get("_progress", False): reply_message_id = msg.metadata.get("message_id") or None # For topic group messages, always reply to keep context in thread elif msg.metadata.get("thread_id"): reply_message_id = ( msg.metadata.get("root_id") or msg.metadata.get("message_id") or None ) first_send = True # tracks whether the reply has already been used def _do_send(m_type: str, content: str) -> None: """Send via reply (first message) or create (subsequent).""" nonlocal first_send if reply_message_id and first_send: first_send = False ok = self._reply_message_sync(reply_message_id, m_type, content) if ok: return # Fall back to regular send if reply fails self._send_message_sync(receive_id_type, msg.chat_id, m_type, content) for file_path in msg.media: if not os.path.isfile(file_path): logger.warning("Media file not found: {}", file_path) continue ext = os.path.splitext(file_path)[1].lower() if ext in self._IMAGE_EXTS: key = await loop.run_in_executor(None, self._upload_image_sync, file_path) if key: await loop.run_in_executor( None, _do_send, "image", json.dumps({"image_key": key}, ensure_ascii=False), ) else: key = await loop.run_in_executor(None, self._upload_file_sync, file_path) if key: # Use msg_type "audio" for audio, "video" for video, "file" for documents. # Feishu requires these specific msg_types for inline playback. # Note: "media" is only valid as a tag inside "post" messages, not as a standalone msg_type. if ext in self._AUDIO_EXTS: media_type = "audio" elif ext in self._VIDEO_EXTS: media_type = "video" else: media_type = "file" await loop.run_in_executor( None, _do_send, media_type, json.dumps({"file_key": key}, ensure_ascii=False), ) if msg.content and msg.content.strip(): fmt = self._detect_msg_format(msg.content) if fmt == "text": # Short plain text – send as simple text message text_body = json.dumps({"text": msg.content.strip()}, ensure_ascii=False) await loop.run_in_executor(None, _do_send, "text", text_body) elif fmt == "post": # Medium content with links – send as rich-text post post_body = self._markdown_to_post(msg.content) await loop.run_in_executor(None, _do_send, "post", post_body) else: # Complex / long content – send as interactive card elements = self._build_card_elements(msg.content) for chunk in self._split_elements_by_table_limit(elements): card = {"config": {"wide_screen_mode": True}, "elements": chunk} await loop.run_in_executor( None, _do_send, "interactive", json.dumps(card, ensure_ascii=False), ) except Exception as e: logger.error("Error sending Feishu message: {}", e) raise def _on_message_sync(self, data: Any) -> None: """ Sync handler for incoming messages (called from WebSocket thread). Schedules async handling in the main event loop. """ if self._loop and self._loop.is_running(): asyncio.run_coroutine_threadsafe(self._on_message(data), self._loop) async def _on_message(self, data: P2ImMessageReceiveV1) -> None: """Handle incoming message from Feishu.""" try: event = data.event message = event.message sender = event.sender logger.debug("Feishu raw message: {}", message.content) logger.debug("Feishu mentions: {}", getattr(message, "mentions", None)) # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: return self._processed_message_ids[message_id] = None # Trim cache while len(self._processed_message_ids) > 1000: self._processed_message_ids.popitem(last=False) # Skip bot messages if sender.sender_type == "bot": return sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" chat_id = message.chat_id chat_type = message.chat_type msg_type = message.message_type if chat_type == "group" and not self._is_group_message_for_bot(message): logger.debug("Feishu: skipping group message (not mentioned)") return # Add reaction reaction_id = await self._add_reaction(message_id, self.config.react_emoji) # Parse content content_parts = [] media_paths = [] try: content_json = json.loads(message.content) if message.content else {} except json.JSONDecodeError: content_json = {} if msg_type == "text": text = content_json.get("text", "") if text: mentions = getattr(message, "mentions", None) text = self._resolve_mentions(text, mentions) content_parts.append(text) elif msg_type == "post": text, image_keys = _extract_post_content(content_json) if text: content_parts.append(text) # Download images embedded in post for img_key in image_keys: file_path, content_text = await self._download_and_save_media( "image", {"image_key": img_key}, message_id ) if file_path: media_paths.append(file_path) content_parts.append(content_text) elif msg_type in ("image", "audio", "file", "media"): file_path, content_text = await self._download_and_save_media( msg_type, content_json, message_id ) if file_path: media_paths.append(file_path) if msg_type == "audio" and file_path: transcription = await self.transcribe_audio(file_path) if transcription: content_text = f"[transcription: {transcription}]" content_parts.append(content_text) elif msg_type in ( "share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward", ): # Handle share cards and interactive messages text = _extract_share_card_content(content_json, msg_type) if text: content_parts.append(text) else: content_parts.append(MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]")) # Extract reply context (parent/root message IDs) parent_id = getattr(message, "parent_id", None) or None root_id = getattr(message, "root_id", None) or None thread_id = getattr(message, "thread_id", None) or None # Prepend quoted message text when the user replied to another message if parent_id and self._client: loop = asyncio.get_running_loop() reply_ctx = await loop.run_in_executor( None, self._get_message_content_sync, parent_id ) if reply_ctx: content_parts.insert(0, reply_ctx) content = "\n".join(content_parts) if content_parts else "" if not content and not media_paths: return # Build topic-scoped session key for conversation isolation. # Group chat: thread replies (root_id != message_id) get a scoped # session so each Feishu thread has its own conversation context. # Private chat: no override β€” same behavior as Telegram/Slack. if chat_type == "group" and root_id and root_id != message_id: session_key = f"feishu:{chat_id}:{root_id}" else: session_key = None # Forward to message bus reply_to = chat_id if chat_type == "group" else sender_id await self._handle_message( sender_id=sender_id, chat_id=reply_to, content=content, media=media_paths, metadata={ "message_id": message_id, "reaction_id": reaction_id, "chat_type": chat_type, "msg_type": msg_type, "parent_id": parent_id, "root_id": root_id, "thread_id": thread_id, }, session_key=session_key, ) except Exception as e: logger.error("Error processing Feishu message: {}", e) def _on_reaction_created(self, data: Any) -> None: """Ignore reaction events so they do not generate SDK noise.""" pass def _on_reaction_deleted(self, data: Any) -> None: """Ignore reaction deleted events so they do not generate SDK noise.""" pass def _on_message_read(self, data: Any) -> None: """Ignore read events so they do not generate SDK noise.""" pass def _on_bot_p2p_chat_entered(self, data: Any) -> None: """Ignore p2p-enter events when a user opens a bot chat.""" logger.debug("Bot entered p2p chat (user opened chat window)") pass @staticmethod def _format_tool_hint_lines(tool_hint: str) -> str: """Split tool hints across lines on top-level call separators only.""" parts: list[str] = [] buf: list[str] = [] depth = 0 in_string = False quote_char = "" escaped = False for i, ch in enumerate(tool_hint): buf.append(ch) if in_string: if escaped: escaped = False elif ch == "\\": escaped = True elif ch == quote_char: in_string = False continue if ch in {'"', "'"}: in_string = True quote_char = ch continue if ch == "(": depth += 1 continue if ch == ")" and depth > 0: depth -= 1 continue if ch == "," and depth == 0: next_char = tool_hint[i + 1] if i + 1 < len(tool_hint) else "" if next_char == " ": parts.append("".join(buf).rstrip()) buf = [] if buf: parts.append("".join(buf).strip()) return "\n".join(part for part in parts if part) def _format_tool_hint_delta(self, tool_hint: str) -> str: """Format a tool hint string with the πŸ”§ prefix for each line.""" lines = self.__class__._format_tool_hint_lines(tool_hint).split("\n") return "\n".join( f"{self.config.tool_hint_prefix} {ln}" for ln in lines if ln.strip() )