Xubin Ren 1c2ea1aad2
feat(goal): /goal command & long-running tasks (long_task)
* feat(long-task): add LongTaskTool for multi-step agent tasks

Implements a meta-ReAct loop where long-running tasks are broken into
sequential subagent steps, each starting fresh with the original goal
and progress from the previous step. This prevents context drift when
agents work on complex, multi-step tasks.

- Extract build_tool_registry() from SubagentManager for reuse
- Add run_step() for synchronous subagent execution (no bus announcement)
- Add HandoffTool and CompleteTool as signal mechanisms via shared dict
- Add LongTaskTool orchestrator with simplified prompt (8 iterations/step)
- Register LongTaskTool in main agent loop
- Add _extract_handoff_from_messages fallback for robustness

* fix(long-task): add debug logging for step-level observability

* feat(long-task): major overhaul with structured handoffs, validation, and observability

- Structured HandoffState: HandoffTool now accepts files_created,
  files_modified, next_step_hint, and verification fields instead of
  a plain string. Progress is passed between steps as structured data.

- Completion validation round: After complete() is called, a dedicated
  validator step runs to verify the claim against the original goal.
  If validation fails, the task continues rather than returning
  a false completion.

- Dynamic prompt system: 3 Jinja2 templates (step_start, step_middle,
  step_final) selected based on step number. Final steps get tighter
  budget and stronger "wrap up" guidance.

- Automatic file change tracking: Extracts write_file/edit_file events
  from tool_events and injects them into the next step's context if
  the subagent forgot to report them explicitly.

- Budget tracking & adaptive strategy: Cumulative token usage is tracked
  across steps. Per-step tool budget drops from 8 to 4 in the last
  two steps to force handoff/completion.

- Crash retry with graceful degradation: A step that crashes is retried
  once. Persistent crashes terminate the task and return partial progress.

- Full observability hooks for future WebUI integration:
  - set_hooks() with on_step_start, on_step_complete, on_handoff,
    on_validation_started, on_validation_passed, on_validation_failed,
    on_task_complete, on_task_error, and catch-all on_event.
  - Readable state properties: current_step, total_steps, status,
    last_handoff, cumulative_usage, goal.
  - inject_correction() allows external code to send user corrections
    that are injected into the next step's prompt.

- run_step() accepts optional max_iterations for dynamic budget control.

All 27 long-task tests and 11 subagent tests pass.
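
The budget switch and template selection described above can be sketched as follows. This is only an illustration: the function names, the 1-based step index, and the exact `step > max_steps - 2` comparison are assumptions drawn from the wording "last two steps" and the later "(max_steps - 2)" note, not the actual LongTaskTool code.

```python
def step_budget(step: int, max_steps: int, normal: int = 8, final: int = 4) -> int:
    """Tool-call budget for a 1-based step (hypothetical helper).

    Assumption: the "last two steps" are those with step > max_steps - 2.
    """
    return final if step > max_steps - 2 else normal


def template_for(step: int, max_steps: int) -> str:
    """Pick a prompt template name using the same threshold as the budget."""
    if step > max_steps - 2:
        return "step_final"   # wrap-up guidance, tighter budget
    if step == 1:
        return "step_start"   # exploration and planning
    return "step_middle"


if __name__ == "__main__":
    for step in range(1, 6):
        print(step, step_budget(step, 5), template_for(step, 5))
```

Sharing one threshold between both helpers keeps the "final" prompt and the reduced budget from drifting apart, which is the alignment a later fix in this log describes.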

* test(long-task): add boundary tests and fix race conditions

- Add 7 edge-case tests: validation crash resilience, hook exception safety, mid-run correction injection, FIFO correction ordering, explicit file changes overriding auto-detection, final budget for max_steps=1, and dynamic budget switching boundaries

- Fix assertion in test_long_task_completes_after_multiple_handoffs to match exact prompt format

- Remove asyncio timing hack from test_state_exposure

- Add asyncio.sleep(0) yield in test_inject_correction_during_execution to prevent race between signal injection and step continuation

- All 34 tests passing

* fix(long-task): address code review findings

- Declare _scopes = {"core"} explicitly to prevent recursive nesting in subagent scope
- Document fragile coupling in _extract_file_changes: path extraction depends on
  write_file/edit_file detail format; add debug log for unexpected formats
- Align final-template threshold (max_steps - 2) with budget switch threshold
- Eliminate hasattr(self, "_state") in _reset_state by initializing in __init__

* fix(long-task): honor final signal and file tracking

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(long-task): improve prompt structure and agent contract

- Expand LongTaskTool.description to instruct parent agent on goal
  construction, return value semantics, and how to handle results.
- Expand CompleteTool.description to emphasize that the summary IS the
  final answer returned to the parent agent.
- Prefix validated return value with an explicit "final answer" directive
  to stop parent agent from re-running work.
- Redesign step_start.md: Step 1 is now explicitly for exploration,
  planning, and skeleton-building. complete() is discouraged.
- Remove bulky payload debug logging from _emit(); add targeted
  info/warning/error logs at key state transitions instead.
- Add signal_type to HandoffState for cleaner signal detection.

* test(long-task): expect wrapped completion message after validation

Align assertions with LongTaskTool final return shape on main.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): turn timing strip, latency, and session-switch restore

- Agent loop: publish goal_status run/idle for WebSocket turns; attach
  wall-clock latency_ms on turn_end and persisted assistant metadata.
- WebSocket channel: forward goal_status and latency fields to clients.
- NanobotClient: track goal_status started_at per chat without requiring
  onChat; useNanobotStream restores run strip when returning to a chat.
- Thread UI: composer/shell viewport hooks for run duration and latency;
  format helpers and i18n strings.
- MessageBubble: drop trailing StreamCursor (layout artifact vs block markdown).
- Builtin / tests: model command coverage, websocket and loop tests.

Covers multi-session UX and round-trip timing visibility for the WebUI.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: keep message-tool file attachments after canonical history hydrate

- MessageTool records per-turn media paths delivered to the active chat.
- nanobot.utils.session_attachments stages out-of-media-root files and
  merges into the last assistant message before save (loop stays a thin call).
- WebUI MediaCell: use a signed URL as a real download link when present.

Fixes attachments flashing then vanishing on turn_end when paths lived
outside get_media_dir (e.g. workspace files).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): agent activity cluster, stable keys, LTR sheen labels

- Group reasoning and tool traces in AgentActivityCluster with i18n summaries
- Stabilize React list keys for activity clusters (first message id anchor)
- Replace background-clip shimmer with overlay sheen for streaming labels
- ThreadMessages/MessageList integration and locale strings

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): render assistant reasoning with Markdown + deferred stream

- Use MarkdownText for ReasoningBubble body (same GFM/KaTeX path as replies)
- Apply muted/italic prose tokens so thinking stays visually subordinate
- useDeferredValue while reasoningStreaming to ease parser work during deltas
- Preload markdown chunk when trace opens; add regression test with preloaded renderer

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): default-collapse agent activity cluster while Working

Outer fold no longer auto-expands during isTurnStreaming; user opens to see traces.
Header sheen and live summary unchanged.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(long_task): cumulative run history, file union, and prompt tuning

Inject cross-step summaries and merged file paths into middle/final step
templates so chains do not lose early context. Strip the last run-history
block when it duplicates Previous Progress to save tokens. Add optional
cumulative_prompt_max_chars and cumulative_step_body_max_chars parameters
with clamped defaults.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): session switch keeps in-flight thread and replays buffered WS

Save the prior chat message list to the per-chat cache in a layout effect
when chatId changes (before stale writes could corrupt another chat).
Skip one post-switch layout cache tick so we do not snapshot the wrong tab.

Buffer inbound events per chat_id when no onChat subscriber is registered
(e.g. user focused another session) and drain on resubscribe up to a cap,
so streaming deltas are not lost while off-tab.

Co-authored-by: Cursor <cursoragent@cursor.com>
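
The per-chat buffering idea can be sketched in Python as below. The WebUI client itself is not Python, so this is only a model of the behavior: the class name, method names, and the choice to drop the oldest events once the cap is reached are all assumptions.

```python
from collections import defaultdict, deque
from typing import Callable

Event = dict
Handler = Callable[[Event], None]


class ChatEventBuffer:
    """Hypothetical model: buffer events per chat_id while nobody is subscribed."""

    def __init__(self, cap: int = 100) -> None:
        self._subscribers: dict[str, Handler] = {}
        # Assumption: a bounded deque silently drops the oldest event past the cap.
        self._pending: dict[str, deque] = defaultdict(lambda: deque(maxlen=cap))

    def dispatch(self, chat_id: str, event: Event) -> None:
        handler = self._subscribers.get(chat_id)
        if handler is not None:
            handler(event)
        else:
            self._pending[chat_id].append(event)

    def subscribe(self, chat_id: str, handler: Handler) -> None:
        self._subscribers[chat_id] = handler
        # Drain whatever arrived while the chat was off-tab, in arrival order.
        for event in self._pending.pop(chat_id, ()):
            handler(event)

    def unsubscribe(self, chat_id: str) -> None:
        self._subscribers.pop(chat_id, None)
```

Registering the handler before draining means events that arrive after resubscription go straight to the handler rather than back into the buffer.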

* fix(webui): snap thread scroll to bottom on session open (no smooth glide)

Use scroll-behavior auto on the viewport, instant programmatic scroll when
following new messages and on scrollToBottomSignal. Keep smooth only for
the explicit scroll-to-bottom button.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): respect manual scroll-up after opening a session

Track when the user leaves the bottom with a ref and skip ResizeObserver
and deferred bottom snaps until they return or the conversation is reset.
Remove the time-based force-bottom window that overrode atBottom.

Multi-frame scrollToBottom honours the same guard unless force (scroll button).

Co-authored-by: Cursor <cursoragent@cursor.com>

* Publish long_task UI snapshots on outbound metadata

- Add OUTBOUND_META_AGENT_UI (_agent_ui) for channel-agnostic structured state
- LongTaskTool publishes {kind: long_task, data: snapshot} on the bus with _progress
- WebSocket send forwards metadata as agent_ui for WebUI clients
- Tests for bus payload, WS frame, and progress assertions
- Fix loop progress tests: ignore _goal_status in streaming final filter and
  avoid brittle outbound[-1] ordering after goal status idle messages

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat: WebUI long_task activity card and resilient history merge

Add optional ui_summary to the long_task tool for one-line UI labels. Stream
long_task agent_ui into a dedicated message row with timeline, markdown peek,
and a right sheet for details. Merge canonical history after turn_end while
re-inserting long_task rows before the final assistant reply. Collapse
duplicate task_start/step_start steps in the timeline and extend i18n.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor: align long_task with thread_goal and drop orchestrator UI

- Persist sustained objectives via session metadata (long_task / complete_goal); no subagent wiring or tool-driven agent_ui payloads.
- Remove WebUI long-task activity UI, types, and translations; history merge preserves trace replay only, with legacy long_task rows normalized to traces.
- Drop long_task prompt templates and get_long_task_run_dir; add webui thread disk helper for gateway persistence tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(agent): thread goal runtime context, tools, and skill

- Add thread_goal_state helper and mirror active objectives into Runtime Context
- Wire loop/context/memory/events as needed for goal metadata in turns
- Expand long_task / complete_goal semantics (pivot/cancel/honest recap)
- Add always-on thread-goal SKILL.md; align /goal command prompt
- Tests for context builder and thread goal state
- Remove unused webui ChatPane component

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(thread-goal): add websocket snapshot helper and publish goal updates from long_task

Introduce thread_goal_ws_blob for bounded JSON snapshots, attach snapshots to
websocket turn_end metadata in AgentLoop, and let long_task fan-out dedicated
thread_goal frames on the websocket channel after persisting session metadata.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(channels): websocket thread_goal frames, turn_end replay, and session API scrub for subagent inject

Emit thread_goal events and optional thread_goal on turn_end; scrub persisted
subagent announce blobs on GET /api/sessions/.../messages and shorten session
list previews so WebUI does not surface full Task/Summarize scaffolding.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): merge ephemeral traces per user turn when reconciling canonical history

Preserve disk/live trace rows inside the matching user–assistant segment instead
of stacking every trace before the final assistant reply (fixes inflated tool
counts after refresh or session switch).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): show assistant reply copy only on the last slice before the next user turn

Avoid duplicate copy affordances on intermediate assistant bubbles that precede
more agent activity in the same turn (tools or further assistant text).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): thread_goal stream plumbing, composer goal strip, sky glow, and client-side subagent scrub projection

Track thread_goal and turn_goal snapshots in NanobotClient, hydrate React state
from thread_goal frames and turn_end, surface objective/elapsed in the composer,
add breathing sky halo CSS while goals are active, mirror server scrub logic on
history hydration and webui_thread snapshots, and extend tests/client mocks.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(channels): add Slack Socket Mode connect timeout with actionable timeout errors

Abort hung websockets.connect handshakes after a bounded wait, log REST-vs-WSS
guidance, surface RuntimeError to channel startup, and log successful WSS setup.

Co-authored-by: Cursor <cursoragent@cursor.com>

* webui: expand thread goal in composer bottom sheet

Add ChevronUp control on the run/goal strip that opens a bottom Sheet
with full ui_summary and objective. Inline preview logic in RunElapsedStrip,
add i18n strings across locales, and a composer unit test.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): widen dedupeToolCallsForUi input for session API typing

fetchSessionMessages types tool_calls as unknown; accept unknown so tsc
build passes when passing message.tool_calls through.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(agent): extract WebSocket turn run status to webui_turn_helpers

* refactor(skills): rename thread-goal to long-task and document idempotent goals

* feat(skills): rename sustained-goal skill to long-goal and tighten long_task guidance

* chore: remove unused subagent/context/router helpers

* feat(session): rename sustained goal to goal_state and align WS/WebUI

- Move helpers from agent/thread_goal_state to session/goal_state:
  GOAL_STATE_KEY, goal_state_runtime_lines, goal_state_ws_blob, parse_goal_state.
- Session metadata now uses "goal_state"; still read legacy "thread_goal";
  long_task writes drop the legacy key after save.
- WebSocket: event/field goal_state, _goal_state_sync; turn_end carries goal_state;
  accept legacy _thread_goal_sync/thread_goal inbound metadata for dispatch.
- WebUI: GoalStateWsPayload, goalState hook/client props, i18n keys goalState*.
- Runtime Context copy uses "Goal (active):" instead of "Thread goal".
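
The legacy-key handling described above might look roughly like this. The key names come from this message; `read_goal` and `write_goal` are hypothetical helpers for illustration and are not the functions in session/goal_state.

```python
from typing import Any

GOAL_STATE_KEY = "goal_state"     # current metadata key, per this message
LEGACY_GOAL_KEY = "thread_goal"   # legacy key that is still read


def read_goal(metadata: dict[str, Any]) -> Any:
    """Prefer the new key and fall back to the legacy one (hypothetical helper)."""
    if GOAL_STATE_KEY in metadata:
        return metadata[GOAL_STATE_KEY]
    return metadata.get(LEGACY_GOAL_KEY)


def write_goal(metadata: dict[str, Any], goal: Any) -> None:
    """Store under the new key and drop the legacy key (hypothetical helper)."""
    metadata[GOAL_STATE_KEY] = goal
    metadata.pop(LEGACY_GOAL_KEY, None)
```

Reading both keys while writing only the new one lets old sessions migrate lazily the first time their goal is updated.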

* feat(agent): stream Anthropic thinking deltas and fix stream idle timeout

* refactor(webui): transcript jsonl as sole timeline source

* fix(agent): reject mismatched WS message chat_id and stream reasoning deltas

* feat(webui): hydrate sustained goal and run timer after websocket subscribe

* chore(webui,websocket): remove unused fetch helpers and legacy thread_goal WS paths

* Raise default max_tokens and context window in agent schema.

Align AgentDefaults and ModelPresetConfig with typical Claude-scale usage
(32k completion budget, 256k context window) and update migration tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(gateway): bootstrap prefers in-memory model; clarify websocket naming

* fix(websocket): websocket _handle_message passes is_dm; refresh /status test expectations

---------

Co-authored-by: chengyongru <2755839590@qq.com>
Co-authored-by: chengyongru <chengyongru.ai@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 01:14:11 +08:00


"""Slack channel implementation using Socket Mode."""

import asyncio
import re
from pathlib import Path
from typing import Any

import httpx
from pydantic import Field
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from slack_sdk.socket_mode.websockets import SocketModeClient
from slack_sdk.web.async_client import AsyncWebClient
from slackify_markdown import slackify_markdown

from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_media_dir
from nanobot.config.schema import Base
from nanobot.pairing import is_approved
from nanobot.utils.helpers import safe_filename, split_message


class SlackDMConfig(Base):
    """Slack DM policy configuration."""

    enabled: bool = True
    policy: str = "open"
    allow_from: list[str] = Field(default_factory=list)


class SlackConfig(Base):
    """Slack channel configuration."""

    enabled: bool = False
    mode: str = "socket"
    webhook_path: str = "/slack/events"
    bot_token: str = ""
    app_token: str = ""
    user_token_read_only: bool = True
    reply_in_thread: bool = True
    react_emoji: str = "eyes"
    done_emoji: str = "white_check_mark"
    include_thread_context: bool = True
    thread_context_limit: int = 20
    allow_from: list[str] = Field(default_factory=list)
    group_policy: str = "mention"
    group_allow_from: list[str] = Field(default_factory=list)
    dm: SlackDMConfig = Field(default_factory=SlackDMConfig)


SLACK_MAX_MESSAGE_LEN = 39_000  # Slack API allows ~40k; leave margin
SLACK_DOWNLOAD_TIMEOUT = 30.0
# Abort Socket Mode WSS handshake after this many seconds. REST auth_test can still
# succeed while WSS blocks (firewall / region). slack-sdk does not apply HTTP(S)_PROXY
# to websockets.connect; see slack_sdk.socket_mode.websockets.SocketModeClient.connect.
SLACK_SOCKET_CONNECT_TIMEOUT_S = 45.0
_HTML_DOWNLOAD_PREFIXES = (b"<!doctype html", b"<html")


class SlackChannel(BaseChannel):
    """Slack channel using Socket Mode."""

    name = "slack"
    display_name = "Slack"
    _SLACK_ID_RE = re.compile(r"^[CDGUW][A-Z0-9]{2,}$")
    _SLACK_CHANNEL_REF_RE = re.compile(r"^<#([A-Z0-9]+)(?:\|[^>]+)?>$")
    _SLACK_USER_REF_RE = re.compile(r"^<@([A-Z0-9]+)(?:\|[^>]+)?>$")

    @classmethod
    def default_config(cls) -> dict[str, Any]:
        return SlackConfig().model_dump(by_alias=True)

    _THREAD_CONTEXT_CACHE_LIMIT = 10_000

    def __init__(self, config: Any, bus: MessageBus):
        if isinstance(config, dict):
            config = SlackConfig.model_validate(config)
        super().__init__(config, bus)
        self.config: SlackConfig = config
        self._web_client: AsyncWebClient | None = None
        self._socket_client: SocketModeClient | None = None
        self._bot_user_id: str | None = None
        self._target_cache: dict[str, str] = {}
        self._thread_context_attempted: set[str] = set()

    async def start(self) -> None:
        """Start the Slack Socket Mode client."""
        if not self.config.bot_token or not self.config.app_token:
            self.logger.error("bot/app token not configured")
            return
        if self.config.mode != "socket":
            self.logger.error("Unsupported mode: {}", self.config.mode)
            return

        self._running = True

        self._web_client = AsyncWebClient(token=self.config.bot_token)
        self._socket_client = SocketModeClient(
            app_token=self.config.app_token,
            web_client=self._web_client,
        )

        self._socket_client.socket_mode_request_listeners.append(self._on_socket_request)

        # Resolve bot user ID for mention handling
        try:
            auth = await self._web_client.auth_test()
            self._bot_user_id = auth.get("user_id")
            self.logger.info("bot connected as {}", self._bot_user_id)
        except Exception as e:
            self.logger.warning("auth_test failed: {}", e)

        self.logger.info("Starting Socket Mode client...")
        try:
            await asyncio.wait_for(
                self._socket_client.connect(),
                timeout=SLACK_SOCKET_CONNECT_TIMEOUT_S,
            )
        except asyncio.TimeoutError:
            self.logger.error(
                "Slack Socket Mode WebSocket handshake timed out after {:.0f}s. "
                "auth_test uses HTTPS and may still succeed while WSS is blocked. "
                "Check outbound access to Slack WebSockets; slack-sdk Socket Mode "
                "does not apply HTTP(S)_PROXY to websockets.connect.",
                SLACK_SOCKET_CONNECT_TIMEOUT_S,
            )
            await self.stop()
            raise RuntimeError("Slack Socket Mode WebSocket connect timed out") from None
        self.logger.info("Slack Socket Mode WebSocket connected (events enabled)")

        while self._running:
            await asyncio.sleep(1)

    async def stop(self) -> None:
        """Stop the Slack client."""
        self._running = False
        if self._socket_client:
            try:
                await self._socket_client.close()
            except Exception as e:
                self.logger.warning("socket close failed: {}", e)
            self._socket_client = None

    async def send(self, msg: OutboundMessage) -> None:
        """Send a message through Slack."""
        if not self._web_client:
            self.logger.warning("client not running")
            return
        try:
            target_chat_id = await self._resolve_target_chat_id(msg.chat_id)
            slack_meta = msg.metadata.get("slack", {}) if msg.metadata else {}
            thread_ts = slack_meta.get("thread_ts")
            origin_chat_id = str((slack_meta.get("event", {}) or {}).get("channel") or msg.chat_id)
            # Reply in the same thread the inbound message belongs to (works
            # for both real channel threads and DM threads). When the agent
            # is forwarding to a different channel, drop thread_ts because it
            # only makes sense within the originating conversation.
            thread_ts_param = thread_ts if thread_ts and target_chat_id == origin_chat_id else None

            is_progress = (msg.metadata or {}).get("_progress", False)
            if is_progress and not msg.content:
                pass  # skip empty progress messages (e.g. tool-event-only updates)
            elif msg.content or not (msg.media or []):
                mrkdwn = self._to_mrkdwn(msg.content) if msg.content else " "
                buttons = getattr(msg, "buttons", None) or []
                chunks = split_message(mrkdwn, SLACK_MAX_MESSAGE_LEN)
                for index, chunk in enumerate(chunks):
                    kwargs: dict[str, Any] = dict(
                        channel=target_chat_id, text=chunk, thread_ts=thread_ts_param,
                    )
                    if buttons and index == len(chunks) - 1:
                        kwargs["blocks"] = self._build_button_blocks(chunk, buttons)
                    await self._web_client.chat_postMessage(**kwargs)

            for media_path in msg.media or []:
                try:
                    await self._web_client.files_upload_v2(
                        channel=target_chat_id,
                        file=media_path,
                        thread_ts=thread_ts_param,
                    )
                except Exception:
                    self.logger.exception("Failed to upload file {}", media_path)

            # Update reaction emoji when the final (non-progress) response is sent
            if not (msg.metadata or {}).get("_progress"):
                event = slack_meta.get("event", {})
                await self._update_react_emoji(origin_chat_id, event.get("ts"))

        except Exception:
            self.logger.exception("Error sending message")
            raise

    async def _resolve_target_chat_id(self, target: str) -> str:
        """Resolve human-friendly Slack targets to concrete IDs when needed."""
        if not self._web_client:
            return target

        target = target.strip()
        if not target:
            return target

        if match := self._SLACK_CHANNEL_REF_RE.fullmatch(target):
            return match.group(1)
        if match := self._SLACK_USER_REF_RE.fullmatch(target):
            return await self._open_dm_for_user(match.group(1))
        if self._SLACK_ID_RE.fullmatch(target):
            if target.startswith(("U", "W")):
                return await self._open_dm_for_user(target)
            return target

        if target.startswith("#"):
            return await self._resolve_channel_name(target[1:])
        if target.startswith("@"):
            return await self._resolve_user_handle(target[1:])

        try:
            return await self._resolve_channel_name(target)
        except ValueError:
            return await self._resolve_user_handle(target)

    async def _resolve_channel_name(self, name: str) -> str:
        normalized = self._normalize_target_name(name)
        if not normalized:
            raise ValueError("Slack target channel name is empty")

        cache_key = f"channel:{normalized}"
        if cache_key in self._target_cache:
            return self._target_cache[cache_key]

        cursor: str | None = None
        while True:
            response = await self._web_client.conversations_list(
                types="public_channel,private_channel",
                exclude_archived=True,
                limit=200,
                cursor=cursor,
            )
            for channel in response.get("channels", []):
                if self._normalize_target_name(str(channel.get("name") or "")) == normalized:
                    channel_id = str(channel.get("id") or "")
                    if channel_id:
                        self._target_cache[cache_key] = channel_id
                        return channel_id
            cursor = ((response.get("response_metadata") or {}).get("next_cursor") or "").strip()
            if not cursor:
                break

        raise ValueError(
            f"Slack channel '{name}' was not found. Use a joined channel name like "
            f"'#general' or a concrete channel ID."
        )

    async def _resolve_user_handle(self, handle: str) -> str:
        normalized = self._normalize_target_name(handle)
        if not normalized:
            raise ValueError("Slack target user handle is empty")

        cache_key = f"user:{normalized}"
        if cache_key in self._target_cache:
            return self._target_cache[cache_key]

        cursor: str | None = None
        while True:
            response = await self._web_client.users_list(limit=200, cursor=cursor)
            for member in response.get("members", []):
                if self._member_matches_handle(member, normalized):
                    user_id = str(member.get("id") or "")
                    if not user_id:
                        continue
                    dm_id = await self._open_dm_for_user(user_id)
                    self._target_cache[cache_key] = dm_id
                    return dm_id
            cursor = ((response.get("response_metadata") or {}).get("next_cursor") or "").strip()
            if not cursor:
                break

        raise ValueError(
            f"Slack user '{handle}' was not found. Use '@name' or a concrete DM/channel ID."
        )

    async def _open_dm_for_user(self, user_id: str) -> str:
        response = await self._web_client.conversations_open(users=user_id)
        channel_id = str(((response.get("channel") or {}).get("id")) or "")
        if not channel_id:
            raise ValueError(f"Slack DM target for user '{user_id}' could not be opened.")
        return channel_id

    @staticmethod
    def _normalize_target_name(value: str) -> str:
        return value.strip().lstrip("#@").lower()

    @classmethod
    def _member_matches_handle(cls, member: dict[str, Any], normalized: str) -> bool:
        profile = member.get("profile") or {}
        candidates = {
            str(member.get("name") or ""),
            str(profile.get("display_name") or ""),
            str(profile.get("display_name_normalized") or ""),
            str(profile.get("real_name") or ""),
            str(profile.get("real_name_normalized") or ""),
        }
        return normalized in {cls._normalize_target_name(candidate) for candidate in candidates if candidate}

    async def _on_socket_request(
        self,
        client: SocketModeClient,
        req: SocketModeRequest,
    ) -> None:
        """Handle incoming Socket Mode requests."""
        if req.type == "interactive":
            await self._on_block_action(client, req)
            return

        if req.type != "events_api":
            return

        # Acknowledge right away
        await client.send_socket_mode_response(
            SocketModeResponse(envelope_id=req.envelope_id)
        )

        payload = req.payload or {}
        event = payload.get("event") or {}
        event_type = event.get("type")

        # Handle app mentions or plain messages
        if event_type not in ("message", "app_mention"):
            return

        sender_id = event.get("user")
        chat_id = event.get("channel")
        subtype = event.get("subtype")

        # Slack uses subtype=file_share for user messages with attachments.
        # Ignore other subtypes such as bot_message / message_changed / deleted.
        if subtype and subtype != "file_share":
            return
        if self._bot_user_id and sender_id == self._bot_user_id:
            return

        # Avoid double-processing: Slack sends both `message` and `app_mention`
        # for mentions in channels. Prefer `app_mention`.
        text = event.get("text") or ""
        if event_type == "message" and self._bot_user_id and f"<@{self._bot_user_id}>" in text:
            return

        # Debug: log basic event shape
        self.logger.debug(
            "event: type={} subtype={} user={} channel={} channel_type={} text={}",
            event_type,
            subtype,
            sender_id,
            chat_id,
            event.get("channel_type"),
            text[:80],
        )
        if not sender_id or not chat_id:
            return

        channel_type = event.get("channel_type") or ""

        if not self._is_allowed(sender_id, chat_id, channel_type):
            if channel_type == "im" and self.config.dm.enabled:
                await self._handle_message(
                    sender_id=sender_id,
                    chat_id=chat_id,
                    content="",
                    is_dm=True,
                )
            return

        if channel_type != "im" and not self._should_respond_in_channel(event_type, text, chat_id):
            return

        text = self._strip_bot_mention(text)

        event_ts = event.get("ts")
        raw_thread_ts = event.get("thread_ts")
        thread_ts = raw_thread_ts
        # In DMs we don't auto-open a thread on top-level messages (it would
        # bury replies under "1 reply"). But if the user explicitly opened a
        # thread inside the DM, raw_thread_ts is set and we honor it.
        if (
            self.config.reply_in_thread
            and not thread_ts
            and channel_type != "im"
        ):
            thread_ts = event_ts

        # Add :eyes: reaction to the triggering message (best-effort)
        try:
            if self._web_client and event.get("ts"):
                await self._web_client.reactions_add(
                    channel=chat_id,
                    name=self.config.react_emoji,
                    timestamp=event.get("ts"),
                )
        except Exception as e:
            self.logger.debug("reactions_add failed: {}", e)

        # Thread-scoped session key whenever the user is in a real thread
        # (raw_thread_ts is set). DM threads get their own session, separate
        # from the DM root, so context doesn't bleed across thread boundaries.
        session_key = (
            f"slack:{chat_id}:{thread_ts}" if thread_ts and raw_thread_ts else None
        )

        media_paths: list[str] = []
        file_markers: list[str] = []
        for file_info in event.get("files") or []:
            if not isinstance(file_info, dict):
                continue
            file_path, marker = await self._download_slack_file(file_info)
            if file_path:
                media_paths.append(file_path)
            if marker:
                file_markers.append(marker)

        is_slash = text.strip().startswith("/")
        content = text if is_slash else await self._with_thread_context(
            text,
            chat_id=chat_id,
            channel_type=channel_type,
            thread_ts=thread_ts,
            raw_thread_ts=raw_thread_ts,
            current_ts=event_ts,
        )
        if file_markers:
            content = "\n".join(part for part in [content, *file_markers] if part)
        if not content and not media_paths:
            return

        try:
            await self._handle_message(
                sender_id=sender_id,
                chat_id=chat_id,
                content=content,
                media=media_paths,
                metadata={
                    "slack": {
                        "event": event,
                        "thread_ts": thread_ts,
                        "channel_type": channel_type,
                    },
                },
                session_key=session_key,
            )
        except Exception:
            self.logger.exception("Error handling message from {}", sender_id)

    async def _download_slack_file(self, file_info: dict[str, Any]) -> tuple[str | None, str]:
        """Download a Slack private file to the local media directory."""
        file_id = str(file_info.get("id") or "file")
        name = str(
            file_info.get("name")
            or file_info.get("title")
            or file_info.get("id")
            or "slack-file"
        )
        marker_type = "image" if str(file_info.get("mimetype") or "").startswith("image/") else "file"
        marker = f"[{marker_type}: {name}]"
        url = str(file_info.get("url_private_download") or file_info.get("url_private") or "")
        if not url:
            return None, self._download_failure_marker(marker_type, name, "missing download url")
        if not self.config.bot_token:
            return None, self._download_failure_marker(marker_type, name, "missing bot token")

        filename = safe_filename(f"{file_id}_{name}")
        path = Path(get_media_dir("slack")) / filename
        try:
            async with httpx.AsyncClient(timeout=SLACK_DOWNLOAD_TIMEOUT, follow_redirects=True) as client:
                response = await client.get(
                    url,
                    headers={"Authorization": f"Bearer {self.config.bot_token}"},
                )
                response.raise_for_status()
                if self._looks_like_html_download(response):
                    raise ValueError("Slack returned HTML instead of file content")
                path.write_bytes(response.content)
                return str(path), marker
        except Exception as e:
            self.logger.warning("Failed to download file {}: {}", file_id, e)
            return None, self._download_failure_marker(marker_type, name, "download failed")

    @staticmethod
    def _download_failure_marker(marker_type: str, name: str, reason: str) -> str:
        return (
            f"[{marker_type}: {name}: {reason}; not available to nanobot. "
            "Check Slack files:read scope, reinstall the Slack app, and ensure the bot can access the file.]"
        )

    @staticmethod
    def _looks_like_html_download(response: httpx.Response) -> bool:
        content_type = response.headers.get("content-type", "").lower()
        if "text/html" in content_type:
            return True
        preview = response.content[:256].lstrip().lower()
        return preview.startswith(_HTML_DOWNLOAD_PREFIXES)

    async def _on_block_action(self, client: SocketModeClient, req: SocketModeRequest) -> None:
        """Handle button clicks from inline action buttons."""
        await client.send_socket_mode_response(SocketModeResponse(envelope_id=req.envelope_id))

        payload = req.payload or {}
        actions = payload.get("actions") or []
        if not actions:
            return

        value = str(actions[0].get("value") or "")
        user_info = payload.get("user") or {}
        sender_id = str(user_info.get("id") or "")
        channel_info = payload.get("channel") or {}
        chat_id = str(channel_info.get("id") or "")
        if not sender_id or not chat_id or not value:
            return

        message_info = payload.get("message") or {}
        thread_ts = message_info.get("thread_ts") or message_info.get("ts")
        channel_type = self._infer_channel_type(chat_id)
        if not self._is_allowed(sender_id, chat_id, channel_type):
            return

        session_key = f"slack:{chat_id}:{thread_ts}" if thread_ts else None
        try:
            await self._handle_message(
                sender_id=sender_id,
                chat_id=chat_id,
                content=value,
                metadata={"slack": {"thread_ts": thread_ts, "channel_type": channel_type}},
                session_key=session_key,
            )
        except Exception:
            self.logger.exception("Error handling button click from {}", sender_id)

    async def _with_thread_context(
        self,
        text: str,
        *,
        chat_id: str,
        channel_type: str,
        thread_ts: str | None,
        raw_thread_ts: str | None,
        current_ts: str | None,
    ) -> str:
        """Include thread history the first time the bot is pulled into a Slack thread."""
        del channel_type  # DM and channel threads are both fetched via conversations.replies
        if (
            not self.config.include_thread_context
            or not self._web_client
            or not raw_thread_ts
            or not thread_ts
            or current_ts == thread_ts
        ):
            return text
        key = f"{chat_id}:{thread_ts}"
        if key in self._thread_context_attempted:
            return text
        if len(self._thread_context_attempted) >= self._THREAD_CONTEXT_CACHE_LIMIT:
            self._thread_context_attempted.clear()
        self._thread_context_attempted.add(key)
        try:
            response = await self._web_client.conversations_replies(
                channel=chat_id,
                ts=thread_ts,
                limit=max(1, self.config.thread_context_limit),
            )
        except Exception as e:
            self.logger.warning("thread context unavailable for {}: {}", key, e)
            return text
        lines = self._format_thread_context(
            response.get("messages", []),
            current_ts=current_ts,
        )
        if not lines:
            return text
        return "Slack thread context before this mention:\n" + "\n".join(lines) + f"\n\nCurrent message:\n{text}"

    def _format_thread_context(self, messages: list[dict[str, Any]], *, current_ts: str | None) -> list[str]:
        lines: list[str] = []
        for item in messages:
            if item.get("ts") == current_ts:
                continue
            if item.get("subtype"):
                continue
            sender = str(item.get("user") or item.get("bot_id") or "unknown")
            is_bot = self._bot_user_id is not None and sender == self._bot_user_id
            label = "bot" if is_bot else f"<@{sender}>"
            text = str(item.get("text") or "").strip()
            if not text:
                continue
            text = self._strip_bot_mention(text)
            if len(text) > 500:
                # Mark truncation so the model can tell the message was cut off.
                text = text[:500] + "…"
            lines.append(f"- {label}: {text}")
        return lines

    @staticmethod
    def _build_button_blocks(text: str, buttons: list[list[str]]) -> list[dict[str, Any]]:
        """Build Slack Block Kit blocks with action buttons."""
        blocks: list[dict[str, Any]] = [
            {"type": "section", "text": {"type": "mrkdwn", "text": text[:3000]}},
        ]
        elements = []
        for row in buttons:
            for label in row:
                elements.append({
                    "type": "button",
                    "text": {"type": "plain_text", "text": label[:75]},
                    "value": label[:75],
                    "action_id": f"btn_{label[:50]}",
                })
        if elements:
            blocks.append({"type": "actions", "elements": elements[:25]})
        return blocks

    async def _update_react_emoji(self, chat_id: str, ts: str | None) -> None:
        """Remove the in-progress reaction and optionally add a done reaction."""
        if not self._web_client or not ts:
            return
        try:
            await self._web_client.reactions_remove(
                channel=chat_id,
                name=self.config.react_emoji,
                timestamp=ts,
            )
        except Exception as e:
            self.logger.debug("reactions_remove failed: {}", e)
        if self.config.done_emoji:
            try:
                await self._web_client.reactions_add(
                    channel=chat_id,
                    name=self.config.done_emoji,
                    timestamp=ts,
                )
            except Exception as e:
                self.logger.debug("done reaction failed: {}", e)

    def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool:
        """Apply the DM policy or the group/channel policy to this sender and conversation."""
        if channel_type == "im":
            if not self.config.dm.enabled:
                return False
            if self.config.dm.policy == "allowlist":
                return sender_id in self.config.dm.allow_from or is_approved(self.name, sender_id)
            return True
        # Group / channel messages
        if self.config.group_policy == "allowlist":
            return chat_id in self.config.group_allow_from
        return True

    def _should_respond_in_channel(self, event_type: str, text: str, chat_id: str) -> bool:
        """Decide whether a channel message warrants a reply under group_policy."""
        if self.config.group_policy == "open":
            return True
        if self.config.group_policy == "mention":
            if event_type == "app_mention":
                return True
            return self._bot_user_id is not None and f"<@{self._bot_user_id}>" in text
        if self.config.group_policy == "allowlist":
            return chat_id in self.config.group_allow_from
        return False

    def is_allowed(self, sender_id: str) -> bool:
        # Slack needs channel-aware policy checks, so _on_socket_request and
        # _on_block_action call _is_allowed before handing off to BaseChannel.
        return True

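    @staticmethod
    def _infer_channel_type(chat_id: str) -> str:
        """Infer the channel type from the ID prefix ("D" for DMs, "G" for groups, otherwise a channel)."""
        if chat_id.startswith("D"):
            return "im"
        if chat_id.startswith("G"):
            return "group"
        return "channel"
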
    def _strip_bot_mention(self, text: str) -> str:
        if not text or not self._bot_user_id:
            return text
        return re.sub(rf"<@{re.escape(self._bot_user_id)}>\s*", "", text).strip()

    _TABLE_RE = re.compile(r"(?m)^\|.*\|$(?:\n\|[\s:|-]*\|$)(?:\n\|.*\|$)*")
    _CODE_FENCE_RE = re.compile(r"```[\s\S]*?```")
    _INLINE_CODE_RE = re.compile(r"`[^`]+`")
    _LEFTOVER_BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
    _LEFTOVER_HEADER_RE = re.compile(r"^#{1,6}\s+(.+)$", re.MULTILINE)
    _BARE_URL_RE = re.compile(r"(?<![|<])(https?://\S+)")

    @classmethod
    def _to_mrkdwn(cls, text: str) -> str:
        """Convert Markdown to Slack mrkdwn, including tables."""
        if not text:
            return ""
        text = cls._TABLE_RE.sub(cls._convert_table, text)
        return cls._fixup_mrkdwn(slackify_markdown(text)).rstrip("\n")

    @classmethod
    def _fixup_mrkdwn(cls, text: str) -> str:
        """Fix markdown artifacts that slackify_markdown misses."""
        code_blocks: list[str] = []

        def _save_code(m: re.Match) -> str:
            # Swap code spans for placeholders so the fixups below leave them untouched.
            code_blocks.append(m.group(0))
            return f"\x00CB{len(code_blocks) - 1}\x00"

        text = cls._CODE_FENCE_RE.sub(_save_code, text)
        text = cls._INLINE_CODE_RE.sub(_save_code, text)
        text = cls._LEFTOVER_BOLD_RE.sub(r"*\1*", text)
        text = cls._LEFTOVER_HEADER_RE.sub(r"*\1*", text)
        text = cls._BARE_URL_RE.sub(lambda m: m.group(0).replace("&amp;", "&"), text)
        # Restore the saved code spans in their original positions.
        for i, block in enumerate(code_blocks):
            text = text.replace(f"\x00CB{i}\x00", block)
        return text

    @staticmethod
    def _convert_table(match: re.Match) -> str:
        """Convert a Markdown table to a Slack-readable list."""
        lines = [ln.strip() for ln in match.group(0).strip().splitlines() if ln.strip()]
        if len(lines) < 2:
            return match.group(0)
        headers = [h.strip() for h in lines[0].strip("|").split("|")]
        start = 2 if re.fullmatch(r"[|\s:\-]+", lines[1]) else 1
        rows: list[str] = []
        for line in lines[start:]:
            cells = [c.strip() for c in line.strip("|").split("|")]
            cells = (cells + [""] * len(headers))[: len(headers)]
            parts = [f"**{headers[i]}**: {cells[i]}" for i in range(len(headers)) if cells[i]]
            if parts:
                rows.append(" · ".join(parts))
        return "\n".join(rows)