nanobot/tests/session/test_turn_continuation.py

128 lines
4.2 KiB
Python

"""Tests for internal turn continuation policy."""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
import pytest
from nanobot.bus.events import InboundMessage
from nanobot.session.goal_state import GOAL_STATE_KEY
from nanobot.session.turn_continuation import (
INTERNAL_CONTINUATION_KIND_META,
INTERNAL_CONTINUATION_META,
INTERNAL_CONTINUATION_PENDING_META,
INTERNAL_CONTINUATION_RUN_STARTED_AT_META,
internal_continuation_pending,
internal_continuation_run_started_at,
maybe_continue_turn,
should_stream_budget_response,
)
@pytest.mark.asyncio
async def test_maybe_continue_turn_queues_internal_message():
meta = {
GOAL_STATE_KEY: {
"status": "active",
"objective": "Finish the migration.",
"ui_summary": "migration",
},
}
messages = [
{"role": "system", "content": "system"},
{"role": "user", "content": "start"},
{"role": "assistant", "content": "paused"},
]
pending: asyncio.Queue[InboundMessage] = asyncio.Queue()
ctx = SimpleNamespace(
session=SimpleNamespace(metadata=meta),
msg=InboundMessage(
channel="feishu",
sender_id="u1",
chat_id="c1",
content="start",
metadata={
"message_id": "msg-1",
"origin_message_id": "msg-0",
"_wants_stream": True,
"_stream_id": "stream-1",
"_stream_delta": True,
"_stream_end": True,
"_resuming": True,
"webui": True,
},
),
session_key="feishu:c1",
pending_queue=pending,
stop_reason="max_iterations",
final_content="paused",
all_messages=messages,
suppress_response=False,
visible_run_started_at=1234.5,
)
assert await maybe_continue_turn(ctx) is True
queued = pending.get_nowait()
assert queued.sender_id == "system:continuation"
assert queued.metadata[INTERNAL_CONTINUATION_META] is True
assert queued.metadata[INTERNAL_CONTINUATION_KIND_META] == "sustained_goal"
assert queued.metadata[INTERNAL_CONTINUATION_RUN_STARTED_AT_META] == 1234.5
assert internal_continuation_run_started_at(queued.metadata) == 1234.5
assert internal_continuation_pending(ctx.msg.metadata)
assert queued.metadata["webui"] is True
assert queued.metadata["message_id"] == "msg-1"
assert queued.metadata["origin_message_id"] == "msg-0"
assert queued.metadata["_wants_stream"] is True
assert "_stream_id" not in queued.metadata
assert "_stream_delta" not in queued.metadata
assert "_stream_end" not in queued.metadata
assert "_resuming" not in queued.metadata
assert "Finish the migration." in queued.content
assert ctx.all_messages == messages[:-1]
assert ctx.final_content == ""
assert ctx.suppress_response is True
assert ctx.msg.metadata[INTERNAL_CONTINUATION_PENDING_META] is True
assert meta["_sustained_goal_continuation_rounds"] == 1
@pytest.mark.asyncio
async def test_internal_continuation_respects_round_limit():
meta = {
GOAL_STATE_KEY: {"status": "active", "objective": "x"},
"_sustained_goal_continuation_rounds": 12,
}
ctx = SimpleNamespace(
session=SimpleNamespace(metadata=meta),
msg=InboundMessage(channel="feishu", sender_id="u1", chat_id="c1", content="start"),
session_key="feishu:c1",
pending_queue=asyncio.Queue(),
stop_reason="max_iterations",
final_content="paused",
all_messages=[],
)
assert should_stream_budget_response(
stop_reason="max_iterations",
pending_queue_available=True,
session_metadata=meta,
)
assert await maybe_continue_turn(ctx) is False
def test_internal_continuation_requires_budget_boundary_and_queue():
meta = {GOAL_STATE_KEY: {"status": "active", "objective": "x"}}
assert should_stream_budget_response(
stop_reason="completed",
pending_queue_available=True,
session_metadata=meta,
)
assert should_stream_budget_response(
stop_reason="max_iterations",
pending_queue_available=False,
session_metadata=meta,
)