nanobot/webui/src/hooks/useNanobotStream.ts

215 lines
6.8 KiB
TypeScript

import { useCallback, useEffect, useRef, useState } from "react";
import { useClient } from "@/providers/ClientProvider";
import type { StreamError } from "@/lib/nanobot-client";
import type {
InboundEvent,
OutboundMedia,
UIImage,
UIMessage,
} from "@/lib/types";
interface StreamBuffer {
/** ID of the assistant message currently receiving deltas. */
messageId: string;
/** Sequence of deltas accumulated in order. */
parts: string[];
}
/**
* Subscribe to a chat by ID. Returns the in-memory message list for the chat,
* a streaming flag, and a ``send`` function. Initial history must be seeded
* separately (e.g. via ``fetchSessionMessages``) since the server only replays
* live events.
*/
/** Payload passed to ``send`` when the user attaches one or more images.
*
* ``media`` is handed to the wire client verbatim; ``preview`` powers the
* optimistic user bubble (blob URLs so the preview appears before the server
* acks the frame). Keeping the two separate lets the bubble re-use the local
* blob URL even after the server persists the file under a different name. */
export interface SendImage {
media: OutboundMedia;
preview: UIImage;
}
export function useNanobotStream(
chatId: string | null,
initialMessages: UIMessage[] = [],
): {
messages: UIMessage[];
isStreaming: boolean;
send: (content: string, images?: SendImage[]) => void;
setMessages: React.Dispatch<React.SetStateAction<UIMessage[]>>;
/** Latest transport-level fault raised since the last ``dismissStreamError``.
* ``null`` when there is nothing to show. */
streamError: StreamError | null;
/** Clear the current ``streamError`` (e.g. after the user dismisses the
* notification or starts a fresh action). */
dismissStreamError: () => void;
} {
const { client } = useClient();
const [messages, setMessages] = useState<UIMessage[]>(initialMessages);
const [isStreaming, setIsStreaming] = useState(false);
const [streamError, setStreamError] = useState<StreamError | null>(null);
const buffer = useRef<StreamBuffer | null>(null);
useEffect(() => {
return client.onError((err) => setStreamError(err));
}, [client]);
const dismissStreamError = useCallback(() => setStreamError(null), []);
// Reset local state when switching chats. ``streamError`` is scoped to the
// send that triggered it, so a chat swap should wipe it out: a stale
// "Message too large" banner on a freshly-opened chat-B would confuse the
// user about which send actually failed (and in which chat).
useEffect(() => {
setMessages(initialMessages);
setIsStreaming(false);
setStreamError(null);
buffer.current = null;
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [chatId]);
useEffect(() => {
if (!chatId) return;
const handle = (ev: InboundEvent) => {
if (ev.event === "delta") {
const id = buffer.current?.messageId ?? crypto.randomUUID();
if (!buffer.current) {
buffer.current = { messageId: id, parts: [] };
setMessages((prev) => [
...prev,
{
id,
role: "assistant",
content: "",
isStreaming: true,
createdAt: Date.now(),
},
]);
setIsStreaming(true);
}
buffer.current.parts.push(ev.text);
const combined = buffer.current.parts.join("");
const targetId = buffer.current.messageId;
setMessages((prev) =>
prev.map((m) => (m.id === targetId ? { ...m, content: combined } : m)),
);
return;
}
if (ev.event === "stream_end") {
if (!buffer.current) {
setIsStreaming(false);
return;
}
const finalId = buffer.current.messageId;
buffer.current = null;
setIsStreaming(false);
setMessages((prev) =>
prev.map((m) =>
m.id === finalId ? { ...m, isStreaming: false } : m,
),
);
return;
}
if (ev.event === "message") {
// Intermediate agent breadcrumbs (tool-call hints, raw progress).
// Attach them to the last trace row if it was the last emitted item
// so a sequence of calls collapses into one compact trace group.
if (ev.kind === "tool_hint" || ev.kind === "progress") {
const line = ev.text;
setMessages((prev) => {
const last = prev[prev.length - 1];
if (last && last.kind === "trace" && !last.isStreaming) {
const merged: UIMessage = {
...last,
traces: [...(last.traces ?? [last.content]), line],
content: line,
};
return [...prev.slice(0, -1), merged];
}
return [
...prev,
{
id: crypto.randomUUID(),
role: "tool",
kind: "trace",
content: line,
traces: [line],
createdAt: Date.now(),
},
];
});
return;
}
// A complete (non-streamed) assistant message. If a stream was in
// flight, drop the placeholder so we don't render the text twice.
const activeId = buffer.current?.messageId;
buffer.current = null;
setIsStreaming(false);
setMessages((prev) => {
const filtered = activeId ? prev.filter((m) => m.id !== activeId) : prev;
return [
...filtered,
{
id: crypto.randomUUID(),
role: "assistant",
content: ev.text,
createdAt: Date.now(),
},
];
});
return;
}
// ``attached`` / ``error`` frames aren't actionable here; the client
// shell handles them separately.
};
const unsub = client.onChat(chatId, handle);
return () => {
unsub();
buffer.current = null;
};
}, [chatId, client]);
const send = useCallback(
(content: string, images?: SendImage[]) => {
if (!chatId) return;
const hasImages = !!images && images.length > 0;
// Text is optional when images are attached — the agent will still see
// the image blocks via ``media`` paths.
if (!hasImages && !content.trim()) return;
const previews = hasImages ? images!.map((i) => i.preview) : undefined;
setMessages((prev) => [
...prev,
{
id: crypto.randomUUID(),
role: "user",
content,
createdAt: Date.now(),
...(previews ? { images: previews } : {}),
},
]);
const wireMedia = hasImages ? images!.map((i) => i.media) : undefined;
client.sendMessage(chatId, content, wireMedia);
},
[chatId, client],
);
return {
messages,
isStreaming,
send,
setMessages,
streamError,
dismissStreamError,
};
}