nanobot/tests/agent/test_session_manager_history.py

476 lines
17 KiB
Python

from nanobot.session.manager import Session, SessionManager
def _assert_no_orphans(history: list[dict]) -> None:
"""Assert every tool result in history has a matching assistant tool_call."""
declared = {
tc["id"]
for m in history if m.get("role") == "assistant"
for tc in (m.get("tool_calls") or [])
}
orphans = [
m.get("tool_call_id") for m in history
if m.get("role") == "tool" and m.get("tool_call_id") not in declared
]
assert orphans == [], f"orphan tool_call_ids: {orphans}"
def _tool_turn(prefix: str, idx: int) -> list[dict]:
"""Helper: one assistant with 2 tool_calls + 2 tool results."""
return [
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": f"{prefix}_{idx}_a", "type": "function", "function": {"name": "x", "arguments": "{}"}},
{"id": f"{prefix}_{idx}_b", "type": "function", "function": {"name": "y", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": f"{prefix}_{idx}_a", "name": "x", "content": "ok"},
{"role": "tool", "tool_call_id": f"{prefix}_{idx}_b", "name": "y", "content": "ok"},
]
def test_list_sessions_includes_metadata_title(tmp_path):
manager = SessionManager(tmp_path)
session = manager.get_or_create("websocket:chat-title")
session.metadata["title"] = "自动生成标题"
manager.save(session)
rows = manager.list_sessions()
assert rows[0]["key"] == "websocket:chat-title"
assert rows[0]["title"] == "自动生成标题"
def test_list_sessions_includes_user_preview(tmp_path):
manager = SessionManager(tmp_path)
session = manager.get_or_create("websocket:chat-preview")
session.add_message("user", "帮我总结一下 OpenAI 的最新硬件计划")
session.add_message("assistant", "可以,我会先查最新消息。")
manager.save(session)
rows = manager.list_sessions()
assert rows[0]["key"] == "websocket:chat-preview"
assert rows[0]["preview"] == "帮我总结一下 OpenAI 的最新硬件计划"
# --- Original regression test (from PR 2075) ---
def test_get_history_drops_orphan_tool_results_when_window_cuts_tool_calls():
session = Session(key="telegram:test")
session.messages.append({"role": "user", "content": "old turn"})
for i in range(20):
session.messages.extend(_tool_turn("old", i))
session.messages.append({"role": "user", "content": "problem turn"})
for i in range(25):
session.messages.extend(_tool_turn("cur", i))
session.messages.append({"role": "user", "content": "new telegram question"})
history = session.get_history(max_messages=100)
_assert_no_orphans(history)
# --- Positive test: legitimate pairs survive trimming ---
def test_legitimate_tool_pairs_preserved_after_trim():
"""Complete tool-call groups within the window must not be dropped."""
session = Session(key="test:positive")
session.messages.append({"role": "user", "content": "hello"})
for i in range(5):
session.messages.extend(_tool_turn("ok", i))
session.messages.append({"role": "assistant", "content": "done"})
history = session.get_history(max_messages=500)
_assert_no_orphans(history)
tool_ids = [m["tool_call_id"] for m in history if m.get("role") == "tool"]
assert len(tool_ids) == 10
assert history[0]["role"] == "user"
def test_retain_recent_legal_suffix_keeps_recent_messages():
session = Session(key="test:trim")
for i in range(10):
session.messages.append({"role": "user", "content": f"msg{i}"})
session.retain_recent_legal_suffix(4)
assert len(session.messages) == 4
assert session.messages[0]["content"] == "msg6"
assert session.messages[-1]["content"] == "msg9"
def test_retain_recent_legal_suffix_adjusts_last_consolidated():
session = Session(key="test:trim-cons")
for i in range(10):
session.messages.append({"role": "user", "content": f"msg{i}"})
session.last_consolidated = 7
session.retain_recent_legal_suffix(4)
assert len(session.messages) == 4
assert session.last_consolidated == 1
def test_retain_recent_legal_suffix_zero_clears_session():
session = Session(key="test:trim-zero")
for i in range(10):
session.messages.append({"role": "user", "content": f"msg{i}"})
session.last_consolidated = 5
session.retain_recent_legal_suffix(0)
assert session.messages == []
assert session.last_consolidated == 0
def test_retain_recent_legal_suffix_keeps_legal_tool_boundary():
session = Session(key="test:trim-tools")
session.messages.append({"role": "user", "content": "old"})
session.messages.extend(_tool_turn("old", 0))
session.messages.append({"role": "user", "content": "keep"})
session.messages.extend(_tool_turn("keep", 0))
session.messages.append({"role": "assistant", "content": "done"})
session.retain_recent_legal_suffix(4)
history = session.get_history(max_messages=500)
_assert_no_orphans(history)
assert history[0]["role"] == "user"
assert history[0]["content"] == "keep"
# --- last_consolidated > 0 ---
def test_orphan_trim_with_last_consolidated():
"""Orphan trimming works correctly when session is partially consolidated."""
session = Session(key="test:consolidated")
for i in range(10):
session.messages.append({"role": "user", "content": f"old {i}"})
session.messages.extend(_tool_turn("cons", i))
session.last_consolidated = 30
session.messages.append({"role": "user", "content": "recent"})
for i in range(15):
session.messages.extend(_tool_turn("new", i))
session.messages.append({"role": "user", "content": "latest"})
history = session.get_history(max_messages=20)
_assert_no_orphans(history)
assert all(m.get("role") != "tool" or m["tool_call_id"].startswith("new_") for m in history)
# --- Edge: no tool messages at all ---
def test_no_tool_messages_unchanged():
session = Session(key="test:plain")
for i in range(5):
session.messages.append({"role": "user", "content": f"q{i}"})
session.messages.append({"role": "assistant", "content": f"a{i}"})
history = session.get_history(max_messages=6)
assert len(history) == 6
_assert_no_orphans(history)
# --- Edge: all leading messages are orphan tool results ---
def test_all_orphan_prefix_stripped():
"""If the window starts with orphan tool results and nothing else, they're all dropped."""
session = Session(key="test:all-orphan")
session.messages.append({"role": "tool", "tool_call_id": "gone_1", "name": "x", "content": "ok"})
session.messages.append({"role": "tool", "tool_call_id": "gone_2", "name": "y", "content": "ok"})
session.messages.append({"role": "user", "content": "fresh start"})
session.messages.append({"role": "assistant", "content": "hi"})
history = session.get_history(max_messages=500)
_assert_no_orphans(history)
assert history[0]["role"] == "user"
assert len(history) == 2
# --- Edge: empty session ---
def test_empty_session_history():
session = Session(key="test:empty")
history = session.get_history(max_messages=500)
assert history == []
def test_get_history_preserves_reasoning_content():
session = Session(key="test:reasoning")
session.messages.append({"role": "user", "content": "hi"})
session.messages.append({
"role": "assistant",
"content": "done",
"reasoning_content": "hidden chain of thought",
"thinking_blocks": [{"type": "thinking", "thinking": "hidden chain of thought", "signature": "sig"}],
})
history = session.get_history(max_messages=500)
assert history == [
{"role": "user", "content": "hi"},
{
"role": "assistant",
"content": "done",
"reasoning_content": "hidden chain of thought",
"thinking_blocks": [{
"type": "thinking",
"thinking": "hidden chain of thought",
"signature": "sig",
}],
},
]
def test_get_history_annotates_user_turns_but_not_assistant_turns():
"""Only user turns carry the timestamp prefix.
Annotating assistant turns trains the model (via in-context examples) to
start its own replies with ``[Message Time: ...]``. User-side stamps are
enough to pin adjacent assistant replies for relative-time reasoning.
"""
session = Session(key="test:timestamps")
session.messages.append({
"role": "user",
"content": "10 点提醒是昨天发生的",
"timestamp": "2026-04-26T22:00:00",
})
session.messages.append({
"role": "assistant",
"content": "记下来了",
"timestamp": "2026-04-26T22:00:05",
})
history = session.get_history(max_messages=500, include_timestamps=True)
assert history == [
{
"role": "user",
"content": "[Message Time: 2026-04-26T22:00:00]\n10 点提醒是昨天发生的",
},
{
"role": "assistant",
"content": "记下来了",
},
]
def test_get_history_does_not_annotate_proactive_assistant_deliveries_with_timestamps():
"""Assistant-side timestamp examples can leak back into future replies."""
session = Session(key="test:proactive-timestamps")
session.messages.append({
"role": "assistant",
"content": "记得喝水",
"timestamp": "2026-04-26T15:00:00",
"_channel_delivery": True,
})
session.messages.append({
"role": "user",
"content": "",
"timestamp": "2026-04-26T18:00:00",
})
history = session.get_history(max_messages=500, include_timestamps=True)
assert history == [
{
"role": "assistant",
"content": "记得喝水",
},
{
"role": "user",
"content": "[Message Time: 2026-04-26T18:00:00]\n",
},
]
def test_get_history_does_not_annotate_tool_results_with_timestamps():
session = Session(key="test:tool-timestamps")
session.messages.append({"role": "user", "content": "run tool"})
session.messages.extend(_tool_turn("ts", 0))
session.messages[-1]["timestamp"] = "2026-04-26T22:00:10"
history = session.get_history(max_messages=500, include_timestamps=True)
tool_result = history[-1]
assert tool_result["role"] == "tool"
assert tool_result["content"] == "ok"
# --- Window cuts mid-group: assistant present but some tool results orphaned ---
def test_window_cuts_mid_tool_group():
"""If the window starts between an assistant's tool results, the partial group is trimmed."""
session = Session(key="test:mid-cut")
session.messages.append({"role": "user", "content": "setup"})
session.messages.append({
"role": "assistant", "content": None,
"tool_calls": [
{"id": "split_a", "type": "function", "function": {"name": "x", "arguments": "{}"}},
{"id": "split_b", "type": "function", "function": {"name": "y", "arguments": "{}"}},
],
})
session.messages.append({"role": "tool", "tool_call_id": "split_a", "name": "x", "content": "ok"})
session.messages.append({"role": "tool", "tool_call_id": "split_b", "name": "y", "content": "ok"})
session.messages.append({"role": "user", "content": "next"})
session.messages.extend(_tool_turn("intact", 0))
session.messages.append({"role": "assistant", "content": "final"})
# Window of 6 should cut off the "setup" user msg and the assistant with split_a/split_b,
# leaving orphan tool results for split_a at the front.
history = session.get_history(max_messages=6)
_assert_no_orphans(history)
# --- Image breadcrumbs: media kwarg is synthesized into content for replay ---
def test_get_history_synthesizes_image_breadcrumb_from_media_kwarg():
"""Persisted user turns carry image paths as a ``media`` kwarg; LLM
replay must still see an ``[image: path]`` breadcrumb so the assistant's
follow-up reply has a referent instead of trailing an empty user row."""
session = Session(key="test:media")
session.messages.append(
{"role": "user", "content": "look", "media": ["/m/a.png", "/m/b.png"]}
)
session.messages.append({"role": "assistant", "content": "nice"})
history = session.get_history(max_messages=500)
assert history == [
{"role": "user", "content": "look\n[image: /m/a.png]\n[image: /m/b.png]"},
{"role": "assistant", "content": "nice"},
]
def test_get_history_synthesizes_breadcrumb_for_image_only_turn():
"""Turns with no text but attached images must not replay as empty
strings — the LLM would otherwise see a bare user turn followed by an
unexplained assistant answer."""
session = Session(key="test:image-only")
session.messages.append({"role": "user", "content": "", "media": ["/m/pic.png"]})
session.messages.append({"role": "assistant", "content": "I see a cat"})
history = session.get_history(max_messages=500)
assert history[0] == {"role": "user", "content": "[image: /m/pic.png]"}
def test_get_history_ignores_media_kwarg_on_non_user_rows():
"""``media`` only ever appears on user entries in practice, but the
synthesizer must be defensive: assistants / tools with list content
don't get the breadcrumb pasted on top."""
session = Session(key="test:defensive")
session.messages.append(
{
"role": "assistant",
"content": [{"type": "text", "text": "structured"}],
"media": ["/m/x.png"], # nonsense but shouldn't crash
}
)
history = session.get_history(max_messages=500)
# List content is passed through verbatim — the synthesizer only
# rewrites plain-string content.
assert history[0]["content"] == [{"type": "text", "text": "structured"}]
def test_get_history_does_not_paste_assistant_media_paths_into_replay():
session = Session(key="test:assistant-media")
session.messages.append(
{
"role": "assistant",
"content": "来了 🎨",
"media": ["/home/user/.nanobot/media/generated/img_abc.png"],
}
)
history = session.get_history(max_messages=500)
assert history == [{"role": "assistant", "content": "来了 🎨"}]
def test_get_history_sanitizes_existing_assistant_replay_artifacts():
session = Session(key="test:polluted-assistant")
session.messages.append(
{
"role": "assistant",
"content": (
"[Message Time: 2026-05-09 00:33:48]\n"
"来了 🎨\n"
"[image: /home/user/.nanobot/media/generated/img_old.png]\n\n"
"generate_image(\"16:9\")\n"
"message(\"来了 🎨\")"
),
}
)
history = session.get_history(max_messages=500, include_timestamps=True)
assert history == [{"role": "assistant", "content": "来了 🎨"}]
def test_get_history_respects_max_tokens(monkeypatch):
session = Session(key="test:token-cap")
session.messages.extend(
[
{"role": "user", "content": "u1"},
{"role": "assistant", "content": "a1"},
{"role": "user", "content": "u2"},
{"role": "assistant", "content": "a2"},
{"role": "user", "content": "u3"},
{"role": "assistant", "content": "a3"},
]
)
token_map = {"u1": 50, "a1": 50, "u2": 50, "a2": 50, "u3": 50, "a3": 50}
monkeypatch.setattr(
"nanobot.session.manager.estimate_message_tokens",
lambda message: token_map.get(message.get("content"), 0),
)
history = session.get_history(max_messages=500, max_tokens=120)
assert [m["content"] for m in history] == ["u3", "a3"]
def test_get_history_recovers_user_when_token_slice_would_be_assistant_only(monkeypatch):
session = Session(key="test:assistant-only-slice")
session.messages.extend(
[
{"role": "user", "content": "u1"},
{"role": "assistant", "content": "a1"},
{"role": "user", "content": "u2"},
{"role": "assistant", "content": "a2"},
]
)
token_map = {"u1": 100, "a1": 100, "u2": 100, "a2": 100}
monkeypatch.setattr(
"nanobot.session.manager.estimate_message_tokens",
lambda message: token_map.get(message.get("content"), 0),
)
history = session.get_history(max_messages=500, max_tokens=100)
assert [m["content"] for m in history] == ["u2", "a2"]
def test_retain_recent_legal_suffix_hard_cap_with_long_non_user_chain():
session = Session(key="test:hard-cap-chain")
session.messages.append({"role": "user", "content": "u0"})
session.messages.append(
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "c1", "type": "function", "function": {"name": "x", "arguments": "{}"}}
],
}
)
for i in range(12):
session.messages.append({"role": "assistant", "content": f"a{i}"})
session.retain_recent_legal_suffix(6)
assert len(session.messages) <= 6