feat: resolve mentions data

This commit is contained in:
wudongxue 2026-03-30 18:11:01 +08:00 committed by Xubin Ren
parent 075bdd5c3c
commit 0291d1f716

View File

@ -11,6 +11,8 @@ from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Literal
from lark_oapi.api.im.v1.model import P2ImMessageReceiveV1, MentionEvent
from loguru import logger
from nanobot.bus.events import OutboundMessage
@ -75,7 +77,9 @@ def _extract_interactive_content(content: dict) -> list[str]:
elif isinstance(title, str):
parts.append(f"title: {title}")
for elements in content.get("elements", []) if isinstance(content.get("elements"), list) else []:
for elements in (
content.get("elements", []) if isinstance(content.get("elements"), list) else []
):
for element in elements:
parts.extend(_extract_element_content(element))
@ -259,6 +263,7 @@ _STREAM_ELEMENT_ID = "streaming_md"
@dataclass
class _FeishuStreamBuf:
"""Per-chat streaming accumulator using CardKit streaming API."""
text: str = ""
card_id: str | None = None
sequence: int = 0
@ -316,21 +321,22 @@ class FeishuChannel(BaseChannel):
return
import lark_oapi as lark
self._running = True
self._loop = asyncio.get_running_loop()
# Create Lark client for sending messages
self._client = lark.Client.builder() \
.app_id(self.config.app_id) \
.app_secret(self.config.app_secret) \
.log_level(lark.LogLevel.INFO) \
self._client = (
lark.Client.builder()
.app_id(self.config.app_id)
.app_secret(self.config.app_secret)
.log_level(lark.LogLevel.INFO)
.build()
)
builder = lark.EventDispatcherHandler.builder(
self.config.encrypt_key or "",
self.config.verification_token or "",
).register_p2_im_message_receive_v1(
self._on_message_sync
)
).register_p2_im_message_receive_v1(self._on_message_sync)
builder = self._register_optional_event(
builder, "register_p2_im_message_reaction_created_v1", self._on_reaction_created
)
@ -349,7 +355,7 @@ class FeishuChannel(BaseChannel):
self.config.app_id,
self.config.app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.INFO
log_level=lark.LogLevel.INFO,
)
# Start WebSocket client in a separate thread with reconnect loop.
@ -360,6 +366,7 @@ class FeishuChannel(BaseChannel):
def run_ws():
import time
import lark_oapi.ws.client as _lark_ws_client
ws_loop = asyncio.new_event_loop()
asyncio.set_event_loop(ws_loop)
# Patch the module-level loop used by lark's ws Client.start()
@ -409,13 +416,17 @@ class FeishuChannel(BaseChannel):
"""Fetch the bot's own open_id via GET /open-apis/bot/v3/info."""
try:
import lark_oapi as lark
request = lark.RawRequest.builder() \
.http_method(lark.HttpMethod.GET) \
.uri("/open-apis/bot/v3/info") \
request = (
lark.RawRequest.builder()
.http_method(lark.HttpMethod.GET)
.uri("/open-apis/bot/v3/info")
.build()
)
response = self._client.request(request)
if response.success():
import json
data = json.loads(response.raw.content)
bot = (data.get("data") or data).get("bot") or data.get("bot") or {}
return bot.get("open_id")
@ -425,6 +436,45 @@ class FeishuChannel(BaseChannel):
logger.warning("Error fetching bot info: {}", e)
return None
@staticmethod
def _resolve_mentions(text: str, mentions: list[MentionEvent] | None) -> str:
"""Replace @_user_n placeholders with actual user info from mentions.
Args:
text: The message text containing @_user_n placeholders
mentions: List of mention objects from Feishu message
Returns:
Text with placeholders replaced by @姓名 (open_id)
"""
if not mentions or not text:
return text
for mention in mentions:
key = mention.key or None
if not key or key not in text:
continue
userID = mention.id or None
if not userID:
continue
open_id = userID.open_id
user_id = userID.user_id
name = mention.name or key
# Format: @姓名 (open_id, user_id: xxx)
if open_id and user_id:
replacement = f"@{name} ({open_id}, user id: {user_id})"
elif open_id:
replacement = f"@{name} ({open_id})"
else:
replacement = f"@{name}"
text = text.replace(key, replacement)
return text
def _is_bot_mentioned(self, message: Any) -> bool:
"""Check if the bot is @mentioned in the message."""
raw_content = message.content or ""
@ -453,20 +503,30 @@ class FeishuChannel(BaseChannel):
def _add_reaction_sync(self, message_id: str, emoji_type: str) -> str | None:
"""Sync helper for adding reaction (runs in thread pool)."""
from lark_oapi.api.im.v1 import CreateMessageReactionRequest, CreateMessageReactionRequestBody, Emoji
from lark_oapi.api.im.v1 import (
CreateMessageReactionRequest,
CreateMessageReactionRequestBody,
Emoji,
)
try:
request = CreateMessageReactionRequest.builder() \
.message_id(message_id) \
request = (
CreateMessageReactionRequest.builder()
.message_id(message_id)
.request_body(
CreateMessageReactionRequestBody.builder()
.reaction_type(Emoji.builder().emoji_type(emoji_type).build())
.build()
).build()
)
.build()
)
response = self._client.im.v1.message_reaction.create(request)
if not response.success():
logger.warning("Failed to add reaction: code={}, msg={}", response.code, response.msg)
logger.warning(
"Failed to add reaction: code={}, msg={}", response.code, response.msg
)
return None
else:
logger.debug("Added {} reaction to message {}", emoji_type, message_id)
@ -490,17 +550,22 @@ class FeishuChannel(BaseChannel):
def _remove_reaction_sync(self, message_id: str, reaction_id: str) -> None:
"""Sync helper for removing reaction (runs in thread pool)."""
from lark_oapi.api.im.v1 import DeleteMessageReactionRequest
try:
request = DeleteMessageReactionRequest.builder() \
.message_id(message_id) \
.reaction_id(reaction_id) \
request = (
DeleteMessageReactionRequest.builder()
.message_id(message_id)
.reaction_id(reaction_id)
.build()
)
response = self._client.im.v1.message_reaction.delete(request)
if response.success():
logger.debug("Removed reaction {} from message {}", reaction_id, message_id)
else:
logger.debug("Failed to remove reaction: code={}, msg={}", response.code, response.msg)
logger.debug(
"Failed to remove reaction: code={}, msg={}", response.code, response.msg
)
except Exception as e:
logger.debug("Error removing reaction: {}", e)
@ -555,27 +620,35 @@ class FeishuChannel(BaseChannel):
lines = [_line.strip() for _line in table_text.strip().split("\n") if _line.strip()]
if len(lines) < 3:
return None
def split(_line: str) -> list[str]:
return [c.strip() for c in _line.strip("|").split("|")]
headers = [cls._strip_md_formatting(h) for h in split(lines[0])]
rows = [[cls._strip_md_formatting(c) for c in split(_line)] for _line in lines[2:]]
columns = [{"tag": "column", "name": f"c{i}", "display_name": h, "width": "auto"}
for i, h in enumerate(headers)]
columns = [
{"tag": "column", "name": f"c{i}", "display_name": h, "width": "auto"}
for i, h in enumerate(headers)
]
return {
"tag": "table",
"page_size": len(rows) + 1,
"columns": columns,
"rows": [{f"c{i}": r[i] if i < len(r) else "" for i in range(len(headers))} for r in rows],
"rows": [
{f"c{i}": r[i] if i < len(r) else "" for i in range(len(headers))} for r in rows
],
}
def _build_card_elements(self, content: str) -> list[dict]:
"""Split content into div/markdown + table elements for Feishu card."""
elements, last_end = [], 0
for m in self._TABLE_RE.finditer(content):
before = content[last_end:m.start()]
before = content[last_end : m.start()]
if before.strip():
elements.extend(self._split_headings(before))
elements.append(self._parse_md_table(m.group(1)) or {"tag": "markdown", "content": m.group(1)})
elements.append(
self._parse_md_table(m.group(1)) or {"tag": "markdown", "content": m.group(1)}
)
last_end = m.end()
remaining = content[last_end:]
if remaining.strip():
@ -583,7 +656,9 @@ class FeishuChannel(BaseChannel):
return elements or [{"tag": "markdown", "content": content}]
@staticmethod
def _split_elements_by_table_limit(elements: list[dict], max_tables: int = 1) -> list[list[dict]]:
def _split_elements_by_table_limit(
elements: list[dict], max_tables: int = 1
) -> list[list[dict]]:
"""Split card elements into groups with at most *max_tables* table elements each.
Feishu cards have a hard limit of one table per card (API error 11310).
@ -616,23 +691,25 @@ class FeishuChannel(BaseChannel):
code_blocks = []
for m in self._CODE_BLOCK_RE.finditer(content):
code_blocks.append(m.group(1))
protected = protected.replace(m.group(1), f"\x00CODE{len(code_blocks)-1}\x00", 1)
protected = protected.replace(m.group(1), f"\x00CODE{len(code_blocks) - 1}\x00", 1)
elements = []
last_end = 0
for m in self._HEADING_RE.finditer(protected):
before = protected[last_end:m.start()].strip()
before = protected[last_end : m.start()].strip()
if before:
elements.append({"tag": "markdown", "content": before})
text = self._strip_md_formatting(m.group(2).strip())
display_text = f"**{text}**" if text else ""
elements.append({
"tag": "div",
"text": {
"tag": "lark_md",
"content": display_text,
},
})
elements.append(
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": display_text,
},
}
)
last_end = m.end()
remaining = protected[last_end:].strip()
if remaining:
@ -648,19 +725,19 @@ class FeishuChannel(BaseChannel):
# ── Smart format detection ──────────────────────────────────────────
# Patterns that indicate "complex" markdown needing card rendering
_COMPLEX_MD_RE = re.compile(
r"```" # fenced code block
r"```" # fenced code block
r"|^\|.+\|.*\n\s*\|[-:\s|]+\|" # markdown table (header + separator)
r"|^#{1,6}\s+" # headings
, re.MULTILINE,
r"|^#{1,6}\s+", # headings
re.MULTILINE,
)
# Simple markdown patterns (bold, italic, strikethrough)
_SIMPLE_MD_RE = re.compile(
r"\*\*.+?\*\*" # **bold**
r"|__.+?__" # __bold__
r"\*\*.+?\*\*" # **bold**
r"|__.+?__" # __bold__
r"|(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)" # *italic* (single *)
r"|~~.+?~~" # ~~strikethrough~~
, re.DOTALL,
r"|~~.+?~~", # ~~strikethrough~~
re.DOTALL,
)
# Markdown link: [text](url)
@ -732,14 +809,16 @@ class FeishuChannel(BaseChannel):
for m in cls._MD_LINK_RE.finditer(line):
# Text before this link
before = line[last_end:m.start()]
before = line[last_end : m.start()]
if before:
elements.append({"tag": "text", "text": before})
elements.append({
"tag": "a",
"text": m.group(1),
"href": m.group(2),
})
elements.append(
{
"tag": "a",
"text": m.group(1),
"href": m.group(2),
}
)
last_end = m.end()
# Remaining text after last link
@ -764,29 +843,39 @@ class FeishuChannel(BaseChannel):
_AUDIO_EXTS = {".opus"}
_VIDEO_EXTS = {".mp4", ".mov", ".avi"}
_FILE_TYPE_MAP = {
".opus": "opus", ".mp4": "mp4", ".pdf": "pdf", ".doc": "doc", ".docx": "doc",
".xls": "xls", ".xlsx": "xls", ".ppt": "ppt", ".pptx": "ppt",
".opus": "opus",
".mp4": "mp4",
".pdf": "pdf",
".doc": "doc",
".docx": "doc",
".xls": "xls",
".xlsx": "xls",
".ppt": "ppt",
".pptx": "ppt",
}
def _upload_image_sync(self, file_path: str) -> str | None:
"""Upload an image to Feishu and return the image_key."""
from lark_oapi.api.im.v1 import CreateImageRequest, CreateImageRequestBody
try:
with open(file_path, "rb") as f:
request = CreateImageRequest.builder() \
request = (
CreateImageRequest.builder()
.request_body(
CreateImageRequestBody.builder()
.image_type("message")
.image(f)
.build()
).build()
CreateImageRequestBody.builder().image_type("message").image(f).build()
)
.build()
)
response = self._client.im.v1.image.create(request)
if response.success():
image_key = response.data.image_key
logger.debug("Uploaded image {}: {}", os.path.basename(file_path), image_key)
return image_key
else:
logger.error("Failed to upload image: code={}, msg={}", response.code, response.msg)
logger.error(
"Failed to upload image: code={}, msg={}", response.code, response.msg
)
return None
except Exception as e:
logger.error("Error uploading image {}: {}", file_path, e)
@ -795,49 +884,62 @@ class FeishuChannel(BaseChannel):
def _upload_file_sync(self, file_path: str) -> str | None:
"""Upload a file to Feishu and return the file_key."""
from lark_oapi.api.im.v1 import CreateFileRequest, CreateFileRequestBody
ext = os.path.splitext(file_path)[1].lower()
file_type = self._FILE_TYPE_MAP.get(ext, "stream")
file_name = os.path.basename(file_path)
try:
with open(file_path, "rb") as f:
request = CreateFileRequest.builder() \
request = (
CreateFileRequest.builder()
.request_body(
CreateFileRequestBody.builder()
.file_type(file_type)
.file_name(file_name)
.file(f)
.build()
).build()
)
.build()
)
response = self._client.im.v1.file.create(request)
if response.success():
file_key = response.data.file_key
logger.debug("Uploaded file {}: {}", file_name, file_key)
return file_key
else:
logger.error("Failed to upload file: code={}, msg={}", response.code, response.msg)
logger.error(
"Failed to upload file: code={}, msg={}", response.code, response.msg
)
return None
except Exception as e:
logger.error("Error uploading file {}: {}", file_path, e)
return None
def _download_image_sync(self, message_id: str, image_key: str) -> tuple[bytes | None, str | None]:
def _download_image_sync(
self, message_id: str, image_key: str
) -> tuple[bytes | None, str | None]:
"""Download an image from Feishu message by message_id and image_key."""
from lark_oapi.api.im.v1 import GetMessageResourceRequest
try:
request = GetMessageResourceRequest.builder() \
.message_id(message_id) \
.file_key(image_key) \
.type("image") \
request = (
GetMessageResourceRequest.builder()
.message_id(message_id)
.file_key(image_key)
.type("image")
.build()
)
response = self._client.im.v1.message_resource.get(request)
if response.success():
file_data = response.file
# GetMessageResourceRequest returns BytesIO, need to read bytes
if hasattr(file_data, 'read'):
if hasattr(file_data, "read"):
file_data = file_data.read()
return file_data, response.file_name
else:
logger.error("Failed to download image: code={}, msg={}", response.code, response.msg)
logger.error(
"Failed to download image: code={}, msg={}", response.code, response.msg
)
return None, None
except Exception as e:
logger.error("Error downloading image {}: {}", image_key, e)
@ -869,17 +971,19 @@ class FeishuChannel(BaseChannel):
file_data = file_data.read()
return file_data, response.file_name
else:
logger.error("Failed to download {}: code={}, msg={}", resource_type, response.code, response.msg)
logger.error(
"Failed to download {}: code={}, msg={}",
resource_type,
response.code,
response.msg,
)
return None, None
except Exception:
logger.exception("Error downloading {} {}", resource_type, file_key)
return None, None
async def _download_and_save_media(
self,
msg_type: str,
content_json: dict,
message_id: str | None = None
self, msg_type: str, content_json: dict, message_id: str | None = None
) -> tuple[str | None, str]:
"""
Download media from Feishu and save to local disk.
@ -928,13 +1032,16 @@ class FeishuChannel(BaseChannel):
Returns a "[Reply to: ...]" context string, or None on failure.
"""
from lark_oapi.api.im.v1 import GetMessageRequest
try:
request = GetMessageRequest.builder().message_id(message_id).build()
response = self._client.im.v1.message.get(request)
if not response.success():
logger.debug(
"Feishu: could not fetch parent message {}: code={}, msg={}",
message_id, response.code, response.msg,
message_id,
response.code,
response.msg,
)
return None
items = getattr(response.data, "items", None)
@ -969,20 +1076,24 @@ class FeishuChannel(BaseChannel):
def _reply_message_sync(self, parent_message_id: str, msg_type: str, content: str) -> bool:
"""Reply to an existing Feishu message using the Reply API (synchronous)."""
from lark_oapi.api.im.v1 import ReplyMessageRequest, ReplyMessageRequestBody
try:
request = ReplyMessageRequest.builder() \
.message_id(parent_message_id) \
request = (
ReplyMessageRequest.builder()
.message_id(parent_message_id)
.request_body(
ReplyMessageRequestBody.builder()
.msg_type(msg_type)
.content(content)
.build()
).build()
ReplyMessageRequestBody.builder().msg_type(msg_type).content(content).build()
)
.build()
)
response = self._client.im.v1.message.reply(request)
if not response.success():
logger.error(
"Failed to reply to Feishu message {}: code={}, msg={}, log_id={}",
parent_message_id, response.code, response.msg, response.get_log_id()
parent_message_id,
response.code,
response.msg,
response.get_log_id(),
)
return False
logger.debug("Feishu reply sent to message {}", parent_message_id)
@ -991,24 +1102,33 @@ class FeishuChannel(BaseChannel):
logger.error("Error replying to Feishu message {}: {}", parent_message_id, e)
return False
def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> str | None:
def _send_message_sync(
self, receive_id_type: str, receive_id: str, msg_type: str, content: str
) -> str | None:
"""Send a single message and return the message_id on success."""
from lark_oapi.api.im.v1 import CreateMessageRequest, CreateMessageRequestBody
try:
request = CreateMessageRequest.builder() \
.receive_id_type(receive_id_type) \
request = (
CreateMessageRequest.builder()
.receive_id_type(receive_id_type)
.request_body(
CreateMessageRequestBody.builder()
.receive_id(receive_id)
.msg_type(msg_type)
.content(content)
.build()
).build()
)
.build()
)
response = self._client.im.v1.message.create(request)
if not response.success():
logger.error(
"Failed to send Feishu {} message: code={}, msg={}, log_id={}",
msg_type, response.code, response.msg, response.get_log_id()
msg_type,
response.code,
response.msg,
response.get_log_id(),
)
return None
msg_id = getattr(response.data, "message_id", None)
@ -1021,31 +1141,44 @@ class FeishuChannel(BaseChannel):
def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None:
"""Create a CardKit streaming card, send it to chat, return card_id."""
from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody
card_json = {
"schema": "2.0",
"config": {"wide_screen_mode": True, "update_multi": True, "streaming_mode": True},
"body": {"elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}]},
"body": {
"elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}]
},
}
try:
request = CreateCardRequest.builder().request_body(
CreateCardRequestBody.builder()
.type("card_json")
.data(json.dumps(card_json, ensure_ascii=False))
request = (
CreateCardRequest.builder()
.request_body(
CreateCardRequestBody.builder()
.type("card_json")
.data(json.dumps(card_json, ensure_ascii=False))
.build()
)
.build()
).build()
)
response = self._client.cardkit.v1.card.create(request)
if not response.success():
logger.warning("Failed to create streaming card: code={}, msg={}", response.code, response.msg)
logger.warning(
"Failed to create streaming card: code={}, msg={}", response.code, response.msg
)
return None
card_id = getattr(response.data, "card_id", None)
if card_id:
message_id = self._send_message_sync(
receive_id_type, chat_id, "interactive",
receive_id_type,
chat_id,
"interactive",
json.dumps({"type": "card", "data": {"card_id": card_id}}),
)
if message_id:
return card_id
logger.warning("Created streaming card {} but failed to send it to {}", card_id, chat_id)
logger.warning(
"Created streaming card {} but failed to send it to {}", card_id, chat_id
)
return None
except Exception as e:
logger.warning("Error creating streaming card: {}", e)
@ -1053,18 +1186,32 @@ class FeishuChannel(BaseChannel):
def _stream_update_text_sync(self, card_id: str, content: str, sequence: int) -> bool:
"""Stream-update the markdown element on a CardKit card (typewriter effect)."""
from lark_oapi.api.cardkit.v1 import ContentCardElementRequest, ContentCardElementRequestBody
from lark_oapi.api.cardkit.v1 import (
ContentCardElementRequest,
ContentCardElementRequestBody,
)
try:
request = ContentCardElementRequest.builder() \
.card_id(card_id) \
.element_id(_STREAM_ELEMENT_ID) \
request = (
ContentCardElementRequest.builder()
.card_id(card_id)
.element_id(_STREAM_ELEMENT_ID)
.request_body(
ContentCardElementRequestBody.builder()
.content(content).sequence(sequence).build()
).build()
.content(content)
.sequence(sequence)
.build()
)
.build()
)
response = self._client.cardkit.v1.card_element.content(request)
if not response.success():
logger.warning("Failed to stream-update card {}: code={}, msg={}", card_id, response.code, response.msg)
logger.warning(
"Failed to stream-update card {}: code={}, msg={}",
card_id,
response.code,
response.msg,
)
return False
return True
except Exception as e:
@ -1079,22 +1226,28 @@ class FeishuChannel(BaseChannel):
Sequence must strictly exceed the previous card OpenAPI operation on this entity.
"""
from lark_oapi.api.cardkit.v1 import SettingsCardRequest, SettingsCardRequestBody
settings_payload = json.dumps({"config": {"streaming_mode": False}}, ensure_ascii=False)
try:
request = SettingsCardRequest.builder() \
.card_id(card_id) \
request = (
SettingsCardRequest.builder()
.card_id(card_id)
.request_body(
SettingsCardRequestBody.builder()
.settings(settings_payload)
.sequence(sequence)
.uuid(str(uuid.uuid4()))
.build()
).build()
)
.build()
)
response = self._client.cardkit.v1.card.settings(request)
if not response.success():
logger.warning(
"Failed to close streaming on card {}: code={}, msg={}",
card_id, response.code, response.msg,
card_id,
response.code,
response.msg,
)
return False
return True
@ -1102,7 +1255,9 @@ class FeishuChannel(BaseChannel):
logger.warning("Error closing streaming on card {}: {}", card_id, e)
return False
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
async def send_delta(
self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
) -> None:
"""Progressive streaming via CardKit: create card on first delta, stream-update on subsequent."""
if not self._client:
return
@ -1121,17 +1276,31 @@ class FeishuChannel(BaseChannel):
if buf.card_id:
buf.sequence += 1
await loop.run_in_executor(
None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence,
None,
self._stream_update_text_sync,
buf.card_id,
buf.text,
buf.sequence,
)
# Required so the chat list preview exits the streaming placeholder (Feishu streaming card docs).
buf.sequence += 1
await loop.run_in_executor(
None, self._close_streaming_mode_sync, buf.card_id, buf.sequence,
None,
self._close_streaming_mode_sync,
buf.card_id,
buf.sequence,
)
else:
for chunk in self._split_elements_by_table_limit(self._build_card_elements(buf.text)):
card = json.dumps({"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False)
await loop.run_in_executor(None, self._send_message_sync, rid_type, chat_id, "interactive", card)
for chunk in self._split_elements_by_table_limit(
self._build_card_elements(buf.text)
):
card = json.dumps(
{"config": {"wide_screen_mode": True}, "elements": chunk},
ensure_ascii=False,
)
await loop.run_in_executor(
None, self._send_message_sync, rid_type, chat_id, "interactive", card
)
return
# --- accumulate delta ---
@ -1145,15 +1314,21 @@ class FeishuChannel(BaseChannel):
now = time.monotonic()
if buf.card_id is None:
card_id = await loop.run_in_executor(None, self._create_streaming_card_sync, rid_type, chat_id)
card_id = await loop.run_in_executor(
None, self._create_streaming_card_sync, rid_type, chat_id
)
if card_id:
buf.card_id = card_id
buf.sequence = 1
await loop.run_in_executor(None, self._stream_update_text_sync, card_id, buf.text, 1)
await loop.run_in_executor(
None, self._stream_update_text_sync, card_id, buf.text, 1
)
buf.last_edit = now
elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL:
buf.sequence += 1
await loop.run_in_executor(None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence)
await loop.run_in_executor(
None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence
)
buf.last_edit = now
async def send(self, msg: OutboundMessage) -> None:
@ -1179,14 +1354,13 @@ class FeishuChannel(BaseChannel):
# Only the very first send (media or text) in this call uses reply; subsequent
# chunks/media fall back to plain create to avoid redundant quote bubbles.
reply_message_id: str | None = None
if (
self.config.reply_to_message
and not msg.metadata.get("_progress", False)
):
if self.config.reply_to_message and not msg.metadata.get("_progress", False):
reply_message_id = msg.metadata.get("message_id") or None
# For topic group messages, always reply to keep context in thread
elif msg.metadata.get("thread_id"):
reply_message_id = msg.metadata.get("root_id") or msg.metadata.get("message_id") or None
reply_message_id = (
msg.metadata.get("root_id") or msg.metadata.get("message_id") or None
)
first_send = True # tracks whether the reply has already been used
@ -1210,8 +1384,10 @@ class FeishuChannel(BaseChannel):
key = await loop.run_in_executor(None, self._upload_image_sync, file_path)
if key:
await loop.run_in_executor(
None, _do_send,
"image", json.dumps({"image_key": key}, ensure_ascii=False),
None,
_do_send,
"image",
json.dumps({"image_key": key}, ensure_ascii=False),
)
else:
key = await loop.run_in_executor(None, self._upload_file_sync, file_path)
@ -1226,8 +1402,10 @@ class FeishuChannel(BaseChannel):
else:
media_type = "file"
await loop.run_in_executor(
None, _do_send,
media_type, json.dumps({"file_key": key}, ensure_ascii=False),
None,
_do_send,
media_type,
json.dumps({"file_key": key}, ensure_ascii=False),
)
if msg.content and msg.content.strip():
@ -1249,8 +1427,10 @@ class FeishuChannel(BaseChannel):
for chunk in self._split_elements_by_table_limit(elements):
card = {"config": {"wide_screen_mode": True}, "elements": chunk}
await loop.run_in_executor(
None, _do_send,
"interactive", json.dumps(card, ensure_ascii=False),
None,
_do_send,
"interactive",
json.dumps(card, ensure_ascii=False),
)
except Exception as e:
@ -1265,13 +1445,16 @@ class FeishuChannel(BaseChannel):
if self._loop and self._loop.is_running():
asyncio.run_coroutine_threadsafe(self._on_message(data), self._loop)
async def _on_message(self, data: Any) -> None:
async def _on_message(self, data: P2ImMessageReceiveV1) -> None:
"""Handle incoming message from Feishu."""
try:
event = data.event
message = event.message
sender = event.sender
logger.debug("Feishu raw message: {}", message.content)
logger.debug("Feishu mentions: {}", getattr(message, "mentions", None))
# Deduplication check
message_id = message.message_id
if message_id in self._processed_message_ids:
@ -1310,6 +1493,8 @@ class FeishuChannel(BaseChannel):
if msg_type == "text":
text = content_json.get("text", "")
if text:
mentions = getattr(message, "mentions", None)
text = self._resolve_mentions(text, mentions)
content_parts.append(text)
elif msg_type == "post":
@ -1326,7 +1511,9 @@ class FeishuChannel(BaseChannel):
content_parts.append(content_text)
elif msg_type in ("image", "audio", "file", "media"):
file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id)
file_path, content_text = await self._download_and_save_media(
msg_type, content_json, message_id
)
if file_path:
media_paths.append(file_path)
@ -1337,7 +1524,14 @@ class FeishuChannel(BaseChannel):
content_parts.append(content_text)
elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"):
elif msg_type in (
"share_chat",
"share_user",
"interactive",
"share_calendar_event",
"system",
"merge_forward",
):
# Handle share cards and interactive messages
text = _extract_share_card_content(content_json, msg_type)
if text:
@ -1380,8 +1574,9 @@ class FeishuChannel(BaseChannel):
"parent_id": parent_id,
"root_id": root_id,
"thread_id": thread_id,
}
},
)
await self._del_reaction(message_id, reaction_id)
except Exception as e:
logger.error("Error processing Feishu message: {}", e)
@ -1445,7 +1640,9 @@ class FeishuChannel(BaseChannel):
return "\n".join(part for part in parts if part)
async def _send_tool_hint_card(self, receive_id_type: str, receive_id: str, tool_hint: str) -> None:
async def _send_tool_hint_card(
self, receive_id_type: str, receive_id: str, tool_hint: str
) -> None:
"""Send tool hint as an interactive card with formatted code block.
Args:
@ -1461,15 +1658,15 @@ class FeishuChannel(BaseChannel):
card = {
"config": {"wide_screen_mode": True},
"elements": [
{
"tag": "markdown",
"content": f"**Tool Calls**\n\n```text\n{formatted_code}\n```"
}
]
{"tag": "markdown", "content": f"**Tool Calls**\n\n```text\n{formatted_code}\n```"}
],
}
await loop.run_in_executor(
None, self._send_message_sync,
receive_id_type, receive_id, "interactive",
None,
self._send_message_sync,
receive_id_type,
receive_id,
"interactive",
json.dumps(card, ensure_ascii=False),
)