mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-27 03:52:35 +00:00
fix(matrix): stop sync loop on irrecoverable auth errors
When the Matrix homeserver returns M_UNKNOWN_TOKEN / M_FORBIDDEN / M_UNAUTHORIZED (or soft_logout), the previous `_sync_loop` kept retrying `sync_forever` every 2 seconds indefinitely, spamming the homeserver and filling the logs (#1851). The auth state cannot recover by retrying, so this is pure noise and a soft DoS on the homeserver.

- Extract an `_is_fatal_auth_response()` helper.
- In `_on_sync_error`, on a fatal auth error: set `_running=False` and call `stop_sync_forever()` so the loop exits cleanly.
- Add exponential backoff (2s → 60s cap) to the generic exception path in `_sync_loop` so transient network blips also stop hammering the homeserver.

Closes #1851

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
539d82eadc
commit
4860a9a6c9
@ -590,15 +590,26 @@ class MatrixChannel(BaseChannel):
|
|||||||
self.client.add_response_callback(self._on_join_error, JoinError)
|
self.client.add_response_callback(self._on_join_error, JoinError)
|
||||||
self.client.add_response_callback(self._on_send_error, RoomSendError)
|
self.client.add_response_callback(self._on_send_error, RoomSendError)
|
||||||
|
|
||||||
def _log_response_error(self, label: str, response: Any) -> None:
|
def _is_fatal_auth_response(self, response: Any) -> bool:
|
||||||
"""Log Matrix response errors — auth errors at ERROR level, rest at WARNING."""
|
|
||||||
code = getattr(response, "status_code", None)
|
code = getattr(response, "status_code", None)
|
||||||
is_auth = code in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"}
|
is_auth = code in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"}
|
||||||
is_fatal = is_auth or getattr(response, "soft_logout", False)
|
return is_auth or bool(getattr(response, "soft_logout", False))
|
||||||
|
|
||||||
|
def _log_response_error(self, label: str, response: Any) -> None:
|
||||||
|
"""Log Matrix response errors — auth errors at ERROR level, rest at WARNING."""
|
||||||
|
is_fatal = self._is_fatal_auth_response(response)
|
||||||
(logger.error if is_fatal else logger.warning)("Matrix {} failed: {}", label, response)
|
(logger.error if is_fatal else logger.warning)("Matrix {} failed: {}", label, response)
|
||||||
|
|
||||||
async def _on_sync_error(self, response: SyncError) -> None:
|
async def _on_sync_error(self, response: SyncError) -> None:
|
||||||
self._log_response_error("sync", response)
|
self._log_response_error("sync", response)
|
||||||
|
if self._is_fatal_auth_response(response):
|
||||||
|
# Auth errors won't recover by retry; stop the sync loop instead of
|
||||||
|
# spamming the homeserver every 2s (#1851).
|
||||||
|
logger.error("Matrix authentication failed irrecoverably; stopping sync loop")
|
||||||
|
self._running = False
|
||||||
|
if self.client:
|
||||||
|
with suppress(Exception):
|
||||||
|
self.client.stop_sync_forever()
|
||||||
|
|
||||||
async def _on_join_error(self, response: JoinError) -> None:
|
async def _on_join_error(self, response: JoinError) -> None:
|
||||||
self._log_response_error("join", response)
|
self._log_response_error("join", response)
|
||||||
@ -640,13 +651,18 @@ class MatrixChannel(BaseChannel):
|
|||||||
await self._set_typing(room_id, False)
|
await self._set_typing(room_id, False)
|
||||||
|
|
||||||
async def _sync_loop(self) -> None:
|
async def _sync_loop(self) -> None:
|
||||||
|
backoff = 2.0
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
await self.client.sync_forever(timeout=30000, full_state=True)
|
await self.client.sync_forever(timeout=30000, full_state=True)
|
||||||
|
backoff = 2.0
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
break
|
break
|
||||||
except Exception:
|
except Exception:
|
||||||
await asyncio.sleep(2)
|
if not self._running:
|
||||||
|
break
|
||||||
|
await asyncio.sleep(backoff)
|
||||||
|
backoff = min(backoff * 2, 60.0)
|
||||||
|
|
||||||
async def _on_room_invite(self, room: MatrixRoom, event: InviteEvent) -> None:
|
async def _on_room_invite(self, room: MatrixRoom, event: InviteEvent) -> None:
|
||||||
if self.is_allowed(event.sender):
|
if self.is_allowed(event.sender):
|
||||||
|
|||||||
@ -7,7 +7,7 @@ import pytest
|
|||||||
pytest.importorskip("nio")
|
pytest.importorskip("nio")
|
||||||
pytest.importorskip("nh3")
|
pytest.importorskip("nh3")
|
||||||
pytest.importorskip("mistune")
|
pytest.importorskip("mistune")
|
||||||
from nio import RoomSendResponse
|
from nio import RoomSendResponse, SyncError
|
||||||
|
|
||||||
from nanobot.channels.matrix import _build_matrix_text_content
|
from nanobot.channels.matrix import _build_matrix_text_content
|
||||||
|
|
||||||
@ -266,6 +266,61 @@ async def test_start_disables_e2ee_when_configured(
|
|||||||
await channel.stop()
|
await channel.stop()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_on_sync_error_stops_loop_on_unknown_token() -> None:
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
client = _FakeAsyncClient("", "", "", None)
|
||||||
|
channel.client = client
|
||||||
|
channel._running = True
|
||||||
|
|
||||||
|
await channel._on_sync_error(SyncError(message="bad", status_code="M_UNKNOWN_TOKEN"))
|
||||||
|
|
||||||
|
assert channel._running is False
|
||||||
|
assert client.stop_sync_forever_called is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_on_sync_error_keeps_running_on_transient_error() -> None:
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
client = _FakeAsyncClient("", "", "", None)
|
||||||
|
channel.client = client
|
||||||
|
channel._running = True
|
||||||
|
|
||||||
|
await channel._on_sync_error(SyncError(message="oops", status_code="M_LIMIT_EXCEEDED"))
|
||||||
|
|
||||||
|
assert channel._running is True
|
||||||
|
assert client.stop_sync_forever_called is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_sync_loop_backs_off_on_repeated_errors(monkeypatch) -> None:
|
||||||
|
channel = MatrixChannel(_make_config(), MessageBus())
|
||||||
|
|
||||||
|
sleeps: list[float] = []
|
||||||
|
|
||||||
|
async def _fake_sleep(delay: float) -> None:
|
||||||
|
sleeps.append(delay)
|
||||||
|
|
||||||
|
monkeypatch.setattr(matrix_module.asyncio, "sleep", _fake_sleep)
|
||||||
|
|
||||||
|
call_count = {"n": 0}
|
||||||
|
|
||||||
|
class _BoomClient:
|
||||||
|
async def sync_forever(self, **_kwargs) -> None:
|
||||||
|
call_count["n"] += 1
|
||||||
|
if call_count["n"] > 4:
|
||||||
|
channel._running = False
|
||||||
|
return
|
||||||
|
raise RuntimeError("boom")
|
||||||
|
|
||||||
|
channel.client = _BoomClient()
|
||||||
|
channel._running = True
|
||||||
|
|
||||||
|
await channel._sync_loop()
|
||||||
|
|
||||||
|
assert sleeps == [2.0, 4.0, 8.0, 16.0]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_stop_stops_sync_forever_before_close(monkeypatch) -> None:
|
async def test_stop_stops_sync_forever_before_close(monkeypatch) -> None:
|
||||||
channel = MatrixChannel(_make_config(device_id="DEVICE"), MessageBus())
|
channel = MatrixChannel(_make_config(device_id="DEVICE"), MessageBus())
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user