feat(channel): add message send retry mechanism with exponential backoff

- Add send_max_retries config option (default: 3, range: 0-10)
- Implement _send_with_retry in ChannelManager with 1s/2s/4s backoff
- Propagate CancelledError for graceful shutdown
- Fix telegram send_delta to raise exceptions for Manager retry
- Add comprehensive tests for retry logic
- Document channel settings in README
This commit is contained in:
chengyongru 2026-03-25 18:37:32 +08:00 committed by Xubin Ren
parent 3f71014b7c
commit 5e9fa28ff2
6 changed files with 707 additions and 12 deletions

View File

@ -1157,6 +1157,38 @@ That's it! Environment variables, model routing, config matching, and `nanobot s
</details>
### Channel Settings
Global settings that apply to all channels. Configure under the `channels` section in `~/.nanobot/config.json`:
```json
{
"channels": {
"sendProgress": true,
"sendToolHints": false,
"sendMaxRetries": 3,
"telegram": { ... }
}
}
```
| Setting | Default | Description |
|---------|---------|-------------|
| `sendProgress` | `true` | Stream agent's text progress to the channel |
| `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) |
| `sendMaxRetries` | `3` | Max retry attempts for message send failures (0-10) |
#### Retry Behavior
When a message fails to send, nanobot will automatically retry with exponential backoff:
- **Attempts 1-3**: Retry delays are 1s, 2s, 4s
- **Attempts 4+**: Retry delay caps at 4s
- **Transient failures** (network hiccups, temporary API limits): Retry usually succeeds
- **Permanent failures** (invalid token, channel banned): All retries fail
> [!NOTE]
> When a channel is completely unavailable, there's no way to notify the user since we cannot reach them through that channel. Monitor logs for "Failed to send to {channel} after N attempts" to detect persistent delivery failures.
### Web Search

View File

@ -7,10 +7,14 @@ from typing import Any
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import Config
# Retry delays for message sending (exponential backoff: 1s, 2s, 4s)
_SEND_RETRY_DELAYS = (1, 2, 4)
class ChannelManager:
"""
@ -129,15 +133,7 @@ class ChannelManager:
channel = self.channels.get(msg.channel)
if channel:
try:
if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"):
await channel.send_delta(msg.chat_id, msg.content, msg.metadata)
elif msg.metadata.get("_streamed"):
pass
else:
await channel.send(msg)
except Exception as e:
logger.error("Error sending to {}: {}", msg.channel, e)
await self._send_with_retry(channel, msg)
else:
logger.warning("Unknown channel: {}", msg.channel)
@ -146,6 +142,41 @@ class ChannelManager:
except asyncio.CancelledError:
break
async def _send_with_retry(self, channel: BaseChannel, msg: OutboundMessage) -> None:
"""Send a message with retry on failure using exponential backoff.
Note: CancelledError is re-raised to allow graceful shutdown.
"""
max_attempts = max(self.config.channels.send_max_retries, 1)
for attempt in range(max_attempts):
try:
if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"):
await channel.send_delta(msg.chat_id, msg.content, msg.metadata)
elif msg.metadata.get("_streamed"):
pass
else:
await channel.send(msg)
return # Send succeeded
except asyncio.CancelledError:
raise # Propagate cancellation for graceful shutdown
except Exception as e:
if attempt == max_attempts - 1:
logger.error(
"Failed to send to {} after {} attempts: {} - {}",
msg.channel, max_attempts, type(e).__name__, e
)
return
delay = _SEND_RETRY_DELAYS[min(attempt, len(_SEND_RETRY_DELAYS) - 1)]
logger.warning(
"Send to {} failed (attempt {}/{}): {}, retrying in {}s",
msg.channel, attempt + 1, max_attempts, type(e).__name__, delay
)
try:
await asyncio.sleep(delay)
except asyncio.CancelledError:
raise # Propagate cancellation during sleep
def get_channel(self, name: str) -> BaseChannel | None:
"""Get a channel by name."""
return self.channels.get(name)

View File

@ -528,6 +528,7 @@ class TelegramChannel(BaseChannel):
buf.last_edit = now
except Exception as e:
logger.warning("Stream initial send failed: {}", e)
raise # Let ChannelManager handle retry
elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL:
try:
await self._call_with_retry(
@ -536,8 +537,9 @@ class TelegramChannel(BaseChannel):
text=buf.text,
)
buf.last_edit = now
except Exception:
pass
except Exception as e:
logger.warning("Stream edit failed: {}", e)
raise # Let ChannelManager handle retry
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command."""

View File

@ -25,6 +25,7 @@ class ChannelsConfig(Base):
send_progress: bool = True # stream agent's text progress to the channel
send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…"))
send_max_retries: int = Field(default=3, ge=0, le=10) # Max retry attempts for message send failures
class AgentDefaults(Base):

View File

@ -120,3 +120,16 @@ ignore = ["E501"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
[tool.coverage.run]
source = ["nanobot"]
omit = ["tests/*", "**/tests/*"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]

View File

@ -2,8 +2,9 @@
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import pytest
@ -262,3 +263,618 @@ def test_builtin_channel_init_from_dict():
ch = TelegramChannel({"enabled": False, "token": "test-tok", "allowFrom": ["*"]}, bus)
assert ch.config.token == "test-tok"
assert ch.config.allow_from == ["*"]
def test_channels_config_send_max_retries_default():
"""ChannelsConfig should have send_max_retries with default value of 3."""
cfg = ChannelsConfig()
assert hasattr(cfg, 'send_max_retries')
assert cfg.send_max_retries == 3
def test_channels_config_send_max_retries_upper_bound():
"""send_max_retries should be bounded to prevent resource exhaustion."""
from pydantic import ValidationError
# Value too high should be rejected
with pytest.raises(ValidationError):
ChannelsConfig(send_max_retries=100)
# Negative should be rejected
with pytest.raises(ValidationError):
ChannelsConfig(send_max_retries=-1)
# Boundary values should be allowed
cfg_min = ChannelsConfig(send_max_retries=0)
assert cfg_min.send_max_retries == 0
cfg_max = ChannelsConfig(send_max_retries=10)
assert cfg_max.send_max_retries == 10
# Value above upper bound should be rejected
with pytest.raises(ValidationError):
ChannelsConfig(send_max_retries=11)
# ---------------------------------------------------------------------------
# _send_with_retry
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_send_with_retry_succeeds_first_try():
"""_send_with_retry should succeed on first try and not retry."""
call_count = 0
class _FailingChannel(BaseChannel):
name = "failing"
display_name = "Failing"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
nonlocal call_count
call_count += 1
# Succeeds on first try
fake_config = SimpleNamespace(
channels=ChannelsConfig(send_max_retries=3),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
msg = OutboundMessage(channel="failing", chat_id="123", content="test")
await mgr._send_with_retry(mgr.channels["failing"], msg)
assert call_count == 1
@pytest.mark.asyncio
async def test_send_with_retry_retries_on_failure():
"""_send_with_retry should retry on failure up to max_retries times."""
call_count = 0
class _FailingChannel(BaseChannel):
name = "failing"
display_name = "Failing"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
nonlocal call_count
call_count += 1
raise RuntimeError("simulated failure")
fake_config = SimpleNamespace(
channels=ChannelsConfig(send_max_retries=3),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
msg = OutboundMessage(channel="failing", chat_id="123", content="test")
# Patch asyncio.sleep to avoid actual delays
with patch("nanobot.channels.manager.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
await mgr._send_with_retry(mgr.channels["failing"], msg)
assert call_count == 3 # 3 total attempts (initial + 2 retries)
assert mock_sleep.call_count == 2 # 2 sleeps between retries
@pytest.mark.asyncio
async def test_send_with_retry_no_retry_when_max_is_zero():
"""_send_with_retry should not retry when send_max_retries is 0."""
call_count = 0
class _FailingChannel(BaseChannel):
name = "failing"
display_name = "Failing"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
nonlocal call_count
call_count += 1
raise RuntimeError("simulated failure")
fake_config = SimpleNamespace(
channels=ChannelsConfig(send_max_retries=0),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
msg = OutboundMessage(channel="failing", chat_id="123", content="test")
with patch("nanobot.channels.manager.asyncio.sleep", new_callable=AsyncMock):
await mgr._send_with_retry(mgr.channels["failing"], msg)
assert call_count == 1 # Called once but no retry (max(0, 1) = 1)
@pytest.mark.asyncio
async def test_send_with_retry_calls_send_delta():
"""_send_with_retry should call send_delta when metadata has _stream_delta."""
send_delta_called = False
class _StreamingChannel(BaseChannel):
name = "streaming"
display_name = "Streaming"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
pass # Should not be called
async def send_delta(self, chat_id: str, delta: str, metadata: dict | None = None) -> None:
nonlocal send_delta_called
send_delta_called = True
fake_config = SimpleNamespace(
channels=ChannelsConfig(send_max_retries=3),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"streaming": _StreamingChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
msg = OutboundMessage(
channel="streaming", chat_id="123", content="test delta",
metadata={"_stream_delta": True}
)
await mgr._send_with_retry(mgr.channels["streaming"], msg)
assert send_delta_called is True
@pytest.mark.asyncio
async def test_send_with_retry_skips_send_when_streamed():
"""_send_with_retry should not call send when metadata has _streamed flag."""
send_called = False
send_delta_called = False
class _StreamedChannel(BaseChannel):
name = "streamed"
display_name = "Streamed"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
nonlocal send_called
send_called = True
async def send_delta(self, chat_id: str, delta: str, metadata: dict | None = None) -> None:
nonlocal send_delta_called
send_delta_called = True
fake_config = SimpleNamespace(
channels=ChannelsConfig(send_max_retries=3),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"streamed": _StreamedChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
# _streamed means message was already sent via send_delta, so skip send
msg = OutboundMessage(
channel="streamed", chat_id="123", content="test",
metadata={"_streamed": True}
)
await mgr._send_with_retry(mgr.channels["streamed"], msg)
assert send_called is False
assert send_delta_called is False
@pytest.mark.asyncio
async def test_send_with_retry_propagates_cancelled_error():
"""_send_with_retry should re-raise CancelledError for graceful shutdown."""
class _CancellingChannel(BaseChannel):
name = "cancelling"
display_name = "Cancelling"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
raise asyncio.CancelledError("simulated cancellation")
fake_config = SimpleNamespace(
channels=ChannelsConfig(send_max_retries=3),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"cancelling": _CancellingChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
msg = OutboundMessage(channel="cancelling", chat_id="123", content="test")
with pytest.raises(asyncio.CancelledError):
await mgr._send_with_retry(mgr.channels["cancelling"], msg)
@pytest.mark.asyncio
async def test_send_with_retry_propagates_cancelled_error_during_sleep():
"""_send_with_retry should re-raise CancelledError during sleep."""
call_count = 0
class _FailingChannel(BaseChannel):
name = "failing"
display_name = "Failing"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
nonlocal call_count
call_count += 1
raise RuntimeError("simulated failure")
fake_config = SimpleNamespace(
channels=ChannelsConfig(send_max_retries=3),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
msg = OutboundMessage(channel="failing", chat_id="123", content="test")
# Mock sleep to raise CancelledError
async def cancel_during_sleep(_):
raise asyncio.CancelledError("cancelled during sleep")
with patch("nanobot.channels.manager.asyncio.sleep", side_effect=cancel_during_sleep):
with pytest.raises(asyncio.CancelledError):
await mgr._send_with_retry(mgr.channels["failing"], msg)
# Should have attempted once before sleep was cancelled
assert call_count == 1
# ---------------------------------------------------------------------------
# ChannelManager - lifecycle and getters
# ---------------------------------------------------------------------------
class _ChannelWithAllowFrom(BaseChannel):
"""Channel with configurable allow_from."""
name = "withallow"
display_name = "With Allow"
def __init__(self, config, bus, allow_from):
super().__init__(config, bus)
self.config.allow_from = allow_from
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
pass
class _StartableChannel(BaseChannel):
"""Channel that tracks start/stop calls."""
name = "startable"
display_name = "Startable"
def __init__(self, config, bus):
super().__init__(config, bus)
self.started = False
self.stopped = False
async def start(self) -> None:
self.started = True
async def stop(self) -> None:
self.stopped = True
async def send(self, msg: OutboundMessage) -> None:
pass
@pytest.mark.asyncio
async def test_validate_allow_from_raises_on_empty_list():
"""_validate_allow_from should raise SystemExit when allow_from is empty list."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.channels = {"test": _ChannelWithAllowFrom(fake_config, None, [])}
mgr._dispatch_task = None
with pytest.raises(SystemExit) as exc_info:
mgr._validate_allow_from()
assert "empty allowFrom" in str(exc_info.value)
@pytest.mark.asyncio
async def test_validate_allow_from_passes_with_asterisk():
"""_validate_allow_from should not raise when allow_from contains '*'."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.channels = {"test": _ChannelWithAllowFrom(fake_config, None, ["*"])}
mgr._dispatch_task = None
# Should not raise
mgr._validate_allow_from()
@pytest.mark.asyncio
async def test_get_channel_returns_channel_if_exists():
"""get_channel should return the channel if it exists."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"telegram": _StartableChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
assert mgr.get_channel("telegram") is not None
assert mgr.get_channel("nonexistent") is None
@pytest.mark.asyncio
async def test_get_status_returns_running_state():
"""get_status should return enabled and running state for each channel."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
ch = _StartableChannel(fake_config, mgr.bus)
mgr.channels = {"startable": ch}
mgr._dispatch_task = None
status = mgr.get_status()
assert status["startable"]["enabled"] is True
assert status["startable"]["running"] is False # Not started yet
@pytest.mark.asyncio
async def test_enabled_channels_returns_channel_names():
"""enabled_channels should return list of enabled channel names."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {
"telegram": _StartableChannel(fake_config, mgr.bus),
"slack": _StartableChannel(fake_config, mgr.bus),
}
mgr._dispatch_task = None
enabled = mgr.enabled_channels
assert "telegram" in enabled
assert "slack" in enabled
assert len(enabled) == 2
@pytest.mark.asyncio
async def test_stop_all_cancels_dispatcher_and_stops_channels():
"""stop_all should cancel the dispatch task and stop all channels."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
ch = _StartableChannel(fake_config, mgr.bus)
mgr.channels = {"startable": ch}
# Create a real cancelled task
async def dummy_task():
while True:
await asyncio.sleep(1)
dispatch_task = asyncio.create_task(dummy_task())
mgr._dispatch_task = dispatch_task
await mgr.stop_all()
# Task should be cancelled
assert dispatch_task.cancelled()
# Channel should be stopped
assert ch.stopped is True
@pytest.mark.asyncio
async def test_start_channel_logs_error_on_failure():
"""_start_channel should log error when channel start fails."""
class _FailingChannel(BaseChannel):
name = "failing"
display_name = "Failing"
async def start(self) -> None:
raise RuntimeError("connection failed")
async def stop(self) -> None:
pass
async def send(self, msg: OutboundMessage) -> None:
pass
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {}
mgr._dispatch_task = None
ch = _FailingChannel(fake_config, mgr.bus)
# Should not raise, just log error
await mgr._start_channel("failing", ch)
@pytest.mark.asyncio
async def test_stop_all_handles_channel_exception():
"""stop_all should handle exceptions when stopping channels gracefully."""
class _StopFailingChannel(BaseChannel):
name = "stopfailing"
display_name = "Stop Failing"
async def start(self) -> None:
pass
async def stop(self) -> None:
raise RuntimeError("stop failed")
async def send(self, msg: OutboundMessage) -> None:
pass
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {"stopfailing": _StopFailingChannel(fake_config, mgr.bus)}
mgr._dispatch_task = None
# Should not raise even if channel.stop() raises
await mgr.stop_all()
@pytest.mark.asyncio
async def test_start_all_no_channels_logs_warning():
"""start_all should log warning when no channels are enabled."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
mgr.channels = {} # No channels
mgr._dispatch_task = None
# Should return early without creating dispatch task
await mgr.start_all()
assert mgr._dispatch_task is None
@pytest.mark.asyncio
async def test_start_all_creates_dispatch_task():
"""start_all should create the dispatch task when channels exist."""
fake_config = SimpleNamespace(
channels=ChannelsConfig(),
providers=SimpleNamespace(groq=SimpleNamespace(api_key="")),
)
mgr = ChannelManager.__new__(ChannelManager)
mgr.config = fake_config
mgr.bus = MessageBus()
ch = _StartableChannel(fake_config, mgr.bus)
mgr.channels = {"startable": ch}
mgr._dispatch_task = None
# Cancel immediately after start to avoid running forever
async def cancel_after_start():
await asyncio.sleep(0.01)
if mgr._dispatch_task:
mgr._dispatch_task.cancel()
cancel_task = asyncio.create_task(cancel_after_start())
try:
await mgr.start_all()
except asyncio.CancelledError:
pass
finally:
cancel_task.cancel()
try:
await cancel_task
except asyncio.CancelledError:
pass
# Dispatch task should have been created
assert mgr._dispatch_task is not None