test(signal): cover SSE receive loop and the empty-phone start guard

Previously the SSE loop and the empty-phone-number short-circuit in start()
had zero coverage. Both now have tests: a fake httpx stream feeds canned
SSE lines, exercising the valid-frame, invalid-JSON, non-200, and
no-http-client paths; start() with an empty phone number is asserted to
return without entering the HTTP loop.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Kaloyan Tenchov 2026-05-16 11:41:04 -04:00 committed by chengyongru
parent 7caf492ae2
commit 626f262121

View File

@ -3,8 +3,9 @@
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, MagicMock
import pytest
@ -891,6 +892,112 @@ class TestHandleDataMessageGroup:
assert ch._id_matches_account("new-bot-uuid")
# ---------------------------------------------------------------------------
# Lifecycle / SSE
# ---------------------------------------------------------------------------
class _FakeSSEResponse:
"""Minimal stand-in for httpx Response under stream()."""
def __init__(self, lines: list[str], status_code: int = 200) -> None:
self.status_code = status_code
self._lines = lines
async def aiter_lines(self):
for line in self._lines:
yield line
def _fake_streaming_client(lines: list[str], *, status_code: int = 200) -> MagicMock:
"""Return an httpx.AsyncClient stand-in whose .stream() yields a FakeSSEResponse."""
response = _FakeSSEResponse(lines, status_code=status_code)
@asynccontextmanager
async def _ctx(*_args, **_kwargs):
yield response
http = MagicMock()
http.stream = lambda *a, **kw: _ctx(*a, **kw)
return http
class TestLifecycle:
@pytest.mark.asyncio
async def test_start_returns_early_when_phone_missing(self):
"""start() with an empty phone number must not enter the HTTP loop."""
ch = _make_channel(phone_number="")
await ch.start()
assert ch._running is False
assert ch._http is None
assert ch._sse_task is None
class TestSSEReceiveLoop:
@pytest.mark.asyncio
async def test_dispatches_valid_envelope(self):
ch = _make_channel()
ch._running = True
captured: list[dict] = []
async def capture(params):
captured.append(params)
ch._handle_receive_notification = capture # type: ignore[method-assign]
ch._http = _fake_streaming_client(
['data: {"envelope":{"sourceNumber":"+19995550001"}}', ""]
)
# Loop ends when lines exhaust; the surrounding _start_http_mode would
# treat that as a disconnect, but the loop itself raises ConnectionError
# when the stream closes while still running.
with pytest.raises(ConnectionError):
await ch._sse_receive_loop()
assert captured == [{"envelope": {"sourceNumber": "+19995550001"}}]
@pytest.mark.asyncio
async def test_handles_invalid_json_frame(self):
"""An unparseable SSE frame is logged and skipped without crashing."""
ch = _make_channel()
ch._running = True
captured: list[dict] = []
async def capture(params):
captured.append(params)
ch._handle_receive_notification = capture # type: ignore[method-assign]
ch._http = _fake_streaming_client(
[
"data: this-is-not-json",
"", # event boundary triggers parse attempt
'data: {"envelope":{"sourceNumber":"+1"}}',
"",
]
)
with pytest.raises(ConnectionError):
await ch._sse_receive_loop()
# Bad frame skipped; good frame still dispatched.
assert captured == [{"envelope": {"sourceNumber": "+1"}}]
@pytest.mark.asyncio
async def test_non_200_status_raises(self):
ch = _make_channel()
ch._running = True
ch._http = _fake_streaming_client([], status_code=503)
with pytest.raises(ConnectionError, match="status 503"):
await ch._sse_receive_loop()
@pytest.mark.asyncio
async def test_no_http_client_raises(self):
ch = _make_channel()
ch._http = None
with pytest.raises(RuntimeError, match="HTTP client not initialized"):
await ch._sse_receive_loop()
# ---------------------------------------------------------------------------
# Command handling
# ---------------------------------------------------------------------------