From 908f1246d89f1fa50953900f1e7e7fd8a7ad251c Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Fri, 8 May 2026 21:36:51 +0800 Subject: [PATCH] fix(cli): sanitize surrogate code points before entering message bus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Windows, prompt_toolkit produces lone surrogate code points (e.g. \ud83d\udc08) for emoji input. These propagate through the message bus and crash at json.dumps() / file write time because surrogates cannot be encoded as UTF-8. Extract _sanitize_surrogates() that round-trips through UTF-16 to reconstruct paired surrogates into real characters (e.g. \ud83d\udc08 → 🐈), replacing unpaired surrogates with U+FFFD. Apply it at the CLI input path and reuse in SafeFileHistory. --- nanobot/cli/commands.py | 16 ++++++++++++--- tests/cli/test_safe_file_history.py | 32 +++++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 453f40f42..1468d775d 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -50,6 +50,17 @@ from rich.text import Text from nanobot import __logo__, __version__ +def _sanitize_surrogates(text: str) -> str: + """Reconstruct surrogate pairs into real characters; replace lone surrogates. + + On Windows, console input may produce lone surrogate code points (e.g. + ``\\ud83d\\udc08`` for U+1F408). Round-tripping through UTF-16 reconstructs + paired surrogates into their actual characters and replaces unpaired ones + with U+FFFD. + """ + return text.encode("utf-16-le", errors="surrogatepass").decode("utf-16-le", errors="replace") + + class SafeFileHistory(FileHistory): """FileHistory subclass that sanitizes surrogate characters on write. 
@@ -59,8 +70,7 @@ class SafeFileHistory(FileHistory): """ def store_string(self, string: str) -> None: - safe = string.encode("utf-8", errors="surrogateescape").decode("utf-8", errors="replace") - super().store_string(safe) + super().store_string(_sanitize_surrogates(string)) from nanobot.cli.stream import StreamRenderer, ThinkingSpinner from nanobot.config.paths import get_workspace_path, is_default_workspace from nanobot.config.schema import Config @@ -1224,7 +1234,7 @@ def agent( # Stop spinner before user input to avoid prompt_toolkit conflicts if renderer: renderer.stop_for_input() - user_input = await _read_interactive_input_async() + user_input = _sanitize_surrogates(await _read_interactive_input_async()) command = user_input.strip() if not command: continue diff --git a/tests/cli/test_safe_file_history.py b/tests/cli/test_safe_file_history.py index 78b5e2339..5d4efe89e 100644 --- a/tests/cli/test_safe_file_history.py +++ b/tests/cli/test_safe_file_history.py @@ -3,12 +3,40 @@ Surrogate characters in CLI input must not crash history file writes. 
""" -from nanobot.cli.commands import SafeFileHistory +from nanobot.cli.commands import SafeFileHistory, _sanitize_surrogates + + +class TestSanitizeSurrogates: + def test_paired_surrogates_reconstructed(self): + """Windows console produces \\ud83d\\udc08 for U+1F408 β€” must be restored.""" + result = _sanitize_surrogates("δ½ δΈΊδ»€δΉˆδΌšη”¨ 🐈") + assert result == "δ½ δΈΊδ»€δΉˆδΌšη”¨ 🐈" + + def test_lone_surrogates_replaced(self): + result = _sanitize_surrogates("hello \udce9 world") + assert "\udce9" not in result + assert "hello" in result + assert "world" in result + + def test_normal_text_unchanged(self): + assert _sanitize_surrogates("normal ascii text") == "normal ascii text" + + def test_emoji_already_correct(self): + """Properly encoded emoji should pass through unchanged.""" + assert _sanitize_surrogates("hello 🐈 nanobot") == "hello 🐈 nanobot" + + def test_mixed_unicode_preserved(self): + assert _sanitize_surrogates("δ½ ε₯½ hello こんにけは πŸŽ‰") == "δ½ ε₯½ hello こんにけは πŸŽ‰" + + def test_multiple_lone_surrogates(self): + result = _sanitize_surrogates("\udce9\udcf1\udcff") + assert "\udce9" not in result + assert "\udcf1" not in result + assert "\udcff" not in result class TestSafeFileHistory: def test_surrogate_replaced(self, tmp_path): - """Surrogate pairs are replaced with U+FFFD, not crash.""" hist = SafeFileHistory(str(tmp_path / "history")) hist.store_string("hello \udce9 world") entries = list(hist.load_history_strings())