From f450c6ef6c0ca9afc2c03c91fd727e94f28464a6 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Tue, 31 Mar 2026 11:18:18 +0000 Subject: [PATCH] fix(channel): preserve threaded streaming context --- nanobot/agent/loop.py | 18 +++--- nanobot/channels/matrix.py | 35 ++++++++--- tests/agent/test_task_cancel.py | 37 ++++++++++++ tests/channels/test_discord_channel.py | 2 +- tests/channels/test_matrix_channel.py | 82 ++++++++++++++++++++++++++ 5 files changed, 155 insertions(+), 19 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 97d352cb8..a9dc589e8 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -403,25 +403,25 @@ class AgentLoop: return f"{stream_base_id}:{stream_segment}" async def on_stream(delta: str) -> None: + meta = dict(msg.metadata or {}) + meta["_stream_delta"] = True + meta["_stream_id"] = _current_stream_id() await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=delta, - metadata={ - "_stream_delta": True, - "_stream_id": _current_stream_id(), - }, + metadata=meta, )) async def on_stream_end(*, resuming: bool = False) -> None: nonlocal stream_segment + meta = dict(msg.metadata or {}) + meta["_stream_end"] = True + meta["_resuming"] = resuming + meta["_stream_id"] = _current_stream_id() await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="", - metadata={ - "_stream_end": True, - "_resuming": resuming, - "_stream_id": _current_stream_id(), - }, + metadata=meta, )) stream_segment += 1 diff --git a/nanobot/channels/matrix.py b/nanobot/channels/matrix.py index dcece1043..bc6d9398a 100644 --- a/nanobot/channels/matrix.py +++ b/nanobot/channels/matrix.py @@ -132,7 +132,11 @@ def _render_markdown_html(text: str) -> str | None: return formatted -def _build_matrix_text_content(text: str, event_id: str | None = None) -> dict[str, object]: +def _build_matrix_text_content( + text: str, + event_id: str | None = None, + thread_relates_to: dict[str, object] | None = None, +) -> dict[str, object]: """ Constructs and returns a dictionary representing the matrix text content with optional HTML formatting and reference to an existing event for replacement. This function is @@ -144,6 +148,9 @@ def _build_matrix_text_content(text: str, event_id: str | None = None) -> dict[s include information indicating that the message is a replacement of the specified event. :type event_id: str | None + :param thread_relates_to: Optional Matrix thread relation metadata. For edits this is + stored in ``m.new_content`` so the replacement remains in the same thread. + :type thread_relates_to: dict[str, object] | None :return: A dictionary containing the matrix text content, potentially enriched with HTML formatting and replacement metadata if applicable. :rtype: dict[str, object] @@ -153,14 +160,18 @@ def _build_matrix_text_content(text: str, event_id: str | None = None) -> dict[s content["format"] = MATRIX_HTML_FORMAT content["formatted_body"] = html if event_id: - content["m.new_content"] = { + content["m.new_content"] = { "body": text, - "msgtype": "m.text" + "msgtype": "m.text", } content["m.relates_to"] = { "rel_type": "m.replace", - "event_id": event_id + "event_id": event_id, } + if thread_relates_to: + content["m.new_content"]["m.relates_to"] = thread_relates_to + elif thread_relates_to: + content["m.relates_to"] = thread_relates_to return content @@ -475,9 +486,11 @@ class MatrixChannel(BaseChannel): await self._stop_typing_keepalive(chat_id, clear_typing=True) - content = _build_matrix_text_content(buf.text, buf.event_id) - if relates_to: - content["m.relates_to"] = relates_to + content = _build_matrix_text_content( + buf.text, + buf.event_id, + thread_relates_to=relates_to, + ) await self._send_room_content(chat_id, content) return @@ -494,14 +507,18 @@ class MatrixChannel(BaseChannel): if not buf.last_edit or (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: try: - content = _build_matrix_text_content(buf.text, buf.event_id) + content = _build_matrix_text_content( + buf.text, + buf.event_id, + thread_relates_to=relates_to, + ) response = await self._send_room_content(chat_id, content) buf.last_edit = now if not buf.event_id: # we are editing the same message all the time, so only the first time the event id needs to be set buf.event_id = response.event_id except Exception: - await self._stop_typing_keepalive(metadata["room_id"], clear_typing=True) + await self._stop_typing_keepalive(chat_id, clear_typing=True) pass diff --git a/tests/agent/test_task_cancel.py b/tests/agent/test_task_cancel.py index 4902a4c80..70f7621d1 100644 --- a/tests/agent/test_task_cancel.py +++ b/tests/agent/test_task_cancel.py @@ -117,6 +117,43 @@ class TestDispatch: out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) assert out.content == "hi" + @pytest.mark.asyncio + async def test_dispatch_streaming_preserves_message_metadata(self): + from nanobot.bus.events import InboundMessage + + loop, bus = _make_loop() + msg = InboundMessage( + channel="matrix", + sender_id="u1", + chat_id="!room:matrix.org", + content="hello", + metadata={ + "_wants_stream": True, + "thread_root_event_id": "$root1", + "thread_reply_to_event_id": "$reply1", + }, + ) + + async def fake_process(_msg, *, on_stream=None, on_stream_end=None, **kwargs): + assert on_stream is not None + assert on_stream_end is not None + await on_stream("hi") + await on_stream_end(resuming=False) + return None + + loop._process_message = fake_process + + await loop._dispatch(msg) + first = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + second = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + + assert first.metadata["thread_root_event_id"] == "$root1" + assert first.metadata["thread_reply_to_event_id"] == "$reply1" + assert first.metadata["_stream_delta"] is True + assert second.metadata["thread_root_event_id"] == "$root1" + assert second.metadata["thread_reply_to_event_id"] == "$reply1" + assert second.metadata["_stream_end"] is True + @pytest.mark.asyncio async def test_processing_lock_serializes(self): from nanobot.bus.events import InboundMessage, OutboundMessage diff --git a/tests/channels/test_discord_channel.py b/tests/channels/test_discord_channel.py index 3f1f996fc..d352c788c 100644 --- a/tests/channels/test_discord_channel.py +++ b/tests/channels/test_discord_channel.py @@ -4,8 +4,8 @@ import asyncio from pathlib import Path from types import SimpleNamespace -import discord import pytest +discord = pytest.importorskip("discord") from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus diff --git a/tests/channels/test_matrix_channel.py b/tests/channels/test_matrix_channel.py index 3ad65e76b..18a8e1097 100644 --- a/tests/channels/test_matrix_channel.py +++ b/tests/channels/test_matrix_channel.py @@ -1367,6 +1367,23 @@ def test_build_matrix_text_content_with_event_id() -> None: assert result["m.relates_to"]["event_id"] == event_id +def test_build_matrix_text_content_with_event_id_preserves_thread_relation() -> None: + """Thread relations for edits should stay inside m.new_content.""" + relates_to = { + "rel_type": "m.thread", + "event_id": "$root1", + "m.in_reply_to": {"event_id": "$reply1"}, + "is_falling_back": True, + } + result = _build_matrix_text_content("Updated message", "event-1", relates_to) + + assert result["m.relates_to"] == { + "rel_type": "m.replace", + "event_id": "event-1", + } + assert result["m.new_content"]["m.relates_to"] == relates_to + + def test_build_matrix_text_content_no_event_id() -> None: """Test that when event_id is not provided, no extra properties are added.""" result = _build_matrix_text_content("Regular message") @@ -1500,6 +1517,71 @@ async def test_send_delta_stream_end_replaces_existing_message() -> None: } +@pytest.mark.asyncio +async def test_send_delta_starts_threaded_stream_inside_thread() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + client.room_send_response.event_id = "event-1" + + metadata = { + "thread_root_event_id": "$root1", + "thread_reply_to_event_id": "$reply1", + } + await channel.send_delta("!room:matrix.org", "Hello", metadata) + + assert client.room_send_calls[0]["content"]["m.relates_to"] == { + "rel_type": "m.thread", + "event_id": "$root1", + "m.in_reply_to": {"event_id": "$reply1"}, + "is_falling_back": True, + } + + +@pytest.mark.asyncio +async def test_send_delta_threaded_edit_keeps_replace_and_thread_relation(monkeypatch) -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + client.room_send_response.event_id = "event-1" + + times = [100.0, 102.0, 104.0] + times.reverse() + monkeypatch.setattr(channel, "monotonic_time", lambda: times and times.pop()) + + metadata = { + "thread_root_event_id": "$root1", + "thread_reply_to_event_id": "$reply1", + } + await channel.send_delta("!room:matrix.org", "Hello", metadata) + await channel.send_delta("!room:matrix.org", " world", metadata) + await channel.send_delta("!room:matrix.org", "", {"_stream_end": True, **metadata}) + + edit_content = client.room_send_calls[1]["content"] + final_content = client.room_send_calls[2]["content"] + + assert edit_content["m.relates_to"] == { + "rel_type": "m.replace", + "event_id": "event-1", + } + assert edit_content["m.new_content"]["m.relates_to"] == { + "rel_type": "m.thread", + "event_id": "$root1", + "m.in_reply_to": {"event_id": "$reply1"}, + "is_falling_back": True, + } + assert final_content["m.relates_to"] == { + "rel_type": "m.replace", + "event_id": "event-1", + } + assert final_content["m.new_content"]["m.relates_to"] == { + "rel_type": "m.thread", + "event_id": "$root1", + "m.in_reply_to": {"event_id": "$reply1"}, + "is_falling_back": True, + } + + @pytest.mark.asyncio async def test_send_delta_stream_end_noop_when_buffer_missing() -> None: channel = MatrixChannel(_make_config(), MessageBus())