def _partition_styles(
    plain_text: str, chunks: list[str], text_styles: list[str]
) -> list[list[str]]:
    """Partition Signal textStyle ranges across message chunks.

    ``split_message`` slices ``plain_text`` into pieces (optionally trimming
    whitespace at the boundaries), but the style ranges produced by
    ``_markdown_to_signal`` are expressed in UTF-16 offsets relative to the
    full ``plain_text``. This redistributes them per chunk with offsets
    rebased to each chunk's start. Ranges that span a boundary are split
    across the chunks they touch; ranges that fall entirely in trimmed
    whitespace are dropped.

    Args:
        plain_text: The full, unsplit message text the styles refer to.
        chunks: The pieces of ``plain_text``, in order.
        text_styles: ``"start:length:STYLE"`` entries, with ``start`` and
            ``length`` in UTF-16 code units relative to ``plain_text``.

    Returns:
        One list of rebased ``"start:length:STYLE"`` entries per chunk, in
        the same order as ``chunks``.

    Raises:
        ValueError: If an entry is not of the form ``start:length:STYLE``
            with integer ``start`` and ``length``.
    """
    if not chunks:
        return []
    if not text_styles:
        return [[] for _ in chunks]

    # Locate each chunk's [start, end) span in UTF-16 code units. A running
    # UTF-16 offset is kept next to the codepoint cursor so only the gap and
    # the chunk are measured per iteration, never the whole prefix again.
    chunk_ranges: list[tuple[int, int]] = []
    text_len = len(plain_text)
    cursor = 0  # codepoint index just past the previous chunk
    utf16_cursor = 0  # the same position, in UTF-16 code units
    for i, chunk in enumerate(chunks):
        start = cursor
        if i > 0:
            # Whitespace between chunks (but not before the first one) is
            # skipped to mirror the trimming done at split boundaries.
            while start < text_len and plain_text[start].isspace():
                start += 1
        if not plain_text.startswith(chunk, start):
            # The chunk is not where the whitespace skip predicted (e.g. it
            # kept some leading whitespace). Resync on the actual text so
            # the offsets of this and all later chunks do not drift.
            found = plain_text.find(chunk, cursor)
            if found != -1:
                start = found
        utf16_start = utf16_cursor + _utf16_len(plain_text[cursor:start])
        utf16_end = utf16_start + _utf16_len(chunk)
        chunk_ranges.append((utf16_start, utf16_end))
        cursor = start + len(chunk)
        utf16_cursor = utf16_end

    result: list[list[str]] = [[] for _ in chunks]
    for entry in text_styles:
        s, ln, style = entry.split(":", 2)
        r_start = int(s)
        r_end = r_start + int(ln)
        for i, (c_start, c_end) in enumerate(chunk_ranges):
            # Intersect the style range with the chunk span; an empty
            # intersection means the range does not touch this chunk.
            lo = max(r_start, c_start)
            hi = min(r_end, c_end)
            if hi > lo:
                result[i].append(f"{lo - c_start}:{hi - lo}:{style}")
    return result
textStyle + with offsets rebased to that chunk.""" + ch, client = self._make_send_channel() + ch._MAX_MESSAGE_LEN = 12 # type: ignore[attr-defined] + msg = OutboundMessage( + channel="signal", + chat_id="+19995550001", + content="**head** middle and **tail**", + ) + await ch.send(msg) + assert len(client.posts) >= 2 + # Chunk 0 has BOLD for "head"; chunk 1+ must also carry BOLD for "tail". + bold_chunks = [ + p["json"]["params"] + for p in client.posts + if any("BOLD" in s for s in p["json"]["params"].get("textStyle", [])) + ] + assert len(bold_chunks) >= 2, ( + "expected BOLD ranges in more than one chunk; got " + f"{[p['json']['params'] for p in client.posts]}" + ) + # Each emitted range must point inside its own chunk's text. + for params in bold_chunks: + chunk_text = params["message"] + for entry in params["textStyle"]: + s, ln, _ = entry.split(":", 2) + start, length = int(s), int(ln) + end_units = start + length + assert end_units <= len(chunk_text.encode("utf-16-le")) // 2 + @pytest.mark.asyncio async def test_send_empty_content_skips_rpc(self): ch, client = self._make_send_channel() diff --git a/tests/channels/test_signal_markdown.py b/tests/channels/test_signal_markdown.py index 36b75f163..55b081095 100644 --- a/tests/channels/test_signal_markdown.py +++ b/tests/channels/test_signal_markdown.py @@ -1,6 +1,9 @@ """Unit tests for the Signal markdown → plain text + textStyle converter.""" -from nanobot.channels.signal import _markdown_to_signal +import pytest + +from nanobot.channels.signal import _markdown_to_signal, _partition_styles +from nanobot.utils.helpers import split_message def _utf16_len(s: str) -> int: @@ -351,3 +354,108 @@ def test_reported_daily_brief_pattern(): assert sd.get("Local") == ["ITALIC"] assert sd.get("Quote of the Day") == ["BOLD"] assert_within_utf16_bounds(plain, styles) + + +# --------------------------------------------------------------------------- +# Chunk redistribution +# +# split_message can break a long Signal payload 
# into multiple chunks. The style ranges from _markdown_to_signal are
# anchored to the full text, so they must be redistributed per-chunk with
# rebased offsets — otherwise styles for chunks 1..N are silently lost.
# ---------------------------------------------------------------------------


def _resolve_chunk_styles(text: str, max_len: int) -> tuple[list[str], list[list[str]]]:
    """Helper: full markdown → signal pipeline, including chunking.

    Returns the chunks produced by ``split_message`` together with the
    per-chunk style lists produced by ``_partition_styles``.
    """
    plain, styles = _markdown_to_signal(text)
    chunks = split_message(plain, max_len) if plain else [""]
    return chunks, _partition_styles(plain, chunks, styles)


def _styled_text(chunk: str, entry: str) -> str:
    """Return the part of *chunk* covered by a ``start:length:STYLE`` entry.

    ``start`` and ``length`` are read as UTF-16 code units, so the chunk is
    sliced in its UTF-16-LE encoding (two bytes per code unit).
    """
    s, ln, _ = entry.split(":", 2)
    start, length = int(s), int(ln)
    encoded = chunk.encode("utf-16-le")
    return encoded[start * 2 : (start + length) * 2].decode("utf-16-le")


def test_partition_styles_single_chunk_passthrough():
    """With a single chunk equal to the full text, styles pass through as-is."""
    plain, styles = _markdown_to_signal("**bold** plain *it*")
    parts = _partition_styles(plain, [plain], styles)
    assert parts == [styles]


def test_partition_styles_no_styles():
    """No styles in → one empty style list per chunk out."""
    plain = "hello world"
    assert _partition_styles(plain, [plain], []) == [[]]
    assert _partition_styles(plain, ["hello", "world"], []) == [[], []]


def test_partition_styles_drops_styles_outside_chunks():
    """Whitespace trimmed by split_message must not carry a style range."""
    # Three spaces between the letters, spelled out so the gap width cannot
    # be lost to whitespace normalisation: the range 1:3 below must cover
    # exactly that gap and nothing else.
    plain = "a" + " " * 3 + "b"
    # Fake a style spanning the trimmed whitespace only.
    chunks = ["a", "b"]
    parts = _partition_styles(plain, chunks, ["1:3:BOLD"])
    assert parts == [[], []]


def test_partition_styles_long_message_preserves_chunk_one_styles():
    """A bold span deep in the message must follow the message into chunk 1."""
    # Two short paragraphs separated by a blank line, then **tail**.
    line_a = "alpha " * 5  # 30 chars, ends with space
    line_b = "beta " * 5
    md = f"{line_a.strip()}\n\n{line_b.strip()}\n\n**tail**"
    # Force a split between the paragraphs.
    max_len = len(line_a.strip()) + 2  # fits paragraph A + the "\n\n"
    chunks, parts = _resolve_chunk_styles(md, max_len)
    assert len(chunks) >= 2, "test setup must produce a split"
    # The bold "tail" should land in the last chunk, with chunk-relative offset.
    final_chunk = chunks[-1]
    final_styles = parts[-1]
    assert any("BOLD" in s for s in final_styles)
    for entry in final_styles:
        assert _styled_text(final_chunk, entry) == "tail"


def test_partition_styles_chunk_zero_styles_unchanged():
    """Styles entirely in chunk 0 keep their original offsets."""
    md = "**head** middle and **tail**"
    # Split so chunk 0 contains "head" and part of the rest, chunk 1 contains "tail".
    chunks, parts = _resolve_chunk_styles(md, 12)
    assert len(chunks) >= 2
    # "head" lives in chunk 0; assert its offset is unchanged (chunk 0 starts at 0).
    head_entries = [s for s in parts[0] if "BOLD" in s]
    assert any(s.startswith("0:4:") for s in head_entries)


def test_partition_styles_with_non_bmp_chunk_offset():
    """Chunk-start offsets must be expressed in UTF-16 code units."""
    # Emoji in chunk 0, bold in chunk 1.
    md = "🎉 alpha beta gamma\n\n**tail**"
    chunks, parts = _resolve_chunk_styles(md, 18)
    assert len(chunks) >= 2
    final_styles = parts[-1]
    assert any("BOLD" in s for s in final_styles)
    final_chunk = chunks[-1]
    for entry in final_styles:
        assert _styled_text(final_chunk, entry) == "tail"


def test_partition_styles_range_spanning_chunks_is_split():
    """A style range that straddles a chunk boundary gets sliced into both chunks."""
    # Construct manually: plain = "abc def", style covers "abc def" (whole thing).
    plain = "abc def"
    chunks = split_message(plain, 4)  # "abc" / "def"
    assert chunks == ["abc", "def"]
    parts = _partition_styles(plain, chunks, ["0:7:BOLD"])
    # Chunk 0 holds 0:3:BOLD, chunk 1 holds 0:3:BOLD (length=3 each, "def" only
    # since the space between the chunks was trimmed).
    assert parts[0] == ["0:3:BOLD"]
    assert parts[1] == ["0:3:BOLD"]