diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index afe62ca28..3482e38d2 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -373,17 +373,35 @@ class AgentLoop: try: on_stream = on_stream_end = None if msg.metadata.get("_wants_stream"): + # Split one answer into distinct stream segments. + stream_base_id = f"{msg.session_key}:{time.time_ns()}" + stream_segment = 0 + + def _current_stream_id() -> str: + return f"{stream_base_id}:{stream_segment}" + async def on_stream(delta: str) -> None: await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, - content=delta, metadata={"_stream_delta": True}, + content=delta, + metadata={ + "_stream_delta": True, + "_stream_id": _current_stream_id(), + }, )) async def on_stream_end(*, resuming: bool = False) -> None: + nonlocal stream_segment await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, - content="", metadata={"_stream_end": True, "_resuming": resuming}, + content="", + metadata={ + "_stream_end": True, + "_resuming": resuming, + "_stream_id": _current_stream_id(), + }, )) + stream_segment += 1 response = await self._process_message( msg, on_stream=on_stream, on_stream_end=on_stream_end, diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 5a776eed4..86e991344 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -96,6 +96,10 @@ class BaseChannel(ABC): Override in subclasses to enable streaming. Implementations should raise on delivery failure so the channel manager can retry. + + Streaming contract: ``_stream_delta`` is a chunk, ``_stream_end`` ends + the current segment, and stateful implementations must key buffers by + ``_stream_id`` rather than only by ``chat_id``. """ pass diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index c3041c9d2..feb908657 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -12,7 +12,7 @@ from typing import Any, Literal from loguru import logger from pydantic import Field from telegram import BotCommand, ReactionTypeEmoji, ReplyParameters, Update -from telegram.error import TimedOut +from telegram.error import BadRequest, TimedOut from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from telegram.request import HTTPXRequest @@ -163,6 +163,7 @@ class _StreamBuf: text: str = "" message_id: int | None = None last_edit: float = 0.0 + stream_id: str | None = None class TelegramConfig(Base): @@ -478,17 +479,24 @@ class TelegramChannel(BaseChannel): logger.error("Error sending Telegram message: {}", e2) raise + @staticmethod + def _is_not_modified_error(exc: Exception) -> bool: + return isinstance(exc, BadRequest) and "message is not modified" in str(exc).lower() + async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: """Progressive message editing: send on first delta, edit on subsequent ones.""" if not self._app: return meta = metadata or {} int_chat_id = int(chat_id) + stream_id = meta.get("_stream_id") if meta.get("_stream_end"): buf = self._stream_bufs.get(chat_id) if not buf or not buf.message_id or not buf.text: return + if stream_id is not None and buf.stream_id is not None and buf.stream_id != stream_id: + return self._stop_typing(chat_id) try: html = _markdown_to_telegram_html(buf.text) @@ -498,6 +506,10 @@ class TelegramChannel(BaseChannel): text=html, parse_mode="HTML", ) except Exception as e: + if self._is_not_modified_error(e): + logger.debug("Final stream edit already applied for {}", chat_id) + self._stream_bufs.pop(chat_id, None) + return logger.debug("Final stream edit failed (HTML), trying plain: {}", e) try: await self._call_with_retry( @@ -506,15 +518,21 @@ class TelegramChannel(BaseChannel): text=buf.text, ) except Exception as e2: + if self._is_not_modified_error(e2): + logger.debug("Final stream plain edit already applied for {}", chat_id) + self._stream_bufs.pop(chat_id, None) + return logger.warning("Final stream edit failed: {}", e2) raise # Let ChannelManager handle retry self._stream_bufs.pop(chat_id, None) return buf = self._stream_bufs.get(chat_id) - if buf is None: - buf = _StreamBuf() + if buf is None or (stream_id is not None and buf.stream_id is not None and buf.stream_id != stream_id): + buf = _StreamBuf(stream_id=stream_id) self._stream_bufs[chat_id] = buf + elif buf.stream_id is None: + buf.stream_id = stream_id buf.text += delta if not buf.text.strip(): @@ -541,6 +559,9 @@ class TelegramChannel(BaseChannel): ) buf.last_edit = now except Exception as e: + if self._is_not_modified_error(e): + buf.last_edit = now + return logger.warning("Stream edit failed: {}", e) raise # Let ChannelManager handle retry diff --git a/tests/channels/test_telegram_channel.py b/tests/channels/test_telegram_channel.py index 6b4c008e0..d5dafdee7 100644 --- a/tests/channels/test_telegram_channel.py +++ b/tests/channels/test_telegram_channel.py @@ -50,8 +50,9 @@ class _FakeBot: async def set_my_commands(self, commands) -> None: self.commands = commands - async def send_message(self, **kwargs) -> None: + async def send_message(self, **kwargs): self.sent_messages.append(kwargs) + return SimpleNamespace(message_id=len(self.sent_messages)) async def send_photo(self, **kwargs) -> None: self.sent_media.append({"kind": "photo", **kwargs}) @@ -295,6 +296,62 @@ async def test_send_delta_stream_end_raises_and_keeps_buffer_on_failure() -> Non assert "123" in channel._stream_bufs +@pytest.mark.asyncio +async def test_send_delta_stream_end_treats_not_modified_as_success() -> None: + from telegram.error import BadRequest + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + channel._app.bot.edit_message_text = AsyncMock(side_effect=BadRequest("Message is not modified")) + channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0, stream_id="s:0") + + await channel.send_delta("123", "", {"_stream_end": True, "_stream_id": "s:0"}) + + assert "123" not in channel._stream_bufs + + +@pytest.mark.asyncio +async def test_send_delta_new_stream_id_replaces_stale_buffer() -> None: + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + channel._stream_bufs["123"] = _StreamBuf( + text="hello", + message_id=7, + last_edit=0.0, + stream_id="old:0", + ) + + await channel.send_delta("123", "world", {"_stream_delta": True, "_stream_id": "new:0"}) + + buf = channel._stream_bufs["123"] + assert buf.text == "world" + assert buf.stream_id == "new:0" + assert buf.message_id == 1 + + +@pytest.mark.asyncio +async def test_send_delta_incremental_edit_treats_not_modified_as_success() -> None: + from telegram.error import BadRequest + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0, stream_id="s:0") + channel._app.bot.edit_message_text = AsyncMock(side_effect=BadRequest("Message is not modified")) + + await channel.send_delta("123", "", {"_stream_delta": True, "_stream_id": "s:0"}) + + assert channel._stream_bufs["123"].last_edit > 0.0 + + def test_derive_topic_session_key_uses_thread_id() -> None: message = SimpleNamespace( chat=SimpleNamespace(type="supergroup"),