diff --git a/tests/agent/test_loop_direct_websocket_status.py b/tests/agent/test_loop_direct_websocket_status.py index 2c8581e2a..dff6df803 100644 --- a/tests/agent/test_loop_direct_websocket_status.py +++ b/tests/agent/test_loop_direct_websocket_status.py @@ -1,8 +1,10 @@ +import asyncio from unittest.mock import AsyncMock, MagicMock import pytest from nanobot.agent.loop import AgentLoop +from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.providers.base import GenerationSettings, LLMResponse @@ -53,3 +55,37 @@ async def test_process_direct_websocket_clears_run_status(tmp_path) -> None: assert [status["goal_status"] for status in statuses] == ["running", "idle"] assert isinstance(statuses[0].get("started_at"), float) assert "started_at" not in statuses[1] + + +@pytest.mark.asyncio +async def test_process_direct_reuses_existing_session_lock(tmp_path) -> None: + loop = _make_loop(tmp_path) + loop._connect_mcp = AsyncMock() + session_key = "api:fixed" + lock = loop._session_locks.setdefault(session_key, asyncio.Lock()) + await lock.acquire() + entered = asyncio.Event() + + async def _process_message(msg, **_kwargs): + entered.set() + return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=msg.content) + + loop._process_message = _process_message + task = asyncio.create_task(loop.process_direct("direct", session_key=session_key)) + try: + await asyncio.sleep(0) + assert not entered.is_set() + + lock.release() + response = await asyncio.wait_for(task, timeout=1.0) + + assert entered.is_set() + assert response is not None + assert response.content == "direct" + finally: + if lock.locked(): + lock.release() + if not task.done(): + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task