构建实时语音驱动RAG系统:从架构设计到生产部署的全栈指南

多数团队都在谈论构建对话代理,但真正将其打磨到可用于生产环境却充满挑战。语音系统尤为严苛:延迟会立刻显现,检索失误会破坏信任,而语音、语言与响应之间的任何断层,都会让用户体验大打折扣。本文将带你构建一个“声音原生”的对话代理,实现端到端自然流畅的交互。你将了解语音如何在实时流程中依次经过转写、推理、检索与合成,以及各层如何协同工作以保持体验的连贯性。阅读本部分后,你将掌握设计、构建与运营一个能在真实场景中随用量扩展的语音代理的核心知识。

构建实时语音驱动RAG系统:从架构设计到生产部署的全栈指南

架构概览

用户语音是系统管道的起点。Pipecat 框架负责管理完整的对话闭环,协调语音在不同服务间的流转。Deepgram 首先进行监听,并以低延迟将语音转换为文本,为后续的LLM处理提供干净的转写结果。运行在 Venice 上的LLM负责解释用户意图、决定执行动作,并在需要时触发知识检索。LlamaIndex 在模型与 Qdrant 向量数据库之间进行协调,以结构化且高效的方式编排检索过程;它向 Qdrant 发送语义查询,后者作为承载领域知识的向量搜索引擎。检索到的上下文返回给LLM,使其能够生成有根据的回答。答案生成后,交由 Cartesia 将文本合成为清晰、自然的语音。最后,Pipecat 将音频回传给用户,完成交互循环。整个架构设计聚焦于保持实时性、可靠性,并满足人类对自然对话的期待。

  • Pipecat:一个用于构建语音与多模态对话式AI应用的开源框架,在一条管道中编排STT、LLM、TTS等组件。
  • Deepgram:一个语音转文本(STT)API,能够在实时条件下以高精度和低延迟将口语音频转换为文本。
  • Cartesia:一个文本转语音(TTS)API,可将文本转换为富有表现力且逼真的人声音频。

实现细节

项目采用以下目录结构:

.
└── pipecat_playground
├── bot.py
├── pipecat_playground.iml
├── prompts.py
├── requirements.txt
├── .env
└── tool.py

所有与AI模型和向量搜索引擎相关的环境变量均在 .env 文件中集中管理:

“`
DEEPGRAM_API_KEY=bc22d3b8f6…
OPENAI_API_KEY=DVfuZhM… # venice api key
CARTESIA_API_KEY=sk_car_…

QDRANT_URL=http://localhost:6333
QDRANT_API_KEY=th3s3c…
COLLECTION_NAME=leukemia_knowledge

HF_API_KEY=hf_sfmuAPx…
GEMINI_API_KEY=AIzaSyDk9qzI8T…
ANTHROPIC_API_KEY=sk-ant-api03-…

TOKENIZERS_PARALLELISM=false
“`

所有系统提示词统一在 prompts.py 文件中管理:

SYSTEM_PROMPT = (
"You are an expert Leukemia Doctor | Medical Oncologist "
"specialized in Acute myeloid leukemia(AML), Acute lymphoblastic leukemia(ALL), "
"Chronic lymphocytic leukemia(CLL), Acute leukemia(AL). Your primary role is to provide accurate and detailed "
"answers to questions *about* Leukemia and summarize details "
"or the overall content of the provided context about leukemia."
"IMPORTANT: Do not use external knowledge, forget your previous knowledge or don't make up information. "
"Only use the context provided to you. If you don't find the answer politely say you don't know the answer. "
"-------------------------------------------------------------------"
"MOST IMPORTANT: Never answer anything outside Leukemia. "
"If you get anything out of Leukemia just respond you are not a specialist and cannot answer that question even though you have the context just ignore it."
)

使用 LlamaIndex 编排的上下文检索由如下工具函数专门负责。

“`python
import os
from typing import List

from llama_index.core.indices import VectorStoreIndex
from llama_index.core.schema import NodeWithScore
from llama_index.core.settings import Settings
from llama_index.core.storage import StorageContext
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from dotenv import load_dotenv, find_dotenv
from pipecat.services.llm_service import FunctionCallParams
from qdrant_client import QdrantClient

load_dotenv(find_dotenv())

Settings.embed_model = FastEmbedEmbedding(model_name=”BAAI/bge-base-en-v1.5″)

qdrant_connector = QdrantClient(url=”http://localhost:6333″, api_key=”th3s3cr3tk3y”)

async def retrieve_leukemia_knowledge_base(params: FunctionCallParams, query: str):
“””
Use this tool to retrieve knowledge about leukemia.

Args:
    query: str, user query to search the knowledge base
"""
try:
    if not query:
        await params.result_callback({"error": "No query provided"})
        return

    if qdrant_connector.collection_exists(collection_name=os.environ.get("COLLECTION_NAME")):
        vector_store = QdrantVectorStore(
            client=qdrant_connector,
            collection_name=os.environ.get("COLLECTION_NAME")
        )
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        index = VectorStoreIndex.from_vector_store(
            vector_store=vector_store,
            storage_context=storage_context
        )
        retriever = index.as_retriever(top_k=15)

        context = ''
        nodes: List[NodeWithScore] = retriever.retrieve(query)
        for node in nodes:
            context += node.text

        print(f"Context retrieved: {context}")
        await params.result_callback(context)
    else:
        await params.result_callback({
            "error": "Collection does not exist in Qdrant"
        })
except Exception as e:
    print(f"Error in retrieve_leukemia_knowledge_base: {str(e)}")
    await params.result_callback({
        "error": f"Failed to get context: {str(e)}"
    })

“`

上述函数的关键在于其参数:第一个参数是 FunctionCallParams,它让我们能够通过回调将检索到的上下文发送回 Pipecat 的 pipeline。

本节将完成 Pipecat 语音机器人的必要导入与初始化配置。由于 SileroVAD 与 LocalSmartTurnAnalyzerV3 等模型在首次运行时加载耗时较长,代码通过分阶段日志为用户提供清晰的加载进度反馈。核心步骤包括:加载环境变量中的各类 API Key,导入自定义组件(如系统提示词与知识库检索工具),并创建一个 ToolsSchema,将白血病知识库检索函数包装为可供 LLM 调用的工具。

“`python
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema

from prompts import SYSTEM_PROMPT
from tool import retrieve_leukemia_knowledge_base

print(“🚀 Starting Pipecat bot…”)
print(“⏳ Loading models and imports (20 seconds, first run only)n”)

logger.info(“Loading Local Smart Turn Analyzer V3…”)
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3

logger.info(“✅ Local Smart Turn Analyzer V3 loaded”)
logger.info(“Loading Silero VAD model…”)
from pipecat.audio.vad.silero import SileroVADAnalyzer

logger.info(“✅ Silero VAD model loaded”)

from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame

logger.info(“Loading pipeline components…”)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

logger.info(“✅ All components loaded successfully!”)

load_dotenv(override=True)

tools = ToolsSchema(standard_tools=[retrieve_leukemia_knowledge_base])
“`

本节将组装对话式 AI 的处理流水线(pipeline)。该流水线整合了多个核心服务:Deepgram 负责语音转文本(STT),LLM 通过 Venice.ai 的 API 处理查询并调用已注册的工具函数,Cartesia 则使用英式女声进行语音合成(TTS)。数据在流水线中的流动顺序如下:

  1. 传输输入:接收原始音频流。
  2. RTVI 处理:进行实时语音活动检测与端点检测。
  3. STT 转写:将语音转换为文本。
  4. 用户上下文聚合:将用户输入文本整合到会话历史中。
  5. LLM 处理:生成文本响应。
  6. TTS 合成:将文本响应转换为语音。
  7. 传输输出:输出合成的音频流。
  8. 助手上下文聚合:将助手的响应整合到会话历史中,以维护连贯的对话上下文。

PipelineTask 封装了整个流水线,并启用了指标采集与中断处理功能,允许用户在机器人回答过程中自然插话。

“`python
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f”Starting bot”)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
    api_key=os.getenv("CARTESIA_API_KEY"),
    voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
)

llm = OpenAILLMService(base_url="https://api.venice.ai/api/v1", model="openai-gpt-oss-120b")

llm.register_direct_function(
    retrieve_leukemia_knowledge_base,
    cancel_on_interruption=False,
)

messages: Any = [
    {
        "role": "system",
        "content": SYSTEM_PROMPT
    },
]

context = LLMContext(messages=messages, tools=tools)
context_aggregator = LLMContextAggregatorPair(context)

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

pipeline = Pipeline(
    [
        transport.input(),
        rtvi,
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ]
)

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
        allow_interruptions=True,
    ),
    observers=[RTVIObserver(rtvi)],
)

“`

本节定义了语音机器人的连接生命周期与传输配置。当客户端连接时,机器人会自动以系统消息的方式将自己介绍为白血病领域专家,并触发一次LLM运行以生成问候语。客户端断开连接时,相关任务会被正确取消以释放资源。

传输层支持 Daily 与 WebRTC 协议,并配置为双向音频流。系统使用 SileroVAD 进行语音活动检测,停顿阈值设为 0.2 秒,并由 LocalSmartTurnAnalyzerV3 管理智能对话轮次切换。主入口通过 Pipecat 框架的 runner 统一管理机器人的生命周期,包括处理信号中断与实现优雅关停。

“`python
@transport.event_handler(“on_client_connected”)
async def on_client_connected(transport, client):
logger.info(f”Client connected”)
messages.append(
{
“role”: “system”,
“content”: “Say hello and briefly introduce yourself that you are a Leukemia Specialist ”
“and you can help from the domain of Leukemia.”
}
)
await task.queue_frames([LLMRunFrame()])

@transport.event_handler(“on_client_disconnected”)
async def on_client_disconnected(transport, client):
logger.info(f”Client disconnected”)
await task.cancel()

runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

async def bot(runner_args: RunnerArguments):
“””Main bot entry point for the bot starter.”””

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
}

transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)

if name == “main“:
from pipecat.runner.run import main
main()
“`

“`python
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema

from prompts import SYSTEM_PROMPT
from tool import retrieve_leukemia_knowledge_base

print(“🚀 Starting Pipecat bot…”)
print(“⏳ Loading models and imports (20 seconds, first run only)n”)

logger.info(“Loading Local Smart Turn Analyzer V3…”)
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3

logger.info(“✅ Local Smart Turn Analyzer V3 loaded”)
logger.info(“Loading Silero VAD model…”)
from pipecat.audio.vad.silero import SileroVADAnalyzer

logger.info(“✅ Silero VAD model loaded”)

from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame

logger.info(“Loading pipeline components…”)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

logger.info(“✅ All components loaded successfully!”)

load_dotenv(override=True)

query = “How does the blast percentage differ between acute and chronic leukemia?”

query = “What chromosomal translocation defines CML, and what genes are involved?”

Create a tools schema, passing your function directly to it

tools = ToolsSchema(standard_tools=[retrieve_leukemia_knowledge_base])

async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f”Starting bot”)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
    api_key=os.getenv("CARTESIA_API_KEY"),
    voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
)

# llm = OpenAILLMService(model="gpt-4o")
llm = OpenAILLMService(base_url="https://api.venice.ai/api/v1", model="openai-gpt-oss-120b")
# llm = OLLamaLLMService(model="granite3.2-vision:latest")

# Register the function
llm.register_direct_function(
    retrieve_leukemia_knowledge_base,
    cancel_on_interruption=False,  # Cancel if user interrupts (default: True)
)

messages: Any = [
    {
        "role": "system",
        "content": SYSTEM_PROMPT
    },
]

context = LLMContext(messages=messages, tools=tools)
context_aggregator = LLMContextAggregatorPair(context)

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

pipeline = Pipeline(
    [
        transport.input(),  # Transport user input
        rtvi,  # RTVI processor
        stt,
        context_aggregator.user(),  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        context_aggregator.assistant(),  # Assistant spoken responses
    ]
)

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
        allow_interruptions=True,
    ),
    observers=[RTVIObserver(rtvi)],
)

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    logger.info(f"Client connected")
    # Kick off the conversation.
    messages.append(
        {
            "role": "system",
            "content": "Say hello and briefly introduce yourself that you are a Leukemia Specialist "
                       "and you can help from the domain of Leukemia."
        }
    )
    await task.queue_frames([LLMRunFrame()])

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    logger.info(f"Client disconnected")
    await task.cancel()

runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

await runner.run(task)

async def bot(runner_args: RunnerArguments):
“””Main bot entry point for the bot starter.”””

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
}

transport = await create_transport(runner_args, transport_params)

await run_bot(transport, runner_args)

if name == “main“:
from pipecat.runner.run import main

main()

“`

结论

语音代理已不再是演示品或实验品,它们正逐渐成为用户期望中可靠、快速且自然可用的真实交互界面。本文所阐述的架构揭示了一个核心理念:构建此类系统,与其盲目追逐最新模型,不如将重心放在语音、推理、检索与合成等核心组件之间的清晰编排与高效协同上。

当系统中的每一层都职责清晰,并与其他层紧密集成时,我们才能构建出让用户真正信赖的流畅对话体验。这种以架构和集成为先的方法,为您提供了一份务实的蓝图,旨在帮助您将语音代理从概念原型平稳推进至生产级部署,并使其能够伴随您的数据、模型与具体应用场景的演进而持续优化与成长。


关注“鲸栖”小程序,掌握最新AI资讯

本文来自网络搜集,不代表鲸林向海立场,如有侵权,联系删除。转载请注明出处:http://www.itsolotime.com/archives/16372

(0)
上一篇 2025年12月30日 上午7:22
下一篇 2025年12月30日 上午11:24

相关推荐

  • Agentic RAG实战指南:六种模式解析与生产级应用

    用真实生产取舍解释六种 Agentic RAG 模式 大多数 RAG 演示在理想环境下运行良好,但一旦面对真实用户,问题便接踵而至:检索到无关上下文、浪费大量 tokens,却依然无法避免幻觉。问题的根源往往不在于模型或检索算法本身。 而在于传统 RAG 对所有查询都采用千篇一律的处理方式。 Agentic RAG 改变了这一范式。系统不再机械地执行检索,而…

    2026年3月1日
    17800
  • JTok:大模型扩展新维度!上海交大提出token-indexed参数,不增算力也能提升性能

    大模型扩展的困境 大模型的发展长期遵循一条铁律:依据Scaling Law堆叠参数和数据,模型性能便会遵循负幂律持续提升。然而,这条道路正变得日益昂贵,因为传统的扩展方式始终无法摆脱一个根本性束缚——参数规模与计算量的深度绑定。 在传统的稠密模型中,扩展逻辑简单直接:加宽网络或加深层数。随之而来的硬伤是:参数规模一旦暴涨,计算量和显存需求便会线性飙升。在高质…

    2026年3月3日
    11000
  • 工具文档质量成AI智能体瓶颈?ICLR 2026新研究:简单文档扩展即可显著提升工具检索性能

    在大模型时代,工具调用(Tool-Use)已成为智能体能力的核心。从代码生成到复杂API调用,大语言模型正在学会使用各类工具。然而,一个日益凸显的现实问题是:工具真的难找。 来自宁波东方理工大学/宁波数字孪生研究院沈晓宇团队的一项研究,在ICLR 2026发表论文《Tools Are Under-Documented: Simple Document Exp…

    4小时前
    2300
  • 大模型流式输出打字机效果的前后端实现

    1. 背景 在使用ChatGPT时,发现输入 prompt 后,页面是逐步给出回复的,起初以为使用了 WebSckets 持久化连接协议,查看其网络请求,发现这个接口的通信方式并非传统的 http 接口或者 WebSockets,而是基于 EventStream 的事件流,像打字机一样,一段一段的返回答案。 ChatGPT 是一个基于深度学习的大型语言模型,…

    2025年10月1日
    62301
  • PostgreSQL向量检索实战解析:生产级应用还是技术炒作?

    一家电商初创公司的工程团队正面临一个典型的技术选型难题。他们的推荐系统需要实现语义搜索,以匹配用户查询与海量商品描述。团队的核心争议在于:是选择 Qdrant 或 Pinecone 这类专用向量数据库,还是采用 pgvector 扩展,将所有数据保留在 PostgreSQL 中? 这并非个例。随着 AI 驱动的搜索与 RAG(检索增强生成)系统在各行业普及,…

    2025年12月3日
    19800