feat(msteams): split ref storage into main+meta sidecar files

- Separate updated_at into a meta sidecar file (msteams_conversations_meta.json)
    to keep backward compatibility with legacy data that never had updated_at.
    On first upgrade, legacy refs are kept alive by initializing updated_at to now
    instead of purging them immediately.
  - Add cross-process locking via fcntl (with Windows fallback) to prevent
    concurrent writes from different gateway processes overwriting each other.
  - Add ref_touch_interval_s config (default 300s) to throttle how often
    successful sends refresh updated_at, preventing unnecessary I/O.
  - Touch active refs on send success to prevent them from expiring while in use.
  - Add _safe_float and _normalize_ref_record for robust schema migration.
  - All refs operations now use threading.RLock within a process.
This commit is contained in:
zhuzhh 2026-04-25 15:39:43 +08:00
parent 15e9d0471f
commit fe928a0d94
3 changed files with 283 additions and 52 deletions

View File

@ -645,7 +645,8 @@ Create or reuse a Microsoft Teams / Azure bot app registration. Set the bot mess
"validateInboundAuth": true,
"refTtlDays": 30,
"pruneWebChatRefs": true,
"pruneNonPersonalRefs": true
"pruneNonPersonalRefs": true,
"refTouchIntervalS": 300
}
}
}
@ -657,6 +658,7 @@ Create or reuse a Microsoft Teams / Azure bot app registration. Set the bot mess
> - `refTtlDays` (default `30`) controls how old stored conversation refs can be before they are pruned.
> - `pruneWebChatRefs` (default `true`) drops refs with `webchat.botframework.com` service URLs.
> - `pruneNonPersonalRefs` (default `true`) drops refs whose `conversation_type` is not `personal`.
> - `refTouchIntervalS` (default `300`) throttles how often successful sends refresh `updated_at` for active refs.
**4. Run**

View File

@ -20,11 +20,17 @@ import re
import tempfile
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
try: # pragma: no cover - Windows fallback path
import fcntl
except ImportError: # pragma: no cover
fcntl = None
import httpx
from loguru import logger
from pydantic import Field
@ -49,6 +55,9 @@ if MSTEAMS_AVAILABLE:
MSTEAMS_REF_TTL_DAYS = 30
MSTEAMS_REF_TTL_S = MSTEAMS_REF_TTL_DAYS * 24 * 60 * 60
MSTEAMS_WEBCHAT_HOST = "webchat.botframework.com"
MSTEAMS_REF_META_FILENAME = "msteams_conversations_meta.json"
MSTEAMS_REF_LOCK_FILENAME = "msteams_conversations.lock"
MSTEAMS_REF_TOUCH_INTERVAL_S = 300
class MSTeamsConfig(Base):
@ -68,6 +77,7 @@ class MSTeamsConfig(Base):
ref_ttl_days: int = Field(default=MSTEAMS_REF_TTL_DAYS, ge=1)
prune_web_chat_refs: bool = True
prune_non_personal_refs: bool = True
ref_touch_interval_s: int = Field(default=MSTEAMS_REF_TOUCH_INTERVAL_S, ge=0)
@dataclass
@ -113,9 +123,13 @@ class MSTeamsChannel(BaseChannel):
self._botframework_jwks_expires_at: float = 0.0
self._refs_path = get_workspace_path() / "state" / "msteams_conversations.json"
self._refs_path.parent.mkdir(parents=True, exist_ok=True)
self._refs_meta_path = self._refs_path.parent / MSTEAMS_REF_META_FILENAME
self._refs_lock_path = self._refs_path.parent / MSTEAMS_REF_LOCK_FILENAME
self._refs_guard = threading.RLock()
self._conversation_refs: dict[str, ConversationRef] = self._load_refs()
if self._prune_conversation_refs():
self._save_refs(prune=False)
with self._refs_guard:
if self._prune_conversation_refs():
self._save_refs_locked(prune=True)
async def start(self) -> None:
"""Start the Teams webhook listener."""
@ -249,6 +263,7 @@ class MSTeamsChannel(BaseChannel):
resp = await self._http.post(url, headers=headers, json=payload)
resp.raise_for_status()
logger.info("MSTeams message sent to {}", ref.conversation_id)
self._touch_conversation_ref(str(msg.chat_id), persist=True)
except Exception as e:
logger.error("MSTeams send failed: {}", e)
raise
@ -295,16 +310,17 @@ class MSTeamsChannel(BaseChannel):
)
return
self._conversation_refs[conversation_id] = ConversationRef(
service_url=service_url,
conversation_id=conversation_id,
bot_id=str(recipient.get("id") or "") or None,
activity_id=activity_id or None,
conversation_type=conversation_type or None,
tenant_id=str((channel_data.get("tenant") or {}).get("id") or "") or None,
updated_at=time.time(),
)
self._save_refs()
with self._refs_guard:
self._conversation_refs[conversation_id] = ConversationRef(
service_url=service_url,
conversation_id=conversation_id,
bot_id=str(recipient.get("id") or "") or None,
activity_id=activity_id or None,
conversation_type=conversation_type or None,
tenant_id=str((channel_data.get("tenant") or {}).get("id") or "") or None,
updated_at=time.time(),
)
self._save_refs_locked()
await self._handle_message(
sender_id=sender_id,
@ -491,19 +507,109 @@ class MSTeamsChannel(BaseChannel):
self._botframework_jwks_expires_at = now + 3600
return self._botframework_jwks
@staticmethod
def _safe_float(value: Any) -> float | None:
try:
out = float(value)
if out > 0:
return out
except (TypeError, ValueError):
return None
return None
def _normalize_ref_record(self, value: Any) -> ConversationRef | None:
"""Normalize a stored ref record from legacy/current schema."""
if not isinstance(value, dict):
return None
service_url = str(value.get("service_url") or "").strip()
conversation_id = str(value.get("conversation_id") or "").strip()
if not service_url or not conversation_id:
return None
return ConversationRef(
service_url=service_url,
conversation_id=conversation_id,
bot_id=str(value.get("bot_id") or "") or None,
activity_id=str(value.get("activity_id") or "") or None,
conversation_type=str(value.get("conversation_type") or "") or None,
tenant_id=str(value.get("tenant_id") or "") or None,
updated_at=self._safe_float(value.get("updated_at")),
)
def _load_refs_raw(self) -> tuple[dict[str, Any], dict[str, Any], bool]:
"""Load raw refs/main+meta JSON payloads."""
main_data: dict[str, Any] = {}
meta_data: dict[str, Any] = {}
meta_exists = self._refs_meta_path.exists()
if self._refs_path.exists():
try:
loaded = json.loads(self._refs_path.read_text(encoding="utf-8"))
if isinstance(loaded, dict):
main_data = loaded
except Exception as e:
logger.warning("Failed to load MSTeams conversation refs: {}", e)
if meta_exists:
try:
loaded_meta = json.loads(self._refs_meta_path.read_text(encoding="utf-8"))
if isinstance(loaded_meta, dict):
meta_data = loaded_meta
except Exception as e:
logger.warning("Failed to load MSTeams conversation refs metadata: {}", e)
return main_data, meta_data, meta_exists
def _load_refs_from_disk(self) -> dict[str, ConversationRef]:
"""Load refs from disk with compatibility fallback for legacy layouts."""
main_data, meta_data, meta_exists = self._load_refs_raw()
if not main_data:
return {}
out: dict[str, ConversationRef] = {}
now = time.time()
for key, value in main_data.items():
ref = self._normalize_ref_record(value)
if not ref:
continue
meta_entry = meta_data.get(key) if isinstance(meta_data, dict) else None
meta_ts = None
if isinstance(meta_entry, dict):
meta_ts = self._safe_float(meta_entry.get("updated_at"))
elif meta_entry is not None:
meta_ts = self._safe_float(meta_entry)
if meta_ts is not None:
ref.updated_at = meta_ts
elif not meta_exists:
# First run after introducing meta sidecar: keep legacy refs alive
# by initializing timestamps to "now" instead of purging immediately.
ref.updated_at = now
elif ref.updated_at is None:
ref.updated_at = now
out[key] = ref
return out
def _load_refs(self) -> dict[str, ConversationRef]:
"""Load stored conversation references."""
if not self._refs_path.exists():
return {}
return self._load_refs_from_disk()
@contextmanager
def _refs_file_lock(self):
"""Cross-process lock while merging and writing refs state."""
self._refs_path.parent.mkdir(parents=True, exist_ok=True)
lock_fp = self._refs_lock_path.open("a+", encoding="utf-8")
try:
data = json.loads(self._refs_path.read_text(encoding="utf-8"))
out: dict[str, ConversationRef] = {}
for key, value in data.items():
out[key] = ConversationRef(**value)
return out
except Exception as e:
logger.warning("Failed to load MSTeams conversation refs: {}", e)
return {}
if fcntl is not None:
fcntl.flock(lock_fp.fileno(), fcntl.LOCK_EX)
yield
finally:
try:
if fcntl is not None:
fcntl.flock(lock_fp.fileno(), fcntl.LOCK_UN)
finally:
lock_fp.close()
def _is_webchat_service_url(self, service_url: str) -> bool:
"""Return True when service URL points to unsupported Bot Framework Web Chat."""
@ -554,21 +660,49 @@ class MSTeamsChannel(BaseChannel):
)
return True
def _write_refs_atomically(self, data: dict[str, Any]) -> None:
def _merge_refs_from_disk_locked(self) -> None:
"""Merge disk refs into memory to reduce lost updates across processes."""
disk_refs = self._load_refs_from_disk()
for key, disk_ref in disk_refs.items():
mem_ref = self._conversation_refs.get(key)
if mem_ref is None:
self._conversation_refs[key] = disk_ref
continue
disk_ts = self._safe_float(disk_ref.updated_at) or 0.0
mem_ts = self._safe_float(mem_ref.updated_at) or 0.0
if disk_ts > mem_ts:
self._conversation_refs[key] = disk_ref
def _touch_conversation_ref(self, chat_id: str, *, persist: bool = False) -> None:
"""Refresh updated_at for an active ref to keep it from expiring while used."""
with self._refs_guard:
ref = self._conversation_refs.get(str(chat_id))
if not ref:
return
now = time.time()
prev = self._safe_float(ref.updated_at) or 0.0
min_interval = max(0, int(self.config.ref_touch_interval_s))
if min_interval > 0 and prev > 0 and now - prev < min_interval:
return
ref.updated_at = now
if persist:
self._save_refs_locked()
def _write_json_atomically(self, path, data: dict[str, Any]) -> None:
"""Write refs JSON atomically to reduce corruption risk during crashes."""
payload = json.dumps(data, indent=2)
tmp_path: str | None = None
try:
fd, tmp_path = tempfile.mkstemp(
dir=str(self._refs_path.parent),
prefix=f"{self._refs_path.name}.",
dir=str(path.parent),
prefix=f"{path.name}.",
suffix=".tmp",
)
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(payload)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, self._refs_path)
os.replace(tmp_path, path)
finally:
if tmp_path and os.path.exists(tmp_path):
try:
@ -576,27 +710,40 @@ class MSTeamsChannel(BaseChannel):
except OSError:
pass
def _save_refs(self, *, prune: bool = True) -> None:
"""Persist conversation references."""
def _save_refs_locked(self, *, prune: bool = True) -> None:
"""Persist conversation references (caller must hold _refs_guard)."""
try:
if prune:
self._prune_conversation_refs()
data = {
key: {
"service_url": ref.service_url,
"conversation_id": ref.conversation_id,
"bot_id": ref.bot_id,
"activity_id": ref.activity_id,
"conversation_type": ref.conversation_type,
"tenant_id": ref.tenant_id,
"updated_at": ref.updated_at,
with self._refs_file_lock():
self._merge_refs_from_disk_locked()
if prune:
self._prune_conversation_refs()
refs_data = {
key: {
"service_url": ref.service_url,
"conversation_id": ref.conversation_id,
"bot_id": ref.bot_id,
"activity_id": ref.activity_id,
"conversation_type": ref.conversation_type,
"tenant_id": ref.tenant_id,
}
for key, ref in self._conversation_refs.items()
}
for key, ref in self._conversation_refs.items()
}
self._write_refs_atomically(data)
refs_meta = {
key: {
"updated_at": self._safe_float(ref.updated_at),
}
for key, ref in self._conversation_refs.items()
}
self._write_json_atomically(self._refs_path, refs_data)
self._write_json_atomically(self._refs_meta_path, refs_meta)
except Exception as e:
logger.warning("Failed to save MSTeams conversation refs: {}", e)
def _save_refs(self, *, prune: bool = True) -> None:
"""Persist conversation references."""
with self._refs_guard:
self._save_refs_locked(prune=prune)
async def _get_access_token(self) -> str:
"""Fetch an access token for Bot Framework / Azure Bot auth."""

View File

@ -115,7 +115,10 @@ async def test_handle_activity_personal_message_publishes_and_stores_ref(make_ch
saved = json.loads((tmp_path / "state" / "msteams_conversations.json").read_text(encoding="utf-8"))
assert saved["conv-123"]["conversation_id"] == "conv-123"
assert saved["conv-123"]["tenant_id"] == "tenant-id"
assert float(saved["conv-123"]["updated_at"]) > 0
saved_meta = json.loads(
(tmp_path / "state" / msteams_module.MSTEAMS_REF_META_FILENAME).read_text(encoding="utf-8"),
)
assert float(saved_meta["conv-123"]["updated_at"]) > 0
def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_path, monkeypatch):
@ -125,6 +128,7 @@ def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_p
state_dir = tmp_path / "state"
state_dir.mkdir(parents=True, exist_ok=True)
refs_path = state_dir / "msteams_conversations.json"
refs_meta_path = state_dir / msteams_module.MSTEAMS_REF_META_FILENAME
refs_path.write_text(
json.dumps(
{
@ -132,25 +136,21 @@ def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_p
"service_url": "https://smba.trafficmanager.net/amer/",
"conversation_id": "conv-valid",
"conversation_type": "personal",
"updated_at": now - 60,
},
"conv-webchat": {
"service_url": "https://webchat.botframework.com/",
"conversation_id": "conv-webchat",
"conversation_type": "personal",
"updated_at": now - 60,
},
"conv-group": {
"service_url": "https://smba.trafficmanager.net/amer/",
"conversation_id": "conv-group",
"conversation_type": "channel",
"updated_at": now - 60,
},
"conv-stale": {
"service_url": "https://smba.trafficmanager.net/amer/",
"conversation_id": "conv-stale",
"conversation_type": "personal",
"updated_at": now - msteams_module.MSTEAMS_REF_TTL_S - 1,
},
"conv-missing-ts": {
"service_url": "https://smba.trafficmanager.net/amer/",
@ -162,14 +162,27 @@ def test_init_prunes_stale_and_unsupported_conversation_refs(make_channel, tmp_p
),
encoding="utf-8",
)
refs_meta_path.write_text(
json.dumps(
{
"conv-valid": {"updated_at": now - 60},
"conv-webchat": {"updated_at": now - 60},
"conv-group": {"updated_at": now - 60},
"conv-stale": {"updated_at": now - msteams_module.MSTEAMS_REF_TTL_S - 1},
},
indent=2,
),
encoding="utf-8",
)
ch = make_channel()
assert set(ch._conversation_refs.keys()) == {"conv-valid"}
assert set(ch._conversation_refs.keys()) == {"conv-valid", "conv-missing-ts"}
assert ch._conversation_refs["conv-valid"].conversation_id == "conv-valid"
assert ch._conversation_refs["conv-missing-ts"].conversation_id == "conv-missing-ts"
persisted = json.loads(refs_path.read_text(encoding="utf-8"))
assert set(persisted.keys()) == {"conv-valid"}
assert set(persisted.keys()) == {"conv-valid", "conv-missing-ts"}
def test_save_prunes_unsupported_conversation_refs(make_channel, tmp_path, monkeypatch):
@ -204,6 +217,10 @@ def test_save_prunes_unsupported_conversation_refs(make_channel, tmp_path, monke
saved = json.loads((tmp_path / "state" / "msteams_conversations.json").read_text(encoding="utf-8"))
assert set(saved.keys()) == {"conv-valid"}
saved_meta = json.loads(
(tmp_path / "state" / msteams_module.MSTEAMS_REF_META_FILENAME).read_text(encoding="utf-8"),
)
assert set(saved_meta.keys()) == {"conv-valid"}
def test_init_respects_prune_toggle_flags(make_channel, tmp_path, monkeypatch):
@ -248,6 +265,7 @@ def test_init_respects_custom_ref_ttl_days(make_channel, tmp_path, monkeypatch):
state_dir = tmp_path / "state"
state_dir.mkdir(parents=True, exist_ok=True)
refs_path = state_dir / "msteams_conversations.json"
refs_meta_path = state_dir / msteams_module.MSTEAMS_REF_META_FILENAME
refs_path.write_text(
json.dumps(
{
@ -255,19 +273,27 @@ def test_init_respects_custom_ref_ttl_days(make_channel, tmp_path, monkeypatch):
"service_url": "https://smba.trafficmanager.net/amer/",
"conversation_id": "conv-fresh",
"conversation_type": "personal",
"updated_at": now - 12 * 60 * 60,
},
"conv-old": {
"service_url": "https://smba.trafficmanager.net/amer/",
"conversation_id": "conv-old",
"conversation_type": "personal",
"updated_at": now - 10 * 24 * 60 * 60,
},
},
indent=2,
),
encoding="utf-8",
)
refs_meta_path.write_text(
json.dumps(
{
"conv-fresh": {"updated_at": now - 12 * 60 * 60},
"conv-old": {"updated_at": now - 10 * 24 * 60 * 60},
},
indent=2,
),
encoding="utf-8",
)
ch = make_channel(refTtlDays=1)
@ -276,6 +302,34 @@ def test_init_respects_custom_ref_ttl_days(make_channel, tmp_path, monkeypatch):
assert set(persisted.keys()) == {"conv-fresh"}
def test_init_without_meta_keeps_legacy_refs_alive(make_channel, tmp_path, monkeypatch):
now = 1_800_000_000.0
monkeypatch.setattr(msteams_module.time, "time", lambda: now)
state_dir = tmp_path / "state"
state_dir.mkdir(parents=True, exist_ok=True)
refs_path = state_dir / "msteams_conversations.json"
refs_path.write_text(
json.dumps(
{
"conv-legacy": {
"service_url": "https://smba.trafficmanager.net/amer/",
"conversation_id": "conv-legacy",
"conversation_type": "personal",
}
},
indent=2,
),
encoding="utf-8",
)
ch = make_channel(refTtlDays=1)
assert set(ch._conversation_refs.keys()) == {"conv-legacy"}
assert ch._conversation_refs["conv-legacy"].updated_at == now
assert not (state_dir / msteams_module.MSTEAMS_REF_META_FILENAME).exists()
def test_save_uses_atomic_replace_and_keeps_existing_file_on_replace_error(make_channel, tmp_path, monkeypatch):
ch = make_channel()
refs_path = tmp_path / "state" / "msteams_conversations.json"
@ -591,6 +645,33 @@ async def test_send_replies_to_activity_when_reply_in_thread_enabled(make_channe
assert kwargs["json"]["replyToId"] == "activity-1"
@pytest.mark.asyncio
async def test_send_success_refreshes_updated_at_and_persists_meta(make_channel, tmp_path, monkeypatch):
now = {"value": 1_800_000_000.0}
monkeypatch.setattr(msteams_module.time, "time", lambda: now["value"])
ch = make_channel(refTouchIntervalS=0)
fake_http = FakeHttpClient()
ch._http = fake_http
ch._token = "tok"
ch._token_expires_at = 9_999_999_999
ch._conversation_refs["conv-123"] = ConversationRef(
service_url="https://smba.trafficmanager.net/amer/",
conversation_id="conv-123",
activity_id="activity-1",
updated_at=now["value"] - 100,
)
now["value"] += 5
await ch.send(OutboundMessage(channel="msteams", chat_id="conv-123", content="Reply text"))
assert ch._conversation_refs["conv-123"].updated_at == now["value"]
saved_meta = json.loads(
(tmp_path / "state" / msteams_module.MSTEAMS_REF_META_FILENAME).read_text(encoding="utf-8"),
)
assert saved_meta["conv-123"]["updated_at"] == now["value"]
@pytest.mark.asyncio
async def test_send_posts_to_conversation_when_thread_reply_disabled(make_channel):
ch = make_channel(replyInThread=False)
@ -756,6 +837,7 @@ def test_msteams_default_config_includes_restart_notify_fields():
assert cfg["refTtlDays"] == msteams_module.MSTEAMS_REF_TTL_DAYS
assert cfg["pruneWebChatRefs"] is True
assert cfg["pruneNonPersonalRefs"] is True
assert cfg["refTouchIntervalS"] == msteams_module.MSTEAMS_REF_TOUCH_INTERVAL_S
assert "restartNotifyEnabled" not in cfg
assert "restartNotifyPreMessage" not in cfg
assert "restartNotifyPostMessage" not in cfg