mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-26 12:55:58 +00:00
refactor(agent): streamline hook method calls and enhance error logging
- Introduced a helper method `_for_each_hook_safe` to reduce code duplication in hook method implementations. - Updated error logging to include the method name for better traceability. - Improved the `SkillsLoader` class by adding a new method `_skill_entries_from_dir` to simplify skill listing logic. - Enhanced skill loading and filtering logic, ensuring workspace skills take precedence over built-in ones. - Added comprehensive tests for `SkillsLoader` to validate functionality and edge cases.
This commit is contained in:
parent
bb9da29eff
commit
bcb8352235
@ -67,40 +67,27 @@ class CompositeHook(AgentHook):
|
|||||||
def wants_streaming(self) -> bool:
|
def wants_streaming(self) -> bool:
|
||||||
return any(h.wants_streaming() for h in self._hooks)
|
return any(h.wants_streaming() for h in self._hooks)
|
||||||
|
|
||||||
async def before_iteration(self, context: AgentHookContext) -> None:
|
async def _for_each_hook_safe(self, method_name: str, *args: Any, **kwargs: Any) -> None:
|
||||||
for h in self._hooks:
|
for h in self._hooks:
|
||||||
try:
|
try:
|
||||||
await h.before_iteration(context)
|
await getattr(h, method_name)(*args, **kwargs)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("AgentHook.before_iteration error in {}", type(h).__name__)
|
logger.exception("AgentHook.{} error in {}", method_name, type(h).__name__)
|
||||||
|
|
||||||
|
async def before_iteration(self, context: AgentHookContext) -> None:
|
||||||
|
await self._for_each_hook_safe("before_iteration", context)
|
||||||
|
|
||||||
async def on_stream(self, context: AgentHookContext, delta: str) -> None:
|
async def on_stream(self, context: AgentHookContext, delta: str) -> None:
|
||||||
for h in self._hooks:
|
await self._for_each_hook_safe("on_stream", context, delta)
|
||||||
try:
|
|
||||||
await h.on_stream(context, delta)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("AgentHook.on_stream error in {}", type(h).__name__)
|
|
||||||
|
|
||||||
async def on_stream_end(self, context: AgentHookContext, *, resuming: bool) -> None:
|
async def on_stream_end(self, context: AgentHookContext, *, resuming: bool) -> None:
|
||||||
for h in self._hooks:
|
await self._for_each_hook_safe("on_stream_end", context, resuming=resuming)
|
||||||
try:
|
|
||||||
await h.on_stream_end(context, resuming=resuming)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("AgentHook.on_stream_end error in {}", type(h).__name__)
|
|
||||||
|
|
||||||
async def before_execute_tools(self, context: AgentHookContext) -> None:
|
async def before_execute_tools(self, context: AgentHookContext) -> None:
|
||||||
for h in self._hooks:
|
await self._for_each_hook_safe("before_execute_tools", context)
|
||||||
try:
|
|
||||||
await h.before_execute_tools(context)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("AgentHook.before_execute_tools error in {}", type(h).__name__)
|
|
||||||
|
|
||||||
async def after_iteration(self, context: AgentHookContext) -> None:
|
async def after_iteration(self, context: AgentHookContext) -> None:
|
||||||
for h in self._hooks:
|
await self._for_each_hook_safe("after_iteration", context)
|
||||||
try:
|
|
||||||
await h.after_iteration(context)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("AgentHook.after_iteration error in {}", type(h).__name__)
|
|
||||||
|
|
||||||
def finalize_content(self, context: AgentHookContext, content: str | None) -> str | None:
|
def finalize_content(self, context: AgentHookContext, content: str | None) -> str | None:
|
||||||
for h in self._hooks:
|
for h in self._hooks:
|
||||||
|
|||||||
@ -9,6 +9,16 @@ from pathlib import Path
|
|||||||
# Default builtin skills directory (relative to this file)
|
# Default builtin skills directory (relative to this file)
|
||||||
BUILTIN_SKILLS_DIR = Path(__file__).parent.parent / "skills"
|
BUILTIN_SKILLS_DIR = Path(__file__).parent.parent / "skills"
|
||||||
|
|
||||||
|
# Opening ---, YAML body (group 1), closing --- on its own line; supports CRLF.
|
||||||
|
_STRIP_SKILL_FRONTMATTER = re.compile(
|
||||||
|
r"^---\s*\r?\n(.*?)\r?\n---\s*\r?\n?",
|
||||||
|
re.DOTALL,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _escape_xml(text: str) -> str:
|
||||||
|
return text.replace("&", "&").replace("<", "<").replace(">", ">")
|
||||||
|
|
||||||
|
|
||||||
class SkillsLoader:
|
class SkillsLoader:
|
||||||
"""
|
"""
|
||||||
@ -23,6 +33,22 @@ class SkillsLoader:
|
|||||||
self.workspace_skills = workspace / "skills"
|
self.workspace_skills = workspace / "skills"
|
||||||
self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR
|
self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR
|
||||||
|
|
||||||
|
def _skill_entries_from_dir(self, base: Path, source: str, *, skip_names: set[str] | None = None) -> list[dict[str, str]]:
|
||||||
|
if not base.exists():
|
||||||
|
return []
|
||||||
|
entries: list[dict[str, str]] = []
|
||||||
|
for skill_dir in base.iterdir():
|
||||||
|
if not skill_dir.is_dir():
|
||||||
|
continue
|
||||||
|
skill_file = skill_dir / "SKILL.md"
|
||||||
|
if not skill_file.exists():
|
||||||
|
continue
|
||||||
|
name = skill_dir.name
|
||||||
|
if skip_names is not None and name in skip_names:
|
||||||
|
continue
|
||||||
|
entries.append({"name": name, "path": str(skill_file), "source": source})
|
||||||
|
return entries
|
||||||
|
|
||||||
def list_skills(self, filter_unavailable: bool = True) -> list[dict[str, str]]:
|
def list_skills(self, filter_unavailable: bool = True) -> list[dict[str, str]]:
|
||||||
"""
|
"""
|
||||||
List all available skills.
|
List all available skills.
|
||||||
@ -33,27 +59,15 @@ class SkillsLoader:
|
|||||||
Returns:
|
Returns:
|
||||||
List of skill info dicts with 'name', 'path', 'source'.
|
List of skill info dicts with 'name', 'path', 'source'.
|
||||||
"""
|
"""
|
||||||
skills = []
|
skills = self._skill_entries_from_dir(self.workspace_skills, "workspace")
|
||||||
|
workspace_names = {entry["name"] for entry in skills}
|
||||||
# Workspace skills (highest priority)
|
|
||||||
if self.workspace_skills.exists():
|
|
||||||
for skill_dir in self.workspace_skills.iterdir():
|
|
||||||
if skill_dir.is_dir():
|
|
||||||
skill_file = skill_dir / "SKILL.md"
|
|
||||||
if skill_file.exists():
|
|
||||||
skills.append({"name": skill_dir.name, "path": str(skill_file), "source": "workspace"})
|
|
||||||
|
|
||||||
# Built-in skills
|
|
||||||
if self.builtin_skills and self.builtin_skills.exists():
|
if self.builtin_skills and self.builtin_skills.exists():
|
||||||
for skill_dir in self.builtin_skills.iterdir():
|
skills.extend(
|
||||||
if skill_dir.is_dir():
|
self._skill_entries_from_dir(self.builtin_skills, "builtin", skip_names=workspace_names)
|
||||||
skill_file = skill_dir / "SKILL.md"
|
)
|
||||||
if skill_file.exists() and not any(s["name"] == skill_dir.name for s in skills):
|
|
||||||
skills.append({"name": skill_dir.name, "path": str(skill_file), "source": "builtin"})
|
|
||||||
|
|
||||||
# Filter by requirements
|
|
||||||
if filter_unavailable:
|
if filter_unavailable:
|
||||||
return [s for s in skills if self._check_requirements(self._get_skill_meta(s["name"]))]
|
return [skill for skill in skills if self._check_requirements(self._get_skill_meta(skill["name"]))]
|
||||||
return skills
|
return skills
|
||||||
|
|
||||||
def load_skill(self, name: str) -> str | None:
|
def load_skill(self, name: str) -> str | None:
|
||||||
@ -66,17 +80,13 @@ class SkillsLoader:
|
|||||||
Returns:
|
Returns:
|
||||||
Skill content or None if not found.
|
Skill content or None if not found.
|
||||||
"""
|
"""
|
||||||
# Check workspace first
|
roots = [self.workspace_skills]
|
||||||
workspace_skill = self.workspace_skills / name / "SKILL.md"
|
|
||||||
if workspace_skill.exists():
|
|
||||||
return workspace_skill.read_text(encoding="utf-8")
|
|
||||||
|
|
||||||
# Check built-in
|
|
||||||
if self.builtin_skills:
|
if self.builtin_skills:
|
||||||
builtin_skill = self.builtin_skills / name / "SKILL.md"
|
roots.append(self.builtin_skills)
|
||||||
if builtin_skill.exists():
|
for root in roots:
|
||||||
return builtin_skill.read_text(encoding="utf-8")
|
path = root / name / "SKILL.md"
|
||||||
|
if path.exists():
|
||||||
|
return path.read_text(encoding="utf-8")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def load_skills_for_context(self, skill_names: list[str]) -> str:
|
def load_skills_for_context(self, skill_names: list[str]) -> str:
|
||||||
@ -89,14 +99,12 @@ class SkillsLoader:
|
|||||||
Returns:
|
Returns:
|
||||||
Formatted skills content.
|
Formatted skills content.
|
||||||
"""
|
"""
|
||||||
parts = []
|
parts = [
|
||||||
for name in skill_names:
|
f"### Skill: {name}\n\n{self._strip_frontmatter(markdown)}"
|
||||||
content = self.load_skill(name)
|
for name in skill_names
|
||||||
if content:
|
if (markdown := self.load_skill(name))
|
||||||
content = self._strip_frontmatter(content)
|
]
|
||||||
parts.append(f"### Skill: {name}\n\n{content}")
|
return "\n\n---\n\n".join(parts)
|
||||||
|
|
||||||
return "\n\n---\n\n".join(parts) if parts else ""
|
|
||||||
|
|
||||||
def build_skills_summary(self) -> str:
|
def build_skills_summary(self) -> str:
|
||||||
"""
|
"""
|
||||||
@ -112,44 +120,36 @@ class SkillsLoader:
|
|||||||
if not all_skills:
|
if not all_skills:
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
def escape_xml(s: str) -> str:
|
lines: list[str] = ["<skills>"]
|
||||||
return s.replace("&", "&").replace("<", "<").replace(">", ">")
|
for entry in all_skills:
|
||||||
|
skill_name = entry["name"]
|
||||||
lines = ["<skills>"]
|
meta = self._get_skill_meta(skill_name)
|
||||||
for s in all_skills:
|
available = self._check_requirements(meta)
|
||||||
name = escape_xml(s["name"])
|
lines.extend(
|
||||||
path = s["path"]
|
[
|
||||||
desc = escape_xml(self._get_skill_description(s["name"]))
|
f' <skill available="{str(available).lower()}">',
|
||||||
skill_meta = self._get_skill_meta(s["name"])
|
f" <name>{_escape_xml(skill_name)}</name>",
|
||||||
available = self._check_requirements(skill_meta)
|
f" <description>{_escape_xml(self._get_skill_description(skill_name))}</description>",
|
||||||
|
f" <location>{entry['path']}</location>",
|
||||||
lines.append(f" <skill available=\"{str(available).lower()}\">")
|
]
|
||||||
lines.append(f" <name>{name}</name>")
|
)
|
||||||
lines.append(f" <description>{desc}</description>")
|
|
||||||
lines.append(f" <location>{path}</location>")
|
|
||||||
|
|
||||||
# Show missing requirements for unavailable skills
|
|
||||||
if not available:
|
if not available:
|
||||||
missing = self._get_missing_requirements(skill_meta)
|
missing = self._get_missing_requirements(meta)
|
||||||
if missing:
|
if missing:
|
||||||
lines.append(f" <requires>{escape_xml(missing)}</requires>")
|
lines.append(f" <requires>{_escape_xml(missing)}</requires>")
|
||||||
|
|
||||||
lines.append(" </skill>")
|
lines.append(" </skill>")
|
||||||
lines.append("</skills>")
|
lines.append("</skills>")
|
||||||
|
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|
||||||
def _get_missing_requirements(self, skill_meta: dict) -> str:
|
def _get_missing_requirements(self, skill_meta: dict) -> str:
|
||||||
"""Get a description of missing requirements."""
|
"""Get a description of missing requirements."""
|
||||||
missing = []
|
|
||||||
requires = skill_meta.get("requires", {})
|
requires = skill_meta.get("requires", {})
|
||||||
for b in requires.get("bins", []):
|
required_bins = requires.get("bins", [])
|
||||||
if not shutil.which(b):
|
required_env_vars = requires.get("env", [])
|
||||||
missing.append(f"CLI: {b}")
|
return ", ".join(
|
||||||
for env in requires.get("env", []):
|
[f"CLI: {command_name}" for command_name in required_bins if not shutil.which(command_name)]
|
||||||
if not os.environ.get(env):
|
+ [f"ENV: {env_name}" for env_name in required_env_vars if not os.environ.get(env_name)]
|
||||||
missing.append(f"ENV: {env}")
|
)
|
||||||
return ", ".join(missing)
|
|
||||||
|
|
||||||
def _get_skill_description(self, name: str) -> str:
|
def _get_skill_description(self, name: str) -> str:
|
||||||
"""Get the description of a skill from its frontmatter."""
|
"""Get the description of a skill from its frontmatter."""
|
||||||
@ -160,30 +160,32 @@ class SkillsLoader:
|
|||||||
|
|
||||||
def _strip_frontmatter(self, content: str) -> str:
|
def _strip_frontmatter(self, content: str) -> str:
|
||||||
"""Remove YAML frontmatter from markdown content."""
|
"""Remove YAML frontmatter from markdown content."""
|
||||||
if content.startswith("---"):
|
if not content.startswith("---"):
|
||||||
match = re.match(r"^---\n.*?\n---\n", content, re.DOTALL)
|
return content
|
||||||
if match:
|
match = _STRIP_SKILL_FRONTMATTER.match(content)
|
||||||
return content[match.end():].strip()
|
if match:
|
||||||
|
return content[match.end():].strip()
|
||||||
return content
|
return content
|
||||||
|
|
||||||
def _parse_nanobot_metadata(self, raw: str) -> dict:
|
def _parse_nanobot_metadata(self, raw: str) -> dict:
|
||||||
"""Parse skill metadata JSON from frontmatter (supports nanobot and openclaw keys)."""
|
"""Parse skill metadata JSON from frontmatter (supports nanobot and openclaw keys)."""
|
||||||
try:
|
try:
|
||||||
data = json.loads(raw)
|
data = json.loads(raw)
|
||||||
return data.get("nanobot", data.get("openclaw", {})) if isinstance(data, dict) else {}
|
|
||||||
except (json.JSONDecodeError, TypeError):
|
except (json.JSONDecodeError, TypeError):
|
||||||
return {}
|
return {}
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return {}
|
||||||
|
payload = data.get("nanobot", data.get("openclaw", {}))
|
||||||
|
return payload if isinstance(payload, dict) else {}
|
||||||
|
|
||||||
def _check_requirements(self, skill_meta: dict) -> bool:
|
def _check_requirements(self, skill_meta: dict) -> bool:
|
||||||
"""Check if skill requirements are met (bins, env vars)."""
|
"""Check if skill requirements are met (bins, env vars)."""
|
||||||
requires = skill_meta.get("requires", {})
|
requires = skill_meta.get("requires", {})
|
||||||
for b in requires.get("bins", []):
|
required_bins = requires.get("bins", [])
|
||||||
if not shutil.which(b):
|
required_env_vars = requires.get("env", [])
|
||||||
return False
|
return all(shutil.which(cmd) for cmd in required_bins) and all(
|
||||||
for env in requires.get("env", []):
|
os.environ.get(var) for var in required_env_vars
|
||||||
if not os.environ.get(env):
|
)
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _get_skill_meta(self, name: str) -> dict:
|
def _get_skill_meta(self, name: str) -> dict:
|
||||||
"""Get nanobot metadata for a skill (cached in frontmatter)."""
|
"""Get nanobot metadata for a skill (cached in frontmatter)."""
|
||||||
@ -192,13 +194,15 @@ class SkillsLoader:
|
|||||||
|
|
||||||
def get_always_skills(self) -> list[str]:
|
def get_always_skills(self) -> list[str]:
|
||||||
"""Get skills marked as always=true that meet requirements."""
|
"""Get skills marked as always=true that meet requirements."""
|
||||||
result = []
|
return [
|
||||||
for s in self.list_skills(filter_unavailable=True):
|
entry["name"]
|
||||||
meta = self.get_skill_metadata(s["name"]) or {}
|
for entry in self.list_skills(filter_unavailable=True)
|
||||||
skill_meta = self._parse_nanobot_metadata(meta.get("metadata", ""))
|
if (meta := self.get_skill_metadata(entry["name"]) or {})
|
||||||
if skill_meta.get("always") or meta.get("always"):
|
and (
|
||||||
result.append(s["name"])
|
self._parse_nanobot_metadata(meta.get("metadata", "")).get("always")
|
||||||
return result
|
or meta.get("always")
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
def get_skill_metadata(self, name: str) -> dict | None:
|
def get_skill_metadata(self, name: str) -> dict | None:
|
||||||
"""
|
"""
|
||||||
@ -211,18 +215,15 @@ class SkillsLoader:
|
|||||||
Metadata dict or None.
|
Metadata dict or None.
|
||||||
"""
|
"""
|
||||||
content = self.load_skill(name)
|
content = self.load_skill(name)
|
||||||
if not content:
|
if not content or not content.startswith("---"):
|
||||||
return None
|
return None
|
||||||
|
match = _STRIP_SKILL_FRONTMATTER.match(content)
|
||||||
if content.startswith("---"):
|
if not match:
|
||||||
match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
|
return None
|
||||||
if match:
|
metadata: dict[str, str] = {}
|
||||||
# Simple YAML parsing
|
for line in match.group(1).splitlines():
|
||||||
metadata = {}
|
if ":" not in line:
|
||||||
for line in match.group(1).split("\n"):
|
continue
|
||||||
if ":" in line:
|
key, value = line.split(":", 1)
|
||||||
key, value = line.split(":", 1)
|
metadata[key.strip()] = value.strip().strip('"\'')
|
||||||
metadata[key.strip()] = value.strip().strip('"\'')
|
return metadata
|
||||||
return metadata
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|||||||
252
tests/agent/test_skills_loader.py
Normal file
252
tests/agent/test_skills_loader.py
Normal file
@ -0,0 +1,252 @@
|
|||||||
|
"""Tests for nanobot.agent.skills.SkillsLoader."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from nanobot.agent.skills import SkillsLoader
|
||||||
|
|
||||||
|
|
||||||
|
def _write_skill(
|
||||||
|
base: Path,
|
||||||
|
name: str,
|
||||||
|
*,
|
||||||
|
metadata_json: dict | None = None,
|
||||||
|
body: str = "# Skill\n",
|
||||||
|
) -> Path:
|
||||||
|
"""Create ``base / name / SKILL.md`` with optional nanobot metadata JSON."""
|
||||||
|
skill_dir = base / name
|
||||||
|
skill_dir.mkdir(parents=True)
|
||||||
|
lines = ["---"]
|
||||||
|
if metadata_json is not None:
|
||||||
|
payload = json.dumps({"nanobot": metadata_json}, separators=(",", ":"))
|
||||||
|
lines.append(f'metadata: {payload}')
|
||||||
|
lines.extend(["---", "", body])
|
||||||
|
path = skill_dir / "SKILL.md"
|
||||||
|
path.write_text("\n".join(lines), encoding="utf-8")
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_empty_when_skills_dir_missing(tmp_path: Path) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
workspace.mkdir()
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
assert loader.list_skills(filter_unavailable=False) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_empty_when_skills_dir_exists_but_empty(tmp_path: Path) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
(workspace / "skills").mkdir(parents=True)
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
assert loader.list_skills(filter_unavailable=False) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_workspace_entry_shape_and_source(tmp_path: Path) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
skills_root = workspace / "skills"
|
||||||
|
skills_root.mkdir(parents=True)
|
||||||
|
skill_path = _write_skill(skills_root, "alpha", body="# Alpha")
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
entries = loader.list_skills(filter_unavailable=False)
|
||||||
|
assert entries == [
|
||||||
|
{"name": "alpha", "path": str(skill_path), "source": "workspace"},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_skips_non_directories_and_missing_skill_md(tmp_path: Path) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
skills_root = workspace / "skills"
|
||||||
|
skills_root.mkdir(parents=True)
|
||||||
|
(skills_root / "not_a_dir.txt").write_text("x", encoding="utf-8")
|
||||||
|
(skills_root / "no_skill_md").mkdir()
|
||||||
|
ok_path = _write_skill(skills_root, "ok", body="# Ok")
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
entries = loader.list_skills(filter_unavailable=False)
|
||||||
|
names = {entry["name"] for entry in entries}
|
||||||
|
assert names == {"ok"}
|
||||||
|
assert entries[0]["path"] == str(ok_path)
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_workspace_shadows_builtin_same_name(tmp_path: Path) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
ws_skills = workspace / "skills"
|
||||||
|
ws_skills.mkdir(parents=True)
|
||||||
|
ws_path = _write_skill(ws_skills, "dup", body="# Workspace wins")
|
||||||
|
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
_write_skill(builtin, "dup", body="# Builtin")
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
entries = loader.list_skills(filter_unavailable=False)
|
||||||
|
assert len(entries) == 1
|
||||||
|
assert entries[0]["source"] == "workspace"
|
||||||
|
assert entries[0]["path"] == str(ws_path)
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_merges_workspace_and_builtin(tmp_path: Path) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
ws_skills = workspace / "skills"
|
||||||
|
ws_skills.mkdir(parents=True)
|
||||||
|
ws_path = _write_skill(ws_skills, "ws_only", body="# W")
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
bi_path = _write_skill(builtin, "bi_only", body="# B")
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
entries = sorted(loader.list_skills(filter_unavailable=False), key=lambda item: item["name"])
|
||||||
|
assert entries == [
|
||||||
|
{"name": "bi_only", "path": str(bi_path), "source": "builtin"},
|
||||||
|
{"name": "ws_only", "path": str(ws_path), "source": "workspace"},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_builtin_omitted_when_dir_missing(tmp_path: Path) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
ws_skills = workspace / "skills"
|
||||||
|
ws_skills.mkdir(parents=True)
|
||||||
|
ws_path = _write_skill(ws_skills, "solo", body="# S")
|
||||||
|
missing_builtin = tmp_path / "no_such_builtin"
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=missing_builtin)
|
||||||
|
entries = loader.list_skills(filter_unavailable=False)
|
||||||
|
assert entries == [{"name": "solo", "path": str(ws_path), "source": "workspace"}]
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_filter_unavailable_excludes_unmet_bin_requirement(
|
||||||
|
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
skills_root = workspace / "skills"
|
||||||
|
skills_root.mkdir(parents=True)
|
||||||
|
_write_skill(
|
||||||
|
skills_root,
|
||||||
|
"needs_bin",
|
||||||
|
metadata_json={"requires": {"bins": ["nanobot_test_fake_binary"]}},
|
||||||
|
)
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
|
||||||
|
def fake_which(cmd: str) -> str | None:
|
||||||
|
if cmd == "nanobot_test_fake_binary":
|
||||||
|
return None
|
||||||
|
return "/usr/bin/true"
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.agent.skills.shutil.which", fake_which)
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
assert loader.list_skills(filter_unavailable=True) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_filter_unavailable_includes_when_bin_requirement_met(
|
||||||
|
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
skills_root = workspace / "skills"
|
||||||
|
skills_root.mkdir(parents=True)
|
||||||
|
skill_path = _write_skill(
|
||||||
|
skills_root,
|
||||||
|
"has_bin",
|
||||||
|
metadata_json={"requires": {"bins": ["nanobot_test_fake_binary"]}},
|
||||||
|
)
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
|
||||||
|
def fake_which(cmd: str) -> str | None:
|
||||||
|
if cmd == "nanobot_test_fake_binary":
|
||||||
|
return "/fake/nanobot_test_fake_binary"
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.agent.skills.shutil.which", fake_which)
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
entries = loader.list_skills(filter_unavailable=True)
|
||||||
|
assert entries == [
|
||||||
|
{"name": "has_bin", "path": str(skill_path), "source": "workspace"},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_filter_unavailable_false_keeps_unmet_requirements(
|
||||||
|
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
skills_root = workspace / "skills"
|
||||||
|
skills_root.mkdir(parents=True)
|
||||||
|
skill_path = _write_skill(
|
||||||
|
skills_root,
|
||||||
|
"blocked",
|
||||||
|
metadata_json={"requires": {"bins": ["nanobot_test_fake_binary"]}},
|
||||||
|
)
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.agent.skills.shutil.which", lambda _cmd: None)
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
entries = loader.list_skills(filter_unavailable=False)
|
||||||
|
assert entries == [
|
||||||
|
{"name": "blocked", "path": str(skill_path), "source": "workspace"},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_filter_unavailable_excludes_unmet_env_requirement(
|
||||||
|
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
skills_root = workspace / "skills"
|
||||||
|
skills_root.mkdir(parents=True)
|
||||||
|
_write_skill(
|
||||||
|
skills_root,
|
||||||
|
"needs_env",
|
||||||
|
metadata_json={"requires": {"env": ["NANOBOT_SKILLS_TEST_ENV_VAR"]}},
|
||||||
|
)
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
|
||||||
|
monkeypatch.delenv("NANOBOT_SKILLS_TEST_ENV_VAR", raising=False)
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
assert loader.list_skills(filter_unavailable=True) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_skills_openclaw_metadata_parsed_for_requirements(
|
||||||
|
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
workspace = tmp_path / "ws"
|
||||||
|
skills_root = workspace / "skills"
|
||||||
|
skills_root.mkdir(parents=True)
|
||||||
|
skill_dir = skills_root / "openclaw_skill"
|
||||||
|
skill_dir.mkdir(parents=True)
|
||||||
|
skill_path = skill_dir / "SKILL.md"
|
||||||
|
oc_payload = json.dumps({"openclaw": {"requires": {"bins": ["nanobot_oc_bin"]}}}, separators=(",", ":"))
|
||||||
|
skill_path.write_text(
|
||||||
|
"\n".join(["---", f"metadata: {oc_payload}", "---", "", "# OC"]),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
builtin = tmp_path / "builtin"
|
||||||
|
builtin.mkdir()
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.agent.skills.shutil.which", lambda _cmd: None)
|
||||||
|
|
||||||
|
loader = SkillsLoader(workspace, builtin_skills_dir=builtin)
|
||||||
|
assert loader.list_skills(filter_unavailable=True) == []
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.agent.skills.shutil.which",
|
||||||
|
lambda cmd: "/x" if cmd == "nanobot_oc_bin" else None,
|
||||||
|
)
|
||||||
|
entries = loader.list_skills(filter_unavailable=True)
|
||||||
|
assert entries == [
|
||||||
|
{"name": "openclaw_skill", "path": str(skill_path), "source": "workspace"},
|
||||||
|
]
|
||||||
@ -1,3 +1,6 @@
|
|||||||
|
import shlex
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from nanobot.agent.tools import (
|
from nanobot.agent.tools import (
|
||||||
@ -546,10 +549,15 @@ async def test_exec_head_tail_truncation() -> None:
|
|||||||
"""Long output should preserve both head and tail."""
|
"""Long output should preserve both head and tail."""
|
||||||
tool = ExecTool()
|
tool = ExecTool()
|
||||||
# Generate output that exceeds _MAX_OUTPUT (10_000 chars)
|
# Generate output that exceeds _MAX_OUTPUT (10_000 chars)
|
||||||
# Use python to generate output to avoid command line length limits
|
# Use current interpreter (PATH may not have `python`). ExecTool uses
|
||||||
result = await tool.execute(
|
# create_subprocess_shell: POSIX needs shlex.quote; Windows uses cmd.exe
|
||||||
command="python -c \"print('A' * 6000 + '\\n' + 'B' * 6000)\""
|
# rules, so list2cmdline is appropriate there.
|
||||||
)
|
script = "print('A' * 6000 + '\\n' + 'B' * 6000)"
|
||||||
|
if sys.platform == "win32":
|
||||||
|
command = subprocess.list2cmdline([sys.executable, "-c", script])
|
||||||
|
else:
|
||||||
|
command = f"{shlex.quote(sys.executable)} -c {shlex.quote(script)}"
|
||||||
|
result = await tool.execute(command=command)
|
||||||
assert "chars truncated" in result
|
assert "chars truncated" in result
|
||||||
# Head portion should start with As
|
# Head portion should start with As
|
||||||
assert result.startswith("A")
|
assert result.startswith("A")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user