面向大型数据集、符合行业标准的 Agentic RAG Pipeline 需要基于清晰、可扩展的分层架构进行构建。我们将系统结构化,使得 Agent 能够并行地进行推理、获取上下文、使用工具以及与数据库交互。每一层都承担明确的职责,涵盖从数据摄取、模型服务到 Agent 协调的全过程。这种分层方法有助于系统实现可预测的扩展,同时为终端用户保持较低的响应延迟。
Scalable RAG Pipeline (Created by Fareed Khan)
一个围绕 Agent 构建的可扩展 RAG Pipeline,通常包含以下六个核心层:
- 数据摄取层:通过文档加载、分块、索引将原始数据转化为结构化知识;可结合 S3、关系型数据库、Ray 进行扩展。
- AI 计算层:高效运行大语言模型与嵌入模型,将模型映射到 GPU/CPU 资源,实现低延迟、大规模推理。
- Agentic AI 流程层:支持 Agent 推理、查询增强、工作流编排,配合 API、缓存与分布式执行。
- 工具与沙箱层:提供安全的计算、搜索与 API 测试环境,不影响生产工作负载。
- 基础设施即代码层:自动化部署、网络、集群与自动扩缩容,确保基础设施可复现、可扩展。
- 部署与评估层:处理密钥、数据库、集群、监控与日志,保障规模化下的可靠运行。
本文将围绕 Agent 构建上述六层 RAG Pipeline,展示如何搭建一个可扩展的生产级系统。
我们的“大数据”与噪声文档
企业级代码库通常包含多种类型的文档,如演示文稿、技术文档、报告、网页等,其中总会掺杂大量噪声数据。对于 RAG Pipeline 而言,处理噪声数据是使系统在真实环境中更可靠、更精确所必须面对的挑战。
我们将使用 Kubernetes 官方文档作为“真实数据”,同时从 tpn/pdfs 仓库引入 95% 的“噪声数据”。该仓库包含大量与主题无关的随机 PDF 文件,用于模拟真实企业代码库的环境。
通过这种方式,我们能够评估在超高噪声环境下,企业级 RAG Pipeline 的真实表现。
首先,克隆 tpn/pdfs 仓库以获取噪声数据(该仓库包含 1800 多个文档,克隆耗时可能较长):
```bash
# 克隆噪声数据仓库
git clone https://github.com/tpn/pdfs.git
```
该仓库包含 PDF、DOCX、TXT、HTML 等多种文档类型。我们将从中随机抽样 950 个文档:
```bash
# 创建 noisy_data 目录
mkdir noisy_data
# 从 tpn/pdfs 仓库中随机采样 950 个文档
find ./pdfs -type f \( -name "*.pdf" -o -name "*.docx" -o -name "*.txt" -o -name "*.html" \) | shuf -n 950 | xargs -I {} cp {} ./noisy_data/
```
此操作将在 noisy_data 目录下生成 950 份随机采样的文档。
在真实知识来源方面,我们抓取了 Kubernetes 官方开源文档,并将其保存为 PDF、DOCX、TXT、HTML 等多种格式,存放在 true_data 目录中。
接下来,将噪声数据与真实数据合并到一个统一的 data 目录:
```bash
# 创建 data 目录
mkdir data
# 拷贝真实数据
cp -r ./true_data/* ./data/
# 拷贝噪声数据
cp -r ./noisy_data/* ./data/
```
最后,验证 data 目录中的文档总数:
```bash
# 统计 data 目录总文档数
find ./data -type f \( -name "*.pdf" -o -name "*.docx" -o -name "*.txt" -o -name "*.html" \) | wc -l
# 输出示例
# 1000
```
至此,我们在 data 目录中共有 1000 份文档,其中 950 份为噪声文档,50 份为真实文档。
接下来,我们将进入下一步:构建企业级 RAG Pipeline 的代码库。
构建企业级代码库
一个简单的 Agentic RAG Pipeline 代码库可能只包含一个向量数据库、少量 AI 模型和简单的数据摄取脚本。然而,随着系统复杂度的上升,我们需要将整体架构拆分为更小、更易于管理的组件。
建立如下有组织的目录结构:
```text
scalable-rag-core/                    # 最小可用的生产级 RAG 系统
├── infra/                            # 核心云基础设施
│   ├── terraform/                    # 集群、网络、存储
│   └── karpenter/                    # 节点自动扩缩(CPU/GPU)
│
├── deploy/                           # Kubernetes 部署层
│   ├── helm/                         # 数据库与有状态服务
│   │   ├── qdrant/                   # 向量数据库
│   │   └── neo4j/                    # 知识图谱
│   ├── ray/                          # Ray + Ray Serve(LLM & embeddings)
│   └── ingress/                      # API ingress
│
├── models/                           # 模型配置(与基础设施解耦)
│   ├── embeddings/                   # Embedding 模型
│   ├── llm/                          # LLM 推理配置
│   └── rerankers/                    # Cross-encoder rerankers
│
├── pipelines/                        # 离线与异步 RAG 流水线
│   └── ingestion/                    # 文档 ingestion 流程
│       ├── loaders/                  # PDF / HTML / DOC 加载器
│       ├── chunking/                 # Chunking 与元数据
│       ├── embedding/                # Embedding 计算
│       ├── indexing/                 # 向量 + 图谱索引
│       └── graph/                    # 知识图谱抽取
│
├── libs/                             # 共享核心库
│   ├── schemas/                      # 请求/响应 schema
│   ├── retry/                        # 弹性与重试
│   └── observability/                # Metrics & tracing
│
├── services/                         # 在线服务层
│   ├── api/                          # RAG API
│   │   └── app/
│   │       ├── agents/               # Agentic 编排
│   │       │   └── nodes/            # Planner / Retriever / Responder
│   │       ├── clients/              # Vector DB、Graph DB、Ray 客户端
│   │       ├── cache/                # 语义与响应缓存
│   │       ├── memory/               # 会话记忆
│   │       ├── enhancers/            # Query rewriting, HyDE
│   │       ├── routes/               # Chat & retrieval APIs
│   │       └── tools/                # Vector search, graph search
│   │
│   └── gateway/                      # 限流 / API 保护
```
看起来复杂,但我们先聚焦最重要的目录:
- deploy/:包含 Ray、ingress controller、密钥管理等组件的部署配置。
- infra/:使用 Terraform 与 Karpenter 的 IaC 脚本,搭建云资源。
- pipelines/:ingestion 与任务管理流水线,包括文档加载、chunking、embedding 计算、图谱抽取与索引。
- services/:主应用服务,包括 API server、gateway 配置和执行不可信代码的 sandbox 环境。
很多组件(如 loaders、chunking)各自拥有子目录,进一步明确边界、提升可维护性。
开发流程管理
在编写 agent 架构之前,第一步是搭建本地开发环境。可扩展项目通常会把这一步自动化,使新成员加入时无需重复手动配置。
开发环境通常包含三类内容:
- .env.example:分享本地开发所需的环境变量。开发者复制为 .env,并按 dev/staging/prod 阶段填写值。
- Makefile:封装构建、测试、部署等常用命令。
- docker-compose.yml:定义本地运行整个 RAG pipeline 所需的所有服务(以容器形式)。
创建 .env.example 以共享本地开发的环境变量:
```bash
# .env.example
# 复制为 .env,并填写值

# --- APP SETTINGS ---
ENV=dev
LOG_LEVEL=INFO
SECRET_KEY=change_this_to_a_secure_random_string_for_jwt
```
首先定义基础应用配置,例如环境变量、日志级别和 JWT 密钥。这些配置用于标识应用当前所处的阶段(开发/预发/生产)。
```bash
# --- DATABASE (Aurora Postgres) ---
DATABASE_URL=postgresql+asyncpg://ragadmin:changeme@localhost:5432/rag_db

# --- CACHE (Redis) ---
REDIS_URL=redis://localhost:6379/0

# --- VECTOR DB (Qdrant) ---
QDRANT_HOST=localhost
QDRANT_PORT=6333
QDRANT_COLLECTION=rag_collection

# --- GRAPH DB (Neo4j) ---
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password
```
RAG Pipeline 依赖多种数据存储方案来满足不同的数据管理和追踪需求:
- Aurora Postgres:存储聊天历史与元数据。
- Redis:缓存高频访问的数据。
- Qdrant:作为向量数据库,存储和管理文本嵌入向量。
- Neo4j:作为图数据库,存储实体间的复杂关系。
```bash
# --- AWS (Infrastructure) ---
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
S3_BUCKET_NAME=rag-platform-docs-dev

# --- RAY CLUSTER (AI Engines) ---
# 在 K8s 环境中,这些应指向内部 Service DNS
# 本地开发可能需要端口转发
RAY_LLM_ENDPOINT=http://localhost:8000/llm
RAY_EMBED_ENDPOINT=http://localhost:8000/embed

# --- OBSERVABILITY ---
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
```
为应对海量数据,系统采用 AWS S3 作为主文档存储,并使用集群化的 Ray Serving 来托管 AI 模型(如 LLM、Embedding 模型、Reranker 模型),以实现高效的存储与检索。
接下来,创建一个 Makefile 来自动化构建、测试和部署等常用任务:
```makefile
# Makefile
.PHONY: help install dev up down deploy test infra

help:
	@echo "RAG Platform Commands:"
	@echo "  make install - Install Python dependencies"
	@echo "  make dev     - Run FastAPI server locally"
	@echo "  make up      - Start local DBs (Docker)"
	@echo "  make down    - Stop local DBs"
	@echo "  make deploy  - Deploy to AWS EKS via Helm"
	@echo "  make infra   - Apply Terraform"

install:
	pip install -r services/api/requirements.txt

# 本地开发环境
up:
	docker-compose up -d

down:
	docker-compose down

# 本地运行 API(热重载)
dev:
	uvicorn services.api.main:app --reload --host 0.0.0.0 --port 8000 --env-file .env

# 基础设施
infra:
	cd infra/terraform && terraform init && terraform apply

# Kubernetes 部署
deploy:
	# 更新依赖
	helm dependency update deploy/helm/api
	# 安装/升级
	helm upgrade --install api deploy/helm/api --namespace default
	helm upgrade --install ray-cluster kuberay/ray-cluster -f deploy/ray/ray-cluster.yaml

test:
	pytest tests/
```
Makefile 是可扩展项目中标准的自动化工具。此处定义了 install、dev、up、down、deploy、infra 和 test 等命令,以管理完整的开发工作流。
最后,创建 docker-compose.yml 文件,以便在本地以容器化方式运行整个 RAG pipeline。容器化有助于隔离不同组件并简化依赖管理。
```yaml
# docker-compose.yml
version: '3.8'

services:
  # 1. Postgres(聊天历史)
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: ragadmin
      POSTGRES_PASSWORD: changeme
      POSTGRES_DB: rag_db
    ports:
      - "5432:5432"
    volumes:
      - pg_data:/var/lib/postgresql/data

  # 2. Redis(缓存)
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # 3. Qdrant(向量数据库)
  qdrant:
    image: qdrant/qdrant:v1.7.3
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage

  # 4. Neo4j(图数据库)
  neo4j:
    image: neo4j:5.16.0-community
    environment:
      NEO4J_AUTH: neo4j/password
      NEO4J_dbms_memory_pagecache_size: 1G
    ports:
      - "7474:7474"   # HTTP
      - "7687:7687"   # Bolt
    volumes:
      - neo4j_data:/data

  # 5. MinIO(S3 Mock)- 全离线开发可选
  minio:
    image: minio/minio
    command: server /data
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin

volumes:
  pg_data:
  qdrant_data:
  neo4j_data:
```
在小规模项目中,通常使用 pip 或 virtualenv 管理依赖;而对于需要扩展的项目,更推荐使用 Docker 容器来隔离每个组件。在上述 YAML 配置中,我们为每个服务指定了不同的端口,以避免冲突并便于监控,这也是大型项目中的最佳实践。
核心通用工具
在项目结构与开发流程准备就绪后,首先需要建立统一的 ID 生成策略。当用户发送一条聊天消息时,会触发多个并行流程;将这些流程关联起来,有助于跨组件追踪整个会话的相关问题。
这在生产系统中非常常见:根据用户操作(如点击、请求)进行全链路关联,便于后续的监控、调试与追踪。
创建 libs/utils/ids.py 文件,用于为会话、文件上传及 OpenTelemetry 追踪生成唯一 ID。
```python
# libs/utils/ids.py
import uuid
import hashlib

def generate_session_id() -> str:
    """为聊天会话生成标准 UUID"""
    return str(uuid.uuid4())

def generate_file_id(content: bytes) -> str:
    """
    基于文件内容生成确定性 ID。
    防止重复上传同一文件。
    """
    return hashlib.md5(content).hexdigest()

def generate_trace_id() -> str:
    """为 OpenTelemetry 追踪生成 ID"""
    return uuid.uuid4().hex
```
同时,我们需要对各个函数的执行时间进行度量,以便进行性能监控与优化。创建 libs/utils/timing.py 文件,分别处理同步与异步函数的计时。
```python
# libs/utils/timing.py
import functools
import time
import logging

logger = logging.getLogger(__name__)

def measure_time(func):
    """记录同步函数的执行时间。"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        execution_time = (end_time - start_time) * 1000  # ms
        logger.info(f"Function '{func.__name__}' took {execution_time:.2f} ms")
        return result
    return wrapper

def measure_time_async(func):
    """记录异步函数的执行时间。"""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = await func(*args, **kwargs)
        end_time = time.perf_counter()
        execution_time = (end_time - start_time) * 1000  # ms
        logger.info(f"Async Function '{func.__name__}' took {execution_time:.2f} ms")
        return result
    return wrapper
```
最后,生产级 RAG 系统需要实现重试机制。我们通常采用指数退避策略来处理错误。创建 libs/retry/backoff.py:
```python
# libs/retry/backoff.py
import asyncio
import logging
import random
from functools import wraps

logger = logging.getLogger(__name__)

def exponential_backoff(max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 10.0):
    """
    指数退避 + 抖动 的装饰器。
    捕获异常并重试(异步函数)。
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if retries >= max_retries:
                        logger.error(f"Max retries reached for {func.__name__}: {e}")
                        raise e
                    # 算法: base * (2 ^ retries) + random_jitter
                    # 抖动避免服务器端的 "Thundering Herd" 问题
                    delay = min(base_delay * (2 ** retries), max_delay)
                    jitter = random.uniform(0, 0.5)
                    sleep_time = delay + jitter
                    logger.warning(f"Error in {func.__name__}: {e}. Retrying in {sleep_time:.2f}s...")
                    await asyncio.sleep(sleep_time)
                    retries += 1
        return wrapper
    return decorator
```
延迟计算公式 base * (2 ^ retries) + random_jitter 有助于避免“惊群效应”。
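下面给出一个最小的使用示例,演示如何把上面的计时与重试装饰器组合到同一个异步函数上(其中被装饰的函数 `flaky_call` 是本示例虚构的,仅用于说明用法):

```python
# 示例:组合使用计时与重试装饰器(flaky_call 为演示用的假设函数)
import asyncio
import random

from libs.retry.backoff import exponential_backoff
from libs.utils.timing import measure_time_async

@measure_time_async
@exponential_backoff(max_retries=3, base_delay=1.0)
async def flaky_call() -> str:
    # 模拟约 50% 失败率的下游调用
    if random.random() < 0.5:
        raise RuntimeError("transient error")
    return "ok"

if __name__ == "__main__":
    # 失败时会按 1s、2s、4s(外加随机抖动)的节奏重试
    print(asyncio.run(flaky_call()))
```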
Data Ingestion Layer
RAG 流程的第一步是将文档引入系统。对于小规模应用,可以使用脚本顺序读取文件;而在企业级 RAG 流水线中,数据摄取需要具备高吞吐、异步处理能力,能够同时处理数千个文件,且不能拖垮 API 服务。
Data Ingestion Layer (Created by Fareed Khan)
我们使用 Ray Data 将摄取逻辑拆分为分布式流水线。
Ray Data 允许创建有向无环图,并在集群的多节点上并行执行任务。
这使得我们可以独立扩展解析(CPU 密集型)和向量化(GPU 密集型)任务。
文档加载与配置
首先,需要对摄取参数进行集中式配置管理。在生产环境中,将分块大小或数据库集合名称等参数硬编码是极不可靠的做法。
Document Processing (Created by Fareed Khan)
创建 pipelines/ingestion/config.yaml 来存放摄取流水线的所有配置:
```yaml
# pipelines/ingestion/config.yaml
chunking:
  # 512 tokens 通常是 RAG 的"甜蜜点"(上下文足够、噪声不多)
  chunk_size: 512
  # 重叠确保分割点处的上下文不丢失
  chunk_overlap: 50
  # 递归切分的分隔符(段落 -> 句子 -> 词)
  separators: ["\n\n", "\n", " ", ""]

embedding:
  # Ray Serve 的 endpoint
  endpoint: "http://ray-serve-embed:8000/embed"
  batch_size: 100

graph:
  # 控制 LLM 抽取的速度与成本
  concurrency: 10
  # true 时严格遵循 schema.py 本体
  enforce_schema: true

vector_db:
  collection_name: "rag_collection"
  distance_metric: "Cosine"
```
对于 loader:企业系统中的 PDF 文件通常体积庞大。将 100MB 的 PDF 直接读入内存可能导致 Kubernetes 工作节点内存溢出(OOM)。解决方案是使用临时文件配合 unstructured 库,以磁盘空间换取内存,具体实现见 pipelines/ingestion/loaders/pdf.py:
```python
# pipelines/ingestion/loaders/pdf.py
import tempfile
from unstructured.partition.pdf import partition_pdf

def parse_pdf_bytes(file_bytes: bytes, filename: str):
    """使用临时文件解析 PDF,降低内存压力。"""
    text_content = ""
    # 使用磁盘而非 RAM,防止大文件导致 worker 崩溃
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp_file:
        tmp_file.write(file_bytes)
        tmp_file.flush()
        # 'hi_res' 策略:OCR + 布局分析
        elements = partition_pdf(filename=tmp_file.name, strategy="hi_res")
    for el in elements:
        text_content += str(el) + "\n"
    return text_content, {"filename": filename, "type": "pdf"}
```
对于其他格式,则使用轻量级解析器。Word 文档解析器位于 pipelines/ingestion/loaders/docx.py:
```python
# pipelines/ingestion/loaders/docx.py
import io
import docx

def parse_docx_bytes(file_bytes: bytes, filename: str):
    """解析 .docx,提取文本与简单表格"""
    doc = docx.Document(io.BytesIO(file_bytes))
    full_text = []
    for para in doc.paragraphs:
        if para.text.strip():
            full_text.append(para.text)
    return "\n\n".join(full_text), {"filename": filename, "type": "docx"}
```
HTML 解析器位于 pipelines/ingestion/loaders/html.py:其核心是去除 script 和 style 标签,避免 CSS/JS 代码污染向量化内容。
```python
# pipelines/ingestion/loaders/html.py
from bs4 import BeautifulSoup

def parse_html_bytes(file_bytes: bytes, filename: str):
    """清理脚本/样式,提取纯文本"""
    soup = BeautifulSoup(file_bytes, "html.parser")
    # 去除干扰元素
    for script in soup(["script", "style", "meta"]):
        script.decompose()
    return soup.get_text(separator="\n"), {"filename": filename, "type": "html"}
```
Chunking 与知识图谱
提取出原始文本后,需要进行转换。我们在 pipelines/ingestion/chunking/splitter.py 中定义文本分割器,将文本切分为 512 个 token 的片段,这是许多嵌入模型的标准输入限制。
Chunking and KG (Created by Fareed Khan)
```python
# pipelines/ingestion/chunking/splitter.py
from langchain.text_splitter import RecursiveCharacterTextSplitter

def split_text(text: str, chunk_size: int = 512, overlap: int = 50):
    """将文本切成重叠 chunk,保留边界处上下文"""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ".", " ", ""]
    )
    chunks = splitter.create_documents([text])
    # 为 Ray pipeline 映射为字典格式
    return [{"text": c.page_content, "metadata": {"chunk_index": i}} for i, c in enumerate(chunks)]
```
在 pipelines/ingestion/chunking/metadata.py 中增强元数据。在分布式系统中,去重是必要步骤,因此我们为每个文本块生成内容哈希:
```python
# pipelines/ingestion/chunking/metadata.py
import hashlib
import datetime

def enrich_metadata(base_metadata: dict, content: str) -> dict:
    """增加哈希与时间戳,用于去重与新鲜度追踪"""
    return {
        **base_metadata,
        "chunk_hash": hashlib.md5(content.encode('utf-8')).hexdigest(),
        "ingested_at": datetime.datetime.utcnow().isoformat()
    }
```
接下来进行 GPU 工作——生成向量嵌入。不应在数据摄取脚本内直接加载模型(冷启动慢),而是调用 Ray Serve 服务端点。这样,摄取作业只需向常驻的模型服务发送 HTTP 请求。创建 pipelines/ingestion/embedding/compute.py:
```python
# pipelines/ingestion/embedding/compute.py
import httpx

class BatchEmbedder:
    """Ray Actor:对文本批量调用 Embedding Service"""

    def __init__(self):
        # 指向 K8s 内部服务 DNS
        self.endpoint = "http://ray-serve-embed:8000/embed"
        self.client = httpx.Client(timeout=30.0)

    def __call__(self, batch):
        """将一批文本发往 GPU 服务"""
        response = self.client.post(
            self.endpoint,
            json={"text": batch["text"], "task_type": "document"}
        )
        batch["vector"] = response.json()["embeddings"]
        return batch
```
同时抽取知识图谱。为保持图谱结构清晰,在 pipelines/ingestion/graph/schema.py 中定义严格的模式,否则大语言模型容易产生随机的关系类型(幻觉)。
```python
# pipelines/ingestion/graph/schema.py
from typing import Literal

# 限制 LLM 仅使用这些实体/关系
VALID_NODE_LABELS = Literal["Person", "Organization", "Location", "Concept", "Product"]
VALID_RELATION_TYPES = Literal["WORKS_FOR", "LOCATED_IN", "RELATES_TO", "PART_OF"]

class GraphSchema:
    @staticmethod
    def get_system_prompt() -> str:
        return f"Extract nodes/edges. Allowed Labels: {VALID_NODE_LABELS.__args__}..."
```
在 pipelines/ingestion/graph/extractor.py 中应用该模式,利用 LLM 理解文本结构(而非仅依赖语义相似度):
```python
# pipelines/ingestion/graph/extractor.py
import httpx
import json
from typing import Dict, Any
from pipelines.ingestion.graph.schema import GraphSchema

class GraphExtractor:
    """
    用于图谱抽取的 Ray Actor。
    调用内部 LLM Service 抽取实体。
    """

    def __init__(self):
        # 指向内部 Ray Serve LLM endpoint(K8s DNS)
        self.llm_endpoint = "http://ray-serve-llm:8000/llm/chat"
        self.client = httpx.Client(timeout=60.0)  # 需要较长推理超时

    def __call__(self, batch: Dict[str, Any]) -> Dict[str, Any]:
        """处理一批文本 chunk。"""
        nodes_list = []
        edges_list = []
        for text in batch["text"]:
            try:
                # 1. 构造 Prompt
                prompt = f"""
                {GraphSchema.get_system_prompt()}
                Input Text:
                {text}
                """
                # 2. 调用 LLM(例如 Llama-3-70B)
                response = self.client.post(
                    self.llm_endpoint,
                    json={
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": 0.0,  # 确保确定性输出
                        "max_tokens": 1024
                    }
                )
                response.raise_for_status()
                # 3. 解析 JSON 输出
                content = response.json()["choices"][0]["message"]["content"]
                graph_data = json.loads(content)
                # 4. 聚合结果
                nodes_list.append(graph_data.get("nodes", []))
                edges_list.append(graph_data.get("edges", []))
            except Exception as e:
                # 记录错误但不中断流水线;为该 chunk 返回空图
                print(f"Graph extraction failed for chunk: {e}")
                nodes_list.append([])
                edges_list.append([])
        batch["graph_nodes"] = nodes_list
        batch["graph_edges"] = edges_list
        return batch
```
高吞吐索引
大规模 RAG 系统通常采用批量写入而非逐条记录写入,这能显著降低 GPU/CPU 开销与内存占用。

向量索引使用 pipelines/ingestion/indexing/qdrant.py,连接 Qdrant 集群并执行原子 upsert 操作:
```python
# pipelines/ingestion/indexing/qdrant.py
import uuid
from qdrant_client import QdrantClient
from qdrant_client.http import models

class QdrantIndexer:
    """以批量 upsert 写入 Qdrant"""

    def __init__(self):
        self.client = QdrantClient(host="qdrant-service", port=6333)

    def write(self, batch):
        points = [
            models.PointStruct(id=str(uuid.uuid4()), vector=row["vector"], payload=row["metadata"])
            for row in batch if "vector" in row
        ]
        self.client.upsert(collection_name="rag_collection", points=points)
```
图谱索引通过 pipelines/ingestion/indexing/neo4j.py 实现,利用 Cypher 的 MERGE 语句确保操作的幂等性,即使重复执行数据摄取也不会产生重复的节点:
```python
# pipelines/ingestion/indexing/neo4j.py
from neo4j import GraphDatabase

class Neo4jIndexer:
    """使用幂等 MERGE 写入图数据"""

    def __init__(self):
        self.driver = GraphDatabase.driver("bolt://neo4j-cluster:7687", auth=("neo4j", "pass"))

    def write(self, batch):
        with self.driver.session() as session:
            # 扁平化 batch,并以单事务执行,提升性能
            session.execute_write(self._merge_graph_data, batch)
```
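上面引用的 `_merge_graph_data` 在原文中没有给出。下面是一个可以添加到 `Neo4jIndexer` 类中的最小实现草图:它沿用上文 `GraphExtractor` 输出的 `graph_nodes` / `graph_edges` 字段,节点统一挂到假设的 `Entity` 标签下,关系类型也做了简化假设,仅用于说明 MERGE 的幂等写法:

```python
# 示意:Neo4jIndexer._merge_graph_data 的一种最小实现(假设性草图)
@staticmethod
def _merge_graph_data(tx, batch):
    for nodes, edges in zip(batch.get("graph_nodes", []), batch.get("graph_edges", [])):
        for node in nodes:
            # MERGE 保证重复摄取不会产生重复节点
            tx.run(
                "MERGE (n:Entity {name: $name}) SET n.label = $label",
                name=node.get("name"),
                label=node.get("label", "Concept"),
            )
        for edge in edges:
            # 简化假设:统一使用 RELATES_TO 关系,并把原始类型存为属性
            tx.run(
                "MATCH (a:Entity {name: $src}), (b:Entity {name: $dst}) "
                "MERGE (a)-[r:RELATES_TO]->(b) SET r.type = $rel_type",
                src=edge.get("source"),
                dst=edge.get("target"),
                rel_type=edge.get("type", "RELATES_TO"),
            )
```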
基于 Ray 的事件驱动工作流
最后,将这些组件组合起来。目标是让文件读取、分块(chunking)和向量嵌入(embedding)能够在不同的 CPU/GPU 节点上并行执行。创建 pipelines/ingestion/main.py 作为编排“指挥官”,使用 Ray Data 构建一个惰性执行的 DAG(有向无环图):
```python
# pipelines/ingestion/main.py
import ray

from pipelines.ingestion.embedding.compute import BatchEmbedder
from pipelines.ingestion.graph.extractor import GraphExtractor
from pipelines.ingestion.indexing.qdrant import QdrantIndexer
from pipelines.ingestion.indexing.neo4j import Neo4jIndexer

def main(bucket_name: str, prefix: str):
    """主编排流程"""
    # 1. 使用 Ray Data 从 S3 读取(二进制)
    ds = ray.data.read_binary_files(
        paths=f"s3://{bucket_name}/{prefix}",
        include_paths=True
    )
    # 2. 解析 & Chunk(Map)
    chunked_ds = ds.map_batches(
        process_batch,
        batch_size=10,   # 每个 worker 一次处理 10 个文件
        num_cpus=1
    )
    # 3. 分叉 A:向量 Embedding(GPU 密集)
    vector_ds = chunked_ds.map_batches(
        BatchEmbedder,
        concurrency=5,   # 5 个并发 embedder
        num_gpus=0.2,    # 每个 embedder 分配少量 GPU,重负载由 Ray Serve 处理
        batch_size=100   # 100 个 chunk 一批向量化
    )
    # 4. 分叉 B:图谱抽取(LLM 密集)
    graph_ds = chunked_ds.map_batches(
        GraphExtractor,
        concurrency=10,
        num_gpus=0.5,    # 需要较强 LLM 推理性能
        batch_size=5
    )
    # 5. 写入(索引)
    vector_ds.write_datasource(QdrantIndexer())
    graph_ds.write_datasource(Neo4jIndexer())
    print("Ingestion Job Completed Successfully.")
```
在 Kubernetes 上运行此作业时,需要在 pipelines/jobs/ray_job.yaml 中指定运行时环境及其依赖:
```yaml
# pipelines/jobs/ray_job.yaml
entrypoint: "python pipelines/ingestion/main.py"
runtime_env:
  working_dir: "./"
  pip: ["boto3", "qdrant-client", "neo4j", "langchain", "unstructured"]
```
在企业级架构中,我们通常不会手动触发作业,而是采用“事件驱动”模式:当文件被上传到 S3 存储桶时,由 AWS Lambda 函数触发 Ray 作业。相关逻辑见 pipelines/jobs/s3_event_handler.py:
```python
# pipelines/jobs/s3_event_handler.py
from ray.job_submission import JobSubmissionClient

def handle_s3_event(event, context):
    """由 S3 上传触发 -> 提交 Ray Job"""
    # 从标准 S3 事件结构中解析出桶名与对象 Key
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = record["object"]["key"]

    client = JobSubmissionClient("http://rag-ray-cluster-head-svc:8265")
    client.submit_job(
        entrypoint=f"python pipelines/ingestion/main.py {bucket} {key}",
        runtime_env={"working_dir": "./"}
    )
```
为了测试整个流程,可以使用 scripts/bulk_upload_s3.py 脚本,通过多线程将准备好的噪声数据批量上传至 S3:
```python
# scripts/bulk_upload_s3.py
import os
import boto3
from concurrent.futures import ThreadPoolExecutor

s3 = boto3.client("s3")

def upload_directory(dir_path, bucket_name):
    """高性能多线程 S3 上传器"""
    files_to_upload = [
        os.path.join(root, f) for root, _, files in os.walk(dir_path) for f in files
    ]

    def upload_file(path):
        s3.upload_file(path, bucket_name, os.path.relpath(path, dir_path))

    with ThreadPoolExecutor(max_workers=10) as executor:
        # 将本地文件映射为 S3 上传任务
        list(executor.map(upload_file, files_to_upload))
```
至此,数据摄取层(Data Ingestion Layer)的构建已经完成。通过为底层的 Kubernetes 集群增加节点,该系统可以轻松扩展到处理数百万文档的规模。
AI Compute Layer
在构建了数据摄取管道之后,我们需要一个能够处理复杂推理任务的“计算层”。在单体应用中,直接在 Web 服务器内加载大型语言模型(LLM)或许可行,但在企业级 RAG 平台中,这种做法会带来严重的性能瓶颈和扩展性问题。
将 70B 参数量的模型加载到 Web 服务器内会极大拖累系统吞吐量,使得水平扩展几乎不可能实现。

因此,我们需要将应用服务(如 FastAPI)与 AI 模型推理服务解耦。本设计采用 Ray Serve 将模型托管为独立的微服务,使其能够根据 GPU 资源状况和请求流量进行自动伸缩。
模型配置与硬件映射
在生产环境中,绝不推荐在代码中硬编码模型参数。我们需要一套灵活的配置系统,以便能够无缝切换模型、调整量化策略、修改批处理大小等。

例如,在 models/llm/llama-70b.yaml 中定义主力 LLM 配置:使用 Llama-3-70B-Instruct 模型。其 FP16 格式约需 140GB 显存,因此采用 AWQ 量化技术,使其能够在更经济的 GPU 配置上运行。
许多公司的实践表明,应将精力更多放在数据质量与智能体(Agent)逻辑上,而非执着于选择“最终使用哪一个特定的大模型”。
```yaml
# models/llm/llama-70b.yaml
model_config:
  model_id: "meta-llama/Meta-Llama-3-70B-Instruct"
  quantization: "awq"
  max_model_len: 8192
  max_num_seqs: 128
  gpu_memory_utilization: 0.90
  tensor_parallel_size: 4
  stop_token_ids: [128001, 128009]
```
注意 tensor_parallel_size: 4 是一项企业级配置,表示需要将模型权重切分到 4 块 GPU 上进行张量并行推理。
同时,可以保留一个较小的模型配置 models/llm/llama-7b.yaml,用于查询重写(query rewriting)或摘要生成(summarization)等对成本敏感的任务:
```yaml
# models/llm/llama-7b.yaml
model_config:
  model_id: "meta-llama/Meta-Llama-3-8B-Instruct"
  quantization: "awq"
  max_model_len: 8192
  max_num_seqs: 256
  gpu_memory_utilization: 0.85
  tensor_parallel_size: 1
  stop_token_ids: [128001, 128009]
```
检索侧的嵌入模型配置于 models/embeddings/bge-m3.yaml。BGE-M3 模型具备稠密检索、稀疏检索和多语言检索能力,非常适合全球化平台:
```yaml
# models/embeddings/bge-m3.yaml
model_config:
  model_id: "BAAI/bge-m3"
  batch_size: 32
  normalize_embeddings: true
  dtype: "float16"
  max_seq_length: 8192
```
为了进一步提升检索准确率,引入重排序器(reranker)。配置 models/rerankers/bge-reranker.yaml,在将文档送入 LLM 前对 Top-K 结果进行重排,可显著降低模型幻觉:
```yaml
# models/rerankers/bge-reranker.yaml
model_config:
  model_id: "BAAI/bge-reranker-v2-m3"
  dtype: "float16"
  max_length: 512
  batch_size: 16
```
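原文没有给出 reranker 的在线服务代码。下面是一个基于 Ray Serve 与 sentence-transformers `CrossEncoder` 的部署草图,文件路径与请求字段(`query` / `documents`)均为本示例的假设,仅用于说明"先打分、再按相关度重排"的做法:

```python
# services/api/app/models/reranker_engine.py(假设路径)— Cross-Encoder 重排序服务草图
from ray import serve
from sentence_transformers import CrossEncoder

@serve.deployment(num_replicas=1, ray_actor_options={"num_gpus": 0.5})
class RerankDeployment:
    def __init__(self):
        # 与 models/rerankers/bge-reranker.yaml 中的配置保持一致
        self.model = CrossEncoder("BAAI/bge-reranker-v2-m3", max_length=512, device="cuda")

    async def __call__(self, request):
        body = await request.json()
        query = body["query"]
        documents = body["documents"]
        # 对每个 (query, doc) 对打分,分数越高越相关
        scores = self.model.predict([(query, doc) for doc in documents], batch_size=16)
        ranked = sorted(zip(documents, scores.tolist()), key=lambda x: x[1], reverse=True)
        return {"results": [{"text": d, "score": s} for d, s in ranked]}

app = RerankDeployment.bind()
```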
使用 vLLM 与 Ray 进行模型服务
标准的 HuggingFace Transformers Pipeline 在高并发生产环境下往往效率不足。我们采用 vLLM 推理引擎,其核心的 PagedAttention 技术能极大提升吞吐量。

```python
# services/api/app/models/vllm_engine.py
import os

from ray import serve
from vllm import AsyncLLMEngine, EngineArgs, SamplingParams
from transformers import AutoTokenizer

@serve.deployment(autoscaling_config={"min_replicas": 1, "max_replicas": 10}, ray_actor_options={"num_gpus": 1})
class VLLMDeployment:
    def __init__(self):
        model_id = os.getenv("MODEL_ID", "meta-llama/Meta-Llama-3-70B-Instruct")
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        args = EngineArgs(
            model=model_id,
            quantization="awq",
            gpu_memory_utilization=0.90,
            max_model_len=8192
        )
        self.engine = AsyncLLMEngine.from_engine_args(args)

    async def __call__(self, request):
        body = await request.json()
        messages = body.get("messages", [])
        prompt = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        sampling_params = SamplingParams(
            temperature=body.get("temperature", 0.7),
            max_tokens=body.get("max_tokens", 1024),
            stop_token_ids=[self.tokenizer.eos_token_id, self.tokenizer.convert_tokens_to_ids("<|eot_id|>")]
        )
        request_id = str(os.urandom(8).hex())
        results_generator = self.engine.generate(prompt, sampling_params, request_id)
        final_output = None
        async for request_output in results_generator:
            final_output = request_output
        text_output = final_output.outputs[0].text
        return {"choices": [{"message": {"content": text_output, "role": "assistant"}}]}

app = VLLMDeployment.bind()
```
@serve.deployment 装饰器将类的生命周期交由 Ray Serve 管理。autoscaling_config 是实现生产级弹性的关键配置,它允许服务在流量激增时自动扩容(最多至 10 个副本),并在空闲时缩容以优化资源成本。
部署 Embedding 与 Re-Ranker 模型
与 LLM 不同,Embedding 模型通常不需要复杂的自回归生成,但对批量推理的效率要求极高。例如,当 50 个用户并发进行检索时,系统应能通过一次 GPU 前向传播同时编码所有查询,而非串行处理 50 次。
以下代码在 services/api/app/models/embedding_engine.py 中实现了一个高效的 Embedding 服务。注意 ray_actor_options={"num_gpus": 0.5} 的配置,它允许两个 Embedding 模型副本共享同一块 GPU,从而显著节省硬件成本。
```python
# services/api/app/models/embedding_engine.py
import torch
from ray import serve
from sentence_transformers import SentenceTransformer

@serve.deployment(
    num_replicas=1,
    ray_actor_options={"num_gpus": 0.5}  # 共享 GPU
)
class EmbedDeployment:
    def __init__(self):
        model_name = "BAAI/bge-m3"
        self.model = SentenceTransformer(model_name, device="cuda")
        self.model = torch.compile(self.model)

    async def __call__(self, request):
        body = await request.json()
        texts = body.get("text")
        task_type = body.get("task_type", "document")
        if isinstance(texts, str):
            texts = [texts]
        embeddings = self.model.encode(texts, batch_size=32, normalize_embeddings=True)
        return {"embeddings": embeddings.tolist()}

app = EmbedDeployment.bind()
```
该部署可同时服务于文档入库(ingestion)时的文档向量化与查询时的查询向量化任务。通过 torch.compile 对模型进行图优化,可以进一步挖掘 GPU 的计算性能。
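上面的部署是按单个 HTTP 请求逐次编码的;如果要真正做到"多个并发查询合并为一次 GPU 前向传播",可以借助 Ray Serve 的请求级动态合批。下面是一个示意性的改写草图(非原文实现,类名与返回格式为本示例假设,返回结构保持与上文 `/embed` 接口兼容):

```python
# 示意:使用 Ray Serve 的 serve.batch 动态合批并发 embedding 请求
from typing import List

from ray import serve
from sentence_transformers import SentenceTransformer

@serve.deployment(ray_actor_options={"num_gpus": 0.5})
class BatchedEmbedDeployment:
    def __init__(self):
        self.model = SentenceTransformer("BAAI/bge-m3", device="cuda")

    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.01)
    async def _embed_batch(self, texts: List[str]) -> List[List[float]]:
        # 多个并发请求在这里被合并成一次前向传播
        vectors = self.model.encode(texts, normalize_embeddings=True)
        return vectors.tolist()

    async def __call__(self, request):
        body = await request.json()
        # 每个调用方只传入自己的单条文本,拿回对应的单个向量
        vector = await self._embed_batch(body["text"])
        return {"embeddings": [vector]}
```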
异步内部服务客户端
API 网关层需要与上述部署在 Ray Serve 中的模型服务进行通信。由于这些模型已作为独立的微服务运行,通信通过 HTTP 协议完成。为了不阻塞 FastAPI 的高并发处理能力,必须使用异步客户端进行调用。
Async Calls (Created by Fareed Khan)
创建 services/api/app/clients/ray_llm.py,包含连接池与重试逻辑:
```python
# services/api/app/clients/ray_llm.py
import logging
from typing import List, Dict, Optional

import backoff
import httpx

from services.api.app.config import settings

logger = logging.getLogger(__name__)

class RayLLMClient:
    """带连接池的异步客户端"""

    def __init__(self):
        self.endpoint = settings.RAY_LLM_ENDPOINT
        self.client: Optional[httpx.AsyncClient] = None

    async def start(self):
        """应用启动时初始化"""
        limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
        self.client = httpx.AsyncClient(timeout=120.0, limits=limits)
        logger.info("Ray LLM Client initialized.")

    async def close(self):
        if self.client:
            await self.client.aclose()

    @backoff.on_exception(backoff.expo, httpx.HTTPError, max_tries=3)
    async def chat_completion(self, messages: List[Dict], temperature: float = 0.7, json_mode: bool = False) -> str:
        if not self.client:
            raise RuntimeError("Client not initialized. Call start() first.")
        payload = {"messages": messages, "temperature": temperature, "max_tokens": 1024}
        response = await self.client.post(self.endpoint, json=payload)
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

llm_client = RayLLMClient()
```
backoff 库确保了在网络抖动或 Ray Serve 服务繁忙时,请求不会直接失败,而是采用指数退避策略进行重试。
Embedding 客户端 services/api/app/clients/ray_embed.py:
```python
# services/api/app/clients/ray_embed.py
import httpx

from services.api.app.config import settings

class RayEmbedClient:
    """Ray Serve Embedding Service 的客户端"""

    async def start(self):
        """与 LLM 客户端保持一致的生命周期接口(此处无需预建连接)"""
        return None

    async def close(self):
        return None

    async def embed_query(self, text: str) -> list[float]:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                settings.RAY_EMBED_ENDPOINT,
                json={"text": text, "task_type": "query"}
            )
            response.raise_for_status()
            # 服务端返回 {"embeddings": [...]};单条查询取第一个向量
            return response.json()["embeddings"][0]

    async def embed_documents(self, texts: list[str]) -> list[list[float]]:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                settings.RAY_EMBED_ENDPOINT,
                json={"text": texts, "task_type": "document"}
            )
            response.raise_for_status()
            return response.json()["embeddings"]

embed_client = RayEmbedClient()
```
通过上述客户端,API 代码可以将 70B 大模型的调用简化为异步函数调用,完全屏蔽了底层 GPU 管理与分布式推理的复杂性。
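下面是一个最小的调用示例(演示用:假设运行在 FastAPI 的异步上下文中,且 `llm_client` 已在应用启动时执行过 `start()`):

```python
# 示例:在异步代码中像普通函数一样调用 Embedding 服务与 70B LLM(演示用)
from services.api.app.clients.ray_llm import llm_client
from services.api.app.clients.ray_embed import embed_client

async def answer_question(question: str) -> str:
    # 1. 向量化查询(由 Ray Serve 的 embedding 服务完成)
    query_vector = await embed_client.embed_query(question)
    _ = query_vector  # 实际场景中会用它去 Qdrant 检索上下文,这里省略

    # 2. 调用 70B LLM 生成回答
    return await llm_client.chat_completion(
        messages=[{"role": "user", "content": question}],
        temperature=0.3,
    )
```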
接下来,我们将构建 Agentic Pipeline,将基础的 RAG 升级为具备决策能力的 Agentic RAG,使其能够判断何时调用何种模型来回答用户问题。
Agentic AI Pipeline
至此,我们已经拥有了数据摄取流水线和分布式模型服务。一个简单的 RAG 应用可能只是一个“检索 -> 生成”的线性流程。

然而,对于企业级的 Agentic 平台而言,这种线性链条过于脆弱:
当用户转换话题、要求进行数学计算或表达含糊不清时,线性流程极易出错。
因此,我们使用 FastAPI 与 LangGraph 构建一个“事件驱动的智能体”。该系统能够对用户意图进行“推理”:通过循环、纠错、动态选择工具,并异步处理成千上万个 WebSocket 连接。
API 基础与可观测性
首先,需要定义环境与安全基线。企业级 API 不能是一个“黑盒”,我们需要结构化的日志记录与链路追踪,以定位为何某次请求耗时 5 秒而非 500 毫秒。

在 services/api/requirements.txt 中声明依赖,包括 FastAPI、LangGraph、OpenTelemetry 等核心库:
```txt
# services/api/requirements.txt
# Core Framework
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.6.0
pydantic-settings==2.1.0
simpleeval==0.9.13          # Safe math evaluation

# Async Database & Cache
sqlalchemy==2.0.25
asyncpg==0.29.0
redis==5.0.1

# AI & LLM Clients
openai==1.10.0
anthropic==0.8.0
tiktoken==0.5.2
sentence-transformers==2.3.1
transformers==4.37.0

# Graph & Vector DBs
neo4j==5.16.0
qdrant-client==1.7.3

# Agent Framework
langchain==0.1.5
langgraph==0.0.21

# Observability & Ops
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-otlp==1.22.0
prometheus-client==0.19.0
python-json-logger==2.0.7
backoff==2.2.1

# Security
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6

# Utilities
boto3==1.34.34
httpx==0.26.0
tenacity==8.2.3
```
接着,在 services/api/app/config.py 中使用 Pydantic Settings 对数据库 URL、API 密钥等关键配置进行启动时校验,确保应用在启动阶段就能快速发现配置问题(Fail Fast):
```python
# services/api/app/config.py
from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    """应用配置:自动读取环境变量(大小写不敏感)"""
    ENV: str = "prod"
    LOG_LEVEL: str = "INFO"
    DATABASE_URL: str
    REDIS_URL: str
    QDRANT_HOST: str = "qdrant-service"
    QDRANT_PORT: int = 6333
    QDRANT_COLLECTION: str = "rag_collection"
    NEO4J_URI: str = "bolt://neo4j-cluster:7687"
    NEO4J_USER: str = "neo4j"
    NEO4J_PASSWORD: str
    AWS_REGION: str = "us-east-1"
    S3_BUCKET_NAME: str
    RAY_LLM_ENDPOINT: str = "http://llm-service:8000/llm"
    RAY_EMBED_ENDPOINT: str = "http://embed-service:8000/embed"
    JWT_SECRET_KEY: str
    JWT_ALGORITHM: str = "HS256"

    class Config:
        env_file = ".env"

settings = Settings()
```
为日志接入结构化 JSON 格式 services/api/app/logging.py:
```python
# services/api/app/logging.py
import logging
import json
import sys
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """将日志格式化为 JSON:包含时间戳、级别、消息等"""

    def format(self, record):
        log_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "line": record.lineno
        }
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        if hasattr(record, "request_id"):
            log_record["request_id"] = record.request_id
        return json.dumps(log_record)

def setup_logging():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JSONFormatter())
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    if root_logger.handlers:
        root_logger.handlers = []
    root_logger.addHandler(handler)
    logging.getLogger("uvicorn.access").disabled = True
    logging.getLogger("httpx").setLevel(logging.WARNING)

setup_logging()
```
启用分布式追踪 services/api/app/observability.py:
```python
# services/api/app/observability.py
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from libs.observability.tracing import configure_tracing

def setup_observability(app: FastAPI):
    configure_tracing(service_name="rag-api-service")
    FastAPIInstrumentor.instrument_app(app)
```
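这里引用的 libs/observability/tracing.py 原文并未展示。下面是一个基于 OpenTelemetry SDK 的最小草图(OTLP endpoint 从前文 .env 中的环境变量读取,属于合理假设,实际实现可能还包含采样与资源属性配置):

```python
# libs/observability/tracing.py(原文未给出,此为最小草图)
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def configure_tracing(service_name: str) -> None:
    """初始化全局 TracerProvider,并把 span 批量导出到 OTLP Collector。"""
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
    provider = TracerProvider(resource=Resource.create({"service.name": service_name}))
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint)))
    trace.set_tracer_provider(provider)
```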
实现 JWT 校验中间件 services/api/app/auth/jwt.py:
```python
# services/api/app/auth/jwt.py
import time

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

from services.api.app.config import settings

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
        user_id: str = payload.get("sub")
        role: str = payload.get("role", "user")
        if user_id is None:
            raise credentials_exception
        exp = payload.get("exp")
        if exp and time.time() > exp:
            raise HTTPException(status_code=401, detail="Token expired")
        return {"id": user_id, "role": role, "permissions": payload.get("permissions", [])}
    except JWTError:
        raise credentials_exception
```
数据契约 libs/schemas/chat.py:
```python
# libs/schemas/chat.py
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime

class Message(BaseModel):
    role: str  # "user", "assistant", "system"
    content: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
    stream: bool = True
    filters: Optional[Dict[str, Any]] = None

class ChatResponse(BaseModel):
    answer: str
    session_id: str
    citations: List[Dict[str, str]] = []  # [{"source": "doc.pdf", "text": "..."}]
    latency_ms: float

class RetrievalResult(BaseModel):
    content: str
    source: str
    score: float
    metadata: Dict[str, Any]
```
异步数据网关
为保障可扩展性,API 应避免阻塞主线程。我们为所有数据库操作采用异步客户端,使单个工作进程能够在等待数据库响应时继续处理其他连接。

Redis 客户端 services/api/app/clients/redis.py:
```python
# services/api/app/clients/redis.py
import redis.asyncio as redis

from services.api.app.config import settings

class RedisClient:
    """Redis 连接池单例,用于限流与语义缓存"""

    def __init__(self):
        self.redis = None

    async def connect(self):
        if not self.redis:
            self.redis = redis.from_url(
                settings.REDIS_URL,
                encoding="utf-8",
                decode_responses=True
            )

    async def close(self):
        if self.redis:
            await self.redis.close()

    def get_client(self):
        return self.redis

redis_client = RedisClient()
```
Qdrant 向量搜索客户端 services/api/app/clients/qdrant.py:
```python
# services/api/app/clients/qdrant.py
from qdrant_client import AsyncQdrantClient

from services.api.app.config import settings

class VectorDBClient:
    """Qdrant 异步客户端"""

    def __init__(self):
        self.client = AsyncQdrantClient(
            host=settings.QDRANT_HOST,
            port=settings.QDRANT_PORT,
            prefer_grpc=True
        )

    async def search(self, vector: list[float], limit: int = 5):
        return await self.client.search(
            collection_name=settings.QDRANT_COLLECTION,
            query_vector=vector,
            limit=limit,
            with_payload=True
        )

qdrant_client = VectorDBClient()
```
“`
Neo4j 图搜索 services/api/app/clients/neo4j.py:
```python
# services/api/app/clients/neo4j.py
import logging

from neo4j import AsyncGraphDatabase

from services.api.app.config import settings

logger = logging.getLogger(__name__)

class Neo4jClient:
    """Neo4j 驱动单例(异步)"""

    def __init__(self):
        self._driver = None

    def connect(self):
        if not self._driver:
            try:
                self._driver = AsyncGraphDatabase.driver(
                    settings.NEO4J_URI,
                    auth=(settings.NEO4J_USER, settings.NEO4J_PASSWORD)
                )
                logger.info("Connected to Neo4j successfully.")
            except Exception as e:
                logger.error(f"Failed to connect to Neo4j: {e}")
                raise

    async def close(self):
        if self._driver:
            await self._driver.close()

    async def query(self, cypher_query: str, parameters: dict = None):
        if not self._driver:
            self.connect()
        async with self._driver.session() as session:
            result = await session.run(cypher_query, parameters or {})
            return [record.data() async for record in result]

neo4j_client = Neo4jClient()
```
上下文记忆与语义缓存
Agent 的能力取决于“记忆”。我们使用 Postgres 存储完整对话历史,使 LLM 能回忆多轮上下文。

在 services/api/app/memory/models.py 定义 schema:
```python
# services/api/app/memory/models.py
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON
from datetime import datetime

Base = declarative_base()

class ChatHistory(Base):
    __tablename__ = "chat_history"

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(255), index=True, nullable=False)
    user_id = Column(String(255), index=True, nullable=False)
    role = Column(String(50), nullable=False)
    content = Column(Text, nullable=False)
    metadata_ = Column(JSON, default={}, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
```
```python
# services/api/app/memory/postgres.py
from datetime import datetime

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, String, JSON, DateTime, Integer, Text, select

from services.api.app.config import settings

Base = declarative_base()

class ChatHistory(Base):
    __tablename__ = "chat_history"

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String, index=True)
    user_id = Column(String, index=True)
    role = Column(String)
    content = Column(Text)
    metadata_ = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)

engine = create_async_engine(settings.DATABASE_URL, echo=False)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

class PostgresMemory:
    async def add_message(self, session_id: str, role: str, content: str, user_id: str):
        async with AsyncSessionLocal() as session:
            async with session.begin():
                msg = ChatHistory(session_id=session_id, role=role, content=content, user_id=user_id)
                session.add(msg)

    async def get_history(self, session_id: str, limit: int = 10):
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(ChatHistory)
                .where(ChatHistory.session_id == session_id)
                .order_by(ChatHistory.created_at.desc())
                .limit(limit)
            )
            return result.scalars().all()[::-1]

postgres_memory = PostgresMemory()
```
为优化成本,我们实现了语义缓存 services/api/app/cache/semantic.py。当语义相近的查询已被处理过时,系统可直接从缓存中返回答案,避免再次调用成本高昂的 70B 模型。
```python
# services/api/app/cache/semantic.py
import logging
import uuid
from typing import Optional

from qdrant_client.http import models

from services.api.app.clients.ray_embed import embed_client
from services.api.app.clients.qdrant import qdrant_client

logger = logging.getLogger(__name__)

class SemanticCache:
    """基于向量相似度的语义缓存"""

    async def get_cached_response(self, query: str, threshold: float = 0.95) -> Optional[str]:
        try:
            vector = await embed_client.embed_query(query)
            results = await qdrant_client.client.search(
                collection_name="semantic_cache",
                query_vector=vector,
                limit=1,
                with_payload=True,
                score_threshold=threshold
            )
            if results:
                logger.info(f"Semantic Cache Hit! Score: {results[0].score}")
                return results[0].payload["answer"]
        except Exception as e:
            logger.warning(f"Semantic cache lookup failed: {e}")
        return None

    async def set_cached_response(self, query: str, answer: str):
        try:
            vector = await embed_client.embed_query(query)
            await qdrant_client.client.upsert(
                collection_name="semantic_cache",
                points=[
                    models.PointStruct(
                        id=str(uuid.uuid4()),
                        vector=vector,
                        payload={"query": query, "answer": answer}
                    )
                ]
            )
        except Exception as e:
            logger.warning(f"Failed to write to semantic cache: {e}")

semantic_cache = SemanticCache()
```
此设计能显著降低处理常见问题(FAQ)的响应延迟与计算成本。
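注意 semantic_cache 这个 collection 需要预先创建。下面是一个一次性初始化脚本的草图(向量维度 1024 对应 BGE-M3 稠密向量,这一取值属于假设,请按实际使用的 embedding 模型调整):

```python
# 示意:初始化 semantic_cache collection(一次性脚本)
from qdrant_client import QdrantClient
from qdrant_client.http import models

client = QdrantClient(host="localhost", port=6333)
# 若 collection 已存在,create_collection 会抛错,可按需改为先检查再创建
client.create_collection(
    collection_name="semantic_cache",
    vectors_config=models.VectorParams(size=1024, distance=models.Distance.COSINE),
)
```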
使用 LangGraph 构建工作流
Agent 的核心是一个“状态机”,它在思考(Thinking)、检索(Retrieving)、使用工具(Using Tools)和回答(Answering)等不同状态间转换。
LangGraph Workflow (Created by Fareed Khan)
我们在 services/api/app/agents/state.py 中定义 AgentState:
```python
# services/api/app/agents/state.py
from typing import TypedDict, Annotated, List
import operator

class AgentState(TypedDict):
    messages: Annotated[List[dict], operator.add]
    documents: List[str]
    current_query: str
    plan: List[str]
```
```python
# services/api/app/agents/nodes/planner.py
import json
import logging

from services.api.app.agents.state import AgentState
from services.api.app.clients.ray_llm import llm_client

logger = logging.getLogger(__name__)

SYSTEM_PROMPT = """
You are a RAG Planning Agent.
Analyze the User Query and Conversation History.
Decide the next step:
1. If the user greets (Hello/Hi), output "direct_answer".
2. If the user asks a specific question requiring data, output "retrieve".
3. If the user asks for math/code, output "tool_use".
Output JSON format ONLY:
{
  "action": "retrieve" | "direct_answer" | "tool_use",
  "refined_query": "The standalone search query",
  "reasoning": "Why you chose this action"
}
"""

async def planner_node(state: AgentState) -> dict:
    logger.info("Planner Node: Analyzing query...")
    last_message = state["messages"][-1]
    user_query = last_message.content if hasattr(last_message, 'content') else last_message['content']
    try:
        response_text = await llm_client.chat_completion(
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_query}
            ],
            temperature=0.0
        )
        plan = json.loads(response_text)
        logger.info(f"Plan derived: {plan['action']}")
        return {
            "current_query": plan.get("refined_query", user_query),
            "plan": [plan["reasoning"]]
        }
    except Exception as e:
        logger.error(f"Planning failed: {e}")
        return {"current_query": user_query, "plan": ["Error in planning, defaulting to retrieval."]}
```
Retriever 节点 services/api/app/agents/nodes/retriever.py 执行混合检索,并行调用 Qdrant 与 Neo4j:
```python
# services/api/app/agents/nodes/retriever.py
import asyncio
import logging
from typing import Dict

from services.api.app.agents.state import AgentState
from services.api.app.clients.qdrant import qdrant_client
from services.api.app.clients.neo4j import neo4j_client
from services.api.app.clients.ray_embed import embed_client

logger = logging.getLogger(__name__)

async def retrieve_node(state: AgentState) -> Dict:
    query = state["current_query"]
    logger.info(f"Retrieving context for: {query}")
    query_vector = await embed_client.embed_query(query)

    async def run_vector_search():
        results = await qdrant_client.search(vector=query_vector, limit=5)
        return [f"{r.payload['text']} [Source: {r.payload['metadata']['filename']}]" for r in results]

    async def run_graph_search():
        cypher = """
        CALL db.index.fulltext.queryNodes("entity_index", $query) YIELD node, score
        MATCH (node)-[r]->(neighbor)
        RETURN node.name + ' ' + type(r) + ' ' + neighbor.name as text
        LIMIT 5
        """
        try:
            results = await neo4j_client.query(cypher, {"query": query})
            return [r['text'] for r in results]
        except Exception as e:
            logger.error(f"Graph search failed: {e}")
            return []

    vector_docs, graph_docs = await asyncio.gather(run_vector_search(), run_graph_search())
    combined_docs = list(set(vector_docs + graph_docs))
    logger.info(f"Retrieved {len(combined_docs)} documents.")
    return {"documents": combined_docs}
```
Responder 节点 services/api/app/agents/nodes/responder.py 使用 Llama-70B 模型综合检索结果生成最终回答:
```python
# services/api/app/agents/nodes/responder.py
from services.api.app.agents.state import AgentState
from services.api.app.clients.ray_llm import llm_client

async def generate_node(state: AgentState) -> dict:
    query = state["current_query"]
    documents = state.get("documents", [])
    context_str = "\n\n".join(documents)
    prompt = f"""
    You are a helpful Enterprise Assistant. Use the context below to answer the user's question.

    Context:
    {context_str}

    Question:
    {query}

    Instructions:
    1. Cite sources using [Source: Filename].
    2. If the answer is not in the context, say "I don't have that information in my documents."
    3. Be concise and professional.
    """
    answer = await llm_client.chat_completion(
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3
    )
    return {"messages": [{"role": "assistant", "content": answer}]}
```
Tool 节点:执行外部计算与搜索
services/api/app/agents/nodes/tool.py 节点负责执行外部计算或搜索任务,根据计划选择并调用相应的工具。
```python
# services/api/app/agents/nodes/tool.py
import logging

from services.api.app.agents.state import AgentState
from services.api.app.tools.calculator import calculate
from services.api.app.tools.graph_search import search_graph_tool
from services.api.app.tools.web_search import web_search_tool

logger = logging.getLogger(__name__)

async def tool_node(state: AgentState) -> dict:
    plan_data = state.get("plan", [])
    if not plan_data:
        return {"messages": [{"role": "system", "content": "No tool selected."}]}
    tool_name = state.get("tool_choice", "calculator")
    tool_input = state.get("tool_input", "0+0")
    result = ""
    if tool_name == "calculator":
        logger.info(f"Executing Calculator: {tool_input}")
        result = calculate(tool_input)
    elif tool_name == "graph_search":
        logger.info(f"Executing Graph Search: {tool_input}")
        result = await search_graph_tool(tool_input)
    else:
        result = "Unknown tool requested."
    return {"messages": [{"role": "user", "content": f"Tool Output: {result}"}]}
```
节点编排:构建 LangGraph 工作流
在 services/api/app/agents/graph.py 中,将各个功能节点编排成一个有向工作流。
```python
# services/api/app/agents/graph.py
from langgraph.graph import StateGraph, END

from services.api.app.agents.state import AgentState
from services.api.app.agents.nodes.retriever import retrieve_node
from services.api.app.agents.nodes.responder import generate_node
from services.api.app.agents.nodes.planner import planner_node

workflow = StateGraph(AgentState)
workflow.add_node("planner", planner_node)
workflow.add_node("retriever", retrieve_node)
workflow.add_node("responder", generate_node)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "retriever")
workflow.add_edge("retriever", "responder")
workflow.add_edge("responder", END)

agent_app = workflow.compile()
```
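上面的编排目前是线性的(planner → retriever → responder)。若希望 planner 的决策真正决定走向(直接回答、检索或调用工具),可以使用 LangGraph 的条件边。下面是一个示意草图,其中假设 planner 节点把决策写入状态字段 `plan_action`(该字段是本示例新增的假设,不在前文 `AgentState` 中):

```python
# 示意:基于 planner 决策的条件路由(假设状态中有 plan_action 字段)
from langgraph.graph import StateGraph, END

from services.api.app.agents.state import AgentState
from services.api.app.agents.nodes.planner import planner_node
from services.api.app.agents.nodes.retriever import retrieve_node
from services.api.app.agents.nodes.responder import generate_node
from services.api.app.agents.nodes.tool import tool_node

def route_after_planning(state: AgentState) -> str:
    # 由 planner 写入的决策决定下一个节点,默认回退到检索
    return state.get("plan_action", "retrieve")

workflow = StateGraph(AgentState)
workflow.add_node("planner", planner_node)
workflow.add_node("retriever", retrieve_node)
workflow.add_node("tool", tool_node)
workflow.add_node("responder", generate_node)

workflow.set_entry_point("planner")
workflow.add_conditional_edges(
    "planner",
    route_after_planning,
    {"retrieve": "retriever", "tool_use": "tool", "direct_answer": "responder"},
)
workflow.add_edge("retriever", "responder")
workflow.add_edge("tool", "responder")
workflow.add_edge("responder", END)

agent_app_with_routing = workflow.compile()
```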
查询增强:HyDE 实现
为提高检索的准确度,我们采用 HyDE(Hypothetical Document Embeddings)技术。其核心思想是让大语言模型(LLM)根据问题“幻写”一个假设性答案段落,然后使用该段落的语义向量进行检索,从而更容易找到具有相似语义模式的真实文档。
Query Enhance (Created by Fareed Khan)
实现位于 services/api/app/enhancers/hyde.py:
```python
# services/api/app/enhancers/hyde.py
from services.api.app.clients.ray_llm import llm_client

SYSTEM_PROMPT = """
You are a helpful assistant.
Write a hypothetical paragraph that answers the user's question.
It does not need to be factually correct, but it must use the correct vocabulary and structure
that a relevant document would have.
Question: {question}
"""

async def generate_hypothetical_document(question: str) -> str:
    try:
        hypothetical_doc = await llm_client.chat_completion(
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT.format(question=question)},
            ],
            temperature=0.7
        )
        return hypothetical_doc
    except Exception:
        return question
```
同时在 services/api/app/enhancers/query_rewriter.py 中实现指代消解与查询重写,将“它/他们”等代词补全为完整的查询语句:
```python
# services/api/app/enhancers/query_rewriter.py
from typing import List, Dict

from services.api.app.clients.ray_llm import llm_client

SYSTEM_PROMPT = """
You are a Query Rewriter.
Your task is to rewrite the latest user question to be a standalone search query,
resolving coreferences (he, she, it, they) using the conversation history.
History:
{history}
Latest Question: {question}
Output ONLY the rewritten question. If no rewriting is needed, output the latest question as is.
"""

async def rewrite_query(question: str, history: List[Dict[str, str]]) -> str:
    if not history:
        return question
    history_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in history])
    prompt = SYSTEM_PROMPT.format(history=history_str, question=question)
    try:
        rewritten = await llm_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0
        )
        return rewritten.strip()
    except Exception:
        return question
```
应用主入口 services/api/main.py 负责管理 FastAPI 应用的启动/关闭生命周期以及相关客户端的初始化和资源释放:
```python
# services/api/main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

from services.api.app.clients.neo4j import neo4j_client
from services.api.app.clients.ray_llm import llm_client
from services.api.app.clients.ray_embed import embed_client
from services.api.app.clients.redis import redis_client
from services.api.app.routes import chat, upload, health

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Initializing clients...")
    neo4j_client.connect()
    await redis_client.connect()
    await llm_client.start()
    await embed_client.start()
    yield
    print("Closing clients...")
    await neo4j_client.close()
    await redis_client.close()
    await llm_client.close()
    await embed_client.close()

app = FastAPI(title="Enterprise RAG Platform", version="1.0.0", lifespan=lifespan)
app.include_router(chat.router, prefix="/api/v1/chat", tags=["Chat"])
app.include_router(upload.router, prefix="/api/v1/upload", tags=["Upload"])
app.include_router(health.router, prefix="/health", tags=["Health"])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
至此,我们已经构建了一个具备规划、检索与推理能力的智能体。接下来,我们将构建 Tools & Sandbox 层,使智能体能够安全地执行代码并访问外部知识。
Tools & Sandbox
在企业级 RAG 平台中,允许 AI 直接执行代码或查询内部数据库会带来巨大的安全风险。攻击者可能通过提示词注入等方式访问敏感数据。

我们采取分层策略来应对这一挑战:首先构建安全、确定性的工具;对于高风险操作(如 Python 代码执行),则在隔离且加固的沙箱容器中运行。
安全代码沙箱
让大语言模型生成并执行 Python 代码非常强大(例如进行数学计算、数据绘图或 CSV 分析),但也极其危险:模型可能产生诸如 os.system("rm -rf /") 的恶意指令,或尝试窃取数据。

解决方案是构建一个独立的 沙箱微服务。该服务在独立的容器中运行不可信代码,容器本身不具备网络访问权限,并受到严格的 CPU/RAM 资源限制与执行超时控制。
services/sandbox/Dockerfile:创建非 root 用户,限制权限。
```dockerfile
# services/sandbox/Dockerfile
FROM python:3.10-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

RUN useradd -m -u 1000 sandbox_user

WORKDIR /app
COPY runner.py .
RUN pip install flask

USER sandbox_user
EXPOSE 8080
CMD ["python", "runner.py"]
```
services/sandbox/runner.py:Flask 接收代码,在独立进程执行,捕获 stdout,超时即杀:
```python
# services/sandbox/runner.py
import io
import contextlib
import multiprocessing

from flask import Flask, request, jsonify

app = Flask(__name__)

def execute_code_safe(code: str, queue):
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, {"__builtins__": __builtins__}, {})
        queue.put({"status": "success", "output": buffer.getvalue()})
    except Exception as e:
        queue.put({"status": "error", "output": str(e)})

@app.route("/execute", methods=["POST"])
def run_code():
    data = request.json
    code = data.get("code", "")
    timeout = data.get("timeout", 5)
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=execute_code_safe, args=(code, queue))
    p.start()
    p.join(timeout)
    if p.is_alive():
        p.terminate()
        return jsonify({"output": "Error: Execution timed out."}), 408
    if not queue.empty():
        result = queue.get()
        return jsonify(result)
    return jsonify({"output": "No output produced."})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
```
资源限制 services/sandbox/limits.yaml(防止 while True 等攻击):
```yaml
# services/sandbox/limits.yaml
runtime:
  timeout_seconds: 10
  memory_limit_mb: 512
  cpu_limit: 0.5
  allow_network: false
files:
  max_input_size_mb: 5
  allowed_imports: ["math", "datetime", "json", "pandas", "numpy"]
```
K8s NetworkPolicy 全面禁止 Sandbox egress services/sandbox/network-policy.yaml:
```yaml
# services/sandbox/network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: sandbox-deny-egress
  namespace: default
spec:
  podSelector:
    matchLabels:
      app: sandbox-service
  policyTypes:
    - Egress
  egress: []
```
services/api/app/tools/sandbox.py:
```python
# services/api/app/tools/sandbox.py
import httpx

SANDBOX_URL = "http://sandbox-service:8080/execute"

async def run_python_code(code: str) -> str:
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                SANDBOX_URL,
                json={"code": code, "timeout": 5},
                timeout=6.0
            )
        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "success":
                return f"Output:\n{data['output']}"
            else:
                return f"Execution Error:\n{data['output']}"
        else:
            return f"Sandbox Error: Status {response.status_code}"
    except Exception as e:
        return f"Sandbox Connection Failed: {str(e)}"
```
确定性与搜索工具
鉴于代码执行风险较高,应优先选择更安全、确定性强的工具。对于数学计算,可使用 simpleeval 库实现安全计算器,位于 services/api/app/tools/calculator.py:

```python
# services/api/app/tools/calculator.py
from simpleeval import simple_eval

def calculate(expression: str) -> str:
    """使用 simpleeval 安全计算表达式,避免远程代码执行(RCE)风险"""
    if len(expression) > 100:
        return "Error: Expression too long."
    try:
        result = simple_eval(expression)
        return str(result)
    except Exception as e:
        return f"Error: {str(e)}"
```
将图数据库(Graph DB)和向量数据库(Vector DB)的功能封装为工具对外暴露。图搜索工具 services/api/app/tools/graph_search.py 的实现逻辑是:首先由大语言模型(LLM)从用户问题中提取核心实体,然后执行参数化的 Cypher 查询。禁止直接让 LLM 生成 Cypher 语句,以防止潜在的注入攻击。
```python
# services/api/app/tools/graph_search.py
import json

from services.api.app.clients.neo4j import neo4j_client
from services.api.app.clients.ray_llm import llm_client

SYSTEM_PROMPT = """
You are a Knowledge Graph Helper.
Extract the core entities from the user's question to perform a search.
Question: {question}
Output JSON only: {{"entities": ["list", "of", "names"]}}
"""
```
工具的具体实现如下:
```python
async def search_graph_tool(question: str) -> str:
    try:
        response_text = await llm_client.chat_completion(
            messages=[{"role": "system", "content": SYSTEM_PROMPT.format(question=question)}],
            temperature=0.0,
            json_mode=True
        )
        data = json.loads(response_text)
        entities = data.get("entities", [])
        cypher_query = """
        UNWIND $names AS target_name
        CALL db.index.fulltext.queryNodes("entity_index", target_name) YIELD node, score
        MATCH (node)-[r]-(neighbor)
        RETURN node.name AS source, type(r) AS rel, neighbor.name AS target
        LIMIT 10
        """
        results = await neo4j_client.query(cypher_query, {"names": entities})
        return str(results) if results else "No connections found."
    except Exception as e:
        return f"Graph search error: {str(e)}"
```
向量搜索工具 services/api/app/tools/vector_search.py:
```python
# services/api/app/tools/vector_search.py
from services.api.app.clients.qdrant import qdrant_client
from services.api.app.clients.ray_embed import embed_client

async def search_vector_tool(query: str) -> str:
    try:
        vector = await embed_client.embed_query(query)
        results = await qdrant_client.search(vector, limit=3)
        formatted = ""
        for r in results:
            meta = r.payload.get("metadata", {})
            formatted += f"- {r.payload.get('text', '')[:200]}... [Source: {meta.get('filename')}]\n"
        return formatted if formatted else "No relevant documents found."
    except Exception as e:
        return f"Search Error: {str(e)}"
```
外部搜索工具 services/api/app/tools/web_search.py(如 Tavily):
```python
# services/api/app/tools/web_search.py
import os

import httpx

async def web_search_tool(query: str) -> str:
    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        return "Web search disabled."
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.tavily.com/search",
                json={"api_key": api_key, "query": query, "max_results": 3}
            )
        data = response.json()
        results = data.get("results", [])
        return "\n".join([f"- {r['title']}: {r['content']}" for r in results])
    except Exception as e:
        return f"Web Search Error: {str(e)}"
```
API 路由与网关逻辑
将系统通过 REST 暴露。主路由 services/api/app/routes/chat.py 处理流式响应、LangGraph 执行与后台日志记录:
Route Logic (Created by Fareed Khan)
```python
# services/api/app/routes/chat.py
import uuid
import json
import logging

from fastapi import APIRouter, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from services.api.app.auth.jwt import get_current_user
from services.api.app.agents.graph import agent_app
from services.api.app.agents.state import AgentState
from services.api.app.memory.postgres import postgres_memory
from services.api.app.cache.semantic import semantic_cache

router = APIRouter()
logger = logging.getLogger(__name__)

class ChatRequest(BaseModel):
    message: str
    session_id: str = None

@router.post("/stream")
async def chat_stream(req: ChatRequest, background_tasks: BackgroundTasks, user: dict = Depends(get_current_user)):
    session_id = req.session_id or str(uuid.uuid4())
    user_id = user["id"]

    # 1. 语义缓存命中则直接返回
    cached_ans = await semantic_cache.get_cached_response(req.message)
    if cached_ans:
        async def stream_cache():
            yield json.dumps({"type": "answer", "content": cached_ans}) + "\n"
        return StreamingResponse(stream_cache(), media_type="application/x-ndjson")

    # 2. 加载会话历史,构造初始状态
    history_objs = await postgres_memory.get_history(session_id, limit=6)
    history_dicts = [{"role": msg.role, "content": msg.content} for msg in history_objs]
    history_dicts.append({"role": "user", "content": req.message})
    initial_state = AgentState(messages=history_dicts, current_query=req.message, documents=[], plan=[])

    # 3. 流式执行 LangGraph,结束后落库并写入语义缓存
    async def event_generator():
        final_answer = ""
        async for event in agent_app.astream(initial_state):
            node_name = list(event.keys())[0]
            node_data = event[node_name]
            yield json.dumps({"type": "status", "node": node_name}) + "\n"
            if node_name == "responder" and "messages" in node_data:
                final_answer = node_data["messages"][-1]["content"]
                yield json.dumps({"type": "answer", "content": final_answer}) + "\n"
        if final_answer:
            await postgres_memory.add_message(session_id, "user", req.message, user_id)
            await postgres_memory.add_message(session_id, "assistant", final_answer, user_id)
            await semantic_cache.set_cached_response(req.message, final_answer)

    return StreamingResponse(event_generator(), media_type="application/x-ndjson")
```
文件上传 services/api/app/routes/upload.py:在企业场景中,通常不直接通过应用API上传大文件,而是生成S3预签名URL,让前端直接上传至对象存储:
```python
# services/api/app/routes/upload.py
import boto3
import uuid

from fastapi import APIRouter, Depends

from services.api.app.config import settings
from services.api.app.auth.jwt import get_current_user

router = APIRouter()
s3_client = boto3.client("s3", region_name=settings.AWS_REGION)

@router.post("/generate-presigned-url")
async def generate_upload_url(filename: str, content_type: str, user: dict = Depends(get_current_user)):
    file_id = str(uuid.uuid4())
    s3_key = f"uploads/{user['id']}/{file_id}"
    url = s3_client.generate_presigned_url(
        ClientMethod='put_object',
        Params={'Bucket': settings.S3_BUCKET_NAME, 'Key': s3_key, 'ContentType': content_type},
        ExpiresIn=3600
    )
    return {"upload_url": url, "file_id": file_id, "s3_key": s3_key}
```
反馈与健康检查(示例代码结构):
```python
# services/api/app/routes/feedback.py
@router.post("/")
async def submit_feedback(
    req: FeedbackRequest,
    user: dict = Depends(get_current_user)
):
    ...

# services/api/app/routes/health.py
@router.get("/readiness")
async def readiness(response: Response):
    ...
```
网关限流 services/gateway/rate_limit.lua:在Nginx或Kong网关层运行Lua脚本,优先在边缘拦截滥用请求。
```lua
-- services/gateway/rate_limit.lua
local redis = require "resty.redis"
local red = redis:new()
red:set_timeout(100)

local ok, err = red:connect("rag-redis-prod", 6379)
if not ok then return ngx.exit(500) end

local key = "rate_limit:" .. ngx.var.remote_addr
local limit = 100
local current = red:incr(key)
if current == 1 then red:expire(key, 60) end

if current > limit then
    ngx.status = 429
    ngx.say("Rate limit exceeded.")
    return ngx.exit(429)
end
```
将限流逻辑前移到Nginx/Lua层,可以在请求抵达后端Python API前拦截恶意流量,有效节省服务器资源。
至此,我们已经完成了应用栈的核心部分:数据摄取(Ingestion)、模型(Models)、智能体(Agent)和工具(Tools)。接下来,我们将在AWS上配置运行该系统所需的基础设施。
Infrastructure as Code (IaC)
拥有软件栈后,我们需要一个可靠的环境来运行它。企业级平台不能依赖在AWS控制台手动操作(“ClickOps”),这会导致配置漂移、安全漏洞以及环境不可重复等问题。
Infra layer (Created by Fareed Khan)
- 使用Terraform将整个云基础设施代码化,实现分钟级拉起一致的开发(dev)、预发布(staging)和生产(prod)环境。
- 使用Karpenter实现智能、准实时的节点伸缩,相比传统的自动伸缩组(ASG)更快、更经济。
基础与网络
一切从网络开始。需要一个隔离的VPC,通常包含对外服务的负载均衡层(Public)、内部应用层(Private)和数据库层(Database),实现三层网络隔离。
Networking (Created by Fareed Khan)
infra/terraform/main.tf 配置远程状态后端(使用S3存储和DynamoDB锁),防止多人并发操作导致状态文件损坏:
```hcl
# infra/terraform/main.tf
terraform {
  required_version = ">= 1.5.0"

  backend "s3" {
    bucket         = "rag-platform-terraform-state-prod-001"
    key            = "platform/terraform.tfstate"
    region         = "us-east-1"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"
  }

  required_providers {
    aws        = { source = "hashicorp/aws", version = "~> 5.0" }
    kubernetes = { source = "hashicorp/kubernetes", version = "~> 2.23" }
  }
}

provider "aws" {
  region = var.aws_region
  default_tags {
    tags = { Project = "Enterprise-RAG", ManagedBy = "Terraform" }
  }
}
```
变量定义 infra/terraform/variables.tf
```hcl
# infra/terraform/variables.tf
variable "aws_region" {
  description = "AWS region to deploy resources"
  default     = "us-east-1"
}

variable "cluster_name" {
  description = "Name of the EKS Cluster"
  default     = "rag-platform-cluster"
}

variable "vpc_cidr" {
  description = "CIDR block for the VPC"
  default     = "10.0.0.0/16"
}
```
三层网络 infra/terraform/vpc.tf
```hcl
# infra/terraform/vpc.tf
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "5.1.0"

  name = "${var.cluster_name}-vpc"
  cidr = var.vpc_cidr

  azs              = ["us-east-1a", "us-east-1b", "us-east-1c"]
  public_subnets   = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  private_subnets  = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]
  database_subnets = ["10.0.201.0/24", "10.0.202.0/24", "10.0.203.0/24"]

  enable_nat_gateway   = true
  single_nat_gateway   = false
  enable_dns_hostnames = true

  public_subnet_tags = {
    "kubernetes.io/role/elb" = "1"
  }
  private_subnet_tags = {
    "kubernetes.io/role/internal-elb" = "1"
  }
}
```
Compute cluster (EKS & IAM)
The compute layer uses Amazon EKS as the control plane. With OIDC (IRSA) enabled, Kubernetes ServiceAccounts can temporarily assume AWS IAM roles, avoiding the risk of leaked long-lived access keys. Example configuration, infra/terraform/eks.tf:
```hcl
# infra/terraform/eks.tf
module "eks" {
  source  = "terraform-aws-modules/eks/aws"
  version = "~> 19.0"

  cluster_name    = var.cluster_name
  cluster_version = "1.29"
  vpc_id          = module.vpc.vpc_id
  subnet_ids      = module.vpc.private_subnets
  enable_irsa     = true

  eks_managed_node_groups = {
    system = {
      name           = "system-nodes"
      instance_types = ["m6i.large"]
      min_size       = 2
      max_size       = 5
      desired_size   = 2
    }
  }
}
```
Following the principle of least privilege, grant the ingestion job only the S3 permissions it needs and bind them to a specific ServiceAccount, infra/terraform/iam.tf:
```hcl
# infra/terraform/iam.tf
resource "aws_iam_policy" "ingestion_policy" {
  name = "RAG_Ingestion_S3_Policy"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action   = ["s3:GetObject", "s3:PutObject", "s3:ListBucket"]
        Effect   = "Allow"
        Resource = [aws_s3_bucket.documents.arn, "${aws_s3_bucket.documents.arn}/*"]
      }
    ]
  })
}

module "ingestion_irsa_role" {
  source    = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-account-eks"
  role_name = "rag-ingestion-role"

  oidc_providers = {
    main = {
      provider_arn               = module.eks.oidc_provider_arn
      namespace_service_accounts = ["default:ray-worker"]
    }
  }
  role_policy_arns = {
    policy = aws_iam_policy.ingestion_policy.arn
  }
}
```
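To verify IRSA is actually in effect, a pod running under the ray-worker ServiceAccount can ask STS for its identity; the returned ARN should reference rag-ingestion-role rather than the node's instance role. A minimal check, assuming boto3 is available in the worker image:
```python
# Run inside a pod that uses the ray-worker ServiceAccount to confirm IRSA (sketch).
import boto3

identity = boto3.client("sts").get_caller_identity()
print(identity["Arn"])  # expected to reference rag-ingestion-role, not the node instance role
```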
Managed data stores
To reduce maintenance cost and risk, production systems prefer managed services, such as Aurora (PostgreSQL) for relational data and ElastiCache (Redis) for caching.
Data Store management (Created by Fareed Khan)
Autoscaling with Karpenter
The traditional Cluster Autoscaler is reactive and relatively slow to scale. Karpenter takes a proactive approach: it analyzes the resource requirements of pending Pods (GPU, memory, and so on) in real time and launches the best-fitting EC2 instances within seconds.
Autoscale with karpenter (Created by Fareed Khan)
CPU Provisioner (Spot instances for stateless services), infra/karpenter/provisioner-cpu.yaml:
```yaml
# infra/karpenter/provisioner-cpu.yaml
apiVersion: karpenter.sh/v1beta1
kind: Provisioner
metadata:
  name: cpu-provisioner
spec:
  requirements:
    - key: "karpenter.k8s.aws/instance-family"
      operator: In
      values: ["m6i", "c6i"]
    - key: "karpenter.sh/capacity-type"
      operator: In
      values: ["spot"]
  limits:
    resources:
      cpu: 1000
  consolidation:
    enabled: true
```
GPU Provisioner (scale-to-zero), infra/karpenter/provisioner-gpu.yaml:
```yaml
# infra/karpenter/provisioner-gpu.yaml
apiVersion: karpenter.sh/v1beta1
kind: Provisioner
metadata:
  name: gpu-provisioner
spec:
  requirements:
    - key: "karpenter.k8s.aws/instance-category"
      operator: In
      values: ["g"]
    - key: "karpenter.sh/capacity-type"
      operator: In
      values: ["on-demand", "spot"]
  ttlSecondsAfterEmpty: 30
```
Deployment
Deploying a production-grade RAG platform is far more than running kubectl apply. It means managing configuration drift, handling secrets securely, and making sure the databases can survive failures.
Deployment layer (Created by Fareed Khan)
We package the applications with Helm; a shared, templated configuration makes it easy to override values per environment.
Cluster bootstrap and secret management
Before deploying our own components, we first install the Ingress Controller, the External Secrets Operator, and the KubeRay Operator. The scripts/bootstrap_cluster.sh script installs them in one go:
```bash
#!/bin/bash
# 1. KubeRay Operator
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
helm install kuberay-operator kuberay/kuberay-operator --version 1.0.0

# 2. External Secrets
helm repo add external-secrets https://charts.external-secrets.io
helm install external-secrets external-secrets/external-secrets

# 3. Nginx Ingress
helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
helm install ingress-nginx ingress-nginx/ingress-nginx
```
Production secrets should never be committed to the Git repository. We store them in AWS Secrets Manager and inject them into Kubernetes Pods via the External Secrets Operator:
```yaml
# deploy/secrets/external-secrets.yaml
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: app-secrets
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: aws-secrets-manager
    kind: ClusterSecretStore
  target:
    name: app-env-secret
  data:
    - secretKey: NEO4J_PASSWORD
      remoteRef:
        key: prod/rag/db_creds
        property: neo4j_password
```
Databases and Ingress
Production Helm values for Qdrant, deploy/helm/qdrant/values.yaml:
```yaml
# deploy/helm/qdrant/values.yaml
replicaCount: 3
config:
  storage:
    on_disk_payload: true
  service:
    enable_tls: false
resources:
  requests:
    cpu: "2"
    memory: "4Gi"
  limits:
    cpu: "4"
    memory: "8Gi"
persistence:
  size: 50Gi
  storageClassName: gp3
```
Production Helm values for Neo4j, deploy/helm/neo4j/values.yaml:
```yaml
# deploy/helm/neo4j/values.yaml
neo4j:
  name: "neo4j-cluster"
  edition: "community"
  core:
    numberOfServers: 1
  resources:
    requests:
      cpu: "2"
      memory: "8Gi"
volumes:
  data:
    mode: "default"
    storageClassName: "gp3"
    size: "100Gi"
```
Ingress configuration
The Ingress configuration defines how external traffic is routed to in-cluster services and lives in deploy/ingress/nginx.yaml.
```yaml
# deploy/ingress/nginx.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: rag-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/proxy-body-size: "50m"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
  rules:
    - host: api.your-rag-platform.com
      http:
        paths:
          - path: /chat
            pathType: Prefix
            backend:
              service:
                name: api-service
                port:
                  number: 80
          - path: /upload
            pathType: Prefix
            backend:
              service:
                name: api-service
                port:
                  number: 80
```
Ray AI cluster deployment
The Ray deployment configuration manages distributed compute workloads and covers autoscaling, the cluster definition, and the Serve deployments.
Ray autoscaling configuration
The autoscaling configuration lives in deploy/ray/autoscaling.yaml and defines the elasticity policy for worker nodes.
```yaml
# deploy/ray/autoscaling.yaml
autoscaling:
  enabled: true
  upscaling_speed: 1.0
  idle_timeout_minutes: 5
worker_nodes:
  gpu_worker_group:
    min_workers: 0
    max_workers: 20
    resources: {"CPU": 4, "memory": "32Gi", "GPU": 1}
```
RayCluster definition
The RayCluster resource lives in deploy/ray/ray-cluster.yaml and declares the Ray cluster's head node and worker groups.
```yaml
# deploy/ray/ray-cluster.yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: rag-ray-cluster
spec:
  rayVersion: '2.9.0'
  headGroupSpec:
    serviceType: ClusterIP
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.9.0-py310-gpu
            resources: { requests: { cpu: "2", memory: "8Gi" } }
  workerGroupSpecs:
    - groupName: gpu-workers
      replicas: 0
      minReplicas: 0
      maxReplicas: 20
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.9.0-py310-gpu
              resources:
                limits: { nvidia.com/gpu: 1 }
                requests: { nvidia.com/gpu: 1 }
          tolerations:
            - key: "nvidia.com/gpu"
              operator: "Exists"
```
RayService deployment (embeddings)
The RayService configuration for serving the embedding model lives in deploy/ray/ray-serve-embed.yaml.
```yaml
# deploy/ray/ray-serve-embed.yaml
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: embed-service
spec:
  serveConfigV2: |
    applications:
      - name: bge-m3
        import_path: services.api.app.models.embedding_engine:app
        deployments:
          - name: EmbedDeployment
            autoscaling_config:
              min_replicas: 1
              max_replicas: 5
            ray_actor_options:
              num_gpus: 0.5
```
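The import_path above expects a module that exposes a Ray Serve application named app. The repository's actual implementation is not shown here; the sketch below illustrates what services/api/app/models/embedding_engine.py could look like, assuming BGE-M3 is served through sentence-transformers. Replica counts and the GPU share come from the RayService manifest above.
```python
# services/api/app/models/embedding_engine.py -- illustrative sketch, not the repo's actual code.
from ray import serve
from sentence_transformers import SentenceTransformer

@serve.deployment  # replica count / GPU share are set in the RayService YAML
class EmbedDeployment:
    def __init__(self):
        self.model = SentenceTransformer("BAAI/bge-m3")

    async def __call__(self, request):
        payload = await request.json()          # expected shape: {"texts": ["...", "..."]}
        vectors = self.model.encode(payload["texts"], normalize_embeddings=True)
        return {"embeddings": vectors.tolist()}

# Ray Serve resolves this via import_path "...embedding_engine:app"
app = EmbedDeployment.bind()
```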
RayService (LLM): deploy/ray/ray-serve-llm.yaml
```yaml
# deploy/ray/ray-serve-llm.yaml
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: llm-service
spec:
  serveConfigV2: |
    applications:
      - name: llama3
        import_path: services.api.app.models.vllm_engine:app
        runtime_env:
          pip: ["vllm==0.3.0"]
          env_vars:
            MODEL_ID: "meta-llama/Meta-Llama-3-70B-Instruct"
        deployments:
          - name: VLLMDeployment
            autoscaling_config:
              min_replicas: 1
              max_replicas: 10
            ray_actor_options:
              num_gpus: 1
```
Cost cleanup script, scripts/cleanup.sh:
```bash
#!/bin/bash
# scripts/cleanup.sh
echo "⚠️ WARNING: THIS WILL DESTROY ALL CLOUD RESOURCES ⚠️"
echo "Includes: EKS Cluster, Databases (RDS/Neo4j/Redis), S3 Buckets, Load Balancers."
echo "Cost-saving measure for Dev/Test environments."
echo ""
read -p "Are you sure? Type 'DESTROY': " confirm
if [ "$confirm" != "DESTROY" ]; then
    echo "Aborted."
    exit 1
fi

echo "🔹 1. Deleting Kubernetes Resources (Helm)..."
helm uninstall api || true
helm uninstall qdrant || true
helm uninstall ray-cluster || true
kubectl delete -f deploy/ray/ || true

echo "🔹 2. Waiting for LBs to cleanup..."
sleep 20

echo "🔹 3. Running Terraform Destroy..."
cd infra/terraform
terraform destroy -auto-approve
echo "✅ All resources destroyed."
```
By combining Terraform, Helm, and Ray, we gain full control over the infrastructure and automate away the complexity of the distributed system. Next, we look at monitoring and evaluating the platform at production scale.
Evaluation and operations
An enterprise RAG platform must take evaluation seriously from the development phase onward. That means strategies for observability (metrics/tracing), evaluation (accuracy), and operations (load testing/maintenance).
In production, thumbs-up/thumbs-down feedback from users is useful but typically sparse and delayed, which makes it hard to catch problems early.
Observability and tracing
"If you can't measure it, you can't improve it." For a distributed system spanning Ray, Kubernetes, and multiple databases, it is nearly impossible to locate performance bottlenecks without distributed tracing.
Tracing Logic (Created by Fareed Khan)
Define Prometheus metrics in libs/observability/metrics.py to track request volume, latency, and token usage:
```python
# libs/observability/metrics.py
from prometheus_client import Counter, Histogram

REQUEST_COUNT = Counter(
    "rag_api_requests_total",
    "Total number of requests",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "rag_api_latency_seconds",
    "Request latency",
    ["endpoint"]
)

TOKEN_USAGE = Counter(
    "rag_llm_tokens_total",
    "Total LLM tokens consumed",
    ["model", "type"]
)

def track_request(method: str, endpoint: str, status: int):
    REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
```
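These metrics become useful once they are wired into the request path. Below is a minimal sketch of how the API could record them, shown on a bare FastAPI app for brevity; the middleware and the /metrics mount are illustrative, not the repository's actual wiring.
```python
# Wiring the Prometheus metrics into a FastAPI app (sketch).
import time
from fastapi import FastAPI, Request
from prometheus_client import make_asgi_app
from libs.observability.metrics import REQUEST_LATENCY, track_request

app = FastAPI()
app.mount("/metrics", make_asgi_app())  # Prometheus scrape endpoint

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    # Record per-endpoint latency and a labeled request counter
    REQUEST_LATENCY.labels(endpoint=request.url.path).observe(time.perf_counter() - start)
    track_request(request.method, request.url.path, response.status_code)
    return response
```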
OpenTelemetry tracing configuration, libs/observability/tracing.py:
```python
# libs/observability/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

def configure_tracing(service_name: str):
    provider = TracerProvider()
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)
```
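A minimal usage sketch: wrap the retrieval path in spans so each stage shows up in the trace. The span names, the attribute, and the retrieval placeholder below are illustrative, not taken from the repository.
```python
# Wrapping a retrieval call in OpenTelemetry spans (sketch).
from libs.observability.tracing import configure_tracing

tracer = configure_tracing("rag-api")

async def retrieve_with_tracing(query: str):
    with tracer.start_as_current_span("retrieval") as span:
        span.set_attribute("rag.query_length", len(query))
        with tracer.start_as_current_span("vector_search"):
            ...  # call the vector DB client here
        with tracer.start_as_current_span("graph_search"):
            ...  # call the graph DB client here
```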
Continuous evaluation pipeline
We use an "LLM-as-a-Judge" approach for automated quality checks, combined with a golden dataset to measure the accuracy of the RAG system.
Continuous Eval (Created by Fareed Khan)
eval/datasets/golden.json (example):
```json
// eval/datasets/golden.json
[
  {
    "question": "Explain the difference between the Horizontal Pod Autoscaler (HPA) and the Cluster Autoscaler.",
    "ground_truth": "The HPA scales the number of Pods based on CPU/metrics. The Cluster Autoscaler scales the number of Nodes when pods are unschedulable.",
    "contexts": [
      "The Horizontal Pod Autoscaler is a Kubernetes component that adjusts the number of replicas...",
      "The Cluster Autoscaler is a tool that automatically adjusts the size of the Kubernetes cluster..."
    ]
  }
  // ...
]
```
Judge script, eval/judges/llm_judge.py:
```python
# eval/judges/llm_judge.py
import json
from pydantic import BaseModel
from services.api.app.clients.ray_llm import llm_client

class Grade(BaseModel):
    score: int
    reasoning: str

# Literal braces in the JSON example are doubled so str.format() leaves them intact.
JUDGE_PROMPT = """
You are an impartial judge evaluating a RAG system.
You will be given a Question, a Ground Truth Answer, and the System's Answer.
Rate the System's Answer on a scale of 1 to 5:
1: Completely wrong or hallucinated.
3: Partially correct but missing key details.
5: Perfect, comprehensive, and matches Ground Truth logic.
Output JSON only: {{"score": int, "reasoning": "string"}}
Question: {question}
Ground Truth: {ground_truth}
System Answer: {system_answer}
"""

async def grade_answer(question: str, ground_truth: str, system_answer: str) -> Grade:
    try:
        response_text = await llm_client.chat_completion(
            messages=[{"role": "user", "content": JUDGE_PROMPT.format(
                question=question,
                ground_truth=ground_truth,
                system_answer=system_answer
            )}],
            temperature=0.0
        )
        data = json.loads(response_text)
        return Grade(**data)
    except Exception as e:
        return Grade(score=0, reasoning=f"Judge Error: {e}")
```
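A small runner can then sweep the golden dataset through the judge and report an average score. The sketch below is illustrative: the runner path is assumed, the golden file must be valid JSON (comments removed), and for brevity it grades the ground truth against itself just to exercise the judge; in a real run the system answer would come from the /chat API.
```python
# eval/judges/run_golden.py -- assumed location, minimal runner sketch.
import asyncio
import json
from eval.judges.llm_judge import grade_answer

async def run_golden_set(path: str = "eval/datasets/golden.json"):
    with open(path) as f:
        golden = json.load(f)  # requires a comment-free JSON file
    scores = []
    for item in golden:
        # Placeholder: grade the ground truth against itself to exercise the judge.
        grade = await grade_answer(item["question"], item["ground_truth"], item["ground_truth"])
        scores.append(grade.score)
    print(f"Average judge score: {sum(scores) / len(scores):.2f}")

if __name__ == "__main__":
    asyncio.run(run_golden_set())
```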
Systematic evaluation with Ragas, eval/ragas/run.py:
```python
# eval/ragas/run.py
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy
from datasets import Dataset

def run_evaluation(questions, answers, contexts, ground_truths):
    data = {
        "question": questions,
        "answer": answers,
        "contexts": contexts,
        "ground_truth": ground_truths
    }
    dataset = Dataset.from_dict(data)
    results = evaluate(
        dataset=dataset,
        metrics=[faithfulness, answer_relevancy],
    )
    df = results.to_pandas()
    df.to_csv("eval/reports/evaluation_results.csv", index=False)
    print(results)
```
In the CI/CD pipeline you can enforce quality thresholds, for example blocking the deployment when the faithfulness score drops below 0.8.
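A minimal sketch of such a gate, assuming the CSV written by eval/ragas/run.py contains a faithfulness column and using the 0.8 threshold mentioned above; CI treats a non-zero exit code as a failed check.
```python
# eval/ci_gate.py -- assumed location, quality-gate sketch for CI.
import sys
import pandas as pd

THRESHOLD = 0.8

def main():
    df = pd.read_csv("eval/reports/evaluation_results.csv")
    mean_faithfulness = df["faithfulness"].mean()
    print(f"Mean faithfulness: {mean_faithfulness:.3f}")
    if mean_faithfulness < THRESHOLD:
        print("❌ Quality gate failed, blocking deployment.")
        sys.exit(1)
    print("✅ Quality gate passed.")

if __name__ == "__main__":
    main()
```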
Operational excellence and maintenance
Maintenance tooling covers load testing, data migrations, and service warm-up.
Maintaining Deployment (Created by Fareed Khan)
Locust load test, scripts/load_test.py:
```python
# scripts/load_test.py
from locust import HttpUser, task, between
import os

class RAGUser(HttpUser):
    wait_time = between(1, 5)

    @task
    def chat_stream_task(self):
        headers = {"Authorization": f"Bearer {os.getenv('AUTH_TOKEN')}"}
        payload = {
            "message": "What is the warranty policy for the new X1 processor?",
            "session_id": "loadtest-user-123"
        }
        # catch_response=True is required to mark success/failure manually
        with self.client.post(
            "/api/v1/chat/stream",
            json=payload,
            headers=headers,
            stream=True,
            catch_response=True,
            name="/chat/stream"
        ) as response:
            if response.status_code != 200:
                response.failure("Failed request")
            else:
                # Drain the NDJSON stream so the measurement covers the full response
                for line in response.iter_lines():
                    if line:
                        pass
                response.success()
```
Database migrations, scripts/migrate_db.py (a thin wrapper around Alembic):
```python
# scripts/migrate_db.py
import subprocess
import os
from services.api.app.config import settings

def run_migrations():
    print("Running database migrations...")
    env = os.environ.copy()
    env["DATABASE_URL"] = settings.DATABASE_URL
    try:
        subprocess.run(
            ["alembic", "upgrade", "head"],
            check=True,
            env=env,
            cwd=os.path.dirname(os.path.abspath(__file__))
        )
        print("✅ Migrations applied successfully.")
    except subprocess.CalledProcessError as e:
        print(f"❌ Migration failed: {e}")
        exit(1)

if __name__ == "__main__":
    run_migrations()
```
Cache warm-up, scripts/warmup_cache.py:
```python
# scripts/warmup_cache.py
import asyncio
from services.api.app.cache.semantic import semantic_cache

FAQ = [
    ("What are your business hours?", "Our business hours are 9 AM to 5 PM."),
    ("What is the return policy?", "Return within 30 days."),
]

async def warmup():
    print("🔥 Warming up semantic cache...")
    for question, answer in FAQ:
        await semantic_cache.set_cached_response(question, answer)
    print("✅ Cache warmup complete.")

if __name__ == "__main__":
    asyncio.run(warmup())
```
End-to-end execution
With the code, infrastructure, and evaluation pipeline complete, it is time to deploy everything and run the RAG platform.
First, configure remote state for Terraform: an S3 bucket plus a DynamoDB lock table (a one-time setup), as sketched below.
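This one-time bootstrap can be done with the AWS CLI or, as in the sketch below, with boto3. The bucket and table names mirror the backend block in infra/terraform/main.tf; adjust the region as needed.
```python
# One-time bootstrap of the Terraform state backend (sketch; names match main.tf).
import boto3

REGION = "us-east-1"
STATE_BUCKET = "rag-platform-terraform-state-prod-001"
LOCK_TABLE = "terraform-state-lock"

s3 = boto3.client("s3", region_name=REGION)
s3.create_bucket(Bucket=STATE_BUCKET)  # us-east-1 needs no LocationConstraint
s3.put_bucket_versioning(
    Bucket=STATE_BUCKET,
    VersioningConfiguration={"Status": "Enabled"},  # keep state history
)

dynamodb = boto3.client("dynamodb", region_name=REGION)
dynamodb.create_table(
    TableName=LOCK_TABLE,
    AttributeDefinitions=[{"AttributeName": "LockID", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "LockID", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
)
```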
Create the EKS cluster
With the Terraform backend ready, create the VPC, EKS control plane, and databases from the terminal.
Note: this step only creates the EKS control plane and a small system node group (two small CPU nodes). No GPU nodes are created; they are provisioned on demand only when the software needs them, to save cost.
Run:
```bash
# from the project root
make infra
```
Output omitted; when it finishes you get:
```
Outputs:
aurora_db_endpoint = "rag-platform-cluster-postgres.cluster-c8s7d6f5.us-east-1.rds.amazonaws.com"
redis_primary_endpoint = "rag-redis-prod.ng.0001.use1.cache.amazonaws.com"
s3_bucket_name = "rag-platform-documents-prod"
```
Configure kubectl and bootstrap the system controllers:
```bash
aws eks update-kubeconfig --region us-east-1 --name rag-platform-cluster
./scripts/bootstrap_cluster.sh
```
Next, install the Karpenter configuration (omitted), then apply the Ray configuration. Kubernetes will try to schedule the GPU pods, and Karpenter will automatically purchase suitable g5 instances:
```bash
kubectl apply -f deploy/ray/
kubectl get pods -w
```
After roughly 45–90 seconds the GPU node is initialized and ready, the Ray worker is running, and vLLM starts loading the quantized Llama-3-70B model (roughly 40 GB).
Inference and latency testing
Upload the 1,000 documents to S3 and trigger the Ray job (Karpenter launches CPU nodes on demand):
```bash
python scripts/bulk_upload_s3.py ./data rag-platform-documents-prod
python -m pipelines.jobs.s3_event_handler
```
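scripts/bulk_upload_s3.py lives in the repository; a minimal sketch of what it could look like is shown below. The raw/ key prefix and the progress logging are assumptions, and the argument order follows the command above.
```python
# scripts/bulk_upload_s3.py -- illustrative sketch: <local_dir> <bucket>
import sys
from pathlib import Path
import boto3

def bulk_upload(local_dir: str, bucket: str):
    s3 = boto3.client("s3")
    files = [p for p in Path(local_dir).rglob("*") if p.is_file()]
    for i, path in enumerate(files, 1):
        key = f"raw/{path.name}"          # assumed key layout
        s3.upload_file(str(path), bucket, key)
        if i % 100 == 0:
            print(f"Uploaded {i}/{len(files)} documents")

if __name__ == "__main__":
    bulk_upload(sys.argv[1], sys.argv[2])
```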
The Ray Data dashboard shows the progress and throughput of reading, parsing, embedding, and indexing. Thanks to CPU parallelism and GPU batching, the 1,000 enterprise documents are processed in under 2 minutes.
Next, issue a complex query and watch the agent plan and retrieve:
```bash
kubectl get ingress
# example output: api.your-rag-platform.com

curl -X POST https://api.your-rag-platform.com/api/v1/chat/stream \
  -H "Content-Type: application/json" \
  -d '{
    "message": "Compare the cost of the HPA vs VPA in Kubernetes based on the docs.",
    "session_id": "demo-session-1"
  }'
```
The system returns a streaming response. You can watch the planner -> retriever -> responder state transitions, and the final answer is a professional response with cited sources. Structured logs break the latency down per stage (planner node time, parallel retrieval time, vLLM time-to-first-token, and so on), with the overall response completing in 2–4 seconds.
To keep latency around 3 seconds after adding more agents (such as code review or web search), the agents can be distributed across multiple Ray actors and executed in parallel, as in the sketch below.
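A minimal sketch of that fan-out with Ray actors; the agent class and names here are placeholders rather than the repository's actual implementations.
```python
# Fanning agents out across Ray actors so they run in parallel (sketch).
import ray

ray.init(address="auto", ignore_reinit_error=True)

@ray.remote
class AgentWorker:
    def __init__(self, name: str):
        self.name = name

    def run(self, query: str) -> dict:
        # Each actor would wrap one agent (retriever, code reviewer, web search, ...)
        return {"agent": self.name, "result": f"processed: {query}"}

agents = [AgentWorker.remote(name) for name in ["retriever", "code_review", "web_search"]]
futures = [agent.run.remote("Compare HPA vs VPA cost") for agent in agents]
results = ray.get(futures)  # all agents execute concurrently
```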
Redis and Grafana dashboard analysis
Launch a 500-user load test with Locust:
```bash
locust -f scripts/load_test.py --headless -u 500 -r 10 --host https://api.your-rag-platform.com
```
Watching the Ray and Grafana dashboards: as the request queue deepens, the Ray autoscaler requests more GPU workers, and Karpenter brings up new g5 nodes within seconds. Latency climbs to about 5 seconds early in the test, then the system stabilizes and latency recovers to roughly 1.2 seconds.
The autoscaler logs show the scale-up trigger and the raised target node count; processing capacity doubles within 90 seconds and latency settles back down.
At this point, the system ingests and processes documents efficiently despite the high-noise data, scales automatically, and scales down to zero when load drops to control cost.
The full implementation is available in the following GitHub repository:
https://github.com/FareedKhan-dev/scalable-rag-pipeline
From here, you can go further and build more sophisticated Agentic RAG Pipelines.