mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-20 08:32:25 +00:00
refactor(pairing): move /pairing from BaseChannel to CommandRouter
/pairing is now a first-class built-in command dispatched through CommandRouter, just like /status, /model, /dream, etc. Benefits: - WebUI automatically shows /pairing in the slash command palette (because builtin_command_palette() feeds /api/commands). - All channels (Telegram, Discord, WebSocket, etc.) use the same dispatch path for /pairing; no more channel-level interception. - The command still only works for already-authorised users because is_allowed() gates message ingestion before the bus. Changes: - Add handle_pairing_command() to nanobot.pairing.store — pure function callable from CLI, CommandRouter, and tests. - Add cmd_pairing to nanobot.command.builtin and register in BUILTIN_COMMAND_SPECS + register_builtin_commands(). - Remove BaseChannel._handle_pairing_command() and the /pairing interception logic from _handle_message(). - Clean up unused pairing imports from base.py. - Add unit tests for handle_pairing_command and cmd_pairing dispatch.
This commit is contained in:
parent
f3cae85bb1
commit
f9d404618b
@ -11,14 +11,9 @@ from loguru import logger
|
|||||||
from nanobot.bus.events import InboundMessage, OutboundMessage
|
from nanobot.bus.events import InboundMessage, OutboundMessage
|
||||||
from nanobot.bus.queue import MessageBus
|
from nanobot.bus.queue import MessageBus
|
||||||
from nanobot.pairing import (
|
from nanobot.pairing import (
|
||||||
approve_code,
|
|
||||||
deny_code,
|
|
||||||
format_expiry,
|
|
||||||
format_pairing_reply,
|
format_pairing_reply,
|
||||||
generate_code,
|
generate_code,
|
||||||
is_approved,
|
is_approved,
|
||||||
list_pending,
|
|
||||||
revoke,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -247,12 +242,6 @@ class BaseChannel(ABC):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Intercept /pairing slash commands before they reach the agent loop
|
|
||||||
parts = content.strip().split(None, 1)
|
|
||||||
if parts and parts[0] == "/pairing":
|
|
||||||
await self._handle_pairing_command(sender_id, chat_id, parts[1] if len(parts) > 1 else "")
|
|
||||||
return
|
|
||||||
|
|
||||||
meta = metadata or {}
|
meta = metadata or {}
|
||||||
if self.supports_streaming:
|
if self.supports_streaming:
|
||||||
meta = {**meta, "_wants_stream": True}
|
meta = {**meta, "_wants_stream": True}
|
||||||
@ -269,83 +258,6 @@ class BaseChannel(ABC):
|
|||||||
|
|
||||||
await self.bus.publish_inbound(msg)
|
await self.bus.publish_inbound(msg)
|
||||||
|
|
||||||
async def _handle_pairing_command(
|
|
||||||
self, sender_id: str, chat_id: str, subcommand_text: str
|
|
||||||
) -> None:
|
|
||||||
"""Execute a ``/pairing`` slash command and reply directly to the user."""
|
|
||||||
parts = subcommand_text.split()
|
|
||||||
sub = parts[0] if parts else "list"
|
|
||||||
arg = parts[1] if len(parts) > 1 else None
|
|
||||||
|
|
||||||
if sub in ("list",):
|
|
||||||
pending = list_pending()
|
|
||||||
if not pending:
|
|
||||||
reply = "No pending pairing requests."
|
|
||||||
else:
|
|
||||||
lines = ["Pending pairing requests:"]
|
|
||||||
for item in pending:
|
|
||||||
expiry = format_expiry(item.get("expires_at", 0))
|
|
||||||
lines.append(
|
|
||||||
f"- `{item['code']}` | {item['channel']} | {item['sender_id']} | {expiry}"
|
|
||||||
)
|
|
||||||
reply = "\n".join(lines)
|
|
||||||
|
|
||||||
elif sub == "approve":
|
|
||||||
if arg is None:
|
|
||||||
reply = "Usage: `/pairing approve <code>`"
|
|
||||||
else:
|
|
||||||
result = approve_code(arg)
|
|
||||||
if result is None:
|
|
||||||
reply = f"Invalid or expired pairing code: `{arg}`"
|
|
||||||
else:
|
|
||||||
channel, sid = result
|
|
||||||
reply = (
|
|
||||||
f"Approved pairing code `{arg}` — "
|
|
||||||
f"{sid} can now access {channel}"
|
|
||||||
)
|
|
||||||
|
|
||||||
elif sub == "deny":
|
|
||||||
if arg is None:
|
|
||||||
reply = "Usage: `/pairing deny <code>`"
|
|
||||||
else:
|
|
||||||
if deny_code(arg):
|
|
||||||
reply = f"Denied pairing code `{arg}`"
|
|
||||||
else:
|
|
||||||
reply = f"Pairing code `{arg}` not found or already expired"
|
|
||||||
|
|
||||||
elif sub == "revoke":
|
|
||||||
if arg is None:
|
|
||||||
reply = "Usage: `/pairing revoke <user_id>` or `/pairing revoke <channel> <user_id>`"
|
|
||||||
elif len(parts) == 2:
|
|
||||||
reply = (
|
|
||||||
f"Revoked {arg} from {self.name}"
|
|
||||||
if revoke(self.name, arg)
|
|
||||||
else f"{arg} was not in the approved list for {self.name}"
|
|
||||||
)
|
|
||||||
elif len(parts) == 3:
|
|
||||||
reply = (
|
|
||||||
f"Revoked {parts[2]} from {arg}"
|
|
||||||
if revoke(arg, parts[2])
|
|
||||||
else f"{parts[2]} was not in the approved list for {arg}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
reply = "Usage: `/pairing revoke <user_id>` or `/pairing revoke <channel> <user_id>`"
|
|
||||||
|
|
||||||
else:
|
|
||||||
reply = (
|
|
||||||
"Unknown pairing command.\n"
|
|
||||||
"Usage: `/pairing [list|approve <code>|deny <code>|revoke <user_id>]`"
|
|
||||||
)
|
|
||||||
|
|
||||||
await self.send(
|
|
||||||
OutboundMessage(
|
|
||||||
channel=self.name,
|
|
||||||
chat_id=str(chat_id),
|
|
||||||
content=reply,
|
|
||||||
metadata={"_pairing_command": True},
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def default_config(cls) -> dict[str, Any]:
|
def default_config(cls) -> dict[str, Any]:
|
||||||
"""Return default config for onboard. Override in plugins to auto-populate config.json."""
|
"""Return default config for onboard. Override in plugins to auto-populate config.json."""
|
||||||
|
|||||||
@ -96,6 +96,13 @@ BUILTIN_COMMAND_SPECS: tuple[BuiltinCommandSpec, ...] = (
|
|||||||
"List available slash commands.",
|
"List available slash commands.",
|
||||||
"circle-help",
|
"circle-help",
|
||||||
),
|
),
|
||||||
|
BuiltinCommandSpec(
|
||||||
|
"/pairing",
|
||||||
|
"Manage pairing",
|
||||||
|
"List, approve, deny or revoke pairing requests.",
|
||||||
|
"shield",
|
||||||
|
"[list|approve <code>|deny <code>|revoke <user_id>]",
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -539,6 +546,19 @@ async def cmd_history(ctx: CommandContext) -> OutboundMessage:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_pairing(ctx: CommandContext) -> OutboundMessage:
|
||||||
|
"""List, approve, deny or revoke pairing requests."""
|
||||||
|
from nanobot.pairing import handle_pairing_command
|
||||||
|
|
||||||
|
reply = handle_pairing_command(ctx.msg.channel, ctx.args)
|
||||||
|
return OutboundMessage(
|
||||||
|
channel=ctx.msg.channel,
|
||||||
|
chat_id=ctx.msg.chat_id,
|
||||||
|
content=reply,
|
||||||
|
metadata={"_pairing_command": True},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def cmd_help(ctx: CommandContext) -> OutboundMessage:
|
async def cmd_help(ctx: CommandContext) -> OutboundMessage:
|
||||||
"""Return available slash commands."""
|
"""Return available slash commands."""
|
||||||
return OutboundMessage(
|
return OutboundMessage(
|
||||||
@ -577,3 +597,5 @@ def register_builtin_commands(router: CommandRouter) -> None:
|
|||||||
router.exact("/dream-restore", cmd_dream_restore)
|
router.exact("/dream-restore", cmd_dream_restore)
|
||||||
router.prefix("/dream-restore ", cmd_dream_restore)
|
router.prefix("/dream-restore ", cmd_dream_restore)
|
||||||
router.exact("/help", cmd_help)
|
router.exact("/help", cmd_help)
|
||||||
|
router.exact("/pairing", cmd_pairing)
|
||||||
|
router.prefix("/pairing ", cmd_pairing)
|
||||||
|
|||||||
@ -7,6 +7,7 @@ from nanobot.pairing.store import (
|
|||||||
format_pairing_reply,
|
format_pairing_reply,
|
||||||
generate_code,
|
generate_code,
|
||||||
get_approved,
|
get_approved,
|
||||||
|
handle_pairing_command,
|
||||||
is_approved,
|
is_approved,
|
||||||
list_pending,
|
list_pending,
|
||||||
revoke,
|
revoke,
|
||||||
@ -19,6 +20,7 @@ __all__ = [
|
|||||||
"format_pairing_reply",
|
"format_pairing_reply",
|
||||||
"generate_code",
|
"generate_code",
|
||||||
"get_approved",
|
"get_approved",
|
||||||
|
"handle_pairing_command",
|
||||||
"is_approved",
|
"is_approved",
|
||||||
"list_pending",
|
"list_pending",
|
||||||
"revoke",
|
"revoke",
|
||||||
|
|||||||
@ -189,3 +189,65 @@ def format_expiry(expires_at: float) -> str:
|
|||||||
"""Return a human-readable expiry string (e.g. ``"120s"`` or ``"expired"``)."""
|
"""Return a human-readable expiry string (e.g. ``"120s"`` or ``"expired"``)."""
|
||||||
remaining = int(expires_at - time.time())
|
remaining = int(expires_at - time.time())
|
||||||
return f"{remaining}s" if remaining > 0 else "expired"
|
return f"{remaining}s" if remaining > 0 else "expired"
|
||||||
|
|
||||||
|
|
||||||
|
def handle_pairing_command(channel: str, subcommand_text: str) -> str:
|
||||||
|
"""Execute a pairing subcommand and return the reply text.
|
||||||
|
|
||||||
|
This is a pure function (no side effects other than store mutations)
|
||||||
|
so it can be used from both the CLI and the agent CommandRouter.
|
||||||
|
"""
|
||||||
|
parts = subcommand_text.split()
|
||||||
|
sub = parts[0] if parts else "list"
|
||||||
|
arg = parts[1] if len(parts) > 1 else None
|
||||||
|
|
||||||
|
if sub in ("list",):
|
||||||
|
pending = list_pending()
|
||||||
|
if not pending:
|
||||||
|
return "No pending pairing requests."
|
||||||
|
lines = ["Pending pairing requests:"]
|
||||||
|
for item in pending:
|
||||||
|
expiry = format_expiry(item.get("expires_at", 0))
|
||||||
|
lines.append(
|
||||||
|
f"- `{item['code']}` | {item['channel']} | {item['sender_id']} | {expiry}"
|
||||||
|
)
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
elif sub == "approve":
|
||||||
|
if arg is None:
|
||||||
|
return "Usage: `/pairing approve <code>`"
|
||||||
|
result = approve_code(arg)
|
||||||
|
if result is None:
|
||||||
|
return f"Invalid or expired pairing code: `{arg}`"
|
||||||
|
ch, sid = result
|
||||||
|
return f"Approved pairing code `{arg}` — {sid} can now access {ch}"
|
||||||
|
|
||||||
|
elif sub == "deny":
|
||||||
|
if arg is None:
|
||||||
|
return "Usage: `/pairing deny <code>`"
|
||||||
|
if deny_code(arg):
|
||||||
|
return f"Denied pairing code `{arg}`"
|
||||||
|
return f"Pairing code `{arg}` not found or already expired"
|
||||||
|
|
||||||
|
elif sub == "revoke":
|
||||||
|
if arg is None:
|
||||||
|
return "Usage: `/pairing revoke <user_id>` or `/pairing revoke <channel> <user_id>`"
|
||||||
|
elif len(parts) == 2:
|
||||||
|
return (
|
||||||
|
f"Revoked {arg} from {channel}"
|
||||||
|
if revoke(channel, arg)
|
||||||
|
else f"{arg} was not in the approved list for {channel}"
|
||||||
|
)
|
||||||
|
elif len(parts) == 3:
|
||||||
|
return (
|
||||||
|
f"Revoked {parts[2]} from {arg}"
|
||||||
|
if revoke(arg, parts[2])
|
||||||
|
else f"{parts[2]} was not in the approved list for {arg}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return "Usage: `/pairing revoke <user_id>` or `/pairing revoke <channel> <user_id>`"
|
||||||
|
|
||||||
|
return (
|
||||||
|
"Unknown pairing command.\n"
|
||||||
|
"Usage: `/pairing [list|approve <code>|deny <code>|revoke <user_id>]`"
|
||||||
|
)
|
||||||
|
|||||||
@ -85,51 +85,3 @@ async def test_handle_message_group_ignores_unknown() -> None:
|
|||||||
|
|
||||||
assert channel._sent == []
|
assert channel._sent == []
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_handle_pairing_command_list(monkeypatch) -> None:
|
|
||||||
channel = _DummyChannel({"allowFrom": ["owner"]}, MessageBus())
|
|
||||||
monkeypatch.setattr(
|
|
||||||
"nanobot.channels.base.list_pending",
|
|
||||||
lambda: [
|
|
||||||
{
|
|
||||||
"code": "ABCD-EFGH",
|
|
||||||
"channel": "dummy",
|
|
||||||
"sender_id": "123",
|
|
||||||
"expires_at": 9999999999,
|
|
||||||
}
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
await channel._handle_pairing_command("owner", "chat1", "list")
|
|
||||||
|
|
||||||
assert len(channel._sent) == 1
|
|
||||||
assert "ABCD-EFGH" in channel._sent[0].content
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_handle_pairing_command_approve(monkeypatch) -> None:
|
|
||||||
channel = _DummyChannel({"allowFrom": ["owner"]}, MessageBus())
|
|
||||||
monkeypatch.setattr(
|
|
||||||
"nanobot.channels.base.approve_code",
|
|
||||||
lambda code: ("dummy", "123") if code == "ABCD-EFGH" else None,
|
|
||||||
)
|
|
||||||
|
|
||||||
await channel._handle_pairing_command("owner", "chat1", "approve ABCD-EFGH")
|
|
||||||
|
|
||||||
assert len(channel._sent) == 1
|
|
||||||
assert "Approved" in channel._sent[0].content
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_handle_pairing_command_revoke(monkeypatch) -> None:
|
|
||||||
channel = _DummyChannel({"allowFrom": ["owner"]}, MessageBus())
|
|
||||||
monkeypatch.setattr(
|
|
||||||
"nanobot.channels.base.revoke",
|
|
||||||
lambda ch, sid: sid == "123",
|
|
||||||
)
|
|
||||||
|
|
||||||
await channel._handle_pairing_command("owner", "chat1", "revoke 123")
|
|
||||||
|
|
||||||
assert len(channel._sent) == 1
|
|
||||||
assert "Revoked" in channel._sent[0].content
|
|
||||||
|
|||||||
@ -26,11 +26,14 @@ class TestIsDispatchableCommand:
|
|||||||
assert router.is_dispatchable_command("/dream")
|
assert router.is_dispatchable_command("/dream")
|
||||||
assert router.is_dispatchable_command("/dream-log")
|
assert router.is_dispatchable_command("/dream-log")
|
||||||
assert router.is_dispatchable_command("/dream-restore")
|
assert router.is_dispatchable_command("/dream-restore")
|
||||||
|
assert router.is_dispatchable_command("/pairing")
|
||||||
|
|
||||||
def test_prefix_commands_match(self, router: CommandRouter) -> None:
|
def test_prefix_commands_match(self, router: CommandRouter) -> None:
|
||||||
assert router.is_dispatchable_command("/dream-log abc123")
|
assert router.is_dispatchable_command("/dream-log abc123")
|
||||||
assert router.is_dispatchable_command("/dream-restore def456")
|
assert router.is_dispatchable_command("/dream-restore def456")
|
||||||
assert router.is_dispatchable_command("/model fast")
|
assert router.is_dispatchable_command("/model fast")
|
||||||
|
assert router.is_dispatchable_command("/pairing list")
|
||||||
|
assert router.is_dispatchable_command("/pairing approve CODE")
|
||||||
|
|
||||||
def test_priority_commands_not_matched(self, router: CommandRouter) -> None:
|
def test_priority_commands_not_matched(self, router: CommandRouter) -> None:
|
||||||
# Priority commands are NOT in the dispatchable tiers — they are
|
# Priority commands are NOT in the dispatchable tiers — they are
|
||||||
@ -46,9 +49,11 @@ class TestIsDispatchableCommand:
|
|||||||
def test_case_insensitive(self, router: CommandRouter) -> None:
|
def test_case_insensitive(self, router: CommandRouter) -> None:
|
||||||
assert router.is_dispatchable_command("/NEW")
|
assert router.is_dispatchable_command("/NEW")
|
||||||
assert router.is_dispatchable_command("/Help")
|
assert router.is_dispatchable_command("/Help")
|
||||||
|
assert router.is_dispatchable_command("/PAIRING")
|
||||||
|
|
||||||
def test_strips_whitespace(self, router: CommandRouter) -> None:
|
def test_strips_whitespace(self, router: CommandRouter) -> None:
|
||||||
assert router.is_dispatchable_command(" /new ")
|
assert router.is_dispatchable_command(" /new ")
|
||||||
|
assert router.is_dispatchable_command(" /pairing list ")
|
||||||
|
|
||||||
def test_unknown_slash_command_not_matched(self, router: CommandRouter) -> None:
|
def test_unknown_slash_command_not_matched(self, router: CommandRouter) -> None:
|
||||||
assert not router.is_dispatchable_command("/unknown")
|
assert not router.is_dispatchable_command("/unknown")
|
||||||
@ -143,3 +148,82 @@ class TestMidTurnCommandDispatchedDirectly:
|
|||||||
)
|
)
|
||||||
result = await router.dispatch(ctx)
|
result = await router.dispatch(ctx)
|
||||||
assert result is None
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestPairingCommandDispatch:
|
||||||
|
"""Verify /pairing works via CommandRouter."""
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def router(self) -> CommandRouter:
|
||||||
|
r = CommandRouter()
|
||||||
|
register_builtin_commands(r)
|
||||||
|
return r
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def fake_msg(self) -> MagicMock:
|
||||||
|
msg = MagicMock()
|
||||||
|
msg.channel = "telegram"
|
||||||
|
msg.chat_id = "chat1"
|
||||||
|
msg.content = "/pairing list"
|
||||||
|
msg.metadata = {}
|
||||||
|
return msg
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pairing_list_dispatched(
|
||||||
|
self, router: CommandRouter, fake_msg: MagicMock, monkeypatch,
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.pairing.store.list_pending",
|
||||||
|
lambda: [
|
||||||
|
{
|
||||||
|
"code": "ABCD-EFGH",
|
||||||
|
"channel": "telegram",
|
||||||
|
"sender_id": "123",
|
||||||
|
"expires_at": 9999999999,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
)
|
||||||
|
ctx = CommandContext(
|
||||||
|
msg=fake_msg, session=None,
|
||||||
|
key="telegram:chat1", raw="/pairing list", args="list", loop=MagicMock(),
|
||||||
|
)
|
||||||
|
result = await router.dispatch(ctx)
|
||||||
|
assert result is not None
|
||||||
|
assert "ABCD-EFGH" in result.content
|
||||||
|
assert result.metadata.get("_pairing_command") is True
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pairing_approve_dispatched(
|
||||||
|
self, router: CommandRouter, fake_msg: MagicMock, monkeypatch,
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.pairing.store.approve_code",
|
||||||
|
lambda code: ("telegram", "123") if code == "ABCD-EFGH" else None,
|
||||||
|
)
|
||||||
|
fake_msg.content = "/pairing approve ABCD-EFGH"
|
||||||
|
ctx = CommandContext(
|
||||||
|
msg=fake_msg, session=None,
|
||||||
|
key="telegram:chat1", raw="/pairing approve ABCD-EFGH",
|
||||||
|
args="approve ABCD-EFGH", loop=MagicMock(),
|
||||||
|
)
|
||||||
|
result = await router.dispatch(ctx)
|
||||||
|
assert result is not None
|
||||||
|
assert "Approved" in result.content
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_pairing_revoke_dispatched(
|
||||||
|
self, router: CommandRouter, fake_msg: MagicMock, monkeypatch,
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.pairing.store.revoke",
|
||||||
|
lambda ch, sid: sid == "123",
|
||||||
|
)
|
||||||
|
fake_msg.content = "/pairing revoke 123"
|
||||||
|
ctx = CommandContext(
|
||||||
|
msg=fake_msg, session=None,
|
||||||
|
key="telegram:chat1", raw="/pairing revoke 123",
|
||||||
|
args="revoke 123", loop=MagicMock(),
|
||||||
|
)
|
||||||
|
result = await router.dispatch(ctx)
|
||||||
|
assert result is not None
|
||||||
|
assert "Revoked" in result.content
|
||||||
|
|||||||
@ -88,6 +88,76 @@ class TestListPending:
|
|||||||
assert store.list_pending() == []
|
assert store.list_pending() == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandlePairingCommand:
|
||||||
|
def test_list_empty(self) -> None:
|
||||||
|
reply = store.handle_pairing_command("telegram", "list")
|
||||||
|
assert reply == "No pending pairing requests."
|
||||||
|
|
||||||
|
def test_list_pending(self) -> None:
|
||||||
|
store.generate_code("telegram", "123")
|
||||||
|
reply = store.handle_pairing_command("telegram", "list")
|
||||||
|
assert "Pending pairing requests:" in reply
|
||||||
|
assert "telegram" in reply
|
||||||
|
assert "123" in reply
|
||||||
|
|
||||||
|
def test_approve(self) -> None:
|
||||||
|
code = store.generate_code("telegram", "123")
|
||||||
|
reply = store.handle_pairing_command("telegram", f"approve {code}")
|
||||||
|
assert "Approved" in reply
|
||||||
|
assert "123" in reply
|
||||||
|
assert store.is_approved("telegram", "123") is True
|
||||||
|
|
||||||
|
def test_approve_invalid(self) -> None:
|
||||||
|
reply = store.handle_pairing_command("telegram", "approve BAD-CODE")
|
||||||
|
assert "Invalid or expired" in reply
|
||||||
|
|
||||||
|
def test_approve_no_arg(self) -> None:
|
||||||
|
reply = store.handle_pairing_command("telegram", "approve")
|
||||||
|
assert "Usage:" in reply
|
||||||
|
|
||||||
|
def test_deny(self) -> None:
|
||||||
|
code = store.generate_code("telegram", "123")
|
||||||
|
reply = store.handle_pairing_command("telegram", f"deny {code}")
|
||||||
|
assert "Denied" in reply
|
||||||
|
assert store.approve_code(code) is None
|
||||||
|
|
||||||
|
def test_deny_unknown(self) -> None:
|
||||||
|
reply = store.handle_pairing_command("telegram", "deny BAD-CODE")
|
||||||
|
assert "not found" in reply
|
||||||
|
|
||||||
|
def test_revoke_current_channel(self) -> None:
|
||||||
|
code = store.generate_code("telegram", "123")
|
||||||
|
store.approve_code(code)
|
||||||
|
reply = store.handle_pairing_command("telegram", "revoke 123")
|
||||||
|
assert "Revoked" in reply
|
||||||
|
assert store.is_approved("telegram", "123") is False
|
||||||
|
|
||||||
|
def test_revoke_other_channel(self) -> None:
|
||||||
|
code = store.generate_code("discord", "456")
|
||||||
|
store.approve_code(code)
|
||||||
|
# Two-arg form: first arg is channel, second is user
|
||||||
|
reply = store.handle_pairing_command("telegram", "revoke discord 456")
|
||||||
|
assert "Revoked" in reply
|
||||||
|
assert store.is_approved("discord", "456") is False
|
||||||
|
|
||||||
|
def test_revoke_unknown(self) -> None:
|
||||||
|
reply = store.handle_pairing_command("telegram", "revoke 999")
|
||||||
|
assert "was not in the approved list" in reply
|
||||||
|
|
||||||
|
def test_revoke_no_arg(self) -> None:
|
||||||
|
reply = store.handle_pairing_command("telegram", "revoke")
|
||||||
|
assert "Usage:" in reply
|
||||||
|
|
||||||
|
def test_unknown_subcommand(self) -> None:
|
||||||
|
reply = store.handle_pairing_command("telegram", "foo")
|
||||||
|
assert "Unknown pairing command" in reply
|
||||||
|
|
||||||
|
def test_default_to_list(self) -> None:
|
||||||
|
store.generate_code("telegram", "123")
|
||||||
|
reply = store.handle_pairing_command("telegram", "")
|
||||||
|
assert "Pending pairing requests:" in reply
|
||||||
|
|
||||||
|
|
||||||
class TestStoreDurability:
|
class TestStoreDurability:
|
||||||
def test_corruption_recovery(self, tmp_path, monkeypatch) -> None:
|
def test_corruption_recovery(self, tmp_path, monkeypatch) -> None:
|
||||||
path = tmp_path / "pairing.json"
|
path = tmp_path / "pairing.json"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user