feat(weixin): implement getConfig and sendTyping

This commit is contained in:
xcosmosbox 2026-03-29 16:25:25 +08:00 committed by Xubin Ren
parent faf2b07923
commit 345c393e53
2 changed files with 135 additions and 14 deletions

View File

@ -99,6 +99,9 @@ MAX_CONSECUTIVE_FAILURES = 3
BACKOFF_DELAY_S = 30
RETRY_DELAY_S = 2
MAX_QR_REFRESH_COUNT = 3
TYPING_STATUS_TYPING = 1
TYPING_STATUS_CANCEL = 2
TYPING_TICKET_TTL_S = 24 * 60 * 60
# Default long-poll timeout; overridden by server via longpolling_timeout_ms.
DEFAULT_LONG_POLL_TIMEOUT_S = 35
@ -158,6 +161,7 @@ class WeixinChannel(BaseChannel):
self._poll_task: asyncio.Task | None = None
self._next_poll_timeout_s: int = DEFAULT_LONG_POLL_TIMEOUT_S
self._session_pause_until: float = 0.0
self._typing_tickets: dict[str, tuple[str, float]] = {}
# ------------------------------------------------------------------
# State persistence
@ -832,6 +836,40 @@ class WeixinChannel(BaseChannel):
# Outbound (matches send.ts buildTextMessageReq + sendMessageWeixin)
# ------------------------------------------------------------------
async def _get_typing_ticket(self, user_id: str, context_token: str = "") -> str:
"""Get typing ticket for a user with simple per-user TTL cache."""
now = time.time()
cached = self._typing_tickets.get(user_id)
if cached:
ticket, expires_at = cached
if ticket and now < expires_at:
return ticket
body: dict[str, Any] = {
"ilink_user_id": user_id,
"context_token": context_token or None,
"base_info": BASE_INFO,
}
data = await self._api_post("ilink/bot/getconfig", body)
if data.get("ret", 0) == 0:
ticket = str(data.get("typing_ticket", "") or "")
if ticket:
self._typing_tickets[user_id] = (ticket, now + TYPING_TICKET_TTL_S)
return ticket
return ""
async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None:
"""Best-effort sendtyping wrapper."""
if not typing_ticket:
return
body: dict[str, Any] = {
"ilink_user_id": user_id,
"typing_ticket": typing_ticket,
"status": status,
"base_info": BASE_INFO,
}
await self._api_post("ilink/bot/sendtyping", body)
async def send(self, msg: OutboundMessage) -> None:
if not self._client or not self._token:
logger.warning("WeChat client not initialized or not authenticated")
@ -851,29 +889,48 @@ class WeixinChannel(BaseChannel):
)
return
# --- Send media files first (following Telegram channel pattern) ---
for media_path in (msg.media or []):
try:
await self._send_media_file(msg.chat_id, media_path, ctx_token)
except Exception as e:
filename = Path(media_path).name
logger.error("Failed to send WeChat media {}: {}", media_path, e)
# Notify user about failure via text
await self._send_text(
msg.chat_id, f"[Failed to send: {filename}]", ctx_token,
)
typing_ticket = ""
try:
typing_ticket = await self._get_typing_ticket(msg.chat_id, ctx_token)
except Exception as e:
logger.warning("WeChat getconfig failed for {}: {}", msg.chat_id, e)
typing_ticket = ""
# --- Send text content ---
if not content:
return
if typing_ticket:
try:
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_TYPING)
except Exception as e:
logger.debug("WeChat sendtyping(start) failed for {}: {}", msg.chat_id, e)
try:
# --- Send media files first (following Telegram channel pattern) ---
for media_path in (msg.media or []):
try:
await self._send_media_file(msg.chat_id, media_path, ctx_token)
except Exception as e:
filename = Path(media_path).name
logger.error("Failed to send WeChat media {}: {}", media_path, e)
# Notify user about failure via text
await self._send_text(
msg.chat_id, f"[Failed to send: {filename}]", ctx_token,
)
# --- Send text content ---
if not content:
return
chunks = split_message(content, WEIXIN_MAX_MESSAGE_LEN)
for chunk in chunks:
await self._send_text(msg.chat_id, chunk, ctx_token)
except Exception as e:
logger.error("Error sending WeChat message: {}", e)
raise
finally:
if typing_ticket:
try:
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL)
except Exception as e:
logger.debug("WeChat sendtyping(cancel) failed for {}: {}", msg.chat_id, e)
async def _send_text(
self,

View File

@ -280,6 +280,70 @@ async def test_send_does_not_send_when_session_is_paused() -> None:
channel._send_text.assert_not_awaited()
@pytest.mark.asyncio
async def test_get_typing_ticket_fetches_and_caches_per_user() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._api_post = AsyncMock(return_value={"ret": 0, "typing_ticket": "ticket-1"})
first = await channel._get_typing_ticket("wx-user", "ctx-1")
second = await channel._get_typing_ticket("wx-user", "ctx-2")
assert first == "ticket-1"
assert second == "ticket-1"
channel._api_post.assert_awaited_once_with(
"ilink/bot/getconfig",
{"ilink_user_id": "wx-user", "context_token": "ctx-1", "base_info": weixin_mod.BASE_INFO},
)
@pytest.mark.asyncio
async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-typing"
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(
side_effect=[
{"ret": 0, "typing_ticket": "ticket-typing"},
{"ret": 0},
{"ret": 0},
]
)
await channel.send(
type("Msg", (), {"chat_id": "wx-user", "content": "pong", "media": [], "metadata": {}})()
)
channel._send_text.assert_awaited_once_with("wx-user", "pong", "ctx-typing")
assert channel._api_post.await_count == 3
assert channel._api_post.await_args_list[0].args[0] == "ilink/bot/getconfig"
assert channel._api_post.await_args_list[1].args[0] == "ilink/bot/sendtyping"
assert channel._api_post.await_args_list[1].args[1]["status"] == 1
assert channel._api_post.await_args_list[2].args[0] == "ilink/bot/sendtyping"
assert channel._api_post.await_args_list[2].args[1]["status"] == 2
@pytest.mark.asyncio
async def test_send_still_sends_text_when_typing_ticket_missing() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-no-ticket"
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"})
await channel.send(
type("Msg", (), {"chat_id": "wx-user", "content": "pong", "media": [], "metadata": {}})()
)
channel._send_text.assert_awaited_once_with("wx-user", "pong", "ctx-no-ticket")
channel._api_post.assert_awaited_once()
assert channel._api_post.await_args_list[0].args[0] == "ilink/bot/getconfig"
@pytest.mark.asyncio
async def test_poll_once_pauses_session_on_expired_errcode() -> None:
channel, _bus = _make_channel()