refactor(channel): centralize retry around explicit send failures

Make channel delivery failures raise consistently so retry policy lives in ChannelManager rather than being split across individual channels. Tighten Telegram stream finalization, clarify sendMaxRetries semantics, and align the docs with the behavior the system actually guarantees.
This commit is contained in:
Xubin Ren 2026-03-25 14:34:37 +00:00 committed by chengyongru
parent 226fdfcb91
commit c2a9dc884c
12 changed files with 55 additions and 17 deletions

View File

@ -1270,14 +1270,15 @@ Global settings that apply to all channels. Configure under the `channels` secti
|---------|---------|-------------| |---------|---------|-------------|
| `sendProgress` | `true` | Stream agent's text progress to the channel | | `sendProgress` | `true` | Stream agent's text progress to the channel |
| `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) | | `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) |
| `sendMaxRetries` | `3` | Max retry attempts for message send failures (0-10) | | `sendMaxRetries` | `3` | Max delivery attempts per outbound message, including the initial send (0-10 configured, minimum 1 actual attempt) |
#### Retry Behavior #### Retry Behavior
When a message fails to send, nanobot will automatically retry with exponential backoff: When a channel send operation raises an error, nanobot retries with exponential backoff:
- **Attempts 1-3**: Retry delays are 1s, 2s, 4s - **Attempt 1**: Initial send
- **Attempts 4+**: Retry delay caps at 4s - **Attempts 2-4**: Retry delays are 1s, 2s, 4s
- **Attempts 5+**: Retry delay caps at 4s
- **Transient failures** (network hiccups, temporary API limits): Retry usually succeeds - **Transient failures** (network hiccups, temporary API limits): Retry usually succeeds
- **Permanent failures** (invalid token, channel banned): All retries fail - **Permanent failures** (invalid token, channel banned): All retries fail

View File

@ -85,11 +85,18 @@ class BaseChannel(ABC):
Args: Args:
msg: The message to send. msg: The message to send.
Implementations should raise on delivery failure so the channel manager
can apply any retry policy in one place.
""" """
pass pass
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
"""Deliver a streaming text chunk. Override in subclass to enable streaming.""" """Deliver a streaming text chunk.
Override in subclasses to enable streaming. Implementations should
raise on delivery failure so the channel manager can retry.
"""
pass pass
@property @property

View File

@ -1183,6 +1183,7 @@ class FeishuChannel(BaseChannel):
except Exception as e: except Exception as e:
logger.error("Error sending Feishu message: {}", e) logger.error("Error sending Feishu message: {}", e)
raise
def _on_message_sync(self, data: Any) -> None: def _on_message_sync(self, data: Any) -> None:
""" """

View File

@ -142,6 +142,14 @@ class ChannelManager:
except asyncio.CancelledError: except asyncio.CancelledError:
break break
@staticmethod
async def _send_once(channel: BaseChannel, msg: OutboundMessage) -> None:
"""Send one outbound message without retry policy."""
if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"):
await channel.send_delta(msg.chat_id, msg.content, msg.metadata)
elif not msg.metadata.get("_streamed"):
await channel.send(msg)
async def _send_with_retry(self, channel: BaseChannel, msg: OutboundMessage) -> None: async def _send_with_retry(self, channel: BaseChannel, msg: OutboundMessage) -> None:
"""Send a message with retry on failure using exponential backoff. """Send a message with retry on failure using exponential backoff.
@ -151,12 +159,7 @@ class ChannelManager:
for attempt in range(max_attempts): for attempt in range(max_attempts):
try: try:
if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): await self._send_once(channel, msg)
await channel.send_delta(msg.chat_id, msg.content, msg.metadata)
elif msg.metadata.get("_streamed"):
pass
else:
await channel.send(msg)
return # Send succeeded return # Send succeeded
except asyncio.CancelledError: except asyncio.CancelledError:
raise # Propagate cancellation for graceful shutdown raise # Propagate cancellation for graceful shutdown

View File

@ -374,6 +374,7 @@ class MochatChannel(BaseChannel):
content, msg.reply_to) content, msg.reply_to)
except Exception as e: except Exception as e:
logger.error("Failed to send Mochat message: {}", e) logger.error("Failed to send Mochat message: {}", e)
raise
# ---- config / init helpers --------------------------------------------- # ---- config / init helpers ---------------------------------------------

View File

@ -145,6 +145,7 @@ class SlackChannel(BaseChannel):
except Exception as e: except Exception as e:
logger.error("Error sending Slack message: {}", e) logger.error("Error sending Slack message: {}", e)
raise
async def _on_socket_request( async def _on_socket_request(
self, self,

View File

@ -479,6 +479,7 @@ class TelegramChannel(BaseChannel):
) )
except Exception as e2: except Exception as e2:
logger.error("Error sending Telegram message: {}", e2) logger.error("Error sending Telegram message: {}", e2)
raise
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
"""Progressive message editing: send on first delta, edit on subsequent ones.""" """Progressive message editing: send on first delta, edit on subsequent ones."""
@ -488,7 +489,7 @@ class TelegramChannel(BaseChannel):
int_chat_id = int(chat_id) int_chat_id = int(chat_id)
if meta.get("_stream_end"): if meta.get("_stream_end"):
buf = self._stream_bufs.pop(chat_id, None) buf = self._stream_bufs.get(chat_id)
if not buf or not buf.message_id or not buf.text: if not buf or not buf.message_id or not buf.text:
return return
self._stop_typing(chat_id) self._stop_typing(chat_id)
@ -507,8 +508,10 @@ class TelegramChannel(BaseChannel):
chat_id=int_chat_id, message_id=buf.message_id, chat_id=int_chat_id, message_id=buf.message_id,
text=buf.text, text=buf.text,
) )
except Exception: except Exception as e2:
pass logger.warning("Final stream edit failed: {}", e2)
raise # Let ChannelManager handle retry
self._stream_bufs.pop(chat_id, None)
return return
buf = self._stream_bufs.get(chat_id) buf = self._stream_bufs.get(chat_id)

View File

@ -368,3 +368,4 @@ class WecomChannel(BaseChannel):
except Exception as e: except Exception as e:
logger.error("Error sending WeCom message: {}", e) logger.error("Error sending WeCom message: {}", e)
raise

View File

@ -751,6 +751,7 @@ class WeixinChannel(BaseChannel):
await self._send_text(msg.chat_id, chunk, ctx_token) await self._send_text(msg.chat_id, chunk, ctx_token)
except Exception as e: except Exception as e:
logger.error("Error sending WeChat message: {}", e) logger.error("Error sending WeChat message: {}", e)
raise
async def _send_text( async def _send_text(
self, self,

View File

@ -146,6 +146,7 @@ class WhatsAppChannel(BaseChannel):
await self._ws.send(json.dumps(payload, ensure_ascii=False)) await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e: except Exception as e:
logger.error("Error sending WhatsApp message: {}", e) logger.error("Error sending WhatsApp message: {}", e)
raise
for media_path in msg.media or []: for media_path in msg.media or []:
try: try:
@ -160,6 +161,7 @@ class WhatsAppChannel(BaseChannel):
await self._ws.send(json.dumps(payload, ensure_ascii=False)) await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e: except Exception as e:
logger.error("Error sending WhatsApp media {}: {}", media_path, e) logger.error("Error sending WhatsApp media {}: {}", media_path, e)
raise
async def _handle_bridge_message(self, raw: str) -> None: async def _handle_bridge_message(self, raw: str) -> None:
"""Handle a message from the bridge.""" """Handle a message from the bridge."""

View File

@ -25,7 +25,7 @@ class ChannelsConfig(Base):
send_progress: bool = True # stream agent's text progress to the channel send_progress: bool = True # stream agent's text progress to the channel
send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…")) send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…"))
send_max_retries: int = Field(default=3, ge=0, le=10) # Max retry attempts for message send failures send_max_retries: int = Field(default=3, ge=0, le=10) # Max delivery attempts (initial send included)
class AgentDefaults(Base): class AgentDefaults(Base):

View File

@ -13,7 +13,7 @@ except ImportError:
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel, _StreamBuf
from nanobot.channels.telegram import TelegramConfig from nanobot.channels.telegram import TelegramConfig
@ -271,13 +271,30 @@ async def test_send_text_gives_up_after_max_retries() -> None:
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
tg_mod._SEND_RETRY_BASE_DELAY = 0.01 tg_mod._SEND_RETRY_BASE_DELAY = 0.01
try: try:
await channel._send_text(123, "hello", None, {}) with pytest.raises(TimedOut):
await channel._send_text(123, "hello", None, {})
finally: finally:
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
assert channel._app.bot.sent_messages == [] assert channel._app.bot.sent_messages == []
@pytest.mark.asyncio
async def test_send_delta_stream_end_raises_and_keeps_buffer_on_failure() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
channel._app.bot.edit_message_text = AsyncMock(side_effect=RuntimeError("boom"))
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0)
with pytest.raises(RuntimeError, match="boom"):
await channel.send_delta("123", "", {"_stream_end": True})
assert "123" in channel._stream_bufs
def test_derive_topic_session_key_uses_thread_id() -> None: def test_derive_topic_session_key_uses_thread_id() -> None:
message = SimpleNamespace( message = SimpleNamespace(
chat=SimpleNamespace(type="supergroup"), chat=SimpleNamespace(type="supergroup"),