fix(channel): preserve threaded streaming context

This commit is contained in:
Xubin Ren 2026-03-31 11:18:18 +00:00 committed by Xubin Ren
parent 8956df3668
commit f450c6ef6c
5 changed files with 155 additions and 19 deletions

View File

@ -403,25 +403,25 @@ class AgentLoop:
return f"{stream_base_id}:{stream_segment}"
async def on_stream(delta: str) -> None:
meta = dict(msg.metadata or {})
meta["_stream_delta"] = True
meta["_stream_id"] = _current_stream_id()
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content=delta,
metadata={
"_stream_delta": True,
"_stream_id": _current_stream_id(),
},
metadata=meta,
))
async def on_stream_end(*, resuming: bool = False) -> None:
nonlocal stream_segment
meta = dict(msg.metadata or {})
meta["_stream_end"] = True
meta["_resuming"] = resuming
meta["_stream_id"] = _current_stream_id()
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="",
metadata={
"_stream_end": True,
"_resuming": resuming,
"_stream_id": _current_stream_id(),
},
metadata=meta,
))
stream_segment += 1

View File

@ -132,7 +132,11 @@ def _render_markdown_html(text: str) -> str | None:
return formatted
def _build_matrix_text_content(text: str, event_id: str | None = None) -> dict[str, object]:
def _build_matrix_text_content(
text: str,
event_id: str | None = None,
thread_relates_to: dict[str, object] | None = None,
) -> dict[str, object]:
"""
Constructs and returns a dictionary representing the matrix text content with optional
HTML formatting and reference to an existing event for replacement. This function is
@ -144,6 +148,9 @@ def _build_matrix_text_content(text: str, event_id: str | None = None) -> dict[s
include information indicating that the message is a replacement of the specified
event.
:type event_id: str | None
:param thread_relates_to: Optional Matrix thread relation metadata. For edits this is
stored in ``m.new_content`` so the replacement remains in the same thread.
:type thread_relates_to: dict[str, object] | None
:return: A dictionary containing the matrix text content, potentially enriched with
HTML formatting and replacement metadata if applicable.
:rtype: dict[str, object]
@ -153,14 +160,18 @@ def _build_matrix_text_content(text: str, event_id: str | None = None) -> dict[s
content["format"] = MATRIX_HTML_FORMAT
content["formatted_body"] = html
if event_id:
content["m.new_content"] = {
content["m.new_content"] = {
"body": text,
"msgtype": "m.text"
"msgtype": "m.text",
}
content["m.relates_to"] = {
"rel_type": "m.replace",
"event_id": event_id
"event_id": event_id,
}
if thread_relates_to:
content["m.new_content"]["m.relates_to"] = thread_relates_to
elif thread_relates_to:
content["m.relates_to"] = thread_relates_to
return content
@ -475,9 +486,11 @@ class MatrixChannel(BaseChannel):
await self._stop_typing_keepalive(chat_id, clear_typing=True)
content = _build_matrix_text_content(buf.text, buf.event_id)
if relates_to:
content["m.relates_to"] = relates_to
content = _build_matrix_text_content(
buf.text,
buf.event_id,
thread_relates_to=relates_to,
)
await self._send_room_content(chat_id, content)
return
@ -494,14 +507,18 @@ class MatrixChannel(BaseChannel):
if not buf.last_edit or (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL:
try:
content = _build_matrix_text_content(buf.text, buf.event_id)
content = _build_matrix_text_content(
buf.text,
buf.event_id,
thread_relates_to=relates_to,
)
response = await self._send_room_content(chat_id, content)
buf.last_edit = now
if not buf.event_id:
# we are editing the same message all the time, so only the first time the event id needs to be set
buf.event_id = response.event_id
except Exception:
await self._stop_typing_keepalive(metadata["room_id"], clear_typing=True)
await self._stop_typing_keepalive(chat_id, clear_typing=True)
pass

View File

@ -117,6 +117,43 @@ class TestDispatch:
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert out.content == "hi"
@pytest.mark.asyncio
async def test_dispatch_streaming_preserves_message_metadata(self):
from nanobot.bus.events import InboundMessage
loop, bus = _make_loop()
msg = InboundMessage(
channel="matrix",
sender_id="u1",
chat_id="!room:matrix.org",
content="hello",
metadata={
"_wants_stream": True,
"thread_root_event_id": "$root1",
"thread_reply_to_event_id": "$reply1",
},
)
async def fake_process(_msg, *, on_stream=None, on_stream_end=None, **kwargs):
assert on_stream is not None
assert on_stream_end is not None
await on_stream("hi")
await on_stream_end(resuming=False)
return None
loop._process_message = fake_process
await loop._dispatch(msg)
first = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
second = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert first.metadata["thread_root_event_id"] == "$root1"
assert first.metadata["thread_reply_to_event_id"] == "$reply1"
assert first.metadata["_stream_delta"] is True
assert second.metadata["thread_root_event_id"] == "$root1"
assert second.metadata["thread_reply_to_event_id"] == "$reply1"
assert second.metadata["_stream_end"] is True
@pytest.mark.asyncio
async def test_processing_lock_serializes(self):
from nanobot.bus.events import InboundMessage, OutboundMessage

View File

@ -4,8 +4,8 @@ import asyncio
from pathlib import Path
from types import SimpleNamespace
import discord
import pytest
discord = pytest.importorskip("discord")
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus

View File

@ -1367,6 +1367,23 @@ def test_build_matrix_text_content_with_event_id() -> None:
assert result["m.relates_to"]["event_id"] == event_id
def test_build_matrix_text_content_with_event_id_preserves_thread_relation() -> None:
"""Thread relations for edits should stay inside m.new_content."""
relates_to = {
"rel_type": "m.thread",
"event_id": "$root1",
"m.in_reply_to": {"event_id": "$reply1"},
"is_falling_back": True,
}
result = _build_matrix_text_content("Updated message", "event-1", relates_to)
assert result["m.relates_to"] == {
"rel_type": "m.replace",
"event_id": "event-1",
}
assert result["m.new_content"]["m.relates_to"] == relates_to
def test_build_matrix_text_content_no_event_id() -> None:
"""Test that when event_id is not provided, no extra properties are added."""
result = _build_matrix_text_content("Regular message")
@ -1500,6 +1517,71 @@ async def test_send_delta_stream_end_replaces_existing_message() -> None:
}
@pytest.mark.asyncio
async def test_send_delta_starts_threaded_stream_inside_thread() -> None:
channel = MatrixChannel(_make_config(), MessageBus())
client = _FakeAsyncClient("", "", "", None)
channel.client = client
client.room_send_response.event_id = "event-1"
metadata = {
"thread_root_event_id": "$root1",
"thread_reply_to_event_id": "$reply1",
}
await channel.send_delta("!room:matrix.org", "Hello", metadata)
assert client.room_send_calls[0]["content"]["m.relates_to"] == {
"rel_type": "m.thread",
"event_id": "$root1",
"m.in_reply_to": {"event_id": "$reply1"},
"is_falling_back": True,
}
@pytest.mark.asyncio
async def test_send_delta_threaded_edit_keeps_replace_and_thread_relation(monkeypatch) -> None:
channel = MatrixChannel(_make_config(), MessageBus())
client = _FakeAsyncClient("", "", "", None)
channel.client = client
client.room_send_response.event_id = "event-1"
times = [100.0, 102.0, 104.0]
times.reverse()
monkeypatch.setattr(channel, "monotonic_time", lambda: times and times.pop())
metadata = {
"thread_root_event_id": "$root1",
"thread_reply_to_event_id": "$reply1",
}
await channel.send_delta("!room:matrix.org", "Hello", metadata)
await channel.send_delta("!room:matrix.org", " world", metadata)
await channel.send_delta("!room:matrix.org", "", {"_stream_end": True, **metadata})
edit_content = client.room_send_calls[1]["content"]
final_content = client.room_send_calls[2]["content"]
assert edit_content["m.relates_to"] == {
"rel_type": "m.replace",
"event_id": "event-1",
}
assert edit_content["m.new_content"]["m.relates_to"] == {
"rel_type": "m.thread",
"event_id": "$root1",
"m.in_reply_to": {"event_id": "$reply1"},
"is_falling_back": True,
}
assert final_content["m.relates_to"] == {
"rel_type": "m.replace",
"event_id": "event-1",
}
assert final_content["m.new_content"]["m.relates_to"] == {
"rel_type": "m.thread",
"event_id": "$root1",
"m.in_reply_to": {"event_id": "$reply1"},
"is_falling_back": True,
}
@pytest.mark.asyncio
async def test_send_delta_stream_end_noop_when_buffer_missing() -> None:
channel = MatrixChannel(_make_config(), MessageBus())