Plenty of teams talk about building conversational agents, but polishing one to the point where it can run in production is genuinely hard. Voice systems are especially unforgiving: latency is immediately audible, retrieval misses erode trust, and any seam between speech, language, and response drags the whole experience down. This article walks through building a voice-native conversational agent with natural, end-to-end interaction. You will see how speech moves through transcription, reasoning, retrieval, and synthesis in a real-time flow, and how the layers work together to keep the experience coherent. By the end, you will have the core knowledge needed to design, build, and operate a voice agent that scales with real-world usage.

Architecture Overview
The user's voice is the entry point to the pipeline. The Pipecat framework manages the full conversational loop, coordinating how audio moves between services. Deepgram listens first, converting speech to text with low latency and handing the LLM a clean transcript. The LLM, running on Venice, interprets the user's intent, decides what to do, and triggers knowledge retrieval when needed. LlamaIndex sits between the model and the Qdrant vector database, orchestrating retrieval in a structured, efficient way: it issues semantic queries to Qdrant, the vector search engine that holds the domain knowledge. The retrieved context flows back to the LLM, grounding its answer. Once the answer is generated, Cartesia synthesizes it into clear, natural speech, and Pipecat streams the audio back to the user, closing the loop. The whole design focuses on staying real-time, reliable, and aligned with what people expect from natural conversation.
- Pipecat: an open-source framework for building voice and multimodal conversational AI applications, orchestrating STT, LLM, TTS, and other components in a single pipeline.
- Deepgram: a speech-to-text (STT) API that converts spoken audio to text with high accuracy and low latency in real time.
- Cartesia: a text-to-speech (TTS) API that turns text into expressive, lifelike voice audio.
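Condensed into a single conversational turn, the flow described above looks like this:
```
user audio → Pipecat transport → Deepgram (STT) → LLM on Venice
                  ↳ tool call → LlamaIndex → Qdrant → retrieved context → LLM
user audio ← Pipecat transport ← Cartesia (TTS) ← grounded answer
```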
Implementation Details
The project uses the following directory structure:
```
.
└── pipecat_playground
    ├── bot.py
    ├── pipecat_playground.iml
    ├── prompts.py
    ├── requirements.txt
    ├── .env
    └── tool.py
```
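The requirements.txt appears in the tree but is never shown in the article. A plausible version, reconstructed from the imports used below, looks like the following; the exact Pipecat extras are assumptions you should verify against the Pipecat docs:
```
# Assumed dependency list reconstructed from the imports; verify names, extras, and versions.
pipecat-ai[daily,webrtc,deepgram,cartesia,openai,silero]
llama-index-core
llama-index-vector-stores-qdrant
llama-index-embeddings-fastembed
qdrant-client
python-dotenv
loguru
```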
All environment variables for the AI models and the vector search engine are managed centrally in the .env file:
```
DEEPGRAM_API_KEY=bc22d3b8f6…
OPENAI_API_KEY=DVfuZhM… # venice api key
CARTESIA_API_KEY=sk_car_…
QDRANT_URL=http://localhost:6333
QDRANT_API_KEY=th3s3c…
COLLECTION_NAME=leukemia_knowledge
HF_API_KEY=hf_sfmuAPx…
GEMINI_API_KEY=AIzaSyDk9qzI8T…
ANTHROPIC_API_KEY=sk-ant-api03-…
TOKENIZERS_PARALLELISM=false
```
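QDRANT_URL above points at a local Qdrant instance; if you don't have one running, `docker run -p 6333:6333 qdrant/qdrant` is the quickest way to start one.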
All system prompts are managed in a single prompts.py file:
```python
SYSTEM_PROMPT = (
    "You are an expert Leukemia Doctor | Medical Oncologist "
    "specialized in Acute myeloid leukemia (AML), Acute lymphoblastic leukemia (ALL), "
    "Chronic lymphocytic leukemia (CLL), and Acute leukemia (AL). Your primary role is to provide accurate and detailed "
    "answers to questions *about* Leukemia and to summarize details "
    "or the overall content of the provided context about leukemia. "
    "IMPORTANT: Do not use external knowledge, ignore your prior knowledge, and do not make up information. "
    "Only use the context provided to you. If you don't find the answer, politely say you don't know the answer. "
    "-------------------------------------------------------------------"
    "MOST IMPORTANT: Never answer anything outside Leukemia. "
    "If you get anything outside Leukemia, respond that you are not a specialist and cannot answer that question; "
    "even if you have the context, just ignore it."
)
```
Context retrieval, orchestrated through LlamaIndex, is handled by a dedicated tool function:
```python
import os
from typing import List

from dotenv import load_dotenv, find_dotenv
from llama_index.core.indices import VectorStoreIndex
from llama_index.core.schema import NodeWithScore
from llama_index.core.settings import Settings
from llama_index.core.storage import StorageContext
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
from pipecat.services.llm_service import FunctionCallParams
from qdrant_client import QdrantClient

load_dotenv(find_dotenv())

# The embedding model must match the one used when the collection was built.
Settings.embed_model = FastEmbedEmbedding(model_name="BAAI/bge-base-en-v1.5")

# Read the connection settings from .env instead of hardcoding them.
qdrant_connector = QdrantClient(
    url=os.environ.get("QDRANT_URL", "http://localhost:6333"),
    api_key=os.environ.get("QDRANT_API_KEY"),
)


async def retrieve_leukemia_knowledge_base(params: FunctionCallParams, query: str):
    """
    Use this tool to retrieve knowledge about leukemia.

    Args:
        query: str, user query to search the knowledge base
    """
    try:
        if not query:
            await params.result_callback({"error": "No query provided"})
            return
        if qdrant_connector.collection_exists(collection_name=os.environ.get("COLLECTION_NAME")):
            vector_store = QdrantVectorStore(
                client=qdrant_connector,
                collection_name=os.environ.get("COLLECTION_NAME"),
            )
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            index = VectorStoreIndex.from_vector_store(
                vector_store=vector_store,
                storage_context=storage_context,
            )
            # `similarity_top_k` is the retriever's keyword argument (not `top_k`).
            retriever = index.as_retriever(similarity_top_k=15)
            nodes: List[NodeWithScore] = retriever.retrieve(query)
            # Join chunks with newlines so the context stays readable for the LLM.
            context = "\n".join(node.text for node in nodes)
            print(f"Context retrieved: {context}")
            await params.result_callback(context)
        else:
            await params.result_callback({
                "error": "Collection does not exist in Qdrant"
            })
    except Exception as e:
        print(f"Error in retrieve_leukemia_knowledge_base: {str(e)}")
        await params.result_callback({
            "error": f"Failed to get context: {str(e)}"
        })
```
The key to this function is its first parameter: FunctionCallParams, which lets us send the retrieved context back into the Pipecat pipeline through a callback.
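One thing the tool takes for granted is that the `leukemia_knowledge` collection already exists in Qdrant. The article doesn't show the ingestion step, so here is a minimal sketch of how it could be populated with LlamaIndex; the `ingest.py` name and the `./data` source directory are illustrative assumptions, not part of the project:
```python
# ingest.py -- hypothetical one-off script to populate the Qdrant collection.
import os

from dotenv import load_dotenv, find_dotenv
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.core.settings import Settings
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient

load_dotenv(find_dotenv())

# Must match the embedding model the retrieval tool uses at query time.
Settings.embed_model = FastEmbedEmbedding(model_name="BAAI/bge-base-en-v1.5")

client = QdrantClient(url=os.environ["QDRANT_URL"], api_key=os.environ.get("QDRANT_API_KEY"))
vector_store = QdrantVectorStore(client=client, collection_name=os.environ["COLLECTION_NAME"])
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# Load every readable file under ./data (PDFs, text, markdown, ...).
documents = SimpleDirectoryReader("./data").load_data()

# Chunk, embed, and upsert into Qdrant; the collection is created if missing.
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
print(f"Ingested {len(documents)} documents into {os.environ['COLLECTION_NAME']}")
```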
This section handles the imports and initialization for the Pipecat voice bot. Because models such as the Silero VAD and LocalSmartTurnAnalyzerV3 take a while to load on first run, the code logs each stage so the user gets clear progress feedback while waiting. The core steps: load the API keys from the environment, import the custom components (the system prompt and the knowledge-base retrieval tool), and create a ToolsSchema that exposes the leukemia retrieval function as a tool the LLM can call. With Pipecat's direct functions, the tool's schema is derived from the function's signature and docstring, which is why the docstring in tool.py is written the way it is.
```python
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema

from prompts import SYSTEM_PROMPT
from tool import retrieve_leukemia_knowledge_base

print("🚀 Starting Pipecat bot…")
print("⏳ Loading models and imports (20 seconds, first run only)\n")

logger.info("Loading Local Smart Turn Analyzer V3...")
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
logger.info("✅ Local Smart Turn Analyzer V3 loaded")

logger.info("Loading Silero VAD model...")
from pipecat.audio.vad.silero import SileroVADAnalyzer
logger.info("✅ Silero VAD model loaded")

from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame

logger.info("Loading pipeline components...")
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
logger.info("✅ All components loaded successfully!")

load_dotenv(override=True)

# Create a tools schema, passing the function directly to it.
tools = ToolsSchema(standard_tools=[retrieve_leukemia_knowledge_base])
```
This section assembles the conversational AI pipeline. It wires together the core services: Deepgram handles speech-to-text (STT); the LLM, reached through Venice.ai's OpenAI-compatible API, processes queries and calls the registered tool function; and Cartesia handles text-to-speech (TTS) with a British female voice. Data flows through the pipeline in the following order:
- Transport input: receives the raw audio stream.
- RTVI processor: handles the RTVI protocol messages that keep a connected client UI in sync with the bot.
- STT transcription: converts speech to text.
- User context aggregation: folds the user's transcribed input into the conversation history.
- LLM processing: generates the text response, calling the retrieval tool when needed.
- TTS synthesis: converts the text response into speech.
- Transport output: streams the synthesized audio back out.
- Assistant context aggregation: folds the assistant's response into the conversation history to keep the context coherent.
PipelineTask wraps the whole pipeline with metrics collection and interruption handling enabled, letting users barge in naturally while the bot is mid-answer.
```python
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    # OPENAI_API_KEY in .env holds the Venice key, which the OpenAI-compatible
    # service picks up by default. Alternatives:
    # llm = OpenAILLMService(model="gpt-4o")
    # llm = OLLamaLLMService(model="granite3.2-vision:latest")
    llm = OpenAILLMService(base_url="https://api.venice.ai/api/v1", model="openai-gpt-oss-120b")

    # Register the retrieval function as a direct function.
    llm.register_direct_function(
        retrieve_leukemia_knowledge_base,
        cancel_on_interruption=False,  # keep retrieval running if the user interrupts (default: True)
    )

    messages: Any = [
        {
            "role": "system",
            "content": SYSTEM_PROMPT,
        },
    ]
    context = LLMContext(messages=messages, tools=tools)
    context_aggregator = LLMContextAggregatorPair(context)

    rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

    pipeline = Pipeline(
        [
            transport.input(),               # Transport user input
            rtvi,                            # RTVI processor
            stt,                             # Speech-to-text
            context_aggregator.user(),       # User responses
            llm,                             # LLM
            tts,                             # Text-to-speech
            transport.output(),              # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
            allow_interruptions=True,
        ),
        observers=[RTVIObserver(rtvi)],
    )
```
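One detail worth noticing is the ordering: `context_aggregator.assistant()` comes after `transport.output()`. This follows the usual Pipecat pattern where the assistant side of the context is aggregated from what was actually spoken, so if the user barges in mid-answer, the truncated response, not the full generated text, is what lands in the conversation history.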
This section defines the voice bot's connection lifecycle and transport configuration. When a client connects, the bot appends a system message instructing it to introduce itself as a leukemia specialist and queues an LLMRunFrame to generate the greeting. When the client disconnects, the task is cancelled so resources are released properly.

The transport layer supports both Daily and WebRTC, configured for bidirectional audio. Silero VAD performs voice activity detection with a 0.2-second stop threshold, and LocalSmartTurnAnalyzerV3 manages smart turn-taking. The main entry point delegates the bot's lifecycle, including signal handling and graceful shutdown, to the Pipecat runner.
```python
    # (continuing inside run_bot: event handlers and the runner)
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation with a greeting.
        messages.append(
            {
                "role": "system",
                "content": "Say hello and briefly introduce yourself that you are a Leukemia Specialist "
                "and you can help from the domain of Leukemia.",
            }
        )
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point for the bot starter."""
    transport_params = {
        "daily": lambda: DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
            turn_analyzer=LocalSmartTurnAnalyzerV3(),
        ),
        "webrtc": lambda: TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
            turn_analyzer=LocalSmartTurnAnalyzerV3(),
        ),
    }
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main
    main()
```
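With the dependencies installed, the .env populated, and Qdrant running, the bot starts with `python bot.py` and hands its lifecycle to Pipecat's runner. The exact flags and the dev UI (typically a local WebRTC page) vary by Pipecat version, so treat those details as version-dependent and check the runner docs.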
Conclusion
Voice agents are no longer demos or experiments. They are becoming real interfaces that users expect to be reliable, fast, and natural. The architecture laid out in this article reflects one core idea: building such a system is less about chasing the newest model and more about clean orchestration and tight integration across the core components of speech, reasoning, retrieval, and synthesis.

When every layer has a clear responsibility and integrates cleanly with the others, you get the kind of fluid conversational experience users actually trust. This architecture-first, integration-first approach gives you a pragmatic blueprint for moving a voice agent from prototype to production, and for letting it keep improving as your data, models, and use cases evolve.
