From cf25a582bab6bea041285ca9e0b128a016c0ba4d Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Fri, 27 Mar 2026 13:35:26 +0000 Subject: [PATCH] fix(channel): stop delta coalescing at stream boundaries --- nanobot/channels/manager.py | 6 ++-- .../test_channel_manager_delta_coalescing.py | 36 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index b21781487..0d6232251 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -180,7 +180,8 @@ class ChannelManager: final_metadata = dict(first_msg.metadata or {}) non_matching: list[OutboundMessage] = [] - # Drain all pending _stream_delta messages for the same (channel, chat_id) + # Only merge consecutive deltas. As soon as we hit any other message, + # stop and hand that boundary back to the dispatcher via `pending`. while True: try: next_msg = self.bus.outbound.get_nowait() @@ -201,8 +202,9 @@ class ChannelManager: # Stream ended - stop coalescing this stream break else: - # Keep for later processing + # First non-matching message defines the coalescing boundary. non_matching.append(next_msg) + break merged = OutboundMessage( channel=first_msg.channel, diff --git a/tests/channels/test_channel_manager_delta_coalescing.py b/tests/channels/test_channel_manager_delta_coalescing.py index 8b1bed5ef..0fa97f5b8 100644 --- a/tests/channels/test_channel_manager_delta_coalescing.py +++ b/tests/channels/test_channel_manager_delta_coalescing.py @@ -169,6 +169,42 @@ class TestDeltaCoalescing: # No pending assert len(pending) == 0 + @pytest.mark.asyncio + async def test_coalescing_stops_at_first_non_matching_boundary(self, manager, bus): + """Only consecutive deltas should be merged; later deltas stay queued.""" + await bus.publish_outbound(OutboundMessage( + channel="mock", + chat_id="chat1", + content="Hello", + metadata={"_stream_delta": True, "_stream_id": "seg-1"}, + )) + await bus.publish_outbound(OutboundMessage( + channel="mock", + chat_id="chat1", + content="", + metadata={"_stream_end": True, "_stream_id": "seg-1"}, + )) + await bus.publish_outbound(OutboundMessage( + channel="mock", + chat_id="chat1", + content="world", + metadata={"_stream_delta": True, "_stream_id": "seg-2"}, + )) + + first_msg = await bus.consume_outbound() + merged, pending = manager._coalesce_stream_deltas(first_msg) + + assert merged.content == "Hello" + assert merged.metadata.get("_stream_end") is None + assert len(pending) == 1 + assert pending[0].metadata.get("_stream_end") is True + assert pending[0].metadata.get("_stream_id") == "seg-1" + + # The next stream segment must remain in queue order for later dispatch. + remaining = await bus.consume_outbound() + assert remaining.content == "world" + assert remaining.metadata.get("_stream_id") == "seg-2" + @pytest.mark.asyncio async def test_non_delta_message_preserved(self, manager, bus): """Non-delta messages should be preserved in pending list."""