Add Microsoft Teams channel on current nightly base

This commit is contained in:
T3chC0wb0y 2026-04-02 02:13:36 -05:00 committed by chengyongru
parent 7d2c62716c
commit 5857f7fdd0
3 changed files with 1260 additions and 0 deletions

68
docs/MSTEAMS.md Normal file
View File

@ -0,0 +1,68 @@
# Microsoft Teams (MVP)
This repository includes a built-in `msteams` channel MVP for Microsoft Teams direct messages.
## Current scope
- Direct-message text in/out
- Tenant-aware OAuth token acquisition
- Conversation reference persistence for replies
- Public HTTPS webhook support through a tunnel or reverse proxy
## Not yet included
- Group/channel handling
- Attachments and cards
- Polls
- Richer Teams activity handling
## Example config
```json
{
"channels": {
"msteams": {
"enabled": true,
"appId": "YOUR_APP_ID",
"appPassword": "YOUR_APP_SECRET",
"tenantId": "YOUR_TENANT_ID",
"host": "0.0.0.0",
"port": 3978,
"path": "/api/messages",
"allowFrom": ["*"],
"replyInThread": true,
"mentionOnlyResponse": "Hi — what can I help with?",
"validateInboundAuth": false,
"restartNotifyEnabled": false,
"restartNotifyPreMessage": "Nanobot agent initiated a gateway restart. I will message again when the gateway is back online.",
"restartNotifyPostMessage": "Nanobot gateway is back online."
}
}
}
```
## Behavior notes
- `replyInThread: true` replies to the triggering Teams activity when a stored `activity_id` is available.
- `replyInThread: false` posts replies as normal conversation messages.
- If `replyInThread` is enabled but no `activity_id` is stored, Nanobot falls back to a normal conversation message.
- `mentionOnlyResponse` controls what Nanobot receives when a user sends only a bot mention such as `<at>Nanobot</at>`.
- Set `mentionOnlyResponse` to an empty string to ignore mention-only messages.
- `validateInboundAuth: true` enables inbound Bot Framework bearer-token validation.
- `validateInboundAuth: false` leaves inbound auth unenforced, which is safer while first validating a new relay, tunnel, or proxy path.
- When enabled, Nanobot validates the inbound bearer token signature, issuer, audience, token lifetime, and `serviceUrl` claim when present.
- `restartNotifyEnabled: true` enables optional Teams restart-notification configuration for external wrapper-script driven restarts.
- `restartNotifyPreMessage` and `restartNotifyPostMessage` control the before/after announcement text used by that external wrapper.
## Setup notes
1. Create or reuse a Microsoft Teams / Azure bot app registration.
2. Set the bot messaging endpoint to a public HTTPS URL ending in `/api/messages`.
3. Forward that public endpoint to `http://localhost:3978/api/messages`.
4. Start Nanobot with:
```bash
nanobot gateway
```
5. Optional: if you use an external restart wrapper (for example a script that stops and restarts the gateway), you can enable Teams restart announcements with `restartNotifyEnabled: true` and have the wrapper send `restartNotifyPreMessage` before restart and `restartNotifyPostMessage` after the gateway is back online.

508
nanobot/channels/msteams.py Normal file
View File

@ -0,0 +1,508 @@
"""Microsoft Teams channel MVP using a tiny built-in HTTP webhook server.
Scope:
- DM-focused MVP
- text inbound/outbound
- conversation reference persistence
- sender allowlist support
- optional inbound Bot Framework bearer-token validation
- no attachments/cards/polls yet
"""
from __future__ import annotations
import asyncio
import html
import json
import re
import threading
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
import httpx
import jwt
from loguru import logger
from pydantic import Field
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_workspace_path
from nanobot.config.schema import Base
class MSTeamsConfig(Base):
"""Microsoft Teams channel configuration."""
enabled: bool = False
app_id: str = ""
app_password: str = ""
tenant_id: str = ""
host: str = "0.0.0.0"
port: int = 3978
path: str = "/api/messages"
allow_from: list[str] = Field(default_factory=list)
reply_in_thread: bool = True
mention_only_response: str = "Hi — what can I help with?"
validate_inbound_auth: bool = False
restart_notify_enabled: bool = False
restart_notify_pre_message: str = (
"Nanobot agent initiated a gateway restart. I will message again when the gateway is back online."
)
restart_notify_post_message: str = "Nanobot gateway is back online."
@dataclass
class ConversationRef:
"""Minimal stored conversation reference for replies."""
service_url: str
conversation_id: str
bot_id: str | None = None
activity_id: str | None = None
conversation_type: str | None = None
tenant_id: str | None = None
class MSTeamsChannel(BaseChannel):
"""Microsoft Teams channel (DM-first MVP)."""
name = "msteams"
display_name = "Microsoft Teams"
@classmethod
def default_config(cls) -> dict[str, Any]:
return MSTeamsConfig().model_dump(by_alias=True)
def __init__(self, config: Any, bus: MessageBus):
if isinstance(config, dict):
config = MSTeamsConfig.model_validate(config)
super().__init__(config, bus)
self.config: MSTeamsConfig = config
self._loop: asyncio.AbstractEventLoop | None = None
self._server: ThreadingHTTPServer | None = None
self._server_thread: threading.Thread | None = None
self._http: httpx.AsyncClient | None = None
self._token: str | None = None
self._token_expires_at: float = 0.0
self._botframework_openid_config_url = (
"https://login.botframework.com/v1/.well-known/openidconfiguration"
)
self._botframework_openid_config: dict[str, Any] | None = None
self._botframework_openid_config_expires_at: float = 0.0
self._botframework_jwks: dict[str, Any] | None = None
self._botframework_jwks_expires_at: float = 0.0
self._refs_path = get_workspace_path() / "state" / "msteams_conversations.json"
self._refs_path.parent.mkdir(parents=True, exist_ok=True)
self._conversation_refs: dict[str, ConversationRef] = self._load_refs()
async def start(self) -> None:
"""Start the Teams webhook listener."""
if not self.config.app_id or not self.config.app_password:
logger.error("MSTeams app_id/app_password not configured")
return
self._loop = asyncio.get_running_loop()
self._http = httpx.AsyncClient(timeout=30.0)
self._running = True
channel = self
class Handler(BaseHTTPRequestHandler):
def do_POST(self) -> None:
if self.path != channel.config.path:
self.send_response(404)
self.end_headers()
return
try:
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length) if length > 0 else b"{}"
payload = json.loads(raw.decode("utf-8"))
except Exception as e:
logger.warning("MSTeams invalid request body: {}", e)
self.send_response(400)
self.end_headers()
return
auth_header = self.headers.get("Authorization", "")
if channel.config.validate_inbound_auth:
try:
fut = asyncio.run_coroutine_threadsafe(
channel._validate_inbound_auth(auth_header, payload),
channel._loop,
)
fut.result(timeout=15)
except Exception as e:
logger.warning("MSTeams inbound auth validation failed: {}", e)
self.send_response(401)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"error":"unauthorized"}')
return
try:
fut = asyncio.run_coroutine_threadsafe(
channel._handle_activity(payload),
channel._loop,
)
fut.result(timeout=15)
except Exception as e:
logger.warning("MSTeams activity handling failed: {}", e)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b"{}")
def log_message(self, format: str, *args: Any) -> None:
return
self._server = ThreadingHTTPServer((self.config.host, self.config.port), Handler)
self._server_thread = threading.Thread(
target=self._server.serve_forever,
name="nanobot-msteams",
daemon=True,
)
self._server_thread.start()
logger.info(
"MSTeams webhook listening on http://{}:{}{}",
self.config.host,
self.config.port,
self.config.path,
)
while self._running:
await asyncio.sleep(1)
async def stop(self) -> None:
"""Stop the channel."""
self._running = False
if self._server:
self._server.shutdown()
self._server.server_close()
self._server = None
if self._server_thread and self._server_thread.is_alive():
self._server_thread.join(timeout=2)
self._server_thread = None
if self._http:
await self._http.aclose()
self._http = None
async def send(self, msg: OutboundMessage) -> None:
"""Send a plain text reply into an existing Teams conversation."""
if not self._http:
logger.warning("MSTeams HTTP client not initialized")
return
ref = self._conversation_refs.get(str(msg.chat_id))
if not ref:
logger.warning("MSTeams conversation ref not found for chat_id={}", msg.chat_id)
return
token = await self._get_access_token()
base_url = f"{ref.service_url.rstrip('/')}/v3/conversations/{ref.conversation_id}/activities"
use_thread_reply = self.config.reply_in_thread and bool(ref.activity_id)
url = (
f"{base_url}/{ref.activity_id}"
if use_thread_reply
else base_url
)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
payload = {
"type": "message",
"text": msg.content or " ",
}
if use_thread_reply:
payload["replyToId"] = ref.activity_id
try:
resp = await self._http.post(url, headers=headers, json=payload)
resp.raise_for_status()
logger.info("MSTeams message sent to {}", ref.conversation_id)
except Exception as e:
logger.error("MSTeams send failed: {}", e)
async def _handle_activity(self, activity: dict[str, Any]) -> None:
"""Handle inbound Teams/Bot Framework activity."""
if activity.get("type") != "message":
return
conversation = activity.get("conversation") or {}
from_user = activity.get("from") or {}
recipient = activity.get("recipient") or {}
channel_data = activity.get("channelData") or {}
sender_id = str(from_user.get("aadObjectId") or from_user.get("id") or "").strip()
conversation_id = str(conversation.get("id") or "").strip()
text = str(activity.get("text") or "").strip()
service_url = str(activity.get("serviceUrl") or "").strip()
activity_id = str(activity.get("id") or "").strip()
conversation_type = str(conversation.get("conversationType") or "").strip()
if not sender_id or not conversation_id or not service_url:
return
if recipient.get("id") and from_user.get("id") == recipient.get("id"):
return
# DM-only MVP: ignore group/channel traffic for now
if conversation_type and conversation_type not in ("personal", ""):
logger.debug("MSTeams ignoring non-DM conversation {}", conversation_type)
return
if not self.is_allowed(sender_id):
return
text = self._sanitize_inbound_text(activity)
if not text:
text = self.config.mention_only_response.strip()
if not text:
logger.debug("MSTeams ignoring empty message after Teams text sanitization")
return
self._conversation_refs[conversation_id] = ConversationRef(
service_url=service_url,
conversation_id=conversation_id,
bot_id=str(recipient.get("id") or "") or None,
activity_id=activity_id or None,
conversation_type=conversation_type or None,
tenant_id=str((channel_data.get("tenant") or {}).get("id") or "") or None,
)
self._save_refs()
await self._handle_message(
sender_id=sender_id,
chat_id=conversation_id,
content=text,
metadata={
"msteams": {
"activity_id": activity_id,
"conversation_id": conversation_id,
"conversation_type": conversation_type or "personal",
"from_name": from_user.get("name"),
}
},
)
def _sanitize_inbound_text(self, activity: dict[str, Any]) -> str:
"""Extract the user-authored text from a Teams activity."""
text = str(activity.get("text") or "")
text = self._strip_possible_bot_mention(text)
channel_data = activity.get("channelData") or {}
reply_to_id = str(activity.get("replyToId") or "").strip()
normalized_preview = html.unescape(text).replace("&rsquo", "").strip()
normalized_preview = normalized_preview.replace("\r\n", "\n").replace("\r", "\n")
preview_lines = [line.strip() for line in normalized_preview.split("\n")]
while preview_lines and not preview_lines[0]:
preview_lines.pop(0)
first_line = preview_lines[0] if preview_lines else ""
looks_like_quote_wrapper = first_line.lower().startswith("replying to ") or first_line.startswith("FWDIOC-BOT")
if reply_to_id or channel_data.get("messageType") == "reply" or looks_like_quote_wrapper:
text = self._normalize_teams_reply_quote(text)
return text.strip()
def _strip_possible_bot_mention(self, text: str) -> str:
"""Remove simple Teams mention markup from message text."""
cleaned = re.sub(r"<at\b[^>]*>.*?</at>", " ", text, flags=re.IGNORECASE | re.DOTALL)
cleaned = re.sub(r"[^\S\r\n]+", " ", cleaned)
cleaned = re.sub(r"(?:\r?\n){3,}", "\n\n", cleaned)
return cleaned.strip()
def _normalize_teams_reply_quote(self, text: str) -> str:
"""Normalize Teams quoted replies into a compact structured form."""
cleaned = html.unescape(text).replace("&rsquo", "").strip()
if not cleaned:
return ""
normalized_newlines = cleaned.replace("\r\n", "\n").replace("\r", "\n")
lines = [line.strip() for line in normalized_newlines.split("\n")]
while lines and not lines[0]:
lines.pop(0)
if len(lines) >= 2 and lines[0].lower().startswith("replying to "):
quoted = lines[0][len("replying to ") :].strip(" :")
reply = "\n".join(lines[1:]).strip()
return self._format_reply_with_quote(quoted, reply)
if lines and lines[0].strip().startswith("FWDIOC-BOT"):
body = normalized_newlines.split("\n", 1)[1] if "\n" in normalized_newlines else ""
body = body.lstrip()
parts = re.split(r"\n\s*\n", body, maxsplit=1)
if len(parts) == 2:
quoted = re.sub(r"\s+", " ", parts[0]).strip()
reply = re.sub(r"\s+", " ", parts[1]).strip()
if quoted or reply:
return self._format_reply_with_quote(quoted, reply)
body_lines = [line.strip() for line in body.split("\n") if line.strip()]
if body_lines:
quoted = " ".join(body_lines[:-1]).strip()
reply = body_lines[-1].strip()
if quoted and reply:
return self._format_reply_with_quote(quoted, reply)
compact = re.sub(r"\s+", " ", normalized_newlines).strip()
if compact.startswith("FWDIOC-BOT "):
compact = compact[len("FWDIOC-BOT ") :].strip()
marker = " Reply with quote test"
if compact.endswith(marker):
quoted = compact[: -len(marker)].strip()
reply = marker.strip()
return self._format_reply_with_quote(quoted, reply)
return cleaned
def _format_reply_with_quote(self, quoted: str, reply: str) -> str:
"""Format a quoted reply for the model without Teams wrapper noise."""
quoted = quoted.strip()
reply = reply.strip()
if quoted and reply:
return f"User is replying to: {quoted}\nUser reply: {reply}"
if reply:
return reply
return quoted
async def _validate_inbound_auth(self, auth_header: str, activity: dict[str, Any]) -> None:
"""Validate inbound Bot Framework bearer token."""
if not auth_header.lower().startswith("bearer "):
raise ValueError("missing bearer token")
token = auth_header.split(" ", 1)[1].strip()
if not token:
raise ValueError("empty bearer token")
header = jwt.get_unverified_header(token)
kid = str(header.get("kid") or "").strip()
if not kid:
raise ValueError("missing token kid")
jwks = await self._get_botframework_jwks()
keys = jwks.get("keys") or []
jwk = next((key for key in keys if key.get("kid") == kid), None)
if not jwk:
raise ValueError(f"signing key not found for kid={kid}")
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(jwk))
claims = jwt.decode(
token,
key=public_key,
algorithms=["RS256"],
audience=self.config.app_id,
issuer="https://api.botframework.com",
options={
"require": ["exp", "nbf", "iss", "aud"],
},
)
claim_service_url = str(
claims.get("serviceurl") or claims.get("serviceUrl") or "",
).strip()
activity_service_url = str(activity.get("serviceUrl") or "").strip()
if claim_service_url and activity_service_url and claim_service_url != activity_service_url:
raise ValueError("serviceUrl claim mismatch")
async def _get_botframework_openid_config(self) -> dict[str, Any]:
"""Fetch and cache Bot Framework OpenID configuration."""
import time
now = time.time()
if self._botframework_openid_config and now < self._botframework_openid_config_expires_at:
return self._botframework_openid_config
if not self._http:
raise RuntimeError("MSTeams HTTP client not initialized")
resp = await self._http.get(self._botframework_openid_config_url)
resp.raise_for_status()
self._botframework_openid_config = resp.json()
self._botframework_openid_config_expires_at = now + 3600
return self._botframework_openid_config
async def _get_botframework_jwks(self) -> dict[str, Any]:
"""Fetch and cache Bot Framework JWKS."""
import time
now = time.time()
if self._botframework_jwks and now < self._botframework_jwks_expires_at:
return self._botframework_jwks
if not self._http:
raise RuntimeError("MSTeams HTTP client not initialized")
openid_config = await self._get_botframework_openid_config()
jwks_uri = str(openid_config.get("jwks_uri") or "").strip()
if not jwks_uri:
raise RuntimeError("Bot Framework OpenID config missing jwks_uri")
resp = await self._http.get(jwks_uri)
resp.raise_for_status()
self._botframework_jwks = resp.json()
self._botframework_jwks_expires_at = now + 3600
return self._botframework_jwks
def _load_refs(self) -> dict[str, ConversationRef]:
"""Load stored conversation references."""
if not self._refs_path.exists():
return {}
try:
data = json.loads(self._refs_path.read_text(encoding="utf-8"))
out: dict[str, ConversationRef] = {}
for key, value in data.items():
out[key] = ConversationRef(**value)
return out
except Exception as e:
logger.warning("Failed to load MSTeams conversation refs: {}", e)
return {}
def _save_refs(self) -> None:
"""Persist conversation references."""
try:
data = {
key: {
"service_url": ref.service_url,
"conversation_id": ref.conversation_id,
"bot_id": ref.bot_id,
"activity_id": ref.activity_id,
"conversation_type": ref.conversation_type,
"tenant_id": ref.tenant_id,
}
for key, ref in self._conversation_refs.items()
}
self._refs_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
except Exception as e:
logger.warning("Failed to save MSTeams conversation refs: {}", e)
async def _get_access_token(self) -> str:
"""Fetch an access token for Bot Framework / Azure Bot auth."""
import time
now = time.time()
if self._token and now < self._token_expires_at - 60:
return self._token
if not self._http:
raise RuntimeError("MSTeams HTTP client not initialized")
tenant = (self.config.tenant_id or "").strip() or "botframework.com"
token_url = f"https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token"
data = {
"grant_type": "client_credentials",
"client_id": self.config.app_id,
"client_secret": self.config.app_password,
"scope": "https://api.botframework.com/.default",
}
resp = await self._http.post(token_url, data=data)
resp.raise_for_status()
payload = resp.json()
self._token = payload["access_token"]
self._token_expires_at = now + int(payload.get("expires_in", 3600))
return self._token

684
tests/test_msteams.py Normal file
View File

@ -0,0 +1,684 @@
import json
import jwt
import pytest
from cryptography.hazmat.primitives.asymmetric import rsa
from nanobot.bus.events import OutboundMessage
from nanobot.channels.msteams import ConversationRef, MSTeamsChannel, MSTeamsConfig
class DummyBus:
def __init__(self):
self.inbound = []
async def publish_inbound(self, msg):
self.inbound.append(msg)
class FakeResponse:
def __init__(self, payload):
self._payload = payload
def raise_for_status(self):
return None
def json(self):
return self._payload
class FakeHttpClient:
def __init__(self, payload=None):
self.payload = payload or {"access_token": "tok", "expires_in": 3600}
self.calls = []
async def post(self, url, **kwargs):
self.calls.append((url, kwargs))
return FakeResponse(self.payload)
@pytest.mark.asyncio
async def test_handle_activity_personal_message_publishes_and_stores_ref(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"type": "message",
"id": "activity-1",
"text": "Hello from Teams",
"serviceUrl": "https://smba.trafficmanager.net/amer/",
"conversation": {
"id": "conv-123",
"conversationType": "personal",
},
"from": {
"id": "29:user-id",
"aadObjectId": "aad-user-1",
"name": "Bob",
},
"recipient": {
"id": "28:bot-id",
"name": "nanobot",
},
"channelData": {
"tenant": {"id": "tenant-id"},
},
}
await ch._handle_activity(activity)
assert len(bus.inbound) == 1
msg = bus.inbound[0]
assert msg.channel == "msteams"
assert msg.sender_id == "aad-user-1"
assert msg.chat_id == "conv-123"
assert msg.content == "Hello from Teams"
assert msg.metadata["msteams"]["conversation_id"] == "conv-123"
assert "conv-123" in ch._conversation_refs
saved = json.loads((tmp_path / "state" / "msteams_conversations.json").read_text(encoding="utf-8"))
assert saved["conv-123"]["conversation_id"] == "conv-123"
assert saved["conv-123"]["tenant_id"] == "tenant-id"
@pytest.mark.asyncio
async def test_handle_activity_ignores_group_messages(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"type": "message",
"id": "activity-2",
"text": "Hello group",
"serviceUrl": "https://smba.trafficmanager.net/amer/",
"conversation": {
"id": "conv-group",
"conversationType": "channel",
},
"from": {
"id": "29:user-id",
"aadObjectId": "aad-user-1",
"name": "Bob",
},
"recipient": {
"id": "28:bot-id",
"name": "nanobot",
},
}
await ch._handle_activity(activity)
assert bus.inbound == []
assert ch._conversation_refs == {}
@pytest.mark.asyncio
async def test_handle_activity_mention_only_uses_default_response(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"type": "message",
"id": "activity-3",
"text": "<at>Nanobot</at>",
"serviceUrl": "https://smba.trafficmanager.net/amer/",
"conversation": {
"id": "conv-empty",
"conversationType": "personal",
},
"from": {
"id": "29:user-id",
"aadObjectId": "aad-user-1",
"name": "Bob",
},
"recipient": {
"id": "28:bot-id",
"name": "nanobot",
},
}
await ch._handle_activity(activity)
assert len(bus.inbound) == 1
assert bus.inbound[0].content == "Hi — what can I help with?"
assert "conv-empty" in ch._conversation_refs
@pytest.mark.asyncio
async def test_handle_activity_mention_only_ignores_when_response_disabled(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
"mentionOnlyResponse": " ",
},
bus,
)
activity = {
"type": "message",
"id": "activity-4",
"text": "<at>Nanobot</at>",
"serviceUrl": "https://smba.trafficmanager.net/amer/",
"conversation": {
"id": "conv-empty-disabled",
"conversationType": "personal",
},
"from": {
"id": "29:user-id",
"aadObjectId": "aad-user-1",
"name": "Bob",
},
"recipient": {
"id": "28:bot-id",
"name": "nanobot",
},
}
await ch._handle_activity(activity)
assert bus.inbound == []
assert ch._conversation_refs == {}
def test_strip_possible_bot_mention_removes_generic_at_tags(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
assert ch._strip_possible_bot_mention("<at>Nanobot</at> hello") == "hello"
assert ch._strip_possible_bot_mention("hi <at>Some Bot</at> there") == "hi there"
def test_sanitize_inbound_text_keeps_normal_inline_message(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"text": "<at>Nanobot</at> normal inline message",
"channelData": {},
}
assert ch._sanitize_inbound_text(activity) == "normal inline message"
def test_sanitize_inbound_text_normalizes_fwdioc_wrapper_without_reply_metadata(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"text": "FWDIOC-BOT \r\nQuoted prior message\r\n\r\nThis is a reply with quote test",
"channelData": {},
}
assert ch._sanitize_inbound_text(activity) == (
"User is replying to: Quoted prior message\n"
"User reply: This is a reply with quote test"
)
def test_sanitize_inbound_text_structures_reply_quote_prefix(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"text": "Replying to Bob Smith\nactual reply text",
"replyToId": "parent-activity",
"channelData": {"messageType": "reply"},
}
assert ch._sanitize_inbound_text(activity) == "User is replying to: Bob Smith\nUser reply: actual reply text"
def test_sanitize_inbound_text_structures_live_fwdioc_quote_shape(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"text": "FWDIOC-BOT Got it. Ill watch for the exact text reply with quote test and then inspect that turn specifically. Reply with quote test",
"replyToId": "parent-activity",
"channelData": {"messageType": "reply"},
}
assert ch._sanitize_inbound_text(activity) == (
"User is replying to: Got it. Ill watch for the exact text reply with quote test and then inspect that turn specifically.\n"
"User reply: Reply with quote test"
)
def test_sanitize_inbound_text_structures_multiline_fwdioc_quote_shape(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"text": (
"FWDIOC-BOT\r\n"
"Understood — then the restart already happened, and the new Teams quote normalization should now be live. "
"Next best step: • send one more real reply-with-quote message in Teams • I&rsquo…\r\n"
"\r\n"
"This is a reply with quote"
),
"replyToId": "parent-activity",
"channelData": {"messageType": "reply"},
}
assert ch._sanitize_inbound_text(activity) == (
"User is replying to: Understood — then the restart already happened, and the new Teams quote normalization should now be live. "
"Next best step: • send one more real reply-with-quote message in Teams • I\n"
"User reply: This is a reply with quote"
)
def test_sanitize_inbound_text_structures_exact_live_crlf_fwdioc_shape(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
},
bus,
)
activity = {
"text": (
"FWDIOC-BOT \r\n"
"Please send one real reply-with-quote message in Teams. That single test should be enough now: "
"• Ill check the new MSTeams sanitized inbound text ... log • and compare it to the prompt…\r\n"
"\r\n"
"This is a reply with quote test"
),
"replyToId": "parent-activity",
"channelData": {"messageType": "reply"},
}
assert ch._sanitize_inbound_text(activity) == (
"User is replying to: Please send one real reply-with-quote message in Teams. That single test should be enough now: "
"• Ill check the new MSTeams sanitized inbound text ... log • and compare it to the prompt…\n"
"User reply: This is a reply with quote test"
)
@pytest.mark.asyncio
async def test_get_access_token_uses_configured_tenant(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-123",
"allowFrom": ["*"],
},
bus,
)
fake_http = FakeHttpClient()
ch._http = fake_http
token = await ch._get_access_token()
assert token == "tok"
assert len(fake_http.calls) == 1
url, kwargs = fake_http.calls[0]
assert url == "https://login.microsoftonline.com/tenant-123/oauth2/v2.0/token"
assert kwargs["data"]["client_id"] == "app-id"
assert kwargs["data"]["client_secret"] == "secret"
assert kwargs["data"]["scope"] == "https://api.botframework.com/.default"
@pytest.mark.asyncio
async def test_send_replies_to_activity_when_reply_in_thread_enabled(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
"replyInThread": True,
},
bus,
)
fake_http = FakeHttpClient()
ch._http = fake_http
ch._token = "tok"
ch._token_expires_at = 9999999999
ch._conversation_refs["conv-123"] = ConversationRef(
service_url="https://smba.trafficmanager.net/amer/",
conversation_id="conv-123",
activity_id="activity-1",
)
await ch.send(OutboundMessage(channel="msteams", chat_id="conv-123", content="Reply text"))
assert len(fake_http.calls) == 1
url, kwargs = fake_http.calls[0]
assert url == "https://smba.trafficmanager.net/amer/v3/conversations/conv-123/activities/activity-1"
assert kwargs["headers"]["Authorization"] == "Bearer tok"
assert kwargs["json"]["text"] == "Reply text"
assert kwargs["json"]["replyToId"] == "activity-1"
@pytest.mark.asyncio
async def test_send_posts_to_conversation_when_thread_reply_disabled(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
"replyInThread": False,
},
bus,
)
fake_http = FakeHttpClient()
ch._http = fake_http
ch._token = "tok"
ch._token_expires_at = 9999999999
ch._conversation_refs["conv-123"] = ConversationRef(
service_url="https://smba.trafficmanager.net/amer/",
conversation_id="conv-123",
activity_id="activity-1",
)
await ch.send(OutboundMessage(channel="msteams", chat_id="conv-123", content="Reply text"))
assert len(fake_http.calls) == 1
url, kwargs = fake_http.calls[0]
assert url == "https://smba.trafficmanager.net/amer/v3/conversations/conv-123/activities"
assert kwargs["headers"]["Authorization"] == "Bearer tok"
assert kwargs["json"]["text"] == "Reply text"
assert "replyToId" not in kwargs["json"]
@pytest.mark.asyncio
async def test_send_posts_to_conversation_when_thread_reply_enabled_but_no_activity_id(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
"replyInThread": True,
},
bus,
)
fake_http = FakeHttpClient()
ch._http = fake_http
ch._token = "tok"
ch._token_expires_at = 9999999999
ch._conversation_refs["conv-123"] = ConversationRef(
service_url="https://smba.trafficmanager.net/amer/",
conversation_id="conv-123",
activity_id=None,
)
await ch.send(OutboundMessage(channel="msteams", chat_id="conv-123", content="Reply text"))
assert len(fake_http.calls) == 1
url, kwargs = fake_http.calls[0]
assert url == "https://smba.trafficmanager.net/amer/v3/conversations/conv-123/activities"
assert kwargs["headers"]["Authorization"] == "Bearer tok"
assert kwargs["json"]["text"] == "Reply text"
assert "replyToId" not in kwargs["json"]
def _make_test_rsa_jwk(kid: str = "test-kid"):
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
jwk = json.loads(jwt.algorithms.RSAAlgorithm.to_jwk(public_key))
jwk["kid"] = kid
jwk["use"] = "sig"
jwk["kty"] = "RSA"
jwk["alg"] = "RS256"
return private_key, jwk
@pytest.mark.asyncio
async def test_validate_inbound_auth_accepts_observed_botframework_shape(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
"validateInboundAuth": True,
},
bus,
)
private_key, jwk = _make_test_rsa_jwk()
ch._botframework_jwks = {"keys": [jwk]}
ch._botframework_jwks_expires_at = 9999999999
service_url = "https://smba.trafficmanager.net/amer/tenant/"
token = jwt.encode(
{
"iss": "https://api.botframework.com",
"aud": "app-id",
"serviceurl": service_url,
"nbf": 1700000000,
"exp": 4100000000,
},
private_key,
algorithm="RS256",
headers={"kid": jwk["kid"]},
)
await ch._validate_inbound_auth(
f"Bearer {token}",
{"serviceUrl": service_url},
)
@pytest.mark.asyncio
async def test_validate_inbound_auth_rejects_service_url_mismatch(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
"validateInboundAuth": True,
},
bus,
)
private_key, jwk = _make_test_rsa_jwk()
ch._botframework_jwks = {"keys": [jwk]}
ch._botframework_jwks_expires_at = 9999999999
token = jwt.encode(
{
"iss": "https://api.botframework.com",
"aud": "app-id",
"serviceurl": "https://smba.trafficmanager.net/amer/tenant-a/",
"nbf": 1700000000,
"exp": 4100000000,
},
private_key,
algorithm="RS256",
headers={"kid": jwk["kid"]},
)
with pytest.raises(ValueError, match="serviceUrl claim mismatch"):
await ch._validate_inbound_auth(
f"Bearer {token}",
{"serviceUrl": "https://smba.trafficmanager.net/amer/tenant-b/"},
)
@pytest.mark.asyncio
async def test_validate_inbound_auth_rejects_missing_bearer_token(tmp_path, monkeypatch):
monkeypatch.setattr("nanobot.channels.msteams.get_workspace_path", lambda: tmp_path)
bus = DummyBus()
ch = MSTeamsChannel(
{
"enabled": True,
"appId": "app-id",
"appPassword": "secret",
"tenantId": "tenant-id",
"allowFrom": ["*"],
"validateInboundAuth": True,
},
bus,
)
with pytest.raises(ValueError, match="missing bearer token"):
await ch._validate_inbound_auth("", {"serviceUrl": "https://smba.trafficmanager.net/amer/tenant/"})
def test_msteams_default_config_includes_restart_notify_fields():
cfg = MSTeamsChannel.default_config()
assert cfg["restartNotifyEnabled"] is False
assert "restartNotifyPreMessage" in cfg
assert "restartNotifyPostMessage" in cfg
def test_msteams_config_accepts_restart_notify_aliases():
cfg = MSTeamsConfig.model_validate(
{
"restartNotifyEnabled": True,
"restartNotifyPreMessage": "Restarting now.",
"restartNotifyPostMessage": "Back online.",
}
)
assert cfg.restart_notify_enabled is True
assert cfg.restart_notify_pre_message == "Restarting now."
assert cfg.restart_notify_post_message == "Back online."