fix(channel): stop delta coalescing at stream boundaries

This commit is contained in:
Xubin Ren 2026-03-27 13:35:26 +00:00 committed by Xubin Ren
parent 5ff9146a24
commit cf25a582ba
2 changed files with 40 additions and 2 deletions

View File

@ -180,7 +180,8 @@ class ChannelManager:
final_metadata = dict(first_msg.metadata or {})
non_matching: list[OutboundMessage] = []
# Drain all pending _stream_delta messages for the same (channel, chat_id)
# Only merge consecutive deltas. As soon as we hit any other message,
# stop and hand that boundary back to the dispatcher via `pending`.
while True:
try:
next_msg = self.bus.outbound.get_nowait()
@ -201,8 +202,9 @@ class ChannelManager:
# Stream ended - stop coalescing this stream
break
else:
# Keep for later processing
# First non-matching message defines the coalescing boundary.
non_matching.append(next_msg)
break
merged = OutboundMessage(
channel=first_msg.channel,

View File

@ -169,6 +169,42 @@ class TestDeltaCoalescing:
# No pending
assert len(pending) == 0
@pytest.mark.asyncio
async def test_coalescing_stops_at_first_non_matching_boundary(self, manager, bus):
"""Only consecutive deltas should be merged; later deltas stay queued."""
await bus.publish_outbound(OutboundMessage(
channel="mock",
chat_id="chat1",
content="Hello",
metadata={"_stream_delta": True, "_stream_id": "seg-1"},
))
await bus.publish_outbound(OutboundMessage(
channel="mock",
chat_id="chat1",
content="",
metadata={"_stream_end": True, "_stream_id": "seg-1"},
))
await bus.publish_outbound(OutboundMessage(
channel="mock",
chat_id="chat1",
content="world",
metadata={"_stream_delta": True, "_stream_id": "seg-2"},
))
first_msg = await bus.consume_outbound()
merged, pending = manager._coalesce_stream_deltas(first_msg)
assert merged.content == "Hello"
assert merged.metadata.get("_stream_end") is None
assert len(pending) == 1
assert pending[0].metadata.get("_stream_end") is True
assert pending[0].metadata.get("_stream_id") == "seg-1"
# The next stream segment must remain in queue order for later dispatch.
remaining = await bus.consume_outbound()
assert remaining.content == "world"
assert remaining.metadata.get("_stream_id") == "seg-2"
@pytest.mark.asyncio
async def test_non_delta_message_preserved(self, manager, bus):
"""Non-delta messages should be preserved in pending list."""