Building a Scalable, Production-Grade Agentic RAG Pipeline: A Layered Architecture with Six Core Layers

An industry-grade Agentic RAG pipeline for large datasets needs to be built on a clear, scalable layered architecture. We structure the system so that agents can reason, fetch context, use tools, and talk to databases in parallel. Each layer owns a well-defined responsibility, covering everything from data ingestion and model serving to agent orchestration. This layered approach lets the system scale predictably while keeping response latency low for end users.

Scalable RAG Pipeline (Created by Fareed Khan)

A scalable RAG pipeline built around agents typically consists of the following six core layers:

  1. Data Ingestion Layer: turns raw data into structured knowledge via document loading, chunking, and indexing; scales out with S3, relational databases, and Ray.
  2. AI Compute Layer: runs LLMs and embedding models efficiently, mapping models onto GPU/CPU resources for low-latency inference at scale.
  3. Agentic AI Pipeline Layer: supports agent reasoning, query enhancement, and workflow orchestration, backed by APIs, caching, and distributed execution.
  4. Tools & Sandbox Layer: provides safe environments for computation, search, and API testing without touching production workloads.
  5. Infrastructure as Code Layer: automates deployment, networking, clusters, and autoscaling so infrastructure stays reproducible and scalable.
  6. Deployment & Evaluation Layer: handles secrets, databases, clusters, monitoring, and logging to keep the system reliable at scale.

In this article we build these six layers around agents and show how to assemble a scalable, production-grade system.

Our "Big Data" and Noisy Documents

Enterprise corpora usually contain many document types, such as slide decks, technical docs, reports, and web pages, and there is always plenty of noise mixed in. For a RAG pipeline, handling noisy data is an unavoidable challenge if the system is to be reliable and precise in the real world.

We will use the official Kubernetes documentation as the "true data" and pull 95% "noise data" from the tpn/pdfs repository, which contains a large number of random, off-topic PDF files that simulate a real enterprise corpus.

This lets us evaluate how an enterprise RAG pipeline really behaves in an extremely noisy environment.

First, clone the tpn/pdfs repository to get the noise data (it contains more than 1,800 documents, so the clone may take a while):

```bash
# Clone the noise-data repository
git clone https://github.com/tpn/pdfs.git
```

The repository contains PDF, DOCX, TXT, HTML, and other document types. We randomly sample 950 documents from it:

```bash
# Create the noisy_data directory
mkdir noisy_data

# Randomly sample 950 documents from the tpn/pdfs repo
find ./pdfs -type f \( -name "*.pdf" -o -name "*.docx" -o -name "*.txt" -o -name "*.html" \) | shuf -n 950 | xargs -I {} cp {} ./noisy_data/
```

This leaves 950 randomly sampled documents in the noisy_data directory.

For the true knowledge source, we scraped the official open-source Kubernetes documentation and saved it in PDF, DOCX, TXT, and HTML formats under the true_data directory.

Next, merge the noisy and true data into a single data directory:

```bash
# Create the data directory
mkdir data

# Copy the true data
cp -r ./true_data/* ./data/

# Copy the noisy data
cp -r ./noisy_data/* ./data/
```

Finally, verify the total document count in data:

```bash
# Count all documents in the data directory
find ./data -type f \( -name "*.pdf" -o -name "*.docx" -o -name "*.txt" -o -name "*.html" \) | wc -l

# Example output
1000
```

We now have 1,000 documents in the data directory: 950 noise documents and 50 true documents.

Next we move on to building the codebase for the enterprise RAG pipeline.

Building an Enterprise-Grade Codebase

A simple Agentic RAG codebase might contain just a vector database, a couple of AI models, and a basic ingestion script. As complexity grows, however, we need to split the architecture into smaller, more manageable components.

Set up the following directory structure:

scalable-rag-core/ # Minimal production-ready RAG system
├── infra/ # Core cloud infrastructure
│ ├── terraform/ # Cluster, networking, storage
│ └── karpenter/ # Node autoscaling (CPU/GPU)

├── deploy/ # Kubernetes deployment layer
│ ├── helm/ # Databases & stateful services
│ │ ├── qdrant/ # Vector database
│ │ └── neo4j/ # Knowledge graph
│ ├── ray/ # Ray + Ray Serve (LLM & embeddings)
│ └── ingress/ # API ingress

├── models/ # Model configs (decoupled from infra)
│ ├── embeddings/ # Embedding models
│ ├── llm/ # LLM inference configs
│ └── rerankers/ # Cross-encoder rerankers

├── pipelines/ # Offline & async RAG pipelines
│ └── ingestion/ # Document ingestion flow
│ ├── loaders/ # PDF / HTML / DOC loaders
│ ├── chunking/ # Chunking & metadata
│ ├── embedding/ # Embedding computation
│ ├── indexing/ # Vector + graph indexing
│ └── graph/ # Knowledge-graph extraction

├── libs/ # Shared core libraries
│ ├── schemas/ # Request/response schemas
│ ├── retry/ # Resilience & retries
│ └── observability/ # Metrics & tracing

├── services/ # Online serving layer
│ ├── api/ # RAG API
│ │ └── app/
│ │ ├── agents/ # Agentic orchestration
│ │ │ └── nodes/ # Planner / Retriever / Responder
│ │ ├── clients/ # Vector DB, Graph DB, Ray clients
│ │ ├── cache/ # Semantic & response caches
│ │ ├── memory/ # Conversation memory
│ │ ├── enhancers/ # Query rewriting, HyDE
│ │ ├── routes/ # Chat & retrieval APIs
│ │ └── tools/ # Vector search, graph search
│ │
│ └── gateway/ # Rate limiting / API protection
It looks complex, so let's focus on the most important directories first:

  • deploy/: deployment configs for components such as Ray, the ingress controller, and secret management.
  • infra/: IaC scripts using Terraform and Karpenter to provision cloud resources.
  • pipelines/: ingestion and job-management pipelines, covering document loading, chunking, embedding computation, graph extraction, and indexing.
  • services/: the main application services, including the API server, gateway config, and the sandbox environment that runs untrusted code.

Many components (such as loaders and chunking) get their own subdirectories, which sharpens boundaries and improves maintainability.

Managing the Development Workflow

Before writing any agent architecture, the first step is a local development environment. Scalable projects usually automate this step so new team members don't repeat manual setup.

A development environment typically includes three things:

  • .env.example: shares the environment variables needed for local development. Developers copy it to .env and fill in values per stage (dev/staging/prod).
  • Makefile: wraps common commands for building, testing, and deploying.
  • docker-compose.yml: defines every service needed to run the whole RAG pipeline locally as containers.

Create .env.example to share the environment variables for local development:

```bash
# .env.example
# Copy to .env and fill in the values

# --- APP SETTINGS ---
ENV=dev
LOG_LEVEL=INFO
SECRET_KEY=change_this_to_a_secure_random_string_for_jwt
```

We start with the basic application settings: the environment name, log level, and JWT secret. These identify which stage the app is running in (dev/staging/prod).

```bash
# --- DATABASE (Aurora Postgres) ---
DATABASE_URL=postgresql+asyncpg://ragadmin:changeme@localhost:5432/rag_db

# --- CACHE (Redis) ---
REDIS_URL=redis://localhost:6379/0

# --- VECTOR DB (Qdrant) ---
QDRANT_HOST=localhost
QDRANT_PORT=6333
QDRANT_COLLECTION=rag_collection

# --- GRAPH DB (Neo4j) ---
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password
```

The RAG pipeline relies on several data stores for different data-management and tracking needs:

  1. Aurora Postgres: stores chat history and metadata.
  2. Redis: caches frequently accessed data.
  3. Qdrant: the vector database that stores and manages text embeddings.
  4. Neo4j: the graph database that stores complex relationships between entities.

```bash
# --- AWS (Infrastructure) ---
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
S3_BUCKET_NAME=rag-platform-docs-dev

# --- RAY CLUSTER (AI Engines) ---
# In Kubernetes these should point to internal Service DNS names
# Local development may require port-forwarding
RAY_LLM_ENDPOINT=http://localhost:8000/llm
RAY_EMBED_ENDPOINT=http://localhost:8000/embed

# --- OBSERVABILITY ---
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
```

To handle data at scale, the system uses AWS S3 as the primary document store and a clustered Ray Serve deployment to host the AI models (LLM, embedding model, reranker) for efficient storage and retrieval.

Next, create a Makefile to automate common build, test, and deploy tasks:

```makefile
# Makefile

.PHONY: help install dev up down deploy test infra

help:
	@echo "RAG Platform Commands:"
	@echo "  make install - Install Python dependencies"
	@echo "  make dev     - Run FastAPI server locally"
	@echo "  make up      - Start local DBs (Docker)"
	@echo "  make down    - Stop local DBs"
	@echo "  make deploy  - Deploy to AWS EKS via Helm"
	@echo "  make infra   - Apply Terraform"

install:
	pip install -r services/api/requirements.txt

# Local development environment
up:
	docker-compose up -d

down:
	docker-compose down

# Run the API locally (hot reload)
dev:
	uvicorn services.api.main:app --reload --host 0.0.0.0 --port 8000 --env-file .env

# Infrastructure
infra:
	cd infra/terraform && terraform init && terraform apply

# Kubernetes deployment
deploy:
	# Update dependencies
	helm dependency update deploy/helm/api
	# Install/upgrade
	helm upgrade --install api deploy/helm/api --namespace default
	helm upgrade --install ray-cluster kuberay/ray-cluster -f deploy/ray/ray-cluster.yaml

test:
	pytest tests/
```

A Makefile is the standard automation tool in scalable projects. Here we define install, dev, up, down, deploy, infra, and test targets to manage the full development workflow.

Finally, create docker-compose.yml so the whole RAG pipeline can run locally in containers. Containerization isolates components and simplifies dependency management.

```yaml
# docker-compose.yml

version: '3.8'

services:
  # 1. Postgres (chat history)
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: ragadmin
      POSTGRES_PASSWORD: changeme
      POSTGRES_DB: rag_db
    ports:
      - "5432:5432"
    volumes:
      - pg_data:/var/lib/postgresql/data

  # 2. Redis (cache)
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # 3. Qdrant (vector database)
  qdrant:
    image: qdrant/qdrant:v1.7.3
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage

  # 4. Neo4j (graph database)
  neo4j:
    image: neo4j:5.16.0-community
    environment:
      NEO4J_AUTH: neo4j/password
      NEO4J_dbms_memory_pagecache_size: 1G
    ports:
      - "7474:7474"  # HTTP
      - "7687:7687"  # Bolt
    volumes:
      - neo4j_data:/data

  # 5. MinIO (S3 mock) - optional for fully offline development
  minio:
    image: minio/minio
    command: server /data
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin

volumes:
  pg_data:
  qdrant_data:
  neo4j_data:
```

Small projects usually manage dependencies with pip or virtualenv; projects that need to scale are better served by Docker containers that isolate each component. In the YAML above each service gets its own port, which avoids conflicts and eases monitoring, a best practice in large projects.

Core Shared Utilities

With the project structure and workflow in place, the first thing to establish is a unified ID-generation strategy. A single chat message triggers multiple parallel flows; correlating them makes it possible to trace session-level issues across components.

This is very common in production systems: correlating everything by user action (a click, a request) makes later monitoring, debugging, and tracing much easier.

Create libs/utils/ids.py to generate unique IDs for sessions, file uploads, and OpenTelemetry traces.

```python
# libs/utils/ids.py

import uuid
import hashlib

def generate_session_id() -> str:
    """Generate a standard UUID for a chat session."""
    return str(uuid.uuid4())

def generate_file_id(content: bytes) -> str:
    """
    Generate a deterministic ID from the file content.
    Prevents the same file from being uploaded twice.
    """
    return hashlib.md5(content).hexdigest()

def generate_trace_id() -> str:
    """Generate an ID for OpenTelemetry tracing."""
    return uuid.uuid4().hex
```

We also want to measure how long each function takes, for performance monitoring and tuning. Create libs/utils/timing.py with timers for both sync and async functions.

```python
# libs/utils/timing.py

import functools
import time
import logging

logger = logging.getLogger(__name__)

def measure_time(func):
    """Log the execution time of a synchronous function."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        execution_time = (end_time - start_time) * 1000  # ms
        logger.info(f"Function '{func.__name__}' took {execution_time:.2f} ms")
        return result
    return wrapper

def measure_time_async(func):
    """Log the execution time of an asynchronous function."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = await func(*args, **kwargs)
        end_time = time.perf_counter()
        execution_time = (end_time - start_time) * 1000  # ms
        logger.info(f"Async function '{func.__name__}' took {execution_time:.2f} ms")
        return result
    return wrapper
```

Finally, a production-grade RAG system needs a retry mechanism. We usually handle errors with exponential backoff. Create libs/retry/backoff.py:

```python
# libs/retry/backoff.py

import asyncio
import logging
import random
from functools import wraps

logger = logging.getLogger(__name__)

def exponential_backoff(max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 10.0):
    """
    Decorator implementing exponential backoff with jitter.
    Catches exceptions and retries (async functions).
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if retries >= max_retries:
                        logger.error(f"Max retries reached for {func.__name__}: {e}")
                        raise e

                    # Formula: base * (2 ^ retries) + random_jitter
                    # Jitter avoids the server-side "thundering herd" problem
                    delay = min(base_delay * (2 ** retries), max_delay)
                    jitter = random.uniform(0, 0.5)
                    sleep_time = delay + jitter

                    logger.warning(f"Error in {func.__name__}: {e}. Retrying in {sleep_time:.2f}s...")
                    await asyncio.sleep(sleep_time)
                    retries += 1
        return wrapper
    return decorator
```

The delay formula base * (2 ^ retries) + random_jitter helps avoid the thundering-herd effect.
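
As a quick illustration, here is a minimal sketch of applying the decorator; the `fetch_embeddings` function and its endpoint are hypothetical:

```python
# Hypothetical usage of the backoff decorator defined above
import httpx
from libs.retry.backoff import exponential_backoff

@exponential_backoff(max_retries=3, base_delay=1.0)
async def fetch_embeddings(texts: list[str]) -> list[list[float]]:
    # Any exception raised here triggers a retry with exponential backoff
    async with httpx.AsyncClient() as client:
        resp = await client.post("http://localhost:8000/embed", json={"text": texts})
        resp.raise_for_status()
        return resp.json()["embeddings"]
```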

Data Ingestion Layer

The first step of any RAG flow is getting documents into the system. A small app can read files sequentially with a script; an enterprise RAG pipeline needs high-throughput, asynchronous ingestion that can process thousands of files at once without dragging down the API service.

Data Ingestion Layer (Created by Fareed Khan)

We use Ray Data to break the ingestion logic into a distributed pipeline. Ray Data lets us build a directed acyclic graph of tasks and execute it in parallel across cluster nodes, so parsing (CPU-bound) and embedding (GPU-bound) can scale independently.

Document Loading and Configuration

First, ingestion parameters need centralized configuration. Hardcoding values like chunk size or collection names is an unreliable practice in production.

Document Processing (Created by Fareed Khan)

Create pipelines/ingestion/config.yaml to hold all ingestion-pipeline settings:

```yaml
# pipelines/ingestion/config.yaml

chunking:
  # 512 tokens is usually the RAG "sweet spot" (enough context, little noise)
  chunk_size: 512

  # Overlap preserves context across split boundaries
  chunk_overlap: 50

  # Separators for recursive splitting (paragraph -> sentence -> word)
  separators: ["\n\n", "\n", " ", ""]

embedding:
  # Ray Serve endpoint
  endpoint: "http://ray-serve-embed:8000/embed"
  batch_size: 100

graph:
  # Controls speed and cost of LLM extraction
  concurrency: 10

  # When true, strictly follow the schema.py ontology
  enforce_schema: true

vector_db:
  collection_name: "rag_collection"
  distance_metric: "Cosine"
```
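
A minimal sketch of how pipeline code might load this file (assuming PyYAML is available):

```python
# Hypothetical config loader for the ingestion pipeline
import yaml

def load_ingestion_config(path: str = "pipelines/ingestion/config.yaml") -> dict:
    """Read the YAML config once and hand it to the pipeline stages."""
    with open(path) as f:
        return yaml.safe_load(f)

config = load_ingestion_config()
chunk_size = config["chunking"]["chunk_size"]  # 512
```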

On loaders: enterprise PDFs are often huge. Reading a 100MB PDF straight into memory can OOM a Kubernetes worker. The fix is a temporary file plus the unstructured library, trading disk space for memory; see pipelines/ingestion/loaders/pdf.py:

```python
# pipelines/ingestion/loaders/pdf.py

import tempfile
from unstructured.partition.pdf import partition_pdf

def parse_pdf_bytes(file_bytes: bytes, filename: str):
    """Parse a PDF via a temporary file to reduce memory pressure."""
    text_content = ""
    # Use disk instead of RAM so large files don't crash the worker
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp_file:
        tmp_file.write(file_bytes)
        tmp_file.flush()

        # 'hi_res' strategy: OCR + layout analysis
        elements = partition_pdf(filename=tmp_file.name, strategy="hi_res")

        for el in elements:
            text_content += str(el) + "\n"
    return text_content, {"filename": filename, "type": "pdf"}
```

Other formats use lightweight parsers. The Word loader lives in pipelines/ingestion/loaders/docx.py:

```python
# pipelines/ingestion/loaders/docx.py

import docx
import io

def parse_docx_bytes(file_bytes: bytes, filename: str):
    """Parse a .docx file, extracting text and simple tables."""
    doc = docx.Document(io.BytesIO(file_bytes))
    full_text = []

    for para in doc.paragraphs:
        if para.text.strip():
            full_text.append(para.text)

    return "\n\n".join(full_text), {"filename": filename, "type": "docx"}
```

The HTML loader lives in pipelines/ingestion/loaders/html.py; its job is to strip script and style tags so CSS/JS code doesn't pollute the embedded content.

```python
# pipelines/ingestion/loaders/html.py

from bs4 import BeautifulSoup

def parse_html_bytes(file_bytes: bytes, filename: str):
    """Strip scripts/styles and extract plain text."""
    soup = BeautifulSoup(file_bytes, "html.parser")

    # Remove noisy elements
    for script in soup(["script", "style", "meta"]):
        script.decompose()

    return soup.get_text(separator="\n"), {"filename": filename, "type": "html"}
```

Chunking and the Knowledge Graph

With raw text extracted, we transform it. The text splitter in pipelines/ingestion/chunking/splitter.py cuts text into 512-token pieces, the standard input limit of many embedding models.

Chunking and KG (Created by Fareed Khan)

```python
# pipelines/ingestion/chunking/splitter.py

from langchain.text_splitter import RecursiveCharacterTextSplitter

def split_text(text: str, chunk_size: int = 512, overlap: int = 50):
    """Split text into overlapping chunks, preserving context at boundaries."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ".", " ", ""]
    )
    chunks = splitter.create_documents([text])

    # Map to dicts for the Ray pipeline
    return [{"text": c.page_content, "metadata": {"chunk_index": i}} for i, c in enumerate(chunks)]
```

pipelines/ingestion/chunking/metadata.py 中增强元数据。在分布式系统中,去重是必要步骤,因此我们为每个文本块生成内容哈希:

```python
# pipelines/ingestion/chunking/metadata.py

import hashlib
import datetime

def enrich_metadata(base_metadata: dict, content: str) -> dict:
    """Add a hash and a timestamp for dedup and freshness tracking."""
    return {
        **base_metadata,
        "chunk_hash": hashlib.md5(content.encode('utf-8')).hexdigest(),
        "ingested_at": datetime.datetime.utcnow().isoformat()
    }
```
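
A minimal sketch of how chunk_hash could drive dedup before indexing; the in-memory `seen_hashes` set is purely illustrative (a real pipeline would consult Redis or the vector store instead):

```python
# Hypothetical dedup filter built on chunk_hash (sketch)
seen_hashes: set[str] = set()

def dedup_chunks(chunks: list[dict]) -> list[dict]:
    """Drop chunks whose content hash has already been indexed."""
    fresh = []
    for chunk in chunks:
        h = chunk["metadata"]["chunk_hash"]
        if h not in seen_hashes:
            seen_hashes.add(h)
            fresh.append(chunk)
    return fresh
```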

Next comes the GPU work: generating embeddings. Don't load the model inside the ingestion script (cold starts are slow); call a Ray Serve endpoint instead, so ingestion jobs just send HTTP requests to a long-lived model service. Create pipelines/ingestion/embedding/compute.py:

```python
# pipelines/ingestion/embedding/compute.py

import httpx

class BatchEmbedder:
    """Ray actor: calls the embedding service on batches of text."""
    def __init__(self):
        # Points at the in-cluster Kubernetes service DNS
        self.endpoint = "http://ray-serve-embed:8000/embed"
        self.client = httpx.Client(timeout=30.0)

    def __call__(self, batch):
        """Send a batch of texts to the GPU service."""
        response = self.client.post(
            self.endpoint,
            json={"text": batch["text"], "task_type": "document"}
        )
        batch["vector"] = response.json()["embeddings"]
        return batch
```

We also extract a knowledge graph. To keep the graph clean, pipelines/ingestion/graph/schema.py defines a strict schema; otherwise the LLM tends to hallucinate arbitrary relation types.

```python
# pipelines/ingestion/graph/schema.py

from typing import Literal

# Restrict the LLM to these entity/relation types
VALID_NODE_LABELS = Literal["Person", "Organization", "Location", "Concept", "Product"]

VALID_RELATION_TYPES = Literal["WORKS_FOR", "LOCATED_IN", "RELATES_TO", "PART_OF"]

class GraphSchema:
    @staticmethod
    def get_system_prompt() -> str:
        return f"Extract nodes/edges. Allowed Labels: {VALID_NODE_LABELS.__args__}..."
```

pipelines/ingestion/graph/extractor.py 中应用该模式,利用 LLM 理解文本结构(而非仅依赖语义相似度):

```python
# pipelines/ingestion/graph/extractor.py

import httpx
import json
from typing import Dict, Any
from pipelines.ingestion.graph.schema import GraphSchema

class GraphExtractor:
    """
    Ray actor for graph extraction.
    Calls the internal LLM service to extract entities.
    """
    def __init__(self):
        # Points at the internal Ray Serve LLM endpoint (K8s DNS)
        self.llm_endpoint = "http://ray-serve-llm:8000/llm/chat"
        self.client = httpx.Client(timeout=60.0)  # inference needs a long timeout

    def __call__(self, batch: Dict[str, Any]) -> Dict[str, Any]:
        """Process a batch of text chunks."""
        nodes_list = []
        edges_list = []

        for text in batch["text"]:
            try:
                # 1. Build the prompt
                prompt = f"""
                {GraphSchema.get_system_prompt()}

                Input Text:
                {text}
                """

                # 2. Call the LLM (e.g. Llama-3-70B)
                response = self.client.post(
                    self.llm_endpoint,
                    json={
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": 0.0,  # deterministic output
                        "max_tokens": 1024
                    }
                )
                response.raise_for_status()

                # 3. Parse the JSON output
                content = response.json()["choices"][0]["message"]["content"]
                graph_data = json.loads(content)

                # 4. Aggregate results
                nodes_list.append(graph_data.get("nodes", []))
                edges_list.append(graph_data.get("edges", []))

            except Exception as e:
                # Log the error without stopping the pipeline; return an empty graph for this chunk
                print(f"Graph extraction failed for chunk: {e}")
                nodes_list.append([])
                edges_list.append([])

        batch["graph_nodes"] = nodes_list
        batch["graph_edges"] = edges_list
        return batch
```

High-Throughput Indexing

Large-scale RAG systems write in batches rather than record by record, which noticeably reduces GPU/CPU overhead and memory usage.


Vector indexing uses pipelines/ingestion/indexing/qdrant.py, which connects to the Qdrant cluster and performs atomic upserts:

```python
# pipelines/ingestion/indexing/qdrant.py

from qdrant_client import QdrantClient
from qdrant_client.http import models
import uuid

class QdrantIndexer:
    """Write to Qdrant with batched upserts."""
    def __init__(self):
        self.client = QdrantClient(host="qdrant-service", port=6333)

    def write(self, batch):
        points = [
            models.PointStruct(id=str(uuid.uuid4()), vector=row["vector"], payload=row["metadata"])
            for row in batch if "vector" in row
        ]
        self.client.upsert(collection_name="rag_collection", points=points)
```

Graph indexing goes through pipelines/ingestion/indexing/neo4j.py, using Cypher's MERGE for idempotency, so re-running ingestion never creates duplicate nodes:

```python
# pipelines/ingestion/indexing/neo4j.py

from neo4j import GraphDatabase

class Neo4jIndexer:
    """Write graph data using idempotent MERGE statements."""
    def __init__(self):
        self.driver = GraphDatabase.driver("bolt://neo4j-cluster:7687", auth=("neo4j", "pass"))

    def write(self, batch):
        with self.driver.session() as session:
            # Flatten the batch and run it in a single transaction for performance
            session.execute_write(self._merge_graph_data, batch)
```
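
The `_merge_graph_data` helper is not shown in the original. A minimal sketch, assuming the nodes/edges shapes produced by the extractor above (and using a fixed RELATES_TO type, since Cypher cannot parameterize relationship types):

```python
# Hypothetical transaction body for Neo4jIndexer (sketch, not the author's code)
@staticmethod
def _merge_graph_data(tx, batch):
    for nodes, edges in zip(batch["graph_nodes"], batch["graph_edges"]):
        for node in nodes:
            # MERGE is idempotent: re-ingestion never duplicates a node
            tx.run("MERGE (n:Entity {name: $name})", name=node.get("name"))
        for edge in edges:
            tx.run(
                "MATCH (a:Entity {name: $src}), (b:Entity {name: $dst}) "
                "MERGE (a)-[:RELATES_TO]->(b)",
                src=edge.get("source"), dst=edge.get("target"),
            )
```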

Event-Driven Workflows on Ray

Finally, we put the pieces together. The goal is for file reading, chunking, and embedding to run in parallel on different CPU/GPU nodes. Create pipelines/ingestion/main.py as the orchestrating "conductor", using Ray Data to build a lazily executed DAG:

```python
# pipelines/ingestion/main.py

import ray
from pipelines.ingestion.embedding.compute import BatchEmbedder
from pipelines.ingestion.graph.extractor import GraphExtractor
from pipelines.ingestion.indexing.qdrant import QdrantIndexer
from pipelines.ingestion.indexing.neo4j import Neo4jIndexer

def main(bucket_name: str, prefix: str):
    """Main orchestration flow."""
    # 1. Read from S3 with Ray Data (binary)
    ds = ray.data.read_binary_files(
        paths=f"s3://{bucket_name}/{prefix}",
        include_paths=True
    )

    # 2. Parse & chunk (map); process_batch applies the loaders + splitter above
    chunked_ds = ds.map_batches(
        process_batch,
        batch_size=10,  # each worker handles 10 files at a time
        num_cpus=1
    )

    # 3. Fork A: vector embedding (GPU-heavy)
    vector_ds = chunked_ds.map_batches(
        BatchEmbedder,
        concurrency=5,      # 5 concurrent embedders
        num_gpus=0.2,       # a sliver of GPU each; Ray Serve does the heavy lifting
        batch_size=100      # embed 100 chunks per batch
    )

    # 4. Fork B: graph extraction (LLM-heavy)
    graph_ds = chunked_ds.map_batches(
        GraphExtractor,
        concurrency=10,
        num_gpus=0.5,       # needs substantial LLM inference capacity
        batch_size=5
    )

    # 5. Write (indexing)
    vector_ds.write_datasource(QdrantIndexer())
    graph_ds.write_datasource(Neo4jIndexer())

    print("Ingestion Job Completed Successfully.")
```

To run this job on Kubernetes, declare the runtime environment and its dependencies in pipelines/jobs/ray_job.yaml:

```yaml
# pipelines/jobs/ray_job.yaml

entrypoint: "python pipelines/ingestion/main.py"
runtime_env:
  working_dir: "./"
  pip: ["boto3", "qdrant-client", "neo4j", "langchain", "unstructured"]
```

In enterprise architectures jobs are rarely triggered by hand; instead they are event-driven: when a file lands in the S3 bucket, an AWS Lambda function submits the Ray job. See pipelines/jobs/s3_event_handler.py:

```python
# pipelines/jobs/s3_event_handler.py

from ray.job_submission import JobSubmissionClient

def handle_s3_event(event, context):
    """Triggered by an S3 upload -> submits a Ray job."""
    # Standard S3 event payload: pull out the bucket and object key
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = record["object"]["key"]

    client = JobSubmissionClient("http://rag-ray-cluster-head-svc:8265")
    client.submit_job(
        entrypoint=f"python pipelines/ingestion/main.py {bucket} {key}",
        runtime_env={"working_dir": "./"}
    )
```

To test the whole flow, scripts/bulk_upload_s3.py uploads the prepared noisy data to S3 with multiple threads:

```python
# scripts/bulk_upload_s3.py

import os
import boto3
from concurrent.futures import ThreadPoolExecutor

s3 = boto3.client("s3")

def upload_directory(dir_path, bucket_name):
    """High-throughput multithreaded S3 uploader."""
    files_to_upload = [os.path.join(root, f)
                       for root, _, files in os.walk(dir_path) for f in files]
    # Use the relative path as the S3 object key
    upload_file = lambda p: s3.upload_file(p, bucket_name, os.path.relpath(p, dir_path))
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Map local files to S3 upload tasks
        executor.map(upload_file, files_to_upload)
```

That completes the Data Ingestion Layer. By adding nodes to the underlying Kubernetes cluster, this design scales comfortably to millions of documents.

AI Compute Layer

With the ingestion pipeline built, we need a compute layer that can handle heavy reasoning workloads. Loading an LLM inside the web server may work for a monolith, but on an enterprise RAG platform it creates serious performance bottlenecks and scaling problems.

Loading a 70B-parameter model into the web server throttles throughput and makes horizontal scaling practically impossible.


So we decouple the application service (FastAPI) from model inference. This design hosts the models as independent microservices on Ray Serve, which can autoscale based on GPU availability and request volume.

Model Configuration and Hardware Mapping

Never hardcode model parameters in production code. We want a flexible config system so we can swap models, change quantization strategies, or adjust batch sizes seamlessly.


For example, models/llm/llama-70b.yaml defines the primary LLM: Llama-3-70B-Instruct. In FP16 it needs roughly 140GB of VRAM, so we use AWQ quantization to run it on cheaper GPU configurations.

Industry experience suggests spending more effort on data quality and agent logic than on obsessing over exactly which large model to use.

```yaml
# models/llm/llama-70b.yaml

model_config:
  model_id: "meta-llama/Meta-Llama-3-70B-Instruct"
  quantization: "awq"
  max_model_len: 8192
  max_num_seqs: 128
  gpu_memory_utilization: 0.90
  tensor_parallel_size: 4
  stop_token_ids: [128001, 128009]
```

Note that tensor_parallel_size: 4 is an enterprise-grade setting: the model weights are sharded across 4 GPUs for tensor-parallel inference.

We also keep a smaller model config, models/llm/llama-7b.yaml, for cost-sensitive tasks like query rewriting or summarization:

```yaml
# models/llm/llama-7b.yaml

model_config:
  model_id: "meta-llama/Meta-Llama-3-8B-Instruct"
  quantization: "awq"
  max_model_len: 8192
  max_num_seqs: 256
  gpu_memory_utilization: 0.85
  tensor_parallel_size: 1
  stop_token_ids: [128001, 128009]
```

The retrieval-side embedding model is configured in models/embeddings/bge-m3.yaml. BGE-M3 supports dense, sparse, and multilingual retrieval, which suits a global platform:

```yaml
# models/embeddings/bge-m3.yaml

model_config:
  model_id: "BAAI/bge-m3"
  batch_size: 32
  normalize_embeddings: true
  dtype: "float16"
  max_seq_length: 8192
```

To push retrieval accuracy further we add a reranker. models/rerankers/bge-reranker.yaml rescores the top-K results before they reach the LLM, which noticeably reduces hallucination:

```yaml
# models/rerankers/bge-reranker.yaml

model_config:
  model_id: "BAAI/bge-reranker-v2-m3"
  dtype: "float16"
  max_length: 512
  batch_size: 16
```
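
The reranker itself is not shown in code elsewhere in this article; a minimal sketch of how this config could be served, using sentence-transformers' CrossEncoder, might look like this:

```python
# Hypothetical reranking step matching models/rerankers/bge-reranker.yaml (sketch)
from sentence_transformers import CrossEncoder

reranker = CrossEncoder("BAAI/bge-reranker-v2-m3", max_length=512)

def rerank(query: str, docs: list[str], top_k: int = 5) -> list[str]:
    """Score (query, doc) pairs with the cross-encoder and keep the best top_k."""
    scores = reranker.predict([(query, d) for d in docs], batch_size=16)
    ranked = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in ranked[:top_k]]
```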

Model Serving with vLLM and Ray

A standard HuggingFace Transformers pipeline is usually not efficient enough under high-concurrency production load. We use the vLLM inference engine, whose core PagedAttention technique dramatically improves throughput.


```python
# services/api/app/models/vllm_engine.py

from ray import serve
from vllm import AsyncLLMEngine, EngineArgs, SamplingParams
from transformers import AutoTokenizer
import os

@serve.deployment(autoscaling_config={"min_replicas": 1, "max_replicas": 10}, ray_actor_options={"num_gpus": 1})
class VLLMDeployment:
    def __init__(self):
        model_id = os.getenv("MODEL_ID", "meta-llama/Meta-Llama-3-70B-Instruct")
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        args = EngineArgs(
            model=model_id,
            quantization="awq",
            gpu_memory_utilization=0.90,
            max_model_len=8192
        )
        self.engine = AsyncLLMEngine.from_engine_args(args)

    async def __call__(self, request):
        body = await request.json()
        messages = body.get("messages", [])
        prompt = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        sampling_params = SamplingParams(
            temperature=body.get("temperature", 0.7),
            max_tokens=body.get("max_tokens", 1024),
            stop_token_ids=[self.tokenizer.eos_token_id, self.tokenizer.convert_tokens_to_ids("<|eot_id|>")]
        )
        request_id = str(os.urandom(8).hex())
        results_generator = self.engine.generate(prompt, sampling_params, request_id)
        final_output = None
        async for request_output in results_generator:
            final_output = request_output
        text_output = final_output.outputs[0].text
        return {"choices": [{"message": {"content": text_output, "role": "assistant"}}]}

app = VLLMDeployment.bind()
```

The @serve.deployment decorator hands the class's lifecycle to Ray Serve. autoscaling_config is the key to production elasticity: the service scales out under traffic spikes (up to 10 replicas) and scales back down when idle to optimize resource cost.
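
For reference, a minimal sketch of launching this deployment from a driver script on a running Ray cluster (the module path matches the file above):

```python
# Hypothetical launch script for the vLLM deployment (sketch)
from ray import serve
from services.api.app.models.vllm_engine import app

serve.run(app)  # blocks until the deployment is ready, then serves HTTP requests
```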

Deploying the Embedding and Re-Ranker Models

Unlike LLMs, embedding models don't need complex autoregressive generation, but they demand highly efficient batching. When 50 users query concurrently, the system should encode all queries in one GPU forward pass rather than 50 serial ones.

The code below implements an efficient embedding service in services/api/app/models/embedding_engine.py. Note the ray_actor_options={"num_gpus": 0.5} setting: it lets two embedding replicas share a single GPU, cutting hardware cost significantly.

```python
# services/api/app/models/embedding_engine.py

from ray import serve
from sentence_transformers import SentenceTransformer
import os
import torch

@serve.deployment(
    num_replicas=1,
    ray_actor_options={"num_gpus": 0.5}  # shared GPU
)
class EmbedDeployment:
    def __init__(self):
        model_name = "BAAI/bge-m3"
        self.model = SentenceTransformer(model_name, device="cuda")
        self.model = torch.compile(self.model)

    async def __call__(self, request):
        body = await request.json()
        texts = body.get("text")
        task_type = body.get("task_type", "document")
        if isinstance(texts, str):
            texts = [texts]
        embeddings = self.model.encode(texts, batch_size=32, normalize_embeddings=True)
        return {"embeddings": embeddings.tolist()}

app = EmbedDeployment.bind()
```

This deployment serves both document vectorization during ingestion and query vectorization at search time. torch.compile applies graph optimization to squeeze more performance out of the GPU.

Asynchronous Internal Service Clients

The API layer needs to talk to these model services deployed on Ray Serve. Since the models run as independent microservices, communication goes over HTTP, and to avoid blocking FastAPI's high-concurrency event loop, the calls must use an async client.

Async Calls (Created by Fareed Khan)

Create services/api/app/clients/ray_llm.py with connection pooling and retry logic:

```python
# services/api/app/clients/ray_llm.py

import httpx
import logging
import backoff
from typing import List, Dict, Optional
from services.api.app.config import settings

logger = logging.getLogger(__name__)

class RayLLMClient:
    """Async client with connection pooling."""
    def __init__(self):
        self.endpoint = settings.RAY_LLM_ENDPOINT
        self.client: Optional[httpx.AsyncClient] = None

    async def start(self):
        """Initialize at application startup."""
        limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
        self.client = httpx.AsyncClient(timeout=120.0, limits=limits)
        logger.info("Ray LLM Client initialized.")

    async def close(self):
        if self.client:
            await self.client.aclose()

    @backoff.on_exception(backoff.expo, httpx.HTTPError, max_tries=3)
    async def chat_completion(self, messages: List[Dict], temperature: float = 0.7, json_mode: bool = False) -> str:
        if not self.client:
            raise RuntimeError("Client not initialized. Call start() first.")
        payload = {"messages": messages, "temperature": temperature, "max_tokens": 1024}
        response = await self.client.post(self.endpoint, json=payload)
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

llm_client = RayLLMClient()
```

The backoff library ensures that a request isn't failed outright on network jitter or a busy Ray Serve; instead it retries with exponential backoff.

The embedding client lives in services/api/app/clients/ray_embed.py:

```python
# services/api/app/clients/ray_embed.py

import httpx
from services.api.app.config import settings

class RayEmbedClient:
    """Client for the Ray Serve embedding service."""
    async def embed_query(self, text: str) -> list[float]:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                settings.RAY_EMBED_ENDPOINT,
                json={"text": text, "task_type": "query"}
            )
            response.raise_for_status()
            # The service always returns a list; a single query yields one vector
            return response.json()["embeddings"][0]

    async def embed_documents(self, texts: list[str]) -> list[list[float]]:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                settings.RAY_EMBED_ENDPOINT,
                json={"text": texts, "task_type": "document"}
            )
            response.raise_for_status()
            return response.json()["embeddings"]

embed_client = RayEmbedClient()
```

With these clients, API code can treat a call to the 70B model as a plain async function call, fully hiding the complexity of GPU management and distributed inference.

Next we build the agentic pipeline, upgrading basic RAG to a decision-capable Agentic RAG that knows when to call which model to answer a user's question.

Agentic AI Pipeline

We now have an ingestion pipeline and distributed model serving. A simple RAG app might be a linear "retrieve -> generate" flow.


For an enterprise agentic platform, however, that linear chain is too brittle: it breaks easily when users change topic, ask for math, or phrase things ambiguously.

So we build an event-driven agent with FastAPI and LangGraph. The system "reasons" about user intent: it loops, self-corrects, picks tools dynamically, and handles thousands of WebSocket connections asynchronously.

API Foundations and Observability

First, define the environment and security baseline. An enterprise API cannot be a black box; we need structured logging and tracing to figure out why one request took 5 seconds instead of 500 ms.


services/api/requirements.txt 中声明依赖,包括 FastAPI、LangGraph、OpenTelemetry 等核心库:

```txt
# services/api/requirements.txt

# Core Framework
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.6.0
pydantic-settings==2.1.0
simpleeval==0.9.13  # Safe math evaluation

# Async Database & Cache
sqlalchemy==2.0.25
asyncpg==0.29.0
redis==5.0.1

# AI & LLM Clients
openai==1.10.0
anthropic==0.8.0
tiktoken==0.5.2
sentence-transformers==2.3.1
transformers==4.37.0

# Graph & Vector DBs
neo4j==5.16.0
qdrant-client==1.7.3

# Agent Framework
langchain==0.1.5
langgraph==0.0.21

# Observability & Ops
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-otlp==1.22.0
prometheus-client==0.19.0
python-json-logger==2.0.7
backoff==2.2.1

# Security
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6

# Utilities
boto3==1.34.34
httpx==0.26.0
tenacity==8.2.3
```

Then services/api/app/config.py uses Pydantic Settings to validate critical configuration (database URLs, API keys) at startup, so the app fails fast on misconfiguration:

```python
# services/api/app/config.py

from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    """App configuration: reads environment variables automatically (case-insensitive)."""
    ENV: str = "prod"
    LOG_LEVEL: str = "INFO"
    DATABASE_URL: str
    REDIS_URL: str
    QDRANT_HOST: str = "qdrant-service"
    QDRANT_PORT: int = 6333
    QDRANT_COLLECTION: str = "rag_collection"
    NEO4J_URI: str = "bolt://neo4j-cluster:7687"
    NEO4J_USER: str = "neo4j"
    NEO4J_PASSWORD: str
    AWS_REGION: str = "us-east-1"
    S3_BUCKET_NAME: str
    RAY_LLM_ENDPOINT: str = "http://llm-service:8000/llm"
    RAY_EMBED_ENDPOINT: str = "http://embed-service:8000/embed"
    JWT_SECRET_KEY: str
    JWT_ALGORITHM: str = "HS256"

    class Config:
        env_file = ".env"

settings = Settings()
```

Wire up structured JSON logging in services/api/app/logging.py:

```python
# services/api/app/logging.py

import logging
import json
import sys
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Format log records as JSON: timestamp, level, message, etc."""
    def format(self, record):
        log_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "line": record.lineno
        }
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        if hasattr(record, "request_id"):
            log_record["request_id"] = record.request_id
        return json.dumps(log_record)

def setup_logging():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JSONFormatter())
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    if root_logger.handlers:
        root_logger.handlers = []
    root_logger.addHandler(handler)
    logging.getLogger("uvicorn.access").disabled = True
    logging.getLogger("httpx").setLevel(logging.WARNING)

setup_logging()
```

Enable distributed tracing in services/api/app/observability.py:

```python
# services/api/app/observability.py

from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from libs.observability.tracing import configure_tracing

def setup_observability(app: FastAPI):
    configure_tracing(service_name="rag-api-service")
    FastAPIInstrumentor.instrument_app(app)
```

Implement the JWT validation dependency in services/api/app/auth/jwt.py:

```python
# services/api/app/auth/jwt.py

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from services.api.app.config import settings
import time

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
        user_id: str = payload.get("sub")
        role: str = payload.get("role", "user")
        if user_id is None:
            raise credentials_exception
        exp = payload.get("exp")
        if exp and time.time() > exp:
            raise HTTPException(status_code=401, detail="Token expired")
        return {"id": user_id, "role": role, "permissions": payload.get("permissions", [])}
    except JWTError:
        raise credentials_exception
```

Data contracts live in libs/schemas/chat.py:

```python
# libs/schemas/chat.py

from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime

class Message(BaseModel):
    role: str  # "user", "assistant", "system"
    content: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
    stream: bool = True
    filters: Optional[Dict[str, Any]] = None

class ChatResponse(BaseModel):
    answer: str
    session_id: str
    citations: List[Dict[str, str]] = []  # [{"source": "doc.pdf", "text": "..."}]
    latency_ms: float

class RetrievalResult(BaseModel):
    content: str
    source: str
    score: float
    metadata: Dict[str, Any]
```

Asynchronous Data Gateways

To stay scalable, the API must never block its main loop. All database operations use async clients, so a single worker can keep serving other connections while waiting on the database.


The Redis client lives in services/api/app/clients/redis.py:

```python
# services/api/app/clients/redis.py

import redis.asyncio as redis
from services.api.app.config import settings

class RedisClient:
    """Redis connection-pool singleton, used for rate limiting and semantic caching."""
    def __init__(self):
        self.redis = None

    async def connect(self):
        if not self.redis:
            self.redis = redis.from_url(
                settings.REDIS_URL,
                encoding="utf-8",
                decode_responses=True
            )

    async def close(self):
        if self.redis:
            await self.redis.close()

    def get_client(self):
        return self.redis

redis_client = RedisClient()
```

The Qdrant vector-search client lives in services/api/app/clients/qdrant.py:

```python
# services/api/app/clients/qdrant.py

from qdrant_client import AsyncQdrantClient
from services.api.app.config import settings

class VectorDBClient:
    """Async Qdrant client."""
    def __init__(self):
        self.client = AsyncQdrantClient(
            host=settings.QDRANT_HOST,
            port=settings.QDRANT_PORT,
            prefer_grpc=True
        )

    async def search(self, vector: list[float], limit: int = 5):
        return await self.client.search(
            collection_name=settings.QDRANT_COLLECTION,
            query_vector=vector,
            limit=limit,
            with_payload=True
        )

qdrant_client = VectorDBClient()
```

Graph search goes through services/api/app/clients/neo4j.py:

```python
# services/api/app/clients/neo4j.py

from neo4j import AsyncGraphDatabase
from services.api.app.config import settings
import logging

logger = logging.getLogger(__name__)

class Neo4jClient:
    """Neo4j driver singleton (async)."""
    def __init__(self):
        self._driver = None

    def connect(self):
        if not self._driver:
            try:
                self._driver = AsyncGraphDatabase.driver(
                    settings.NEO4J_URI,
                    auth=(settings.NEO4J_USER, settings.NEO4J_PASSWORD)
                )
                logger.info("Connected to Neo4j successfully.")
            except Exception as e:
                logger.error(f"Failed to connect to Neo4j: {e}")
                raise

    async def close(self):
        if self._driver:
            await self._driver.close()

    async def query(self, cypher_query: str, parameters: dict = None):
        if not self._driver:
            self.connect()
        async with self._driver.session() as session:
            result = await session.run(cypher_query, parameters or {})
            return [record.data() async for record in result]

neo4j_client = Neo4jClient()
```

Conversation Memory and Semantic Caching

An agent is only as capable as its memory. We store full conversation history in Postgres so the LLM can recall multi-turn context.


services/api/app/memory/models.py defines the schema:

```python
# services/api/app/memory/models.py

from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON
from datetime import datetime

Base = declarative_base()

class ChatHistory(Base):
    __tablename__ = "chat_history"
    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(255), index=True, nullable=False)
    user_id = Column(String(255), index=True, nullable=False)
    role = Column(String(50), nullable=False)
    content = Column(Text, nullable=False)
    metadata_ = Column(JSON, default={}, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
```

The async storage layer is implemented in services/api/app/memory/postgres.py:

```python
# services/api/app/memory/postgres.py

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, String, JSON, DateTime, Integer, Text, select
from datetime import datetime
from services.api.app.config import settings

Base = declarative_base()

class ChatHistory(Base):
    __tablename__ = "chat_history"
    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String, index=True)
    user_id = Column(String, index=True)
    role = Column(String)
    content = Column(Text)
    metadata_ = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)

engine = create_async_engine(settings.DATABASE_URL, echo=False)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

class PostgresMemory:
    async def add_message(self, session_id: str, role: str, content: str, user_id: str):
        async with AsyncSessionLocal() as session:
            async with session.begin():
                msg = ChatHistory(session_id=session_id, role=role, content=content, user_id=user_id)
                session.add(msg)

    async def get_history(self, session_id: str, limit: int = 10):
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(ChatHistory)
                .where(ChatHistory.session_id == session_id)
                .order_by(ChatHistory.created_at.desc())
                .limit(limit)
            )
            return result.scalars().all()[::-1]

postgres_memory = PostgresMemory()
```

To optimize cost, we implement a semantic cache in services/api/app/cache/semantic.py. When a semantically similar query has already been handled, the system returns the answer straight from cache instead of calling the expensive 70B model again.

```python
# services/api/app/cache/semantic.py

import json
import logging
import uuid
from typing import Optional
from qdrant_client.http import models
from services.api.app.clients.ray_embed import embed_client
from services.api.app.clients.qdrant import qdrant_client
from services.api.app.config import settings

logger = logging.getLogger(__name__)

class SemanticCache:
    """Semantic cache based on vector similarity."""
    async def get_cached_response(self, query: str, threshold: float = 0.95) -> Optional[str]:
        try:
            vector = await embed_client.embed_query(query)
            results = await qdrant_client.client.search(
                collection_name="semantic_cache",
                query_vector=vector,
                limit=1,
                with_payload=True,
                score_threshold=threshold
            )
            if results:
                logger.info(f"Semantic Cache Hit! Score: {results[0].score}")
                return results[0].payload["answer"]
        except Exception as e:
            logger.warning(f"Semantic cache lookup failed: {e}")
        return None

    async def set_cached_response(self, query: str, answer: str):
        try:
            vector = await embed_client.embed_query(query)
            await qdrant_client.client.upsert(
                collection_name="semantic_cache",
                points=[
                    models.PointStruct(
                        id=str(uuid.uuid4()),
                        vector=vector,
                        payload={"query": query, "answer": answer}
                    )
                ]
            )
        except Exception as e:
            logger.warning(f"Failed to write to semantic cache: {e}")

semantic_cache = SemanticCache()
```

This design significantly cuts response latency and compute cost for frequently asked questions.
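
One detail the code above assumes is that a `semantic_cache` collection already exists in Qdrant. A minimal bootstrap sketch (the 1024-dim size matches BGE-M3's dense vectors; adjust for your embedder):

```python
# Hypothetical one-time setup for the semantic_cache collection (sketch)
from qdrant_client import QdrantClient
from qdrant_client.http import models

client = QdrantClient(host="qdrant-service", port=6333)
client.recreate_collection(
    collection_name="semantic_cache",
    vectors_config=models.VectorParams(size=1024, distance=models.Distance.COSINE),
)
```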

Building the Workflow with LangGraph

At its core the agent is a state machine that moves between thinking, retrieving, using tools, and answering.

LangGraph Workflow (Created by Fareed Khan)

We define AgentState in services/api/app/agents/state.py:

```python
# services/api/app/agents/state.py

from typing import TypedDict, Annotated, List, Union
import operator

class AgentState(TypedDict):
    messages: Annotated[List[dict], operator.add]
    documents: List[str]
    current_query: str
    plan: List[str]
```

The planner node in services/api/app/agents/nodes/planner.py classifies the user's intent and decides the next action:

```python
# services/api/app/agents/nodes/planner.py

import json
import logging
from services.api.app.agents.state import AgentState
from services.api.app.clients.ray_llm import llm_client

logger = logging.getLogger(__name__)

SYSTEM_PROMPT = """
You are a RAG Planning Agent.
Analyze the User Query and Conversation History.
Decide the next step:
1. If the user greets (Hello/Hi), output "direct_answer".
2. If the user asks a specific question requiring data, output "retrieve".
3. If the user asks for math/code, output "tool_use".
Output JSON format ONLY:
{
"action": "retrieve" | "direct_answer" | "tool_use",
"refined_query": "The standalone search query",
"reasoning": "Why you chose this action"
}
"""

async def planner_node(state: AgentState) -> dict:
    logger.info("Planner Node: Analyzing query...")
    last_message = state["messages"][-1]
    user_query = last_message.content if hasattr(last_message, 'content') else last_message['content']
    try:
        response_text = await llm_client.chat_completion(
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_query}
            ],
            temperature=0.0
        )
        plan = json.loads(response_text)
        logger.info(f"Plan derived: {plan['action']}")
        return {
            "current_query": plan.get("refined_query", user_query),
            "plan": [plan["reasoning"]]
        }
    except Exception as e:
        logger.error(f"Planning failed: {e}")
        return {"current_query": user_query, "plan": ["Error in planning, defaulting to retrieval."]}
```

The retriever node in services/api/app/agents/nodes/retriever.py performs hybrid retrieval, calling Qdrant and Neo4j in parallel:

```python
# services/api/app/agents/nodes/retriever.py

import asyncio
from typing import Dict, List
from services.api.app.agents.state import AgentState
from services.api.app.clients.qdrant import qdrant_client
from services.api.app.clients.neo4j import neo4j_client
from services.api.app.clients.ray_embed import embed_client
import logging

logger = logging.getLogger(__name__)

async def retrieve_node(state: AgentState) -> Dict:
    query = state["current_query"]
    logger.info(f"Retrieving context for: {query}")
    query_vector = await embed_client.embed_query(query)

    async def run_vector_search():
        results = await qdrant_client.search(vector=query_vector, limit=5)
        return [f"{r.payload['text']} [Source: {r.payload['metadata']['filename']}]" for r in results]

    async def run_graph_search():
        cypher = """
        CALL db.index.fulltext.queryNodes("entity_index", $query) YIELD node, score
        MATCH (node)-[r]->(neighbor)
        RETURN node.name + ' ' + type(r) + ' ' + neighbor.name as text
        LIMIT 5
        """
        try:
            results = await neo4j_client.query(cypher, {"query": query})
            return [r['text'] for r in results]
        except Exception as e:
            logger.error(f"Graph search failed: {e}")
            return []

    vector_docs, graph_docs = await asyncio.gather(run_vector_search(), run_graph_search())
    combined_docs = list(set(vector_docs + graph_docs))
    logger.info(f"Retrieved {len(combined_docs)} documents.")
    return {"documents": combined_docs}
```

The responder node in services/api/app/agents/nodes/responder.py uses the Llama-70B model to synthesize the retrieved context into a final answer:

```python
# services/api/app/agents/nodes/responder.py

from services.api.app.agents.state import AgentState
from services.api.app.clients.ray_llm import llm_client

async def generate_node(state: AgentState) -> dict:
    query = state["current_query"]
    documents = state.get("documents", [])
    context_str = "\n\n".join(documents)
    prompt = f"""
    You are a helpful Enterprise Assistant. Use the context below to answer the user's question.

    Context:
    {context_str}

    Question:
    {query}

    Instructions:
    1. Cite sources using [Source: Filename].
    2. If the answer is not in the context, say "I don't have that information in my documents."
    3. Be concise and professional.
    """
    answer = await llm_client.chat_completion(
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3
    )
    return {"messages": [{"role": "assistant", "content": answer}]}
```

The Tool Node: External Computation and Search

The node in services/api/app/agents/nodes/tool.py executes external computation or search tasks, choosing and invoking the right tool based on the plan.

```python
# services/api/app/agents/nodes/tool.py

import logging
from services.api.app.agents.state import AgentState
from services.api.app.tools.calculator import calculate
from services.api.app.tools.graph_search import search_graph_tool

logger = logging.getLogger(__name__)

async def tool_node(state: AgentState) -> dict:
    plan_data = state.get("plan", [])
    if not plan_data:
        return {"messages": [{"role": "system", "content": "No tool selected."}]}
    # tool_choice / tool_input are assumed to be set upstream by the planner
    tool_name = state.get("tool_choice", "calculator")
    tool_input = state.get("tool_input", "0+0")
    result = ""
    if tool_name == "calculator":
        logger.info(f"Executing Calculator: {tool_input}")
        result = calculate(tool_input)
    elif tool_name == "graph_search":
        logger.info(f"Executing Graph Search: {tool_input}")
        result = await search_graph_tool(tool_input)
    else:
        result = "Unknown tool requested."
    return {"messages": [{"role": "user", "content": f"Tool Output: {result}"}]}
```

Node Orchestration: Assembling the LangGraph Workflow

services/api/app/agents/graph.py 中,将各个功能节点编排成一个有向工作流。

```python
# services/api/app/agents/graph.py

from langgraph.graph import StateGraph, END
from services.api.app.agents.state import AgentState
from services.api.app.agents.nodes.retriever import retrieve_node
from services.api.app.agents.nodes.responder import generate_node
from services.api.app.agents.nodes.planner import planner_node

workflow = StateGraph(AgentState)
workflow.add_node("planner", planner_node)
workflow.add_node("retriever", retrieve_node)
workflow.add_node("responder", generate_node)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "retriever")
workflow.add_edge("retriever", "responder")
workflow.add_edge("responder", END)

agent_app = workflow.compile()
```
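
A quick smoke test of the compiled graph might look like this (a sketch; it assumes the model services are reachable):

```python
# Hypothetical local invocation of the compiled agent graph (sketch)
import asyncio

async def smoke_test():
    state = {
        "messages": [{"role": "user", "content": "How do I scale a Deployment in Kubernetes?"}],
        "documents": [], "current_query": "", "plan": []
    }
    result = await agent_app.ainvoke(state)
    print(result["messages"][-1]["content"])

asyncio.run(smoke_test())
```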

Query Enhancement: Implementing HyDE

To improve retrieval accuracy we use HyDE (Hypothetical Document Embeddings). The core idea: have the LLM "hallucinate" a hypothetical answer paragraph, then retrieve with that paragraph's embedding, which more easily matches real documents with similar semantic patterns.

Query Enhance (Created by Fareed Khan)

The implementation lives in services/api/app/enhancers/hyde.py:

```python
# services/api/app/enhancers/hyde.py

from services.api.app.clients.ray_llm import llm_client

SYSTEM_PROMPT = """
You are a helpful assistant.
Write a hypothetical paragraph that answers the user's question.
It does not need to be factually correct, but it must use the correct vocabulary and structure
that a relevant document would have.
Question: {question}
"""

async def generate_hypothetical_document(question: str) -> str:
    try:
        hypothetical_doc = await llm_client.chat_completion(
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT.format(question=question)},
            ],
            temperature=0.7
        )
        return hypothetical_doc
    except Exception:
        return question
```
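
Putting HyDE to work is then just a matter of embedding the hypothetical document instead of the raw question; a minimal sketch using the clients defined earlier:

```python
# Hypothetical HyDE-backed retrieval flow (sketch)
from services.api.app.clients.ray_embed import embed_client
from services.api.app.clients.qdrant import qdrant_client
from services.api.app.enhancers.hyde import generate_hypothetical_document

async def hyde_search(question: str, limit: int = 5):
    # Embed the fake answer, not the question, then search as usual
    hypo_doc = await generate_hypothetical_document(question)
    vector = await embed_client.embed_query(hypo_doc)
    return await qdrant_client.search(vector=vector, limit=limit)
```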

Alongside it, services/api/app/enhancers/query_rewriter.py handles coreference resolution and query rewriting, expanding pronouns like "it/they" into standalone queries:

```python
# services/api/app/enhancers/query_rewriter.py

from typing import List, Dict
from services.api.app.clients.ray_llm import llm_client

SYSTEM_PROMPT = """
You are a Query Rewriter.
Your task is to rewrite the latest user question to be a standalone search query,
resolving coreferences (he, she, it, they) using the conversation history.
History:
{history}
Latest Question: {question}
Output ONLY the rewritten question. If no rewriting is needed, output the latest question as is.
"""

async def rewrite_query(question: str, history: List[Dict[str, str]]) -> str:
    if not history:
        return question
    history_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in history])
    prompt = SYSTEM_PROMPT.format(history=history_str, question=question)
    try:
        rewritten = await llm_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0
        )
        return rewritten.strip()
    except Exception:
        return question
```

The application entry point, services/api/main.py, manages the FastAPI startup/shutdown lifecycle and the initialization and teardown of the clients:

```python
# services/api/main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI
from services.api.app.clients.neo4j import neo4j_client
from services.api.app.clients.ray_llm import llm_client
from services.api.app.clients.redis import redis_client
from services.api.app.routes import chat, upload, health

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Initializing clients...")
    neo4j_client.connect()
    await redis_client.connect()
    await llm_client.start()
    # Note: the embedding client opens per-request AsyncClients, so it needs no lifecycle hooks
    yield
    print("Closing clients...")
    await neo4j_client.close()
    await redis_client.close()
    await llm_client.close()

app = FastAPI(title="Enterprise RAG Platform", version="1.0.0", lifespan=lifespan)
app.include_router(chat.router, prefix="/api/v1/chat", tags=["Chat"])
app.include_router(upload.router, prefix="/api/v1/upload", tags=["Upload"])
app.include_router(health.router, prefix="/health", tags=["Health"])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```

We now have an agent that can plan, retrieve, and reason. Next we build the Tools & Sandbox layer so the agent can safely execute code and reach external knowledge.

Tools & Sandbox

On an enterprise RAG platform, letting the AI execute code or query internal databases directly is a huge security risk: an attacker could reach sensitive data through prompt injection and similar techniques.


We take a layered approach to this challenge: build safe, deterministic tools first; run high-risk operations (like Python code execution) inside an isolated, hardened sandbox container.

A Secure Code Sandbox

Letting the LLM write and run Python is powerful (math, plotting, CSV analysis) but extremely dangerous: the model might emit something malicious like os.system("rm -rf /") or try to exfiltrate data.


The solution is a separate sandbox microservice. It runs untrusted code in its own container, which has no network access and is constrained by strict CPU/RAM limits and an execution timeout.

services/sandbox/Dockerfile creates a non-root user and drops privileges:

```dockerfile
# services/sandbox/Dockerfile

FROM python:3.10-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1
RUN useradd -m -u 1000 sandbox_user
WORKDIR /app
COPY runner.py .
RUN pip install flask
USER sandbox_user
EXPOSE 8080
CMD ["python", "runner.py"]
```

services/sandbox/runner.py is a Flask app that receives code, runs it in a separate process, captures stdout, and kills the process on timeout:

```python
# services/sandbox/runner.py

from flask import Flask, request, jsonify
import sys
import io
import contextlib
import multiprocessing

app = Flask(__name__)

def execute_code_safe(code: str, queue):
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, {"__builtins__": __builtins__}, {})
        queue.put({"status": "success", "output": buffer.getvalue()})
    except Exception as e:
        queue.put({"status": "error", "output": str(e)})

@app.route("/execute", methods=["POST"])
def run_code():
    data = request.json
    code = data.get("code", "")
    timeout = data.get("timeout", 5)
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=execute_code_safe, args=(code, queue))
    p.start()
    p.join(timeout)
    if p.is_alive():
        p.terminate()
        return jsonify({"output": "Error: Execution timed out."}), 408
    if not queue.empty():
        result = queue.get()
        return jsonify(result)
    return jsonify({"output": "No output produced."})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
```

Resource limits live in services/sandbox/limits.yaml (defending against while True-style attacks):

```yaml
# services/sandbox/limits.yaml

runtime:
  timeout_seconds: 10
  memory_limit_mb: 512
  cpu_limit: 0.5
  allow_network: false
files:
  max_input_size_mb: 5
  allowed_imports: ["math", "datetime", "json", "pandas", "numpy"]
```
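
The runner shown above does not yet enforce allowed_imports; a minimal AST-based allowlist check that could run before exec is sketched below (an assumption for illustration, not the author's code):

```python
# Hypothetical import allowlist check for the sandbox (sketch)
import ast

ALLOWED_IMPORTS = {"math", "datetime", "json", "pandas", "numpy"}

def imports_are_allowed(code: str) -> bool:
    """Reject code that imports any module outside the allowlist."""
    tree = ast.parse(code)
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            names = [alias.name.split(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [(node.module or "").split(".")[0]]
        else:
            continue
        if any(name not in ALLOWED_IMPORTS for name in names):
            return False
    return True
```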

A Kubernetes NetworkPolicy blocks all sandbox egress; see services/sandbox/network-policy.yaml:

```yaml
# services/sandbox/network-policy.yaml

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: sandbox-deny-egress
  namespace: default
spec:
  podSelector:
    matchLabels:
      app: sandbox-service
  policyTypes:
    - Egress
  egress: []
```

The API-side client lives in services/api/app/tools/sandbox.py:

```python
# services/api/app/tools/sandbox.py

import httpx
from services.api.app.config import settings

SANDBOX_URL = "http://sandbox-service:8080/execute"

async def run_python_code(code: str) -> str:
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                SANDBOX_URL,
                json={"code": code, "timeout": 5},
                timeout=6.0
            )
            if response.status_code == 200:
                data = response.json()
                if data.get("status") == "success":
                    return f"Output:\n{data['output']}"
                else:
                    return f"Execution Error:\n{data['output']}"
            else:
                return f"Sandbox Error: Status {response.status_code}"
    except Exception as e:
        return f"Sandbox Connection Failed: {str(e)}"
```

Deterministic and Search Tools

Since code execution is risky, prefer safer, deterministic tools when possible. For math, a safe calculator built on the simpleeval library lives in services/api/app/tools/calculator.py:


```python
# services/api/app/tools/calculator.py

from simpleeval import simple_eval

def calculate(expression: str) -> str:
    """Safely evaluate an expression with simpleeval, avoiding remote code execution (RCE) risk."""
    if len(expression) > 100:
        return "Error: Expression too long."
    try:
        result = simple_eval(expression)
        return str(result)
    except Exception as e:
        return f"Error: {str(e)}"
```

We also expose the graph DB and vector DB as tools. The graph-search tool in services/api/app/tools/graph_search.py works like this: the LLM first extracts the core entities from the user's question, then a parameterized Cypher query runs against them. Never let the LLM generate raw Cypher, to prevent injection attacks.

```python
# services/api/app/tools/graph_search.py

from services.api.app.clients.neo4j import neo4j_client
from services.api.app.clients.ray_llm import llm_client
import json

# Note the doubled braces: the literal JSON must survive str.format()
SYSTEM_PROMPT = """
You are a Knowledge Graph Helper.
Extract the core entities from the user's question to perform a search.
Question: {question}
Output JSON only: {{"entities": ["list", "of", "names"]}}
"""
```

The tool itself is implemented as follows:

```python
async def search_graph_tool(question: str) -> str:
    try:
        response_text = await llm_client.chat_completion(
            messages=[{"role": "system", "content": SYSTEM_PROMPT.format(question=question)}],
            temperature=0.0,
            json_mode=True
        )
        data = json.loads(response_text)
        entities = data.get("entities", [])

        cypher_query = """
        UNWIND $names AS target_name
        CALL db.index.fulltext.queryNodes("entity_index", target_name) YIELD node, score
        MATCH (node)-[r]-(neighbor)
        RETURN node.name AS source, type(r) AS rel, neighbor.name AS target
        LIMIT 10
        """
        results = await neo4j_client.query(cypher_query, {"names": entities})
        return str(results) if results else "No connections found."
    except Exception as e:
        return f"Graph search error: {str(e)}"
```

The vector-search tool lives in services/api/app/tools/vector_search.py:

```python
# services/api/app/tools/vector_search.py

from services.api.app.clients.qdrant import qdrant_client
from services.api.app.clients.ray_embed import embed_client

async def search_vector_tool(query: str) -> str:
    try:
        vector = await embed_client.embed_query(query)
        results = await qdrant_client.search(vector, limit=3)
        formatted = ""
        for r in results:
            meta = r.payload.get("metadata", {})
            formatted += f"- {r.payload.get('text', '')[:200]}... [Source: {meta.get('filename')}]\n"
        return formatted if formatted else "No relevant documents found."
    except Exception as e:
        return f"Search Error: {str(e)}"
```

An external web-search tool (e.g. Tavily) lives in services/api/app/tools/web_search.py:

```python
# services/api/app/tools/web_search.py

import httpx
import os

async def web_search_tool(query: str) -> str:
    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        return "Web search disabled."
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.tavily.com/search",
                json={"api_key": api_key, "query": query, "max_results": 3}
            )
            data = response.json()
        results = data.get("results", [])
        return "\n".join([f"- {r['title']}: {r['content']}" for r in results])
    except Exception as e:
        return f"Web Search Error: {str(e)}"
```

API Routes and Gateway Logic

We expose the system over REST. The main route in services/api/app/routes/chat.py handles streaming responses, LangGraph execution, and background logging:

Route Logic (Created by Fareed Khan)

```python
# services/api/app/routes/chat.py

import uuid
import json
import logging
from typing import AsyncGenerator
from fastapi import APIRouter, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from services.api.app.auth.jwt import get_current_user
from services.api.app.agents.graph import agent_app
from services.api.app.agents.state import AgentState
from services.api.app.memory.postgres import postgres_memory
from services.api.app.cache.semantic import semantic_cache

router = APIRouter()
logger = logging.getLogger(__name__)

class ChatRequest(BaseModel):
    message: str
    session_id: str = None

@router.post("/stream")
async def chat_stream(req: ChatRequest, background_tasks: BackgroundTasks, user: dict = Depends(get_current_user)):
    session_id = req.session_id or str(uuid.uuid4())
    user_id = user["id"]
    cached_ans = await semantic_cache.get_cached_response(req.message)
    if cached_ans:
        async def stream_cache():
            yield json.dumps({"type": "answer", "content": cached_ans}) + "\n"
        return StreamingResponse(stream_cache(), media_type="application/x-ndjson")

    history_objs = await postgres_memory.get_history(session_id, limit=6)
    history_dicts = [{"role": msg.role, "content": msg.content} for msg in history_objs]
    history_dicts.append({"role": "user", "content": req.message})

    initial_state = AgentState(messages=history_dicts, current_query=req.message, documents=[], plan=[])

    async def event_generator():
        final_answer = ""
        async for event in agent_app.astream(initial_state):
            node_name = list(event.keys())[0]
            node_data = event[node_name]
            yield json.dumps({"type": "status", "node": node_name}) + "\n"
            if node_name == "responder" and "messages" in node_data:
                final_answer = node_data["messages"][-1]["content"]
                yield json.dumps({"type": "answer", "content": final_answer}) + "\n"
        if final_answer:
            await postgres_memory.add_message(session_id, "user", req.message, user_id)
            await postgres_memory.add_message(session_id, "assistant", final_answer, user_id)
            await semantic_cache.set_cached_response(req.message, final_answer)

    return StreamingResponse(event_generator(), media_type="application/x-ndjson")
```

File uploads live in services/api/app/routes/upload.py. In enterprise settings large files usually don't flow through the application API; instead we generate an S3 presigned URL so the frontend uploads directly to object storage:

```python
# services/api/app/routes/upload.py
import boto3
import uuid
from fastapi import APIRouter, Depends
from services.api.app.config import settings
from services.api.app.auth.jwt import get_current_user

router = APIRouter()

s3_client = boto3.client("s3", region_name=settings.AWS_REGION)

@router.post("/generate-presigned-url")
async def generate_upload_url(filename: str, content_type: str, user: dict = Depends(get_current_user)):
    file_id = str(uuid.uuid4())
    s3_key = f"uploads/{user['id']}/{file_id}"
    # Presigned PUT URL: the browser uploads directly to S3, bypassing the API.
    url = s3_client.generate_presigned_url(
        ClientMethod='put_object',
        Params={'Bucket': settings.S3_BUCKET_NAME, 'Key': s3_key, 'ContentType': content_type},
        ExpiresIn=3600
    )
    return {"upload_url": url, "file_id": file_id, "s3_key": s3_key}
```
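For completeness, the client then PUTs the file bytes to the returned URL. A minimal sketch using httpx (the API base URL and token handling are placeholders, not values from the repository):

```python
# Minimal client-side sketch: request a presigned URL, then PUT the file to S3.
# api_base and token are placeholders for illustration.
import httpx

def upload_file(path: str, api_base: str, token: str) -> str:
    headers = {"Authorization": f"Bearer {token}"}
    meta = httpx.post(
        f"{api_base}/generate-presigned-url",
        params={"filename": path, "content_type": "application/pdf"},
        headers=headers,
    ).json()
    with open(path, "rb") as f:
        # The upload goes straight to object storage; the API never sees the bytes.
        httpx.put(meta["upload_url"], content=f.read(),
                  headers={"Content-Type": "application/pdf"})
    return meta["s3_key"]
```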

Feedback and health checks (skeleton code structure):

```python
# services/api/app/routes/feedback.py
@router.post("/")
async def submit_feedback(
    req: FeedbackRequest,
    user: dict = Depends(get_current_user)
):
    ...

# services/api/app/routes/health.py
@router.get("/readiness")
async def readiness(response: Response):
    ...
```

Gateway rate limiting in services/gateway/rate_limit.lua: a Lua script running at the Nginx or Kong gateway layer blocks abusive requests at the edge, before they reach the application.

```lua
-- services/gateway/rate_limit.lua
local redis = require "resty.redis"
local red = redis:new()

red:set_timeout(100)

local ok, err = red:connect("rag-redis-prod", 6379)
if not ok then return ngx.exit(500) end

-- Fixed window: 100 requests per client IP per 60 seconds.
local key = "rate_limit:" .. ngx.var.remote_addr
local limit = 100
local current = red:incr(key)
if current == 1 then red:expire(key, 60) end

if current > limit then
    ngx.status = 429
    ngx.say("Rate limit exceeded.")
    return ngx.exit(429)
end
```

Pushing rate limiting forward into the Nginx/Lua layer intercepts abusive traffic before it ever reaches the backend Python API, saving server resources.
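The same fixed-window logic can also be mirrored inside the API as a defense-in-depth fallback, which is handy in local development where the gateway is absent. A minimal sketch using redis-py's asyncio client (the dependency function and Redis URL are assumptions):

```python
# Hypothetical app-level fallback limiter mirroring the Lua script's fixed window.
import redis.asyncio as redis
from fastapi import HTTPException, Request

r = redis.from_url("redis://rag-redis-prod:6379")

async def rate_limit(request: Request, limit: int = 100, window: int = 60):
    key = f"rate_limit:{request.client.host}"
    current = await r.incr(key)
    if current == 1:
        # First hit in this window: start the 60-second countdown.
        await r.expire(key, window)
    if current > limit:
        raise HTTPException(status_code=429, detail="Rate limit exceeded.")
```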

With that, the core of the application stack is in place: ingestion, models, agents, and tools. Next, we provision the infrastructure needed to run the system on AWS.

Infrastructure as Code (IaC)

With the software stack in hand, we need a reliable environment to run it. An enterprise platform cannot depend on manual work in the AWS console ("ClickOps"), which leads to configuration drift, security gaps, and unreproducible environments.

Figure: Infra layer (Created by Fareed Khan)

  1. Terraform codifies the entire cloud infrastructure, so consistent dev, staging, and prod environments can be brought up in minutes.
  2. Karpenter provides intelligent, near-real-time node scaling that is faster and cheaper than traditional Auto Scaling Groups (ASGs).

Foundation and Networking

Everything starts with the network. We need an isolated VPC, typically with a public load-balancer tier, a private application tier, and a database tier: three layers of network isolation.

Figure: Networking (Created by Fareed Khan)

infra/terraform/main.tf configures the remote state backend (S3 storage with DynamoDB locking), preventing state-file corruption when multiple people operate concurrently:

```hcl
# infra/terraform/main.tf
terraform {
  required_version = ">= 1.5.0"

  backend "s3" {
    bucket         = "rag-platform-terraform-state-prod-001"
    key            = "platform/terraform.tfstate"
    region         = "us-east-1"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"
  }

  required_providers {
    aws        = { source = "hashicorp/aws", version = "~> 5.0" }
    kubernetes = { source = "hashicorp/kubernetes", version = "~> 2.23" }
  }
}

provider "aws" {
  region = var.aws_region
  default_tags {
    tags = { Project = "Enterprise-RAG", ManagedBy = "Terraform" }
  }
}
```

Variable definitions in infra/terraform/variables.tf:

```hcl
# infra/terraform/variables.tf
variable "aws_region" {
  description = "AWS region to deploy resources"
  default     = "us-east-1"
}

variable "cluster_name" {
  description = "Name of the EKS Cluster"
  default     = "rag-platform-cluster"
}

variable "vpc_cidr" {
  description = "CIDR block for the VPC"
  default     = "10.0.0.0/16"
}
```

The three-tier network in infra/terraform/vpc.tf:

```hcl
# infra/terraform/vpc.tf
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "5.1.0"

  name = "${var.cluster_name}-vpc"
  cidr = var.vpc_cidr

  azs              = ["us-east-1a", "us-east-1b", "us-east-1c"]
  public_subnets   = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  private_subnets  = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]
  database_subnets = ["10.0.201.0/24", "10.0.202.0/24", "10.0.203.0/24"]

  enable_nat_gateway   = true
  single_nat_gateway   = false
  enable_dns_hostnames = true

  public_subnet_tags = {
    "kubernetes.io/role/elb" = "1"
  }
  private_subnet_tags = {
    "kubernetes.io/role/internal-elb" = "1"
  }
}
```

Compute Cluster (EKS & IAM)

The compute layer uses Amazon EKS as the control plane. With OIDC (IRSA) enabled, Kubernetes ServiceAccounts can temporarily assume AWS IAM roles, avoiding the risk of leaking long-lived access keys. Example configuration in infra/terraform/eks.tf:

```hcl
# infra/terraform/eks.tf
module "eks" {
  source  = "terraform-aws-modules/eks/aws"
  version = "~> 19.0"

  cluster_name    = var.cluster_name
  cluster_version = "1.29"
  vpc_id          = module.vpc.vpc_id
  subnet_ids      = module.vpc.private_subnets
  enable_irsa     = true

  eks_managed_node_groups = {
    system = {
      name           = "system-nodes"
      instance_types = ["m6i.large"]
      min_size       = 2
      max_size       = 5
      desired_size   = 2
    }
  }
}
```

Following the principle of least privilege, grant the ingestion job only the S3 permissions it needs and bind them to a specific ServiceAccount in infra/terraform/iam.tf:

```hcl
# infra/terraform/iam.tf
resource "aws_iam_policy" "ingestion_policy" {
  name = "RAG_Ingestion_S3_Policy"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action   = ["s3:GetObject", "s3:PutObject", "s3:ListBucket"]
        Effect   = "Allow"
        Resource = [aws_s3_bucket.documents.arn, "${aws_s3_bucket.documents.arn}/*"]
      }
    ]
  })
}

module "ingestion_irsa_role" {
  source    = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-account-eks"
  role_name = "rag-ingestion-role"

  oidc_providers = {
    main = {
      provider_arn               = module.eks.oidc_provider_arn
      namespace_service_accounts = ["default:ray-worker"]
    }
  }
  role_policy_arns = {
    policy = aws_iam_policy.ingestion_policy.arn
  }
}
```

Managed Data Stores

To reduce maintenance cost and risk, the production system prefers managed services: Aurora (PostgreSQL) for relational data and ElastiCache (Redis) for caching. A minimal connection sketch follows the figure below.

Figure: Data Store management (Created by Fareed Khan)
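How the application layer might connect to these managed endpoints; a minimal sketch assuming asyncpg and redis-py as drivers (in practice the endpoints and credentials would come from configuration and secrets, not literals; the values shown mirror the Terraform outputs that appear later):

```python
# Minimal sketch: connecting to the managed stores (assumed drivers: asyncpg, redis-py).
import asyncpg
import redis.asyncio as redis

async def connect_stores():
    # Endpoint strings are placeholders matching the Terraform outputs shown later.
    pg_pool = await asyncpg.create_pool(
        dsn="postgresql://rag:CHANGE_ME@rag-platform-cluster-postgres.cluster-c8s7d6f5.us-east-1.rds.amazonaws.com:5432/rag"
    )
    cache = redis.from_url("redis://rag-redis-prod.ng.0001.use1.cache.amazonaws.com:6379")
    return pg_pool, cache
```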

Autoscaling with Karpenter

The traditional Cluster Autoscaler is reactive and comparatively slow to scale. Karpenter takes a proactive approach: it analyzes the resource requirements of pending Pods (e.g., GPU, memory) in real time and launches the best-fitting EC2 instances within seconds.

Figure: Autoscale with Karpenter (Created by Fareed Khan)

CPU Provisioner (Spot for stateless services), infra/karpenter/provisioner-cpu.yaml:

```yaml
# infra/karpenter/provisioner-cpu.yaml
apiVersion: karpenter.sh/v1beta1
kind: Provisioner
metadata:
  name: cpu-provisioner
spec:
  requirements:
    - key: "karpenter.k8s.aws/instance-family"
      operator: In
      values: ["m6i", "c6i"]
    - key: "karpenter.sh/capacity-type"
      operator: In
      values: ["spot"]
  limits:
    resources:
      cpu: 1000
  consolidation:
    enabled: true
```

GPU Provisioner (with scale-to-zero), infra/karpenter/provisioner-gpu.yaml:

```yaml
# infra/karpenter/provisioner-gpu.yaml
apiVersion: karpenter.sh/v1beta1
kind: Provisioner
metadata:
  name: gpu-provisioner
spec:
  requirements:
    - key: "karpenter.k8s.aws/instance-category"
      operator: In
      values: ["g"]
    - key: "karpenter.sh/capacity-type"
      operator: In
      values: ["on-demand", "spot"]
  ttlSecondsAfterEmpty: 30
```

Deployment

Deploying a production RAG platform is far more than running kubectl apply. It requires managing configuration drift, handling secrets safely, and ensuring the databases can survive failures.

Figure: Deployment layer (Created by Fareed Khan)

Packaging the application with Helm gives us templated configuration whose values can be conveniently overridden per environment.

Cluster Bootstrap and Secrets Management

Before deploying our own components, we install the Ingress Controller, External Secrets Operator, and KubeRay Operator. The scripts/bootstrap_cluster.sh script installs them all in one shot:

```bash
#!/bin/bash

# 1. KubeRay Operator
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
helm install kuberay-operator kuberay/kuberay-operator --version 1.0.0

# 2. External Secrets
helm repo add external-secrets https://charts.external-secrets.io
helm install external-secrets external-secrets/external-secrets

# 3. Nginx Ingress
helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
helm install ingress-nginx ingress-nginx/ingress-nginx
```

Production secrets should never be committed to Git. We store them in AWS Secrets Manager and inject them into Kubernetes Pods via the External Secrets Operator:

```yaml
# deploy/secrets/external-secrets.yaml
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: app-secrets
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: aws-secrets-manager
    kind: ClusterSecretStore
  target:
    name: app-env-secret
  data:
    - secretKey: NEO4J_PASSWORD
      remoteRef:
        key: prod/rag/db_creds
        property: neo4j_password
```
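Once the operator materializes app-env-secret and the Pod mounts it as environment variables, the application config reads it like any other env var. A minimal sketch (the Settings shape is an assumption about services/api/app/config.py, not its actual contents):

```python
# Minimal sketch of how the app might consume the injected secret.
# The Settings shape is an assumption, not the repo's actual config.py.
import os
from dataclasses import dataclass, field

@dataclass
class Settings:
    # Populated from the Kubernetes Secret created by External Secrets.
    NEO4J_PASSWORD: str = field(default_factory=lambda: os.environ["NEO4J_PASSWORD"])
    AWS_REGION: str = os.getenv("AWS_REGION", "us-east-1")
    S3_BUCKET_NAME: str = os.getenv("S3_BUCKET_NAME", "rag-platform-documents-prod")

settings = Settings()
```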

Database and Ingress Deployment

Production-hardened Helm values for Qdrant in deploy/helm/qdrant/values.yaml:

```yaml
# deploy/helm/qdrant/values.yaml
replicaCount: 3

config:
  storage:
    on_disk_payload: true
  service:
    enable_tls: false

resources:
  requests:
    cpu: "2"
    memory: "4Gi"
  limits:
    cpu: "4"
    memory: "8Gi"

persistence:
  size: 50Gi
  storageClassName: gp3
```

Production-hardened Helm values for Neo4j in deploy/helm/neo4j/values.yaml:

```yaml
# deploy/helm/neo4j/values.yaml
neo4j:
  name: "neo4j-cluster"
  edition: "community"

core:
  numberOfServers: 1
  resources:
    requests:
      cpu: "2"
      memory: "8Gi"

volumes:
  data:
    mode: "default"
    storageClassName: "gp3"
    size: "100Gi"
```

Ingress Configuration

The Ingress configuration defines the routing rules for external access to in-cluster services, in deploy/ingress/nginx.yaml:

```yaml
# deploy/ingress/nginx.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: rag-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/proxy-body-size: "50m"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
  rules:
    - host: api.your-rag-platform.com
      http:
        paths:
          - path: /chat
            pathType: Prefix
            backend:
              service:
                name: api-service
                port:
                  number: 80
          - path: /upload
            pathType: Prefix
            backend:
              service:
                name: api-service
                port:
                  number: 80
```

Ray AI Cluster Deployment

The Ray cluster's deployment configuration manages distributed compute workloads and covers three pieces: autoscaling, the cluster definition, and service deployments.

Ray Autoscaling Configuration

The autoscaling configuration in deploy/ray/autoscaling.yaml defines the elasticity policy for worker nodes.

```yaml
# deploy/ray/autoscaling.yaml
autoscaling:
  enabled: true
  upscaling_speed: 1.0
  idle_timeout_minutes: 5
worker_nodes:
  gpu_worker_group:
    min_workers: 0
    max_workers: 20
    resources: {"CPU": 4, "memory": "32Gi", "GPU": 1}
```

RayCluster Definition

The RayCluster resource in deploy/ray/ray-cluster.yaml declares the cluster's head node and worker groups.

```yaml
# deploy/ray/ray-cluster.yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: rag-ray-cluster
spec:
  rayVersion: '2.9.0'
  headGroupSpec:
    serviceType: ClusterIP
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.9.0-py310-gpu
            resources: { requests: { cpu: "2", memory: "8Gi" } }
  workerGroupSpecs:
    - groupName: gpu-workers
      replicas: 0
      minReplicas: 0
      maxReplicas: 20
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.9.0-py310-gpu
              resources:
                limits: { nvidia.com/gpu: 1 }
                requests: { nvidia.com/gpu: 1 }
          tolerations:
            - key: "nvidia.com/gpu"
              operator: "Exists"
```

RayService Deployment (Embedding)

The RayService configuration for the embedding model service lives in deploy/ray/ray-serve-embed.yaml:

```yaml
# deploy/ray/ray-serve-embed.yaml
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: embed-service
spec:
  serveConfigV2: |
    applications:
      - name: bge-m3
        import_path: services.api.app.models.embedding_engine:app
        deployments:
          - name: EmbedDeployment
            autoscaling_config:
              min_replicas: 1
              max_replicas: 5
            ray_actor_options:
              num_gpus: 0.5
```

RayService (LLM) in deploy/ray/ray-serve-llm.yaml:

```yaml
# deploy/ray/ray-serve-llm.yaml
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: llm-service
spec:
  serveConfigV2: |
    applications:
      - name: llama3
        import_path: services.api.app.models.vllm_engine:app
        runtime_env:
          pip: ["vllm==0.3.0"]
          env_vars:
            MODEL_ID: "meta-llama/Meta-Llama-3-70B-Instruct"
        deployments:
          - name: VLLMDeployment
            autoscaling_config:
              min_replicas: 1
              max_replicas: 10
            ray_actor_options:
              num_gpus: 1
```
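Once the RayService is up, the API layer talks to it over plain HTTP inside the cluster. A minimal client sketch; the service DNS name, route, and payload shape are assumptions here, since they depend on how vllm_engine:app defines its handler:

```python
# Hypothetical client for the Ray Serve LLM endpoint; the URL, route, and
# payload shape are assumptions, not confirmed by the repo.
import httpx

async def chat_completion(messages: list[dict], temperature: float = 0.0) -> str:
    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.post(
            "http://llm-service-serve-svc:8000/llm",  # assumed in-cluster service name
            json={"messages": messages, "temperature": temperature},
        )
        resp.raise_for_status()
        return resp.json()["text"]
```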

Cost cleanup script scripts/cleanup.sh:

```bash
#!/bin/bash
# scripts/cleanup.sh

echo "⚠️ WARNING: THIS WILL DESTROY ALL CLOUD RESOURCES ⚠️"
echo "Includes: EKS Cluster, Databases (RDS/Neo4j/Redis), S3 Buckets, Load Balancers."
echo "Cost-saving measure for Dev/Test environments."
echo ""
read -p "Are you sure? Type 'DESTROY': " confirm

if [ "$confirm" != "DESTROY" ]; then
    echo "Aborted."
    exit 1
fi

echo "🔹 1. Deleting Kubernetes Resources (Helm)..."
helm uninstall api || true
helm uninstall qdrant || true
helm uninstall ray-cluster || true
kubectl delete -f deploy/ray/ || true

echo "🔹 2. Waiting for LBs to cleanup..."
sleep 20

echo "🔹 3. Running Terraform Destroy..."
cd infra/terraform
terraform destroy -auto-approve

echo "✅ All resources destroyed."
```

By combining Terraform, Helm, and Ray, we gain full control over the infrastructure and automate away the complexity of the distributed system. Next, we look at monitoring and evaluation at production scale.

Evaluation and Operations

An enterprise RAG platform must take evaluation seriously from the development phase onward. That means strategies for observability (metrics/tracing), evaluation (accuracy), and operations (load testing/maintenance).

In production, thumbs-up/thumbs-down feedback from users is useful but typically sparse and delayed, making it hard to surface problems in time.

Observability and Tracing

"What you can't measure, you can't optimize." In a distributed system spanning Ray, Kubernetes, and multiple databases, locating performance bottlenecks without distributed tracing is nearly impossible.

Figure: Tracing Logic (Created by Fareed Khan)

libs/observability/metrics.py defines Prometheus metrics for tracking request volume, latency, and token usage:

```python
# libs/observability/metrics.py
from prometheus_client import Counter, Histogram

REQUEST_COUNT = Counter(
    "rag_api_requests_total",
    "Total number of requests",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "rag_api_latency_seconds",
    "Request latency",
    ["endpoint"]
)

TOKEN_USAGE = Counter(
    "rag_llm_tokens_total",
    "Total LLM tokens consumed",
    ["model", "type"]
)

def track_request(method: str, endpoint: str, status: int):
    REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
```
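To actually populate these series, something must record them on every request. One option is a FastAPI HTTP middleware; a minimal sketch (the middleware itself is not part of the repo layout and the app wiring is illustrative):

```python
# Hypothetical FastAPI middleware wiring the Prometheus metrics into every request.
import time
from fastapi import FastAPI, Request
from libs.observability.metrics import REQUEST_COUNT, REQUEST_LATENCY

app = FastAPI()

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    # Record both the outcome counter and the latency histogram.
    REQUEST_COUNT.labels(
        method=request.method, endpoint=request.url.path, status=response.status_code
    ).inc()
    REQUEST_LATENCY.labels(endpoint=request.url.path).observe(time.perf_counter() - start)
    return response
```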

OpenTelemetry tracing configuration in libs/observability/tracing.py:

```python
# libs/observability/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

def configure_tracing(service_name: str):
    provider = TracerProvider()
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)
```
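With the tracer configured, individual pipeline stages can be wrapped in spans so that slow retrievals or LLM calls show up in the trace. A minimal usage sketch (retrieve_documents and generate_answer are assumed helpers, not repo functions):

```python
# Minimal usage sketch: wrapping pipeline stages in OpenTelemetry spans.
from libs.observability.tracing import configure_tracing

tracer = configure_tracing("rag-api")

async def answer_query(query: str) -> str:
    with tracer.start_as_current_span("retrieval") as span:
        span.set_attribute("rag.query_length", len(query))
        docs = await retrieve_documents(query)  # assumed retrieval helper
    with tracer.start_as_current_span("generation"):
        return await generate_answer(query, docs)  # assumed generation helper
```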

Continuous Evaluation Pipeline

We use an "LLM-as-a-Judge" approach for automated quality checks, combined with a golden dataset to measure the RAG system's accuracy.

Figure: Continuous Eval (Created by Fareed Khan)

eval/datasets/golden.json (excerpt):

```json
// eval/datasets/golden.json
[
  {
    "question": "Explain the difference between the Horizontal Pod Autoscaler (HPA) and the Cluster Autoscaler.",
    "ground_truth": "The HPA scales the number of Pods based on CPU/metrics. The Cluster Autoscaler scales the number of Nodes when pods are unschedulable.",
    "contexts": [
      "The Horizontal Pod Autoscaler is a Kubernetes component that adjusts the number of replicas...",
      "The Cluster Autoscaler is a tool that automatically adjusts the size of the Kubernetes cluster..."
    ]
  }
  // ...
]
```

Judge script eval/judges/llm_judge.py:

```python
# eval/judges/llm_judge.py
import json
from pydantic import BaseModel
from services.api.app.clients.ray_llm import llm_client

class Grade(BaseModel):
    score: int
    reasoning: str

# Note the doubled braces so str.format() does not treat the JSON example as a placeholder.
JUDGE_PROMPT = """
You are an impartial judge evaluating a RAG system.
You will be given a Question, a Ground Truth Answer, and the System's Answer.
Rate the System's Answer on a scale of 1 to 5:
1: Completely wrong or hallucinated.
3: Partially correct but missing key details.
5: Perfect, comprehensive, and matches Ground Truth logic.
Output JSON only: {{"score": int, "reasoning": "string"}}
Question: {question}
Ground Truth: {ground_truth}
System Answer: {system_answer}
"""

async def grade_answer(question: str, ground_truth: str, system_answer: str) -> Grade:
    try:
        response_text = await llm_client.chat_completion(
            messages=[{"role": "user", "content": JUDGE_PROMPT.format(
                question=question,
                ground_truth=ground_truth,
                system_answer=system_answer
            )}],
            temperature=0.0
        )
        data = json.loads(response_text)
        return Grade(**data)
    except Exception as e:
        return Grade(score=0, reasoning=f"Judge Error: {e}")
```
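A small driver can replay the golden dataset against the live system and aggregate judge scores. A minimal sketch, assuming a query_rag(question) helper that calls the deployed chat endpoint:

```python
# Minimal driver sketch: run the judge over the golden dataset.
# query_rag() is an assumed helper that calls the deployed /chat endpoint.
import asyncio
import json
from eval.judges.llm_judge import grade_answer

async def run_golden_eval(path: str = "eval/datasets/golden.json"):
    # Assumes the dataset file is strict JSON (without the // comments in the excerpt).
    with open(path) as f:
        cases = json.load(f)
    scores = []
    for case in cases:
        system_answer = await query_rag(case["question"])  # assumed helper
        grade = await grade_answer(case["question"], case["ground_truth"], system_answer)
        scores.append(grade.score)
        print(f"[{grade.score}/5] {case['question'][:60]}")
    print(f"Average judge score: {sum(scores) / len(scores):.2f}")

if __name__ == "__main__":
    asyncio.run(run_golden_eval())
```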

Systematic evaluation with Ragas in eval/ragas/run.py:

```python
# eval/ragas/run.py
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy
from datasets import Dataset

def run_evaluation(questions, answers, contexts, ground_truths):
    data = {
        "question": questions,
        "answer": answers,
        "contexts": contexts,
        "ground_truth": ground_truths
    }
    dataset = Dataset.from_dict(data)
    results = evaluate(
        dataset=dataset,
        metrics=[faithfulness, answer_relevancy],
    )
    df = results.to_pandas()
    df.to_csv("eval/reports/evaluation_results.csv", index=False)
    print(results)
```

In the CI/CD pipeline, you can enforce quality thresholds, for example blocking the deployment when the faithfulness score falls below 0.8, as sketched below.
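A gate like that can be a few lines of Python in the CI job; a sketch, assuming the Ragas CSV produced above contains a faithfulness column:

```python
# Hypothetical CI gate: fail the pipeline if mean faithfulness drops below 0.8.
import sys
import pandas as pd

THRESHOLD = 0.8

def main():
    df = pd.read_csv("eval/reports/evaluation_results.csv")
    score = df["faithfulness"].mean()
    print(f"Mean faithfulness: {score:.3f} (threshold {THRESHOLD})")
    if score < THRESHOLD:
        # Non-zero exit blocks the deployment stage in CI.
        sys.exit(1)

if __name__ == "__main__":
    main()
```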

Operational Excellence and Maintenance

Maintenance tooling covers load testing, data migration, and service warm-up.

Figure: Maintaining Deployment (Created by Fareed Khan)

Locust load test scripts/load_test.py:

```python
# scripts/load_test.py
from locust import HttpUser, task, between
import os

class RAGUser(HttpUser):
    wait_time = between(1, 5)

    @task
    def chat_stream_task(self):
        headers = {"Authorization": f"Bearer {os.getenv('AUTH_TOKEN')}"}
        payload = {
            "message": "What is the warranty policy for the new X1 processor?",
            "session_id": "loadtest-user-123"
        }
        # catch_response=True is required for response.failure()/success() to work.
        with self.client.post(
            "/api/v1/chat/stream",
            json=payload,
            headers=headers,
            stream=True,
            catch_response=True,
            name="/chat/stream"
        ) as response:
            if response.status_code != 200:
                response.failure("Failed request")
            else:
                # Drain the NDJSON stream so latency covers the full response.
                for line in response.iter_lines():
                    if line:
                        pass
                response.success()
```

Database migrations scripts/migrate_db.py (wrapping Alembic):

```python
# scripts/migrate_db.py
import subprocess
import os
from services.api.app.config import settings

def run_migrations():
    print("Running database migrations...")
    env = os.environ.copy()
    env["DATABASE_URL"] = settings.DATABASE_URL
    try:
        subprocess.run(
            ["alembic", "upgrade", "head"],
            check=True,
            env=env,
            cwd=os.path.dirname(os.path.abspath(__file__))
        )
        print("✅ Migrations applied successfully.")
    except subprocess.CalledProcessError as e:
        print(f"❌ Migration failed: {e}")
        exit(1)

if __name__ == "__main__":
    run_migrations()
```

Cache warm-up scripts/warmup_cache.py:

```python
# scripts/warmup_cache.py
import asyncio
from services.api.app.cache.semantic import semantic_cache

FAQ = [
    ("What are your business hours?", "Our business hours are 9 AM to 5 PM."),
    ("What is the return policy?", "Return within 30 days."),
]

async def warmup():
    print("🔥 Warming up semantic cache...")
    for question, answer in FAQ:
        await semantic_cache.set_cached_response(question, answer)
    print("✅ Cache warmup complete.")

if __name__ == "__main__":
    asyncio.run(warmup())
```

End-to-End Execution

The code, infrastructure, and evaluation pipeline are all in place. Now let's deploy everything and run the RAG platform.

First, configure Terraform's remote state: S3 plus a DynamoDB lock (one-time setup).

Creating the EKS Cluster

With the Terraform backend ready, create the VPC, EKS control plane, and databases from the terminal:

Note: this step creates only the EKS control plane and a small system node group (two small CPU nodes). No GPU nodes are created; they are provisioned on demand when the software needs them, to save cost.

Run:

```bash
# From the project root
make infra
```

Output omitted; on completion, Terraform reports:

```
Outputs:
aurora_db_endpoint = "rag-platform-cluster-postgres.cluster-c8s7d6f5.us-east-1.rds.amazonaws.com"
redis_primary_endpoint = "rag-redis-prod.ng.0001.use1.cache.amazonaws.com"
s3_bucket_name = "rag-platform-documents-prod"
```

Configure kubectl and bootstrap the system controllers:

```bash
aws eks update-kubeconfig --region us-east-1 --name rag-platform-cluster
./scripts/bootstrap_cluster.sh
```

Then install the Karpenter configuration (omitted). Next, apply the Ray configuration; Kubernetes will attempt to schedule the GPU-requesting pods, and Karpenter will automatically purchase suitable g5 instances:

```bash
kubectl apply -f deploy/ray/
kubectl get pods -w
```

After roughly 45-90 seconds the GPU node initializes and becomes ready, the Ray worker starts running, and vLLM begins loading the quantized Llama-3-70B model (about 40 GB).

Inference and Latency Testing

Upload the 1,000 documents to S3 and trigger the Ray job (Karpenter will spin up CPU nodes on demand):

```bash
python scripts/bulk_upload_s3.py ./data rag-platform-documents-prod
python -m pipelines.jobs.s3_event_handler
```

The Ray Data dashboard shows progress and throughput for reading, parsing, embedding, and index writes. Thanks to CPU parallelism and GPU batching, all 1,000 enterprise documents are processed within 2 minutes.

Then issue a complex query and watch the agent's planning and retrieval:

```bash
kubectl get ingress
# Example output: api.your-rag-platform.com

curl -X POST https://api.your-rag-platform.com/api/v1/chat/stream \
  -H "Content-Type: application/json" \
  -d '{
    "message": "Compare the cost of the HPA vs VPA in Kubernetes based on the docs.",
    "session_id": "demo-session-1"
  }'
```

The system streams the response; you can watch the planner -> retriever -> responder state transitions before it produces a professional answer with source citations. Structured logs break latency down by stage (planner node time, parallel retrieval time, vLLM time-to-first-token, and so on), with the end-to-end response completing within 2-4 seconds.

To keep latency near 3 seconds even after introducing more agents (e.g., code review, web search), the agents can be distributed across multiple Ray actors and executed in parallel, as sketched below.
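A minimal sketch of that fan-out pattern with Ray actors (the agent classes are placeholders, not components from the repository):

```python
# Minimal sketch of fanning agents out across Ray actors; the agent classes
# are placeholders for whatever planner/tool agents the graph adds later.
import ray

ray.init(address="auto")  # connect to the existing RayCluster

@ray.remote
class ResearchAgent:
    def run(self, query: str) -> str:
        # Placeholder for an agent's real reasoning/tool loop.
        return f"research result for: {query}"

@ray.remote
class CodeReviewAgent:
    def run(self, query: str) -> str:
        return f"code review notes for: {query}"

def run_agents_in_parallel(query: str) -> list[str]:
    agents = [ResearchAgent.remote(), CodeReviewAgent.remote()]
    # All agents execute concurrently on cluster workers; total latency is the max, not the sum.
    return ray.get([agent.run.remote(query) for agent in agents])
```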

Redis and Grafana Dashboard Analysis

Launch a 500-concurrent-user load test with Locust:

```bash
locust -f scripts/load_test.py --headless -u 500 -r 10 --host https://api.your-rag-platform.com
```

Watching the Ray and Grafana dashboards: as the request queue deepens, the Ray Autoscaler requests more GPU workers and Karpenter brings up new g5 nodes within seconds. Latency rises to about 5 seconds early in the test, then the system stabilizes and latency recovers to about 1.2 seconds.

The autoscaler logs show the scale-up trigger and the raised node target; processing capacity doubles within 90 seconds and latency settles back to a steady state.

At this point, the system ingests and processes documents efficiently despite heavy noise, scales automatically, and scales down to zero when load drops to optimize cost.

The full implementation is available in this GitHub repository:

https://github.com/FareedKhan-dev/scalable-rag-pipeline

From here, you can go further and build even more sophisticated Agentic RAG Pipelines.

