mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-14 23:19:55 +00:00
2413 lines
83 KiB
Python
2413 lines
83 KiB
Python
"""Tests for the shared agent runner and its integration contracts."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import os
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.config.schema import AgentDefaults
|
|
from nanobot.agent.tools.base import Tool
|
|
from nanobot.agent.tools.registry import ToolRegistry
|
|
from nanobot.providers.base import LLMResponse, ToolCallRequest
|
|
|
|
_MAX_TOOL_RESULT_CHARS = AgentDefaults().max_tool_result_chars
|
|
|
|
|
|
def _make_loop(tmp_path):
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
with patch("nanobot.agent.loop.ContextBuilder"), \
|
|
patch("nanobot.agent.loop.SessionManager"), \
|
|
patch("nanobot.agent.loop.SubagentManager") as MockSubMgr:
|
|
MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0)
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path)
|
|
return loop
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_preserves_reasoning_fields_and_tool_results():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_second_call: list[dict] = []
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(
|
|
content="thinking",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={"path": "."})],
|
|
reasoning_content="hidden reasoning",
|
|
thinking_blocks=[{"type": "thinking", "thinking": "step"}],
|
|
usage={"prompt_tokens": 5, "completion_tokens": 3},
|
|
)
|
|
captured_second_call[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="tool result")
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "do task"},
|
|
],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=3,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == "done"
|
|
assert result.tools_used == ["list_dir"]
|
|
assert result.tool_events == [
|
|
{"name": "list_dir", "status": "ok", "detail": "tool result"}
|
|
]
|
|
|
|
assistant_messages = [
|
|
msg for msg in captured_second_call
|
|
if msg.get("role") == "assistant" and msg.get("tool_calls")
|
|
]
|
|
assert len(assistant_messages) == 1
|
|
assert assistant_messages[0]["reasoning_content"] == "hidden reasoning"
|
|
assert assistant_messages[0]["thinking_blocks"] == [{"type": "thinking", "thinking": "step"}]
|
|
assert any(
|
|
msg.get("role") == "tool" and msg.get("content") == "tool result"
|
|
for msg in captured_second_call
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_calls_hooks_in_order():
|
|
from nanobot.agent.hook import AgentHook, AgentHookContext
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
events: list[tuple] = []
|
|
|
|
async def chat_with_retry(**kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(
|
|
content="thinking",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={"path": "."})],
|
|
)
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="tool result")
|
|
|
|
class RecordingHook(AgentHook):
|
|
async def before_iteration(self, context: AgentHookContext) -> None:
|
|
events.append(("before_iteration", context.iteration))
|
|
|
|
async def before_execute_tools(self, context: AgentHookContext) -> None:
|
|
events.append((
|
|
"before_execute_tools",
|
|
context.iteration,
|
|
[tc.name for tc in context.tool_calls],
|
|
))
|
|
|
|
async def after_iteration(self, context: AgentHookContext) -> None:
|
|
events.append((
|
|
"after_iteration",
|
|
context.iteration,
|
|
context.final_content,
|
|
list(context.tool_results),
|
|
list(context.tool_events),
|
|
context.stop_reason,
|
|
))
|
|
|
|
def finalize_content(self, context: AgentHookContext, content: str | None) -> str | None:
|
|
events.append(("finalize_content", context.iteration, content))
|
|
return content.upper() if content else content
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=3,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
hook=RecordingHook(),
|
|
))
|
|
|
|
assert result.final_content == "DONE"
|
|
assert events == [
|
|
("before_iteration", 0),
|
|
("before_execute_tools", 0, ["list_dir"]),
|
|
(
|
|
"after_iteration",
|
|
0,
|
|
None,
|
|
["tool result"],
|
|
[{"name": "list_dir", "status": "ok", "detail": "tool result"}],
|
|
None,
|
|
),
|
|
("before_iteration", 1),
|
|
("finalize_content", 1, "done"),
|
|
("after_iteration", 1, "DONE", [], [], "completed"),
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_streaming_hook_receives_deltas_and_end_signal():
|
|
from nanobot.agent.hook import AgentHook, AgentHookContext
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
streamed: list[str] = []
|
|
endings: list[bool] = []
|
|
|
|
async def chat_stream_with_retry(*, on_content_delta, **kwargs):
|
|
await on_content_delta("he")
|
|
await on_content_delta("llo")
|
|
return LLMResponse(content="hello", tool_calls=[], usage={})
|
|
|
|
provider.chat_stream_with_retry = chat_stream_with_retry
|
|
provider.chat_with_retry = AsyncMock()
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
class StreamingHook(AgentHook):
|
|
def wants_streaming(self) -> bool:
|
|
return True
|
|
|
|
async def on_stream(self, context: AgentHookContext, delta: str) -> None:
|
|
streamed.append(delta)
|
|
|
|
async def on_stream_end(self, context: AgentHookContext, *, resuming: bool) -> None:
|
|
endings.append(resuming)
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
hook=StreamingHook(),
|
|
))
|
|
|
|
assert result.final_content == "hello"
|
|
assert streamed == ["he", "llo"]
|
|
assert endings == [False]
|
|
provider.chat_with_retry.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_returns_max_iterations_fallback():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(
|
|
content="still working",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={"path": "."})],
|
|
))
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="tool result")
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=2,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.stop_reason == "max_iterations"
|
|
assert result.final_content == (
|
|
"I reached the maximum number of tool call iterations (2) "
|
|
"without completing the task. You can try breaking the task into smaller steps."
|
|
)
|
|
assert result.messages[-1]["role"] == "assistant"
|
|
assert result.messages[-1]["content"] == result.final_content
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_returns_structured_tool_error():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={})],
|
|
))
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(side_effect=RuntimeError("boom"))
|
|
|
|
runner = AgentRunner(provider)
|
|
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=2,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
fail_on_tool_error=True,
|
|
))
|
|
|
|
assert result.stop_reason == "tool_error"
|
|
assert result.error == "Error: RuntimeError: boom"
|
|
assert result.tool_events == [
|
|
{"name": "list_dir", "status": "error", "detail": "boom"}
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_persists_large_tool_results_for_follow_up_calls(tmp_path):
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_second_call: list[dict] = []
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id="call_big", name="list_dir", arguments={"path": "."})],
|
|
usage={"prompt_tokens": 5, "completion_tokens": 3},
|
|
)
|
|
captured_second_call[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="x" * 20_000)
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=2,
|
|
workspace=tmp_path,
|
|
session_key="test:runner",
|
|
max_tool_result_chars=2048,
|
|
))
|
|
|
|
assert result.final_content == "done"
|
|
tool_message = next(msg for msg in captured_second_call if msg.get("role") == "tool")
|
|
assert "[tool output persisted]" in tool_message["content"]
|
|
assert "tool-results" in tool_message["content"]
|
|
assert (tmp_path / ".nanobot" / "tool-results" / "test_runner" / "call_big.txt").exists()
|
|
|
|
|
|
def test_persist_tool_result_prunes_old_session_buckets(tmp_path):
|
|
from nanobot.utils.helpers import maybe_persist_tool_result
|
|
|
|
root = tmp_path / ".nanobot" / "tool-results"
|
|
old_bucket = root / "old_session"
|
|
recent_bucket = root / "recent_session"
|
|
old_bucket.mkdir(parents=True)
|
|
recent_bucket.mkdir(parents=True)
|
|
(old_bucket / "old.txt").write_text("old", encoding="utf-8")
|
|
(recent_bucket / "recent.txt").write_text("recent", encoding="utf-8")
|
|
|
|
stale = time.time() - (8 * 24 * 60 * 60)
|
|
os.utime(old_bucket, (stale, stale))
|
|
os.utime(old_bucket / "old.txt", (stale, stale))
|
|
|
|
persisted = maybe_persist_tool_result(
|
|
tmp_path,
|
|
"current:session",
|
|
"call_big",
|
|
"x" * 5000,
|
|
max_chars=64,
|
|
)
|
|
|
|
assert "[tool output persisted]" in persisted
|
|
assert not old_bucket.exists()
|
|
assert recent_bucket.exists()
|
|
assert (root / "current_session" / "call_big.txt").exists()
|
|
|
|
|
|
def test_persist_tool_result_leaves_no_temp_files(tmp_path):
|
|
from nanobot.utils.helpers import maybe_persist_tool_result
|
|
|
|
root = tmp_path / ".nanobot" / "tool-results"
|
|
maybe_persist_tool_result(
|
|
tmp_path,
|
|
"current:session",
|
|
"call_big",
|
|
"x" * 5000,
|
|
max_chars=64,
|
|
)
|
|
|
|
assert (root / "current_session" / "call_big.txt").exists()
|
|
assert list((root / "current_session").glob("*.tmp")) == []
|
|
|
|
|
|
def test_persist_tool_result_logs_cleanup_failures(monkeypatch, tmp_path):
|
|
from nanobot.utils.helpers import maybe_persist_tool_result
|
|
|
|
warnings: list[str] = []
|
|
|
|
monkeypatch.setattr(
|
|
"nanobot.utils.helpers._cleanup_tool_result_buckets",
|
|
lambda *_args, **_kwargs: (_ for _ in ()).throw(OSError("busy")),
|
|
)
|
|
monkeypatch.setattr(
|
|
"nanobot.utils.helpers.logger.warning",
|
|
lambda message, *args: warnings.append(message.format(*args)),
|
|
)
|
|
|
|
persisted = maybe_persist_tool_result(
|
|
tmp_path,
|
|
"current:session",
|
|
"call_big",
|
|
"x" * 5000,
|
|
max_chars=64,
|
|
)
|
|
|
|
assert "[tool output persisted]" in persisted
|
|
assert warnings and "Failed to clean stale tool result buckets" in warnings[0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_replaces_empty_tool_result_with_marker():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_second_call: list[dict] = []
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="noop", arguments={})],
|
|
usage={},
|
|
)
|
|
captured_second_call[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="")
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=2,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == "done"
|
|
tool_message = next(msg for msg in captured_second_call if msg.get("role") == "tool")
|
|
assert tool_message["content"] == "(noop completed with no output)"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_uses_raw_messages_when_context_governance_fails():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_messages: list[dict] = []
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
captured_messages[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
initial_messages = [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "hello"},
|
|
]
|
|
|
|
runner = AgentRunner(provider)
|
|
runner._snip_history = MagicMock(side_effect=RuntimeError("boom")) # type: ignore[method-assign]
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=initial_messages,
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == "done"
|
|
assert captured_messages == initial_messages
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_retries_empty_final_response_with_summary_prompt():
|
|
"""Empty responses get 2 silent retries before finalization kicks in."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
calls: list[dict] = []
|
|
|
|
async def chat_with_retry(*, messages, tools=None, **kwargs):
|
|
calls.append({"messages": messages, "tools": tools})
|
|
if len(calls) <= 2:
|
|
return LLMResponse(
|
|
content=None,
|
|
tool_calls=[],
|
|
usage={"prompt_tokens": 5, "completion_tokens": 1},
|
|
)
|
|
return LLMResponse(
|
|
content="final answer",
|
|
tool_calls=[],
|
|
usage={"prompt_tokens": 3, "completion_tokens": 7},
|
|
)
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=3,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == "final answer"
|
|
# 2 silent retries (iterations 0,1) + finalization on iteration 1
|
|
assert len(calls) == 3
|
|
assert calls[0]["tools"] is not None
|
|
assert calls[1]["tools"] is not None
|
|
assert calls[2]["tools"] is None
|
|
assert result.usage["prompt_tokens"] == 13
|
|
assert result.usage["completion_tokens"] == 9
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_uses_specific_message_after_empty_finalization_retry():
|
|
"""After silent retries + finalization all return empty, stop_reason is empty_final_response."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE
|
|
|
|
provider = MagicMock()
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
return LLMResponse(content=None, tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=3,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == EMPTY_FINAL_RESPONSE_MESSAGE
|
|
assert result.stop_reason == "empty_final_response"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_empty_response_does_not_break_tool_chain():
|
|
"""An empty intermediate response must not kill an ongoing tool chain.
|
|
|
|
Sequence: tool_call → empty → tool_call → final text.
|
|
The runner should recover via silent retry and complete normally.
|
|
"""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
call_count = 0
|
|
|
|
async def chat_with_retry(*, messages, tools=None, **kwargs):
|
|
nonlocal call_count
|
|
call_count += 1
|
|
if call_count == 1:
|
|
return LLMResponse(
|
|
content=None,
|
|
tool_calls=[ToolCallRequest(id="tc1", name="read_file", arguments={"path": "a.txt"})],
|
|
usage={"prompt_tokens": 10, "completion_tokens": 5},
|
|
)
|
|
if call_count == 2:
|
|
return LLMResponse(content=None, tool_calls=[], usage={"prompt_tokens": 10, "completion_tokens": 1})
|
|
if call_count == 3:
|
|
return LLMResponse(
|
|
content=None,
|
|
tool_calls=[ToolCallRequest(id="tc2", name="read_file", arguments={"path": "b.txt"})],
|
|
usage={"prompt_tokens": 10, "completion_tokens": 5},
|
|
)
|
|
return LLMResponse(
|
|
content="Here are the results.",
|
|
tool_calls=[],
|
|
usage={"prompt_tokens": 10, "completion_tokens": 10},
|
|
)
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
provider.chat_stream_with_retry = chat_with_retry
|
|
|
|
async def fake_tool(name, args, **kw):
|
|
return "file content"
|
|
|
|
tool_registry = MagicMock()
|
|
tool_registry.get_definitions.return_value = [{"type": "function", "function": {"name": "read_file"}}]
|
|
tool_registry.execute = AsyncMock(side_effect=fake_tool)
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "read both files"}],
|
|
tools=tool_registry,
|
|
model="test-model",
|
|
max_iterations=10,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == "Here are the results."
|
|
assert result.stop_reason == "completed"
|
|
assert call_count == 4
|
|
assert "read_file" in result.tools_used
|
|
|
|
|
|
def test_snip_history_drops_orphaned_tool_results_from_trimmed_slice(monkeypatch):
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
runner = AgentRunner(provider)
|
|
messages = [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "old user"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "tool call",
|
|
"tool_calls": [{"id": "call_1", "type": "function", "function": {"name": "ls", "arguments": "{}"}}],
|
|
},
|
|
{"role": "tool", "tool_call_id": "call_1", "content": "tool output"},
|
|
{"role": "assistant", "content": "after tool"},
|
|
]
|
|
spec = AgentRunSpec(
|
|
initial_messages=messages,
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
context_window_tokens=2000,
|
|
context_block_limit=100,
|
|
)
|
|
|
|
monkeypatch.setattr("nanobot.agent.runner.estimate_prompt_tokens_chain", lambda *_args, **_kwargs: (500, None))
|
|
token_sizes = {
|
|
"old user": 120,
|
|
"tool call": 120,
|
|
"tool output": 40,
|
|
"after tool": 40,
|
|
"system": 0,
|
|
}
|
|
monkeypatch.setattr(
|
|
"nanobot.agent.runner.estimate_message_tokens",
|
|
lambda msg: token_sizes.get(str(msg.get("content")), 40),
|
|
)
|
|
|
|
trimmed = runner._snip_history(spec, messages)
|
|
|
|
assert trimmed == [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "assistant", "content": "after tool"},
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_keeps_going_when_tool_result_persistence_fails():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_second_call: list[dict] = []
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={"path": "."})],
|
|
usage={"prompt_tokens": 5, "completion_tokens": 3},
|
|
)
|
|
captured_second_call[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="tool result")
|
|
|
|
runner = AgentRunner(provider)
|
|
with patch("nanobot.agent.runner.maybe_persist_tool_result", side_effect=RuntimeError("disk full")):
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=2,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == "done"
|
|
tool_message = next(msg for msg in captured_second_call if msg.get("role") == "tool")
|
|
assert tool_message["content"] == "tool result"
|
|
|
|
|
|
class _DelayTool(Tool):
|
|
def __init__(self, name: str, *, delay: float, read_only: bool, shared_events: list[str]):
|
|
self._name = name
|
|
self._delay = delay
|
|
self._read_only = read_only
|
|
self._shared_events = shared_events
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self._name
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
return self._name
|
|
|
|
@property
|
|
def parameters(self) -> dict:
|
|
return {"type": "object", "properties": {}, "required": []}
|
|
|
|
@property
|
|
def read_only(self) -> bool:
|
|
return self._read_only
|
|
|
|
async def execute(self, **kwargs):
|
|
self._shared_events.append(f"start:{self._name}")
|
|
await asyncio.sleep(self._delay)
|
|
self._shared_events.append(f"end:{self._name}")
|
|
return self._name
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_batches_read_only_tools_before_exclusive_work():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
tools = ToolRegistry()
|
|
shared_events: list[str] = []
|
|
read_a = _DelayTool("read_a", delay=0.05, read_only=True, shared_events=shared_events)
|
|
read_b = _DelayTool("read_b", delay=0.05, read_only=True, shared_events=shared_events)
|
|
write_a = _DelayTool("write_a", delay=0.01, read_only=False, shared_events=shared_events)
|
|
tools.register(read_a)
|
|
tools.register(read_b)
|
|
tools.register(write_a)
|
|
|
|
runner = AgentRunner(MagicMock())
|
|
await runner._execute_tools(
|
|
AgentRunSpec(
|
|
initial_messages=[],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
concurrent_tools=True,
|
|
),
|
|
[
|
|
ToolCallRequest(id="ro1", name="read_a", arguments={}),
|
|
ToolCallRequest(id="ro2", name="read_b", arguments={}),
|
|
ToolCallRequest(id="rw1", name="write_a", arguments={}),
|
|
],
|
|
{},
|
|
)
|
|
|
|
assert shared_events[0:2] == ["start:read_a", "start:read_b"]
|
|
assert "end:read_a" in shared_events and "end:read_b" in shared_events
|
|
assert shared_events.index("end:read_a") < shared_events.index("start:write_a")
|
|
assert shared_events.index("end:read_b") < shared_events.index("start:write_a")
|
|
assert shared_events[-2:] == ["start:write_a", "end:write_a"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_blocks_repeated_external_fetches():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_final_call: list[dict] = []
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] <= 3:
|
|
return LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id=f"call_{call_count['n']}", name="web_fetch", arguments={"url": "https://example.com"})],
|
|
usage={},
|
|
)
|
|
captured_final_call[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="page content")
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "research task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=4,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.final_content == "done"
|
|
assert tools.execute.await_count == 2
|
|
blocked_tool_message = [
|
|
msg for msg in captured_final_call
|
|
if msg.get("role") == "tool" and msg.get("tool_call_id") == "call_3"
|
|
][0]
|
|
assert "repeated external lookup blocked" in blocked_tool_message["content"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_loop_max_iterations_message_stays_stable(tmp_path):
|
|
loop = _make_loop(tmp_path)
|
|
loop.provider.chat_with_retry = AsyncMock(return_value=LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={})],
|
|
))
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
loop.tools.execute = AsyncMock(return_value="ok")
|
|
loop.max_iterations = 2
|
|
|
|
final_content, _, _, _, _ = await loop._run_agent_loop([])
|
|
|
|
assert final_content == (
|
|
"I reached the maximum number of tool call iterations (2) "
|
|
"without completing the task. You can try breaking the task into smaller steps."
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_loop_stream_filter_handles_think_only_prefix_without_crashing(tmp_path):
|
|
loop = _make_loop(tmp_path)
|
|
deltas: list[str] = []
|
|
endings: list[bool] = []
|
|
|
|
async def chat_stream_with_retry(*, on_content_delta, **kwargs):
|
|
await on_content_delta("<think>hidden")
|
|
await on_content_delta("</think>Hello")
|
|
return LLMResponse(content="<think>hidden</think>Hello", tool_calls=[], usage={})
|
|
|
|
loop.provider.chat_stream_with_retry = chat_stream_with_retry
|
|
|
|
async def on_stream(delta: str) -> None:
|
|
deltas.append(delta)
|
|
|
|
async def on_stream_end(*, resuming: bool = False) -> None:
|
|
endings.append(resuming)
|
|
|
|
final_content, _, _, _, _ = await loop._run_agent_loop(
|
|
[],
|
|
on_stream=on_stream,
|
|
on_stream_end=on_stream_end,
|
|
)
|
|
|
|
assert final_content == "Hello"
|
|
assert deltas == ["Hello"]
|
|
assert endings == [False]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_loop_retries_think_only_final_response(tmp_path):
|
|
loop = _make_loop(tmp_path)
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(**kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(content="<think>hidden</think>", tool_calls=[], usage={})
|
|
return LLMResponse(content="Recovered answer", tool_calls=[], usage={})
|
|
|
|
loop.provider.chat_with_retry = chat_with_retry
|
|
|
|
final_content, _, _, _, _ = await loop._run_agent_loop([])
|
|
|
|
assert final_content == "Recovered answer"
|
|
assert call_count["n"] == 2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_llm_error_not_appended_to_session_messages():
|
|
"""When LLM returns finish_reason='error', the error content must NOT be
|
|
appended to the messages list (prevents polluting session history)."""
|
|
from nanobot.agent.runner import (
|
|
AgentRunSpec,
|
|
AgentRunner,
|
|
_PERSISTED_MODEL_ERROR_PLACEHOLDER,
|
|
)
|
|
|
|
provider = MagicMock()
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(
|
|
content="429 rate limit exceeded", finish_reason="error", tool_calls=[], usage={},
|
|
))
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "hello"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=5,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.stop_reason == "error"
|
|
assert result.final_content == "429 rate limit exceeded"
|
|
assistant_msgs = [m for m in result.messages if m.get("role") == "assistant"]
|
|
assert all("429" not in (m.get("content") or "") for m in assistant_msgs), \
|
|
"Error content should not appear in session messages"
|
|
assert assistant_msgs[-1]["content"] == _PERSISTED_MODEL_ERROR_PLACEHOLDER
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_streamed_flag_not_set_on_llm_error(tmp_path):
|
|
"""When LLM errors during a streaming-capable channel interaction,
|
|
_streamed must NOT be set so ChannelManager delivers the error."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.events import InboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
error_resp = LLMResponse(
|
|
content="503 service unavailable", finish_reason="error", tool_calls=[], usage={},
|
|
)
|
|
loop.provider.chat_with_retry = AsyncMock(return_value=error_resp)
|
|
loop.provider.chat_stream_with_retry = AsyncMock(return_value=error_resp)
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
|
|
msg = InboundMessage(
|
|
channel="feishu", sender_id="u1", chat_id="c1", content="hi",
|
|
)
|
|
result = await loop._process_message(
|
|
msg,
|
|
on_stream=AsyncMock(),
|
|
on_stream_end=AsyncMock(),
|
|
)
|
|
|
|
assert result is not None
|
|
assert "503" in result.content
|
|
assert not result.metadata.get("_streamed"), \
|
|
"_streamed must not be set when stop_reason is error"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_next_turn_after_llm_error_keeps_turn_boundary(tmp_path):
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.agent.runner import _PERSISTED_MODEL_ERROR_PLACEHOLDER
|
|
from nanobot.bus.events import InboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(side_effect=[
|
|
LLMResponse(content="429 rate limit exceeded", finish_reason="error", tool_calls=[], usage={}),
|
|
LLMResponse(content="Recovered answer", tool_calls=[], usage={}),
|
|
])
|
|
|
|
loop = AgentLoop(bus=MessageBus(), provider=provider, workspace=tmp_path, model="test-model")
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign]
|
|
|
|
first = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="test", content="first question")
|
|
)
|
|
assert first is not None
|
|
assert first.content == "429 rate limit exceeded"
|
|
|
|
session = loop.sessions.get_or_create("cli:test")
|
|
assert [
|
|
{key: value for key, value in message.items() if key in {"role", "content"}}
|
|
for message in session.messages
|
|
] == [
|
|
{"role": "user", "content": "first question"},
|
|
{"role": "assistant", "content": _PERSISTED_MODEL_ERROR_PLACEHOLDER},
|
|
]
|
|
|
|
second = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="test", content="second question")
|
|
)
|
|
assert second is not None
|
|
assert second.content == "Recovered answer"
|
|
|
|
request_messages = provider.chat_with_retry.await_args_list[1].kwargs["messages"]
|
|
non_system = [message for message in request_messages if message.get("role") != "system"]
|
|
assert non_system[0] == {"role": "user", "content": "first question"}
|
|
assert non_system[1] == {
|
|
"role": "assistant",
|
|
"content": _PERSISTED_MODEL_ERROR_PLACEHOLDER,
|
|
}
|
|
assert non_system[2]["role"] == "user"
|
|
assert "second question" in non_system[2]["content"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_tool_error_sets_final_content():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
return LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="read_file", arguments={"path": "x"})],
|
|
usage={},
|
|
)
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(side_effect=RuntimeError("boom"))
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
fail_on_tool_error=True,
|
|
))
|
|
|
|
assert result.final_content == "Error: RuntimeError: boom"
|
|
assert result.stop_reason == "tool_error"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, monkeypatch):
|
|
from nanobot.agent.subagent import SubagentManager
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(
|
|
content="working",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={"path": "."})],
|
|
))
|
|
mgr = SubagentManager(
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
bus=bus,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
)
|
|
mgr._announce_result = AsyncMock()
|
|
|
|
async def fake_execute(self, **kwargs):
|
|
return "tool result"
|
|
|
|
monkeypatch.setattr("nanobot.agent.tools.filesystem.ListDirTool.execute", fake_execute)
|
|
|
|
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"})
|
|
|
|
mgr._announce_result.assert_awaited_once()
|
|
args = mgr._announce_result.await_args.args
|
|
assert args[3] == "Task completed but no final response was generated."
|
|
assert args[5] == "ok"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_accumulates_usage_and_preserves_cached_tokens():
|
|
"""Runner should accumulate prompt/completion tokens across iterations
|
|
and preserve cached_tokens from provider responses."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(
|
|
content="thinking",
|
|
tool_calls=[ToolCallRequest(id="call_1", name="read_file", arguments={"path": "x"})],
|
|
usage={"prompt_tokens": 100, "completion_tokens": 10, "cached_tokens": 80},
|
|
)
|
|
return LLMResponse(
|
|
content="done",
|
|
tool_calls=[],
|
|
usage={"prompt_tokens": 200, "completion_tokens": 20, "cached_tokens": 150},
|
|
)
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="file content")
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do task"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=3,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
# Usage should be accumulated across iterations
|
|
assert result.usage["prompt_tokens"] == 300 # 100 + 200
|
|
assert result.usage["completion_tokens"] == 30 # 10 + 20
|
|
assert result.usage["cached_tokens"] == 230 # 80 + 150
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_passes_cached_tokens_to_hook_context():
|
|
"""Hook context.usage should contain cached_tokens."""
|
|
from nanobot.agent.hook import AgentHook, AgentHookContext
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_usage: list[dict] = []
|
|
|
|
class UsageHook(AgentHook):
|
|
async def after_iteration(self, context: AgentHookContext) -> None:
|
|
captured_usage.append(dict(context.usage))
|
|
|
|
async def chat_with_retry(**kwargs):
|
|
return LLMResponse(
|
|
content="done",
|
|
tool_calls=[],
|
|
usage={"prompt_tokens": 200, "completion_tokens": 20, "cached_tokens": 150},
|
|
)
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
await runner.run(AgentRunSpec(
|
|
initial_messages=[],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
hook=UsageHook(),
|
|
))
|
|
|
|
assert len(captured_usage) == 1
|
|
assert captured_usage[0]["cached_tokens"] == 150
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Length recovery (auto-continue on finish_reason == "length")
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_length_recovery_continues_from_truncated_output():
|
|
"""When finish_reason is 'length', runner should insert a continuation
|
|
prompt and retry, stitching partial outputs into the final result."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] <= 2:
|
|
return LLMResponse(
|
|
content=f"part{call_count['n']} ",
|
|
finish_reason="length",
|
|
usage={},
|
|
)
|
|
return LLMResponse(content="final", finish_reason="stop", usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "write a long essay"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=10,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.stop_reason == "completed"
|
|
assert result.final_content == "final"
|
|
assert call_count["n"] == 3
|
|
roles = [m["role"] for m in result.messages if m["role"] == "user"]
|
|
assert len(roles) >= 3 # original + 2 recovery prompts
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_length_recovery_streaming_calls_on_stream_end_with_resuming():
|
|
"""During length recovery with streaming, on_stream_end should be called
|
|
with resuming=True so the hook knows the conversation is continuing."""
|
|
from nanobot.agent.hook import AgentHook, AgentHookContext
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
stream_end_calls: list[bool] = []
|
|
|
|
class StreamHook(AgentHook):
|
|
def wants_streaming(self) -> bool:
|
|
return True
|
|
|
|
async def on_stream(self, context: AgentHookContext, delta: str) -> None:
|
|
pass
|
|
|
|
async def on_stream_end(self, context: AgentHookContext, resuming: bool = False) -> None:
|
|
stream_end_calls.append(resuming)
|
|
|
|
async def chat_stream_with_retry(*, messages, on_content_delta=None, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(content="partial ", finish_reason="length", usage={})
|
|
return LLMResponse(content="done", finish_reason="stop", usage={})
|
|
|
|
provider.chat_stream_with_retry = chat_stream_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "go"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=10,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
hook=StreamHook(),
|
|
))
|
|
|
|
assert len(stream_end_calls) == 2
|
|
assert stream_end_calls[0] is True # length recovery: resuming
|
|
assert stream_end_calls[1] is False # final response: done
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_length_recovery_gives_up_after_max_retries():
|
|
"""After _MAX_LENGTH_RECOVERIES attempts the runner should stop retrying."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner, _MAX_LENGTH_RECOVERIES
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
return LLMResponse(
|
|
content=f"chunk{call_count['n']}",
|
|
finish_reason="length",
|
|
usage={},
|
|
)
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "go"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=20,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert call_count["n"] == _MAX_LENGTH_RECOVERIES + 1
|
|
assert result.final_content is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Backfill missing tool_results
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_backfill_missing_tool_results_inserts_error():
|
|
"""Orphaned tool_use (no matching tool_result) should get a synthetic error."""
|
|
from nanobot.agent.runner import AgentRunner, _BACKFILL_CONTENT
|
|
|
|
messages = [
|
|
{"role": "user", "content": "hi"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{"id": "call_a", "type": "function", "function": {"name": "exec", "arguments": "{}"}},
|
|
{"id": "call_b", "type": "function", "function": {"name": "read_file", "arguments": "{}"}},
|
|
],
|
|
},
|
|
{"role": "tool", "tool_call_id": "call_a", "name": "exec", "content": "ok"},
|
|
]
|
|
result = AgentRunner._backfill_missing_tool_results(messages)
|
|
tool_msgs = [m for m in result if m.get("role") == "tool"]
|
|
assert len(tool_msgs) == 2
|
|
backfilled = [m for m in tool_msgs if m.get("tool_call_id") == "call_b"]
|
|
assert len(backfilled) == 1
|
|
assert backfilled[0]["content"] == _BACKFILL_CONTENT
|
|
assert backfilled[0]["name"] == "read_file"
|
|
|
|
|
|
def test_drop_orphan_tool_results_removes_unmatched_tool_messages():
|
|
from nanobot.agent.runner import AgentRunner
|
|
|
|
messages = [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "old user"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{"id": "call_ok", "type": "function", "function": {"name": "read_file", "arguments": "{}"}},
|
|
],
|
|
},
|
|
{"role": "tool", "tool_call_id": "call_ok", "name": "read_file", "content": "ok"},
|
|
{"role": "tool", "tool_call_id": "call_orphan", "name": "exec", "content": "stale"},
|
|
{"role": "assistant", "content": "after tool"},
|
|
]
|
|
|
|
cleaned = AgentRunner._drop_orphan_tool_results(messages)
|
|
|
|
assert cleaned == [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "old user"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{"id": "call_ok", "type": "function", "function": {"name": "read_file", "arguments": "{}"}},
|
|
],
|
|
},
|
|
{"role": "tool", "tool_call_id": "call_ok", "name": "read_file", "content": "ok"},
|
|
{"role": "assistant", "content": "after tool"},
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_backfill_noop_when_complete():
|
|
"""Complete message chains should not be modified."""
|
|
from nanobot.agent.runner import AgentRunner
|
|
|
|
messages = [
|
|
{"role": "user", "content": "hi"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{"id": "call_x", "type": "function", "function": {"name": "exec", "arguments": "{}"}},
|
|
],
|
|
},
|
|
{"role": "tool", "tool_call_id": "call_x", "name": "exec", "content": "done"},
|
|
{"role": "assistant", "content": "all good"},
|
|
]
|
|
result = AgentRunner._backfill_missing_tool_results(messages)
|
|
assert result is messages # same object — no copy
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_drops_orphan_tool_results_before_model_request():
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
captured_messages: list[dict] = []
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
captured_messages[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "old user"},
|
|
{"role": "tool", "tool_call_id": "call_orphan", "name": "exec", "content": "stale"},
|
|
{"role": "assistant", "content": "after orphan"},
|
|
{"role": "user", "content": "new prompt"},
|
|
],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert all(
|
|
message.get("tool_call_id") != "call_orphan"
|
|
for message in captured_messages
|
|
if message.get("role") == "tool"
|
|
)
|
|
assert result.messages[2]["tool_call_id"] == "call_orphan"
|
|
assert result.final_content == "done"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_backfill_repairs_model_context_without_shifting_save_turn_boundary(tmp_path):
|
|
"""Historical backfill should not duplicate old tail messages on persist."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.agent.runner import _BACKFILL_CONTENT
|
|
from nanobot.bus.events import InboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
response = LLMResponse(content="new answer", tool_calls=[], usage={})
|
|
provider.chat_with_retry = AsyncMock(return_value=response)
|
|
provider.chat_stream_with_retry = AsyncMock(return_value=response)
|
|
|
|
loop = AgentLoop(
|
|
bus=MessageBus(),
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
model="test-model",
|
|
)
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign]
|
|
|
|
session = loop.sessions.get_or_create("cli:test")
|
|
session.messages = [
|
|
{"role": "user", "content": "old user", "timestamp": "2026-01-01T00:00:00"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{
|
|
"id": "call_missing",
|
|
"type": "function",
|
|
"function": {"name": "read_file", "arguments": "{}"},
|
|
}
|
|
],
|
|
"timestamp": "2026-01-01T00:00:01",
|
|
},
|
|
{"role": "assistant", "content": "old tail", "timestamp": "2026-01-01T00:00:02"},
|
|
]
|
|
loop.sessions.save(session)
|
|
|
|
result = await loop._process_message(
|
|
InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new prompt")
|
|
)
|
|
|
|
assert result is not None
|
|
assert result.content == "new answer"
|
|
|
|
request_messages = provider.chat_with_retry.await_args.kwargs["messages"]
|
|
synthetic = [
|
|
message
|
|
for message in request_messages
|
|
if message.get("role") == "tool" and message.get("tool_call_id") == "call_missing"
|
|
]
|
|
assert len(synthetic) == 1
|
|
assert synthetic[0]["content"] == _BACKFILL_CONTENT
|
|
|
|
session_after = loop.sessions.get_or_create("cli:test")
|
|
assert [
|
|
{
|
|
key: value
|
|
for key, value in message.items()
|
|
if key in {"role", "content", "tool_call_id", "name", "tool_calls"}
|
|
}
|
|
for message in session_after.messages
|
|
] == [
|
|
{"role": "user", "content": "old user"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{
|
|
"id": "call_missing",
|
|
"type": "function",
|
|
"function": {"name": "read_file", "arguments": "{}"},
|
|
}
|
|
],
|
|
},
|
|
{"role": "assistant", "content": "old tail"},
|
|
{"role": "user", "content": "new prompt"},
|
|
{"role": "assistant", "content": "new answer"},
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_backfill_only_mutates_model_context_not_returned_messages():
|
|
"""Runner should repair orphaned tool calls for the model without rewriting result.messages."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner, _BACKFILL_CONTENT
|
|
|
|
provider = MagicMock()
|
|
captured_messages: list[dict] = []
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
captured_messages[:] = messages
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
initial_messages = [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "old user"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{
|
|
"id": "call_missing",
|
|
"type": "function",
|
|
"function": {"name": "read_file", "arguments": "{}"},
|
|
}
|
|
],
|
|
},
|
|
{"role": "assistant", "content": "old tail"},
|
|
{"role": "user", "content": "new prompt"},
|
|
]
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=initial_messages,
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=3,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
synthetic = [
|
|
message
|
|
for message in captured_messages
|
|
if message.get("role") == "tool" and message.get("tool_call_id") == "call_missing"
|
|
]
|
|
assert len(synthetic) == 1
|
|
assert synthetic[0]["content"] == _BACKFILL_CONTENT
|
|
|
|
assert [
|
|
{
|
|
key: value
|
|
for key, value in message.items()
|
|
if key in {"role", "content", "tool_call_id", "name", "tool_calls"}
|
|
}
|
|
for message in result.messages
|
|
] == [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "old user"},
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{
|
|
"id": "call_missing",
|
|
"type": "function",
|
|
"function": {"name": "read_file", "arguments": "{}"},
|
|
}
|
|
],
|
|
},
|
|
{"role": "assistant", "content": "old tail"},
|
|
{"role": "user", "content": "new prompt"},
|
|
{"role": "assistant", "content": "done"},
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Microcompact (stale tool result compaction)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_microcompact_replaces_old_tool_results():
|
|
"""Tool results beyond _MICROCOMPACT_KEEP_RECENT should be summarized."""
|
|
from nanobot.agent.runner import AgentRunner, _MICROCOMPACT_KEEP_RECENT
|
|
|
|
total = _MICROCOMPACT_KEEP_RECENT + 5
|
|
long_content = "x" * 600
|
|
messages: list[dict] = [{"role": "system", "content": "sys"}]
|
|
for i in range(total):
|
|
messages.append({
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [{"id": f"c{i}", "type": "function", "function": {"name": "read_file", "arguments": "{}"}}],
|
|
})
|
|
messages.append({
|
|
"role": "tool", "tool_call_id": f"c{i}", "name": "read_file",
|
|
"content": long_content,
|
|
})
|
|
|
|
result = AgentRunner._microcompact(messages)
|
|
tool_msgs = [m for m in result if m.get("role") == "tool"]
|
|
stale_count = total - _MICROCOMPACT_KEEP_RECENT
|
|
compacted = [m for m in tool_msgs if "omitted from context" in str(m.get("content", ""))]
|
|
preserved = [m for m in tool_msgs if m.get("content") == long_content]
|
|
assert len(compacted) == stale_count
|
|
assert len(preserved) == _MICROCOMPACT_KEEP_RECENT
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_microcompact_preserves_short_results():
|
|
"""Short tool results (< _MICROCOMPACT_MIN_CHARS) should not be replaced."""
|
|
from nanobot.agent.runner import AgentRunner, _MICROCOMPACT_KEEP_RECENT
|
|
|
|
total = _MICROCOMPACT_KEEP_RECENT + 5
|
|
messages: list[dict] = []
|
|
for i in range(total):
|
|
messages.append({
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [{"id": f"c{i}", "type": "function", "function": {"name": "exec", "arguments": "{}"}}],
|
|
})
|
|
messages.append({
|
|
"role": "tool", "tool_call_id": f"c{i}", "name": "exec",
|
|
"content": "short",
|
|
})
|
|
|
|
result = AgentRunner._microcompact(messages)
|
|
assert result is messages # no copy needed — all stale results are short
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_microcompact_skips_non_compactable_tools():
|
|
"""Non-compactable tools (e.g. 'message') should never be replaced."""
|
|
from nanobot.agent.runner import AgentRunner, _MICROCOMPACT_KEEP_RECENT
|
|
|
|
total = _MICROCOMPACT_KEEP_RECENT + 5
|
|
long_content = "y" * 1000
|
|
messages: list[dict] = []
|
|
for i in range(total):
|
|
messages.append({
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [{"id": f"c{i}", "type": "function", "function": {"name": "message", "arguments": "{}"}}],
|
|
})
|
|
messages.append({
|
|
"role": "tool", "tool_call_id": f"c{i}", "name": "message",
|
|
"content": long_content,
|
|
})
|
|
|
|
result = AgentRunner._microcompact(messages)
|
|
assert result is messages # no compactable tools found
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_tool_error_preserves_tool_results_in_messages():
|
|
"""When a tool raises a fatal error, its results must still be appended
|
|
to messages so the session never contains orphan tool_calls (#2943)."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
return LLMResponse(
|
|
content=None,
|
|
tool_calls=[
|
|
ToolCallRequest(id="tc1", name="read_file", arguments={"path": "a"}),
|
|
ToolCallRequest(id="tc2", name="exec", arguments={"cmd": "bad"}),
|
|
],
|
|
usage={},
|
|
)
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
provider.chat_stream_with_retry = chat_with_retry
|
|
|
|
call_idx = 0
|
|
|
|
async def fake_execute(name, args, **kw):
|
|
nonlocal call_idx
|
|
call_idx += 1
|
|
if call_idx == 2:
|
|
raise RuntimeError("boom")
|
|
return "file content"
|
|
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(side_effect=fake_execute)
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "do stuff"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
fail_on_tool_error=True,
|
|
))
|
|
|
|
assert result.stop_reason == "tool_error"
|
|
# Both tool results must be in messages even though tc2 had a fatal error.
|
|
tool_msgs = [m for m in result.messages if m.get("role") == "tool"]
|
|
assert len(tool_msgs) == 2
|
|
assert tool_msgs[0]["tool_call_id"] == "tc1"
|
|
assert tool_msgs[1]["tool_call_id"] == "tc2"
|
|
# The assistant message with tool_calls must precede the tool results.
|
|
asst_tc_idx = next(
|
|
i for i, m in enumerate(result.messages)
|
|
if m.get("role") == "assistant" and m.get("tool_calls")
|
|
)
|
|
tool_indices = [
|
|
i for i, m in enumerate(result.messages) if m.get("role") == "tool"
|
|
]
|
|
assert all(ti > asst_tc_idx for ti in tool_indices)
|
|
|
|
|
|
def test_governance_repairs_orphans_after_snip():
|
|
"""After _snip_history clips an assistant+tool_calls, the second
|
|
_drop_orphan_tool_results pass must clean up the resulting orphans."""
|
|
from nanobot.agent.runner import AgentRunner
|
|
|
|
messages = [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "user", "content": "old msg"},
|
|
{"role": "assistant", "content": None,
|
|
"tool_calls": [{"id": "tc_old", "type": "function",
|
|
"function": {"name": "search", "arguments": "{}"}}]},
|
|
{"role": "tool", "tool_call_id": "tc_old", "name": "search",
|
|
"content": "old result"},
|
|
{"role": "assistant", "content": "old answer"},
|
|
{"role": "user", "content": "new msg"},
|
|
]
|
|
|
|
# Simulate snipping that keeps only the tail: drop the assistant with
|
|
# tool_calls but keep its tool result (orphan).
|
|
snipped = [
|
|
{"role": "system", "content": "system"},
|
|
{"role": "tool", "tool_call_id": "tc_old", "name": "search",
|
|
"content": "old result"},
|
|
{"role": "assistant", "content": "old answer"},
|
|
{"role": "user", "content": "new msg"},
|
|
]
|
|
|
|
cleaned = AgentRunner._drop_orphan_tool_results(snipped)
|
|
# The orphan tool result should be removed.
|
|
assert not any(
|
|
m.get("role") == "tool" and m.get("tool_call_id") == "tc_old"
|
|
for m in cleaned
|
|
)
|
|
|
|
|
|
def test_governance_fallback_still_repairs_orphans():
|
|
"""When full governance fails, the fallback must still run
|
|
_drop_orphan_tool_results and _backfill_missing_tool_results."""
|
|
from nanobot.agent.runner import AgentRunner
|
|
|
|
# Messages with an orphan tool result (no matching assistant tool_call).
|
|
messages = [
|
|
{"role": "user", "content": "hello"},
|
|
{"role": "tool", "tool_call_id": "orphan_tc", "name": "read",
|
|
"content": "stale"},
|
|
{"role": "assistant", "content": "hi"},
|
|
]
|
|
|
|
repaired = AgentRunner._drop_orphan_tool_results(messages)
|
|
repaired = AgentRunner._backfill_missing_tool_results(repaired)
|
|
# Orphan tool result should be gone.
|
|
assert not any(m.get("tool_call_id") == "orphan_tc" for m in repaired)
|
|
# ── Mid-turn injection tests ──────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_injections_returns_empty_when_no_callback():
|
|
"""No injection_callback → empty list."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
runner = AgentRunner(provider)
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
spec = AgentRunSpec(
|
|
initial_messages=[], tools=tools, model="m",
|
|
max_iterations=1, max_tool_result_chars=1000,
|
|
injection_callback=None,
|
|
)
|
|
result = await runner._drain_injections(spec)
|
|
assert result == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_injections_extracts_content_from_inbound_messages():
|
|
"""Should extract .content from InboundMessage objects."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
provider = MagicMock()
|
|
runner = AgentRunner(provider)
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
msgs = [
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content="hello"),
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content="world"),
|
|
]
|
|
|
|
async def cb():
|
|
return msgs
|
|
|
|
spec = AgentRunSpec(
|
|
initial_messages=[], tools=tools, model="m",
|
|
max_iterations=1, max_tool_result_chars=1000,
|
|
injection_callback=cb,
|
|
)
|
|
result = await runner._drain_injections(spec)
|
|
assert result == [
|
|
{"role": "user", "content": "hello"},
|
|
{"role": "user", "content": "world"},
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_injections_passes_limit_to_callback_when_supported():
|
|
"""Limit-aware callbacks can preserve overflow in their own queue."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner, _MAX_INJECTIONS_PER_TURN
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
provider = MagicMock()
|
|
runner = AgentRunner(provider)
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
seen_limits: list[int] = []
|
|
|
|
msgs = [
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content=f"msg{i}")
|
|
for i in range(_MAX_INJECTIONS_PER_TURN + 3)
|
|
]
|
|
|
|
async def cb(*, limit: int):
|
|
seen_limits.append(limit)
|
|
return msgs[:limit]
|
|
|
|
spec = AgentRunSpec(
|
|
initial_messages=[], tools=tools, model="m",
|
|
max_iterations=1, max_tool_result_chars=1000,
|
|
injection_callback=cb,
|
|
)
|
|
result = await runner._drain_injections(spec)
|
|
assert seen_limits == [_MAX_INJECTIONS_PER_TURN]
|
|
assert result == [
|
|
{"role": "user", "content": "msg0"},
|
|
{"role": "user", "content": "msg1"},
|
|
{"role": "user", "content": "msg2"},
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_injections_skips_empty_content():
|
|
"""Messages with blank content should be filtered out."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
provider = MagicMock()
|
|
runner = AgentRunner(provider)
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
msgs = [
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content=""),
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content=" "),
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content="valid"),
|
|
]
|
|
|
|
async def cb():
|
|
return msgs
|
|
|
|
spec = AgentRunSpec(
|
|
initial_messages=[], tools=tools, model="m",
|
|
max_iterations=1, max_tool_result_chars=1000,
|
|
injection_callback=cb,
|
|
)
|
|
result = await runner._drain_injections(spec)
|
|
assert result == [{"role": "user", "content": "valid"}]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_drain_injections_handles_callback_exception():
|
|
"""If the callback raises, return empty list (error is logged)."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
runner = AgentRunner(provider)
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
async def cb():
|
|
raise RuntimeError("boom")
|
|
|
|
spec = AgentRunSpec(
|
|
initial_messages=[], tools=tools, model="m",
|
|
max_iterations=1, max_tool_result_chars=1000,
|
|
injection_callback=cb,
|
|
)
|
|
result = await runner._drain_injections(spec)
|
|
assert result == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_checkpoint1_injects_after_tool_execution():
|
|
"""Follow-up messages are injected after tool execution, before next LLM call."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
captured_messages = []
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
captured_messages.append(list(messages))
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(
|
|
content="using tool",
|
|
tool_calls=[ToolCallRequest(id="c1", name="read_file", arguments={"path": "x"})],
|
|
usage={},
|
|
)
|
|
return LLMResponse(content="final answer", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
tools.execute = AsyncMock(return_value="file content")
|
|
|
|
injection_queue = asyncio.Queue()
|
|
|
|
async def inject_cb():
|
|
items = []
|
|
while not injection_queue.empty():
|
|
items.append(await injection_queue.get())
|
|
return items
|
|
|
|
# Put a follow-up message in the queue before the run starts
|
|
await injection_queue.put(
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up question")
|
|
)
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "hello"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=5,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
injection_callback=inject_cb,
|
|
))
|
|
|
|
assert result.had_injections is True
|
|
assert result.final_content == "final answer"
|
|
# The second call should have the injected user message
|
|
assert call_count["n"] == 2
|
|
last_messages = captured_messages[-1]
|
|
injected = [m for m in last_messages if m.get("role") == "user" and m.get("content") == "follow-up question"]
|
|
assert len(injected) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_checkpoint2_injects_after_final_response_with_resuming_stream():
|
|
"""After final response, if injections exist, stream_end should get resuming=True."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
from nanobot.agent.hook import AgentHook, AgentHookContext
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
stream_end_calls = []
|
|
|
|
class TrackingHook(AgentHook):
|
|
def wants_streaming(self) -> bool:
|
|
return True
|
|
|
|
async def on_stream_end(self, context: AgentHookContext, *, resuming: bool) -> None:
|
|
stream_end_calls.append(resuming)
|
|
|
|
def finalize_content(self, context: AgentHookContext, content: str | None) -> str | None:
|
|
return content
|
|
|
|
async def chat_stream_with_retry(*, messages, on_content_delta=None, **kwargs):
|
|
call_count["n"] += 1
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(content="first answer", tool_calls=[], usage={})
|
|
return LLMResponse(content="second answer", tool_calls=[], usage={})
|
|
|
|
provider.chat_stream_with_retry = chat_stream_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
injection_queue = asyncio.Queue()
|
|
|
|
async def inject_cb():
|
|
items = []
|
|
while not injection_queue.empty():
|
|
items.append(await injection_queue.get())
|
|
return items
|
|
|
|
# Inject a follow-up that arrives during the first response
|
|
await injection_queue.put(
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content="quick follow-up")
|
|
)
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "hello"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=5,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
hook=TrackingHook(),
|
|
injection_callback=inject_cb,
|
|
))
|
|
|
|
assert result.had_injections is True
|
|
assert result.final_content == "second answer"
|
|
assert call_count["n"] == 2
|
|
# First stream_end should have resuming=True (because injections found)
|
|
assert stream_end_calls[0] is True
|
|
# Second (final) stream_end should have resuming=False
|
|
assert stream_end_calls[-1] is False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_checkpoint2_preserves_final_response_in_history_before_followup():
|
|
"""A follow-up injected after a final answer must still see that answer in history."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
captured_messages = []
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
captured_messages.append([dict(message) for message in messages])
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(content="first answer", tool_calls=[], usage={})
|
|
return LLMResponse(content="second answer", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
injection_queue = asyncio.Queue()
|
|
|
|
async def inject_cb():
|
|
items = []
|
|
while not injection_queue.empty():
|
|
items.append(await injection_queue.get())
|
|
return items
|
|
|
|
await injection_queue.put(
|
|
InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up question")
|
|
)
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "hello"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=5,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
injection_callback=inject_cb,
|
|
))
|
|
|
|
assert result.final_content == "second answer"
|
|
assert call_count["n"] == 2
|
|
assert captured_messages[-1] == [
|
|
{"role": "user", "content": "hello"},
|
|
{"role": "assistant", "content": "first answer"},
|
|
{"role": "user", "content": "follow-up question"},
|
|
]
|
|
assert [
|
|
{"role": message["role"], "content": message["content"]}
|
|
for message in result.messages
|
|
if message.get("role") == "assistant"
|
|
] == [
|
|
{"role": "assistant", "content": "first answer"},
|
|
{"role": "assistant", "content": "second answer"},
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_loop_injected_followup_preserves_image_media(tmp_path):
|
|
"""Mid-turn follow-ups with images should keep multimodal content."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.events import InboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
image_path = tmp_path / "followup.png"
|
|
image_path.write_bytes(base64.b64decode(
|
|
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+yF9kAAAAASUVORK5CYII="
|
|
))
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
captured_messages: list[list[dict]] = []
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
captured_messages.append(list(messages))
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(content="first answer", tool_calls=[], usage={})
|
|
return LLMResponse(content="second answer", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
|
|
pending_queue = asyncio.Queue()
|
|
await pending_queue.put(InboundMessage(
|
|
channel="cli",
|
|
sender_id="u",
|
|
chat_id="c",
|
|
content="",
|
|
media=[str(image_path)],
|
|
))
|
|
|
|
final_content, _, _, _, had_injections = await loop._run_agent_loop(
|
|
[{"role": "user", "content": "hello"}],
|
|
channel="cli",
|
|
chat_id="c",
|
|
pending_queue=pending_queue,
|
|
)
|
|
|
|
assert final_content == "second answer"
|
|
assert had_injections is True
|
|
assert call_count["n"] == 2
|
|
injected_user_messages = [
|
|
message for message in captured_messages[-1]
|
|
if message.get("role") == "user" and isinstance(message.get("content"), list)
|
|
]
|
|
assert injected_user_messages
|
|
assert any(
|
|
block.get("type") == "image_url"
|
|
for block in injected_user_messages[-1]["content"]
|
|
if isinstance(block, dict)
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runner_merges_multiple_injected_user_messages_without_losing_media():
|
|
"""Multiple injected follow-ups should not create lossy consecutive user messages."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
captured_messages = []
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
captured_messages.append([dict(message) for message in messages])
|
|
if call_count["n"] == 1:
|
|
return LLMResponse(content="first answer", tool_calls=[], usage={})
|
|
return LLMResponse(content="second answer", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
async def inject_cb():
|
|
if call_count["n"] == 1:
|
|
return [
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}},
|
|
{"type": "text", "text": "look at this"},
|
|
],
|
|
},
|
|
{"role": "user", "content": "and answer briefly"},
|
|
]
|
|
return []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "hello"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=5,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
injection_callback=inject_cb,
|
|
))
|
|
|
|
assert result.final_content == "second answer"
|
|
assert call_count["n"] == 2
|
|
second_call = captured_messages[-1]
|
|
user_messages = [message for message in second_call if message.get("role") == "user"]
|
|
assert len(user_messages) == 2
|
|
injected = user_messages[-1]
|
|
assert isinstance(injected["content"], list)
|
|
assert any(
|
|
block.get("type") == "image_url"
|
|
for block in injected["content"]
|
|
if isinstance(block, dict)
|
|
)
|
|
assert any(
|
|
block.get("type") == "text" and block.get("text") == "and answer briefly"
|
|
for block in injected["content"]
|
|
if isinstance(block, dict)
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_injection_cycles_capped_at_max():
|
|
"""Injection cycles should be capped at _MAX_INJECTION_CYCLES."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner, _MAX_INJECTION_CYCLES
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
provider = MagicMock()
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
return LLMResponse(content=f"answer-{call_count['n']}", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
drain_count = {"n": 0}
|
|
|
|
async def inject_cb():
|
|
drain_count["n"] += 1
|
|
# Only inject for the first _MAX_INJECTION_CYCLES drains
|
|
if drain_count["n"] <= _MAX_INJECTION_CYCLES:
|
|
return [InboundMessage(channel="cli", sender_id="u", chat_id="c", content=f"msg-{drain_count['n']}")]
|
|
return []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "start"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=20,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
injection_callback=inject_cb,
|
|
))
|
|
|
|
assert result.had_injections is True
|
|
# Should be capped: _MAX_INJECTION_CYCLES injection rounds + 1 final round
|
|
assert call_count["n"] == _MAX_INJECTION_CYCLES + 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_injections_flag_is_false_by_default():
|
|
"""had_injections should be False when no injection callback or no messages."""
|
|
from nanobot.agent.runner import AgentRunSpec, AgentRunner
|
|
|
|
provider = MagicMock()
|
|
|
|
async def chat_with_retry(**kwargs):
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
tools = MagicMock()
|
|
tools.get_definitions.return_value = []
|
|
|
|
runner = AgentRunner(provider)
|
|
result = await runner.run(AgentRunSpec(
|
|
initial_messages=[{"role": "user", "content": "hi"}],
|
|
tools=tools,
|
|
model="test-model",
|
|
max_iterations=1,
|
|
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
|
|
))
|
|
|
|
assert result.had_injections is False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pending_queue_cleanup_on_dispatch(tmp_path):
|
|
"""_pending_queues should be cleaned up after _dispatch completes."""
|
|
loop = _make_loop(tmp_path)
|
|
|
|
async def chat_with_retry(**kwargs):
|
|
return LLMResponse(content="done", tool_calls=[], usage={})
|
|
|
|
loop.provider.chat_with_retry = chat_with_retry
|
|
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
msg = InboundMessage(channel="cli", sender_id="u", chat_id="c", content="hello")
|
|
# The queue should not exist before dispatch
|
|
assert msg.session_key not in loop._pending_queues
|
|
|
|
await loop._dispatch(msg)
|
|
|
|
# The queue should be cleaned up after dispatch
|
|
assert msg.session_key not in loop._pending_queues
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_followup_routed_to_pending_queue(tmp_path):
|
|
"""Unified-session follow-ups should route into the active pending queue."""
|
|
from nanobot.agent.loop import UNIFIED_SESSION_KEY
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
loop = _make_loop(tmp_path)
|
|
loop._unified_session = True
|
|
loop._dispatch = AsyncMock() # type: ignore[method-assign]
|
|
|
|
pending = asyncio.Queue(maxsize=20)
|
|
loop._pending_queues[UNIFIED_SESSION_KEY] = pending
|
|
|
|
run_task = asyncio.create_task(loop.run())
|
|
msg = InboundMessage(channel="discord", sender_id="u", chat_id="c", content="follow-up")
|
|
await loop.bus.publish_inbound(msg)
|
|
|
|
deadline = time.time() + 2
|
|
while pending.empty() and time.time() < deadline:
|
|
await asyncio.sleep(0.01)
|
|
|
|
loop.stop()
|
|
await asyncio.wait_for(run_task, timeout=2)
|
|
|
|
assert loop._dispatch.await_count == 0
|
|
assert not pending.empty()
|
|
queued_msg = pending.get_nowait()
|
|
assert queued_msg.content == "follow-up"
|
|
assert queued_msg.session_key == UNIFIED_SESSION_KEY
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pending_queue_preserves_overflow_for_next_injection_cycle(tmp_path):
|
|
"""Pending queue should leave overflow messages queued for later drains."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.events import InboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.agent.runner import _MAX_INJECTIONS_PER_TURN
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
captured_messages: list[list[dict]] = []
|
|
call_count = {"n": 0}
|
|
|
|
async def chat_with_retry(*, messages, **kwargs):
|
|
call_count["n"] += 1
|
|
captured_messages.append([dict(message) for message in messages])
|
|
return LLMResponse(content=f"answer-{call_count['n']}", tool_calls=[], usage={})
|
|
|
|
provider.chat_with_retry = chat_with_retry
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
|
|
pending_queue = asyncio.Queue()
|
|
total_followups = _MAX_INJECTIONS_PER_TURN + 2
|
|
for idx in range(total_followups):
|
|
await pending_queue.put(InboundMessage(
|
|
channel="cli",
|
|
sender_id="u",
|
|
chat_id="c",
|
|
content=f"follow-up-{idx}",
|
|
))
|
|
|
|
final_content, _, _, _, had_injections = await loop._run_agent_loop(
|
|
[{"role": "user", "content": "hello"}],
|
|
channel="cli",
|
|
chat_id="c",
|
|
pending_queue=pending_queue,
|
|
)
|
|
|
|
assert final_content == "answer-3"
|
|
assert had_injections is True
|
|
assert call_count["n"] == 3
|
|
flattened_user_content = "\n".join(
|
|
message["content"]
|
|
for message in captured_messages[-1]
|
|
if message.get("role") == "user" and isinstance(message.get("content"), str)
|
|
)
|
|
for idx in range(total_followups):
|
|
assert f"follow-up-{idx}" in flattened_user_content
|
|
assert pending_queue.empty()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pending_queue_full_falls_back_to_queued_task(tmp_path):
|
|
"""QueueFull should preserve the message by dispatching a queued task."""
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
loop = _make_loop(tmp_path)
|
|
loop._dispatch = AsyncMock() # type: ignore[method-assign]
|
|
|
|
pending = asyncio.Queue(maxsize=1)
|
|
pending.put_nowait(InboundMessage(channel="cli", sender_id="u", chat_id="c", content="already queued"))
|
|
loop._pending_queues["cli:c"] = pending
|
|
|
|
run_task = asyncio.create_task(loop.run())
|
|
msg = InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up")
|
|
await loop.bus.publish_inbound(msg)
|
|
|
|
deadline = time.time() + 2
|
|
while loop._dispatch.await_count == 0 and time.time() < deadline:
|
|
await asyncio.sleep(0.01)
|
|
|
|
loop.stop()
|
|
await asyncio.wait_for(run_task, timeout=2)
|
|
|
|
assert loop._dispatch.await_count == 1
|
|
dispatched_msg = loop._dispatch.await_args.args[0]
|
|
assert dispatched_msg.content == "follow-up"
|
|
assert pending.qsize() == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_republishes_leftover_queue_messages(tmp_path):
|
|
"""Messages left in the pending queue after _dispatch are re-published to the bus.
|
|
|
|
This tests the finally-block cleanup that prevents message loss when
|
|
the runner exits early (e.g., max_iterations, tool_error) with messages
|
|
still in the queue.
|
|
"""
|
|
from nanobot.bus.events import InboundMessage
|
|
|
|
loop = _make_loop(tmp_path)
|
|
bus = loop.bus
|
|
|
|
# Simulate a completed dispatch by manually registering a queue
|
|
# with leftover messages, then running the cleanup logic directly.
|
|
pending = asyncio.Queue(maxsize=20)
|
|
session_key = "cli:c"
|
|
loop._pending_queues[session_key] = pending
|
|
pending.put_nowait(InboundMessage(channel="cli", sender_id="u", chat_id="c", content="leftover-1"))
|
|
pending.put_nowait(InboundMessage(channel="cli", sender_id="u", chat_id="c", content="leftover-2"))
|
|
|
|
# Execute the cleanup logic from the finally block
|
|
queue = loop._pending_queues.pop(session_key, None)
|
|
assert queue is not None
|
|
leftover = 0
|
|
while True:
|
|
try:
|
|
item = queue.get_nowait()
|
|
except asyncio.QueueEmpty:
|
|
break
|
|
await bus.publish_inbound(item)
|
|
leftover += 1
|
|
|
|
assert leftover == 2
|
|
|
|
# Verify the messages are now on the bus
|
|
msgs = []
|
|
while not bus.inbound.empty():
|
|
msgs.append(await asyncio.wait_for(bus.consume_inbound(), timeout=0.5))
|
|
contents = [m.content for m in msgs]
|
|
assert "leftover-1" in contents
|
|
assert "leftover-2" in contents
|