From 15007afd4ab00a90373e895b1615a1f13b21a4f4 Mon Sep 17 00:00:00 2001 From: coldxiangyu Date: Fri, 1 May 2026 18:54:32 +0800 Subject: [PATCH] fix(matrix): skip events received before bot startup Matrix sync replays the room timeline on each startup or `/restart`, causing already-handled messages to be reprocessed (#3553). Even with `store_sync_tokens=True`, the sync token isn't reliably re-injected when restoring a session via access_token + load_store(), so the client re-reads recent timeline entries. Filter `event.server_timestamp` against the process start time so old events are dropped at the `_on_message` / `_on_media_message` entry points. Trade-off: messages received during downtime won't be processed, which matches the issue reporter's expectation. Closes #3553 Co-Authored-By: Claude Opus 4.7 (1M context) --- nanobot/channels/matrix.py | 24 +++++++++++- tests/channels/test_matrix_channel.py | 56 +++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/nanobot/channels/matrix.py b/nanobot/channels/matrix.py index a234e8cef..2e0b4de6d 100644 --- a/nanobot/channels/matrix.py +++ b/nanobot/channels/matrix.py @@ -252,11 +252,13 @@ class MatrixChannel(BaseChannel): self._server_upload_limit_bytes: int | None = None self._server_upload_limit_checked = False self._stream_bufs: dict[str, _StreamBuf] = {} + self._started_at_ms: int = 0 async def start(self) -> None: """Start Matrix client and begin sync loop.""" self._running = True + self._started_at_ms = int(time.time() * 1000) _configure_nio_logging_bridge() self.store_path = get_data_dir() / "matrix-store" @@ -667,6 +669,16 @@ class MatrixChannel(BaseChannel): return True return bool(self.config.allow_room_mentions and mentions.get("room") is True) + def _is_pre_startup_event(self, event: RoomMessage) -> bool: + """Skip events that landed in the timeline before this process started. + + Matrix sync replays the room timeline on each startup/restart; without + this filter old messages would be re-handled as if they were fresh + (#3553). + """ + ts = getattr(event, "server_timestamp", None) + return isinstance(ts, int) and ts < self._started_at_ms + def _should_process_message(self, room: MatrixRoom, event: RoomMessage) -> bool: """Apply sender and room policy checks.""" if not self.is_allowed(event.sender): @@ -851,7 +863,11 @@ class MatrixChannel(BaseChannel): return meta async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None: - if event.sender == self.config.user_id or not self._should_process_message(room, event): + if ( + event.sender == self.config.user_id + or self._is_pre_startup_event(event) + or not self._should_process_message(room, event) + ): return await self._start_typing_keepalive(room.room_id) try: @@ -864,7 +880,11 @@ class MatrixChannel(BaseChannel): raise async def _on_media_message(self, room: MatrixRoom, event: MatrixMediaEvent) -> None: - if event.sender == self.config.user_id or not self._should_process_message(room, event): + if ( + event.sender == self.config.user_id + or self._is_pre_startup_event(event) + or not self._should_process_message(room, event) + ): return attachment, marker = await self._fetch_media_attachment(room, event) parts: list[str] = [] diff --git a/tests/channels/test_matrix_channel.py b/tests/channels/test_matrix_channel.py index 27b7e1255..bf7a09e23 100644 --- a/tests/channels/test_matrix_channel.py +++ b/tests/channels/test_matrix_channel.py @@ -380,6 +380,62 @@ async def test_on_message_skips_typing_for_self_message() -> None: assert client.typing_calls == [] +@pytest.mark.asyncio +async def test_on_message_skips_pre_startup_event() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._started_at_ms = 1_000_000 + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room") + old_event = SimpleNamespace( + sender="@alice:matrix.org", body="old", source={}, server_timestamp=999_999 + ) + fresh_event = SimpleNamespace( + sender="@alice:matrix.org", body="fresh", source={}, server_timestamp=1_000_001 + ) + + await channel._on_message(room, old_event) + await channel._on_message(room, fresh_event) + + assert handled == ["@alice:matrix.org"] + assert client.typing_calls == [ + ("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS), + ] + + +@pytest.mark.asyncio +async def test_on_media_message_skips_pre_startup_event() -> None: + channel = MatrixChannel(_make_config(), MessageBus()) + client = _FakeAsyncClient("", "", "", None) + channel.client = client + channel._started_at_ms = 1_000_000 + + handled: list[str] = [] + + async def _fake_handle_message(**kwargs) -> None: + handled.append(kwargs["sender_id"]) + + channel._handle_message = _fake_handle_message # type: ignore[method-assign] + + room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room") + old_event = SimpleNamespace( + sender="@alice:matrix.org", body="old", source={}, server_timestamp=999_999 + ) + + await channel._on_media_message(room, old_event) + + assert handled == [] + assert client.typing_calls == [] + + @pytest.mark.asyncio async def test_on_message_skips_typing_for_denied_sender() -> None: channel = MatrixChannel(_make_config(allow_from=["@bob:matrix.org"]), MessageBus())