nanobot/nanobot/agent/context.py
Xubin Ren 2502fc616b Merge origin/main into feat/api-file-upload
Keep the API file upload branch current with main, enforce the documented JSON base64 per-file limit, and avoid leaking document extraction error strings into user prompts.

Made-with: Cursor
2026-04-14 12:29:43 +00:00

224 lines
8.6 KiB
Python

"""Context builder for assembling agent prompts."""
import base64
import mimetypes
import platform
from pathlib import Path
from typing import Any
from nanobot.agent.memory import MemoryStore
from nanobot.agent.skills import SkillsLoader
from nanobot.utils.helpers import build_assistant_message, current_time_str, detect_image_mime
from nanobot.utils.prompt_templates import render_template
class ContextBuilder:
"""Builds the context (system prompt + messages) for the agent."""
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"]
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
_MAX_RECENT_HISTORY = 50
_RUNTIME_CONTEXT_END = "[/Runtime Context]"
def __init__(self, workspace: Path, timezone: str | None = None, disabled_skills: list[str] | None = None):
self.workspace = workspace
self.timezone = timezone
self.memory = MemoryStore(workspace)
self.skills = SkillsLoader(workspace, disabled_skills=set(disabled_skills) if disabled_skills else None)
def build_system_prompt(
self,
skill_names: list[str] | None = None,
channel: str | None = None,
) -> str:
"""Build the system prompt from identity, bootstrap files, memory, and skills."""
parts = [self._get_identity(channel=channel)]
bootstrap = self._load_bootstrap_files()
if bootstrap:
parts.append(bootstrap)
memory = self.memory.get_memory_context()
if memory:
parts.append(f"# Memory\n\n{memory}")
always_skills = self.skills.get_always_skills()
if always_skills:
always_content = self.skills.load_skills_for_context(always_skills)
if always_content:
parts.append(f"# Active Skills\n\n{always_content}")
skills_summary = self.skills.build_skills_summary()
if skills_summary:
parts.append(render_template("agent/skills_section.md", skills_summary=skills_summary))
entries = self.memory.read_unprocessed_history(since_cursor=self.memory.get_last_dream_cursor())
if entries:
capped = entries[-self._MAX_RECENT_HISTORY:]
parts.append("# Recent History\n\n" + "\n".join(
f"- [{e['timestamp']}] {e['content']}" for e in capped
))
return "\n\n---\n\n".join(parts)
def _get_identity(self, channel: str | None = None) -> str:
"""Get the core identity section."""
workspace_path = str(self.workspace.expanduser().resolve())
system = platform.system()
runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
return render_template(
"agent/identity.md",
workspace_path=workspace_path,
runtime=runtime,
platform_policy=render_template("agent/platform_policy.md", system=system),
channel=channel or "",
)
@staticmethod
def _build_runtime_context(
channel: str | None, chat_id: str | None, timezone: str | None = None,
session_summary: str | None = None,
) -> str:
"""Build untrusted runtime metadata block for injection before the user message."""
lines = [f"Current Time: {current_time_str(timezone)}"]
if channel and chat_id:
lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"]
if session_summary:
lines += ["", "[Resumed Session]", session_summary]
return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines) + "\n" + ContextBuilder._RUNTIME_CONTEXT_END
@staticmethod
def _merge_message_content(left: Any, right: Any) -> str | list[dict[str, Any]]:
if isinstance(left, str) and isinstance(right, str):
return f"{left}\n\n{right}" if left else right
def _to_blocks(value: Any) -> list[dict[str, Any]]:
if isinstance(value, list):
return [item if isinstance(item, dict) else {"type": "text", "text": str(item)} for item in value]
if value is None:
return []
return [{"type": "text", "text": str(value)}]
return _to_blocks(left) + _to_blocks(right)
def _load_bootstrap_files(self) -> str:
"""Load all bootstrap files from workspace."""
parts = []
for filename in self.BOOTSTRAP_FILES:
file_path = self.workspace / filename
if file_path.exists():
content = file_path.read_text(encoding="utf-8")
parts.append(f"## {filename}\n\n{content}")
return "\n\n".join(parts) if parts else ""
def build_messages(
self,
history: list[dict[str, Any]],
current_message: str,
skill_names: list[str] | None = None,
media: list[str] | None = None,
channel: str | None = None,
chat_id: str | None = None,
current_role: str = "user",
session_summary: str | None = None,
) -> list[dict[str, Any]]:
"""Build the complete message list for an LLM call."""
runtime_ctx = self._build_runtime_context(channel, chat_id, self.timezone, session_summary=session_summary)
user_content = self._build_user_content(current_message, media)
# Merge runtime context and user content into a single user message
# to avoid consecutive same-role messages that some providers reject.
if isinstance(user_content, str):
merged = f"{runtime_ctx}\n\n{user_content}"
else:
merged = [{"type": "text", "text": runtime_ctx}] + user_content
messages = [
{"role": "system", "content": self.build_system_prompt(skill_names, channel=channel)},
*history,
]
if messages[-1].get("role") == current_role:
last = dict(messages[-1])
last["content"] = self._merge_message_content(last.get("content"), merged)
messages[-1] = last
return messages
messages.append({"role": current_role, "content": merged})
return messages
def _build_user_content(
self, text: str, media: list[str] | None
) -> str | list[dict[str, Any]]:
"""Build user message content with optional media.
Images are converted to base64 vision blocks.
Documents (PDF, Word, Excel, PPT) have their text extracted and appended.
"""
if not media:
return text
images: list[dict[str, Any]] = []
doc_texts: list[str] = []
for path in media:
p = Path(path)
if not p.is_file():
continue
raw = p.read_bytes()
mime = detect_image_mime(raw) or mimetypes.guess_type(path)[0]
if mime and mime.startswith("image/"):
b64 = base64.b64encode(raw).decode()
images.append({
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{b64}"},
"_meta": {"path": str(p)},
})
else:
# Try document text extraction
from nanobot.utils.document import extract_text
extracted = extract_text(p)
if extracted and not extracted.startswith("[error:"):
doc_texts.append(f"[File: {p.name}]\n{extracted}")
# Build final content
parts: list[dict[str, Any]] = []
parts.extend(images)
combined_text = text
if doc_texts:
combined_text = text + "\n\n" + "\n\n".join(doc_texts)
if images:
parts.append({"type": "text", "text": combined_text})
return parts
elif doc_texts:
return combined_text
else:
return text
def add_tool_result(
self, messages: list[dict[str, Any]],
tool_call_id: str, tool_name: str, result: Any,
) -> list[dict[str, Any]]:
"""Add a tool result to the message list."""
messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result})
return messages
def add_assistant_message(
self, messages: list[dict[str, Any]],
content: str | None,
tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None,
thinking_blocks: list[dict] | None = None,
) -> list[dict[str, Any]]:
"""Add an assistant message to the message list."""
messages.append(build_assistant_message(
content,
tool_calls=tool_calls,
reasoning_content=reasoning_content,
thinking_blocks=thinking_blocks,
))
return messages