mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-04 18:32:44 +00:00
* fix(telegram): support commands with bot username suffix in groups * fix(command): preserve metadata in builtin command responses
161 lines
5.3 KiB
Python
161 lines
5.3 KiB
Python
"""Built-in slash command handlers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
|
|
from nanobot import __version__
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.command.router import CommandContext, CommandRouter
|
|
from nanobot.utils.helpers import build_status_content
|
|
|
|
|
|
async def cmd_stop(ctx: CommandContext) -> OutboundMessage:
|
|
"""Cancel all active tasks and subagents for the session."""
|
|
loop = ctx.loop
|
|
msg = ctx.msg
|
|
tasks = loop._active_tasks.pop(msg.session_key, [])
|
|
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
|
|
for t in tasks:
|
|
try:
|
|
await t
|
|
except (asyncio.CancelledError, Exception):
|
|
pass
|
|
sub_cancelled = await loop.subagents.cancel_by_session(msg.session_key)
|
|
total = cancelled + sub_cancelled
|
|
content = f"Stopped {total} task(s)." if total else "No active task to stop."
|
|
return OutboundMessage(
|
|
channel=msg.channel, chat_id=msg.chat_id, content=content,
|
|
metadata=dict(msg.metadata or {})
|
|
)
|
|
|
|
|
|
async def cmd_restart(ctx: CommandContext) -> OutboundMessage:
|
|
"""Restart the process in-place via os.execv."""
|
|
msg = ctx.msg
|
|
|
|
async def _do_restart():
|
|
await asyncio.sleep(1)
|
|
os.execv(sys.executable, [sys.executable, "-m", "nanobot"] + sys.argv[1:])
|
|
|
|
asyncio.create_task(_do_restart())
|
|
return OutboundMessage(
|
|
channel=msg.channel, chat_id=msg.chat_id, content="Restarting...",
|
|
metadata=dict(msg.metadata or {})
|
|
)
|
|
|
|
|
|
async def cmd_status(ctx: CommandContext) -> OutboundMessage:
|
|
"""Build an outbound status message for a session."""
|
|
loop = ctx.loop
|
|
session = ctx.session or loop.sessions.get_or_create(ctx.key)
|
|
ctx_est = 0
|
|
try:
|
|
ctx_est, _ = loop.consolidator.estimate_session_prompt_tokens(session)
|
|
except Exception:
|
|
pass
|
|
if ctx_est <= 0:
|
|
ctx_est = loop._last_usage.get("prompt_tokens", 0)
|
|
return OutboundMessage(
|
|
channel=ctx.msg.channel,
|
|
chat_id=ctx.msg.chat_id,
|
|
content=build_status_content(
|
|
version=__version__, model=loop.model,
|
|
start_time=loop._start_time, last_usage=loop._last_usage,
|
|
context_window_tokens=loop.context_window_tokens,
|
|
session_msg_count=len(session.get_history(max_messages=0)),
|
|
context_tokens_estimate=ctx_est,
|
|
),
|
|
metadata={**dict(ctx.msg.metadata or {}), "render_as": "text"},
|
|
)
|
|
|
|
|
|
async def cmd_new(ctx: CommandContext) -> OutboundMessage:
|
|
"""Start a fresh session."""
|
|
loop = ctx.loop
|
|
session = ctx.session or loop.sessions.get_or_create(ctx.key)
|
|
snapshot = session.messages[session.last_consolidated:]
|
|
session.clear()
|
|
loop.sessions.save(session)
|
|
loop.sessions.invalidate(session.key)
|
|
if snapshot:
|
|
loop._schedule_background(loop.consolidator.archive(snapshot))
|
|
return OutboundMessage(
|
|
channel=ctx.msg.channel, chat_id=ctx.msg.chat_id,
|
|
content="New session started.",
|
|
metadata=dict(ctx.msg.metadata or {})
|
|
)
|
|
|
|
|
|
async def cmd_dream(ctx: CommandContext) -> OutboundMessage:
|
|
"""Manually trigger a Dream consolidation run."""
|
|
loop = ctx.loop
|
|
try:
|
|
did_work = await loop.dream.run()
|
|
content = "Dream completed." if did_work else "Dream: nothing to process."
|
|
except Exception as e:
|
|
content = f"Dream failed: {e}"
|
|
return OutboundMessage(
|
|
channel=ctx.msg.channel, chat_id=ctx.msg.chat_id, content=content,
|
|
)
|
|
|
|
|
|
async def cmd_dream_log(ctx: CommandContext) -> OutboundMessage:
|
|
"""Show the Dream consolidation log."""
|
|
loop = ctx.loop
|
|
store = loop.consolidator.store
|
|
log = store.read_dream_log()
|
|
if not log:
|
|
# Check if Dream has ever processed anything
|
|
if store.get_last_dream_cursor() == 0:
|
|
content = "Dream has not run yet."
|
|
else:
|
|
content = "No dream log yet."
|
|
else:
|
|
content = f"## Dream Log\n\n{log}"
|
|
return OutboundMessage(
|
|
channel=ctx.msg.channel,
|
|
chat_id=ctx.msg.chat_id,
|
|
content=content,
|
|
metadata={"render_as": "text"},
|
|
)
|
|
|
|
|
|
async def cmd_help(ctx: CommandContext) -> OutboundMessage:
|
|
"""Return available slash commands."""
|
|
return OutboundMessage(
|
|
channel=ctx.msg.channel,
|
|
chat_id=ctx.msg.chat_id,
|
|
content=build_help_text(),
|
|
metadata={**dict(ctx.msg.metadata or {}), "render_as": "text"},
|
|
)
|
|
|
|
|
|
def build_help_text() -> str:
|
|
"""Build canonical help text shared across channels."""
|
|
lines = [
|
|
"🐈 nanobot commands:",
|
|
"/new — Start a new conversation",
|
|
"/stop — Stop the current task",
|
|
"/restart — Restart the bot",
|
|
"/status — Show bot status",
|
|
"/dream — Manually trigger Dream consolidation",
|
|
"/dream-log — Show Dream consolidation log",
|
|
"/help — Show available commands",
|
|
]
|
|
return "\n".join(lines)
|
|
|
|
|
|
def register_builtin_commands(router: CommandRouter) -> None:
|
|
"""Register the default set of slash commands."""
|
|
router.priority("/stop", cmd_stop)
|
|
router.priority("/restart", cmd_restart)
|
|
router.priority("/status", cmd_status)
|
|
router.exact("/new", cmd_new)
|
|
router.exact("/status", cmd_status)
|
|
router.exact("/dream", cmd_dream)
|
|
router.exact("/dream-log", cmd_dream_log)
|
|
router.exact("/help", cmd_help)
|