diff --git a/README.md b/README.md index 40ecd4cb1..ae2512eb0 100644 --- a/README.md +++ b/README.md @@ -1176,14 +1176,15 @@ Global settings that apply to all channels. Configure under the `channels` secti |---------|---------|-------------| | `sendProgress` | `true` | Stream agent's text progress to the channel | | `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) | -| `sendMaxRetries` | `3` | Max retry attempts for message send failures (0-10) | +| `sendMaxRetries` | `3` | Max delivery attempts per outbound message, including the initial send (0-10 configured, minimum 1 actual attempt) | #### Retry Behavior -When a message fails to send, nanobot will automatically retry with exponential backoff: +When a channel send operation raises an error, nanobot retries with exponential backoff: -- **Attempts 1-3**: Retry delays are 1s, 2s, 4s -- **Attempts 4+**: Retry delay caps at 4s +- **Attempt 1**: Initial send +- **Attempts 2-4**: Retry delays are 1s, 2s, 4s +- **Attempts 5+**: Retry delay caps at 4s - **Transient failures** (network hiccups, temporary API limits): Retry usually succeeds - **Permanent failures** (invalid token, channel banned): All retries fail diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 87614cb46..5a776eed4 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -85,11 +85,18 @@ class BaseChannel(ABC): Args: msg: The message to send. + + Implementations should raise on delivery failure so the channel manager + can apply any retry policy in one place. """ pass async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: - """Deliver a streaming text chunk. Override in subclass to enable streaming.""" + """Deliver a streaming text chunk. + + Override in subclasses to enable streaming. Implementations should + raise on delivery failure so the channel manager can retry. + """ pass @property diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 06daf409d..0ffca601e 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1031,6 +1031,7 @@ class FeishuChannel(BaseChannel): except Exception as e: logger.error("Error sending Feishu message: {}", e) + raise def _on_message_sync(self, data: Any) -> None: """ diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 2f1b400c4..2ec7c001e 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -142,6 +142,14 @@ class ChannelManager: except asyncio.CancelledError: break + @staticmethod + async def _send_once(channel: BaseChannel, msg: OutboundMessage) -> None: + """Send one outbound message without retry policy.""" + if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): + await channel.send_delta(msg.chat_id, msg.content, msg.metadata) + elif not msg.metadata.get("_streamed"): + await channel.send(msg) + async def _send_with_retry(self, channel: BaseChannel, msg: OutboundMessage) -> None: """Send a message with retry on failure using exponential backoff. @@ -151,12 +159,7 @@ class ChannelManager: for attempt in range(max_attempts): try: - if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): - await channel.send_delta(msg.chat_id, msg.content, msg.metadata) - elif msg.metadata.get("_streamed"): - pass - else: - await channel.send(msg) + await self._send_once(channel, msg) return # Send succeeded except asyncio.CancelledError: raise # Propagate cancellation for graceful shutdown diff --git a/nanobot/channels/mochat.py b/nanobot/channels/mochat.py index 629379f2e..0b02aec62 100644 --- a/nanobot/channels/mochat.py +++ b/nanobot/channels/mochat.py @@ -374,6 +374,7 @@ class MochatChannel(BaseChannel): content, msg.reply_to) except Exception as e: logger.error("Failed to send Mochat message: {}", e) + raise # ---- config / init helpers --------------------------------------------- diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index 87194ac70..2503f6a2d 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -145,6 +145,7 @@ class SlackChannel(BaseChannel): except Exception as e: logger.error("Error sending Slack message: {}", e) + raise async def _on_socket_request( self, diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index fcccbe8a4..c3041c9d2 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -476,6 +476,7 @@ class TelegramChannel(BaseChannel): ) except Exception as e2: logger.error("Error sending Telegram message: {}", e2) + raise async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: """Progressive message editing: send on first delta, edit on subsequent ones.""" @@ -485,7 +486,7 @@ class TelegramChannel(BaseChannel): int_chat_id = int(chat_id) if meta.get("_stream_end"): - buf = self._stream_bufs.pop(chat_id, None) + buf = self._stream_bufs.get(chat_id) if not buf or not buf.message_id or not buf.text: return self._stop_typing(chat_id) @@ -504,8 +505,10 @@ class TelegramChannel(BaseChannel): chat_id=int_chat_id, message_id=buf.message_id, text=buf.text, ) - except Exception: - pass + except Exception as e2: + logger.warning("Final stream edit failed: {}", e2) + raise # Let ChannelManager handle retry + self._stream_bufs.pop(chat_id, None) return buf = self._stream_bufs.get(chat_id) diff --git a/nanobot/channels/wecom.py b/nanobot/channels/wecom.py index 2f248559e..05ad14825 100644 --- a/nanobot/channels/wecom.py +++ b/nanobot/channels/wecom.py @@ -368,3 +368,4 @@ class WecomChannel(BaseChannel): except Exception as e: logger.error("Error sending WeCom message: {}", e) + raise diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 3fbe329aa..f09ef95f7 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -751,6 +751,7 @@ class WeixinChannel(BaseChannel): await self._send_text(msg.chat_id, chunk, ctx_token) except Exception as e: logger.error("Error sending WeChat message: {}", e) + raise async def _send_text( self, diff --git a/nanobot/channels/whatsapp.py b/nanobot/channels/whatsapp.py index 8826a64f3..95bde46e9 100644 --- a/nanobot/channels/whatsapp.py +++ b/nanobot/channels/whatsapp.py @@ -146,6 +146,7 @@ class WhatsAppChannel(BaseChannel): await self._ws.send(json.dumps(payload, ensure_ascii=False)) except Exception as e: logger.error("Error sending WhatsApp message: {}", e) + raise for media_path in msg.media or []: try: @@ -160,6 +161,7 @@ class WhatsAppChannel(BaseChannel): await self._ws.send(json.dumps(payload, ensure_ascii=False)) except Exception as e: logger.error("Error sending WhatsApp media {}: {}", media_path, e) + raise async def _handle_bridge_message(self, raw: str) -> None: """Handle a message from the bridge.""" diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 1d964a642..15fcacafe 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -25,7 +25,7 @@ class ChannelsConfig(Base): send_progress: bool = True # stream agent's text progress to the channel send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…")) - send_max_retries: int = Field(default=3, ge=0, le=10) # Max retry attempts for message send failures + send_max_retries: int = Field(default=3, ge=0, le=10) # Max delivery attempts (initial send included) class AgentDefaults(Base): diff --git a/tests/channels/test_telegram_channel.py b/tests/channels/test_telegram_channel.py index 353d5d05d..6b4c008e0 100644 --- a/tests/channels/test_telegram_channel.py +++ b/tests/channels/test_telegram_channel.py @@ -13,7 +13,7 @@ except ImportError: from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus -from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel +from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel, _StreamBuf from nanobot.channels.telegram import TelegramConfig @@ -271,13 +271,30 @@ async def test_send_text_gives_up_after_max_retries() -> None: orig_delay = tg_mod._SEND_RETRY_BASE_DELAY tg_mod._SEND_RETRY_BASE_DELAY = 0.01 try: - await channel._send_text(123, "hello", None, {}) + with pytest.raises(TimedOut): + await channel._send_text(123, "hello", None, {}) finally: tg_mod._SEND_RETRY_BASE_DELAY = orig_delay assert channel._app.bot.sent_messages == [] +@pytest.mark.asyncio +async def test_send_delta_stream_end_raises_and_keeps_buffer_on_failure() -> None: + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + channel._app.bot.edit_message_text = AsyncMock(side_effect=RuntimeError("boom")) + channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0) + + with pytest.raises(RuntimeError, match="boom"): + await channel.send_delta("123", "", {"_stream_end": True}) + + assert "123" in channel._stream_bufs + + def test_derive_topic_session_key_uses_thread_id() -> None: message = SimpleNamespace( chat=SimpleNamespace(type="supergroup"),