chengyongru cf224c9701 code-review fixes: fsync, entropy, is_dm propagation, tests
- Add os.fsync with Windows-compatible directory flush in pairing store
- Increase pairing code length from 6 -> 8 characters for higher entropy
- Remove SystemExit on empty allowFrom; empty list now defers to pairing
- Update is_allowed docstring to document pairing fallback semantics
- Propagate is_dm to Matrix (direct rooms) and Slack (im channels)
- Slack _is_allowed now checks pairing store for DM allowlist mode
- Fix /pairing revoke to accept optional channel argument
- Move inline import time to module top-level
- Add WebSocket comment explaining is_dm=True assumption
- Add comprehensive tests for store and BaseChannel pairing integration
- Fix existing tests that expected empty allowFrom to hard-exit

Refs #3774
2026-05-15 10:05:30 +08:00

370 lines
13 KiB
Python

"""Base channel interface for chat platforms."""
from __future__ import annotations
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any
from loguru import logger
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.pairing import (
approve_code,
deny_code,
generate_code,
is_approved,
list_pending,
revoke,
)
class BaseChannel(ABC):
"""
Abstract base class for chat channel implementations.
Each channel (Telegram, Discord, etc.) should implement this interface
to integrate with the nanobot message bus.
"""
name: str = "base"
display_name: str = "Base"
transcription_provider: str = "groq"
transcription_api_key: str = ""
transcription_api_base: str = ""
transcription_language: str | None = None
send_progress: bool = True
send_tool_hints: bool = False
show_reasoning: bool = True
def __init__(self, config: Any, bus: MessageBus):
"""
Initialize the channel.
Args:
config: Channel-specific configuration.
bus: The message bus for communication.
"""
self.config = config
self.logger = logger.bind(channel=self.name)
self.bus = bus
self._running = False
async def transcribe_audio(self, file_path: str | Path) -> str:
"""Transcribe an audio file via Whisper (OpenAI or Groq). Returns empty string on failure."""
if not self.transcription_api_key:
return ""
try:
if self.transcription_provider == "openai":
from nanobot.providers.transcription import OpenAITranscriptionProvider
provider = OpenAITranscriptionProvider(
api_key=self.transcription_api_key,
api_base=self.transcription_api_base or None,
language=self.transcription_language or None,
)
else:
from nanobot.providers.transcription import GroqTranscriptionProvider
provider = GroqTranscriptionProvider(
api_key=self.transcription_api_key,
api_base=self.transcription_api_base or None,
language=self.transcription_language or None,
)
return await provider.transcribe(file_path)
except Exception:
self.logger.exception("Audio transcription failed")
return ""
async def login(self, force: bool = False) -> bool:
"""
Perform channel-specific interactive login (e.g. QR code scan).
Args:
force: If True, ignore existing credentials and force re-authentication.
Returns True if already authenticated or login succeeds.
Override in subclasses that support interactive login.
"""
return True
@abstractmethod
async def start(self) -> None:
"""
Start the channel and begin listening for messages.
This should be a long-running async task that:
1. Connects to the chat platform
2. Listens for incoming messages
3. Forwards messages to the bus via _handle_message()
"""
pass
@abstractmethod
async def stop(self) -> None:
"""Stop the channel and clean up resources."""
pass
@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
"""
Send a message through this channel.
Args:
msg: The message to send.
Implementations should raise on delivery failure so the channel manager
can apply any retry policy in one place.
"""
pass
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
"""Deliver a streaming text chunk.
Override in subclasses to enable streaming. Implementations should
raise on delivery failure so the channel manager can retry.
Streaming contract: ``_stream_delta`` is a chunk, ``_stream_end`` ends
the current segment, and stateful implementations must key buffers by
``_stream_id`` rather than only by ``chat_id``.
"""
pass
async def send_reasoning_delta(
self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
) -> None:
"""Stream a chunk of model reasoning/thinking content.
Default is no-op. Channels with a native low-emphasis primitive
(Slack context block, Telegram expandable blockquote, Discord
subtext, WebUI italic bubble, ...) override to render reasoning
as a subordinate trace that updates in place as the model thinks.
Streaming contract mirrors :meth:`send_delta`: ``_reasoning_delta``
is a chunk, ``_reasoning_end`` ends the current reasoning segment,
and stateful implementations should key buffers by ``_stream_id``
rather than only by ``chat_id``.
"""
return
async def send_reasoning_end(
self, chat_id: str, metadata: dict[str, Any] | None = None
) -> None:
"""Mark the end of a reasoning stream segment.
Default is no-op. Channels that buffer ``send_reasoning_delta``
chunks for in-place updates use this signal to flush and freeze
the rendered group; one-shot channels can ignore it entirely.
"""
return
async def send_reasoning(self, msg: OutboundMessage) -> None:
"""Deliver a complete reasoning block.
Default implementation reuses the streaming pair so plugins only
need to override the delta/end methods. Equivalent to one delta
with the full content followed immediately by an end marker —
keeps a single rendering path for both streamed and one-shot
reasoning (e.g. DeepSeek-R1's final-response ``reasoning_content``).
"""
if not msg.content:
return
meta = dict(msg.metadata or {})
meta.setdefault("_reasoning_delta", True)
await self.send_reasoning_delta(msg.chat_id, msg.content, meta)
end_meta = dict(meta)
end_meta.pop("_reasoning_delta", None)
end_meta["_reasoning_end"] = True
await self.send_reasoning_end(msg.chat_id, end_meta)
@property
def supports_streaming(self) -> bool:
"""True when config enables streaming AND this subclass implements send_delta."""
cfg = self.config
streaming = cfg.get("streaming", False) if isinstance(cfg, dict) else getattr(cfg, "streaming", False)
return bool(streaming) and type(self).send_delta is not BaseChannel.send_delta
def is_allowed(self, sender_id: str) -> bool:
"""Check if *sender_id* is permitted.
Priority:
1. ``allowFrom: ["*"]`` → allow all.
2. ``allowFrom`` list → allow if sender_id is present.
3. Pairing store approved list → allow if previously approved.
4. Otherwise deny.
An empty ``allowFrom`` list does not cause a hard exit; instead it
defers to the pairing store so that unknown DM senders can request
access via a pairing code.
"""
if isinstance(self.config, dict):
if "allow_from" in self.config:
allow_list = self.config.get("allow_from")
else:
allow_list = self.config.get("allowFrom", [])
else:
allow_list = getattr(self.config, "allow_from", [])
if "*" in allow_list:
return True
if str(sender_id) in allow_list:
return True
if is_approved(self.name, str(sender_id)):
return True
return False
async def _handle_message(
self,
sender_id: str,
chat_id: str,
content: str,
media: list[str] | None = None,
metadata: dict[str, Any] | None = None,
session_key: str | None = None,
is_dm: bool = False,
) -> None:
"""
Handle an incoming message from the chat platform.
This method checks permissions and forwards to the bus.
For DM messages from unrecognised senders, a pairing code is
issued instead of silently dropping the message.
Args:
sender_id: The sender's identifier.
chat_id: The chat/channel identifier.
content: Message text content.
media: Optional list of media URLs.
metadata: Optional channel-specific metadata.
session_key: Optional session key override (e.g. thread-scoped sessions).
is_dm: Whether the message is a direct / private message.
"""
if not self.is_allowed(sender_id):
if is_dm:
code = generate_code(self.name, str(sender_id))
reply = (
"This assistant requires approval before it can respond.\n"
f"Your pairing code is: `{code}`\n"
f"Ask the owner to run: `nanobot pairing approve {code}`"
)
await self.send(
OutboundMessage(
channel=self.name,
chat_id=str(chat_id),
content=reply,
metadata={"_pairing_code": code},
)
)
self.logger.info(
"Sent pairing code {} to sender {} in chat {}",
code, sender_id, chat_id,
)
else:
self.logger.warning(
"Access denied for sender {}. "
"Add them to allowFrom list in config to grant access.",
sender_id,
)
return
# Intercept /pairing slash commands before they reach the agent loop
if content.strip().startswith("/pairing"):
await self._handle_pairing_command(sender_id, chat_id, content.strip())
return
meta = metadata or {}
if self.supports_streaming:
meta = {**meta, "_wants_stream": True}
msg = InboundMessage(
channel=self.name,
sender_id=str(sender_id),
chat_id=str(chat_id),
content=content,
media=media or [],
metadata=meta,
session_key_override=session_key,
)
await self.bus.publish_inbound(msg)
async def _handle_pairing_command(
self, sender_id: str, chat_id: str, content: str
) -> None:
"""Execute a ``/pairing`` slash command and reply directly to the user."""
parts = content.split()
sub = parts[1] if len(parts) > 1 else "list"
arg = parts[2] if len(parts) > 2 else None
if sub in ("list",):
pending = list_pending()
if not pending:
reply = "No pending pairing requests."
else:
lines = ["Pending pairing requests:"]
for item in pending:
remaining = int(item.get("expires_at", 0) - time.time())
expiry = f"{remaining}s" if remaining > 0 else "expired"
lines.append(
f"- `{item['code']}` | {item['channel']} | {item['sender_id']} | {expiry}"
)
reply = "\n".join(lines)
elif sub == "approve":
if arg is None:
reply = "Usage: `/pairing approve <code>`"
else:
result = approve_code(arg)
if result is None:
reply = f"Invalid or expired pairing code: `{arg}`"
else:
channel, sid = result
reply = (
f"Approved pairing code `{arg}` — "
f"{sid} can now access {channel}"
)
elif sub == "deny":
if arg is None:
reply = "Usage: `/pairing deny <code>`"
else:
if deny_code(arg):
reply = f"Denied pairing code `{arg}`"
else:
reply = f"Pairing code `{arg}` not found or already expired"
elif sub == "revoke":
if arg is None:
reply = "Usage: `/pairing revoke <user_id>` or `/pairing revoke <channel> <user_id>`"
else:
target_channel = parts[3] if len(parts) > 3 else self.name
target_user = arg if len(parts) <= 3 else parts[3]
if revoke(target_channel, target_user):
reply = f"Revoked {target_user} from {target_channel}"
else:
reply = f"{target_user} was not in the approved list for {target_channel}"
else:
reply = (
"Unknown pairing command.\n"
"Usage: `/pairing [list|approve <code>|deny <code>|revoke <user_id>]`"
)
await self.send(
OutboundMessage(
channel=self.name,
chat_id=str(chat_id),
content=reply,
metadata={"_pairing_command": True},
)
)
@classmethod
def default_config(cls) -> dict[str, Any]:
"""Return default config for onboard. Override in plugins to auto-populate config.json."""
return {"enabled": False}
@property
def is_running(self) -> bool:
"""Check if the channel is running."""
return self._running