From 2eea82f5ee0941a32f7e9d766fa4bc41f02a4a86 Mon Sep 17 00:00:00 2001 From: stutiredboy Date: Mon, 20 Apr 2026 11:33:44 +0800 Subject: [PATCH] fix(telegram): split oversized stream buffer mid-flight Cherry-picked from #3311 (stutiredboy). Streaming edits called edit_message_text(text=buf.text) without chunking, so once accumulated deltas crossed Telegram's 4096-char limit an ongoing stream would fail with BadRequest. Extracts _flush_stream_overflow helper that edits the first chunk in place, sends any middle chunks, and re-anchors the buffer to a new message for the tail so subsequent deltas keep streaming. Co-Authored-By: stutiredboy --- nanobot/channels/telegram.py | 104 ++++++++++++------------ tests/channels/test_telegram_channel.py | 33 ++++++++ 2 files changed, 86 insertions(+), 51 deletions(-) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index d2265e386..bfb7e70a9 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -638,58 +638,60 @@ class TelegramChannel(BaseChannel): raise # Let ChannelManager handle retry elif (now - buf.last_edit) >= self.config.stream_edit_interval: if len(buf.text) > TELEGRAM_MAX_MESSAGE_LEN: - # Finish current message - current_text = buf.text[:TELEGRAM_MAX_MESSAGE_LEN] - try: - await self._call_with_retry( - self._app.bot.edit_message_text, - chat_id=int_chat_id, - message_id=buf.message_id, - text=current_text, - ) - except Exception as e: - logger.warning("Failed to edit current message before splitting: {}", e) - raise # Let ChannelManager handle retry - - # Prepare remaining content for a new message - remaining = buf.text[TELEGRAM_MAX_MESSAGE_LEN:] - logger.debug(f"[!] Splitting long message: {len(buf.text)} chars → new message with {len(remaining)} chars") - - # Create new buffer for the next chunk - self._stream_bufs[chat_id] = _StreamBuf(stream_id=stream_id) - new_buf = self._stream_bufs[chat_id] - new_buf.text = remaining - new_buf.last_edit = now - - # Immediately start the new message - if remaining.strip(): - try: - sent = await self._call_with_retry( - self._app.bot.send_message, - chat_id=int_chat_id, - text=remaining[:TELEGRAM_MAX_MESSAGE_LEN], - **thread_kwargs - ) - new_buf.message_id = sent.message_id - except Exception as e: - logger.error("Failed to send new message chunk after split: {}", e) - raise # Let ChannelManager handle retry - else: - # Normal edit (message is still under the limit) - try: - await self._call_with_retry( - self._app.bot.edit_message_text, - chat_id=int_chat_id, - message_id=buf.message_id, - text=buf.text, - ) + await self._flush_stream_overflow(int_chat_id, buf, thread_kwargs) + buf.last_edit = now + return + try: + await self._call_with_retry( + self._app.bot.edit_message_text, + chat_id=int_chat_id, message_id=buf.message_id, + text=buf.text, + ) + buf.last_edit = now + except Exception as e: + if self._is_not_modified_error(e): buf.last_edit = now - except Exception as e: - if self._is_not_modified_error(e): - buf.last_edit = now - return - logger.warning("Stream edit failed: {}", e) - raise # Let ChannelManager handle retry + return + logger.warning("Stream edit failed: {}", e) + raise # Let ChannelManager handle retry + + async def _flush_stream_overflow( + self, + chat_id: int, + buf: "_StreamBuf", + thread_kwargs: dict, + ) -> None: + """Split an oversized stream buffer mid-flight. + + Edits the current stream message with the first chunk, sends any + intermediate chunks as standalone messages, then opens a new message + for the tail so subsequent deltas continue streaming into it. + """ + chunks = split_message(buf.text, TELEGRAM_MAX_MESSAGE_LEN) + if len(chunks) <= 1: + return + try: + await self._call_with_retry( + self._app.bot.edit_message_text, + chat_id=chat_id, message_id=buf.message_id, + text=chunks[0], + ) + except Exception as e: + if not self._is_not_modified_error(e): + logger.warning("Stream overflow edit failed: {}", e) + raise + for chunk in chunks[1:-1]: + await self._call_with_retry( + self._app.bot.send_message, + chat_id=chat_id, text=chunk, **thread_kwargs, + ) + tail = chunks[-1] + sent = await self._call_with_retry( + self._app.bot.send_message, + chat_id=chat_id, text=tail, **thread_kwargs, + ) + buf.message_id = sent.message_id + buf.text = tail async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" diff --git a/tests/channels/test_telegram_channel.py b/tests/channels/test_telegram_channel.py index 6b24bc1ac..c5cc3cf4e 100644 --- a/tests/channels/test_telegram_channel.py +++ b/tests/channels/test_telegram_channel.py @@ -574,6 +574,39 @@ async def test_send_delta_incremental_edit_treats_not_modified_as_success() -> N assert channel._stream_bufs["123"].last_edit > 0.0 +@pytest.mark.asyncio +async def test_send_delta_incremental_edit_splits_oversized_buffer() -> None: + """Mid-stream overflow: once buf.text exceeds Telegram's limit, split into + chunks, edit the current message with the first chunk, and re-anchor the + buffer to a new message for the tail so further deltas keep streaming.""" + from nanobot.channels.telegram import TELEGRAM_MAX_MESSAGE_LEN + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + channel._app.bot.edit_message_text = AsyncMock() + channel._app.bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=99)) + + oversized = "x" * (TELEGRAM_MAX_MESSAGE_LEN + 500) + channel._stream_bufs["123"] = _StreamBuf( + text=oversized, message_id=7, last_edit=0.0, stream_id="s:0" + ) + + await channel.send_delta("123", "y", {"_stream_delta": True, "_stream_id": "s:0"}) + + channel._app.bot.edit_message_text.assert_called_once() + edit_text = channel._app.bot.edit_message_text.call_args.kwargs.get("text", "") + assert len(edit_text) <= TELEGRAM_MAX_MESSAGE_LEN + + channel._app.bot.send_message.assert_called_once() + buf = channel._stream_bufs["123"] + assert buf.message_id == 99 + assert len(buf.text) <= TELEGRAM_MAX_MESSAGE_LEN + assert buf.last_edit > 0.0 + + @pytest.mark.asyncio async def test_send_delta_initial_send_keeps_message_in_thread() -> None: channel = TelegramChannel(