fix telegram streaming message boundaries

This commit is contained in:
Xubin Ren 2026-03-26 02:35:12 +00:00
parent 813de554c9
commit 33abe915e7
4 changed files with 106 additions and 6 deletions

View File

@ -373,17 +373,35 @@ class AgentLoop:
try:
on_stream = on_stream_end = None
if msg.metadata.get("_wants_stream"):
# Split one answer into distinct stream segments.
stream_base_id = f"{msg.session_key}:{time.time_ns()}"
stream_segment = 0
def _current_stream_id() -> str:
return f"{stream_base_id}:{stream_segment}"
async def on_stream(delta: str) -> None:
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content=delta, metadata={"_stream_delta": True},
content=delta,
metadata={
"_stream_delta": True,
"_stream_id": _current_stream_id(),
},
))
async def on_stream_end(*, resuming: bool = False) -> None:
nonlocal stream_segment
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="", metadata={"_stream_end": True, "_resuming": resuming},
content="",
metadata={
"_stream_end": True,
"_resuming": resuming,
"_stream_id": _current_stream_id(),
},
))
stream_segment += 1
response = await self._process_message(
msg, on_stream=on_stream, on_stream_end=on_stream_end,

View File

@ -96,6 +96,10 @@ class BaseChannel(ABC):
Override in subclasses to enable streaming. Implementations should
raise on delivery failure so the channel manager can retry.
Streaming contract: ``_stream_delta`` is a chunk, ``_stream_end`` ends
the current segment, and stateful implementations must key buffers by
``_stream_id`` rather than only by ``chat_id``.
"""
pass

View File

@ -12,7 +12,7 @@ from typing import Any, Literal
from loguru import logger
from pydantic import Field
from telegram import BotCommand, ReactionTypeEmoji, ReplyParameters, Update
from telegram.error import TimedOut
from telegram.error import BadRequest, TimedOut
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.request import HTTPXRequest
@ -163,6 +163,7 @@ class _StreamBuf:
text: str = ""
message_id: int | None = None
last_edit: float = 0.0
stream_id: str | None = None
class TelegramConfig(Base):
@ -478,17 +479,24 @@ class TelegramChannel(BaseChannel):
logger.error("Error sending Telegram message: {}", e2)
raise
@staticmethod
def _is_not_modified_error(exc: Exception) -> bool:
return isinstance(exc, BadRequest) and "message is not modified" in str(exc).lower()
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
"""Progressive message editing: send on first delta, edit on subsequent ones."""
if not self._app:
return
meta = metadata or {}
int_chat_id = int(chat_id)
stream_id = meta.get("_stream_id")
if meta.get("_stream_end"):
buf = self._stream_bufs.get(chat_id)
if not buf or not buf.message_id or not buf.text:
return
if stream_id is not None and buf.stream_id is not None and buf.stream_id != stream_id:
return
self._stop_typing(chat_id)
try:
html = _markdown_to_telegram_html(buf.text)
@ -498,6 +506,10 @@ class TelegramChannel(BaseChannel):
text=html, parse_mode="HTML",
)
except Exception as e:
if self._is_not_modified_error(e):
logger.debug("Final stream edit already applied for {}", chat_id)
self._stream_bufs.pop(chat_id, None)
return
logger.debug("Final stream edit failed (HTML), trying plain: {}", e)
try:
await self._call_with_retry(
@ -506,15 +518,21 @@ class TelegramChannel(BaseChannel):
text=buf.text,
)
except Exception as e2:
if self._is_not_modified_error(e2):
logger.debug("Final stream plain edit already applied for {}", chat_id)
self._stream_bufs.pop(chat_id, None)
return
logger.warning("Final stream edit failed: {}", e2)
raise # Let ChannelManager handle retry
self._stream_bufs.pop(chat_id, None)
return
buf = self._stream_bufs.get(chat_id)
if buf is None:
buf = _StreamBuf()
if buf is None or (stream_id is not None and buf.stream_id is not None and buf.stream_id != stream_id):
buf = _StreamBuf(stream_id=stream_id)
self._stream_bufs[chat_id] = buf
elif buf.stream_id is None:
buf.stream_id = stream_id
buf.text += delta
if not buf.text.strip():
@ -541,6 +559,9 @@ class TelegramChannel(BaseChannel):
)
buf.last_edit = now
except Exception as e:
if self._is_not_modified_error(e):
buf.last_edit = now
return
logger.warning("Stream edit failed: {}", e)
raise # Let ChannelManager handle retry

View File

@ -50,8 +50,9 @@ class _FakeBot:
async def set_my_commands(self, commands) -> None:
self.commands = commands
async def send_message(self, **kwargs) -> None:
async def send_message(self, **kwargs):
self.sent_messages.append(kwargs)
return SimpleNamespace(message_id=len(self.sent_messages))
async def send_photo(self, **kwargs) -> None:
self.sent_media.append({"kind": "photo", **kwargs})
@ -295,6 +296,62 @@ async def test_send_delta_stream_end_raises_and_keeps_buffer_on_failure() -> Non
assert "123" in channel._stream_bufs
@pytest.mark.asyncio
async def test_send_delta_stream_end_treats_not_modified_as_success() -> None:
from telegram.error import BadRequest
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
channel._app.bot.edit_message_text = AsyncMock(side_effect=BadRequest("Message is not modified"))
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0, stream_id="s:0")
await channel.send_delta("123", "", {"_stream_end": True, "_stream_id": "s:0"})
assert "123" not in channel._stream_bufs
@pytest.mark.asyncio
async def test_send_delta_new_stream_id_replaces_stale_buffer() -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
channel._stream_bufs["123"] = _StreamBuf(
text="hello",
message_id=7,
last_edit=0.0,
stream_id="old:0",
)
await channel.send_delta("123", "world", {"_stream_delta": True, "_stream_id": "new:0"})
buf = channel._stream_bufs["123"]
assert buf.text == "world"
assert buf.stream_id == "new:0"
assert buf.message_id == 1
@pytest.mark.asyncio
async def test_send_delta_incremental_edit_treats_not_modified_as_success() -> None:
from telegram.error import BadRequest
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0, stream_id="s:0")
channel._app.bot.edit_message_text = AsyncMock(side_effect=BadRequest("Message is not modified"))
await channel.send_delta("123", "", {"_stream_delta": True, "_stream_id": "s:0"})
assert channel._stream_bufs["123"].last_edit > 0.0
def test_derive_topic_session_key_uses_thread_id() -> None:
message = SimpleNamespace(
chat=SimpleNamespace(type="supergroup"),