From fe928a0d94736d26dbfaacdf68723449da61ebb6 Mon Sep 17 00:00:00 2001 From: zhuzhh Date: Sat, 25 Apr 2026 15:39:43 +0800 Subject: [PATCH] feat(msteams): split ref storage into main+meta sidecar files - Separate updated_at into a meta sidecar file (msteams_conversations_meta.json) to keep backward compatibility with legacy data that never had updated_at. On first upgrade, legacy refs are kept alive by initializing updated_at to now instead of purging them immediately. - Add cross-process locking via fcntl (with Windows fallback) to prevent concurrent writes from different gateway processes overwriting each other. - Add ref_touch_interval_s config (default 300s) to throttle how often successful sends refresh updated_at, preventing unnecessary I/O. - Touch active refs on send success to prevent them from expiring while in use. - Add _safe_float and _normalize_ref_record for robust schema migration. - All refs operations now use threading.RLock within a process. --- docs/chat-apps.md | 4 +- nanobot/channels/msteams.py | 231 +++++++++++++++++++++++++++++------- tests/test_msteams.py | 100 ++++++++++++++-- 3 files changed, 283 insertions(+), 52 deletions(-) diff --git a/docs/chat-apps.md b/docs/chat-apps.md index 3d5e4dbd5..6eea7d92e 100644 --- a/docs/chat-apps.md +++ b/docs/chat-apps.md @@ -645,7 +645,8 @@ Create or reuse a Microsoft Teams / Azure bot app registration. Set the bot mess "validateInboundAuth": true, "refTtlDays": 30, "pruneWebChatRefs": true, - "pruneNonPersonalRefs": true + "pruneNonPersonalRefs": true, + "refTouchIntervalS": 300 } } } @@ -657,6 +658,7 @@ Create or reuse a Microsoft Teams / Azure bot app registration. Set the bot mess > - `refTtlDays` (default `30`) controls how old stored conversation refs can be before they are pruned. > - `pruneWebChatRefs` (default `true`) drops refs with `webchat.botframework.com` service URLs. > - `pruneNonPersonalRefs` (default `true`) drops refs whose `conversation_type` is not `personal`. +> - `refTouchIntervalS` (default `300`) throttles how often successful sends refresh `updated_at` for active refs. **4. Run** diff --git a/nanobot/channels/msteams.py b/nanobot/channels/msteams.py index 7b294a830..685774bf5 100644 --- a/nanobot/channels/msteams.py +++ b/nanobot/channels/msteams.py @@ -20,11 +20,17 @@ import re import tempfile import threading import time +from contextlib import contextmanager from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from typing import TYPE_CHECKING, Any from urllib.parse import urlparse +try: # pragma: no cover - Windows fallback path + import fcntl +except ImportError: # pragma: no cover + fcntl = None + import httpx from loguru import logger from pydantic import Field @@ -49,6 +55,9 @@ if MSTEAMS_AVAILABLE: MSTEAMS_REF_TTL_DAYS = 30 MSTEAMS_REF_TTL_S = MSTEAMS_REF_TTL_DAYS * 24 * 60 * 60 MSTEAMS_WEBCHAT_HOST = "webchat.botframework.com" +MSTEAMS_REF_META_FILENAME = "msteams_conversations_meta.json" +MSTEAMS_REF_LOCK_FILENAME = "msteams_conversations.lock" +MSTEAMS_REF_TOUCH_INTERVAL_S = 300 class MSTeamsConfig(Base): @@ -68,6 +77,7 @@ class MSTeamsConfig(Base): ref_ttl_days: int = Field(default=MSTEAMS_REF_TTL_DAYS, ge=1) prune_web_chat_refs: bool = True prune_non_personal_refs: bool = True + ref_touch_interval_s: int = Field(default=MSTEAMS_REF_TOUCH_INTERVAL_S, ge=0) @dataclass @@ -113,9 +123,13 @@ class MSTeamsChannel(BaseChannel): self._botframework_jwks_expires_at: float = 0.0 self._refs_path = get_workspace_path() / "state" / "msteams_conversations.json" self._refs_path.parent.mkdir(parents=True, exist_ok=True) + self._refs_meta_path = self._refs_path.parent / MSTEAMS_REF_META_FILENAME + self._refs_lock_path = self._refs_path.parent / MSTEAMS_REF_LOCK_FILENAME + self._refs_guard = threading.RLock() self._conversation_refs: dict[str, ConversationRef] = self._load_refs() - if self._prune_conversation_refs(): - self._save_refs(prune=False) + with self._refs_guard: + if self._prune_conversation_refs(): + self._save_refs_locked(prune=True) async def start(self) -> None: """Start the Teams webhook listener.""" @@ -249,6 +263,7 @@ class MSTeamsChannel(BaseChannel): resp = await self._http.post(url, headers=headers, json=payload) resp.raise_for_status() logger.info("MSTeams message sent to {}", ref.conversation_id) + self._touch_conversation_ref(str(msg.chat_id), persist=True) except Exception as e: logger.error("MSTeams send failed: {}", e) raise @@ -295,16 +310,17 @@ class MSTeamsChannel(BaseChannel): ) return - self._conversation_refs[conversation_id] = ConversationRef( - service_url=service_url, - conversation_id=conversation_id, - bot_id=str(recipient.get("id") or "") or None, - activity_id=activity_id or None, - conversation_type=conversation_type or None, - tenant_id=str((channel_data.get("tenant") or {}).get("id") or "") or None, - updated_at=time.time(), - ) - self._save_refs() + with self._refs_guard: + self._conversation_refs[conversation_id] = ConversationRef( + service_url=service_url, + conversation_id=conversation_id, + bot_id=str(recipient.get("id") or "") or None, + activity_id=activity_id or None, + conversation_type=conversation_type or None, + tenant_id=str((channel_data.get("tenant") or {}).get("id") or "") or None, + updated_at=time.time(), + ) + self._save_refs_locked() await self._handle_message( sender_id=sender_id, @@ -491,19 +507,109 @@ class MSTeamsChannel(BaseChannel): self._botframework_jwks_expires_at = now + 3600 return self._botframework_jwks + @staticmethod + def _safe_float(value: Any) -> float | None: + try: + out = float(value) + if out > 0: + return out + except (TypeError, ValueError): + return None + return None + + def _normalize_ref_record(self, value: Any) -> ConversationRef | None: + """Normalize a stored ref record from legacy/current schema.""" + if not isinstance(value, dict): + return None + service_url = str(value.get("service_url") or "").strip() + conversation_id = str(value.get("conversation_id") or "").strip() + if not service_url or not conversation_id: + return None + return ConversationRef( + service_url=service_url, + conversation_id=conversation_id, + bot_id=str(value.get("bot_id") or "") or None, + activity_id=str(value.get("activity_id") or "") or None, + conversation_type=str(value.get("conversation_type") or "") or None, + tenant_id=str(value.get("tenant_id") or "") or None, + updated_at=self._safe_float(value.get("updated_at")), + ) + + def _load_refs_raw(self) -> tuple[dict[str, Any], dict[str, Any], bool]: + """Load raw refs/main+meta JSON payloads.""" + main_data: dict[str, Any] = {} + meta_data: dict[str, Any] = {} + meta_exists = self._refs_meta_path.exists() + + if self._refs_path.exists(): + try: + loaded = json.loads(self._refs_path.read_text(encoding="utf-8")) + if isinstance(loaded, dict): + main_data = loaded + except Exception as e: + logger.warning("Failed to load MSTeams conversation refs: {}", e) + + if meta_exists: + try: + loaded_meta = json.loads(self._refs_meta_path.read_text(encoding="utf-8")) + if isinstance(loaded_meta, dict): + meta_data = loaded_meta + except Exception as e: + logger.warning("Failed to load MSTeams conversation refs metadata: {}", e) + + return main_data, meta_data, meta_exists + + def _load_refs_from_disk(self) -> dict[str, ConversationRef]: + """Load refs from disk with compatibility fallback for legacy layouts.""" + main_data, meta_data, meta_exists = self._load_refs_raw() + if not main_data: + return {} + + out: dict[str, ConversationRef] = {} + now = time.time() + for key, value in main_data.items(): + ref = self._normalize_ref_record(value) + if not ref: + continue + + meta_entry = meta_data.get(key) if isinstance(meta_data, dict) else None + meta_ts = None + if isinstance(meta_entry, dict): + meta_ts = self._safe_float(meta_entry.get("updated_at")) + elif meta_entry is not None: + meta_ts = self._safe_float(meta_entry) + + if meta_ts is not None: + ref.updated_at = meta_ts + elif not meta_exists: + # First run after introducing meta sidecar: keep legacy refs alive + # by initializing timestamps to "now" instead of purging immediately. + ref.updated_at = now + elif ref.updated_at is None: + ref.updated_at = now + + out[key] = ref + return out + def _load_refs(self) -> dict[str, ConversationRef]: """Load stored conversation references.""" - if not self._refs_path.exists(): - return {} + return self._load_refs_from_disk() + + @contextmanager + def _refs_file_lock(self): + """Cross-process lock while merging and writing refs state.""" + self._refs_path.parent.mkdir(parents=True, exist_ok=True) + lock_fp = self._refs_lock_path.open("a+", encoding="utf-8") try: - data = json.loads(self._refs_path.read_text(encoding="utf-8")) - out: dict[str, ConversationRef] = {} - for key, value in data.items(): - out[key] = ConversationRef(**value) - return out - except Exception as e: - logger.warning("Failed to load MSTeams conversation refs: {}", e) - return {} + if fcntl is not None: + fcntl.flock(lock_fp.fileno(), fcntl.LOCK_EX) + yield + finally: + try: + if fcntl is not None: + fcntl.flock(lock_fp.fileno(), fcntl.LOCK_UN) + finally: + lock_fp.close() def _is_webchat_service_url(self, service_url: str) -> bool: """Return True when service URL points to unsupported Bot Framework Web Chat.""" @@ -554,21 +660,49 @@ class MSTeamsChannel(BaseChannel): ) return True - def _write_refs_atomically(self, data: dict[str, Any]) -> None: + def _merge_refs_from_disk_locked(self) -> None: + """Merge disk refs into memory to reduce lost updates across processes.""" + disk_refs = self._load_refs_from_disk() + for key, disk_ref in disk_refs.items(): + mem_ref = self._conversation_refs.get(key) + if mem_ref is None: + self._conversation_refs[key] = disk_ref + continue + disk_ts = self._safe_float(disk_ref.updated_at) or 0.0 + mem_ts = self._safe_float(mem_ref.updated_at) or 0.0 + if disk_ts > mem_ts: + self._conversation_refs[key] = disk_ref + + def _touch_conversation_ref(self, chat_id: str, *, persist: bool = False) -> None: + """Refresh updated_at for an active ref to keep it from expiring while used.""" + with self._refs_guard: + ref = self._conversation_refs.get(str(chat_id)) + if not ref: + return + now = time.time() + prev = self._safe_float(ref.updated_at) or 0.0 + min_interval = max(0, int(self.config.ref_touch_interval_s)) + if min_interval > 0 and prev > 0 and now - prev < min_interval: + return + ref.updated_at = now + if persist: + self._save_refs_locked() + + def _write_json_atomically(self, path, data: dict[str, Any]) -> None: """Write refs JSON atomically to reduce corruption risk during crashes.""" payload = json.dumps(data, indent=2) tmp_path: str | None = None try: fd, tmp_path = tempfile.mkstemp( - dir=str(self._refs_path.parent), - prefix=f"{self._refs_path.name}.", + dir=str(path.parent), + prefix=f"{path.name}.", suffix=".tmp", ) with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(payload) f.flush() os.fsync(f.fileno()) - os.replace(tmp_path, self._refs_path) + os.replace(tmp_path, path) finally: if tmp_path and os.path.exists(tmp_path): try: @@ -576,27 +710,40 @@ class MSTeamsChannel(BaseChannel): except OSError: pass - def _save_refs(self, *, prune: bool = True) -> None: - """Persist conversation references.""" + def _save_refs_locked(self, *, prune: bool = True) -> None: + """Persist conversation references (caller must hold _refs_guard).""" try: - if prune: - self._prune_conversation_refs() - data = { - key: { - "service_url": ref.service_url, - "conversation_id": ref.conversation_id, - "bot_id": ref.bot_id, - "activity_id": ref.activity_id, - "conversation_type": ref.conversation_type, - "tenant_id": ref.tenant_id, - "updated_at": ref.updated_at, + with self._refs_file_lock(): + self._merge_refs_from_disk_locked() + if prune: + self._prune_conversation_refs() + refs_data = { + key: { + "service_url": ref.service_url, + "conversation_id": ref.conversation_id, + "bot_id": ref.bot_id, + "activity_id": ref.activity_id, + "conversation_type": ref.conversation_type, + "tenant_id": ref.tenant_id, + } + for key, ref in self._conversation_refs.items() } - for key, ref in self._conversation_refs.items() - } - self._write_refs_atomically(data) + refs_meta = { + key: { + "updated_at": self._safe_float(ref.updated_at), + } + for key, ref in self._conversation_refs.items() + } + self._write_json_atomically(self._refs_path, refs_data) + self._write_json_atomically(self._refs_meta_path, refs_meta) except Exception as e: logger.warning("Failed to save MSTeams conversation refs: {}", e) + def _save_refs(self, *, prune: bool = True) -> None: + """Persist conversation references.""" + with self._refs_guard: + self._save_refs_locked(prune=prune) + async def _get_access_token(self) -> str: """Fetch an access token for Bot Framework / Azure Bot auth.""" diff --git a/tests/test_msteams.py b/tests/test_msteams.py index da6bf511c..dae2bbfa8 100644 --- a/tests/test_msteams.py +++ b/tests/test_msteams.py @@ -115,7 +115,10 @@ async def test_handle_activity_personal_message_publishes_and_stores_ref(make_ch saved = json.loads((tmp_path / "state" / "msteams_conversations.json").read_text(encoding="utf-8")) assert saved["conv-123"]["conversation_id"] == "conv-123" assert saved["conv-123"]["tenant_id"] == "tenant-id" - assert float(saved["conv-123"]["updated_at"]) > 0 + saved_meta = json.loads( + (tmp_path / "state" / msteams_module.MSTEAMS_REF_META_FILENAME).read_text(encoding="utf-8"), + ) + assert float(saved_meta["conv-123"]["updated_at"]) > 0 def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_path, monkeypatch): @@ -125,6 +128,7 @@ def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_p state_dir = tmp_path / "state" state_dir.mkdir(parents=True, exist_ok=True) refs_path = state_dir / "msteams_conversations.json" + refs_meta_path = state_dir / msteams_module.MSTEAMS_REF_META_FILENAME refs_path.write_text( json.dumps( { @@ -132,25 +136,21 @@ def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_p "service_url": "https://smba.trafficmanager.net/amer/", "conversation_id": "conv-valid", "conversation_type": "personal", - "updated_at": now - 60, }, "conv-webchat": { "service_url": "https://webchat.botframework.com/", "conversation_id": "conv-webchat", "conversation_type": "personal", - "updated_at": now - 60, }, "conv-group": { "service_url": "https://smba.trafficmanager.net/amer/", "conversation_id": "conv-group", "conversation_type": "channel", - "updated_at": now - 60, }, "conv-stale": { "service_url": "https://smba.trafficmanager.net/amer/", "conversation_id": "conv-stale", "conversation_type": "personal", - "updated_at": now - msteams_module.MSTEAMS_REF_TTL_S - 1, }, "conv-missing-ts": { "service_url": "https://smba.trafficmanager.net/amer/", @@ -162,14 +162,27 @@ def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_p ), encoding="utf-8", ) + refs_meta_path.write_text( + json.dumps( + { + "conv-valid": {"updated_at": now - 60}, + "conv-webchat": {"updated_at": now - 60}, + "conv-group": {"updated_at": now - 60}, + "conv-stale": {"updated_at": now - msteams_module.MSTEAMS_REF_TTL_S - 1}, + }, + indent=2, + ), + encoding="utf-8", + ) ch = make_channel() - assert set(ch._conversation_refs.keys()) == {"conv-valid"} + assert set(ch._conversation_refs.keys()) == {"conv-valid", "conv-missing-ts"} assert ch._conversation_refs["conv-valid"].conversation_id == "conv-valid" + assert ch._conversation_refs["conv-missing-ts"].conversation_id == "conv-missing-ts" persisted = json.loads(refs_path.read_text(encoding="utf-8")) - assert set(persisted.keys()) == {"conv-valid"} + assert set(persisted.keys()) == {"conv-valid", "conv-missing-ts"} def test_save_prunes_unsupported_conversation_refs(make_channel, tmp_path, monkeypatch): @@ -204,6 +217,10 @@ def test_save_prunes_unsupported_conversation_refs(make_channel, tmp_path, monke saved = json.loads((tmp_path / "state" / "msteams_conversations.json").read_text(encoding="utf-8")) assert set(saved.keys()) == {"conv-valid"} + saved_meta = json.loads( + (tmp_path / "state" / msteams_module.MSTEAMS_REF_META_FILENAME).read_text(encoding="utf-8"), + ) + assert set(saved_meta.keys()) == {"conv-valid"} def test_init_respects_prune_toggle_flags(make_channel, tmp_path, monkeypatch): @@ -248,6 +265,7 @@ def test_init_respects_custom_ref_ttl_days(make_channel, tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir(parents=True, exist_ok=True) refs_path = state_dir / "msteams_conversations.json" + refs_meta_path = state_dir / msteams_module.MSTEAMS_REF_META_FILENAME refs_path.write_text( json.dumps( { @@ -255,19 +273,27 @@ def test_init_respects_custom_ref_ttl_days(make_channel, tmp_path, monkeypatch): "service_url": "https://smba.trafficmanager.net/amer/", "conversation_id": "conv-fresh", "conversation_type": "personal", - "updated_at": now - 12 * 60 * 60, }, "conv-old": { "service_url": "https://smba.trafficmanager.net/amer/", "conversation_id": "conv-old", "conversation_type": "personal", - "updated_at": now - 10 * 24 * 60 * 60, }, }, indent=2, ), encoding="utf-8", ) + refs_meta_path.write_text( + json.dumps( + { + "conv-fresh": {"updated_at": now - 12 * 60 * 60}, + "conv-old": {"updated_at": now - 10 * 24 * 60 * 60}, + }, + indent=2, + ), + encoding="utf-8", + ) ch = make_channel(refTtlDays=1) @@ -276,6 +302,34 @@ def test_init_respects_custom_ref_ttl_days(make_channel, tmp_path, monkeypatch): assert set(persisted.keys()) == {"conv-fresh"} +def test_init_without_meta_keeps_legacy_refs_alive(make_channel, tmp_path, monkeypatch): + now = 1_800_000_000.0 + monkeypatch.setattr(msteams_module.time, "time", lambda: now) + + state_dir = tmp_path / "state" + state_dir.mkdir(parents=True, exist_ok=True) + refs_path = state_dir / "msteams_conversations.json" + refs_path.write_text( + json.dumps( + { + "conv-legacy": { + "service_url": "https://smba.trafficmanager.net/amer/", + "conversation_id": "conv-legacy", + "conversation_type": "personal", + } + }, + indent=2, + ), + encoding="utf-8", + ) + + ch = make_channel(refTtlDays=1) + + assert set(ch._conversation_refs.keys()) == {"conv-legacy"} + assert ch._conversation_refs["conv-legacy"].updated_at == now + assert not (state_dir / msteams_module.MSTEAMS_REF_META_FILENAME).exists() + + def test_save_uses_atomic_replace_and_keeps_existing_file_on_replace_error(make_channel, tmp_path, monkeypatch): ch = make_channel() refs_path = tmp_path / "state" / "msteams_conversations.json" @@ -591,6 +645,33 @@ async def test_send_replies_to_activity_when_reply_in_thread_enabled(make_channe assert kwargs["json"]["replyToId"] == "activity-1" +@pytest.mark.asyncio +async def test_send_success_refreshes_updated_at_and_persists_meta(make_channel, tmp_path, monkeypatch): + now = {"value": 1_800_000_000.0} + monkeypatch.setattr(msteams_module.time, "time", lambda: now["value"]) + + ch = make_channel(refTouchIntervalS=0) + fake_http = FakeHttpClient() + ch._http = fake_http + ch._token = "tok" + ch._token_expires_at = 9_999_999_999 + ch._conversation_refs["conv-123"] = ConversationRef( + service_url="https://smba.trafficmanager.net/amer/", + conversation_id="conv-123", + activity_id="activity-1", + updated_at=now["value"] - 100, + ) + + now["value"] += 5 + await ch.send(OutboundMessage(channel="msteams", chat_id="conv-123", content="Reply text")) + + assert ch._conversation_refs["conv-123"].updated_at == now["value"] + saved_meta = json.loads( + (tmp_path / "state" / msteams_module.MSTEAMS_REF_META_FILENAME).read_text(encoding="utf-8"), + ) + assert saved_meta["conv-123"]["updated_at"] == now["value"] + + @pytest.mark.asyncio async def test_send_posts_to_conversation_when_thread_reply_disabled(make_channel): ch = make_channel(replyInThread=False) @@ -756,6 +837,7 @@ def test_msteams_default_config_includes_restart_notify_fields(): assert cfg["refTtlDays"] == msteams_module.MSTEAMS_REF_TTL_DAYS assert cfg["pruneWebChatRefs"] is True assert cfg["pruneNonPersonalRefs"] is True + assert cfg["refTouchIntervalS"] == msteams_module.MSTEAMS_REF_TOUCH_INTERVAL_S assert "restartNotifyEnabled" not in cfg assert "restartNotifyPreMessage" not in cfg assert "restartNotifyPostMessage" not in cfg