构建实时语音驱动RAG系统:从架构设计到生产部署的全栈指南

多数团队都在谈论构建对话代理,但真正将其打磨到可用于生产环境却充满挑战。语音系统尤为严苛:延迟会立刻显现,检索失误会破坏信任,而语音、语言与响应之间的任何断层,都会让用户体验大打折扣。本文将带你构建一个“声音原生”的对话代理,实现端到端自然流畅的交互。你将了解语音如何在实时流程中依次经过转写、推理、检索与合成,以及各层如何协同工作以保持体验的连贯性。阅读本部分后,你将掌握设计、构建与运营一个能在真实场景中随用量扩展的语音代理的核心知识。

构建实时语音驱动RAG系统:从架构设计到生产部署的全栈指南

架构概览

用户语音是系统管道的起点。Pipecat 框架负责管理完整的对话闭环,协调语音在不同服务间的流转。Deepgram 首先进行监听,并以低延迟将语音转换为文本,为后续的LLM处理提供干净的转写结果。运行在 Venice 上的LLM负责解释用户意图、决定执行动作,并在需要时触发知识检索。LlamaIndex 在模型与 Qdrant 向量数据库之间进行协调,以结构化且高效的方式编排检索过程;它向 Qdrant 发送语义查询,后者作为承载领域知识的向量搜索引擎。检索到的上下文返回给LLM,使其能够生成有根据的回答。答案生成后,交由 Cartesia 将文本合成为清晰、自然的语音。最后,Pipecat 将音频回传给用户,完成交互循环。整个架构设计聚焦于保持实时性、可靠性,并满足人类对自然对话的期待。

  • Pipecat:一个用于构建语音与多模态对话式AI应用的开源框架,在一条管道中编排STT、LLM、TTS等组件。
  • Deepgram:一个语音转文本(STT)API,能够在实时条件下以高精度和低延迟将口语音频转换为文本。
  • Cartesia:一个文本转语音(TTS)API,可将文本转换为富有表现力且逼真的人声音频。

实现细节

项目采用以下目录结构:

.
└── pipecat_playground
├── bot.py
├── pipecat_playground.iml
├── prompts.py
├── requirements.txt
├── .env
└── tool.py

所有与AI模型和向量搜索引擎相关的环境变量均在 .env 文件中集中管理:

“`
DEEPGRAM_API_KEY=bc22d3b8f6…
OPENAI_API_KEY=DVfuZhM… # venice api key
CARTESIA_API_KEY=sk_car_…

QDRANT_URL=http://localhost:6333
QDRANT_API_KEY=th3s3c…
COLLECTION_NAME=leukemia_knowledge

HF_API_KEY=hf_sfmuAPx…
GEMINI_API_KEY=AIzaSyDk9qzI8T…
ANTHROPIC_API_KEY=sk-ant-api03-…

TOKENIZERS_PARALLELISM=false
“`

所有系统提示词统一在 prompts.py 文件中管理:

SYSTEM_PROMPT = (
"You are an expert Leukemia Doctor | Medical Oncologist "
"specialized in Acute myeloid leukemia(AML), Acute lymphoblastic leukemia(ALL), "
"Chronic lymphocytic leukemia(CLL), Acute leukemia(AL). Your primary role is to provide accurate and detailed "
"answers to questions *about* Leukemia and summarize details "
"or the overall content of the provided context about leukemia."
"IMPORTANT: Do not use external knowledge, forget your previous knowledge or don't make up information. "
"Only use the context provided to you. If you don't find the answer politely say you don't know the answer. "
"-------------------------------------------------------------------"
"MOST IMPORTANT: Never answer anything outside Leukemia. "
"If you get anything out of Leukemia just respond you are not a specialist and cannot answer that question even though you have the context just ignore it."
)

使用 LlamaIndex 编排的上下文检索由如下工具函数专门负责。

“`python
import os
from typing import List

from llama_index.core.indices import VectorStoreIndex
from llama_index.core.schema import NodeWithScore
from llama_index.core.settings import Settings
from llama_index.core.storage import StorageContext
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from dotenv import load_dotenv, find_dotenv
from pipecat.services.llm_service import FunctionCallParams
from qdrant_client import QdrantClient

load_dotenv(find_dotenv())

Settings.embed_model = FastEmbedEmbedding(model_name=”BAAI/bge-base-en-v1.5″)

qdrant_connector = QdrantClient(url=”http://localhost:6333″, api_key=”th3s3cr3tk3y”)

async def retrieve_leukemia_knowledge_base(params: FunctionCallParams, query: str):
“””
Use this tool to retrieve knowledge about leukemia.

Args:
    query: str, user query to search the knowledge base
"""
try:
    if not query:
        await params.result_callback({"error": "No query provided"})
        return

    if qdrant_connector.collection_exists(collection_name=os.environ.get("COLLECTION_NAME")):
        vector_store = QdrantVectorStore(
            client=qdrant_connector,
            collection_name=os.environ.get("COLLECTION_NAME")
        )
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        index = VectorStoreIndex.from_vector_store(
            vector_store=vector_store,
            storage_context=storage_context
        )
        retriever = index.as_retriever(top_k=15)

        context = ''
        nodes: List[NodeWithScore] = retriever.retrieve(query)
        for node in nodes:
            context += node.text

        print(f"Context retrieved: {context}")
        await params.result_callback(context)
    else:
        await params.result_callback({
            "error": "Collection does not exist in Qdrant"
        })
except Exception as e:
    print(f"Error in retrieve_leukemia_knowledge_base: {str(e)}")
    await params.result_callback({
        "error": f"Failed to get context: {str(e)}"
    })

“`

上述函数的关键在于其参数:第一个参数是 FunctionCallParams,它让我们能够通过回调将检索到的上下文发送回 Pipecat 的 pipeline。

本节将完成 Pipecat 语音机器人的必要导入与初始化配置。由于 SileroVAD 与 LocalSmartTurnAnalyzerV3 等模型在首次运行时加载耗时较长,代码通过分阶段日志为用户提供清晰的加载进度反馈。核心步骤包括:加载环境变量中的各类 API Key,导入自定义组件(如系统提示词与知识库检索工具),并创建一个 ToolsSchema,将白血病知识库检索函数包装为可供 LLM 调用的工具。

“`python
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema

from prompts import SYSTEM_PROMPT
from tool import retrieve_leukemia_knowledge_base

print(“🚀 Starting Pipecat bot…”)
print(“⏳ Loading models and imports (20 seconds, first run only)n”)

logger.info(“Loading Local Smart Turn Analyzer V3…”)
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3

logger.info(“✅ Local Smart Turn Analyzer V3 loaded”)
logger.info(“Loading Silero VAD model…”)
from pipecat.audio.vad.silero import SileroVADAnalyzer

logger.info(“✅ Silero VAD model loaded”)

from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame

logger.info(“Loading pipeline components…”)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

logger.info(“✅ All components loaded successfully!”)

load_dotenv(override=True)

tools = ToolsSchema(standard_tools=[retrieve_leukemia_knowledge_base])
“`

本节将组装对话式 AI 的处理流水线(pipeline)。该流水线整合了多个核心服务:Deepgram 负责语音转文本(STT),LLM 通过 Venice.ai 的 API 处理查询并调用已注册的工具函数,Cartesia 则使用英式女声进行语音合成(TTS)。数据在流水线中的流动顺序如下:

  1. 传输输入:接收原始音频流。
  2. RTVI 处理:进行实时语音活动检测与端点检测。
  3. STT 转写:将语音转换为文本。
  4. 用户上下文聚合:将用户输入文本整合到会话历史中。
  5. LLM 处理:生成文本响应。
  6. TTS 合成:将文本响应转换为语音。
  7. 传输输出:输出合成的音频流。
  8. 助手上下文聚合:将助手的响应整合到会话历史中,以维护连贯的对话上下文。

PipelineTask 封装了整个流水线,并启用了指标采集与中断处理功能,允许用户在机器人回答过程中自然插话。

“`python
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f”Starting bot”)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
    api_key=os.getenv("CARTESIA_API_KEY"),
    voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
)

llm = OpenAILLMService(base_url="https://api.venice.ai/api/v1", model="openai-gpt-oss-120b")

llm.register_direct_function(
    retrieve_leukemia_knowledge_base,
    cancel_on_interruption=False,
)

messages: Any = [
    {
        "role": "system",
        "content": SYSTEM_PROMPT
    },
]

context = LLMContext(messages=messages, tools=tools)
context_aggregator = LLMContextAggregatorPair(context)

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

pipeline = Pipeline(
    [
        transport.input(),
        rtvi,
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ]
)

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
        allow_interruptions=True,
    ),
    observers=[RTVIObserver(rtvi)],
)

“`

本节定义了语音机器人的连接生命周期与传输配置。当客户端连接时,机器人会自动以系统消息的方式将自己介绍为白血病领域专家,并触发一次LLM运行以生成问候语。客户端断开连接时,相关任务会被正确取消以释放资源。

传输层支持 Daily 与 WebRTC 协议,并配置为双向音频流。系统使用 SileroVAD 进行语音活动检测,停顿阈值设为 0.2 秒,并由 LocalSmartTurnAnalyzerV3 管理智能对话轮次切换。主入口通过 Pipecat 框架的 runner 统一管理机器人的生命周期,包括处理信号中断与实现优雅关停。

“`python
@transport.event_handler(“on_client_connected”)
async def on_client_connected(transport, client):
logger.info(f”Client connected”)
messages.append(
{
“role”: “system”,
“content”: “Say hello and briefly introduce yourself that you are a Leukemia Specialist ”
“and you can help from the domain of Leukemia.”
}
)
await task.queue_frames([LLMRunFrame()])

@transport.event_handler(“on_client_disconnected”)
async def on_client_disconnected(transport, client):
logger.info(f”Client disconnected”)
await task.cancel()

runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

async def bot(runner_args: RunnerArguments):
“””Main bot entry point for the bot starter.”””

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
}

transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)

if name == “main“:
from pipecat.runner.run import main
main()
“`

“`python
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema

from prompts import SYSTEM_PROMPT
from tool import retrieve_leukemia_knowledge_base

print(“🚀 Starting Pipecat bot…”)
print(“⏳ Loading models and imports (20 seconds, first run only)n”)

logger.info(“Loading Local Smart Turn Analyzer V3…”)
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3

logger.info(“✅ Local Smart Turn Analyzer V3 loaded”)
logger.info(“Loading Silero VAD model…”)
from pipecat.audio.vad.silero import SileroVADAnalyzer

logger.info(“✅ Silero VAD model loaded”)

from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame

logger.info(“Loading pipeline components…”)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

logger.info(“✅ All components loaded successfully!”)

load_dotenv(override=True)

query = “How does the blast percentage differ between acute and chronic leukemia?”

query = “What chromosomal translocation defines CML, and what genes are involved?”

Create a tools schema, passing your function directly to it

tools = ToolsSchema(standard_tools=[retrieve_leukemia_knowledge_base])

async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f”Starting bot”)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

tts = CartesiaTTSService(
    api_key=os.getenv("CARTESIA_API_KEY"),
    voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
)

# llm = OpenAILLMService(model="gpt-4o")
llm = OpenAILLMService(base_url="https://api.venice.ai/api/v1", model="openai-gpt-oss-120b")
# llm = OLLamaLLMService(model="granite3.2-vision:latest")

# Register the function
llm.register_direct_function(
    retrieve_leukemia_knowledge_base,
    cancel_on_interruption=False,  # Cancel if user interrupts (default: True)
)

messages: Any = [
    {
        "role": "system",
        "content": SYSTEM_PROMPT
    },
]

context = LLMContext(messages=messages, tools=tools)
context_aggregator = LLMContextAggregatorPair(context)

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

pipeline = Pipeline(
    [
        transport.input(),  # Transport user input
        rtvi,  # RTVI processor
        stt,
        context_aggregator.user(),  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        context_aggregator.assistant(),  # Assistant spoken responses
    ]
)

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
        allow_interruptions=True,
    ),
    observers=[RTVIObserver(rtvi)],
)

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    logger.info(f"Client connected")
    # Kick off the conversation.
    messages.append(
        {
            "role": "system",
            "content": "Say hello and briefly introduce yourself that you are a Leukemia Specialist "
                       "and you can help from the domain of Leukemia."
        }
    )
    await task.queue_frames([LLMRunFrame()])

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    logger.info(f"Client disconnected")
    await task.cancel()

runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

await runner.run(task)

async def bot(runner_args: RunnerArguments):
“””Main bot entry point for the bot starter.”””

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        turn_analyzer=LocalSmartTurnAnalyzerV3(),
    ),
}

transport = await create_transport(runner_args, transport_params)

await run_bot(transport, runner_args)

if name == “main“:
from pipecat.runner.run import main

main()

“`

结论

语音代理已不再是演示品或实验品,它们正逐渐成为用户期望中可靠、快速且自然可用的真实交互界面。本文所阐述的架构揭示了一个核心理念:构建此类系统,与其盲目追逐最新模型,不如将重心放在语音、推理、检索与合成等核心组件之间的清晰编排与高效协同上。

当系统中的每一层都职责清晰,并与其他层紧密集成时,我们才能构建出让用户真正信赖的流畅对话体验。这种以架构和集成为先的方法,为您提供了一份务实的蓝图,旨在帮助您将语音代理从概念原型平稳推进至生产级部署,并使其能够伴随您的数据、模型与具体应用场景的演进而持续优化与成长。


关注“鲸栖”小程序,掌握最新AI资讯

本文来自网络搜集,不代表鲸林向海立场,如有侵权,联系删除。转载请注明出处:http://www.itsolotime.com/archives/16372

(0)
上一篇 2025年12月30日 上午7:22
下一篇 2025年12月30日 上午11:24

相关推荐

  • 8个Python库:让机器学习从入门到精通只需一杯咖啡时间

    你能在一杯咖啡还没喝完时搭建出你的第一个模型。 先说一句可能听起来有点“逆风”的话: 机器学习并不难。难的是在不必要的复杂性里摸爬滚打。 多数人被机器学习劝退,不是因为不够聪明,而是因为在他们还没开始训练模型前,整个生态就已经把微积分、矩阵符号和数千页的文档砸过来了。 在用 Python 写代码 4 年多、并教过不少开发者(包括非常资深的)之后,我得出一个结…

    2026年1月23日
    2600
  • 澳洲放羊大叔的AI编程革命:5行Bash脚本引爆硅谷,睡觉时AI自动完成5万美元项目

    最近,一个名为“Ralph”的AI编程技巧在全球技术社区迅速走红。其核心魔力在于:用户无需手动编写代码,只需设定目标,AI便能在后台自动完成整个开发流程,甚至在你睡觉时完成工作。令人惊讶的是,如此强大的工具,其核心代码仅由5行Bash脚本构成。 在Ralph迅速走红之后,Claude Code官方也推出了一套Ralph Wiggum插件。该插件通过“停止钩子…

    2026年1月23日
    4600
  • Attention机制暗藏偏置陷阱:上海大学团队提出去偏修正公式,提升多模态大模型剪枝效率

    Attention机制暗藏偏置陷阱:上海大学团队提出去偏修正公式,提升多模态大模型剪枝效率 近年来,视觉-语言模型在多模态理解任务中取得了显著进展。为了降低推理成本,模型通常通过语言到视觉的注意力来衡量视觉标记与文本之间的相关性,并据此进行视觉标记剪枝。 然而,一个长期被忽视的问题是:注意力本身是否真的能够作为“语义重要性”的可靠指标? 上海大学曾丹团队在最…

    5天前
    4300
  • GraphMind:构建具备深度推理能力的全栈Agentic RAG架构

    GraphMind:构建具备深度推理能力的全栈Agentic RAG架构 本文介绍一套可用于生产环境的完整架构,该架构利用GraphRAG流水线将复杂的非结构化数据转化为高准确度、可检索的知识。我们将阐述Chonkie如何通过语义切分保留上下文,Neo4j如何同时存储向量和图表示以实现双重检索,以及LiteLLM如何编排推理流程。同时,文章将解释系统如何通过…

    2026年1月1日
    10800
  • KlingAvatar2.0:时空级联框架与共推理导演系统,让数字人拥有生动灵魂与5分钟长视频生成能力

    还记得几个月前那个能随着音乐节拍自然舞动的 KlingAvatar 数字人吗?现在,它迎来了史诗级进化。 近日,快手可灵团队正式发布了 KlingAvatar2.0 技术报告。这一次,数字人不仅能“表演”,更能“生动表达”——它们将拥有更丰富的情感层次、更精准的多角色互动,对复杂文本指令的深度理解能力,以及支持长达 5 分钟的视频生成。目前该模型已经在可灵平…

    2025年12月24日
    16200