mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-21 09:02:32 +00:00
fix(signal): redistribute textStyle ranges across split message chunks
split_message can break a long Signal payload into multiple JSON-RPC sends, but the previous code attached the full textStyle list only to chunk 0. Style ranges in later chunks were dropped, and ranges whose offsets pointed past chunk 0's end were sent as invalid metadata against chunk 0. Add _partition_styles, which rebases each range against the chunk it lives in (in UTF-16 code units, matching the markdown converter) and splits boundary-spanning ranges across the chunks they touch. Whitespace trimmed by split_message's lstrip is skipped so offsets stay aligned. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
8a2a5eecdd
commit
6ec6c9bb83
@ -208,6 +208,54 @@ def _markdown_to_signal(text: str) -> tuple[str, list[str]]:
|
||||
return plain_text, text_styles
|
||||
|
||||
|
||||
def _partition_styles(
|
||||
plain_text: str, chunks: list[str], text_styles: list[str]
|
||||
) -> list[list[str]]:
|
||||
"""Partition Signal textStyle ranges across message chunks.
|
||||
|
||||
``split_message`` slices ``plain_text`` into pieces (optionally trimming
|
||||
whitespace at the boundaries), but the style ranges produced by
|
||||
``_markdown_to_signal`` are expressed in UTF-16 offsets relative to the
|
||||
full ``plain_text``. This redistributes them per chunk with offsets
|
||||
rebased to each chunk's start. Ranges that span a boundary are split
|
||||
across the chunks they touch; ranges that fall entirely in trimmed
|
||||
whitespace are dropped.
|
||||
"""
|
||||
if not chunks:
|
||||
return []
|
||||
if not text_styles:
|
||||
return [[] for _ in chunks]
|
||||
|
||||
# Locate each chunk's UTF-16 start in plain_text. split_message lstrips at
|
||||
# boundaries (but not before the first chunk), so we skip whitespace
|
||||
# between chunks to mirror that.
|
||||
chunk_ranges: list[tuple[int, int]] = []
|
||||
cursor = 0 # Python codepoint cursor in plain_text
|
||||
for i, chunk in enumerate(chunks):
|
||||
if i > 0:
|
||||
while cursor < len(plain_text) and plain_text[cursor].isspace():
|
||||
cursor += 1
|
||||
utf16_start = _utf16_len(plain_text[:cursor])
|
||||
utf16_end = utf16_start + _utf16_len(chunk)
|
||||
chunk_ranges.append((utf16_start, utf16_end))
|
||||
cursor += len(chunk)
|
||||
|
||||
result: list[list[str]] = [[] for _ in chunks]
|
||||
for entry in text_styles:
|
||||
s, ln, style = entry.split(":", 2)
|
||||
r_start = int(s)
|
||||
r_end = r_start + int(ln)
|
||||
for i, (c_start, c_end) in enumerate(chunk_ranges):
|
||||
if r_end <= c_start or r_start >= c_end:
|
||||
continue
|
||||
new_start = max(r_start, c_start) - c_start
|
||||
new_end = min(r_end, c_end) - c_start
|
||||
new_length = new_end - new_start
|
||||
if new_length > 0:
|
||||
result[i].append(f"{new_start}:{new_length}:{style}")
|
||||
return result
|
||||
|
||||
|
||||
class SignalDMConfig(Base):
|
||||
"""Signal DM policy configuration."""
|
||||
|
||||
@ -392,10 +440,11 @@ class SignalChannel(BaseChannel):
|
||||
recipient_params = self._recipient_params(msg.chat_id)
|
||||
|
||||
chunks = split_message(plain_text, self._MAX_MESSAGE_LEN) if plain_text else [""]
|
||||
chunk_styles = _partition_styles(plain_text, chunks, text_styles)
|
||||
for i, chunk in enumerate(chunks):
|
||||
params: dict[str, Any] = {"message": chunk}
|
||||
if text_styles and i == 0:
|
||||
params["textStyle"] = text_styles
|
||||
if chunk_styles[i]:
|
||||
params["textStyle"] = chunk_styles[i]
|
||||
params.update(recipient_params)
|
||||
if msg.media and i == 0:
|
||||
params["attachments"] = msg.media
|
||||
|
||||
@ -834,6 +834,38 @@ class TestSend:
|
||||
assert "textStyle" in params
|
||||
assert any("BOLD" in s for s in params["textStyle"])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_split_message_redistributes_text_styles(self):
|
||||
"""Long message split across chunks: each chunk gets its own textStyle
|
||||
with offsets rebased to that chunk."""
|
||||
ch, client = self._make_send_channel()
|
||||
ch._MAX_MESSAGE_LEN = 12 # type: ignore[attr-defined]
|
||||
msg = OutboundMessage(
|
||||
channel="signal",
|
||||
chat_id="+19995550001",
|
||||
content="**head** middle and **tail**",
|
||||
)
|
||||
await ch.send(msg)
|
||||
assert len(client.posts) >= 2
|
||||
# Chunk 0 has BOLD for "head"; chunk 1+ must also carry BOLD for "tail".
|
||||
bold_chunks = [
|
||||
p["json"]["params"]
|
||||
for p in client.posts
|
||||
if any("BOLD" in s for s in p["json"]["params"].get("textStyle", []))
|
||||
]
|
||||
assert len(bold_chunks) >= 2, (
|
||||
"expected BOLD ranges in more than one chunk; got "
|
||||
f"{[p['json']['params'] for p in client.posts]}"
|
||||
)
|
||||
# Each emitted range must point inside its own chunk's text.
|
||||
for params in bold_chunks:
|
||||
chunk_text = params["message"]
|
||||
for entry in params["textStyle"]:
|
||||
s, ln, _ = entry.split(":", 2)
|
||||
start, length = int(s), int(ln)
|
||||
end_units = start + length
|
||||
assert end_units <= len(chunk_text.encode("utf-16-le")) // 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_empty_content_skips_rpc(self):
|
||||
ch, client = self._make_send_channel()
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
"""Unit tests for the Signal markdown → plain text + textStyle converter."""
|
||||
|
||||
from nanobot.channels.signal import _markdown_to_signal
|
||||
import pytest
|
||||
|
||||
from nanobot.channels.signal import _markdown_to_signal, _partition_styles
|
||||
from nanobot.utils.helpers import split_message
|
||||
|
||||
|
||||
def _utf16_len(s: str) -> int:
|
||||
@ -351,3 +354,108 @@ def test_reported_daily_brief_pattern():
|
||||
assert sd.get("Local") == ["ITALIC"]
|
||||
assert sd.get("Quote of the Day") == ["BOLD"]
|
||||
assert_within_utf16_bounds(plain, styles)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Chunk redistribution
|
||||
#
|
||||
# split_message can break a long Signal payload into multiple chunks. The
|
||||
# style ranges from _markdown_to_signal are anchored to the full text, so
|
||||
# they must be redistributed per-chunk with rebased offsets — otherwise
|
||||
# styles for chunks 1..N are silently lost.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _resolve_chunk_styles(text: str, max_len: int) -> tuple[list[str], list[list[str]]]:
|
||||
"""Helper: full markdown → signal pipeline, including chunking."""
|
||||
plain, styles = _markdown_to_signal(text)
|
||||
chunks = split_message(plain, max_len) if plain else [""]
|
||||
return chunks, _partition_styles(plain, chunks, styles)
|
||||
|
||||
|
||||
def test_partition_styles_single_chunk_passthrough():
|
||||
plain, styles = _markdown_to_signal("**bold** plain *it*")
|
||||
parts = _partition_styles(plain, [plain], styles)
|
||||
assert parts == [styles]
|
||||
|
||||
|
||||
def test_partition_styles_no_styles():
|
||||
plain = "hello world"
|
||||
assert _partition_styles(plain, [plain], []) == [[]]
|
||||
assert _partition_styles(plain, ["hello", "world"], []) == [[], []]
|
||||
|
||||
|
||||
def test_partition_styles_drops_styles_outside_chunks():
|
||||
"""Whitespace trimmed by split_message must not carry a style range."""
|
||||
plain = "a b"
|
||||
# Fake a style spanning the trimmed whitespace only.
|
||||
chunks = ["a", "b"]
|
||||
parts = _partition_styles(plain, chunks, ["1:3:BOLD"])
|
||||
assert parts == [[], []]
|
||||
|
||||
|
||||
def test_partition_styles_long_message_preserves_chunk_one_styles():
|
||||
"""A bold span deep in the message must follow the message into chunk 1."""
|
||||
# Two ~30-char paragraphs separated by a blank line, then **tail**.
|
||||
line_a = "alpha " * 5 # 30 chars, ends with space
|
||||
line_b = "beta " * 5
|
||||
md = f"{line_a.strip()}\n\n{line_b.strip()}\n\n**tail**"
|
||||
plain, styles = _markdown_to_signal(md)
|
||||
# Force a split between the paragraphs.
|
||||
max_len = len(line_a.strip()) + 2 # fits paragraph A + the "\n\n"
|
||||
chunks = split_message(plain, max_len)
|
||||
assert len(chunks) >= 2, "test setup must produce a split"
|
||||
parts = _partition_styles(plain, chunks, styles)
|
||||
# The bold "tail" should land in the last chunk, with chunk-relative offset.
|
||||
final_chunk = chunks[-1]
|
||||
final_styles = parts[-1]
|
||||
assert any("BOLD" in s for s in final_styles)
|
||||
for entry in final_styles:
|
||||
s, ln, _ = entry.split(":", 2)
|
||||
start, length = int(s), int(ln)
|
||||
slice_ = final_chunk.encode("utf-16-le")[start * 2 : (start + length) * 2].decode("utf-16-le")
|
||||
assert slice_ == "tail"
|
||||
|
||||
|
||||
def test_partition_styles_chunk_zero_styles_unchanged():
|
||||
"""Styles entirely in chunk 0 keep their original offsets."""
|
||||
md = "**head** middle and **tail**"
|
||||
plain, styles = _markdown_to_signal(md)
|
||||
# Split so chunk 0 contains "head" and part of the rest, chunk 1 contains "tail".
|
||||
chunks = split_message(plain, 12)
|
||||
assert len(chunks) >= 2
|
||||
parts = _partition_styles(plain, chunks, styles)
|
||||
# "head" lives in chunk 0; assert its offset is unchanged (chunk 0 starts at 0).
|
||||
head_entries = [s for s in parts[0] if "BOLD" in s]
|
||||
assert any(s.startswith("0:4:") for s in head_entries)
|
||||
|
||||
|
||||
def test_partition_styles_with_non_bmp_chunk_offset():
|
||||
"""Chunk-start offsets must be expressed in UTF-16 code units."""
|
||||
# Emoji in chunk 0, bold in chunk 1.
|
||||
md = "🎉 alpha beta gamma\n\n**tail**"
|
||||
plain, styles = _markdown_to_signal(md)
|
||||
chunks = split_message(plain, 18)
|
||||
assert len(chunks) >= 2
|
||||
parts = _partition_styles(plain, chunks, styles)
|
||||
final_styles = parts[-1]
|
||||
assert any("BOLD" in s for s in final_styles)
|
||||
final_chunk = chunks[-1]
|
||||
for entry in final_styles:
|
||||
s, ln, _ = entry.split(":", 2)
|
||||
start, length = int(s), int(ln)
|
||||
slice_ = final_chunk.encode("utf-16-le")[start * 2 : (start + length) * 2].decode("utf-16-le")
|
||||
assert slice_ == "tail"
|
||||
|
||||
|
||||
def test_partition_styles_range_spanning_chunks_is_split():
|
||||
"""A style range that straddles a chunk boundary gets sliced into both chunks."""
|
||||
# Construct manually: plain = "abc def", style covers "abc def" (whole thing).
|
||||
plain = "abc def"
|
||||
chunks = split_message(plain, 4) # "abc" / "def"
|
||||
assert chunks == ["abc", "def"]
|
||||
parts = _partition_styles(plain, chunks, ["0:7:BOLD"])
|
||||
# Chunk 0 holds 0:3:BOLD, chunk 1 holds 0:3:BOLD (length=3 each, "def" only
|
||||
# since the space was trimmed by lstrip).
|
||||
assert parts[0] == ["0:3:BOLD"]
|
||||
assert parts[1] == ["0:3:BOLD"]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user