feat(telegram): add webhook support and ordered message queue

Introduce webhook mode for the Telegram channel and implement a session-based message reordering mechanism.

    Key changes:
    - Update `python-telegram-bot` dependency to include the `webhooks` extra.
    - Add `TelegramConfig` fields for webhook configuration, with validation rules for public HTTPS URLs and Telegram's secret token.
    - Implement `_enqueue_ordered_update` and `_drain_ordered_updates` in `TelegramChannel` to stage incoming messages and commands behind a short per-session reorder
  window, ensuring sequential delivery based on message and update IDs.
    - Configure `start_webhook` in `TelegramChannel.start()` when webhook mode is enabled.
    - Add unit tests for webhook config validations, webhook startup, and message reordering.
    - Document webhook configuration and reverse proxy details in `docs/chat-apps.md`.
This commit is contained in:
outlook84 2026-05-25 11:37:07 +08:00 committed by Xubin Ren
parent 172ec4d4c4
commit a4a2c55120
4 changed files with 310 additions and 13 deletions

View File

@ -51,6 +51,43 @@ Connect nanobot to your favorite chat platform. Want to build your own? See the
nanobot gateway
```
**Webhook mode (optional)**
Telegram uses long polling by default. To receive updates through a webhook, expose
a public HTTPS URL that forwards to nanobot's local listener and set `mode` to
`webhook`:
```json
{
"channels": {
"telegram": {
"enabled": true,
"token": "YOUR_BOT_TOKEN",
"mode": "webhook",
"webhookUrl": "https://example.com/telegram",
"webhookListenHost": "127.0.0.1",
"webhookListenPort": 8081,
"webhookPath": "/telegram",
"webhookSecretToken": "CHANGE_ME_RANDOM_SECRET",
"webhookMaxConnections": 4,
"allowFrom": ["YOUR_USER_ID"]
}
}
}
```
> `webhookSecretToken` is required in webhook mode. Do not expose the local
> webhook listener directly to the public internet without a reverse proxy or
> tunnel in front of it. TLS/Host policy is handled by your proxy; nanobot only
> listens on `webhookListenHost:webhookListenPort` and validates Telegram's
> webhook secret token. `webhookMaxConnections` defaults to `4`; nanobot
> still serializes Telegram updates per conversation before forwarding them to
> the agent.
>
> `webhookUrl` is the public HTTPS URL registered with Telegram.
> `webhookPath` is the local path nanobot listens on. They often use the same
> path, but may differ when a reverse proxy or tunnel rewrites the request path.
</details>
<details>

View File

@ -10,8 +10,9 @@ from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from urllib.parse import urlparse
from pydantic import Field
from pydantic import Field, field_validator, model_validator
from telegram import (
BotCommand,
InlineKeyboardButton,
@ -225,11 +226,22 @@ class _StreamBuf:
stream_id: str | None = None
@dataclass
class _QueuedTelegramUpdate:
"""Telegram update staged for per-session ordered processing."""
kind: Literal["command", "message"]
update: Update
context: Any
sort_key: tuple[int, int]
class TelegramConfig(Base):
"""Telegram channel configuration."""
enabled: bool = False
token: str = ""
mode: Literal["polling", "webhook"] = "polling"
allow_from: list[str] = Field(default_factory=list)
proxy: str | None = None
reply_to_message: bool = False
@ -241,13 +253,48 @@ class TelegramConfig(Base):
# Enable inline keyboard buttons in Telegram messages.
inline_keyboards: bool = False
stream_edit_interval: float = Field(default=_STREAM_EDIT_INTERVAL_DEFAULT, ge=0.1)
webhook_url: str = ""
webhook_listen_host: str = "127.0.0.1"
webhook_listen_port: int = Field(default=8081, ge=1, le=65535)
webhook_path: str = "/telegram"
webhook_secret_token: str = ""
webhook_max_connections: int = Field(default=4, ge=1, le=100)
@field_validator("webhook_path")
@classmethod
def webhook_path_must_start_with_slash(cls, value: str) -> str:
value = value.strip() or "/telegram"
if not value.startswith("/"):
raise ValueError('webhook_path must start with "/"')
return value
@model_validator(mode="after")
def validate_webhook_config(self) -> "TelegramConfig":
if self.mode != "webhook":
return self
url = self.webhook_url.strip()
if not url:
raise ValueError("webhook_url is required when Telegram mode is webhook")
parsed = urlparse(url)
if parsed.scheme != "https" or not parsed.netloc:
raise ValueError("webhook_url must be a public HTTPS URL")
secret = self.webhook_secret_token.strip()
if not secret:
raise ValueError("webhook_secret_token is required when Telegram mode is webhook")
if len(secret) > 256 or re.match(r"^[A-Za-z0-9_-]+$", secret) is None:
raise ValueError(
"webhook_secret_token must be 1-256 characters using only A-Z, a-z, 0-9, _ and -"
)
return self
class TelegramChannel(BaseChannel):
"""
Telegram channel using long polling.
Telegram channel using long polling or webhook mode.
Simple and reliable - no webhook/public IP needed.
Long polling is the default. Webhook mode requires a public HTTPS URL and a
Telegram secret token.
"""
name = "telegram"
@ -294,6 +341,8 @@ class TelegramChannel(BaseChannel):
self._bot_user_id: int | None = None
self._bot_username: str | None = None
self._stream_bufs: dict[str, _StreamBuf] = {} # chat_id -> streaming state
self._inbound_buffers: dict[str, list[_QueuedTelegramUpdate]] = {}
self._inbound_workers: dict[str, asyncio.Task] = {}
def is_allowed(self, sender_id: str) -> bool:
"""Preserve Telegram's legacy id|username allowlist matching."""
@ -326,7 +375,7 @@ class TelegramChannel(BaseChannel):
return content
async def start(self) -> None:
"""Start the Telegram bot with long polling."""
"""Start the Telegram bot."""
if not self.config.token:
self.logger.error("bot token not configured")
return
@ -394,9 +443,12 @@ class TelegramChannel(BaseChannel):
else:
allowed_updates = ["message"]
self.logger.info("Starting bot (polling mode)...")
if self.config.mode == "webhook":
self.logger.info("Starting bot (webhook mode)...")
else:
self.logger.info("Starting bot (polling mode)...")
# Initialize and start polling
# Initialize and start receiving updates
await self._app.initialize()
await self._app.start()
@ -412,12 +464,26 @@ class TelegramChannel(BaseChannel):
except Exception as e:
self.logger.warning("Failed to register bot commands: {}", e)
# Start polling (this runs until stopped)
await self._app.updater.start_polling(
allowed_updates=allowed_updates,
drop_pending_updates=False, # Process pending messages on startup
error_callback=self._on_polling_error,
)
if self.config.mode == "webhook":
# ``url_path`` is the local HTTP route. ``webhook_url`` is the
# public HTTPS URL Telegram calls; reverse proxies may rewrite it.
await self._app.updater.start_webhook(
listen=self.config.webhook_listen_host,
port=self.config.webhook_listen_port,
url_path=self.config.webhook_path.lstrip("/"),
webhook_url=self.config.webhook_url.strip(),
allowed_updates=allowed_updates,
drop_pending_updates=False,
secret_token=self.config.webhook_secret_token.strip(),
max_connections=self.config.webhook_max_connections,
)
else:
# Start polling (this runs until stopped)
await self._app.updater.start_polling(
allowed_updates=allowed_updates,
drop_pending_updates=False, # Process pending messages on startup
error_callback=self._on_polling_error,
)
# Keep running until stopped
while self._running:
@ -436,6 +502,11 @@ class TelegramChannel(BaseChannel):
self._media_group_tasks.clear()
self._media_group_buffers.clear()
for task in self._inbound_workers.values():
task.cancel()
self._inbound_workers.clear()
self._inbound_buffers.clear()
if self._app:
self.logger.info("Stopping bot...")
await self._app.updater.stop()
@ -995,10 +1066,85 @@ class TelegramChannel(BaseChannel):
if len(self._message_threads) > 1000:
self._message_threads.pop(next(iter(self._message_threads)))
@staticmethod
def _queue_key_for_message(message) -> str:
"""Return the final nanobot session key used for ordered Telegram ingress."""
return TelegramChannel._derive_topic_session_key(message) or f"telegram:{message.chat_id}"
@staticmethod
def _sort_key_for_update(update: Update) -> tuple[int, int]:
"""Sort by chat message id first, then Telegram update id."""
message = getattr(update, "message", None)
message_id = int(getattr(message, "message_id", 0) or 0)
update_id = int(getattr(update, "update_id", 0) or 0)
return (message_id, update_id)
def _enqueue_ordered_update(
self,
*,
kind: Literal["command", "message"],
update: Update,
context: ContextTypes.DEFAULT_TYPE,
) -> None:
"""Stage a Telegram update behind a short per-session reorder window."""
message = update.message
key = self._queue_key_for_message(message)
self._inbound_buffers.setdefault(key, []).append(
_QueuedTelegramUpdate(
kind=kind,
update=update,
context=context,
sort_key=self._sort_key_for_update(update),
)
)
if key not in self._inbound_workers:
self._inbound_workers[key] = asyncio.create_task(
self._drain_ordered_updates(key)
)
async def _drain_ordered_updates(self, key: str) -> None:
"""Drain one Telegram session buffer in stable message order."""
try:
while self._running:
await asyncio.sleep(0.2)
batch = self._inbound_buffers.get(key, [])
if not batch:
break
self._inbound_buffers[key] = []
batch.sort(key=lambda item: item.sort_key)
for item in batch:
try:
if item.kind == "command":
await self._process_forward_command(item.update, item.context)
else:
await self._process_message_update(item.update, item.context)
except Exception as e:
self.logger.warning(
"Telegram queued update handling failed for {}: {}",
key,
e,
)
if not self._inbound_buffers.get(key):
self._inbound_buffers.pop(key, None)
except asyncio.CancelledError:
raise
except Exception as e:
self.logger.warning("Telegram ordered update worker failed for {}: {}", key, e)
finally:
if not self._inbound_buffers.get(key):
self._inbound_workers.pop(key, None)
async def _forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Forward slash commands to the bus for unified handling in AgentLoop."""
if not update.message or not update.effective_user:
return
if not self._running:
await self._process_forward_command(update, context)
return
self._enqueue_ordered_update(kind="command", update=update, context=context)
async def _process_forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Process a queued slash command."""
message = update.message
user = update.effective_user
sender_id = self._sender_id(user)
@ -1027,6 +1173,13 @@ class TelegramChannel(BaseChannel):
"""Handle incoming messages (text, photos, voice, documents)."""
if not update.message or not update.effective_user:
return
if not self._running:
await self._process_message_update(update, context)
return
self._enqueue_ordered_update(kind="message", update=update, context=context)
async def _process_message_update(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Process a queued Telegram message update."""
message = update.message
user = update.effective_user

View File

@ -37,7 +37,7 @@ dependencies = [
"rich>=14.0.0,<15.0.0",
"croniter>=6.0.0,<7.0.0",
"dingtalk-stream>=0.24.0,<1.0.0",
"python-telegram-bot[socks]>=22.6,<23.0",
"python-telegram-bot[socks,webhooks]>=22.6,<23.0",
"lark-oapi>=1.5.0,<2.0.0",
"socksio>=1.0.0,<2.0.0",
"python-socketio>=5.16.0,<6.0.0",

View File

@ -1,3 +1,4 @@
import asyncio
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock
@ -36,11 +37,19 @@ class _FakeUpdater:
def __init__(self, on_start_polling) -> None:
self._on_start_polling = on_start_polling
self.start_polling_kwargs = None
self.start_webhook_kwargs = None
async def start_polling(self, **kwargs) -> None:
self.start_polling_kwargs = kwargs
self._on_start_polling()
async def start_webhook(self, **kwargs) -> None:
self.start_webhook_kwargs = kwargs
self._on_start_polling()
async def stop(self) -> None:
pass
class _FakeBot:
def __init__(self) -> None:
@ -103,6 +112,12 @@ class _FakeApp:
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def shutdown(self) -> None:
pass
class _FakeBuilder:
def __init__(self, app: _FakeApp) -> None:
@ -232,6 +247,98 @@ async def test_start_respects_custom_pool_config(monkeypatch) -> None:
assert poll_req.kwargs["pool_timeout"] == 10.0
def test_webhook_config_requires_https_url_and_secret() -> None:
with pytest.raises(ValueError, match="webhook_url is required"):
TelegramConfig(enabled=True, token="123:abc", mode="webhook")
with pytest.raises(ValueError, match="public HTTPS URL"):
TelegramConfig(
enabled=True,
token="123:abc",
mode="webhook",
webhook_url="http://example.com/telegram",
webhook_secret_token="secret",
)
with pytest.raises(ValueError, match="webhook_secret_token is required"):
TelegramConfig(
enabled=True,
token="123:abc",
mode="webhook",
webhook_url="https://example.com/telegram",
)
@pytest.mark.asyncio
async def test_start_webhook_mode(monkeypatch) -> None:
_FakeHTTPXRequest.clear()
config = TelegramConfig(
enabled=True,
token="123:abc",
allow_from=["*"],
mode="webhook",
webhook_url="https://example.com/telegram",
webhook_listen_host="127.0.0.1",
webhook_listen_port=8081,
webhook_path="/telegram",
webhook_secret_token="secret-token",
webhook_max_connections=1,
)
bus = MessageBus()
channel = TelegramChannel(config, bus)
app = _FakeApp(lambda: setattr(channel, "_running", False))
builder = _FakeBuilder(app)
monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest)
monkeypatch.setattr(
"nanobot.channels.telegram.Application",
SimpleNamespace(builder=lambda: builder),
)
await channel.start()
assert app.updater.start_polling_kwargs is None
assert app.updater.start_webhook_kwargs == {
"listen": "127.0.0.1",
"port": 8081,
"url_path": "telegram",
"webhook_url": "https://example.com/telegram",
"allowed_updates": ["message"],
"drop_pending_updates": False,
"secret_token": "secret-token",
"max_connections": 1,
}
@pytest.mark.asyncio
async def test_running_message_handler_reorders_same_session_updates() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
seen: list[int] = []
async def fake_process(update, context) -> None:
seen.append(update.message.message_id)
channel._process_message_update = fake_process
channel._running = True
first = _make_telegram_update(text="first")
first.update_id = 100
first.message.message_id = 1
second = _make_telegram_update(text="second")
second.update_id = 101
second.message.message_id = 2
await channel._on_message(second, None)
await channel._on_message(first, None)
await asyncio.sleep(0.3)
channel._running = False
assert seen == [1, 2]
@pytest.mark.asyncio
async def test_send_text_retries_on_timeout() -> None:
"""_send_text retries on TimedOut before succeeding."""