nanobot/tests/agent/test_loop_save_turn.py
Xubin Ren b964a894d2 test(agent): cover early user-message persistence
Use session.add_message for the pre-turn user-message flush and add focused regression tests for crash-time persistence and duplicate-free successful saves.

Made-with: Cursor
2026-04-13 10:26:09 +08:00

265 lines
9.0 KiB
Python

from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from nanobot.agent.context import ContextBuilder
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.session.manager import Session
def _mk_loop() -> AgentLoop:
    """Return a bare ``AgentLoop`` created without running ``__init__``.

    The only attribute populated is ``max_tool_result_chars``, copied from a
    default-constructed ``AgentDefaults``.
    """
    bare_loop = AgentLoop.__new__(AgentLoop)

    # Imported at call time, exactly as the original helper did.
    from nanobot.config.schema import AgentDefaults

    defaults = AgentDefaults()
    bare_loop.max_tool_result_chars = defaults.max_tool_result_chars
    return bare_loop
def _make_full_loop(tmp_path: Path) -> AgentLoop:
    """Construct a real ``AgentLoop`` rooted at *tmp_path* with a mocked provider.

    The provider is a ``MagicMock`` whose ``get_default_model()`` returns
    ``"test-model"``; the loop gets a fresh ``MessageBus`` and the same model name.
    """
    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"
    return AgentLoop(
        bus=MessageBus(),
        provider=fake_provider,
        workspace=tmp_path,
        model="test-model",
    )
def test_save_turn_skips_multimodal_user_when_only_runtime_context() -> None:
    """A multimodal user turn holding only the runtime-context text block saves nothing."""
    agent_loop = _mk_loop()
    sess = Session(key="test:runtime-only")

    runtime_text = ContextBuilder._RUNTIME_CONTEXT_TAG + "\nCurrent Time: now (UTC)"
    runtime_block = {"type": "text", "text": runtime_text}
    user_turn = {"role": "user", "content": [runtime_block]}

    agent_loop._save_turn(sess, [user_turn], skip=0)

    assert sess.messages == []
def test_save_turn_keeps_image_placeholder_with_path_after_runtime_strip() -> None:
    """An image block carrying ``_meta.path`` is saved as a ``[image: <path>]`` text block.

    The turn pairs a runtime-context text block with the image; only the
    placeholder is expected in the saved content.
    """
    agent_loop = _mk_loop()
    sess = Session(key="test:image")

    runtime_text = ContextBuilder._RUNTIME_CONTEXT_TAG + "\nCurrent Time: now (UTC)"
    image_block = {
        "type": "image_url",
        "image_url": {"url": "data:image/png;base64,abc"},
        "_meta": {"path": "/media/feishu/photo.jpg"},
    }
    user_turn = {
        "role": "user",
        "content": [{"type": "text", "text": runtime_text}, image_block],
    }

    agent_loop._save_turn(sess, [user_turn], skip=0)

    expected_content = [{"type": "text", "text": "[image: /media/feishu/photo.jpg]"}]
    assert sess.messages[0]["content"] == expected_content
def test_save_turn_keeps_image_placeholder_without_meta() -> None:
    """An image block with no ``_meta`` entry is saved as a bare ``[image]`` text block."""
    agent_loop = _mk_loop()
    sess = Session(key="test:image-no-meta")

    runtime_text = ContextBuilder._RUNTIME_CONTEXT_TAG + "\nCurrent Time: now (UTC)"
    image_block = {
        "type": "image_url",
        "image_url": {"url": "data:image/png;base64,abc"},
    }
    user_turn = {
        "role": "user",
        "content": [{"type": "text", "text": runtime_text}, image_block],
    }

    agent_loop._save_turn(sess, [user_turn], skip=0)

    expected_content = [{"type": "text", "text": "[image]"}]
    assert sess.messages[0]["content"] == expected_content
def test_save_turn_keeps_tool_results_under_16k() -> None:
    """A 12,000-character tool result is stored in the session unchanged."""
    agent_loop = _mk_loop()
    sess = Session(key="test:tool-result")

    payload = "x" * 12_000
    tool_turn = {
        "role": "tool",
        "tool_call_id": "call_1",
        "name": "read_file",
        "content": payload,
    }

    agent_loop._save_turn(sess, [tool_turn], skip=0)

    assert sess.messages[0]["content"] == payload
def test_restore_runtime_checkpoint_rehydrates_completed_and_pending_tools() -> None:
    """Restoring a checkpoint appends the assistant turn, the finished tool result,
    and an "interrupted" entry for the pending tool call.

    The checkpoint holds one completed call (``call_done``) and one pending call
    (``call_pending``); after restore the checkpoint key must be gone from metadata.
    """

    def tool_call(call_id: str, tool_name: str) -> dict:
        # Build a fresh dict on every use so no two checkpoint entries share an object.
        return {
            "id": call_id,
            "type": "function",
            "function": {"name": tool_name, "arguments": "{}"},
        }

    agent_loop = _mk_loop()
    checkpoint = {
        "assistant_message": {
            "role": "assistant",
            "content": "working",
            "tool_calls": [
                tool_call("call_done", "read_file"),
                tool_call("call_pending", "exec"),
            ],
        },
        "completed_tool_results": [
            {
                "role": "tool",
                "tool_call_id": "call_done",
                "name": "read_file",
                "content": "ok",
            },
        ],
        "pending_tool_calls": [tool_call("call_pending", "exec")],
    }
    sess = Session(
        key="test:checkpoint",
        metadata={AgentLoop._RUNTIME_CHECKPOINT_KEY: checkpoint},
    )

    restored = agent_loop._restore_runtime_checkpoint(sess)

    assert restored is True
    assert sess.metadata.get(AgentLoop._RUNTIME_CHECKPOINT_KEY) is None
    assert sess.messages[0]["role"] == "assistant"
    assert sess.messages[1]["tool_call_id"] == "call_done"
    assert sess.messages[2]["tool_call_id"] == "call_pending"
    pending_text = sess.messages[2]["content"].lower()
    assert "interrupted before this tool finished" in pending_text
def test_restore_runtime_checkpoint_dedupes_overlapping_tail() -> None:
    """Restoring a checkpoint whose head already sits at the session tail adds no duplicates.

    The session starts with the assistant turn and the ``call_done`` result; the
    checkpoint repeats both and adds a pending ``call_pending``. After restore the
    session must hold exactly three messages and the checkpoint key must be cleared.
    """

    # Each factory returns a new object per call, so the pre-existing history and
    # the checkpoint contain equal-but-distinct dicts.
    def tool_call(call_id: str, tool_name: str) -> dict:
        return {
            "id": call_id,
            "type": "function",
            "function": {"name": tool_name, "arguments": "{}"},
        }

    def assistant_turn() -> dict:
        return {
            "role": "assistant",
            "content": "working",
            "tool_calls": [
                tool_call("call_done", "read_file"),
                tool_call("call_pending", "exec"),
            ],
        }

    def done_result() -> dict:
        return {
            "role": "tool",
            "tool_call_id": "call_done",
            "name": "read_file",
            "content": "ok",
        }

    agent_loop = _mk_loop()
    existing_history = [assistant_turn(), done_result()]
    checkpoint = {
        "assistant_message": assistant_turn(),
        "completed_tool_results": [done_result()],
        "pending_tool_calls": [tool_call("call_pending", "exec")],
    }
    sess = Session(
        key="test:checkpoint-overlap",
        messages=existing_history,
        metadata={AgentLoop._RUNTIME_CHECKPOINT_KEY: checkpoint},
    )

    restored = agent_loop._restore_runtime_checkpoint(sess)

    assert restored is True
    assert sess.metadata.get(AgentLoop._RUNTIME_CHECKPOINT_KEY) is None
    assert len(sess.messages) == 3
    assert sess.messages[0]["role"] == "assistant"
    assert sess.messages[1]["tool_call_id"] == "call_done"
    assert sess.messages[2]["tool_call_id"] == "call_pending"
@pytest.mark.asyncio
async def test_process_message_persists_user_message_before_turn_completes(tmp_path: Path) -> None:
    """The inbound user message is already persisted when the agent turn raises.

    ``_run_agent_loop`` is stubbed to raise ``RuntimeError("boom")``. The session is
    then invalidated and fetched again (presumably forcing a reload from storage;
    confirm against the session manager) and must contain only the user message.
    """
    agent_loop = _make_full_loop(tmp_path)
    agent_loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(  # type: ignore[method-assign]
        return_value=False
    )
    agent_loop._run_agent_loop = AsyncMock(  # type: ignore[method-assign]
        side_effect=RuntimeError("boom")
    )
    inbound = InboundMessage(
        channel="feishu",
        sender_id="u1",
        chat_id="c1",
        content="persist me",
    )

    with pytest.raises(RuntimeError, match="boom"):
        await agent_loop._process_message(inbound)

    agent_loop.sessions.invalidate("feishu:c1")
    persisted = agent_loop.sessions.get_or_create("feishu:c1")

    saved_roles = [entry["role"] for entry in persisted.messages]
    assert saved_roles == ["user"]
    assert persisted.messages[0]["content"] == "persist me"
    assert persisted.updated_at >= persisted.created_at
@pytest.mark.asyncio
async def test_process_message_does_not_duplicate_early_persisted_user_message(tmp_path: Path) -> None:
    """A successful turn leaves exactly one user message and one assistant reply.

    ``_run_agent_loop`` is stubbed to return a finished turn whose message list
    includes the same user message; the session must not end up with it twice.
    """
    agent_loop = _make_full_loop(tmp_path)
    agent_loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(  # type: ignore[method-assign]
        return_value=False
    )

    turn_messages = [
        {"role": "system", "content": "system"},
        {"role": "user", "content": "hello"},
        {"role": "assistant", "content": "done"},
    ]
    turn_outcome = ("done", None, turn_messages, "stop", False)
    agent_loop._run_agent_loop = AsyncMock(return_value=turn_outcome)  # type: ignore[method-assign]

    inbound = InboundMessage(channel="feishu", sender_id="u1", chat_id="c2", content="hello")
    result = await agent_loop._process_message(inbound)

    assert result is not None
    assert result.content == "done"

    session = agent_loop.sessions.get_or_create("feishu:c2")

    # Compare only role/content so any extra bookkeeping keys on saved messages are ignored.
    compared_keys = {"role", "content"}
    projected = []
    for entry in session.messages:
        projected.append({key: value for key, value in entry.items() if key in compared_keys})

    assert projected == [
        {"role": "user", "content": "hello"},
        {"role": "assistant", "content": "done"},
    ]