mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-30 06:45:55 +00:00
Problem: Modern LLMs (GPT-5.4, Claude, Gemini) produce markdown-heavy responses with numbered lists, headers, and nested formatting. The Telegram channel's _markdown_to_telegram_html() converter has gaps that leave these poorly formatted: 1. Numbered lists (1. 2. 3.) have zero handling — sent as raw text 2. Headers (# Title) are stripped to plain text, losing visual hierarchy 3. Mid-stream edits send raw markdown (users see **bold** and ### headers while the response generates, before the final HTML conversion) Root Cause: _markdown_to_telegram_html() handles bullets (- *) but skips numbered lists entirely. Headers are stripped of # but not given any emphasis. The streaming path in send_delta() sends buf.text as-is during mid-stream edits (plain text, no parse_mode) — only the final _stream_end edit converts to HTML. Fix: 1. Headers now render as <b>bold</b> in the final HTML (using placeholder markers that survive HTML escaping, restored after all other processing) 2. Numbered lists are normalized (extra whitespace after the dot is cleaned) 3. New _strip_md_block() function strips markdown syntax for readable plain-text preview during streaming mid-edits The final _stream_end HTML conversion is unchanged — it still produces full HTML with parse_mode=HTML. Only the intermediate edits are improved. Tests: Added 10 new tests covering: - Headers converting to bold HTML - Numbered list preservation and whitespace normalization - Headers with HTML special characters - Mixed formatting (headers + bullets + numbers + bold) - _strip_md_block for inline formatting, headers, bullets, numbers, links - Streaming mid-edit markdown stripping (initial send + edit)
1594 lines
55 KiB
Python
1594 lines
55 KiB
Python
import asyncio
|
||
from pathlib import Path
|
||
from types import SimpleNamespace
|
||
from unittest.mock import AsyncMock
|
||
|
||
import pytest
|
||
|
||
# Check optional Telegram dependencies before running tests
|
||
try:
|
||
import telegram # noqa: F401
|
||
except ImportError:
|
||
pytest.skip("Telegram dependencies not installed (python-telegram-bot)", allow_module_level=True)
|
||
|
||
from nanobot.bus.events import OutboundMessage
|
||
from nanobot.bus.queue import MessageBus
|
||
from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel, _StreamBuf
|
||
from nanobot.channels.telegram import TelegramConfig
|
||
|
||
|
||
class _FakeHTTPXRequest:
|
||
instances: list["_FakeHTTPXRequest"] = []
|
||
|
||
def __init__(self, **kwargs) -> None:
|
||
self.kwargs = kwargs
|
||
self.__class__.instances.append(self)
|
||
|
||
@classmethod
|
||
def clear(cls) -> None:
|
||
cls.instances.clear()
|
||
|
||
|
||
class _FakeUpdater:
|
||
def __init__(self, on_start_polling) -> None:
|
||
self._on_start_polling = on_start_polling
|
||
self.start_polling_kwargs = None
|
||
|
||
async def start_polling(self, **kwargs) -> None:
|
||
self.start_polling_kwargs = kwargs
|
||
self._on_start_polling()
|
||
|
||
|
||
class _FakeBot:
|
||
def __init__(self) -> None:
|
||
self.sent_messages: list[dict] = []
|
||
self.sent_media: list[dict] = []
|
||
self.get_me_calls = 0
|
||
|
||
async def get_me(self):
|
||
self.get_me_calls += 1
|
||
return SimpleNamespace(id=999, username="nanobot_test")
|
||
|
||
async def set_my_commands(self, commands) -> None:
|
||
self.commands = commands
|
||
|
||
async def send_message(self, **kwargs):
|
||
self.sent_messages.append(kwargs)
|
||
return SimpleNamespace(message_id=len(self.sent_messages))
|
||
|
||
async def send_photo(self, **kwargs) -> None:
|
||
self.sent_media.append({"kind": "photo", **kwargs})
|
||
|
||
async def send_voice(self, **kwargs) -> None:
|
||
self.sent_media.append({"kind": "voice", **kwargs})
|
||
|
||
async def send_audio(self, **kwargs) -> None:
|
||
self.sent_media.append({"kind": "audio", **kwargs})
|
||
|
||
async def send_document(self, **kwargs) -> None:
|
||
self.sent_media.append({"kind": "document", **kwargs})
|
||
|
||
async def send_chat_action(self, **kwargs) -> None:
|
||
pass
|
||
|
||
async def get_file(self, file_id: str):
|
||
"""Return a fake file that 'downloads' to a path (for reply-to-media tests)."""
|
||
async def _fake_download(path) -> None:
|
||
pass
|
||
return SimpleNamespace(download_to_drive=_fake_download)
|
||
|
||
|
||
class _FakeApp:
|
||
def __init__(self, on_start_polling) -> None:
|
||
self.bot = _FakeBot()
|
||
self.updater = _FakeUpdater(on_start_polling)
|
||
self.handlers = []
|
||
self.error_handlers = []
|
||
|
||
def add_error_handler(self, handler) -> None:
|
||
self.error_handlers.append(handler)
|
||
|
||
def add_handler(self, handler) -> None:
|
||
self.handlers.append(handler)
|
||
|
||
async def initialize(self) -> None:
|
||
pass
|
||
|
||
async def start(self) -> None:
|
||
pass
|
||
|
||
|
||
class _FakeBuilder:
|
||
def __init__(self, app: _FakeApp) -> None:
|
||
self.app = app
|
||
self.token_value = None
|
||
self.request_value = None
|
||
self.get_updates_request_value = None
|
||
|
||
def token(self, token: str):
|
||
self.token_value = token
|
||
return self
|
||
|
||
def request(self, request):
|
||
self.request_value = request
|
||
return self
|
||
|
||
def get_updates_request(self, request):
|
||
self.get_updates_request_value = request
|
||
return self
|
||
|
||
def proxy(self, _proxy):
|
||
raise AssertionError("builder.proxy should not be called when request is set")
|
||
|
||
def get_updates_proxy(self, _proxy):
|
||
raise AssertionError("builder.get_updates_proxy should not be called when request is set")
|
||
|
||
def build(self):
|
||
return self.app
|
||
|
||
|
||
def _make_telegram_update(
|
||
*,
|
||
chat_type: str = "group",
|
||
text: str | None = None,
|
||
caption: str | None = None,
|
||
entities=None,
|
||
caption_entities=None,
|
||
reply_to_message=None,
|
||
location=None,
|
||
):
|
||
user = SimpleNamespace(id=12345, username="alice", first_name="Alice")
|
||
message = SimpleNamespace(
|
||
chat=SimpleNamespace(type=chat_type, is_forum=False),
|
||
chat_id=-100123,
|
||
text=text,
|
||
caption=caption,
|
||
entities=entities or [],
|
||
caption_entities=caption_entities or [],
|
||
reply_to_message=reply_to_message,
|
||
photo=None,
|
||
voice=None,
|
||
audio=None,
|
||
document=None,
|
||
location=location,
|
||
media_group_id=None,
|
||
message_thread_id=None,
|
||
message_id=1,
|
||
)
|
||
return SimpleNamespace(message=message, effective_user=user)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_start_creates_separate_pools_with_proxy(monkeypatch) -> None:
|
||
_FakeHTTPXRequest.clear()
|
||
config = TelegramConfig(
|
||
enabled=True,
|
||
token="123:abc",
|
||
allow_from=["*"],
|
||
proxy="http://127.0.0.1:7890",
|
||
)
|
||
bus = MessageBus()
|
||
channel = TelegramChannel(config, bus)
|
||
app = _FakeApp(lambda: setattr(channel, "_running", False))
|
||
builder = _FakeBuilder(app)
|
||
|
||
monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.Application",
|
||
SimpleNamespace(builder=lambda: builder),
|
||
)
|
||
|
||
await channel.start()
|
||
|
||
assert len(_FakeHTTPXRequest.instances) == 2
|
||
api_req, poll_req = _FakeHTTPXRequest.instances
|
||
assert api_req.kwargs["proxy"] == config.proxy
|
||
assert poll_req.kwargs["proxy"] == config.proxy
|
||
assert api_req.kwargs["connection_pool_size"] == 32
|
||
assert poll_req.kwargs["connection_pool_size"] == 4
|
||
assert builder.request_value is api_req
|
||
assert builder.get_updates_request_value is poll_req
|
||
assert callable(app.updater.start_polling_kwargs["error_callback"])
|
||
assert any(cmd.command == "status" for cmd in app.bot.commands)
|
||
assert any(cmd.command == "dream" for cmd in app.bot.commands)
|
||
assert any(cmd.command == "dream_log" for cmd in app.bot.commands)
|
||
assert any(cmd.command == "dream_restore" for cmd in app.bot.commands)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_start_respects_custom_pool_config(monkeypatch) -> None:
|
||
_FakeHTTPXRequest.clear()
|
||
config = TelegramConfig(
|
||
enabled=True,
|
||
token="123:abc",
|
||
allow_from=["*"],
|
||
connection_pool_size=32,
|
||
pool_timeout=10.0,
|
||
)
|
||
bus = MessageBus()
|
||
channel = TelegramChannel(config, bus)
|
||
app = _FakeApp(lambda: setattr(channel, "_running", False))
|
||
builder = _FakeBuilder(app)
|
||
|
||
monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.Application",
|
||
SimpleNamespace(builder=lambda: builder),
|
||
)
|
||
|
||
await channel.start()
|
||
|
||
api_req = _FakeHTTPXRequest.instances[0]
|
||
poll_req = _FakeHTTPXRequest.instances[1]
|
||
assert api_req.kwargs["connection_pool_size"] == 32
|
||
assert api_req.kwargs["pool_timeout"] == 10.0
|
||
assert poll_req.kwargs["pool_timeout"] == 10.0
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_retries_on_timeout() -> None:
|
||
"""_send_text retries on TimedOut before succeeding."""
|
||
from telegram.error import TimedOut
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
call_count = 0
|
||
original_send = channel._app.bot.send_message
|
||
|
||
async def flaky_send(**kwargs):
|
||
nonlocal call_count
|
||
call_count += 1
|
||
if call_count <= 2:
|
||
raise TimedOut()
|
||
return await original_send(**kwargs)
|
||
|
||
channel._app.bot.send_message = flaky_send
|
||
|
||
import nanobot.channels.telegram as tg_mod
|
||
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||
try:
|
||
await channel._send_text(123, "hello", None, {})
|
||
finally:
|
||
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||
|
||
assert call_count == 3
|
||
assert len(channel._app.bot.sent_messages) == 1
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_gives_up_after_max_retries() -> None:
|
||
"""_send_text raises TimedOut after exhausting all retries."""
|
||
from telegram.error import TimedOut
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
async def always_timeout(**kwargs):
|
||
raise TimedOut()
|
||
|
||
channel._app.bot.send_message = always_timeout
|
||
|
||
import nanobot.channels.telegram as tg_mod
|
||
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||
try:
|
||
with pytest.raises(TimedOut):
|
||
await channel._send_text(123, "hello", None, {})
|
||
finally:
|
||
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||
|
||
assert channel._app.bot.sent_messages == []
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_error_logs_network_issues_as_warning(monkeypatch) -> None:
|
||
from telegram.error import NetworkError
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
recorded: list[tuple[str, str]] = []
|
||
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.logger.warning",
|
||
lambda message, error: recorded.append(("warning", message.format(error))),
|
||
)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.logger.error",
|
||
lambda message, error: recorded.append(("error", message.format(error))),
|
||
)
|
||
|
||
await channel._on_error(object(), SimpleNamespace(error=NetworkError("proxy disconnected")))
|
||
|
||
assert recorded == [("warning", "Telegram network issue: proxy disconnected")]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_error_summarizes_empty_network_error(monkeypatch) -> None:
|
||
from telegram.error import NetworkError
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
recorded: list[tuple[str, str]] = []
|
||
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.logger.warning",
|
||
lambda message, error: recorded.append(("warning", message.format(error))),
|
||
)
|
||
|
||
await channel._on_error(object(), SimpleNamespace(error=NetworkError("")))
|
||
|
||
assert recorded == [("warning", "Telegram network issue: NetworkError")]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_error_keeps_non_network_exceptions_as_error(monkeypatch) -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
recorded: list[tuple[str, str]] = []
|
||
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.logger.warning",
|
||
lambda message, error: recorded.append(("warning", message.format(error))),
|
||
)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.logger.error",
|
||
lambda message, error: recorded.append(("error", message.format(error))),
|
||
)
|
||
|
||
await channel._on_error(object(), SimpleNamespace(error=RuntimeError("boom")))
|
||
|
||
assert recorded == [("error", "Telegram error: boom")]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_stream_end_raises_and_keeps_buffer_on_failure() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.edit_message_text = AsyncMock(side_effect=RuntimeError("boom"))
|
||
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0)
|
||
|
||
with pytest.raises(RuntimeError, match="boom"):
|
||
await channel.send_delta("123", "", {"_stream_end": True})
|
||
|
||
assert "123" in channel._stream_bufs
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_stream_end_treats_not_modified_as_success() -> None:
|
||
from telegram.error import BadRequest
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.edit_message_text = AsyncMock(side_effect=BadRequest("Message is not modified"))
|
||
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0, stream_id="s:0")
|
||
|
||
await channel.send_delta("123", "", {"_stream_end": True, "_stream_id": "s:0"})
|
||
|
||
assert "123" not in channel._stream_bufs
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_stream_end_does_not_fallback_on_network_timeout() -> None:
|
||
"""TimedOut during HTML edit should propagate, never fall back to plain text."""
|
||
from telegram.error import TimedOut
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
# _call_with_retry retries TimedOut up to 3 times, so the mock will be called
|
||
# multiple times – but all calls must be with parse_mode="HTML" (no plain fallback).
|
||
channel._app.bot.edit_message_text = AsyncMock(side_effect=TimedOut("network timeout"))
|
||
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0)
|
||
|
||
with pytest.raises(TimedOut, match="network timeout"):
|
||
await channel.send_delta("123", "", {"_stream_end": True})
|
||
|
||
# Every call to edit_message_text must have used parse_mode="HTML" —
|
||
# no plain-text fallback call should have been made.
|
||
for call in channel._app.bot.edit_message_text.call_args_list:
|
||
assert call.kwargs.get("parse_mode") == "HTML"
|
||
# Buffer should still be present (not cleaned up on error)
|
||
assert "123" in channel._stream_bufs
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_stream_end_does_not_fallback_on_network_error() -> None:
|
||
"""NetworkError during HTML edit should propagate, never fall back to plain text."""
|
||
from telegram.error import NetworkError
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.edit_message_text = AsyncMock(side_effect=NetworkError("connection reset"))
|
||
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0)
|
||
|
||
with pytest.raises(NetworkError, match="connection reset"):
|
||
await channel.send_delta("123", "", {"_stream_end": True})
|
||
|
||
# Every call to edit_message_text must have used parse_mode="HTML" —
|
||
# no plain-text fallback call should have been made.
|
||
for call in channel._app.bot.edit_message_text.call_args_list:
|
||
assert call.kwargs.get("parse_mode") == "HTML"
|
||
# Buffer should still be present (not cleaned up on error)
|
||
assert "123" in channel._stream_bufs
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_stream_end_falls_back_on_bad_request() -> None:
|
||
"""BadRequest (HTML parse error) should still trigger plain-text fallback."""
|
||
from telegram.error import BadRequest
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
# First call (HTML) raises BadRequest, second call (plain) succeeds
|
||
channel._app.bot.edit_message_text = AsyncMock(
|
||
side_effect=[BadRequest("Can't parse entities"), None]
|
||
)
|
||
channel._stream_bufs["123"] = _StreamBuf(text="hello <bad>", message_id=7, last_edit=0.0)
|
||
|
||
await channel.send_delta("123", "", {"_stream_end": True})
|
||
|
||
# edit_message_text should have been called twice: once for HTML, once for plain fallback
|
||
assert channel._app.bot.edit_message_text.call_count == 2
|
||
# Second call should not use parse_mode="HTML"
|
||
second_call_kwargs = channel._app.bot.edit_message_text.call_args_list[1].kwargs
|
||
assert "parse_mode" not in second_call_kwargs or second_call_kwargs.get("parse_mode") is None
|
||
# Buffer should be cleaned up on success
|
||
assert "123" not in channel._stream_bufs
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_stream_end_splits_oversized_reply() -> None:
|
||
"""Final streamed reply exceeding Telegram limit is split into chunks.
|
||
|
||
The fix converts markdown to HTML first, then splits by 4096 (actual Telegram
|
||
limit), ensuring the edited message always fits within Telegram's constraint.
|
||
Previously, the code split by 4000 (TELEGRAM_MAX_MESSAGE_LEN) before HTML
|
||
conversion, which could still overflow when HTML tags were added.
|
||
"""
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.edit_message_text = AsyncMock()
|
||
channel._app.bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=99))
|
||
|
||
oversized = "x" * (4000 + 500)
|
||
channel._stream_bufs["123"] = _StreamBuf(text=oversized, message_id=7, last_edit=0.0)
|
||
|
||
await channel.send_delta("123", "", {"_stream_end": True})
|
||
|
||
channel._app.bot.edit_message_text.assert_called_once()
|
||
edit_text = channel._app.bot.edit_message_text.call_args.kwargs.get("text", "")
|
||
assert len(edit_text) <= 4096, f"edit_text length {len(edit_text)} exceeds Telegram's 4096 limit"
|
||
|
||
channel._app.bot.send_message.assert_called_once()
|
||
send_text = channel._app.bot.send_message.call_args.kwargs.get("text", "")
|
||
assert len(send_text) <= 4096
|
||
assert "123" not in channel._stream_bufs
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_stream_end_html_expansion_does_not_overflow() -> None:
|
||
"""Markdown that expands when converted to HTML is still split correctly.
|
||
|
||
This is the actual bug from issue #3315: markdown like **bold** expands to
|
||
<b>bold</b>, adding ~33% characters. A 3600-char message with heavy markdown
|
||
could become 4800+ chars after HTML conversion, exceeding 4096 limit.
|
||
The fix converts to HTML first, THEN splits by 4096.
|
||
"""
|
||
from nanobot.channels.telegram import _markdown_to_telegram_html
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.edit_message_text = AsyncMock()
|
||
channel._app.bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=99))
|
||
|
||
markdown_text = "**bold** " * 400 # 3600 chars raw, expands ~33% to 4800 HTML
|
||
raw_len = len(markdown_text)
|
||
html_len = len(_markdown_to_telegram_html(markdown_text))
|
||
assert html_len > 4096, f"Test precondition failed: HTML should exceed 4096 (was {html_len})"
|
||
|
||
channel._stream_bufs["123"] = _StreamBuf(text=markdown_text, message_id=7, last_edit=0.0)
|
||
|
||
await channel.send_delta("123", "", {"_stream_end": True})
|
||
|
||
channel._app.bot.edit_message_text.assert_called_once()
|
||
edit_text = channel._app.bot.edit_message_text.call_args.kwargs.get("text", "")
|
||
assert len(edit_text) <= 4096, (
|
||
f"HTML text length {len(edit_text)} exceeds Telegram's 4096 limit. "
|
||
f"Raw was {raw_len}, HTML was {html_len}."
|
||
)
|
||
|
||
channel._app.bot.send_message.assert_called_once()
|
||
assert "123" not in channel._stream_bufs
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_new_stream_id_replaces_stale_buffer() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._stream_bufs["123"] = _StreamBuf(
|
||
text="hello",
|
||
message_id=7,
|
||
last_edit=0.0,
|
||
stream_id="old:0",
|
||
)
|
||
|
||
await channel.send_delta("123", "world", {"_stream_delta": True, "_stream_id": "new:0"})
|
||
|
||
buf = channel._stream_bufs["123"]
|
||
assert buf.text == "world"
|
||
assert buf.stream_id == "new:0"
|
||
assert buf.message_id == 1
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_incremental_edit_treats_not_modified_as_success() -> None:
|
||
from telegram.error import BadRequest
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0, stream_id="s:0")
|
||
channel._app.bot.edit_message_text = AsyncMock(side_effect=BadRequest("Message is not modified"))
|
||
|
||
await channel.send_delta("123", "", {"_stream_delta": True, "_stream_id": "s:0"})
|
||
|
||
assert channel._stream_bufs["123"].last_edit > 0.0
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_incremental_edit_splits_oversized_buffer() -> None:
|
||
"""Mid-stream overflow: once buf.text exceeds Telegram's limit, split into
|
||
chunks, edit the current message with the first chunk, and re-anchor the
|
||
buffer to a new message for the tail so further deltas keep streaming."""
|
||
from nanobot.channels.telegram import TELEGRAM_MAX_MESSAGE_LEN
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.edit_message_text = AsyncMock()
|
||
channel._app.bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=99))
|
||
|
||
oversized = "x" * (TELEGRAM_MAX_MESSAGE_LEN + 500)
|
||
channel._stream_bufs["123"] = _StreamBuf(
|
||
text=oversized, message_id=7, last_edit=0.0, stream_id="s:0"
|
||
)
|
||
|
||
await channel.send_delta("123", "y", {"_stream_delta": True, "_stream_id": "s:0"})
|
||
|
||
channel._app.bot.edit_message_text.assert_called_once()
|
||
edit_text = channel._app.bot.edit_message_text.call_args.kwargs.get("text", "")
|
||
assert len(edit_text) <= TELEGRAM_MAX_MESSAGE_LEN
|
||
|
||
channel._app.bot.send_message.assert_called_once()
|
||
buf = channel._stream_bufs["123"]
|
||
assert buf.message_id == 99
|
||
assert len(buf.text) <= TELEGRAM_MAX_MESSAGE_LEN
|
||
assert buf.last_edit > 0.0
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_initial_send_keeps_message_in_thread() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
await channel.send_delta(
|
||
"123",
|
||
"hello",
|
||
{"_stream_delta": True, "_stream_id": "s:0", "message_thread_id": 42},
|
||
)
|
||
|
||
assert channel._app.bot.sent_messages[0]["message_thread_id"] == 42
|
||
|
||
|
||
def test_derive_topic_session_key_uses_thread_id() -> None:
|
||
message = SimpleNamespace(
|
||
chat=SimpleNamespace(type="supergroup"),
|
||
chat_id=-100123,
|
||
message_thread_id=42,
|
||
)
|
||
|
||
assert TelegramChannel._derive_topic_session_key(message) == "telegram:-100123:topic:42"
|
||
|
||
|
||
def test_derive_topic_session_key_private_dm_thread() -> None:
|
||
"""Private DM threads (Telegram Threaded Mode) must get their own session key."""
|
||
message = SimpleNamespace(
|
||
chat=SimpleNamespace(type="private"),
|
||
chat_id=999,
|
||
message_thread_id=7,
|
||
)
|
||
assert TelegramChannel._derive_topic_session_key(message) == "telegram:999:topic:7"
|
||
|
||
|
||
def test_derive_topic_session_key_none_without_thread() -> None:
|
||
"""No thread id → no topic session key, regardless of chat type."""
|
||
for chat_type in ("private", "supergroup", "group"):
|
||
message = SimpleNamespace(
|
||
chat=SimpleNamespace(type=chat_type),
|
||
chat_id=123,
|
||
message_thread_id=None,
|
||
)
|
||
assert TelegramChannel._derive_topic_session_key(message) is None
|
||
|
||
|
||
def test_get_extension_falls_back_to_original_filename() -> None:
|
||
channel = TelegramChannel(TelegramConfig(), MessageBus())
|
||
|
||
assert channel._get_extension("file", None, "report.pdf") == ".pdf"
|
||
assert channel._get_extension("file", None, "archive.tar.gz") == ".tar.gz"
|
||
|
||
|
||
def test_telegram_group_policy_defaults_to_mention() -> None:
|
||
assert TelegramConfig().group_policy == "mention"
|
||
|
||
|
||
def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None:
|
||
channel = TelegramChannel(TelegramConfig(allow_from=["12345", "alice", "67890|bob"]), MessageBus())
|
||
|
||
assert channel.is_allowed("12345|carol") is True
|
||
assert channel.is_allowed("99999|alice") is True
|
||
assert channel.is_allowed("67890|bob") is True
|
||
|
||
|
||
def test_is_allowed_rejects_invalid_legacy_telegram_sender_shapes() -> None:
|
||
channel = TelegramChannel(TelegramConfig(allow_from=["alice"]), MessageBus())
|
||
|
||
assert channel.is_allowed("attacker|alice|extra") is False
|
||
assert channel.is_allowed("not-a-number|alice") is False
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_progress_keeps_message_in_topic() -> None:
|
||
config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"])
|
||
channel = TelegramChannel(config, MessageBus())
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
await channel.send(
|
||
OutboundMessage(
|
||
channel="telegram",
|
||
chat_id="123",
|
||
content="hello",
|
||
metadata={"_progress": True, "message_thread_id": 42},
|
||
)
|
||
)
|
||
|
||
assert channel._app.bot.sent_messages[0]["message_thread_id"] == 42
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_reply_infers_topic_from_message_id_cache() -> None:
|
||
config = TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], reply_to_message=True)
|
||
channel = TelegramChannel(config, MessageBus())
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._message_threads[("123", 10)] = 42
|
||
|
||
await channel.send(
|
||
OutboundMessage(
|
||
channel="telegram",
|
||
chat_id="123",
|
||
content="hello",
|
||
metadata={"message_id": 10},
|
||
)
|
||
)
|
||
|
||
assert channel._app.bot.sent_messages[0]["message_thread_id"] == 42
|
||
assert channel._app.bot.sent_messages[0]["reply_parameters"].message_id == 10
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_remote_media_url_after_security_validation(monkeypatch) -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
monkeypatch.setattr("nanobot.channels.telegram.validate_url_target", lambda url: (True, ""))
|
||
|
||
await channel.send(
|
||
OutboundMessage(
|
||
channel="telegram",
|
||
chat_id="123",
|
||
content="",
|
||
media=["https://example.com/cat.jpg"],
|
||
)
|
||
)
|
||
|
||
assert channel._app.bot.sent_media == [
|
||
{
|
||
"kind": "photo",
|
||
"chat_id": 123,
|
||
"photo": "https://example.com/cat.jpg",
|
||
"reply_parameters": None,
|
||
}
|
||
]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_blocks_unsafe_remote_media_url(monkeypatch) -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.validate_url_target",
|
||
lambda url: (False, "Blocked: example.com resolves to private/internal address 127.0.0.1"),
|
||
)
|
||
|
||
await channel.send(
|
||
OutboundMessage(
|
||
channel="telegram",
|
||
chat_id="123",
|
||
content="",
|
||
media=["http://example.com/internal.jpg"],
|
||
)
|
||
)
|
||
|
||
assert channel._app.bot.sent_media == []
|
||
assert channel._app.bot.sent_messages == [
|
||
{
|
||
"chat_id": 123,
|
||
"text": "[Failed to send: internal.jpg]",
|
||
"reply_parameters": None,
|
||
}
|
||
]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_group_policy_mention_ignores_unmentioned_group_message() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
handled = []
|
||
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
await channel._on_message(_make_telegram_update(text="hello everyone"), None)
|
||
|
||
assert handled == []
|
||
assert channel._app.bot.get_me_calls == 1
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_group_policy_mention_accepts_text_mention_and_caches_bot_identity() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
handled = []
|
||
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
mention = SimpleNamespace(type="mention", offset=0, length=13)
|
||
await channel._on_message(_make_telegram_update(text="@nanobot_test hi", entities=[mention]), None)
|
||
await channel._on_message(_make_telegram_update(text="@nanobot_test again", entities=[mention]), None)
|
||
|
||
assert len(handled) == 2
|
||
assert channel._app.bot.get_me_calls == 1
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_group_policy_mention_accepts_caption_mention() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
handled = []
|
||
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
mention = SimpleNamespace(type="mention", offset=0, length=13)
|
||
await channel._on_message(
|
||
_make_telegram_update(caption="@nanobot_test photo", caption_entities=[mention]),
|
||
None,
|
||
)
|
||
|
||
assert len(handled) == 1
|
||
assert handled[0]["content"] == "@nanobot_test photo"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_group_policy_mention_accepts_reply_to_bot() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="mention"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
handled = []
|
||
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
reply = SimpleNamespace(from_user=SimpleNamespace(id=999))
|
||
await channel._on_message(_make_telegram_update(text="reply", reply_to_message=reply), None)
|
||
|
||
assert len(handled) == 1
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_group_policy_open_accepts_plain_group_message() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
handled = []
|
||
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
await channel._on_message(_make_telegram_update(text="hello group"), None)
|
||
|
||
assert len(handled) == 1
|
||
assert channel._app.bot.get_me_calls == 0
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_reply_context_no_reply() -> None:
|
||
"""When there is no reply_to_message, _extract_reply_context returns None."""
|
||
channel = TelegramChannel(TelegramConfig(enabled=True, token="123:abc"), MessageBus())
|
||
message = SimpleNamespace(reply_to_message=None)
|
||
assert await channel._extract_reply_context(message) is None
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_reply_context_with_text() -> None:
|
||
"""When reply has text, return prefixed string."""
|
||
channel = TelegramChannel(TelegramConfig(enabled=True, token="123:abc"), MessageBus())
|
||
channel._app = _FakeApp(lambda: None)
|
||
reply = SimpleNamespace(text="Hello world", caption=None, from_user=SimpleNamespace(id=2, username="testuser", first_name="Test"))
|
||
message = SimpleNamespace(reply_to_message=reply)
|
||
assert await channel._extract_reply_context(message) == "[Reply to @testuser: Hello world]"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_reply_context_with_caption_only() -> None:
|
||
"""When reply has only caption (no text), caption is used."""
|
||
channel = TelegramChannel(TelegramConfig(enabled=True, token="123:abc"), MessageBus())
|
||
channel._app = _FakeApp(lambda: None)
|
||
reply = SimpleNamespace(text=None, caption="Photo caption", from_user=SimpleNamespace(id=2, username=None, first_name="Test"))
|
||
message = SimpleNamespace(reply_to_message=reply)
|
||
assert await channel._extract_reply_context(message) == "[Reply to Test: Photo caption]"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_reply_context_truncation() -> None:
|
||
"""Reply text is truncated at TELEGRAM_REPLY_CONTEXT_MAX_LEN."""
|
||
channel = TelegramChannel(TelegramConfig(enabled=True, token="123:abc"), MessageBus())
|
||
channel._app = _FakeApp(lambda: None)
|
||
long_text = "x" * (TELEGRAM_REPLY_CONTEXT_MAX_LEN + 100)
|
||
reply = SimpleNamespace(text=long_text, caption=None, from_user=SimpleNamespace(id=2, username=None, first_name=None))
|
||
message = SimpleNamespace(reply_to_message=reply)
|
||
result = await channel._extract_reply_context(message)
|
||
assert result is not None
|
||
assert result.startswith("[Reply to: ")
|
||
assert result.endswith("...]")
|
||
assert len(result) == len("[Reply to: ]") + TELEGRAM_REPLY_CONTEXT_MAX_LEN + len("...")
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_extract_reply_context_no_text_returns_none() -> None:
|
||
"""When reply has no text/caption, _extract_reply_context returns None (media handled separately)."""
|
||
channel = TelegramChannel(TelegramConfig(enabled=True, token="123:abc"), MessageBus())
|
||
reply = SimpleNamespace(text=None, caption=None)
|
||
message = SimpleNamespace(reply_to_message=reply)
|
||
assert await channel._extract_reply_context(message) is None
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_message_includes_reply_context() -> None:
|
||
"""When user replies to a message, content passed to bus starts with reply context."""
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
handled = []
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
reply = SimpleNamespace(text="Hello", message_id=2, from_user=SimpleNamespace(id=1))
|
||
update = _make_telegram_update(text="translate this", reply_to_message=reply)
|
||
await channel._on_message(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert handled[0]["content"].startswith("[Reply to: Hello]")
|
||
assert "translate this" in handled[0]["content"]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_message_media_returns_path_when_download_succeeds(
|
||
monkeypatch, tmp_path
|
||
) -> None:
|
||
"""_download_message_media returns (paths, content_parts) when bot.get_file and download succeed."""
|
||
media_dir = tmp_path / "media" / "telegram"
|
||
media_dir.mkdir(parents=True)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.get_media_dir",
|
||
lambda channel=None: media_dir if channel else tmp_path / "media",
|
||
)
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.get_file = AsyncMock(
|
||
return_value=SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
|
||
)
|
||
|
||
msg = SimpleNamespace(
|
||
photo=[SimpleNamespace(file_id="fid123", mime_type="image/jpeg")],
|
||
voice=None,
|
||
audio=None,
|
||
document=None,
|
||
video=None,
|
||
video_note=None,
|
||
animation=None,
|
||
)
|
||
paths, parts = await channel._download_message_media(msg)
|
||
assert len(paths) == 1
|
||
assert len(parts) == 1
|
||
assert "fid123" in paths[0]
|
||
assert "[image:" in parts[0]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_message_media_uses_file_unique_id_when_available(
|
||
monkeypatch, tmp_path
|
||
) -> None:
|
||
media_dir = tmp_path / "media" / "telegram"
|
||
media_dir.mkdir(parents=True)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.get_media_dir",
|
||
lambda channel=None: media_dir if channel else tmp_path / "media",
|
||
)
|
||
|
||
downloaded: dict[str, str] = {}
|
||
|
||
async def _download_to_drive(path: str) -> None:
|
||
downloaded["path"] = path
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
app = _FakeApp(lambda: None)
|
||
app.bot.get_file = AsyncMock(
|
||
return_value=SimpleNamespace(download_to_drive=_download_to_drive)
|
||
)
|
||
channel._app = app
|
||
|
||
msg = SimpleNamespace(
|
||
photo=[
|
||
SimpleNamespace(
|
||
file_id="file-id-that-should-not-be-used",
|
||
file_unique_id="stable-unique-id",
|
||
mime_type="image/jpeg",
|
||
file_name=None,
|
||
)
|
||
],
|
||
voice=None,
|
||
audio=None,
|
||
document=None,
|
||
video=None,
|
||
video_note=None,
|
||
animation=None,
|
||
)
|
||
|
||
paths, parts = await channel._download_message_media(msg)
|
||
|
||
assert downloaded["path"].endswith("stable-unique-id.jpg")
|
||
assert paths == [str(media_dir / "stable-unique-id.jpg")]
|
||
assert parts == [f"[image: {media_dir / 'stable-unique-id.jpg'}]"]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_message_attaches_reply_to_media_when_available(monkeypatch, tmp_path) -> None:
|
||
"""When user replies to a message with media, that media is downloaded and attached to the turn."""
|
||
media_dir = tmp_path / "media" / "telegram"
|
||
media_dir.mkdir(parents=True)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.get_media_dir",
|
||
lambda channel=None: media_dir if channel else tmp_path / "media",
|
||
)
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
app = _FakeApp(lambda: None)
|
||
app.bot.get_file = AsyncMock(
|
||
return_value=SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
|
||
)
|
||
channel._app = app
|
||
handled = []
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
reply_with_photo = SimpleNamespace(
|
||
text=None,
|
||
caption=None,
|
||
photo=[SimpleNamespace(file_id="reply_photo_fid", mime_type="image/jpeg")],
|
||
document=None,
|
||
voice=None,
|
||
audio=None,
|
||
video=None,
|
||
video_note=None,
|
||
animation=None,
|
||
)
|
||
update = _make_telegram_update(
|
||
text="what is the image?",
|
||
reply_to_message=reply_with_photo,
|
||
)
|
||
await channel._on_message(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert handled[0]["content"].startswith("[Reply to: [image:")
|
||
assert "what is the image?" in handled[0]["content"]
|
||
assert len(handled[0]["media"]) == 1
|
||
assert "reply_photo_fid" in handled[0]["media"][0]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_message_reply_to_media_fallback_when_download_fails() -> None:
|
||
"""When reply has media but download fails, no media attached and no reply tag."""
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.get_file = None
|
||
handled = []
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
reply_with_photo = SimpleNamespace(
|
||
text=None,
|
||
caption=None,
|
||
photo=[SimpleNamespace(file_id="x", mime_type="image/jpeg")],
|
||
document=None,
|
||
voice=None,
|
||
audio=None,
|
||
video=None,
|
||
video_note=None,
|
||
animation=None,
|
||
)
|
||
update = _make_telegram_update(text="what is this?", reply_to_message=reply_with_photo)
|
||
await channel._on_message(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert "what is this?" in handled[0]["content"]
|
||
assert handled[0]["media"] == []
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_message_reply_to_caption_and_media(monkeypatch, tmp_path) -> None:
|
||
"""When replying to a message with caption + photo, both text context and media are included."""
|
||
media_dir = tmp_path / "media" / "telegram"
|
||
media_dir.mkdir(parents=True)
|
||
monkeypatch.setattr(
|
||
"nanobot.channels.telegram.get_media_dir",
|
||
lambda channel=None: media_dir if channel else tmp_path / "media",
|
||
)
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
app = _FakeApp(lambda: None)
|
||
app.bot.get_file = AsyncMock(
|
||
return_value=SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
|
||
)
|
||
channel._app = app
|
||
handled = []
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
reply_with_caption_and_photo = SimpleNamespace(
|
||
text=None,
|
||
caption="A cute cat",
|
||
photo=[SimpleNamespace(file_id="cat_fid", mime_type="image/jpeg")],
|
||
document=None,
|
||
voice=None,
|
||
audio=None,
|
||
video=None,
|
||
video_note=None,
|
||
animation=None,
|
||
)
|
||
update = _make_telegram_update(
|
||
text="what breed is this?",
|
||
reply_to_message=reply_with_caption_and_photo,
|
||
)
|
||
await channel._on_message(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert "[Reply to: A cute cat]" in handled[0]["content"]
|
||
assert "what breed is this?" in handled[0]["content"]
|
||
assert len(handled[0]["media"]) == 1
|
||
assert "cat_fid" in handled[0]["media"][0]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_forward_command_does_not_inject_reply_context() -> None:
|
||
"""Slash commands forwarded via _forward_command must not include reply context."""
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
handled = []
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
channel._handle_message = capture_handle
|
||
|
||
reply = SimpleNamespace(text="some old message", message_id=2, from_user=SimpleNamespace(id=1))
|
||
update = _make_telegram_update(text="/new", reply_to_message=reply)
|
||
await channel._forward_command(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert handled[0]["content"] == "/new"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_forward_command_preserves_dream_log_args_and_strips_bot_suffix() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
handled = []
|
||
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
|
||
channel._handle_message = capture_handle
|
||
update = _make_telegram_update(text="/dream-log@nanobot_test deadbeef", reply_to_message=None)
|
||
|
||
await channel._forward_command(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert handled[0]["content"] == "/dream-log deadbeef"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_forward_command_normalizes_telegram_safe_dream_aliases() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
handled = []
|
||
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
|
||
channel._handle_message = capture_handle
|
||
update = _make_telegram_update(text="/dream_restore@nanobot_test deadbeef", reply_to_message=None)
|
||
|
||
await channel._forward_command(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert handled[0]["content"] == "/dream-restore deadbeef"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_help_includes_restart_command() -> None:
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
update = _make_telegram_update(text="/help", chat_type="private")
|
||
update.message.reply_text = AsyncMock()
|
||
|
||
await channel._on_help(update, None)
|
||
|
||
update.message.reply_text.assert_awaited_once()
|
||
help_text = update.message.reply_text.await_args.args[0]
|
||
assert "/restart" in help_text
|
||
assert "/status" in help_text
|
||
assert "/dream" in help_text
|
||
assert "/dream-log" in help_text
|
||
assert "/dream-restore" in help_text
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_message_location_content() -> None:
|
||
"""Location messages are forwarded as [location: lat, lon] content."""
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
handled = []
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
location = SimpleNamespace(latitude=48.8566, longitude=2.3522)
|
||
update = _make_telegram_update(location=location)
|
||
await channel._on_message(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert handled[0]["content"] == "[location: 48.8566, 2.3522]"
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_on_message_location_with_text() -> None:
|
||
"""Location messages with accompanying text include both in content."""
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
handled = []
|
||
async def capture_handle(**kwargs) -> None:
|
||
handled.append(kwargs)
|
||
channel._handle_message = capture_handle
|
||
channel._start_typing = lambda _chat_id: None
|
||
|
||
location = SimpleNamespace(latitude=51.5074, longitude=-0.1278)
|
||
update = _make_telegram_update(text="meet me here", location=location)
|
||
await channel._on_message(update, None)
|
||
|
||
assert len(handled) == 1
|
||
assert "meet me here" in handled[0]["content"]
|
||
assert "[location: 51.5074, -0.1278]" in handled[0]["content"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Tests for retry amplification fix (issue #3050)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_does_not_fallback_on_network_timeout() -> None:
|
||
"""TimedOut should propagate immediately, NOT trigger plain-text fallback.
|
||
|
||
Before the fix, _send_text caught ALL exceptions (including TimedOut)
|
||
and retried as plain text, doubling connection demand during pool
|
||
exhaustion — see issue #3050.
|
||
"""
|
||
from telegram.error import TimedOut
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
call_count = 0
|
||
|
||
async def always_timeout(**kwargs):
|
||
nonlocal call_count
|
||
call_count += 1
|
||
raise TimedOut()
|
||
|
||
channel._app.bot.send_message = always_timeout
|
||
|
||
import nanobot.channels.telegram as tg_mod
|
||
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||
try:
|
||
with pytest.raises(TimedOut):
|
||
await channel._send_text(123, "hello", None, {})
|
||
finally:
|
||
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||
|
||
# With the fix: only _call_with_retry's 3 HTML attempts (no plain fallback).
|
||
# Before the fix: 3 HTML + 3 plain = 6 attempts.
|
||
assert call_count == 3, (
|
||
f"Expected 3 calls (HTML retries only), got {call_count} "
|
||
"(plain-text fallback should not trigger on TimedOut)"
|
||
)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_does_not_fallback_on_network_error() -> None:
|
||
"""NetworkError should propagate immediately, NOT trigger plain-text fallback."""
|
||
from telegram.error import NetworkError
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
call_count = 0
|
||
|
||
async def always_network_error(**kwargs):
|
||
nonlocal call_count
|
||
call_count += 1
|
||
raise NetworkError("Connection reset")
|
||
|
||
channel._app.bot.send_message = always_network_error
|
||
|
||
import nanobot.channels.telegram as tg_mod
|
||
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||
try:
|
||
with pytest.raises(NetworkError):
|
||
await channel._send_text(123, "hello", None, {})
|
||
finally:
|
||
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||
|
||
# _call_with_retry does NOT retry NetworkError (only TimedOut/RetryAfter),
|
||
# so it raises after 1 attempt. The fix prevents plain-text fallback.
|
||
# Before the fix: 1 HTML + 1 plain = 2. After the fix: 1 HTML only.
|
||
assert call_count == 1, (
|
||
f"Expected 1 call (HTML only, no plain fallback), got {call_count}"
|
||
)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_falls_back_on_bad_request() -> None:
|
||
"""BadRequest (HTML parse error) should still trigger plain-text fallback."""
|
||
from telegram.error import BadRequest
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
original_send = channel._app.bot.send_message
|
||
html_call_count = 0
|
||
|
||
async def html_fails(**kwargs):
|
||
nonlocal html_call_count
|
||
if kwargs.get("parse_mode") == "HTML":
|
||
html_call_count += 1
|
||
raise BadRequest("Can't parse entities")
|
||
return await original_send(**kwargs)
|
||
|
||
channel._app.bot.send_message = html_fails
|
||
|
||
import nanobot.channels.telegram as tg_mod
|
||
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||
try:
|
||
await channel._send_text(123, "hello **world**", None, {})
|
||
finally:
|
||
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||
|
||
# HTML attempt failed with BadRequest → fallback to plain text succeeds.
|
||
assert html_call_count == 1, f"Expected 1 HTML attempt, got {html_call_count}"
|
||
assert len(channel._app.bot.sent_messages) == 1
|
||
# Plain text send should NOT have parse_mode
|
||
assert channel._app.bot.sent_messages[0].get("parse_mode") is None
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_bad_request_plain_fallback_exhausted() -> None:
|
||
"""When both HTML and plain-text fallback fail with BadRequest, the error propagates."""
|
||
from telegram.error import BadRequest
|
||
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
|
||
call_count = 0
|
||
|
||
async def always_bad_request(**kwargs):
|
||
nonlocal call_count
|
||
call_count += 1
|
||
raise BadRequest("Bad request")
|
||
|
||
channel._app.bot.send_message = always_bad_request
|
||
|
||
import nanobot.channels.telegram as tg_mod
|
||
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||
try:
|
||
with pytest.raises(BadRequest):
|
||
await channel._send_text(123, "hello", None, {})
|
||
finally:
|
||
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||
|
||
# _call_with_retry does NOT retry BadRequest (only TimedOut/RetryAfter),
|
||
# so HTML fails after 1 attempt → fallback to plain also fails after 1 attempt.
|
||
# Before the fix: 2 total. After the fix: still 2 (BadRequest SHOULD fallback).
|
||
assert call_count == 2, f"Expected 2 calls (1 HTML + 1 plain), got {call_count}"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# _markdown_to_telegram_html formatting tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_markdown_to_html_headers_become_bold() -> None:
|
||
from nanobot.channels.telegram import _markdown_to_telegram_html
|
||
|
||
assert _markdown_to_telegram_html("# Title") == "<b>Title</b>"
|
||
assert _markdown_to_telegram_html("## Subtitle") == "<b>Subtitle</b>"
|
||
assert _markdown_to_telegram_html("### Deep") == "<b>Deep</b>"
|
||
|
||
|
||
def test_markdown_to_html_numbered_lists_preserved() -> None:
|
||
from nanobot.channels.telegram import _markdown_to_telegram_html
|
||
|
||
text = "1. First\n2. Second\n3. Third"
|
||
result = _markdown_to_telegram_html(text)
|
||
assert "1. First" in result
|
||
assert "2. Second" in result
|
||
assert "3. Third" in result
|
||
|
||
|
||
def test_markdown_to_html_numbered_list_normalizes_whitespace() -> None:
|
||
from nanobot.channels.telegram import _markdown_to_telegram_html
|
||
|
||
# Extra spaces after dot should be normalized
|
||
text = "1. Lots of space\n2. Two spaces"
|
||
result = _markdown_to_telegram_html(text)
|
||
assert "1. Lots of space" in result
|
||
assert "2. Two spaces" in result
|
||
|
||
|
||
def test_markdown_to_html_headers_survive_html_escaping() -> None:
|
||
"""Headers containing special HTML chars should still render as bold."""
|
||
from nanobot.channels.telegram import _markdown_to_telegram_html
|
||
|
||
result = _markdown_to_telegram_html("# A < B & C > D")
|
||
assert "<b>A < B & C > D</b>" == result
|
||
|
||
|
||
def test_markdown_to_html_mixed_formatting() -> None:
|
||
"""Headers, bullets, numbered lists, and bold coexist correctly."""
|
||
from nanobot.channels.telegram import _markdown_to_telegram_html
|
||
|
||
text = "# Overview\n\n- bullet one\n- bullet two\n\n1. step one\n2. step two\n\n**bold text**"
|
||
result = _markdown_to_telegram_html(text)
|
||
assert "<b>Overview</b>" in result
|
||
assert "\u2022 bullet one" in result
|
||
assert "1. step one" in result
|
||
assert "<b>bold text</b>" in result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# _strip_md_block tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_strip_md_block_removes_inline_formatting() -> None:
|
||
from nanobot.channels.telegram import _strip_md_block
|
||
|
||
text = "**bold** and _italic_ and ~~struck~~"
|
||
result = _strip_md_block(text)
|
||
assert result == "bold and italic and struck"
|
||
|
||
|
||
def test_strip_md_block_strips_headers() -> None:
|
||
from nanobot.channels.telegram import _strip_md_block
|
||
|
||
assert _strip_md_block("## Title\nBody") == "Title\nBody"
|
||
|
||
|
||
def test_strip_md_block_converts_bullets_and_numbers() -> None:
|
||
from nanobot.channels.telegram import _strip_md_block
|
||
|
||
text = "- item a\n1. item b\n2. item c"
|
||
result = _strip_md_block(text)
|
||
assert "\u2022 item a" in result
|
||
assert "1. item b" in result
|
||
assert "2. item c" in result
|
||
|
||
|
||
def test_strip_md_block_strips_links() -> None:
|
||
from nanobot.channels.telegram import _strip_md_block
|
||
|
||
assert _strip_md_block("[click here](https://example.com)") == "click here"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Streaming mid-edit uses _strip_md_block
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_delta_mid_stream_strips_markdown() -> None:
|
||
"""Mid-stream edits should strip markdown so users see clean text."""
|
||
channel = TelegramChannel(
|
||
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||
MessageBus(),
|
||
)
|
||
channel._app = _FakeApp(lambda: None)
|
||
channel._app.bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=42))
|
||
channel._app.bot.edit_message_text = AsyncMock()
|
||
|
||
# Initial send with markdown
|
||
await channel.send_delta("999", "**hello** world")
|
||
sent_text = channel._app.bot.send_message.call_args.kwargs.get("text", "")
|
||
# Should NOT contain raw markdown asterisks
|
||
assert "**" not in sent_text
|
||
assert "hello world" in sent_text
|
||
|
||
# Mid-stream edit
|
||
import time
|
||
buf = channel._stream_bufs["999"]
|
||
buf.last_edit = time.monotonic() - 10 # force edit interval
|
||
await channel.send_delta("999", "\n### Title\n1. step")
|
||
edited_text = channel._app.bot.edit_message_text.call_args.kwargs.get("text", "")
|
||
assert "###" not in edited_text
|
||
assert "**" not in edited_text
|
||
assert "Title" in edited_text
|
||
assert "1. step" in edited_text
|