From f0f0bf02d77e24046a4c35037d5bd3d938222bc7 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Wed, 25 Mar 2026 14:34:37 +0000 Subject: [PATCH] refactor(channel): centralize retry around explicit send failures Make channel delivery failures raise consistently so retry policy lives in ChannelManager rather than being split across individual channels. Tighten Telegram stream finalization, clarify sendMaxRetries semantics, and align the docs with the behavior the system actually guarantees. --- README.md | 9 +++++---- nanobot/channels/base.py | 9 ++++++++- nanobot/channels/feishu.py | 1 + nanobot/channels/manager.py | 15 +++++++++------ nanobot/channels/mochat.py | 1 + nanobot/channels/slack.py | 1 + nanobot/channels/telegram.py | 9 ++++++--- nanobot/channels/wecom.py | 1 + nanobot/channels/weixin.py | 1 + nanobot/channels/whatsapp.py | 2 ++ nanobot/config/schema.py | 2 +- tests/channels/test_telegram_channel.py | 21 +++++++++++++++++++-- 12 files changed, 55 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 40ecd4cb1..ae2512eb0 100644 --- a/README.md +++ b/README.md @@ -1176,14 +1176,15 @@ Global settings that apply to all channels. Configure under the `channels` secti |---------|---------|-------------| | `sendProgress` | `true` | Stream agent's text progress to the channel | | `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) | -| `sendMaxRetries` | `3` | Max retry attempts for message send failures (0-10) | +| `sendMaxRetries` | `3` | Max delivery attempts per outbound message, including the initial send (0-10 configured, minimum 1 actual attempt) | #### Retry Behavior -When a message fails to send, nanobot will automatically retry with exponential backoff: +When a channel send operation raises an error, nanobot retries with exponential backoff: -- **Attempts 1-3**: Retry delays are 1s, 2s, 4s -- **Attempts 4+**: Retry delay caps at 4s +- **Attempt 1**: Initial send +- **Attempts 2-4**: Retry delays are 1s, 2s, 4s +- **Attempts 5+**: Retry delay caps at 4s - **Transient failures** (network hiccups, temporary API limits): Retry usually succeeds - **Permanent failures** (invalid token, channel banned): All retries fail diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 87614cb46..5a776eed4 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -85,11 +85,18 @@ class BaseChannel(ABC): Args: msg: The message to send. + + Implementations should raise on delivery failure so the channel manager + can apply any retry policy in one place. """ pass async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: - """Deliver a streaming text chunk. Override in subclass to enable streaming.""" + """Deliver a streaming text chunk. + + Override in subclasses to enable streaming. Implementations should + raise on delivery failure so the channel manager can retry. + """ pass @property diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 06daf409d..0ffca601e 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1031,6 +1031,7 @@ class FeishuChannel(BaseChannel): except Exception as e: logger.error("Error sending Feishu message: {}", e) + raise def _on_message_sync(self, data: Any) -> None: """ diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 2f1b400c4..2ec7c001e 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -142,6 +142,14 @@ class ChannelManager: except asyncio.CancelledError: break + @staticmethod + async def _send_once(channel: BaseChannel, msg: OutboundMessage) -> None: + """Send one outbound message without retry policy.""" + if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): + await channel.send_delta(msg.chat_id, msg.content, msg.metadata) + elif not msg.metadata.get("_streamed"): + await channel.send(msg) + async def _send_with_retry(self, channel: BaseChannel, msg: OutboundMessage) -> None: """Send a message with retry on failure using exponential backoff. @@ -151,12 +159,7 @@ class ChannelManager: for attempt in range(max_attempts): try: - if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): - await channel.send_delta(msg.chat_id, msg.content, msg.metadata) - elif msg.metadata.get("_streamed"): - pass - else: - await channel.send(msg) + await self._send_once(channel, msg) return # Send succeeded except asyncio.CancelledError: raise # Propagate cancellation for graceful shutdown diff --git a/nanobot/channels/mochat.py b/nanobot/channels/mochat.py index 629379f2e..0b02aec62 100644 --- a/nanobot/channels/mochat.py +++ b/nanobot/channels/mochat.py @@ -374,6 +374,7 @@ class MochatChannel(BaseChannel): content, msg.reply_to) except Exception as e: logger.error("Failed to send Mochat message: {}", e) + raise # ---- config / init helpers --------------------------------------------- diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index 87194ac70..2503f6a2d 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -145,6 +145,7 @@ class SlackChannel(BaseChannel): except Exception as e: logger.error("Error sending Slack message: {}", e) + raise async def _on_socket_request( self, diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index fcccbe8a4..c3041c9d2 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -476,6 +476,7 @@ class TelegramChannel(BaseChannel): ) except Exception as e2: logger.error("Error sending Telegram message: {}", e2) + raise async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: """Progressive message editing: send on first delta, edit on subsequent ones.""" @@ -485,7 +486,7 @@ class TelegramChannel(BaseChannel): int_chat_id = int(chat_id) if meta.get("_stream_end"): - buf = self._stream_bufs.pop(chat_id, None) + buf = self._stream_bufs.get(chat_id) if not buf or not buf.message_id or not buf.text: return self._stop_typing(chat_id) @@ -504,8 +505,10 @@ class TelegramChannel(BaseChannel): chat_id=int_chat_id, message_id=buf.message_id, text=buf.text, ) - except Exception: - pass + except Exception as e2: + logger.warning("Final stream edit failed: {}", e2) + raise # Let ChannelManager handle retry + self._stream_bufs.pop(chat_id, None) return buf = self._stream_bufs.get(chat_id) diff --git a/nanobot/channels/wecom.py b/nanobot/channels/wecom.py index 2f248559e..05ad14825 100644 --- a/nanobot/channels/wecom.py +++ b/nanobot/channels/wecom.py @@ -368,3 +368,4 @@ class WecomChannel(BaseChannel): except Exception as e: logger.error("Error sending WeCom message: {}", e) + raise diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 3fbe329aa..f09ef95f7 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -751,6 +751,7 @@ class WeixinChannel(BaseChannel): await self._send_text(msg.chat_id, chunk, ctx_token) except Exception as e: logger.error("Error sending WeChat message: {}", e) + raise async def _send_text( self, diff --git a/nanobot/channels/whatsapp.py b/nanobot/channels/whatsapp.py index 8826a64f3..95bde46e9 100644 --- a/nanobot/channels/whatsapp.py +++ b/nanobot/channels/whatsapp.py @@ -146,6 +146,7 @@ class WhatsAppChannel(BaseChannel): await self._ws.send(json.dumps(payload, ensure_ascii=False)) except Exception as e: logger.error("Error sending WhatsApp message: {}", e) + raise for media_path in msg.media or []: try: @@ -160,6 +161,7 @@ class WhatsAppChannel(BaseChannel): await self._ws.send(json.dumps(payload, ensure_ascii=False)) except Exception as e: logger.error("Error sending WhatsApp media {}: {}", media_path, e) + raise async def _handle_bridge_message(self, raw: str) -> None: """Handle a message from the bridge.""" diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 1d964a642..15fcacafe 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -25,7 +25,7 @@ class ChannelsConfig(Base): send_progress: bool = True # stream agent's text progress to the channel send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…")) - send_max_retries: int = Field(default=3, ge=0, le=10) # Max retry attempts for message send failures + send_max_retries: int = Field(default=3, ge=0, le=10) # Max delivery attempts (initial send included) class AgentDefaults(Base): diff --git a/tests/channels/test_telegram_channel.py b/tests/channels/test_telegram_channel.py index 353d5d05d..6b4c008e0 100644 --- a/tests/channels/test_telegram_channel.py +++ b/tests/channels/test_telegram_channel.py @@ -13,7 +13,7 @@ except ImportError: from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus -from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel +from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel, _StreamBuf from nanobot.channels.telegram import TelegramConfig @@ -271,13 +271,30 @@ async def test_send_text_gives_up_after_max_retries() -> None: orig_delay = tg_mod._SEND_RETRY_BASE_DELAY tg_mod._SEND_RETRY_BASE_DELAY = 0.01 try: - await channel._send_text(123, "hello", None, {}) + with pytest.raises(TimedOut): + await channel._send_text(123, "hello", None, {}) finally: tg_mod._SEND_RETRY_BASE_DELAY = orig_delay assert channel._app.bot.sent_messages == [] +@pytest.mark.asyncio +async def test_send_delta_stream_end_raises_and_keeps_buffer_on_failure() -> None: + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + channel._app.bot.edit_message_text = AsyncMock(side_effect=RuntimeError("boom")) + channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0) + + with pytest.raises(RuntimeError, match="boom"): + await channel.send_delta("123", "", {"_stream_end": True}) + + assert "123" in channel._stream_bufs + + def test_derive_topic_session_key_uses_thread_id() -> None: message = SimpleNamespace( chat=SimpleNamespace(type="supergroup"),