diff --git a/docs/chat-apps.md b/docs/chat-apps.md index 88242a5f7..58429c16b 100644 --- a/docs/chat-apps.md +++ b/docs/chat-apps.md @@ -51,6 +51,43 @@ Connect nanobot to your favorite chat platform. Want to build your own? See the nanobot gateway ``` +**Webhook mode (optional)** + +Telegram uses long polling by default. To receive updates through a webhook, expose +a public HTTPS URL that forwards to nanobot's local listener and set `mode` to +`webhook`: + +```json +{ + "channels": { + "telegram": { + "enabled": true, + "token": "YOUR_BOT_TOKEN", + "mode": "webhook", + "webhookUrl": "https://example.com/telegram", + "webhookListenHost": "127.0.0.1", + "webhookListenPort": 8081, + "webhookPath": "/telegram", + "webhookSecretToken": "CHANGE_ME_RANDOM_SECRET", + "webhookMaxConnections": 4, + "allowFrom": ["YOUR_USER_ID"] + } + } +} +``` + +> `webhookSecretToken` is required in webhook mode. Do not expose the local +> webhook listener directly to the public internet without a reverse proxy or +> tunnel in front of it. TLS/Host policy is handled by your proxy; nanobot only +> listens on `webhookListenHost:webhookListenPort` and validates Telegram's +> webhook secret token. `webhookMaxConnections` defaults to `4`; nanobot +> still serializes Telegram updates per conversation before forwarding them to +> the agent. +> +> `webhookUrl` is the public HTTPS URL registered with Telegram. +> `webhookPath` is the local path nanobot listens on. They often use the same +> path, but may differ when a reverse proxy or tunnel rewrites the request path. +
@dataclass
class _QueuedTelegramUpdate:
    """One inbound Telegram update parked in a per-session reorder buffer.

    Entries are collected per session key and sorted by ``sort_key`` before
    they are handed to the matching processing method.
    """

    # Selects the processing path used when the entry is drained.
    kind: Literal["command", "message"]
    # The Telegram update exactly as the handler received it.
    update: Update
    # Handler context, passed through untouched to the processing method.
    context: Any
    # Ordering tuple ``(message_id, update_id)``.
    sort_key: tuple[int, int]
@field_validator("webhook_path")
@classmethod
def webhook_path_must_start_with_slash(cls, value: str) -> str:
    """Normalize the local webhook route and require a leading slash.

    Args:
        value: Configured local path; surrounding whitespace is ignored and
            a blank value falls back to ``"/telegram"``.

    Returns:
        The stripped path.

    Raises:
        ValueError: If the path does not start with ``"/"``.
    """
    normalized = value.strip() or "/telegram"
    if not normalized.startswith("/"):
        raise ValueError('webhook_path must start with "/"')
    return normalized


@model_validator(mode="after")
def validate_webhook_config(self) -> "TelegramConfig":
    """Check the settings that only matter when ``mode`` is ``"webhook"``.

    Polling configurations are returned unchanged. In webhook mode the
    public URL must be an ``https`` URL with a real host (and, if present,
    a numeric port), and the secret token must be 1-256 characters drawn
    from ``A-Z``, ``a-z``, ``0-9``, ``_`` and ``-``.

    Returns:
        The validated model instance.

    Raises:
        ValueError: If the URL or the secret token is missing or malformed.
    """
    if self.mode != "webhook":
        return self

    url = self.webhook_url.strip()
    if not url:
        raise ValueError("webhook_url is required when Telegram mode is webhook")
    try:
        parsed = urlparse(url)
        scheme = parsed.scheme
        # ``hostname`` is empty for host-less URLs such as
        # ``https://:8443/hook`` that still have a non-empty ``netloc``.
        host = parsed.hostname
        # Reading ``port`` raises ValueError for a non-numeric or
        # out-of-range port, which would otherwise only fail at setWebhook.
        _ = parsed.port
    except ValueError:
        scheme, host = "", None
    if scheme != "https" or not host:
        raise ValueError("webhook_url must be a public HTTPS URL")

    secret = self.webhook_secret_token.strip()
    if not secret:
        raise ValueError("webhook_secret_token is required when Telegram mode is webhook")
    # ``fullmatch`` anchors both ends without the trailing-newline leniency
    # that ``$`` has under ``re.match``.
    if len(secret) > 256 or re.fullmatch(r"[A-Za-z0-9_-]+", secret) is None:
        raise ValueError(
            "webhook_secret_token must be 1-256 characters using only A-Z, a-z, 0-9, _ and -"
        )
    return self
""" name = "telegram" @@ -294,6 +341,8 @@ class TelegramChannel(BaseChannel): self._bot_user_id: int | None = None self._bot_username: str | None = None self._stream_bufs: dict[str, _StreamBuf] = {} # chat_id -> streaming state + self._inbound_buffers: dict[str, list[_QueuedTelegramUpdate]] = {} + self._inbound_workers: dict[str, asyncio.Task] = {} def is_allowed(self, sender_id: str) -> bool: """Preserve Telegram's legacy id|username allowlist matching.""" @@ -326,7 +375,7 @@ class TelegramChannel(BaseChannel): return content async def start(self) -> None: - """Start the Telegram bot with long polling.""" + """Start the Telegram bot.""" if not self.config.token: self.logger.error("bot token not configured") return @@ -394,9 +443,12 @@ class TelegramChannel(BaseChannel): else: allowed_updates = ["message"] - self.logger.info("Starting bot (polling mode)...") + if self.config.mode == "webhook": + self.logger.info("Starting bot (webhook mode)...") + else: + self.logger.info("Starting bot (polling mode)...") - # Initialize and start polling + # Initialize and start receiving updates await self._app.initialize() await self._app.start() @@ -412,12 +464,26 @@ class TelegramChannel(BaseChannel): except Exception as e: self.logger.warning("Failed to register bot commands: {}", e) - # Start polling (this runs until stopped) - await self._app.updater.start_polling( - allowed_updates=allowed_updates, - drop_pending_updates=False, # Process pending messages on startup - error_callback=self._on_polling_error, - ) + if self.config.mode == "webhook": + # ``url_path`` is the local HTTP route. ``webhook_url`` is the + # public HTTPS URL Telegram calls; reverse proxies may rewrite it. 
@staticmethod
def _queue_key_for_message(message) -> str:
    """Return the session key that groups Telegram updates for ordered ingress.

    The topic-derived session key is used when one exists; otherwise the
    key falls back to ``telegram:<chat_id>``.
    """
    return TelegramChannel._derive_topic_session_key(message) or f"telegram:{message.chat_id}"


@staticmethod
def _sort_key_for_update(update: Update) -> tuple[int, int]:
    """Return ``(message_id, update_id)`` for ordering updates in a session.

    Missing or falsy ids are treated as ``0``.
    """
    message = getattr(update, "message", None)
    message_id = int(getattr(message, "message_id", 0) or 0)
    update_id = int(getattr(update, "update_id", 0) or 0)
    return (message_id, update_id)


def _enqueue_ordered_update(
    self,
    *,
    kind: Literal["command", "message"],
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
) -> None:
    """Stage a Telegram update behind a short per-session reorder window.

    The update is appended to its session buffer and a drain worker is
    started for that session unless a live one is already registered.

    Args:
        kind: Which processing path the update takes once drained.
        update: The Telegram update; ``update.message`` is used to derive
            the session key.
        context: Handler context forwarded to the processing method.
    """
    message = update.message
    key = self._queue_key_for_message(message)
    self._inbound_buffers.setdefault(key, []).append(
        _QueuedTelegramUpdate(
            kind=kind,
            update=update,
            context=context,
            sort_key=self._sort_key_for_update(update),
        )
    )
    worker = self._inbound_workers.get(key)
    # A finished task left in the registry must not block a fresh worker,
    # otherwise the session would never be drained again.
    if worker is None or worker.done():
        self._inbound_workers[key] = asyncio.create_task(
            self._drain_ordered_updates(key)
        )


async def _drain_ordered_updates(self, key: str, *, reorder_window: float = 0.2) -> None:
    """Drain one Telegram session buffer in stable message order.

    While the channel is running, the worker repeatedly waits
    ``reorder_window`` seconds, takes everything buffered for ``key``,
    sorts it by ``sort_key`` and processes the items one at a time. It
    exits once a window passes with nothing buffered.

    Args:
        key: Session key whose buffer this worker owns.
        reorder_window: Seconds to wait before each drain so that
            out-of-order deliveries can still be sorted into place.
    """
    try:
        while self._running:
            await asyncio.sleep(reorder_window)
            # Taking the list out of the dict means updates arriving while
            # this batch is processed start a fresh list for the next pass.
            batch = self._inbound_buffers.pop(key, None)
            if not batch:
                break
            batch.sort(key=lambda item: item.sort_key)
            for item in batch:
                try:
                    if item.kind == "command":
                        await self._process_forward_command(item.update, item.context)
                    else:
                        await self._process_message_update(item.update, item.context)
                except Exception as e:
                    # One failing update must not stop the rest of the batch.
                    self.logger.warning(
                        "Telegram queued update handling failed for {}: {}",
                        key,
                        e,
                    )
    except asyncio.CancelledError:
        raise
    except Exception as e:
        self.logger.warning("Telegram ordered update worker failed for {}: {}", key, e)
    finally:
        # Always release the registration when it still points at this
        # task, even if items remain buffered; the identity check avoids
        # removing a newer worker registered under the same key.
        if self._inbound_workers.get(key) is asyncio.current_task():
            del self._inbound_workers[key]
TelegramChannel(BaseChannel): """Handle incoming messages (text, photos, voice, documents).""" if not update.message or not update.effective_user: return + if not self._running: + await self._process_message_update(update, context) + return + self._enqueue_ordered_update(kind="message", update=update, context=context) + + async def _process_message_update(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Process a queued Telegram message update.""" message = update.message user = update.effective_user diff --git a/pyproject.toml b/pyproject.toml index eaf57a2ad..ee27548c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "rich>=14.0.0,<15.0.0", "croniter>=6.0.0,<7.0.0", "dingtalk-stream>=0.24.0,<1.0.0", - "python-telegram-bot[socks]>=22.6,<23.0", + "python-telegram-bot[socks,webhooks]>=22.6,<23.0", "lark-oapi>=1.5.0,<2.0.0", "socksio>=1.0.0,<2.0.0", "python-socketio>=5.16.0,<6.0.0", diff --git a/tests/channels/test_telegram_channel.py b/tests/channels/test_telegram_channel.py index 362bfbea9..05e066895 100644 --- a/tests/channels/test_telegram_channel.py +++ b/tests/channels/test_telegram_channel.py @@ -1,3 +1,4 @@ +import asyncio from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock @@ -36,11 +37,19 @@ class _FakeUpdater: def __init__(self, on_start_polling) -> None: self._on_start_polling = on_start_polling self.start_polling_kwargs = None + self.start_webhook_kwargs = None async def start_polling(self, **kwargs) -> None: self.start_polling_kwargs = kwargs self._on_start_polling() + async def start_webhook(self, **kwargs) -> None: + self.start_webhook_kwargs = kwargs + self._on_start_polling() + + async def stop(self) -> None: + pass + class _FakeBot: def __init__(self) -> None: @@ -103,6 +112,12 @@ class _FakeApp: async def start(self) -> None: pass + async def stop(self) -> None: + pass + + async def shutdown(self) -> None: + pass + class _FakeBuilder: def 
def test_webhook_config_requires_https_url_and_secret() -> None:
    """Webhook mode rejects a missing URL, a non-HTTPS URL and a missing secret."""
    with pytest.raises(ValueError, match="webhook_url is required"):
        TelegramConfig(enabled=True, token="123:abc", mode="webhook")

    with pytest.raises(ValueError, match="public HTTPS URL"):
        TelegramConfig(
            enabled=True,
            token="123:abc",
            mode="webhook",
            webhook_url="http://example.com/telegram",
            webhook_secret_token="secret",
        )

    with pytest.raises(ValueError, match="webhook_secret_token is required"):
        TelegramConfig(
            enabled=True,
            token="123:abc",
            mode="webhook",
            webhook_url="https://example.com/telegram",
        )


@pytest.mark.asyncio
async def test_start_webhook_mode(monkeypatch) -> None:
    """start() in webhook mode calls start_webhook, not start_polling."""
    _FakeHTTPXRequest.clear()
    config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        mode="webhook",
        webhook_url="https://example.com/telegram",
        webhook_listen_host="127.0.0.1",
        webhook_listen_port=8081,
        webhook_path="/telegram",
        webhook_secret_token="secret-token",
        webhook_max_connections=1,
    )
    bus = MessageBus()
    channel = TelegramChannel(config, bus)
    # The fake updater flips ``_running`` off so ``start()`` returns promptly.
    app = _FakeApp(lambda: setattr(channel, "_running", False))
    builder = _FakeBuilder(app)

    monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest)
    monkeypatch.setattr(
        "nanobot.channels.telegram.Application",
        SimpleNamespace(builder=lambda: builder),
    )

    await channel.start()

    assert app.updater.start_polling_kwargs is None
    assert app.updater.start_webhook_kwargs == {
        "listen": "127.0.0.1",
        "port": 8081,
        "url_path": "telegram",
        "webhook_url": "https://example.com/telegram",
        "allowed_updates": ["message"],
        "drop_pending_updates": False,
        "secret_token": "secret-token",
        "max_connections": 1,
    }


@pytest.mark.asyncio
async def test_running_message_handler_reorders_same_session_updates() -> None:
    """Updates of one session delivered out of order are processed in order."""
    channel = TelegramChannel(
        TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
        MessageBus(),
    )
    seen: list[int] = []

    async def fake_process(update, context) -> None:
        seen.append(update.message.message_id)

    channel._process_message_update = fake_process
    channel._running = True

    first = _make_telegram_update(text="first")
    first.update_id = 100
    first.message.message_id = 1
    second = _make_telegram_update(text="second")
    second.update_id = 101
    second.message.message_id = 2

    await channel._on_message(second, None)
    await channel._on_message(first, None)

    # Wait for the drain worker itself instead of a fixed delay: a bare
    # sleep close to the reorder window is timing-sensitive and leaves the
    # worker task pending when the test ends.
    workers = list(channel._inbound_workers.values())
    try:
        await asyncio.wait_for(asyncio.gather(*workers), timeout=2.0)
    finally:
        channel._running = False

    assert seen == [1, 2]