mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-08 20:23:41 +00:00
feat(whatsapp): add voice message transcription via OpenAI/Groq Whisper
Automatically transcribe WhatsApp voice messages using OpenAI Whisper
or Groq. Configurable via transcriptionProvider and transcriptionApiKey.
Config:
"whatsapp": {
"transcriptionProvider": "openai",
"transcriptionApiKey": "sk-..."
}
This commit is contained in:
parent
e7d371ec1e
commit
db50dd8a77
@ -37,13 +37,17 @@ class BaseChannel(ABC):
|
||||
self._running = False
|
||||
|
||||
async def transcribe_audio(self, file_path: str | Path) -> str:
|
||||
"""Transcribe an audio file via Groq Whisper. Returns empty string on failure."""
|
||||
"""Transcribe an audio file via Whisper (OpenAI or Groq). Returns empty string on failure."""
|
||||
if not self.transcription_api_key:
|
||||
return ""
|
||||
try:
|
||||
from nanobot.providers.transcription import GroqTranscriptionProvider
|
||||
|
||||
provider = GroqTranscriptionProvider(api_key=self.transcription_api_key)
|
||||
provider_name = getattr(self, "transcription_provider", "groq")
|
||||
if provider_name == "openai":
|
||||
from nanobot.providers.transcription import OpenAITranscriptionProvider
|
||||
provider = OpenAITranscriptionProvider(api_key=self.transcription_api_key)
|
||||
else:
|
||||
from nanobot.providers.transcription import GroqTranscriptionProvider
|
||||
provider = GroqTranscriptionProvider(api_key=self.transcription_api_key)
|
||||
return await provider.transcribe(file_path)
|
||||
except Exception as e:
|
||||
logger.warning("{}: audio transcription failed: {}", self.name, e)
|
||||
|
||||
@ -26,6 +26,8 @@ class WhatsAppConfig(Base):
|
||||
bridge_url: str = "ws://localhost:3001"
|
||||
bridge_token: str = ""
|
||||
allow_from: list[str] = Field(default_factory=list)
|
||||
transcription_provider: str = "openai" # openai or groq
|
||||
transcription_api_key: str = ""
|
||||
group_policy: Literal["open", "mention"] = "open" # "open" responds to all, "mention" only when @mentioned
|
||||
|
||||
|
||||
@ -51,6 +53,8 @@ class WhatsAppChannel(BaseChannel):
|
||||
self._ws = None
|
||||
self._connected = False
|
||||
self._processed_message_ids: OrderedDict[str, None] = OrderedDict()
|
||||
self.transcription_api_key = config.transcription_api_key
|
||||
self.transcription_provider = config.transcription_provider
|
||||
|
||||
async def login(self, force: bool = False) -> bool:
|
||||
"""
|
||||
@ -203,11 +207,16 @@ class WhatsAppChannel(BaseChannel):
|
||||
|
||||
# Handle voice transcription if it's a voice message
|
||||
if content == "[Voice Message]":
|
||||
logger.info(
|
||||
"Voice message received from {}, but direct download from bridge is not yet supported.",
|
||||
sender_id,
|
||||
)
|
||||
content = "[Voice Message: Transcription not available for WhatsApp yet]"
|
||||
if media_paths:
|
||||
logger.info("Transcribing voice message from {}...", sender_id)
|
||||
transcription = await self.transcribe_audio(media_paths[0])
|
||||
if transcription:
|
||||
content = transcription
|
||||
logger.info("Transcribed voice from {}: {}...", sender_id, transcription[:50])
|
||||
else:
|
||||
content = "[Voice Message: Transcription failed]"
|
||||
else:
|
||||
content = "[Voice Message: Audio not available]"
|
||||
|
||||
# Extract media paths (images/documents/videos downloaded by the bridge)
|
||||
media_paths = data.get("media") or []
|
||||
|
||||
@ -1,8 +1,36 @@
|
||||
"""Voice transcription provider using Groq."""
|
||||
"""Voice transcription providers (Groq and OpenAI Whisper)."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class OpenAITranscriptionProvider:
|
||||
"""Voice transcription provider using OpenAI's Whisper API."""
|
||||
|
||||
def __init__(self, api_key: str | None = None):
|
||||
self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
|
||||
self.api_url = "https://api.openai.com/v1/audio/transcriptions"
|
||||
|
||||
async def transcribe(self, file_path: str | Path) -> str:
|
||||
if not self.api_key:
|
||||
return ""
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
return ""
|
||||
try:
|
||||
import httpx
|
||||
async with httpx.AsyncClient() as client:
|
||||
with open(path, "rb") as f:
|
||||
files = {"file": (path.name, f), "model": (None, "whisper-1")}
|
||||
headers = {"Authorization": f"Bearer {self.api_key}"}
|
||||
response = await client.post(
|
||||
self.api_url, headers=headers, files=files, timeout=60.0,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json().get("text", "")
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user