fix(telegram): split oversized stream buffer mid-flight

Cherry-picked from #3311 (stutiredboy). Streaming edits called
edit_message_text(text=buf.text) without chunking, so once accumulated
deltas crossed Telegram's 4096-char limit an ongoing stream would fail
with BadRequest.

Extracts _flush_stream_overflow helper that edits the first chunk in
place, sends any middle chunks, and re-anchors the buffer to a new
message for the tail so subsequent deltas keep streaming.

Co-Authored-By: stutiredboy <stutiredboy@users.noreply.github.com>
This commit is contained in:
stutiredboy 2026-04-20 11:33:44 +08:00 committed by Xubin Ren
parent fd8f08cc83
commit 2eea82f5ee
2 changed files with 86 additions and 51 deletions

View File

@ -638,49 +638,13 @@ class TelegramChannel(BaseChannel):
raise # Let ChannelManager handle retry raise # Let ChannelManager handle retry
elif (now - buf.last_edit) >= self.config.stream_edit_interval: elif (now - buf.last_edit) >= self.config.stream_edit_interval:
if len(buf.text) > TELEGRAM_MAX_MESSAGE_LEN: if len(buf.text) > TELEGRAM_MAX_MESSAGE_LEN:
# Finish current message await self._flush_stream_overflow(int_chat_id, buf, thread_kwargs)
current_text = buf.text[:TELEGRAM_MAX_MESSAGE_LEN] buf.last_edit = now
return
try: try:
await self._call_with_retry( await self._call_with_retry(
self._app.bot.edit_message_text, self._app.bot.edit_message_text,
chat_id=int_chat_id, chat_id=int_chat_id, message_id=buf.message_id,
message_id=buf.message_id,
text=current_text,
)
except Exception as e:
logger.warning("Failed to edit current message before splitting: {}", e)
raise # Let ChannelManager handle retry
# Prepare remaining content for a new message
remaining = buf.text[TELEGRAM_MAX_MESSAGE_LEN:]
logger.debug(f"[!] Splitting long message: {len(buf.text)} chars → new message with {len(remaining)} chars")
# Create new buffer for the next chunk
self._stream_bufs[chat_id] = _StreamBuf(stream_id=stream_id)
new_buf = self._stream_bufs[chat_id]
new_buf.text = remaining
new_buf.last_edit = now
# Immediately start the new message
if remaining.strip():
try:
sent = await self._call_with_retry(
self._app.bot.send_message,
chat_id=int_chat_id,
text=remaining[:TELEGRAM_MAX_MESSAGE_LEN],
**thread_kwargs
)
new_buf.message_id = sent.message_id
except Exception as e:
logger.error("Failed to send new message chunk after split: {}", e)
raise # Let ChannelManager handle retry
else:
# Normal edit (message is still under the limit)
try:
await self._call_with_retry(
self._app.bot.edit_message_text,
chat_id=int_chat_id,
message_id=buf.message_id,
text=buf.text, text=buf.text,
) )
buf.last_edit = now buf.last_edit = now
@ -691,6 +655,44 @@ class TelegramChannel(BaseChannel):
logger.warning("Stream edit failed: {}", e) logger.warning("Stream edit failed: {}", e)
raise # Let ChannelManager handle retry raise # Let ChannelManager handle retry
async def _flush_stream_overflow(
self,
chat_id: int,
buf: "_StreamBuf",
thread_kwargs: dict,
) -> None:
"""Split an oversized stream buffer mid-flight.
Edits the current stream message with the first chunk, sends any
intermediate chunks as standalone messages, then opens a new message
for the tail so subsequent deltas continue streaming into it.
"""
chunks = split_message(buf.text, TELEGRAM_MAX_MESSAGE_LEN)
if len(chunks) <= 1:
return
try:
await self._call_with_retry(
self._app.bot.edit_message_text,
chat_id=chat_id, message_id=buf.message_id,
text=chunks[0],
)
except Exception as e:
if not self._is_not_modified_error(e):
logger.warning("Stream overflow edit failed: {}", e)
raise
for chunk in chunks[1:-1]:
await self._call_with_retry(
self._app.bot.send_message,
chat_id=chat_id, text=chunk, **thread_kwargs,
)
tail = chunks[-1]
sent = await self._call_with_retry(
self._app.bot.send_message,
chat_id=chat_id, text=tail, **thread_kwargs,
)
buf.message_id = sent.message_id
buf.text = tail
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command.""" """Handle /start command."""
if not update.message or not update.effective_user: if not update.message or not update.effective_user:

View File

@ -574,6 +574,39 @@ async def test_send_delta_incremental_edit_treats_not_modified_as_success() -> N
assert channel._stream_bufs["123"].last_edit > 0.0 assert channel._stream_bufs["123"].last_edit > 0.0
@pytest.mark.asyncio
async def test_send_delta_incremental_edit_splits_oversized_buffer() -> None:
"""Mid-stream overflow: once buf.text exceeds Telegram's limit, split into
chunks, edit the current message with the first chunk, and re-anchor the
buffer to a new message for the tail so further deltas keep streaming."""
from nanobot.channels.telegram import TELEGRAM_MAX_MESSAGE_LEN
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
channel._app.bot.edit_message_text = AsyncMock()
channel._app.bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=99))
oversized = "x" * (TELEGRAM_MAX_MESSAGE_LEN + 500)
channel._stream_bufs["123"] = _StreamBuf(
text=oversized, message_id=7, last_edit=0.0, stream_id="s:0"
)
await channel.send_delta("123", "y", {"_stream_delta": True, "_stream_id": "s:0"})
channel._app.bot.edit_message_text.assert_called_once()
edit_text = channel._app.bot.edit_message_text.call_args.kwargs.get("text", "")
assert len(edit_text) <= TELEGRAM_MAX_MESSAGE_LEN
channel._app.bot.send_message.assert_called_once()
buf = channel._stream_bufs["123"]
assert buf.message_id == 99
assert len(buf.text) <= TELEGRAM_MAX_MESSAGE_LEN
assert buf.last_edit > 0.0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_delta_initial_send_keeps_message_in_thread() -> None: async def test_send_delta_initial_send_keeps_message_in_thread() -> None:
channel = TelegramChannel( channel = TelegramChannel(