mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-05 02:42:41 +00:00
50 lines
1.5 KiB
Python
50 lines
1.5 KiB
Python
"""Shared lifecycle hook primitives for agent runs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from nanobot.providers.base import LLMResponse, ToolCallRequest
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class AgentHookContext:
|
|
"""Mutable per-iteration state exposed to runner hooks."""
|
|
|
|
iteration: int
|
|
messages: list[dict[str, Any]]
|
|
response: LLMResponse | None = None
|
|
usage: dict[str, int] = field(default_factory=dict)
|
|
tool_calls: list[ToolCallRequest] = field(default_factory=list)
|
|
tool_results: list[Any] = field(default_factory=list)
|
|
tool_events: list[dict[str, str]] = field(default_factory=list)
|
|
final_content: str | None = None
|
|
stop_reason: str | None = None
|
|
error: str | None = None
|
|
|
|
|
|
class AgentHook:
    """Minimal lifecycle surface for shared runner customization.

    Every method on this base class is a default implementation: the async
    callbacks do nothing and return ``None``, ``wants_streaming`` answers
    ``False``, and ``finalize_content`` hands its input back untouched.
    NOTE(review): the exact moments at which a runner invokes each callback
    are not visible in this module -- the method names are the only hint.
    """

    def wants_streaming(self) -> bool:
        """Return ``False``: the base hook does not request streaming."""
        return False

    async def before_iteration(self, context: AgentHookContext) -> None:
        """Do nothing by default; ``context`` is left untouched."""

    async def on_stream(self, context: AgentHookContext, delta: str) -> None:
        """Do nothing by default; ``delta`` is ignored."""

    async def on_stream_end(self, context: AgentHookContext, *, resuming: bool) -> None:
        """Do nothing by default; ``resuming`` is keyword-only and ignored."""

    async def before_execute_tools(self, context: AgentHookContext) -> None:
        """Do nothing by default; ``context`` is left untouched."""

    async def after_iteration(self, context: AgentHookContext) -> None:
        """Do nothing by default; ``context`` is left untouched."""

    def finalize_content(
        self, context: AgentHookContext, content: str | None
    ) -> str | None:
        """Return ``content`` unchanged (including ``None``)."""
        return content
|