fix(cli): sanitize surrogate code points before entering message bus

On Windows, prompt_toolkit produces lone surrogate code points (e.g.
\ud83d\udc08 instead of U+1F408 🐈) for emoji input. These propagate
through the message bus and crash at json.dumps() / file write time
because surrogates cannot be encoded as UTF-8.

Extract _sanitize_surrogates(), which round-trips through UTF-16 to
reconstruct paired surrogates into real characters (e.g.
\ud83d\udc08 → 🐈) and replaces unpaired surrogates with U+FFFD.
Apply it at the CLI input path and reuse it in SafeFileHistory.
This commit is contained in:
chengyongru 2026-05-08 21:36:51 +08:00 committed by Xubin Ren
parent bbdf1db30d
commit 908f1246d8
2 changed files with 43 additions and 5 deletions

View File

@ -50,6 +50,17 @@ from rich.text import Text
from nanobot import __logo__, __version__ from nanobot import __logo__, __version__
def _sanitize_surrogates(text: str) -> str:
"""Reconstruct surrogate pairs into real characters; replace lone surrogates.
On Windows, console input may produce lone surrogate code points (e.g.
``\\ud83d\\udc08`` for U+1F408). Round-tripping through UTF-16 reconstructs
paired surrogates into their actual characters and replaces unpaired ones
with U+FFFD.
"""
return text.encode("utf-16-le", errors="surrogatepass").decode("utf-16-le", errors="replace")
class SafeFileHistory(FileHistory): class SafeFileHistory(FileHistory):
"""FileHistory subclass that sanitizes surrogate characters on write. """FileHistory subclass that sanitizes surrogate characters on write.
@ -59,8 +70,7 @@ class SafeFileHistory(FileHistory):
""" """
def store_string(self, string: str) -> None: def store_string(self, string: str) -> None:
safe = string.encode("utf-8", errors="surrogateescape").decode("utf-8", errors="replace") super().store_string(_sanitize_surrogates(string))
super().store_string(safe)
from nanobot.cli.stream import StreamRenderer, ThinkingSpinner from nanobot.cli.stream import StreamRenderer, ThinkingSpinner
from nanobot.config.paths import get_workspace_path, is_default_workspace from nanobot.config.paths import get_workspace_path, is_default_workspace
from nanobot.config.schema import Config from nanobot.config.schema import Config
@ -1224,7 +1234,7 @@ def agent(
# Stop spinner before user input to avoid prompt_toolkit conflicts # Stop spinner before user input to avoid prompt_toolkit conflicts
if renderer: if renderer:
renderer.stop_for_input() renderer.stop_for_input()
user_input = await _read_interactive_input_async() user_input = _sanitize_surrogates(await _read_interactive_input_async())
command = user_input.strip() command = user_input.strip()
if not command: if not command:
continue continue

View File

@ -3,12 +3,40 @@
Surrogate characters in CLI input must not crash history file writes. Surrogate characters in CLI input must not crash history file writes.
""" """
from nanobot.cli.commands import SafeFileHistory from nanobot.cli.commands import SafeFileHistory, _sanitize_surrogates
class TestSanitizeSurrogates:
    """Behavioral tests for ``_sanitize_surrogates``."""

    def test_paired_surrogates_reconstructed(self):
        """Windows console produces \\ud83d\\udc08 for U+1F408 — must be restored."""
        # The input must hold the two surrogate code points themselves; passing
        # the already-combined emoji would only repeat the pass-through test.
        result = _sanitize_surrogates("你为什么会用 \ud83d\udc08")
        assert result == "你为什么会用 \U0001f408"

    def test_lone_surrogates_replaced(self):
        """An unpaired surrogate becomes U+FFFD and the surrounding text is kept."""
        result = _sanitize_surrogates("hello \udce9 world")
        assert "\udce9" not in result
        # Pin the replacement so silently dropping the character would fail.
        assert result == "hello \ufffd world"

    def test_normal_text_unchanged(self):
        """Plain ASCII passes through untouched."""
        assert _sanitize_surrogates("normal ascii text") == "normal ascii text"

    def test_emoji_already_correct(self):
        """Properly encoded emoji should pass through unchanged."""
        assert _sanitize_surrogates("hello 🐈 nanobot") == "hello 🐈 nanobot"

    def test_mixed_unicode_preserved(self):
        """BMP and astral characters without surrogates are preserved."""
        assert _sanitize_surrogates("你好 hello こんにちは 🎉") == "你好 hello こんにちは 🎉"

    def test_multiple_lone_surrogates(self):
        """Consecutive unpaired low surrogates are each replaced with U+FFFD."""
        result = _sanitize_surrogates("\udce9\udcf1\udcff")
        assert "\udce9" not in result
        assert "\udcf1" not in result
        assert "\udcff" not in result
        assert result == "\ufffd" * 3
class TestSafeFileHistory: class TestSafeFileHistory:
def test_surrogate_replaced(self, tmp_path): def test_surrogate_replaced(self, tmp_path):
"""Surrogate pairs are replaced with U+FFFD, not crash."""
hist = SafeFileHistory(str(tmp_path / "history")) hist = SafeFileHistory(str(tmp_path / "history"))
hist.store_string("hello \udce9 world") hist.store_string("hello \udce9 world")
entries = list(hist.load_history_strings()) entries = list(hist.load_history_strings())