fix(matrix): skip events received before bot startup

Matrix sync replays the room timeline on each startup or `/restart`,
causing already-handled messages to be reprocessed (#3553). Even with
`store_sync_tokens=True`, the sync token isn't reliably re-injected
when restoring a session via access_token + load_store(), so the
client re-reads recent timeline entries.

Filter `event.server_timestamp` against the process start time so old
events are dropped at the `_on_message` / `_on_media_message` entry
points. Trade-off: messages received during downtime won't be
processed, which matches the issue reporter's expectation.

Closes #3553

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
coldxiangyu 2026-05-01 18:54:32 +08:00 committed by Xubin Ren
parent d9800ecdd2
commit 15007afd4a
2 changed files with 78 additions and 2 deletions

View File

@ -252,11 +252,13 @@ class MatrixChannel(BaseChannel):
self._server_upload_limit_bytes: int | None = None
self._server_upload_limit_checked = False
self._stream_bufs: dict[str, _StreamBuf] = {}
self._started_at_ms: int = 0
async def start(self) -> None:
"""Start Matrix client and begin sync loop."""
self._running = True
self._started_at_ms = int(time.time() * 1000)
_configure_nio_logging_bridge()
self.store_path = get_data_dir() / "matrix-store"
@ -667,6 +669,16 @@ class MatrixChannel(BaseChannel):
return True
return bool(self.config.allow_room_mentions and mentions.get("room") is True)
def _is_pre_startup_event(self, event: RoomMessage) -> bool:
"""Skip events that landed in the timeline before this process started.
Matrix sync replays the room timeline on each startup/restart; without
this filter old messages would be re-handled as if they were fresh
(#3553).
"""
ts = getattr(event, "server_timestamp", None)
return isinstance(ts, int) and ts < self._started_at_ms
def _should_process_message(self, room: MatrixRoom, event: RoomMessage) -> bool:
"""Apply sender and room policy checks."""
if not self.is_allowed(event.sender):
@ -851,7 +863,11 @@ class MatrixChannel(BaseChannel):
return meta
async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
if event.sender == self.config.user_id or not self._should_process_message(room, event):
if (
event.sender == self.config.user_id
or self._is_pre_startup_event(event)
or not self._should_process_message(room, event)
):
return
await self._start_typing_keepalive(room.room_id)
try:
@ -864,7 +880,11 @@ class MatrixChannel(BaseChannel):
raise
async def _on_media_message(self, room: MatrixRoom, event: MatrixMediaEvent) -> None:
if event.sender == self.config.user_id or not self._should_process_message(room, event):
if (
event.sender == self.config.user_id
or self._is_pre_startup_event(event)
or not self._should_process_message(room, event)
):
return
attachment, marker = await self._fetch_media_attachment(room, event)
parts: list[str] = []

View File

@ -380,6 +380,62 @@ async def test_on_message_skips_typing_for_self_message() -> None:
assert client.typing_calls == []
@pytest.mark.asyncio
async def test_on_message_skips_pre_startup_event() -> None:
channel = MatrixChannel(_make_config(), MessageBus())
client = _FakeAsyncClient("", "", "", None)
channel.client = client
channel._started_at_ms = 1_000_000
handled: list[str] = []
async def _fake_handle_message(**kwargs) -> None:
handled.append(kwargs["sender_id"])
channel._handle_message = _fake_handle_message # type: ignore[method-assign]
room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room")
old_event = SimpleNamespace(
sender="@alice:matrix.org", body="old", source={}, server_timestamp=999_999
)
fresh_event = SimpleNamespace(
sender="@alice:matrix.org", body="fresh", source={}, server_timestamp=1_000_001
)
await channel._on_message(room, old_event)
await channel._on_message(room, fresh_event)
assert handled == ["@alice:matrix.org"]
assert client.typing_calls == [
("!room:matrix.org", True, TYPING_NOTICE_TIMEOUT_MS),
]
@pytest.mark.asyncio
async def test_on_media_message_skips_pre_startup_event() -> None:
channel = MatrixChannel(_make_config(), MessageBus())
client = _FakeAsyncClient("", "", "", None)
channel.client = client
channel._started_at_ms = 1_000_000
handled: list[str] = []
async def _fake_handle_message(**kwargs) -> None:
handled.append(kwargs["sender_id"])
channel._handle_message = _fake_handle_message # type: ignore[method-assign]
room = SimpleNamespace(room_id="!room:matrix.org", display_name="Test room")
old_event = SimpleNamespace(
sender="@alice:matrix.org", body="old", source={}, server_timestamp=999_999
)
await channel._on_media_message(room, old_event)
assert handled == []
assert client.typing_calls == []
@pytest.mark.asyncio
async def test_on_message_skips_typing_for_denied_sender() -> None:
channel = MatrixChannel(_make_config(allow_from=["@bob:matrix.org"]), MessageBus())