feat(mcp): expose MCP resources and prompts as read-only tools

Add MCPResourceWrapper and MCPPromptWrapper classes that expose MCP
server resources and prompts as nanobot tools. Resources are read-only
tools that fetch content by URI, and prompts are read-only tools that
return filled prompt templates with optional arguments.

- MCPResourceWrapper: reads resource content (text and binary) via URI
- MCPPromptWrapper: gets prompt templates with typed arguments
- Both handle timeouts, cancellation, and MCP SDK 1.x error types
- Resources and prompts are registered during server connection
- Gracefully handles servers that don't support resources/prompts
This commit is contained in:
Tim O'Brien 2026-04-06 16:10:18 +00:00 committed by Xubin Ren
parent ce7986e492
commit 7cc527cf65
2 changed files with 457 additions and 4 deletions

View File

@ -135,10 +135,178 @@ class MCPToolWrapper(Tool):
return "\n".join(parts) or "(no output)"
class MCPResourceWrapper(Tool):
"""Wraps an MCP resource URI as a read-only nanobot Tool."""
def __init__(
self, session, server_name: str, resource_def, resource_timeout: int = 30
):
self._session = session
self._uri = resource_def.uri
self._name = f"mcp_{server_name}_resource_{resource_def.name}"
desc = resource_def.description or resource_def.name
self._description = f"[MCP Resource] {desc}\nURI: {self._uri}"
self._parameters: dict[str, Any] = {
"type": "object",
"properties": {},
"required": [],
}
self._resource_timeout = resource_timeout
@property
def name(self) -> str:
return self._name
@property
def description(self) -> str:
return self._description
@property
def parameters(self) -> dict[str, Any]:
return self._parameters
@property
def read_only(self) -> bool:
return True
async def execute(self, **kwargs: Any) -> str:
from mcp import types
try:
result = await asyncio.wait_for(
self._session.read_resource(self._uri),
timeout=self._resource_timeout,
)
except asyncio.TimeoutError:
logger.warning(
"MCP resource '{}' timed out after {}s", self._name, self._resource_timeout
)
return f"(MCP resource read timed out after {self._resource_timeout}s)"
except asyncio.CancelledError:
task = asyncio.current_task()
if task is not None and task.cancelling() > 0:
raise
logger.warning("MCP resource '{}' was cancelled by server/SDK", self._name)
return "(MCP resource read was cancelled)"
except Exception as exc:
logger.exception(
"MCP resource '{}' failed: {}: {}",
self._name,
type(exc).__name__,
exc,
)
return f"(MCP resource read failed: {type(exc).__name__})"
parts: list[str] = []
for block in result.contents:
if isinstance(block, types.TextResourceContents):
parts.append(block.text)
elif isinstance(block, types.BlobResourceContents):
parts.append(f"[Binary resource: {len(block.blob)} bytes]")
else:
parts.append(str(block))
return "\n".join(parts) or "(no output)"
class MCPPromptWrapper(Tool):
"""Wraps an MCP prompt as a read-only nanobot Tool."""
def __init__(
self, session, server_name: str, prompt_def, prompt_timeout: int = 30
):
self._session = session
self._prompt_name = prompt_def.name
self._name = f"mcp_{server_name}_prompt_{prompt_def.name}"
desc = prompt_def.description or prompt_def.name
self._description = (
f"[MCP Prompt] {desc}\n"
"Returns a filled prompt template that can be used as a workflow guide."
)
self._prompt_timeout = prompt_timeout
# Build parameters from prompt arguments
properties: dict[str, Any] = {}
required: list[str] = []
for arg in prompt_def.arguments or []:
properties[arg.name] = {"type": "string"}
if arg.required:
required.append(arg.name)
self._parameters: dict[str, Any] = {
"type": "object",
"properties": properties,
"required": required,
}
@property
def name(self) -> str:
return self._name
@property
def description(self) -> str:
return self._description
@property
def parameters(self) -> dict[str, Any]:
return self._parameters
@property
def read_only(self) -> bool:
return True
async def execute(self, **kwargs: Any) -> str:
from mcp import types
from mcp.shared.exceptions import McpError
try:
result = await asyncio.wait_for(
self._session.get_prompt(self._prompt_name, arguments=kwargs),
timeout=self._prompt_timeout,
)
except asyncio.TimeoutError:
logger.warning(
"MCP prompt '{}' timed out after {}s", self._name, self._prompt_timeout
)
return f"(MCP prompt call timed out after {self._prompt_timeout}s)"
except asyncio.CancelledError:
task = asyncio.current_task()
if task is not None and task.cancelling() > 0:
raise
logger.warning("MCP prompt '{}' was cancelled by server/SDK", self._name)
return "(MCP prompt call was cancelled)"
except McpError as exc:
logger.error(
"MCP prompt '{}' failed: code={} message={}",
self._name, exc.error.code, exc.error.message,
)
return f"(MCP prompt call failed: {exc.error.message} [code {exc.error.code}])"
except Exception as exc:
logger.exception(
"MCP prompt '{}' failed: {}: {}",
self._name, type(exc).__name__, exc,
)
return f"(MCP prompt call failed: {type(exc).__name__}: {exc})"
parts: list[str] = []
for message in result.messages:
content = message.content
# content is a single ContentBlock (not a list) in MCP SDK >= 1.x
if isinstance(content, types.TextContent):
parts.append(content.text)
elif isinstance(content, list):
for block in content:
if isinstance(block, types.TextContent):
parts.append(block.text)
else:
parts.append(str(block))
else:
parts.append(str(content))
return "\n".join(parts) or "(no output)"
async def connect_mcp_servers(
mcp_servers: dict, registry: ToolRegistry, stack: AsyncExitStack
) -> None:
"""Connect to configured MCP servers and register their tools."""
"""Connect to configured MCP servers and register their tools, resources, and prompts."""
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
@ -247,6 +415,38 @@ async def connect_mcp_servers(
", ".join(available_wrapped_names) or "(none)",
)
logger.info("MCP server '{}': connected, {} tools registered", name, registered_count)
# --- Register resources ---
try:
resources_result = await session.list_resources()
for resource in resources_result.resources:
wrapper = MCPResourceWrapper(
session, name, resource, resource_timeout=cfg.tool_timeout
)
registry.register(wrapper)
registered_count += 1
logger.debug(
"MCP: registered resource '{}' from server '{}'", wrapper.name, name
)
except Exception as e:
logger.debug("MCP server '{}': resources not supported or failed: {}", name, e)
# --- Register prompts ---
try:
prompts_result = await session.list_prompts()
for prompt in prompts_result.prompts:
wrapper = MCPPromptWrapper(
session, name, prompt, prompt_timeout=cfg.tool_timeout
)
registry.register(wrapper)
registered_count += 1
logger.debug(
"MCP: registered prompt '{}' from server '{}'", wrapper.name, name
)
except Exception as e:
logger.debug("MCP server '{}': prompts not supported or failed: {}", name, e)
logger.info(
"MCP server '{}': connected, {} capabilities registered", name, registered_count
)
except Exception as e:
logger.error("MCP server '{}': failed to connect: {}", name, e)

View File

@ -7,7 +7,12 @@ from types import ModuleType, SimpleNamespace
import pytest
from nanobot.agent.tools.mcp import MCPToolWrapper, connect_mcp_servers
from nanobot.agent.tools.mcp import (
MCPResourceWrapper,
MCPPromptWrapper,
MCPToolWrapper,
connect_mcp_servers,
)
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.config.schema import MCPServerConfig
@ -17,6 +22,16 @@ class _FakeTextContent:
self.text = text
class _FakeTextResourceContents:
def __init__(self, text: str) -> None:
self.text = text
class _FakeBlobResourceContents:
def __init__(self, blob: bytes) -> None:
self.blob = blob
@pytest.fixture
def fake_mcp_runtime() -> dict[str, object | None]:
return {"session": None}
@ -27,7 +42,11 @@ def _fake_mcp_module(
monkeypatch: pytest.MonkeyPatch, fake_mcp_runtime: dict[str, object | None]
) -> None:
mod = ModuleType("mcp")
mod.types = SimpleNamespace(TextContent=_FakeTextContent)
mod.types = SimpleNamespace(
TextContent=_FakeTextContent,
TextResourceContents=_FakeTextResourceContents,
BlobResourceContents=_FakeBlobResourceContents,
)
class _FakeStdioServerParameters:
def __init__(self, command: str, args: list[str], env: dict | None = None) -> None:
@ -343,3 +362,237 @@ async def test_connect_mcp_servers_enabled_tools_warns_on_unknown_entries(
assert "enabledTools entries not found: unknown" in warnings[-1]
assert "Available raw names: demo" in warnings[-1]
assert "Available wrapped names: mcp_test_demo" in warnings[-1]
# ---------------------------------------------------------------------------
# MCPResourceWrapper tests
# ---------------------------------------------------------------------------
def _make_resource_def(
name: str = "myres",
uri: str = "file:///tmp/data.txt",
description: str = "A test resource",
) -> SimpleNamespace:
return SimpleNamespace(name=name, uri=uri, description=description)
def _make_resource_wrapper(
session: object, *, timeout: float = 0.1
) -> MCPResourceWrapper:
return MCPResourceWrapper(session, "srv", _make_resource_def(), resource_timeout=timeout)
def test_resource_wrapper_properties() -> None:
wrapper = MCPResourceWrapper(None, "myserver", _make_resource_def())
assert wrapper.name == "mcp_myserver_resource_myres"
assert "[MCP Resource]" in wrapper.description
assert "A test resource" in wrapper.description
assert "file:///tmp/data.txt" in wrapper.description
assert wrapper.parameters == {"type": "object", "properties": {}, "required": []}
assert wrapper.read_only is True
@pytest.mark.asyncio
async def test_resource_wrapper_execute_returns_text() -> None:
async def read_resource(uri: str) -> object:
assert uri == "file:///tmp/data.txt"
return SimpleNamespace(
contents=[_FakeTextResourceContents("line1"), _FakeTextResourceContents("line2")]
)
wrapper = _make_resource_wrapper(SimpleNamespace(read_resource=read_resource))
result = await wrapper.execute()
assert result == "line1\nline2"
@pytest.mark.asyncio
async def test_resource_wrapper_execute_handles_blob() -> None:
async def read_resource(uri: str) -> object:
return SimpleNamespace(contents=[_FakeBlobResourceContents(b"\x00\x01\x02")])
wrapper = _make_resource_wrapper(SimpleNamespace(read_resource=read_resource))
result = await wrapper.execute()
assert "[Binary resource: 3 bytes]" in result
@pytest.mark.asyncio
async def test_resource_wrapper_execute_handles_timeout() -> None:
async def read_resource(uri: str) -> object:
await asyncio.sleep(1)
return SimpleNamespace(contents=[])
wrapper = _make_resource_wrapper(
SimpleNamespace(read_resource=read_resource), timeout=0.01
)
result = await wrapper.execute()
assert result == "(MCP resource read timed out after 0.01s)"
@pytest.mark.asyncio
async def test_resource_wrapper_execute_handles_error() -> None:
async def read_resource(uri: str) -> object:
raise RuntimeError("boom")
wrapper = _make_resource_wrapper(SimpleNamespace(read_resource=read_resource))
result = await wrapper.execute()
assert result == "(MCP resource read failed: RuntimeError)"
# ---------------------------------------------------------------------------
# MCPPromptWrapper tests
# ---------------------------------------------------------------------------
def _make_prompt_def(
name: str = "myprompt",
description: str = "A test prompt",
arguments: list | None = None,
) -> SimpleNamespace:
return SimpleNamespace(name=name, description=description, arguments=arguments)
def _make_prompt_wrapper(
session: object, *, timeout: float = 0.1
) -> MCPPromptWrapper:
return MCPPromptWrapper(
session, "srv", _make_prompt_def(), prompt_timeout=timeout
)
def test_prompt_wrapper_properties() -> None:
arg1 = SimpleNamespace(name="topic", required=True)
arg2 = SimpleNamespace(name="style", required=False)
wrapper = MCPPromptWrapper(
None, "myserver", _make_prompt_def(arguments=[arg1, arg2])
)
assert wrapper.name == "mcp_myserver_prompt_myprompt"
assert "[MCP Prompt]" in wrapper.description
assert "A test prompt" in wrapper.description
assert "workflow guide" in wrapper.description
assert wrapper.parameters["properties"]["topic"] == {"type": "string"}
assert wrapper.parameters["properties"]["style"] == {"type": "string"}
assert wrapper.parameters["required"] == ["topic"]
assert wrapper.read_only is True
def test_prompt_wrapper_no_arguments() -> None:
wrapper = MCPPromptWrapper(None, "myserver", _make_prompt_def())
assert wrapper.parameters == {"type": "object", "properties": {}, "required": []}
@pytest.mark.asyncio
async def test_prompt_wrapper_execute_returns_text() -> None:
async def get_prompt(name: str, arguments: dict | None = None) -> object:
assert name == "myprompt"
msg1 = SimpleNamespace(
role="user",
content=[_FakeTextContent("You are an expert on {{topic}}.")],
)
msg2 = SimpleNamespace(
role="assistant",
content=[_FakeTextContent("Understood. Ask me anything.")],
)
return SimpleNamespace(messages=[msg1, msg2])
wrapper = _make_prompt_wrapper(SimpleNamespace(get_prompt=get_prompt))
result = await wrapper.execute(topic="AI")
assert "You are an expert on {{topic}}." in result
assert "Understood. Ask me anything." in result
@pytest.mark.asyncio
async def test_prompt_wrapper_execute_handles_timeout() -> None:
async def get_prompt(name: str, arguments: dict | None = None) -> object:
await asyncio.sleep(1)
return SimpleNamespace(messages=[])
wrapper = _make_prompt_wrapper(
SimpleNamespace(get_prompt=get_prompt), timeout=0.01
)
result = await wrapper.execute()
assert result == "(MCP prompt call timed out after 0.01s)"
@pytest.mark.asyncio
async def test_prompt_wrapper_execute_handles_error() -> None:
async def get_prompt(name: str, arguments: dict | None = None) -> object:
raise RuntimeError("boom")
wrapper = _make_prompt_wrapper(SimpleNamespace(get_prompt=get_prompt))
result = await wrapper.execute()
assert result == "(MCP prompt call failed: RuntimeError)"
# ---------------------------------------------------------------------------
# connect_mcp_servers: resources + prompts integration
# ---------------------------------------------------------------------------
def _make_fake_session_with_capabilities(
tool_names: list[str],
resource_names: list[str] | None = None,
prompt_names: list[str] | None = None,
) -> SimpleNamespace:
async def initialize() -> None:
return None
async def list_tools() -> SimpleNamespace:
return SimpleNamespace(tools=[_make_tool_def(name) for name in tool_names])
async def list_resources() -> SimpleNamespace:
resources = []
for rname in resource_names or []:
resources.append(
SimpleNamespace(
name=rname,
uri=f"file:///{rname}",
description=f"{rname} resource",
)
)
return SimpleNamespace(resources=resources)
async def list_prompts() -> SimpleNamespace:
prompts = []
for pname in prompt_names or []:
prompts.append(
SimpleNamespace(
name=pname,
description=f"{pname} prompt",
arguments=None,
)
)
return SimpleNamespace(prompts=prompts)
return SimpleNamespace(
initialize=initialize,
list_tools=list_tools,
list_resources=list_resources,
list_prompts=list_prompts,
)
@pytest.mark.asyncio
async def test_connect_registers_resources_and_prompts(
fake_mcp_runtime: dict[str, object | None],
) -> None:
fake_mcp_runtime["session"] = _make_fake_session_with_capabilities(
tool_names=["tool_a"],
resource_names=["res_b"],
prompt_names=["prompt_c"],
)
registry = ToolRegistry()
stack = AsyncExitStack()
await stack.__aenter__()
try:
await connect_mcp_servers(
{"test": MCPServerConfig(command="fake")},
registry,
stack,
)
finally:
await stack.aclose()
assert "mcp_test_tool_a" in registry.tool_names
assert "mcp_test_resource_res_b" in registry.tool_names
assert "mcp_test_prompt_prompt_c" in registry.tool_names