nanobot/tests/channels/test_websocket_channel.py
Xubin Ren 1c2ea1aad2
feat(goal): /goal command & long-running tasks (long_task)
* feat(long-task): add LongTaskTool for multi-step agent tasks

Implements a meta-ReAct loop where long-running tasks are broken into
sequential subagent steps, each starting fresh with the original goal
and progress from the previous step. This prevents context drift when
agents work on complex, multi-step tasks.

- Extract build_tool_registry() from SubagentManager for reuse
- Add run_step() for synchronous subagent execution (no bus announcement)
- Add HandoffTool and CompleteTool as signal mechanisms via shared dict
- Add LongTaskTool orchestrator with simplified prompt (8 iterations/step)
- Register LongTaskTool in main agent loop
- Add _extract_handoff_from_messages fallback for robustness
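
A minimal sketch of the step loop described above, under stated assumptions: `run_long_task`, `demo_step`, and the signal keys (`type`, `progress`, `summary`) are hypothetical names for illustration, not the actual LongTaskTool API. It only shows the idea that each step starts fresh with the original goal plus the previous step's progress, and reports back through a shared dict.

```python
from typing import Callable

# Hypothetical step function: receives the goal, prior progress, and a shared
# signal dict; it records either a "handoff" or a "complete" signal in it.
StepFn = Callable[[str, str, dict], None]


def run_long_task(goal: str, step: StepFn, max_steps: int = 5) -> str:
    """Run fresh steps until one signals completion or the step cap is hit."""
    progress = ""
    for _ in range(max_steps):
        signal: dict = {}  # fresh shared dict for every step
        step(goal, progress, signal)
        if signal.get("type") == "complete":
            return signal.get("summary", "")
        # On handoff (or no signal), carry forward only the reported progress.
        progress = signal.get("progress", progress)
    return f"incomplete: {progress}"


def demo_step(goal: str, progress: str, signal: dict) -> None:
    # Toy step: hands off once, then completes on the next call.
    if not progress:
        signal.update(type="handoff", progress="drafted outline")
    else:
        signal.update(type="complete", summary=f"{goal} done after: {progress}")


result = run_long_task("write report", demo_step)
```

Because every step sees only the goal and the last reported progress, earlier tool chatter never accumulates in context, which is the drift the commit aims to prevent.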

* fix(long-task): add debug logging for step-level observability

* feat(long-task): major overhaul with structured handoffs, validation, and observability

- Structured HandoffState: HandoffTool now accepts files_created,
  files_modified, next_step_hint, and verification fields instead of
  a plain string. Progress is passed between steps as structured data.

- Completion validation round: After complete() is called, a dedicated
  validator step runs to verify the claim against the original goal.
  If validation fails, the task continues rather than returning
  a false completion.

- Dynamic prompt system: 3 Jinja2 templates (step_start, step_middle,
  step_final) selected based on step number. Final steps get tighter
  budget and stronger "wrap up" guidance.

- Automatic file change tracking: Extracts write_file/edit_file events
  from tool_events and injects them into the next step's context if
  the subagent forgot to report them explicitly.

- Budget tracking & adaptive strategy: Cumulative token usage is tracked
  across steps. Per-step tool budget drops from 8 to 4 in the last
  two steps to force handoff/completion.

- Crash retry with graceful degradation: A step that crashes is retried
  once. Persistent crashes terminate the task and return partial progress.

- Full observability hooks for future WebUI integration:
  - set_hooks() with on_step_start, on_step_complete, on_handoff,
    on_validation_started, on_validation_passed, on_validation_failed,
    on_task_complete, on_task_error, and catch-all on_event.
  - Readable state properties: current_step, total_steps, status,
    last_handoff, cumulative_usage, goal.
  - inject_correction() allows external code to send user corrections
    that are injected into the next step's prompt.

- run_step() accepts optional max_iterations for dynamic budget control.
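
The budget switch above can be sketched as a small pure function. This is an illustration only: `step_budget` and its 0-based step index are assumptions, and the threshold `max_steps - 2` is taken from the later review fix that aligns the final-template threshold with the budget switch.

```python
def step_budget(step: int, max_steps: int, normal: int = 8, final: int = 4) -> int:
    """Return the tool-call budget for a 0-based step index."""
    # Assumption: the "last two steps" are the indices >= max_steps - 2.
    return final if step >= max_steps - 2 else normal


budgets = [step_budget(i, 5) for i in range(5)]
```

With `max_steps=5` the first three steps keep the normal budget and the last two get the reduced one; with `max_steps=1` the single step already falls under the final budget.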

All 27 long-task tests and 11 subagent tests pass.

* test(long-task): add boundary tests and fix race conditions

- Add 7 edge-case tests: validation crash resilience, hook exception safety, mid-run correction injection, FIFO correction ordering, explicit file changes overriding auto-detection, final budget for max_steps=1, and dynamic budget switching boundaries

- Fix assertion in test_long_task_completes_after_multiple_handoffs to match exact prompt format

- Remove asyncio timing hack from test_state_exposure

- Add asyncio.sleep(0) yield in test_inject_correction_during_execution to prevent race between signal injection and step continuation

- All 34 tests passing

* fix(long-task): address code review findings

- Declare _scopes = {"core"} explicitly to prevent recursive nesting in subagent scope
- Document fragile coupling in _extract_file_changes: path extraction depends on
  write_file/edit_file detail format; add debug log for unexpected formats
- Align final-template threshold (max_steps - 2) with budget switch threshold
- Eliminate hasattr(self, "_state") in _reset_state by initializing in __init__

* fix(long-task): honor final signal and file tracking

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(long-task): improve prompt structure and agent contract

- Expand LongTaskTool.description to instruct parent agent on goal
  construction, return value semantics, and how to handle results.
- Expand CompleteTool.description to emphasize that the summary IS the
  final answer returned to the parent agent.
- Prefix validated return value with an explicit "final answer" directive
  to stop parent agent from re-running work.
- Redesign step_start.md: Step 1 is now explicitly for exploration,
  planning, and skeleton-building. complete() is discouraged.
- Remove bulky payload debug logging from _emit(); add targeted
  info/warning/error logs at key state transitions instead.
- Add signal_type to HandoffState for cleaner signal detection.

* test(long-task): expect wrapped completion message after validation

Align assertions with LongTaskTool final return shape on main.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): turn timing strip, latency, and session-switch restore

- Agent loop: publish goal_status run/idle for WebSocket turns; attach
  wall-clock latency_ms on turn_end and persisted assistant metadata.
- WebSocket channel: forward goal_status and latency fields to clients.
- NanobotClient: track goal_status started_at per chat without requiring
  onChat; useNanobotStream restores run strip when returning to a chat.
- Thread UI: composer/shell viewport hooks for run duration and latency;
  format helpers and i18n strings.
- MessageBubble: drop trailing StreamCursor (layout artifact vs block markdown).
- Builtin / tests: model command coverage, websocket and loop tests.

Covers multi-session UX and round-trip timing visibility for the WebUI.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: keep message-tool file attachments after canonical history hydrate

- MessageTool records per-turn media paths delivered to the active chat.
- nanobot.utils.session_attachments stages out-of-media-root files and
  merges into the last assistant message before save (loop stays a thin call).
- WebUI MediaCell: use a signed URL as a real download link when present.

Fixes attachments flashing then vanishing on turn_end when paths lived
outside get_media_dir (e.g. workspace files).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): agent activity cluster, stable keys, LTR sheen labels

- Group reasoning and tool traces in AgentActivityCluster with i18n summaries
- Stabilize React list keys for activity clusters (first message id anchor)
- Replace background-clip shimmer with overlay sheen for streaming labels
- ThreadMessages/MessageList integration and locale strings

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): render assistant reasoning with Markdown + deferred stream

- Use MarkdownText for ReasoningBubble body (same GFM/KaTeX path as replies)
- Apply muted/italic prose tokens so thinking stays visually subordinate
- useDeferredValue while reasoningStreaming to ease parser work during deltas
- Preload markdown chunk when trace opens; add regression test with preloaded renderer

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): default-collapse agent activity cluster while Working

Outer fold no longer auto-expands during isTurnStreaming; user opens to see traces.
Header sheen and live summary unchanged.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(long_task): cumulative run history, file union, and prompt tuning

Inject cross-step summaries and merged file paths into middle/final step
templates so chains do not lose early context. Strip the last run-history
block when it duplicates Previous Progress to save tokens. Add optional
cumulative_prompt_max_chars and cumulative_step_body_max_chars parameters
with clamped defaults.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): session switch keeps in-flight thread and replays buffered WS

Save the prior chat message list to the per-chat cache in a layout effect
when chatId changes (before stale writes could corrupt another chat).
Skip one post-switch layout cache tick so we do not snapshot the wrong tab.

Buffer inbound events per chat_id when no onChat subscriber is registered
(e.g. user focused another session) and drain on resubscribe up to a cap,
so streaming deltas are not lost while off-tab.
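
A rough sketch of the buffering idea, written in Python for consistency with the rest of this file even though the real change lives in the WebUI client. `ChatEventBuffer` and its methods are hypothetical, and dropping the oldest events once the cap is reached is an assumption; the commit only states that a cap exists.

```python
from collections import defaultdict, deque
from typing import Callable


class ChatEventBuffer:
    """Hold events for chats with no subscriber and replay them on subscribe."""

    def __init__(self, cap: int = 100) -> None:
        self._handlers: dict[str, Callable[[dict], None]] = {}
        # Assumption: a bounded deque silently drops the oldest event at the cap.
        self._pending: dict[str, deque] = defaultdict(lambda: deque(maxlen=cap))

    def dispatch(self, chat_id: str, event: dict) -> None:
        handler = self._handlers.get(chat_id)
        if handler is None:
            self._pending[chat_id].append(event)  # nobody listening: buffer
        else:
            handler(event)

    def subscribe(self, chat_id: str, handler: Callable[[dict], None]) -> None:
        self._handlers[chat_id] = handler
        # Drain buffered events in arrival order, then forget them.
        for event in self._pending.pop(chat_id, ()):
            handler(event)

    def unsubscribe(self, chat_id: str) -> None:
        self._handlers.pop(chat_id, None)


buf = ChatEventBuffer(cap=2)
seen: list = []
for n in (1, 2, 3):
    buf.dispatch("chat-1", {"n": n})  # no subscriber yet, so these are buffered
buf.subscribe("chat-1", seen.append)
```

After the subscribe call, `seen` holds only the two most recent events because the cap of 2 evicted the first one.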

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): snap thread scroll to bottom on session open (no smooth glide)

Use scroll-behavior auto on the viewport, instant programmatic scroll when
following new messages and on scrollToBottomSignal. Keep smooth only for
the explicit scroll-to-bottom button.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): respect manual scroll-up after opening a session

Track when the user leaves the bottom with a ref and skip ResizeObserver
and deferred bottom snaps until they return or the conversation is reset.
Remove the time-based force-bottom window that overrode atBottom.

Multi-frame scrollToBottom honours the same guard unless force (scroll button).

Co-authored-by: Cursor <cursoragent@cursor.com>

* Publish long_task UI snapshots on outbound metadata

- Add OUTBOUND_META_AGENT_UI (_agent_ui) for channel-agnostic structured state
- LongTaskTool publishes {kind: long_task, data: snapshot} on the bus with _progress
- WebSocket send forwards metadata as agent_ui for WebUI clients
- Tests for bus payload, WS frame, and progress assertions
- Fix loop progress tests: ignore _goal_status in streaming final filter and
  avoid brittle outbound[-1] ordering after goal status idle messages

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat: WebUI long_task activity card and resilient history merge

Add optional ui_summary to the long_task tool for one-line UI labels. Stream
long_task agent_ui into a dedicated message row with timeline, markdown peek,
and a right sheet for details. Merge canonical history after turn_end while
re-inserting long_task rows before the final assistant reply. Collapse
duplicate task_start/step_start steps in the timeline and extend i18n.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor: align long_task with thread_goal and drop orchestrator UI

- Persist sustained objectives via session metadata (long_task / complete_goal); no subagent wiring or tool-driven agent_ui payloads.
- Remove WebUI long-task activity UI, types, and translations; history merge preserves trace replay only, with legacy long_task rows normalized to traces.
- Drop long_task prompt templates and get_long_task_run_dir; add webui thread disk helper for gateway persistence tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(agent): thread goal runtime context, tools, and skill

- Add thread_goal_state helper and mirror active objectives into Runtime Context
- Wire loop/context/memory/events as needed for goal metadata in turns
- Expand long_task / complete_goal semantics (pivot/cancel/honest recap)
- Add always-on thread-goal SKILL.md; align /goal command prompt
- Tests for context builder and thread goal state
- Remove unused webui ChatPane component

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(thread-goal): add websocket snapshot helper and publish goal updates from long_task

Introduce thread_goal_ws_blob for bounded JSON snapshots, attach snapshots to
websocket turn_end metadata in AgentLoop, and let long_task fan-out dedicated
thread_goal frames on the websocket channel after persisting session metadata.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(channels): websocket thread_goal frames, turn_end replay, and session API scrub for subagent inject

Emit thread_goal events and optional thread_goal on turn_end; scrub persisted
subagent announce blobs on GET /api/sessions/.../messages and shorten session
list previews so WebUI does not surface full Task/Summarize scaffolding.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): merge ephemeral traces per user turn when reconciling canonical history

Preserve disk/live trace rows inside the matching user–assistant segment instead
of stacking every trace before the final assistant reply (fixes inflated tool
counts after refresh or session switch).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): show assistant reply copy only on the last slice before the next user turn

Avoid duplicate copy affordances on intermediate assistant bubbles that precede
more agent activity in the same turn (tools or further assistant text).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): thread_goal stream plumbing, composer goal strip, sky glow, and client-side subagent scrub projection

Track thread_goal and turn_goal snapshots in NanobotClient, hydrate React state
from thread_goal frames and turn_end, surface objective/elapsed in the composer,
add breathing sky halo CSS while goals are active, mirror server scrub logic on
history hydration and webui_thread snapshots, and extend tests/client mocks.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(channels): add Slack Socket Mode connect timeout with actionable timeout errors

Abort hung websockets.connect handshakes after a bounded wait, log REST-vs-WSS
guidance, surface RuntimeError to channel startup, and log successful WSS setup.
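
One way to bound a hung handshake is `asyncio.wait_for`. The sketch below is not the Slack channel's code: `connect_with_timeout` and `_hung_handshake` are hypothetical, and a sleeping coroutine stands in for the real `websockets.connect` call so the example runs without a network.

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


async def connect_with_timeout(connect: Awaitable[T], timeout_s: float) -> T:
    """Await a connection attempt, converting a timeout into RuntimeError."""
    try:
        return await asyncio.wait_for(connect, timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        # Surface an actionable error to whoever starts the channel.
        raise RuntimeError(
            f"WebSocket handshake did not finish within {timeout_s}s"
        ) from exc


async def _hung_handshake() -> None:
    await asyncio.sleep(3600)  # stand-in for a handshake that never completes


def demo() -> str:
    try:
        asyncio.run(connect_with_timeout(_hung_handshake(), 0.01))
    except RuntimeError as err:
        return str(err)
    return "connected"


message = demo()
```

`asyncio.wait_for` cancels the pending attempt when the deadline passes, so the hung coroutine does not linger after the error is raised.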

Co-authored-by: Cursor <cursoragent@cursor.com>

* webui: expand thread goal in composer bottom sheet

Add ChevronUp control on the run/goal strip that opens a bottom Sheet
with full ui_summary and objective. Inline preview logic in RunElapsedStrip,
add i18n strings across locales, and a composer unit test.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): widen dedupeToolCallsForUi input for session API typing

fetchSessionMessages types tool_calls as unknown; accept unknown so tsc
build passes when passing message.tool_calls through.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(agent): extract WebSocket turn run status to webui_turn_helpers

* refactor(skills): rename thread-goal to long-task and document idempotent goals

* feat(skills): rename sustained-goal skill to long-goal and tighten long_task guidance

* chore: remove unused subagent/context/router helpers

* feat(session): rename sustained goal to goal_state and align WS/WebUI

- Move helpers from agent/thread_goal_state to session/goal_state:
  GOAL_STATE_KEY, goal_state_runtime_lines, goal_state_ws_blob, parse_goal_state.
- Session metadata now uses "goal_state"; still read legacy "thread_goal";
  long_task writes drop the legacy key after save.
- WebSocket: event/field goal_state, _goal_state_sync; turn_end carries goal_state;
  accept legacy _thread_goal_sync/thread_goal inbound metadata for dispatch.
- WebUI: GoalStateWsPayload, goalState hook/client props, i18n keys goalState*.
- Runtime Context copy uses "Goal (active):" instead of "Thread goal".
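
The legacy-key handling above might look roughly like this. `GOAL_STATE_KEY` is named in this commit, but its value here, along with `LEGACY_KEY`, `read_goal_state`, and `write_goal_state`, are assumptions for illustration rather than the helpers in session/goal_state.

```python
from typing import Optional

GOAL_STATE_KEY = "goal_state"  # current session-metadata key
LEGACY_KEY = "thread_goal"     # older key that is still accepted on read


def read_goal_state(metadata: dict) -> Optional[dict]:
    """Prefer the current key and fall back to the legacy one."""
    value = metadata.get(GOAL_STATE_KEY)
    if value is None:
        value = metadata.get(LEGACY_KEY)
    return value if isinstance(value, dict) else None


def write_goal_state(metadata: dict, state: dict) -> None:
    """Store under the current key and drop the legacy key."""
    metadata[GOAL_STATE_KEY] = state
    metadata.pop(LEGACY_KEY, None)


meta = {"thread_goal": {"active": True}}
loaded = read_goal_state(meta)
write_goal_state(meta, {"active": False})
```

Reading stays backward compatible while every write migrates the session to the new key.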

* feat(agent): stream Anthropic thinking deltas and fix stream idle timeout

* refactor(webui): transcript jsonl as sole timeline source

* fix(agent): reject mismatched WS message chat_id and stream reasoning deltas

* feat(webui): hydrate sustained goal and run timer after websocket subscribe

* chore(webui,websocket): remove unused fetch helpers and legacy thread_goal WS paths

* Raise default max_tokens and context window in agent schema.

Align AgentDefaults and ModelPresetConfig with typical Claude-scale usage
(32k completion budget, 256k context window) and update migration tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(gateway): bootstrap prefers in-memory model; clarify websocket naming

* fix(websocket): websocket _handle_message passes is_dm; refresh /status test expectations

---------

Co-authored-by: chengyongru <2755839590@qq.com>
Co-authored-by: chengyongru <chengyongru.ai@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 01:14:11 +08:00

1506 lines
51 KiB
Python

"""Unit and lightweight integration tests for the WebSocket channel."""

import asyncio
import functools
import json
import time
from typing import Any
from unittest.mock import AsyncMock, MagicMock

import httpx
import pytest
import websockets
from websockets.exceptions import ConnectionClosed
from websockets.frames import Close

from nanobot.bus.events import OUTBOUND_META_AGENT_UI, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.websocket import (
    WebSocketChannel,
    WebSocketConfig,
    _is_valid_chat_id,
    _issue_route_secret_matches,
    _normalize_config_path,
    _normalize_http_path,
    _parse_envelope,
    _parse_inbound_payload,
    _parse_query,
    _parse_request_path,
    publish_runtime_model_update,
)
from nanobot.config.loader import load_config, save_config
from nanobot.config.schema import Config

# -- Shared helpers (aligned with test_websocket_integration.py) ---------------

_PORT = 29876


def _ch(bus: Any, **kw: Any) -> WebSocketChannel:
    cfg: dict[str, Any] = {
        "enabled": True,
        "allowFrom": ["*"],
        "host": "127.0.0.1",
        "port": _PORT,
        "path": "/ws",
        "websocketRequiresToken": False,
    }
    cfg.update(kw)
    return WebSocketChannel(cfg, bus)


@pytest.fixture()
def bus() -> MagicMock:
    b = MagicMock()
    b.publish_inbound = AsyncMock()
    return b


async def _http_get(url: str, headers: dict[str, str] | None = None) -> httpx.Response:
    """Run GET in a thread to avoid blocking the asyncio loop shared with websockets."""
    return await asyncio.to_thread(
        functools.partial(httpx.get, url, headers=headers or {}, timeout=5.0)
    )


def test_normalize_http_path_strips_trailing_slash_except_root() -> None:
    assert _normalize_http_path("/chat/") == "/chat"
    assert _normalize_http_path("/chat?x=1") == "/chat"
    assert _normalize_http_path("/") == "/"


def test_parse_request_path_matches_normalize_and_query() -> None:
    path, query = _parse_request_path("/ws/?token=secret&client_id=u1")
    assert path == _normalize_http_path("/ws/?token=secret&client_id=u1")
    assert query == _parse_query("/ws/?token=secret&client_id=u1")


def test_normalize_config_path_matches_request() -> None:
    assert _normalize_config_path("/ws/") == "/ws"
    assert _normalize_config_path("/") == "/"


def test_parse_query_extracts_token_and_client_id() -> None:
    query = _parse_query("/?token=secret&client_id=u1")
    assert query.get("token") == ["secret"]
    assert query.get("client_id") == ["u1"]


@pytest.mark.parametrize(
    ("raw", "expected"),
    [
        ("plain", "plain"),
        ('{"content": "hi"}', "hi"),
        ('{"text": "there"}', "there"),
        ('{"message": "x"}', "x"),
        (" ", None),
        ("{}", None),
    ],
)
def test_parse_inbound_payload(raw: str, expected: str | None) -> None:
    assert _parse_inbound_payload(raw) == expected


def test_parse_inbound_invalid_json_falls_back_to_raw_string() -> None:
    assert _parse_inbound_payload("{not json") == "{not json"


@pytest.mark.parametrize(
    ("raw", "expected"),
    [
        ('{"content": ""}', None),  # empty string content
        ('{"content": 123}', None),  # non-string content
        ('{"content": " "}', None),  # whitespace-only content
        ('["hello"]', '["hello"]'),  # JSON array: not a dict, treated as plain text
        ('{"unknown_key": "val"}', None),  # unrecognized key
        ('{"content": null}', None),  # null content
    ],
)
def test_parse_inbound_payload_edge_cases(raw: str, expected: str | None) -> None:
    assert _parse_inbound_payload(raw) == expected


def test_web_socket_config_path_must_start_with_slash() -> None:
    with pytest.raises(ValueError, match='path must start with "/"'):
        WebSocketConfig(path="bad")


def test_ssl_context_requires_both_cert_and_key_files() -> None:
    bus = MagicMock()
    channel = WebSocketChannel(
        {"enabled": True, "allowFrom": ["*"], "sslCertfile": "/tmp/c.pem", "sslKeyfile": ""},
        bus,
    )
    with pytest.raises(ValueError, match="ssl_certfile and ssl_keyfile"):
        channel._build_ssl_context()


def test_default_config_includes_safe_bind_and_streaming() -> None:
    defaults = WebSocketChannel.default_config()
    assert defaults["enabled"] is False
    assert defaults["host"] == "127.0.0.1"
    assert defaults["streaming"] is True
    assert defaults["allowFrom"] == ["*"]
    assert defaults.get("tokenIssuePath", "") == ""


def test_token_issue_path_must_differ_from_websocket_path() -> None:
    with pytest.raises(ValueError, match="token_issue_path must differ"):
        WebSocketConfig(path="/ws", token_issue_path="/ws")


def test_issue_route_secret_matches_bearer_and_header() -> None:
    from websockets.datastructures import Headers

    secret = "my-secret"
    bearer_headers = Headers([("Authorization", "Bearer my-secret")])
    assert _issue_route_secret_matches(bearer_headers, secret) is True
    x_headers = Headers([("X-Nanobot-Auth", "my-secret")])
    assert _issue_route_secret_matches(x_headers, secret) is True
    wrong = Headers([("Authorization", "Bearer other")])
    assert _issue_route_secret_matches(wrong, secret) is False


def test_issue_route_secret_matches_empty_secret() -> None:
    from websockets.datastructures import Headers

    # Empty secret always returns True regardless of headers
    assert _issue_route_secret_matches(Headers([]), "") is True
    assert _issue_route_secret_matches(Headers([("Authorization", "Bearer anything")]), "") is True


@pytest.mark.asyncio
async def test_webui_message_envelope_marks_inbound_metadata(bus: MagicMock) -> None:
    channel = _ch(bus)
    conn = MagicMock()
    conn.remote_address = ("127.0.0.1", 50123)
    await channel._dispatch_envelope(
        conn,
        "webui-client",
        {"type": "message", "chat_id": "chat-1", "content": "hello", "webui": True},
    )
    msg = bus.publish_inbound.await_args.args[0]
    assert msg.channel == "websocket"
    assert msg.chat_id == "chat-1"
    assert msg.metadata["webui"] is True
    assert msg.metadata["_wants_stream"] is True


@pytest.mark.asyncio
async def test_plain_websocket_message_does_not_mark_webui(bus: MagicMock) -> None:
    channel = _ch(bus)
    conn = MagicMock()
    await channel._dispatch_envelope(
        conn,
        "custom-client",
        {"type": "message", "chat_id": "chat-1", "content": "hello"},
    )
    msg = bus.publish_inbound.await_args.args[0]
    assert "webui" not in msg.metadata


@pytest.mark.asyncio
async def test_send_delivers_json_message_with_media_and_reply() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    msg = OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="hello",
        reply_to="m1",
        media=["/tmp/a.png"],
        buttons=[["Yes", "No"]],
    )
    await channel.send(msg)
    mock_ws.send.assert_awaited_once()
    payload = json.loads(mock_ws.send.call_args[0][0])
    assert payload["event"] == "message"
    assert payload["chat_id"] == "chat-1"
    assert payload["text"] == "hello"
    assert payload["reply_to"] == "m1"
    assert payload["media"] == ["/tmp/a.png"]


@pytest.mark.asyncio
async def test_send_broadcasts_runtime_model_updates() -> None:
    bus = MessageBus()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    publish_runtime_model_update(bus, "openai/gpt-4.1", "fast")
    await channel.send(bus.outbound.get_nowait())
    payload = json.loads(mock_ws.send.call_args[0][0])
    assert payload["event"] == "runtime_model_updated"
    assert payload["model_name"] == "openai/gpt-4.1"
    assert payload["model_preset"] == "fast"


@pytest.mark.asyncio
async def test_runtime_model_update_publisher_uses_websocket_outbound_event() -> None:
    bus = MessageBus()
    publish_runtime_model_update(
        bus,
        "openai/gpt-4.1",
        "fast",
    )
    event = bus.outbound.get_nowait()
    assert event.channel == "websocket"
    assert event.chat_id == "*"
    assert event.content == ""
    assert event.metadata == {
        "_runtime_model_updated": True,
        "model": "openai/gpt-4.1",
        "model_preset": "fast",
    }


@pytest.mark.asyncio
async def test_send_stages_external_media_as_signed_url(monkeypatch, tmp_path) -> None:
    bus = MagicMock()
    media_root = tmp_path / "media"
    ws_media = media_root / "websocket"
    ws_media.mkdir(parents=True)
    external = tmp_path / "clip.mp4"
    external.write_bytes(b"video")

    def fake_media_dir(channel: str | None = None):
        return ws_media if channel == "websocket" else media_root

    monkeypatch.setattr("nanobot.channels.websocket.get_media_dir", fake_media_dir)
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send(
        OutboundMessage(
            channel="websocket",
            chat_id="chat-1",
            content="video",
            media=[str(external)],
        )
    )
    payload = json.loads(mock_ws.send.call_args[0][0])
    assert payload["media"] == [str(external)]
    assert payload["media_urls"][0]["name"] == "clip.mp4"
    assert payload["media_urls"][0]["url"].startswith("/api/media/")
    assert any(p.name.endswith("-clip.mp4") for p in ws_media.iterdir())


@pytest.mark.asyncio
async def test_send_missing_connection_is_noop_without_error() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    msg = OutboundMessage(channel="websocket", chat_id="missing", content="x")
    await channel.send(msg)


@pytest.mark.asyncio
async def test_send_removes_connection_on_connection_closed() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    mock_ws.send.side_effect = ConnectionClosed(Close(1006, ""), Close(1006, ""), True)
    channel._attach(mock_ws, "chat-1")
    msg = OutboundMessage(channel="websocket", chat_id="chat-1", content="hello")
    await channel.send(msg)
    assert "chat-1" not in channel._subs
    assert mock_ws not in channel._conn_chats


@pytest.mark.asyncio
async def test_send_progress_includes_structured_tool_events() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content='search "hermes"',
        metadata={
            "_progress": True,
            "_tool_hint": True,
            "_tool_events": [
                {
                    "version": 1,
                    "phase": "start",
                    "call_id": "call-1",
                    "name": "web_search",
                    "arguments": {"query": "hermes", "count": 8},
                    "result": None,
                    "error": None,
                    "files": [],
                    "embeds": [],
                }
            ],
        },
    ))
    payload = json.loads(mock_ws.send.await_args.args[0])
    assert payload["event"] == "message"
    assert payload["kind"] == "tool_hint"
    assert payload["tool_events"] == [
        {
            "version": 1,
            "phase": "start",
            "call_id": "call-1",
            "name": "web_search",
            "arguments": {"query": "hermes", "count": 8},
            "result": None,
            "error": None,
            "files": [],
            "embeds": [],
        }
    ]


@pytest.mark.asyncio
async def test_send_progress_includes_agent_ui_blob() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    blob = {
        "kind": "panel",
        "data": {"version": 1, "event": "tick", "id": "r1"},
    }
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="progress · panel",
        metadata={"_progress": True, OUTBOUND_META_AGENT_UI: blob},
    ))
    payload = json.loads(mock_ws.send.await_args.args[0])
    assert payload["event"] == "message"
    assert payload["kind"] == "progress"
    assert payload["agent_ui"] == blob


@pytest.mark.asyncio
async def test_send_delta_removes_connection_on_connection_closed() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"], "streaming": True}, bus)
    mock_ws = AsyncMock()
    mock_ws.send.side_effect = ConnectionClosed(Close(1006, ""), Close(1006, ""), True)
    channel._attach(mock_ws, "chat-1")
    await channel.send_delta("chat-1", "chunk", {"_stream_delta": True, "_stream_id": "s1"})
    assert "chat-1" not in channel._subs
    assert mock_ws not in channel._conn_chats


@pytest.mark.asyncio
async def test_send_delta_emits_delta_and_stream_end() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"], "streaming": True}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send_delta("chat-1", "part", {"_stream_delta": True, "_stream_id": "sid"})
    await channel.send_delta("chat-1", "", {"_stream_end": True, "_stream_id": "sid"})
    assert mock_ws.send.await_count == 2
    first = json.loads(mock_ws.send.call_args_list[0][0][0])
    second = json.loads(mock_ws.send.call_args_list[1][0][0])
    assert first["event"] == "delta"
    assert first["chat_id"] == "chat-1"
    assert first["text"] == "part"
    assert first["stream_id"] == "sid"
    assert second["event"] == "stream_end"
    assert second["chat_id"] == "chat-1"
    assert second["stream_id"] == "sid"


@pytest.mark.asyncio
async def test_send_reasoning_delta_emits_streaming_frame() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send_reasoning_delta(
        "chat-1",
        "step-by-step thinking",
        {"_reasoning_delta": True, "_stream_id": "r1"},
    )
    mock_ws.send.assert_awaited_once()
    payload = json.loads(mock_ws.send.await_args.args[0])
    assert payload["event"] == "reasoning_delta"
    assert payload["chat_id"] == "chat-1"
    assert payload["text"] == "step-by-step thinking"
    assert payload["stream_id"] == "r1"


@pytest.mark.asyncio
async def test_send_reasoning_end_emits_close_frame() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send_reasoning_end("chat-1", {"_reasoning_end": True, "_stream_id": "r1"})
    payload = json.loads(mock_ws.send.await_args.args[0])
    assert payload == {"event": "reasoning_end", "chat_id": "chat-1", "stream_id": "r1"}


@pytest.mark.asyncio
async def test_send_reasoning_one_shot_expands_to_delta_plus_end() -> None:
    """``send_reasoning`` is back-compat for hooks that haven't migrated:
    the base implementation must produce one delta and one end so the
    WebUI sees the same shape either way."""
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send_reasoning(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="thinking",
        metadata={"_reasoning": True},
    ))
    assert mock_ws.send.await_count == 2
    first = json.loads(mock_ws.send.call_args_list[0][0][0])
    second = json.loads(mock_ws.send.call_args_list[1][0][0])
    assert first["event"] == "reasoning_delta"
    assert first["text"] == "thinking"
    assert second["event"] == "reasoning_end"


@pytest.mark.asyncio
async def test_send_reasoning_delta_drops_empty_chunks() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send_reasoning_delta("chat-1", "", {"_reasoning_delta": True})
    mock_ws.send.assert_not_awaited()


@pytest.mark.asyncio
async def test_send_reasoning_without_subscribers_is_noop() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    await channel.send_reasoning_delta("unattached", "thinking", None)
    await channel.send_reasoning_end("unattached", None)
    # No subscribers, no exception, no send.




@pytest.mark.asyncio
async def test_send_turn_end_emits_turn_end_event() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="",
        metadata={"_turn_end": True},
    ))
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body == {"event": "turn_end", "chat_id": "chat-1"}


@pytest.mark.asyncio
async def test_send_turn_end_includes_latency_ms_when_present() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="",
        metadata={"_turn_end": True, "latency_ms": 1500},
    ))
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body == {"event": "turn_end", "chat_id": "chat-1", "latency_ms": 1500}


@pytest.mark.asyncio
async def test_send_turn_end_includes_goal_state_when_present() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    blob = {"active": True, "ui_summary": "Explore codebase"}
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="",
        metadata={"_turn_end": True, "goal_state": blob},
    ))
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body == {"event": "turn_end", "chat_id": "chat-1", "goal_state": blob}


@pytest.mark.asyncio
async def test_send_goal_status_running_emits_event_with_started_at() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="",
        metadata={
            "_goal_status": True,
            "goal_status": "running",
            "started_at": 1_700_000_000.5,
        },
    ))
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body == {
        "event": "goal_status",
        "chat_id": "chat-1",
        "status": "running",
        "started_at": 1_700_000_000.5,
    }


@pytest.mark.asyncio
async def test_send_goal_status_idle_omits_started_at() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="",
        metadata={
            "_goal_status": True,
            "goal_status": "idle",
            "goal_started_at": 99.0,
        },
    ))
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body == {"event": "goal_status", "chat_id": "chat-1", "status": "idle"}


@pytest.mark.asyncio
async def test_send_goal_state_emits_blob_per_chat() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_a = AsyncMock()
    mock_b = AsyncMock()
    channel._attach(mock_a, "chat-a")
    channel._attach(mock_b, "chat-b")
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-a",
        content="",
        metadata={
            "_goal_state_sync": True,
            "goal_state": {"active": True, "ui_summary": "A"},
        },
    ))
    mock_a.send.assert_awaited_once()
    mock_b.send.assert_not_called()
    body = json.loads(mock_a.send.await_args.args[0])
    assert body == {
        "event": "goal_state",
        "chat_id": "chat-a",
        "goal_state": {"active": True, "ui_summary": "A"},
    }


@pytest.mark.asyncio
async def test_maybe_push_active_goal_state_noop_without_session_manager() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    channel._session_manager = None
    await channel._maybe_push_active_goal_state("chat-1")
    mock_ws.send.assert_not_called()


@pytest.mark.asyncio
async def test_maybe_push_active_goal_state_skips_when_no_goal_on_disk() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    sm = MagicMock()
    sm.read_session_file.return_value = None
    channel._session_manager = sm
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel._maybe_push_active_goal_state("chat-1")
    mock_ws.send.assert_not_called()


@pytest.mark.asyncio
async def test_maybe_push_active_goal_state_notifies_when_goal_active_on_disk() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    sm = MagicMock()
    sm.read_session_file.return_value = {
        "metadata": {
            "goal_state": {
                "status": "active",
                "objective": "finish docs",
                "ui_summary": "Docs",
            },
        },
        "messages": [],
    }
    channel._session_manager = sm
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel._maybe_push_active_goal_state("chat-1")
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body["event"] == "goal_state"
    assert body["chat_id"] == "chat-1"
    assert body["goal_state"]["active"] is True
    assert body["goal_state"]["objective"] == "finish docs"
    assert body["goal_state"]["ui_summary"] == "Docs"


@pytest.mark.asyncio
async def test_maybe_push_turn_run_wall_clock_skips_when_no_active_turn() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    from nanobot.utils import webui_turn_helpers as wth
    wth._WEBSOCKET_TURN_WALL_STARTED_AT.clear()
    await channel._maybe_push_turn_run_wall_clock("chat-1")
    mock_ws.send.assert_not_called()


@pytest.mark.asyncio
async def test_maybe_push_turn_run_wall_clock_replays_running() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    from nanobot.utils import webui_turn_helpers as wth
    wth._WEBSOCKET_TURN_WALL_STARTED_AT.clear()
    try:
        wth._WEBSOCKET_TURN_WALL_STARTED_AT["chat-1"] = 1_700_000_000.0
        await channel._maybe_push_turn_run_wall_clock("chat-1")
    finally:
        wth._WEBSOCKET_TURN_WALL_STARTED_AT.pop("chat-1", None)
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body == {
        "event": "goal_status",
        "chat_id": "chat-1",
        "status": "running",
        "started_at": 1_700_000_000.0,
    }


@pytest.mark.asyncio
async def test_send_session_updated_emits_session_updated_event() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    channel._attach(mock_ws, "chat-1")
    await channel.send(OutboundMessage(
        channel="websocket",
        chat_id="chat-1",
        content="",
        metadata={"_session_updated": True},
    ))
    mock_ws.send.assert_awaited_once()
    body = json.loads(mock_ws.send.await_args.args[0])
    assert body == {"event": "session_updated", "chat_id": "chat-1"}


@pytest.mark.asyncio
async def test_send_non_connection_closed_exception_is_raised() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    mock_ws = AsyncMock()
    mock_ws.send.side_effect = RuntimeError("unexpected")
    channel._attach(mock_ws, "chat-1")
    msg = OutboundMessage(channel="websocket", chat_id="chat-1", content="hello")
    with pytest.raises(RuntimeError, match="unexpected"):
        await channel.send(msg)


@pytest.mark.asyncio
async def test_send_delta_missing_connection_is_noop() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"], "streaming": True}, bus)
    # No exception, no error; just a no-op
    await channel.send_delta("nonexistent", "chunk", {"_stream_delta": True, "_stream_id": "s1"})


@pytest.mark.asyncio
async def test_stop_is_idempotent() -> None:
    bus = MagicMock()
    channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
    # stop() before start() should not raise
    await channel.stop()
    await channel.stop()


@pytest.mark.asyncio
async def test_end_to_end_client_receives_ready_and_agent_sees_inbound(bus: MagicMock) -> None:
    port = 29876
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=tester") as client:
            ready_raw = await client.recv()
            ready = json.loads(ready_raw)
            assert ready["event"] == "ready"
            assert ready["client_id"] == "tester"
            chat_id = ready["chat_id"]
            await client.send(json.dumps({"content": "ping from client"}))
            await asyncio.sleep(0.08)
            bus.publish_inbound.assert_awaited()
            inbound = bus.publish_inbound.call_args[0][0]
            assert inbound.channel == "websocket"
            assert inbound.sender_id == "tester"
            assert inbound.chat_id == chat_id
            assert inbound.content == "ping from client"
            await client.send("plain text frame")
            await asyncio.sleep(0.08)
            assert bus.publish_inbound.await_count >= 2
            second = [c[0][0] for c in bus.publish_inbound.call_args_list][-1]
            assert second.content == "plain text frame"
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_token_rejects_handshake_when_mismatch(bus: MagicMock) -> None:
    port = 29877
    channel = _ch(bus, port=port, path="/", token="secret")
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        with pytest.raises(websockets.exceptions.InvalidStatus) as excinfo:
            async with websockets.connect(f"ws://127.0.0.1:{port}/?token=wrong"):
                pass
        assert excinfo.value.response.status_code == 401
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_wrong_path_returns_404(bus: MagicMock) -> None:
    port = 29878
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        with pytest.raises(websockets.exceptions.InvalidStatus) as excinfo:
            async with websockets.connect(f"ws://127.0.0.1:{port}/other"):
                pass
        assert excinfo.value.response.status_code == 404
    finally:
        await channel.stop()
        await server_task


def test_registry_discovers_websocket_channel() -> None:
    from nanobot.channels.registry import load_channel_class
    cls = load_channel_class("websocket")
    assert cls.name == "websocket"


@pytest.mark.asyncio
async def test_http_route_issues_token_then_websocket_requires_it(bus: MagicMock) -> None:
    port = 29879
    channel = _ch(
        bus, port=port,
        tokenIssuePath="/auth/token",
        tokenIssueSecret="route-secret",
        websocketRequiresToken=True,
    )
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        deny = await _http_get(f"http://127.0.0.1:{port}/auth/token")
        assert deny.status_code == 401
        issue = await _http_get(
            f"http://127.0.0.1:{port}/auth/token",
            headers={"Authorization": "Bearer route-secret"},
        )
        assert issue.status_code == 200
        token = issue.json()["token"]
        assert token.startswith("nbwt_")
        with pytest.raises(websockets.exceptions.InvalidStatus) as missing_token:
            async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=x"):
                pass
        assert missing_token.value.response.status_code == 401
        uri = f"ws://127.0.0.1:{port}/ws?token={token}&client_id=caller"
        async with websockets.connect(uri) as client:
            ready = json.loads(await client.recv())
            assert ready["event"] == "ready"
            assert ready["client_id"] == "caller"
        with pytest.raises(websockets.exceptions.InvalidStatus) as reuse:
            async with websockets.connect(uri):
                pass
        assert reuse.value.response.status_code == 401
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_settings_api_returns_safe_subset_and_updates_whitelist(
    bus: MagicMock,
    monkeypatch,
    tmp_path,
) -> None:
    port = 29891
    config_path = tmp_path / "config.json"
    config = Config()
    config.agents.defaults.model = "openai/gpt-4o"
    config.providers.openai.api_key = "secret-key"
    config.tools.web.search.provider = "brave"
    config.tools.web.search.api_key = "brave-secret"
    save_config(config, config_path)
    monkeypatch.setattr("nanobot.config.loader._current_config_path", config_path)
    channel = _ch(bus, port=port)
    channel._api_tokens["tok"] = time.monotonic() + 300
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        settings = await _http_get(
            f"http://127.0.0.1:{port}/api/settings",
            headers={"Authorization": "Bearer tok"},
        )
        assert settings.status_code == 200
        body = settings.json()
        assert body["agent"]["model"] == "openai/gpt-4o"
        assert body["agent"]["provider"] == "openai"
        providers = {provider["name"]: provider for provider in body["providers"]}
        assert providers["openai"]["configured"] is True
        assert providers["openai"]["api_key_hint"] == "secr••••-key"
        assert providers["openrouter"]["configured"] is False
        assert body["agent"]["has_api_key"] is True
        assert body["web_search"]["provider"] == "brave"
        assert body["web_search"]["api_key_hint"] == "brav••••cret"
        search_providers = {provider["name"]: provider for provider in body["web_search"]["providers"]}
        assert search_providers["duckduckgo"]["credential"] == "none"
        assert search_providers["searxng"]["credential"] == "base_url"
        assert "secret-key" not in settings.text
        assert "brave-secret" not in settings.text
        provider_updated = await _http_get(
            "http://127.0.0.1:"
            f"{port}/api/settings/provider/update?provider=openrouter"
            "&api_key=sk-or-test&api_base=https%3A%2F%2Fopenrouter.ai%2Fapi%2Fv1",
            headers={"Authorization": "Bearer tok"},
        )
        assert provider_updated.status_code == 200
        provider_body = provider_updated.json()
        assert provider_body["requires_restart"] is False
        provider_rows = {provider["name"]: provider for provider in provider_body["providers"]}
        assert provider_rows["openrouter"]["configured"] is True
        assert "sk-or-test" not in provider_updated.text
        updated = await _http_get(
            "http://127.0.0.1:"
            f"{port}/api/settings/update?model=openrouter/test"
            "&provider=openrouter",
            headers={"Authorization": "Bearer tok"},
        )
        assert updated.status_code == 200
        assert updated.json()["requires_restart"] is False
        search_updated = await _http_get(
            "http://127.0.0.1:"
            f"{port}/api/settings/web-search/update?provider=searxng"
            "&base_url=https%3A%2F%2Fsearch.example.com",
            headers={"Authorization": "Bearer tok"},
        )
        assert search_updated.status_code == 200
        search_body = search_updated.json()
        assert search_body["requires_restart"] is False
        assert search_body["web_search"]["provider"] == "searxng"
        assert search_body["web_search"]["api_key_hint"] is None
        assert search_body["web_search"]["base_url"] == "https://search.example.com"
        saved = load_config(config_path)
        assert saved.agents.defaults.model == "openrouter/test"
        assert saved.agents.defaults.provider == "openrouter"
        assert saved.providers.openrouter.api_key == "sk-or-test"
        assert saved.providers.openrouter.api_base == "https://openrouter.ai/api/v1"
        assert saved.tools.web.search.provider == "searxng"
        assert saved.tools.web.search.api_key == ""
        assert saved.tools.web.search.base_url == "https://search.example.com"
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_commands_api_returns_slash_command_metadata(bus: MagicMock) -> None:
    port = 29892
    channel = _ch(bus, port=port)
    channel._api_tokens["tok"] = time.monotonic() + 300
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        denied = await _http_get(f"http://127.0.0.1:{port}/api/commands")
        assert denied.status_code == 401
        response = await _http_get(
            f"http://127.0.0.1:{port}/api/commands",
            headers={"Authorization": "Bearer tok"},
        )
        assert response.status_code == 200
        body = response.json()
        commands = {row["command"]: row for row in body["commands"]}
        assert commands["/stop"]["title"] == "Stop current task"
        assert commands["/history"]["arg_hint"] == "[n]"
        assert all("description" in row for row in body["commands"])
    finally:
        await channel.stop()
        await server_task


def test_settings_payload_normalizes_camel_case_provider(
    bus: MagicMock,
    monkeypatch,
    tmp_path,
) -> None:
    config_path = tmp_path / "config.json"
    config = Config()
    config.agents.defaults.provider = "minimaxAnthropic"
    save_config(config, config_path)
    monkeypatch.setattr("nanobot.config.loader._current_config_path", config_path)
    body = _ch(bus)._settings_payload()
    assert body["agent"]["provider"] == "minimax_anthropic"


@pytest.mark.asyncio
async def test_end_to_end_server_pushes_streaming_deltas_to_client(bus: MagicMock) -> None:
    port = 29880
    channel = _ch(bus, port=port, streaming=True)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=stream-tester") as client:
            ready_raw = await client.recv()
            ready = json.loads(ready_raw)
            chat_id = ready["chat_id"]
            # Server pushes deltas directly
            await channel.send_delta(
                chat_id, "Hello ", {"_stream_delta": True, "_stream_id": "s1"}
            )
            await channel.send_delta(
                chat_id, "world", {"_stream_delta": True, "_stream_id": "s1"}
            )
            await channel.send_delta(
                chat_id, "", {"_stream_end": True, "_stream_id": "s1"}
            )
            delta1 = json.loads(await client.recv())
            assert delta1["event"] == "delta"
            assert delta1["text"] == "Hello "
            assert delta1["stream_id"] == "s1"
            delta2 = json.loads(await client.recv())
            assert delta2["event"] == "delta"
            assert delta2["text"] == "world"
            assert delta2["stream_id"] == "s1"
            end = json.loads(await client.recv())
            assert end["event"] == "stream_end"
            assert end["stream_id"] == "s1"
            await channel.send(OutboundMessage(
                channel="websocket",
                chat_id=chat_id,
                content="",
                metadata={"_turn_end": True},
            ))
            turn_end = json.loads(await client.recv())
            assert turn_end == {"event": "turn_end", "chat_id": chat_id}
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_token_issue_rejects_when_at_capacity(bus: MagicMock) -> None:
    port = 29881
    channel = _ch(bus, port=port, tokenIssuePath="/auth/token", tokenIssueSecret="s")
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        # Fill issued tokens to capacity
        channel._issued_tokens = {
            f"nbwt_fill_{i}": time.monotonic() + 300 for i in range(channel._MAX_ISSUED_TOKENS)
        }
        resp = await _http_get(
            f"http://127.0.0.1:{port}/auth/token",
            headers={"Authorization": "Bearer s"},
        )
        assert resp.status_code == 429
        data = resp.json()
        assert "error" in data
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_allow_from_rejects_unauthorized_client_id(bus: MagicMock) -> None:
    port = 29882
    channel = _ch(bus, port=port, allowFrom=["alice", "bob"])
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        with pytest.raises(websockets.exceptions.InvalidStatus) as exc_info:
            async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=eve"):
                pass
        assert exc_info.value.response.status_code == 403
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_client_id_truncation(bus: MagicMock) -> None:
    port = 29883
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        long_id = "x" * 200
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id={long_id}") as client:
            ready = json.loads(await client.recv())
            assert ready["client_id"] == "x" * 128
            assert len(ready["client_id"]) == 128
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_non_utf8_binary_frame_ignored(bus: MagicMock) -> None:
    port = 29884
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=bin-test") as client:
            await client.recv()  # consume ready
            # Send non-UTF-8 bytes
            await client.send(b"\xff\xfe\xfd")
            await asyncio.sleep(0.05)
            # publish_inbound should NOT have been called
            bus.publish_inbound.assert_not_awaited()
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_static_token_accepts_issued_token_as_fallback(bus: MagicMock) -> None:
    port = 29885
    channel = _ch(
        bus, port=port,
        token="static-secret",
        tokenIssuePath="/auth/token",
        tokenIssueSecret="route-secret",
    )
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        # Get an issued token
        resp = await _http_get(
            f"http://127.0.0.1:{port}/auth/token",
            headers={"Authorization": "Bearer route-secret"},
        )
        assert resp.status_code == 200
        issued_token = resp.json()["token"]
        # Connect using issued token (not the static one)
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?token={issued_token}&client_id=caller") as client:
            ready = json.loads(await client.recv())
            assert ready["event"] == "ready"
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_allow_from_empty_list_denies_all(bus: MagicMock) -> None:
    port = 29886
    channel = _ch(bus, port=port, allowFrom=[])
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        with pytest.raises(websockets.exceptions.InvalidStatus) as exc_info:
            async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=anyone"):
                pass
        assert exc_info.value.response.status_code == 403
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_websocket_requires_token_without_issue_path(bus: MagicMock) -> None:
    """When websocket_requires_token is True but no token or issue path configured, all connections are rejected."""
    port = 29887
    channel = _ch(bus, port=port, websocketRequiresToken=True)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        # No token at all → 401
        with pytest.raises(websockets.exceptions.InvalidStatus) as exc_info:
            async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=u"):
                pass
        assert exc_info.value.response.status_code == 401
        # Wrong token → 401
        with pytest.raises(websockets.exceptions.InvalidStatus) as exc_info:
            async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=u&token=wrong"):
                pass
        assert exc_info.value.response.status_code == 401
    finally:
        await channel.stop()
        await server_task


# -- Multi-chat multiplexing -------------------------------------------------
#
# The multiplex protocol lets one WS connection route N logical chats over
# typed envelopes (`new_chat` / `attach` / `message`). Legacy frames must keep
# working on the connection's default chat_id.


@pytest.mark.asyncio
async def test_multiplex_legacy_still_works(bus: MagicMock) -> None:
    port = 29930
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=legacy") as client:
            ready = json.loads(await client.recv())
            default_chat = ready["chat_id"]
            # Plain text frame routes to default chat_id
            await client.send("hello from legacy")
            await asyncio.sleep(0.1)
            inbound = bus.publish_inbound.call_args[0][0]
            assert inbound.chat_id == default_chat
            assert inbound.content == "hello from legacy"
            # {"content": ...} frame routes to default chat_id
            await client.send(json.dumps({"content": "structured legacy"}))
            await asyncio.sleep(0.1)
            assert bus.publish_inbound.call_args[0][0].chat_id == default_chat
            assert bus.publish_inbound.call_args[0][0].content == "structured legacy"
            # Outbound still reaches the legacy client, with chat_id annotated
            await channel.send(
                OutboundMessage(channel="websocket", chat_id=default_chat, content="reply")
            )
            reply = json.loads(await client.recv())
            assert reply["event"] == "message"
            assert reply["chat_id"] == default_chat
            assert reply["text"] == "reply"
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_multiplex_new_chat_roundtrip(bus: MagicMock) -> None:
    port = 29931
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=mp") as client:
            ready = json.loads(await client.recv())
            default_chat = ready["chat_id"]
            await client.send(json.dumps({"type": "new_chat"}))
            attached = json.loads(await client.recv())
            assert attached["event"] == "attached"
            new_chat = attached["chat_id"]
            assert new_chat and new_chat != default_chat
            # Send on the new chat via typed envelope
            await client.send(
                json.dumps({"type": "message", "chat_id": new_chat, "content": "hi on new"})
            )
            await asyncio.sleep(0.1)
            inbound = bus.publish_inbound.call_args[0][0]
            assert inbound.chat_id == new_chat
            assert inbound.content == "hi on new"
            # Server pushes a message back; chat_id must match
            await channel.send(
                OutboundMessage(channel="websocket", chat_id=new_chat, content="ok")
            )
            reply = json.loads(await client.recv())
            assert reply["event"] == "message"
            assert reply["chat_id"] == new_chat
            assert reply["text"] == "ok"
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_multiplex_two_chats_isolated(bus: MagicMock) -> None:
    port = 29932
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=two") as client:
            await client.recv()  # ready
            await client.send(json.dumps({"type": "new_chat"}))
            chat_a = json.loads(await client.recv())["chat_id"]
            await client.send(json.dumps({"type": "new_chat"}))
            chat_b = json.loads(await client.recv())["chat_id"]
            assert chat_a != chat_b
            # Push A → client sees A only (FIFO over the single WS).
            await channel.send(
                OutboundMessage(channel="websocket", chat_id=chat_a, content="for-A")
            )
            msg_a = json.loads(await client.recv())
            assert msg_a["chat_id"] == chat_a
            assert msg_a["text"] == "for-A"
            # Push B → client sees B only.
            await channel.send(
                OutboundMessage(channel="websocket", chat_id=chat_b, content="for-B")
            )
            msg_b = json.loads(await client.recv())
            assert msg_b["chat_id"] == chat_b
            assert msg_b["text"] == "for-B"
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_multiplex_invalid_frames_return_error(bus: MagicMock) -> None:
    port = 29933
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=bad") as client:
            await client.recv()  # ready
            # attach with bad chat_id
            await client.send(json.dumps({"type": "attach", "chat_id": "has space"}))
            err1 = json.loads(await client.recv())
            assert err1["event"] == "error"
            # message with missing content
            await client.send(json.dumps({"type": "message", "chat_id": "abc", "content": ""}))
            err2 = json.loads(await client.recv())
            assert err2["event"] == "error"
            # unknown type
            await client.send(json.dumps({"type": "nope"}))
            err3 = json.loads(await client.recv())
            assert err3["event"] == "error"
            # Connection survives: legacy frame still works.
            await client.send("still-alive")
            await asyncio.sleep(0.1)
            bus.publish_inbound.assert_awaited()
            assert bus.publish_inbound.call_args[0][0].content == "still-alive"
    finally:
        await channel.stop()
        await server_task


@pytest.mark.asyncio
async def test_multiplex_cleanup_on_disconnect(bus: MagicMock) -> None:
    port = 29934
    channel = _ch(bus, port=port)
    server_task = asyncio.create_task(channel.start())
    await asyncio.sleep(0.3)
    try:
        async with websockets.connect(f"ws://127.0.0.1:{port}/ws?client_id=dc") as client:
            ready = json.loads(await client.recv())
            default_chat = ready["chat_id"]
            await client.send(json.dumps({"type": "new_chat"}))
            extra_chat = json.loads(await client.recv())["chat_id"]
            assert default_chat in channel._subs
            assert extra_chat in channel._subs
        # Client gone. Server-side tracking must be empty.
        await asyncio.sleep(0.2)
        assert default_chat not in channel._subs
        assert extra_chat not in channel._subs
        assert not channel._conn_chats
        assert not channel._conn_default
    finally:
        await channel.stop()
        await server_task


def test_parse_envelope_detects_typed_frames() -> None:
    assert _parse_envelope('{"type":"new_chat"}') == {"type": "new_chat"}
    env = _parse_envelope('{"type":"message","chat_id":"abc","content":"hi"}')
    assert env == {"type": "message", "chat_id": "abc", "content": "hi"}


def test_parse_envelope_rejects_legacy_and_garbage() -> None:
    # No `type` field → legacy, caller falls back to _parse_inbound_payload.
    assert _parse_envelope('{"content":"hi"}') is None
    assert _parse_envelope("plain text") is None
    assert _parse_envelope("{broken") is None
    assert _parse_envelope("[1,2,3]") is None
    # Non-string `type` is not a valid envelope.
    assert _parse_envelope('{"type":123}') is None


@pytest.mark.parametrize(
    ("value", "expected"),
    [
        ("abc", True),
        ("a1b2_c:d-e", True),
        ("x" * 64, True),
        ("unified:default", True),
        ("", False),
        ("x" * 65, False),
        ("has space", False),
        ("a/b", False),
        ("a.b", False),
        (None, False),
        (123, False),
    ],
)
def test_is_valid_chat_id(value: Any, expected: bool) -> None:
    assert _is_valid_chat_id(value) is expected


def test_handle_webui_thread_get_returns_json(tmp_path, monkeypatch) -> None:
    from urllib.parse import quote
    from websockets.datastructures import Headers
    from websockets.http11 import Request
    from nanobot.utils.webui_transcript import append_transcript_object
    monkeypatch.setattr("nanobot.config.paths.get_data_dir", lambda: tmp_path)
    key = "websocket:c1"
    append_transcript_object(key, {"event": "user", "chat_id": "c1", "text": "hi"})
    bus = MagicMock()
    channel = _ch(bus)
    channel._api_tokens["tok"] = time.monotonic() + 300.0
    enc = quote(key, safe="")
    req = Request(f"/api/sessions/{enc}/webui-thread", Headers([("Authorization", "Bearer tok")]))
    resp = channel._handle_webui_thread_get(req, enc)
    assert resp.status_code == 200
    body = json.loads(resp.body.decode())
    assert body["sessionKey"] == key
    assert len(body["messages"]) == 1
    assert body["messages"][0]["role"] == "user"
    assert body["messages"][0]["content"] == "hi"