From 0291d1f716bf0205914937f9973aa89d4faac4ee Mon Sep 17 00:00:00 2001 From: wudongxue Date: Mon, 30 Mar 2026 18:11:01 +0800 Subject: [PATCH] feat: resolve mentions data --- nanobot/channels/feishu.py | 477 ++++++++++++++++++++++++++----------- 1 file changed, 337 insertions(+), 140 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index c6124d0c9..60e59a3ba 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -11,6 +11,8 @@ from collections import OrderedDict from dataclasses import dataclass from typing import Any, Literal +from lark_oapi.api.im.v1.model import P2ImMessageReceiveV1, MentionEvent + from loguru import logger from nanobot.bus.events import OutboundMessage @@ -75,7 +77,9 @@ def _extract_interactive_content(content: dict) -> list[str]: elif isinstance(title, str): parts.append(f"title: {title}") - for elements in content.get("elements", []) if isinstance(content.get("elements"), list) else []: + for elements in ( + content.get("elements", []) if isinstance(content.get("elements"), list) else [] + ): for element in elements: parts.extend(_extract_element_content(element)) @@ -259,6 +263,7 @@ _STREAM_ELEMENT_ID = "streaming_md" @dataclass class _FeishuStreamBuf: """Per-chat streaming accumulator using CardKit streaming API.""" + text: str = "" card_id: str | None = None sequence: int = 0 @@ -316,21 +321,22 @@ class FeishuChannel(BaseChannel): return import lark_oapi as lark + self._running = True self._loop = asyncio.get_running_loop() # Create Lark client for sending messages - self._client = lark.Client.builder() \ - .app_id(self.config.app_id) \ - .app_secret(self.config.app_secret) \ - .log_level(lark.LogLevel.INFO) \ + self._client = ( + lark.Client.builder() + .app_id(self.config.app_id) + .app_secret(self.config.app_secret) + .log_level(lark.LogLevel.INFO) .build() + ) builder = lark.EventDispatcherHandler.builder( self.config.encrypt_key or "", self.config.verification_token or "", - ).register_p2_im_message_receive_v1( - self._on_message_sync - ) + ).register_p2_im_message_receive_v1(self._on_message_sync) builder = self._register_optional_event( builder, "register_p2_im_message_reaction_created_v1", self._on_reaction_created ) @@ -349,7 +355,7 @@ class FeishuChannel(BaseChannel): self.config.app_id, self.config.app_secret, event_handler=event_handler, - log_level=lark.LogLevel.INFO + log_level=lark.LogLevel.INFO, ) # Start WebSocket client in a separate thread with reconnect loop. @@ -360,6 +366,7 @@ class FeishuChannel(BaseChannel): def run_ws(): import time import lark_oapi.ws.client as _lark_ws_client + ws_loop = asyncio.new_event_loop() asyncio.set_event_loop(ws_loop) # Patch the module-level loop used by lark's ws Client.start() @@ -409,13 +416,17 @@ class FeishuChannel(BaseChannel): """Fetch the bot's own open_id via GET /open-apis/bot/v3/info.""" try: import lark_oapi as lark - request = lark.RawRequest.builder() \ - .http_method(lark.HttpMethod.GET) \ - .uri("/open-apis/bot/v3/info") \ + + request = ( + lark.RawRequest.builder() + .http_method(lark.HttpMethod.GET) + .uri("/open-apis/bot/v3/info") .build() + ) response = self._client.request(request) if response.success(): import json + data = json.loads(response.raw.content) bot = (data.get("data") or data).get("bot") or data.get("bot") or {} return bot.get("open_id") @@ -425,6 +436,45 @@ class FeishuChannel(BaseChannel): logger.warning("Error fetching bot info: {}", e) return None + @staticmethod + def _resolve_mentions(text: str, mentions: list[MentionEvent] | None) -> str: + """Replace @_user_n placeholders with actual user info from mentions. + + Args: + text: The message text containing @_user_n placeholders + mentions: List of mention objects from Feishu message + + Returns: + Text with placeholders replaced by @姓名 (open_id) + """ + if not mentions or not text: + return text + + for mention in mentions: + key = mention.key or None + if not key or key not in text: + continue + + userID = mention.id or None + if not userID: + continue + + open_id = userID.open_id + user_id = userID.user_id + name = mention.name or key + + # Format: @姓名 (open_id, user_id: xxx) + if open_id and user_id: + replacement = f"@{name} ({open_id}, user id: {user_id})" + elif open_id: + replacement = f"@{name} ({open_id})" + else: + replacement = f"@{name}" + + text = text.replace(key, replacement) + + return text + def _is_bot_mentioned(self, message: Any) -> bool: """Check if the bot is @mentioned in the message.""" raw_content = message.content or "" @@ -453,20 +503,30 @@ class FeishuChannel(BaseChannel): def _add_reaction_sync(self, message_id: str, emoji_type: str) -> str | None: """Sync helper for adding reaction (runs in thread pool).""" - from lark_oapi.api.im.v1 import CreateMessageReactionRequest, CreateMessageReactionRequestBody, Emoji + from lark_oapi.api.im.v1 import ( + CreateMessageReactionRequest, + CreateMessageReactionRequestBody, + Emoji, + ) + try: - request = CreateMessageReactionRequest.builder() \ - .message_id(message_id) \ + request = ( + CreateMessageReactionRequest.builder() + .message_id(message_id) .request_body( CreateMessageReactionRequestBody.builder() .reaction_type(Emoji.builder().emoji_type(emoji_type).build()) .build() - ).build() + ) + .build() + ) response = self._client.im.v1.message_reaction.create(request) if not response.success(): - logger.warning("Failed to add reaction: code={}, msg={}", response.code, response.msg) + logger.warning( + "Failed to add reaction: code={}, msg={}", response.code, response.msg + ) return None else: logger.debug("Added {} reaction to message {}", emoji_type, message_id) @@ -490,17 +550,22 @@ class FeishuChannel(BaseChannel): def _remove_reaction_sync(self, message_id: str, reaction_id: str) -> None: """Sync helper for removing reaction (runs in thread pool).""" from lark_oapi.api.im.v1 import DeleteMessageReactionRequest + try: - request = DeleteMessageReactionRequest.builder() \ - .message_id(message_id) \ - .reaction_id(reaction_id) \ + request = ( + DeleteMessageReactionRequest.builder() + .message_id(message_id) + .reaction_id(reaction_id) .build() + ) response = self._client.im.v1.message_reaction.delete(request) if response.success(): logger.debug("Removed reaction {} from message {}", reaction_id, message_id) else: - logger.debug("Failed to remove reaction: code={}, msg={}", response.code, response.msg) + logger.debug( + "Failed to remove reaction: code={}, msg={}", response.code, response.msg + ) except Exception as e: logger.debug("Error removing reaction: {}", e) @@ -555,27 +620,35 @@ class FeishuChannel(BaseChannel): lines = [_line.strip() for _line in table_text.strip().split("\n") if _line.strip()] if len(lines) < 3: return None + def split(_line: str) -> list[str]: return [c.strip() for c in _line.strip("|").split("|")] + headers = [cls._strip_md_formatting(h) for h in split(lines[0])] rows = [[cls._strip_md_formatting(c) for c in split(_line)] for _line in lines[2:]] - columns = [{"tag": "column", "name": f"c{i}", "display_name": h, "width": "auto"} - for i, h in enumerate(headers)] + columns = [ + {"tag": "column", "name": f"c{i}", "display_name": h, "width": "auto"} + for i, h in enumerate(headers) + ] return { "tag": "table", "page_size": len(rows) + 1, "columns": columns, - "rows": [{f"c{i}": r[i] if i < len(r) else "" for i in range(len(headers))} for r in rows], + "rows": [ + {f"c{i}": r[i] if i < len(r) else "" for i in range(len(headers))} for r in rows + ], } def _build_card_elements(self, content: str) -> list[dict]: """Split content into div/markdown + table elements for Feishu card.""" elements, last_end = [], 0 for m in self._TABLE_RE.finditer(content): - before = content[last_end:m.start()] + before = content[last_end : m.start()] if before.strip(): elements.extend(self._split_headings(before)) - elements.append(self._parse_md_table(m.group(1)) or {"tag": "markdown", "content": m.group(1)}) + elements.append( + self._parse_md_table(m.group(1)) or {"tag": "markdown", "content": m.group(1)} + ) last_end = m.end() remaining = content[last_end:] if remaining.strip(): @@ -583,7 +656,9 @@ class FeishuChannel(BaseChannel): return elements or [{"tag": "markdown", "content": content}] @staticmethod - def _split_elements_by_table_limit(elements: list[dict], max_tables: int = 1) -> list[list[dict]]: + def _split_elements_by_table_limit( + elements: list[dict], max_tables: int = 1 + ) -> list[list[dict]]: """Split card elements into groups with at most *max_tables* table elements each. Feishu cards have a hard limit of one table per card (API error 11310). @@ -616,23 +691,25 @@ class FeishuChannel(BaseChannel): code_blocks = [] for m in self._CODE_BLOCK_RE.finditer(content): code_blocks.append(m.group(1)) - protected = protected.replace(m.group(1), f"\x00CODE{len(code_blocks)-1}\x00", 1) + protected = protected.replace(m.group(1), f"\x00CODE{len(code_blocks) - 1}\x00", 1) elements = [] last_end = 0 for m in self._HEADING_RE.finditer(protected): - before = protected[last_end:m.start()].strip() + before = protected[last_end : m.start()].strip() if before: elements.append({"tag": "markdown", "content": before}) text = self._strip_md_formatting(m.group(2).strip()) display_text = f"**{text}**" if text else "" - elements.append({ - "tag": "div", - "text": { - "tag": "lark_md", - "content": display_text, - }, - }) + elements.append( + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": display_text, + }, + } + ) last_end = m.end() remaining = protected[last_end:].strip() if remaining: @@ -648,19 +725,19 @@ class FeishuChannel(BaseChannel): # ── Smart format detection ────────────────────────────────────────── # Patterns that indicate "complex" markdown needing card rendering _COMPLEX_MD_RE = re.compile( - r"```" # fenced code block + r"```" # fenced code block r"|^\|.+\|.*\n\s*\|[-:\s|]+\|" # markdown table (header + separator) - r"|^#{1,6}\s+" # headings - , re.MULTILINE, + r"|^#{1,6}\s+", # headings + re.MULTILINE, ) # Simple markdown patterns (bold, italic, strikethrough) _SIMPLE_MD_RE = re.compile( - r"\*\*.+?\*\*" # **bold** - r"|__.+?__" # __bold__ + r"\*\*.+?\*\*" # **bold** + r"|__.+?__" # __bold__ r"|(? str | None: """Upload an image to Feishu and return the image_key.""" from lark_oapi.api.im.v1 import CreateImageRequest, CreateImageRequestBody + try: with open(file_path, "rb") as f: - request = CreateImageRequest.builder() \ + request = ( + CreateImageRequest.builder() .request_body( - CreateImageRequestBody.builder() - .image_type("message") - .image(f) - .build() - ).build() + CreateImageRequestBody.builder().image_type("message").image(f).build() + ) + .build() + ) response = self._client.im.v1.image.create(request) if response.success(): image_key = response.data.image_key logger.debug("Uploaded image {}: {}", os.path.basename(file_path), image_key) return image_key else: - logger.error("Failed to upload image: code={}, msg={}", response.code, response.msg) + logger.error( + "Failed to upload image: code={}, msg={}", response.code, response.msg + ) return None except Exception as e: logger.error("Error uploading image {}: {}", file_path, e) @@ -795,49 +884,62 @@ class FeishuChannel(BaseChannel): def _upload_file_sync(self, file_path: str) -> str | None: """Upload a file to Feishu and return the file_key.""" from lark_oapi.api.im.v1 import CreateFileRequest, CreateFileRequestBody + ext = os.path.splitext(file_path)[1].lower() file_type = self._FILE_TYPE_MAP.get(ext, "stream") file_name = os.path.basename(file_path) try: with open(file_path, "rb") as f: - request = CreateFileRequest.builder() \ + request = ( + CreateFileRequest.builder() .request_body( CreateFileRequestBody.builder() .file_type(file_type) .file_name(file_name) .file(f) .build() - ).build() + ) + .build() + ) response = self._client.im.v1.file.create(request) if response.success(): file_key = response.data.file_key logger.debug("Uploaded file {}: {}", file_name, file_key) return file_key else: - logger.error("Failed to upload file: code={}, msg={}", response.code, response.msg) + logger.error( + "Failed to upload file: code={}, msg={}", response.code, response.msg + ) return None except Exception as e: logger.error("Error uploading file {}: {}", file_path, e) return None - def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]: + def _download_image_sync( + self, message_id: str, image_key: str + ) -> tuple[bytes | None, str | None]: """Download an image from Feishu message by message_id and image_key.""" from lark_oapi.api.im.v1 import GetMessageResourceRequest + try: - request = GetMessageResourceRequest.builder() \ - .message_id(message_id) \ - .file_key(image_key) \ - .type("image") \ + request = ( + GetMessageResourceRequest.builder() + .message_id(message_id) + .file_key(image_key) + .type("image") .build() + ) response = self._client.im.v1.message_resource.get(request) if response.success(): file_data = response.file # GetMessageResourceRequest returns BytesIO, need to read bytes - if hasattr(file_data, 'read'): + if hasattr(file_data, "read"): file_data = file_data.read() return file_data, response.file_name else: - logger.error("Failed to download image: code={}, msg={}", response.code, response.msg) + logger.error( + "Failed to download image: code={}, msg={}", response.code, response.msg + ) return None, None except Exception as e: logger.error("Error downloading image {}: {}", image_key, e) @@ -869,17 +971,19 @@ class FeishuChannel(BaseChannel): file_data = file_data.read() return file_data, response.file_name else: - logger.error("Failed to download {}: code={}, msg={}", resource_type, response.code, response.msg) + logger.error( + "Failed to download {}: code={}, msg={}", + resource_type, + response.code, + response.msg, + ) return None, None except Exception: logger.exception("Error downloading {} {}", resource_type, file_key) return None, None async def _download_and_save_media( - self, - msg_type: str, - content_json: dict, - message_id: str | None = None + self, msg_type: str, content_json: dict, message_id: str | None = None ) -> tuple[str | None, str]: """ Download media from Feishu and save to local disk. @@ -928,13 +1032,16 @@ class FeishuChannel(BaseChannel): Returns a "[Reply to: ...]" context string, or None on failure. """ from lark_oapi.api.im.v1 import GetMessageRequest + try: request = GetMessageRequest.builder().message_id(message_id).build() response = self._client.im.v1.message.get(request) if not response.success(): logger.debug( "Feishu: could not fetch parent message {}: code={}, msg={}", - message_id, response.code, response.msg, + message_id, + response.code, + response.msg, ) return None items = getattr(response.data, "items", None) @@ -969,20 +1076,24 @@ class FeishuChannel(BaseChannel): def _reply_message_sync(self, parent_message_id: str, msg_type: str, content: str) -> bool: """Reply to an existing Feishu message using the Reply API (synchronous).""" from lark_oapi.api.im.v1 import ReplyMessageRequest, ReplyMessageRequestBody + try: - request = ReplyMessageRequest.builder() \ - .message_id(parent_message_id) \ + request = ( + ReplyMessageRequest.builder() + .message_id(parent_message_id) .request_body( - ReplyMessageRequestBody.builder() - .msg_type(msg_type) - .content(content) - .build() - ).build() + ReplyMessageRequestBody.builder().msg_type(msg_type).content(content).build() + ) + .build() + ) response = self._client.im.v1.message.reply(request) if not response.success(): logger.error( "Failed to reply to Feishu message {}: code={}, msg={}, log_id={}", - parent_message_id, response.code, response.msg, response.get_log_id() + parent_message_id, + response.code, + response.msg, + response.get_log_id(), ) return False logger.debug("Feishu reply sent to message {}", parent_message_id) @@ -991,24 +1102,33 @@ class FeishuChannel(BaseChannel): logger.error("Error replying to Feishu message {}: {}", parent_message_id, e) return False - def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> str | None: + def _send_message_sync( + self, receive_id_type: str, receive_id: str, msg_type: str, content: str + ) -> str | None: """Send a single message and return the message_id on success.""" from lark_oapi.api.im.v1 import CreateMessageRequest, CreateMessageRequestBody + try: - request = CreateMessageRequest.builder() \ - .receive_id_type(receive_id_type) \ + request = ( + CreateMessageRequest.builder() + .receive_id_type(receive_id_type) .request_body( CreateMessageRequestBody.builder() .receive_id(receive_id) .msg_type(msg_type) .content(content) .build() - ).build() + ) + .build() + ) response = self._client.im.v1.message.create(request) if not response.success(): logger.error( "Failed to send Feishu {} message: code={}, msg={}, log_id={}", - msg_type, response.code, response.msg, response.get_log_id() + msg_type, + response.code, + response.msg, + response.get_log_id(), ) return None msg_id = getattr(response.data, "message_id", None) @@ -1021,31 +1141,44 @@ class FeishuChannel(BaseChannel): def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None: """Create a CardKit streaming card, send it to chat, return card_id.""" from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody + card_json = { "schema": "2.0", "config": {"wide_screen_mode": True, "update_multi": True, "streaming_mode": True}, - "body": {"elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}]}, + "body": { + "elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}] + }, } try: - request = CreateCardRequest.builder().request_body( - CreateCardRequestBody.builder() - .type("card_json") - .data(json.dumps(card_json, ensure_ascii=False)) + request = ( + CreateCardRequest.builder() + .request_body( + CreateCardRequestBody.builder() + .type("card_json") + .data(json.dumps(card_json, ensure_ascii=False)) + .build() + ) .build() - ).build() + ) response = self._client.cardkit.v1.card.create(request) if not response.success(): - logger.warning("Failed to create streaming card: code={}, msg={}", response.code, response.msg) + logger.warning( + "Failed to create streaming card: code={}, msg={}", response.code, response.msg + ) return None card_id = getattr(response.data, "card_id", None) if card_id: message_id = self._send_message_sync( - receive_id_type, chat_id, "interactive", + receive_id_type, + chat_id, + "interactive", json.dumps({"type": "card", "data": {"card_id": card_id}}), ) if message_id: return card_id - logger.warning("Created streaming card {} but failed to send it to {}", card_id, chat_id) + logger.warning( + "Created streaming card {} but failed to send it to {}", card_id, chat_id + ) return None except Exception as e: logger.warning("Error creating streaming card: {}", e) @@ -1053,18 +1186,32 @@ class FeishuChannel(BaseChannel): def _stream_update_text_sync(self, card_id: str, content: str, sequence: int) -> bool: """Stream-update the markdown element on a CardKit card (typewriter effect).""" - from lark_oapi.api.cardkit.v1 import ContentCardElementRequest, ContentCardElementRequestBody + from lark_oapi.api.cardkit.v1 import ( + ContentCardElementRequest, + ContentCardElementRequestBody, + ) + try: - request = ContentCardElementRequest.builder() \ - .card_id(card_id) \ - .element_id(_STREAM_ELEMENT_ID) \ + request = ( + ContentCardElementRequest.builder() + .card_id(card_id) + .element_id(_STREAM_ELEMENT_ID) .request_body( ContentCardElementRequestBody.builder() - .content(content).sequence(sequence).build() - ).build() + .content(content) + .sequence(sequence) + .build() + ) + .build() + ) response = self._client.cardkit.v1.card_element.content(request) if not response.success(): - logger.warning("Failed to stream-update card {}: code={}, msg={}", card_id, response.code, response.msg) + logger.warning( + "Failed to stream-update card {}: code={}, msg={}", + card_id, + response.code, + response.msg, + ) return False return True except Exception as e: @@ -1079,22 +1226,28 @@ class FeishuChannel(BaseChannel): Sequence must strictly exceed the previous card OpenAPI operation on this entity. """ from lark_oapi.api.cardkit.v1 import SettingsCardRequest, SettingsCardRequestBody + settings_payload = json.dumps({"config": {"streaming_mode": False}}, ensure_ascii=False) try: - request = SettingsCardRequest.builder() \ - .card_id(card_id) \ + request = ( + SettingsCardRequest.builder() + .card_id(card_id) .request_body( SettingsCardRequestBody.builder() .settings(settings_payload) .sequence(sequence) .uuid(str(uuid.uuid4())) .build() - ).build() + ) + .build() + ) response = self._client.cardkit.v1.card.settings(request) if not response.success(): logger.warning( "Failed to close streaming on card {}: code={}, msg={}", - card_id, response.code, response.msg, + card_id, + response.code, + response.msg, ) return False return True @@ -1102,7 +1255,9 @@ class FeishuChannel(BaseChannel): logger.warning("Error closing streaming on card {}: {}", card_id, e) return False - async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + async def send_delta( + self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None + ) -> None: """Progressive streaming via CardKit: create card on first delta, stream-update on subsequent.""" if not self._client: return @@ -1121,17 +1276,31 @@ class FeishuChannel(BaseChannel): if buf.card_id: buf.sequence += 1 await loop.run_in_executor( - None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence, + None, + self._stream_update_text_sync, + buf.card_id, + buf.text, + buf.sequence, ) # Required so the chat list preview exits the streaming placeholder (Feishu streaming card docs). buf.sequence += 1 await loop.run_in_executor( - None, self._close_streaming_mode_sync, buf.card_id, buf.sequence, + None, + self._close_streaming_mode_sync, + buf.card_id, + buf.sequence, ) else: - for chunk in self._split_elements_by_table_limit(self._build_card_elements(buf.text)): - card = json.dumps({"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False) - await loop.run_in_executor(None, self._send_message_sync, rid_type, chat_id, "interactive", card) + for chunk in self._split_elements_by_table_limit( + self._build_card_elements(buf.text) + ): + card = json.dumps( + {"config": {"wide_screen_mode": True}, "elements": chunk}, + ensure_ascii=False, + ) + await loop.run_in_executor( + None, self._send_message_sync, rid_type, chat_id, "interactive", card + ) return # --- accumulate delta --- @@ -1145,15 +1314,21 @@ class FeishuChannel(BaseChannel): now = time.monotonic() if buf.card_id is None: - card_id = await loop.run_in_executor(None, self._create_streaming_card_sync, rid_type, chat_id) + card_id = await loop.run_in_executor( + None, self._create_streaming_card_sync, rid_type, chat_id + ) if card_id: buf.card_id = card_id buf.sequence = 1 - await loop.run_in_executor(None, self._stream_update_text_sync, card_id, buf.text, 1) + await loop.run_in_executor( + None, self._stream_update_text_sync, card_id, buf.text, 1 + ) buf.last_edit = now elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: buf.sequence += 1 - await loop.run_in_executor(None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence) + await loop.run_in_executor( + None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence + ) buf.last_edit = now async def send(self, msg: OutboundMessage) -> None: @@ -1179,14 +1354,13 @@ class FeishuChannel(BaseChannel): # Only the very first send (media or text) in this call uses reply; subsequent # chunks/media fall back to plain create to avoid redundant quote bubbles. reply_message_id: str | None = None - if ( - self.config.reply_to_message - and not msg.metadata.get("_progress", False) - ): + if self.config.reply_to_message and not msg.metadata.get("_progress", False): reply_message_id = msg.metadata.get("message_id") or None # For topic group messages, always reply to keep context in thread elif msg.metadata.get("thread_id"): - reply_message_id = msg.metadata.get("root_id") or msg.metadata.get("message_id") or None + reply_message_id = ( + msg.metadata.get("root_id") or msg.metadata.get("message_id") or None + ) first_send = True # tracks whether the reply has already been used @@ -1210,8 +1384,10 @@ class FeishuChannel(BaseChannel): key = await loop.run_in_executor(None, self._upload_image_sync, file_path) if key: await loop.run_in_executor( - None, _do_send, - "image", json.dumps({"image_key": key}, ensure_ascii=False), + None, + _do_send, + "image", + json.dumps({"image_key": key}, ensure_ascii=False), ) else: key = await loop.run_in_executor(None, self._upload_file_sync, file_path) @@ -1226,8 +1402,10 @@ class FeishuChannel(BaseChannel): else: media_type = "file" await loop.run_in_executor( - None, _do_send, - media_type, json.dumps({"file_key": key}, ensure_ascii=False), + None, + _do_send, + media_type, + json.dumps({"file_key": key}, ensure_ascii=False), ) if msg.content and msg.content.strip(): @@ -1249,8 +1427,10 @@ class FeishuChannel(BaseChannel): for chunk in self._split_elements_by_table_limit(elements): card = {"config": {"wide_screen_mode": True}, "elements": chunk} await loop.run_in_executor( - None, _do_send, - "interactive", json.dumps(card, ensure_ascii=False), + None, + _do_send, + "interactive", + json.dumps(card, ensure_ascii=False), ) except Exception as e: @@ -1265,13 +1445,16 @@ class FeishuChannel(BaseChannel): if self._loop and self._loop.is_running(): asyncio.run_coroutine_threadsafe(self._on_message(data), self._loop) - async def _on_message(self, data: Any) -> None: + async def _on_message(self, data: P2ImMessageReceiveV1) -> None: """Handle incoming message from Feishu.""" try: event = data.event message = event.message sender = event.sender - + + logger.debug("Feishu raw message: {}", message.content) + logger.debug("Feishu mentions: {}", getattr(message, "mentions", None)) + # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: @@ -1310,6 +1493,8 @@ class FeishuChannel(BaseChannel): if msg_type == "text": text = content_json.get("text", "") if text: + mentions = getattr(message, "mentions", None) + text = self._resolve_mentions(text, mentions) content_parts.append(text) elif msg_type == "post": @@ -1326,7 +1511,9 @@ class FeishuChannel(BaseChannel): content_parts.append(content_text) elif msg_type in ("image", "audio", "file", "media"): - file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id) + file_path, content_text = await self._download_and_save_media( + msg_type, content_json, message_id + ) if file_path: media_paths.append(file_path) @@ -1337,7 +1524,14 @@ class FeishuChannel(BaseChannel): content_parts.append(content_text) - elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"): + elif msg_type in ( + "share_chat", + "share_user", + "interactive", + "share_calendar_event", + "system", + "merge_forward", + ): # Handle share cards and interactive messages text = _extract_share_card_content(content_json, msg_type) if text: @@ -1380,8 +1574,9 @@ class FeishuChannel(BaseChannel): "parent_id": parent_id, "root_id": root_id, "thread_id": thread_id, - } + }, ) + await self._del_reaction(message_id, reaction_id) except Exception as e: logger.error("Error processing Feishu message: {}", e) @@ -1445,7 +1640,9 @@ class FeishuChannel(BaseChannel): return "\n".join(part for part in parts if part) - async def _send_tool_hint_card(self, receive_id_type: str, receive_id: str, tool_hint: str) -> None: + async def _send_tool_hint_card( + self, receive_id_type: str, receive_id: str, tool_hint: str + ) -> None: """Send tool hint as an interactive card with formatted code block. Args: @@ -1461,15 +1658,15 @@ class FeishuChannel(BaseChannel): card = { "config": {"wide_screen_mode": True}, "elements": [ - { - "tag": "markdown", - "content": f"**Tool Calls**\n\n```text\n{formatted_code}\n```" - } - ] + {"tag": "markdown", "content": f"**Tool Calls**\n\n```text\n{formatted_code}\n```"} + ], } await loop.run_in_executor( - None, self._send_message_sync, - receive_id_type, receive_id, "interactive", + None, + self._send_message_sync, + receive_id_type, + receive_id, + "interactive", json.dumps(card, ensure_ascii=False), )