From 5e9fa28ff271ff8a521c93e17e68e4dbf09c40da Mon Sep 17 00:00:00 2001 From: chengyongru Date: Wed, 25 Mar 2026 18:37:32 +0800 Subject: [PATCH] feat(channel): add message send retry mechanism with exponential backoff - Add send_max_retries config option (default: 3, range: 0-10) - Implement _send_with_retry in ChannelManager with 1s/2s/4s backoff - Propagate CancelledError for graceful shutdown - Fix telegram send_delta to raise exceptions for Manager retry - Add comprehensive tests for retry logic - Document channel settings in README --- README.md | 32 ++ nanobot/channels/manager.py | 49 +- nanobot/channels/telegram.py | 6 +- nanobot/config/schema.py | 1 + pyproject.toml | 13 + tests/channels/test_channel_plugins.py | 618 ++++++++++++++++++++++++- 6 files changed, 707 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index b6b212d4e..40ecd4cb1 100644 --- a/README.md +++ b/README.md @@ -1157,6 +1157,38 @@ That's it! Environment variables, model routing, config matching, and `nanobot s +### Channel Settings + +Global settings that apply to all channels. Configure under the `channels` section in `~/.nanobot/config.json`: + +```json +{ + "channels": { + "sendProgress": true, + "sendToolHints": false, + "sendMaxRetries": 3, + "telegram": { ... } + } +} +``` + +| Setting | Default | Description | +|---------|---------|-------------| +| `sendProgress` | `true` | Stream agent's text progress to the channel | +| `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) | +| `sendMaxRetries` | `3` | Max retry attempts for message send failures (0-10) | + +#### Retry Behavior + +When a message fails to send, nanobot will automatically retry with exponential backoff: + +- **Attempts 1-3**: Retry delays are 1s, 2s, 4s +- **Attempts 4+**: Retry delay caps at 4s +- **Transient failures** (network hiccups, temporary API limits): Retry usually succeeds +- **Permanent failures** (invalid token, channel banned): All retries fail + +> [!NOTE] +> When a channel is completely unavailable, there's no way to notify the user since we cannot reach them through that channel. Monitor logs for "Failed to send to {channel} after N attempts" to detect persistent delivery failures. ### Web Search diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 3a53b6307..2f1b400c4 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -7,10 +7,14 @@ from typing import Any from loguru import logger +from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel from nanobot.config.schema import Config +# Retry delays for message sending (exponential backoff: 1s, 2s, 4s) +_SEND_RETRY_DELAYS = (1, 2, 4) + class ChannelManager: """ @@ -129,15 +133,7 @@ class ChannelManager: channel = self.channels.get(msg.channel) if channel: - try: - if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): - await channel.send_delta(msg.chat_id, msg.content, msg.metadata) - elif msg.metadata.get("_streamed"): - pass - else: - await channel.send(msg) - except Exception as e: - logger.error("Error sending to {}: {}", msg.channel, e) + await self._send_with_retry(channel, msg) else: logger.warning("Unknown channel: {}", msg.channel) @@ -146,6 +142,41 @@ class ChannelManager: except asyncio.CancelledError: break + async def _send_with_retry(self, channel: BaseChannel, msg: OutboundMessage) -> None: + """Send a message with retry on failure using exponential backoff. + + Note: CancelledError is re-raised to allow graceful shutdown. + """ + max_attempts = max(self.config.channels.send_max_retries, 1) + + for attempt in range(max_attempts): + try: + if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): + await channel.send_delta(msg.chat_id, msg.content, msg.metadata) + elif msg.metadata.get("_streamed"): + pass + else: + await channel.send(msg) + return # Send succeeded + except asyncio.CancelledError: + raise # Propagate cancellation for graceful shutdown + except Exception as e: + if attempt == max_attempts - 1: + logger.error( + "Failed to send to {} after {} attempts: {} - {}", + msg.channel, max_attempts, type(e).__name__, e + ) + return + delay = _SEND_RETRY_DELAYS[min(attempt, len(_SEND_RETRY_DELAYS) - 1)] + logger.warning( + "Send to {} failed (attempt {}/{}): {}, retrying in {}s", + msg.channel, attempt + 1, max_attempts, type(e).__name__, delay + ) + try: + await asyncio.sleep(delay) + except asyncio.CancelledError: + raise # Propagate cancellation during sleep + def get_channel(self, name: str) -> BaseChannel | None: """Get a channel by name.""" return self.channels.get(name) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 04cc89cc2..fcccbe8a4 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -528,6 +528,7 @@ class TelegramChannel(BaseChannel): buf.last_edit = now except Exception as e: logger.warning("Stream initial send failed: {}", e) + raise # Let ChannelManager handle retry elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: try: await self._call_with_retry( @@ -536,8 +537,9 @@ class TelegramChannel(BaseChannel): text=buf.text, ) buf.last_edit = now - except Exception: - pass + except Exception as e: + logger.warning("Stream edit failed: {}", e) + raise # Let ChannelManager handle retry async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 6f05e569e..1d964a642 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -25,6 +25,7 @@ class ChannelsConfig(Base): send_progress: bool = True # stream agent's text progress to the channel send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…")) + send_max_retries: int = Field(default=3, ge=0, le=10) # Max retry attempts for message send failures class AgentDefaults(Base): diff --git a/pyproject.toml b/pyproject.toml index aca72777d..501a6bb45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -120,3 +120,16 @@ ignore = ["E501"] [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] + +[tool.coverage.run] +source = ["nanobot"] +omit = ["tests/*", "**/tests/*"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "raise NotImplementedError", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", +] diff --git a/tests/channels/test_channel_plugins.py b/tests/channels/test_channel_plugins.py index 3f34dc598..a0b458a08 100644 --- a/tests/channels/test_channel_plugins.py +++ b/tests/channels/test_channel_plugins.py @@ -2,8 +2,9 @@ from __future__ import annotations +import asyncio from types import SimpleNamespace -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import pytest @@ -262,3 +263,618 @@ def test_builtin_channel_init_from_dict(): ch = TelegramChannel({"enabled": False, "token": "test-tok", "allowFrom": ["*"]}, bus) assert ch.config.token == "test-tok" assert ch.config.allow_from == ["*"] + + +def test_channels_config_send_max_retries_default(): + """ChannelsConfig should have send_max_retries with default value of 3.""" + cfg = ChannelsConfig() + assert hasattr(cfg, 'send_max_retries') + assert cfg.send_max_retries == 3 + + +def test_channels_config_send_max_retries_upper_bound(): + """send_max_retries should be bounded to prevent resource exhaustion.""" + from pydantic import ValidationError + + # Value too high should be rejected + with pytest.raises(ValidationError): + ChannelsConfig(send_max_retries=100) + + # Negative should be rejected + with pytest.raises(ValidationError): + ChannelsConfig(send_max_retries=-1) + + # Boundary values should be allowed + cfg_min = ChannelsConfig(send_max_retries=0) + assert cfg_min.send_max_retries == 0 + + cfg_max = ChannelsConfig(send_max_retries=10) + assert cfg_max.send_max_retries == 10 + + # Value above upper bound should be rejected + with pytest.raises(ValidationError): + ChannelsConfig(send_max_retries=11) + + +# --------------------------------------------------------------------------- +# _send_with_retry +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_send_with_retry_succeeds_first_try(): + """_send_with_retry should succeed on first try and not retry.""" + call_count = 0 + + class _FailingChannel(BaseChannel): + name = "failing" + display_name = "Failing" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + nonlocal call_count + call_count += 1 + # Succeeds on first try + + fake_config = SimpleNamespace( + channels=ChannelsConfig(send_max_retries=3), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + msg = OutboundMessage(channel="failing", chat_id="123", content="test") + await mgr._send_with_retry(mgr.channels["failing"], msg) + + assert call_count == 1 + + +@pytest.mark.asyncio +async def test_send_with_retry_retries_on_failure(): + """_send_with_retry should retry on failure up to max_retries times.""" + call_count = 0 + + class _FailingChannel(BaseChannel): + name = "failing" + display_name = "Failing" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + nonlocal call_count + call_count += 1 + raise RuntimeError("simulated failure") + + fake_config = SimpleNamespace( + channels=ChannelsConfig(send_max_retries=3), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + msg = OutboundMessage(channel="failing", chat_id="123", content="test") + + # Patch asyncio.sleep to avoid actual delays + with patch("nanobot.channels.manager.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + await mgr._send_with_retry(mgr.channels["failing"], msg) + + assert call_count == 3 # 3 total attempts (initial + 2 retries) + assert mock_sleep.call_count == 2 # 2 sleeps between retries + + +@pytest.mark.asyncio +async def test_send_with_retry_no_retry_when_max_is_zero(): + """_send_with_retry should not retry when send_max_retries is 0.""" + call_count = 0 + + class _FailingChannel(BaseChannel): + name = "failing" + display_name = "Failing" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + nonlocal call_count + call_count += 1 + raise RuntimeError("simulated failure") + + fake_config = SimpleNamespace( + channels=ChannelsConfig(send_max_retries=0), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + msg = OutboundMessage(channel="failing", chat_id="123", content="test") + + with patch("nanobot.channels.manager.asyncio.sleep", new_callable=AsyncMock): + await mgr._send_with_retry(mgr.channels["failing"], msg) + + assert call_count == 1 # Called once but no retry (max(0, 1) = 1) + + +@pytest.mark.asyncio +async def test_send_with_retry_calls_send_delta(): + """_send_with_retry should call send_delta when metadata has _stream_delta.""" + send_delta_called = False + + class _StreamingChannel(BaseChannel): + name = "streaming" + display_name = "Streaming" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + pass # Should not be called + + async def send_delta(self, chat_id: str, delta: str, metadata: dict | None = None) -> None: + nonlocal send_delta_called + send_delta_called = True + + fake_config = SimpleNamespace( + channels=ChannelsConfig(send_max_retries=3), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"streaming": _StreamingChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + msg = OutboundMessage( + channel="streaming", chat_id="123", content="test delta", + metadata={"_stream_delta": True} + ) + await mgr._send_with_retry(mgr.channels["streaming"], msg) + + assert send_delta_called is True + + +@pytest.mark.asyncio +async def test_send_with_retry_skips_send_when_streamed(): + """_send_with_retry should not call send when metadata has _streamed flag.""" + send_called = False + send_delta_called = False + + class _StreamedChannel(BaseChannel): + name = "streamed" + display_name = "Streamed" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + nonlocal send_called + send_called = True + + async def send_delta(self, chat_id: str, delta: str, metadata: dict | None = None) -> None: + nonlocal send_delta_called + send_delta_called = True + + fake_config = SimpleNamespace( + channels=ChannelsConfig(send_max_retries=3), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"streamed": _StreamedChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + # _streamed means message was already sent via send_delta, so skip send + msg = OutboundMessage( + channel="streamed", chat_id="123", content="test", + metadata={"_streamed": True} + ) + await mgr._send_with_retry(mgr.channels["streamed"], msg) + + assert send_called is False + assert send_delta_called is False + + +@pytest.mark.asyncio +async def test_send_with_retry_propagates_cancelled_error(): + """_send_with_retry should re-raise CancelledError for graceful shutdown.""" + class _CancellingChannel(BaseChannel): + name = "cancelling" + display_name = "Cancelling" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + raise asyncio.CancelledError("simulated cancellation") + + fake_config = SimpleNamespace( + channels=ChannelsConfig(send_max_retries=3), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"cancelling": _CancellingChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + msg = OutboundMessage(channel="cancelling", chat_id="123", content="test") + + with pytest.raises(asyncio.CancelledError): + await mgr._send_with_retry(mgr.channels["cancelling"], msg) + + +@pytest.mark.asyncio +async def test_send_with_retry_propagates_cancelled_error_during_sleep(): + """_send_with_retry should re-raise CancelledError during sleep.""" + call_count = 0 + + class _FailingChannel(BaseChannel): + name = "failing" + display_name = "Failing" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + nonlocal call_count + call_count += 1 + raise RuntimeError("simulated failure") + + fake_config = SimpleNamespace( + channels=ChannelsConfig(send_max_retries=3), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"failing": _FailingChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + msg = OutboundMessage(channel="failing", chat_id="123", content="test") + + # Mock sleep to raise CancelledError + async def cancel_during_sleep(_): + raise asyncio.CancelledError("cancelled during sleep") + + with patch("nanobot.channels.manager.asyncio.sleep", side_effect=cancel_during_sleep): + with pytest.raises(asyncio.CancelledError): + await mgr._send_with_retry(mgr.channels["failing"], msg) + + # Should have attempted once before sleep was cancelled + assert call_count == 1 + + +# --------------------------------------------------------------------------- +# ChannelManager - lifecycle and getters +# --------------------------------------------------------------------------- + +class _ChannelWithAllowFrom(BaseChannel): + """Channel with configurable allow_from.""" + name = "withallow" + display_name = "With Allow" + + def __init__(self, config, bus, allow_from): + super().__init__(config, bus) + self.config.allow_from = allow_from + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + pass + + +class _StartableChannel(BaseChannel): + """Channel that tracks start/stop calls.""" + name = "startable" + display_name = "Startable" + + def __init__(self, config, bus): + super().__init__(config, bus) + self.started = False + self.stopped = False + + async def start(self) -> None: + self.started = True + + async def stop(self) -> None: + self.stopped = True + + async def send(self, msg: OutboundMessage) -> None: + pass + + +@pytest.mark.asyncio +async def test_validate_allow_from_raises_on_empty_list(): + """_validate_allow_from should raise SystemExit when allow_from is empty list.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.channels = {"test": _ChannelWithAllowFrom(fake_config, None, [])} + mgr._dispatch_task = None + + with pytest.raises(SystemExit) as exc_info: + mgr._validate_allow_from() + + assert "empty allowFrom" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_validate_allow_from_passes_with_asterisk(): + """_validate_allow_from should not raise when allow_from contains '*'.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.channels = {"test": _ChannelWithAllowFrom(fake_config, None, ["*"])} + mgr._dispatch_task = None + + # Should not raise + mgr._validate_allow_from() + + +@pytest.mark.asyncio +async def test_get_channel_returns_channel_if_exists(): + """get_channel should return the channel if it exists.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"telegram": _StartableChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + assert mgr.get_channel("telegram") is not None + assert mgr.get_channel("nonexistent") is None + + +@pytest.mark.asyncio +async def test_get_status_returns_running_state(): + """get_status should return enabled and running state for each channel.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + ch = _StartableChannel(fake_config, mgr.bus) + mgr.channels = {"startable": ch} + mgr._dispatch_task = None + + status = mgr.get_status() + + assert status["startable"]["enabled"] is True + assert status["startable"]["running"] is False # Not started yet + + +@pytest.mark.asyncio +async def test_enabled_channels_returns_channel_names(): + """enabled_channels should return list of enabled channel names.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = { + "telegram": _StartableChannel(fake_config, mgr.bus), + "slack": _StartableChannel(fake_config, mgr.bus), + } + mgr._dispatch_task = None + + enabled = mgr.enabled_channels + + assert "telegram" in enabled + assert "slack" in enabled + assert len(enabled) == 2 + + +@pytest.mark.asyncio +async def test_stop_all_cancels_dispatcher_and_stops_channels(): + """stop_all should cancel the dispatch task and stop all channels.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + + ch = _StartableChannel(fake_config, mgr.bus) + mgr.channels = {"startable": ch} + + # Create a real cancelled task + async def dummy_task(): + while True: + await asyncio.sleep(1) + + dispatch_task = asyncio.create_task(dummy_task()) + mgr._dispatch_task = dispatch_task + + await mgr.stop_all() + + # Task should be cancelled + assert dispatch_task.cancelled() + # Channel should be stopped + assert ch.stopped is True + + +@pytest.mark.asyncio +async def test_start_channel_logs_error_on_failure(): + """_start_channel should log error when channel start fails.""" + class _FailingChannel(BaseChannel): + name = "failing" + display_name = "Failing" + + async def start(self) -> None: + raise RuntimeError("connection failed") + + async def stop(self) -> None: + pass + + async def send(self, msg: OutboundMessage) -> None: + pass + + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {} + mgr._dispatch_task = None + + ch = _FailingChannel(fake_config, mgr.bus) + + # Should not raise, just log error + await mgr._start_channel("failing", ch) + + +@pytest.mark.asyncio +async def test_stop_all_handles_channel_exception(): + """stop_all should handle exceptions when stopping channels gracefully.""" + class _StopFailingChannel(BaseChannel): + name = "stopfailing" + display_name = "Stop Failing" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + raise RuntimeError("stop failed") + + async def send(self, msg: OutboundMessage) -> None: + pass + + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {"stopfailing": _StopFailingChannel(fake_config, mgr.bus)} + mgr._dispatch_task = None + + # Should not raise even if channel.stop() raises + await mgr.stop_all() + + +@pytest.mark.asyncio +async def test_start_all_no_channels_logs_warning(): + """start_all should log warning when no channels are enabled.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + mgr.channels = {} # No channels + mgr._dispatch_task = None + + # Should return early without creating dispatch task + await mgr.start_all() + + assert mgr._dispatch_task is None + + +@pytest.mark.asyncio +async def test_start_all_creates_dispatch_task(): + """start_all should create the dispatch task when channels exist.""" + fake_config = SimpleNamespace( + channels=ChannelsConfig(), + providers=SimpleNamespace(groq=SimpleNamespace(api_key="")), + ) + + mgr = ChannelManager.__new__(ChannelManager) + mgr.config = fake_config + mgr.bus = MessageBus() + + ch = _StartableChannel(fake_config, mgr.bus) + mgr.channels = {"startable": ch} + mgr._dispatch_task = None + + # Cancel immediately after start to avoid running forever + async def cancel_after_start(): + await asyncio.sleep(0.01) + if mgr._dispatch_task: + mgr._dispatch_task.cancel() + + cancel_task = asyncio.create_task(cancel_after_start()) + + try: + await mgr.start_all() + except asyncio.CancelledError: + pass + finally: + cancel_task.cancel() + try: + await cancel_task + except asyncio.CancelledError: + pass + + # Dispatch task should have been created + assert mgr._dispatch_task is not None +