
本项目构建了一个由 AI 驱动的聊天机器人,能够将自然语言问题转换为 SQL 查询,并直接从 SQLite 数据库中检索答案。该应用结合了 LangChain、Hugging Face Embeddings 和 Chroma 向量存储,通过检索增强生成(RAG)工作流,将非结构化的用户输入与结构化数据库连接起来,并配备了 FastAPI 后端与 Streamlit 前端界面。
引言:为什么是 Text-to-SQL?
设想一个场景:在会议中,经理突然提问:
“我们能看看上个月新加入的所有客户吗?”
你看向 SQL 编辑器,意识到需要现场编写查询、确认表名,甚至可能需要调试一个缺失的 JOIN 语句。与此同时,另一个人直接向 AI 聊天机器人提出了同样的问题,并立即获得了格式清晰的答案。
这正是 Text-to-SQL 的魅力所在——将自然语言转化为数据库查询。
面临的挑战
SQL 是数据分析与数据工程的基石。它功能强大、灵活且精确,但对许多人而言并不“友好”。大多数业务用户、分析师,甚至部分开发者,都认为 SQL 语法令人望而生畏,或者在需要快速获取洞察时效率低下。
这在许多团队中造成了鸿沟:
* 数据是可用的,但无法“开口即问”。
* 洞察是存在的,却被 SQL 专业技能所“锁定”。
解决方案:Text-to-SQL
Text-to-SQL 旨在弥合这一鸿沟。它允许任何人——无论技术背景如何——直接提问,例如:
“上个季度我们最畅销的五款产品是什么?”
并直接从数据库中获得答案,AI 则在背后完成所有的翻译工作。
在幕后,一个典型的 Text-to-SQL 系统需要完成四项关键任务:
1. 检索相关的数据库模式(Schema)与上下文(了解有哪些表及其关系)。
2. 生成与自然语言问题对应的有效 SQL 查询。
3. 安全地校验和执行生成的 SQL 语句。
4. 以易于阅读的格式返回查询结果。
理解 RAG 方法
我们已经看到,Text-to-SQL 允许用户用自然语言提问并获取数据库结果。但 AI 是如何“知道”你的具体表名、列名和表间关系的呢?
答案是:RAG(检索增强生成)。
什么是 RAG?
本质上,RAG 是一种结合了“检索”与“生成”的混合式 AI 方法:
1. 检索:系统从知识源中抓取相关信息(在本场景中,即数据库模式、表名、表间关系等)。
2. 生成:大型语言模型(LLM)根据检索到的上下文,生成准确且有依据的输出(即 SQL 查询)。
你可以将 RAG 理解为:在 LLM 开始编写 SQL 之前,先为它提供一份“备考小抄”——即它需要了解的准确数据库结构信息。
为什么不直接使用 LLM?
如果直接向一个通用的大型语言模型(如 GPT 或 Claude)提问:
“列出上个月加入的所有客户。”
它可能会生成如下 SQL:
SELECT * FROM users WHERE signup_date >= '2025-09-01';
这看起来似乎正确,但问题在于:你的数据库中可能根本没有名为 users 的表。实际的表名可能是 customer_data 或 crm_clients。
这就是核心问题:当缺乏具体上下文时,LLM 容易产生“幻觉”。它会猜测表名、遗漏 JOIN 关系或误用列名,因为它默认并不知晓你的特定数据库结构。
RAG 如何解决此问题
RAG 通过让模型基于“真实、可检索的知识”进行推理,使其“脚踏实地”。
在一个 Text-to-SQL 的 RAG 流程中,会发生以下步骤:
1. 检索模式上下文:系统在你的模式索引(如向量数据库)中搜索相关的表名、列描述、关系等信息。
2. 生成 SQL:LLM 同时接收用户的自然语言问题和检索到的模式上下文,据此生成有效的 SQL 查询。
3. 执行并返回结果:生成的查询经过校验后,在真实数据库中执行,并将结果返回给用户。
这确保了每个 SQL 查询都基于“了解模式后的推理”,而非模型的凭空猜测。
RAG 流程概览
一个简化的 RAG 工作流循环如下:
[用户提问]
↓
[检索模式信息]
↓
[生成 SQL (LLM)]
↓
[校验 + 执行]
↓
[返回结果]
↺
每一轮交互都确保模型在生成查询前,拥有“最新且准确”的模式知识,从而减少幻觉并提升可靠性。
系统架构
理解了 Text-to-SQL 为何依赖 RAG 方法后,我们来剖析系统的内部架构。
为了使讨论更具体,假设我们正在构建一个聊天机器人,用于回答关于公司客户数据库的自然语言问题,例如“上个月谁加入了?”或“本季度我们的平均订单金额是多少?”。
下图展示了系统的大致结构 👇
1. SQLite 数据库——结构化数据源
每个 Text-to-SQL 系统都始于一个数据源。为简化原型开发和测试,我们使用轻量级的、基于文件的 SQLite 数据库。
这里存放着你的真实业务数据,例如 customers、orders、products 等表。当用户提问时,我们的目标是将问题转换为能在此数据库上执行的有效 SQL 查询。
2. 嵌入层——将模式转化为向量
在模型生成 SQL 之前,它需要“理解”数据库的结构——包括表名、列名及其含义。
我们通过嵌入来实现这一点:嵌入是对文本的数字化(向量)表示,能够捕捉语义信息。借助 Hugging Face 的嵌入模型(如 all-MiniLM-L6-v2),我们将数据库模式的元数据转换为向量:
import os
import sqlite3
import hashlib
from tqdm import tqdm
from dotenv import load_dotenv
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
# 加载环境变量
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
# ✅ 初始化 HuggingFace 嵌入模型
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
# ✅ 创建/加载 Chroma 向量存储
vectorstore = Chroma(
collection_name="sqlite_docs",
persist_directory=CHROMA_DIR,
embedding_function=embeddings
)
def row_hash(values):
"""为一行数据生成唯一哈希值。"""
return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()
def row_to_text(table, cols, row):
"""将 SQLite 数据行转换为可读的文本块。"""
return f"Table: {table}n" + "n".join([f"{c}: {v}" for c, v in zip(cols, row)])
def index_table(conn, table):
"""将单个表索引到向量存储中。"""
cur = conn.cursor()
cur.execute(f"PRAGMA table_info({table});")
cols = [c[1] for c in cur.fetchall()]
cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
rows = cur.fetchall()
docs, ids, metas = [], [], []
for r in rows:
txt = row_to_text(table, cols, r)
pk = str(r[0])
hid = row_hash(r)
ids.append(f"{table}:{pk}")
docs.append(txt)
metas.append({"table": table, "pk": pk, "hash": hid})
# 添加到 Chroma 向量存储
vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)
def main():
"""主索引流程。"""
conn = sqlite3.connect(SQLITE_PATH)
cur = conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [t[0] for t in cur.fetchall()]
for t in tqdm(tables, desc="Indexing tables"):
index_table(conn, t)
conn.close()
print("索引完成,已持久化到 Chroma。")
if __name__ == "__main__":
main()
现在,每张表和每个字段都在高维空间中拥有了向量表示,为后续的智能检索奠定了基础。
3. 向量存储(Chroma)——定位相关模式
接下来,我们将这些嵌入向量存储到向量数据库中,这里选用 Chroma。
当用户提问时,系统会:
1. 使用相同的嵌入模型将用户问题向量化。
2. 在 Chroma 中检索与问题向量“最相似”的模式元素(如表、列)。
例如,当用户询问“最近有哪些用户注册?”时,检索器可能会找到 customers.signup_date 和 customers.name 等相关的模式项。这确保了模型在生成 SQL 时,只看到与之相关的数据库上下文,从而让输出“脚踏实地”。
4. LangChain 组件——系统的“大脑”
LangChain 作为框架,将各个环节串联起来,提供了用于检索、推理和 SQL 生成的模块化组件:
* 检索器:从 Chroma 向量存储中提取相关的模式片段。
* 链:定义执行步骤的序列,例如:检索 → 生成 → 校验 → 执行。
* LLM:基于用户问题和检索到的模式上下文,生成最终的 SQL 查询语句。
4. LangGraph 与 LangChain —— 编排 RAG 流水线
LangChain 作为核心编排框架,负责将检索、生成、验证和执行等步骤串联成一个完整的 RAG 工作流。它确保了用户问题能够顺畅地走完从自然语言到 SQL 查询结果的整个回路。
示例代码展示了关键组件的初始化,包括向量数据库、嵌入模型、大语言模型以及用于生成 SQL 的提示词模板。
from typing import TypedDict, List
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from dotenv import load_dotenv
import os
load_dotenv()
# 定义工作流状态的数据结构
class RAGState(TypedDict, total=False):
question: str
retrieved_docs: List[str]
generated_sql: str
validated_sql: str
sql_result: List[dict]
messages: List[dict]
# 初始化向量数据库、嵌入模型和 LLM
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")
TOP_K = int(os.getenv("TOP_K", "8"))
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
vectordb = Chroma(persist_directory=CHROMA_DIR, embedding_function=embeddings, collection_name="sqlite_docs")
retriever = vectordb.as_retriever(search_kwargs={"k": TOP_K})
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
sql_prompt = PromptTemplate.from_template("""
You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).
Context:
{context}
Question:
{question}
Return only the SQL SELECT statement.
""")
async def retriever_node(state: RAGState) -> RAGState:
docs = await retriever.ainvoke(state["question"])
state["retrieved_docs"] = [d.page_content for d in docs]
return state
import re
from typing import Any
async def sql_generator_node(state: RAGState) -> RAGState:
"""
Generate SQL from the retrieved documents and user question.
Cleans LLM output, removes markdown/code fences, and ensures only SELECT statements remain.
"""
# 1. Combine retrieved documents
context = "nn".join(state.get("retrieved_docs", []))
# 2. Format the prompt
prompt_text = sql_prompt.format(context=context, question=state["question"])
# 3. Call the LLM asynchronously
out = await llm.ainvoke(prompt_text)
# 4. Extract text content if output is an AIMessage or ChatResult
if hasattr(out, "content"):
out = out.content
out = str(out).strip()
# 5. Remove code fences ``` or ```sql and any leading/trailing whitespace
out = re.sub(r"```(?:sql)?n?", "", out, flags=re.IGNORECASE).replace("```", "").strip()
# 6. Ensure the SQL starts with SELECT (case-insensitive)
match = re.search(r"(selectb.*)", out, flags=re.IGNORECASE | re.DOTALL)
if match:
out = match.group(1).strip()
else:
# fallback if no SELECT found
out = ""
# 7. Optional: remove trailing semicolon if present
out = out.rstrip(";").strip()
# 8. Save cleaned SQL back to state
state["generated_sql"] = out
return state
5. FastAPI 后端——封装流水线
为了让整个系统能够通过 API 交互,我们使用 FastAPI 将 RAG 流水线封装起来。FastAPI 以其简洁和高性能的特性,能够高效地将内部处理流程暴露为 RESTful 端点。
当用户通过 POST 请求提交一个自然语言问题时,后端 API 会依次执行以下步骤:
1. 检索:根据问题检索相关的数据库 Schema 信息。
2. 生成与校验:利用检索到的上下文生成 SQL 语句,并进行安全性和语法校验。
3. 执行:在 SQLite 数据库中执行校验通过的 SQL 查询。
4. 返回:将查询结果格式化为 JSON 返回给前端。
这一层是整个 Text-to-SQL 聊天机器人的核心“引擎室”。
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)
app = FastAPI(title="RAG Text->SQL API")
class QueryRequest(BaseModel):
question: str
show_sql: bool = True
@app.post("/query")
async def query(req: QueryRequest):
state = {"question": req.question, "messages": []}
# 1. retrieve
state = await retriever_node(state)
# 2. generate SQL
state = await sql_generator_node(state)
sql = state["generated_sql"]
ok, reason = validate_sql(sql, ALLOWED_TABLES)
if not ok:
raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")
# 3. execute
cols, rows = execute_sql(sql)
# format result rows
result = [dict(zip(cols, r)) for r in rows]
return {"sql": sql if req.show_sql else None, "cols": cols, "rows": result}
6. Streamlit 前端——交互式聊天界面
最后,我们使用 Streamlit 构建一个轻量级的前端 Web 界面,为用户提供直观的交互入口。在这个界面中,用户可以:
* 输入自然语言问题。
* 选择是否查看后端生成的 SQL 语句。
* 实时查看查询返回的结构化数据结果。
Streamlit 以其简单易用的特性,非常适合快速构建 AI 应用的原型界面。只需少量 Python 代码,即可获得一个能够“与数据对话”的交互式仪表盘。

端到端架构整合
整个系统的数据流和组件交互如下图所示,清晰地展示了一条查询从用户输入到最终结果返回的完整生命周期。
User (Streamlit UI)
↓
FastAPI Backend
↓
LangChain RAG Pipeline
├── Retriever (Chroma Vector DB)
├── Embedding Layer (Hugging Face)
├── LLM (SQL Generator)
↓
SQLite Database (Execute SQL)
↓
Results → Back to UI
环境准备
- 安装项目依赖:
bash
pip install -r requirements.txt
SQLite 数据库准备
- 准备一个 SQLite 数据库文件,并明确其 Schema 结构(例如包含
customers、orders等表)。
构建示例数据库
首先,我们需要一个用于演示的数据库。以下Python脚本创建了一个SQLite数据库,包含customers(客户)和orders(订单)两张表,并插入了示例数据。
import sqlite3
import os
# 创建数据库目录和文件
os.makedirs("sample_db", exist_ok=True)
DB = "sample_db/sample.db"
conn = sqlite3.connect(DB)
cur = conn.cursor()
# 创建 customers 表
cur.execute("""
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT,
created_at TEXT
);
""")
# 创建 orders 表,并关联 customers 表
cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY,
customer_id INTEGER,
total_amount REAL,
status TEXT,
created_at TEXT,
notes TEXT,
FOREIGN KEY(customer_id) REFERENCES customers(id)
);
""")
# 插入客户数据
customers = [
(1, "Alice Johnson", "alice@example.com", "2024-12-01"),
(2, "Bob Lee", "bob@example.com", "2024-12-05"),
(3, "Carol Singh", "carol@example.com", "2024-12-10"),
(4, "David Kim", "david.kim@example.com", "2024-12-12"),
]
# 插入订单数据
orders = [
(1, 1, 120.50, "completed", "2025-01-03", "First order"),
(2, 1, 15.00, "pending", "2025-01-07", "Gift wrap"),
(3, 2, 250.00, "completed", "2025-02-10", "Bulk order"),
]
cur.executemany("INSERT OR REPLACE INTO customers VALUES (?,?,?,?)", customers)
cur.executemany("INSERT OR REPLACE INTO orders VALUES (?,?,?,?,?,?)", orders)
conn.commit()
conn.close()
将上述代码保存为 sample_db/create_sample_db.py,然后运行以下命令来创建数据库:
python sample_db/create_sample_db.py
Schema 的 Embedding 与索引
为了让AI模型能够理解数据库结构(如表、列及其含义),我们需要将Schema信息转换为机器可理解的格式。这通过Embeddings(向量嵌入)和语义索引技术实现。
什么是 Embeddings?
Embedding 是将文本(如单词、短语或句子)转换为高维空间中的数值向量(一组数字)。这种转换能捕捉文本的语义信息。例如,“客户姓名”和“用户全名”的向量在空间中会非常接近,因为它们含义相似。
通过为数据库中的表名、列名甚至样本数据创建向量表示,模型就能在用户提问时,快速检索到语义上相关的上下文信息。例如,当用户询问“显示最近注册的用户”时,系统可以匹配到 created_at、signup_date 等列。
技术栈
- Hugging Face Embeddings: 用于将文本转换为高质量的向量。
- Chroma Vector Store: 一个轻量级、高效的向量数据库,用于存储和检索这些向量。
- SQLite: 作为我们被索引和查询的目标数据库。
索引脚本实现
以下脚本展示了如何将SQLite数据库的Schema和数据索引到Chroma向量数据库中。
import sqlite3
import hashlib
from tqdm import tqdm
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
# 配置参数
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
SQLITE_PATH = "sample_db/sample.db"
CHROMA_DIR = "./chroma_db"
# 初始化Embedding模型
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
# 创建/加载Chroma向量存储
vectorstore = Chroma(
collection_name="sqlite_docs",
persist_directory=CHROMA_DIR,
embedding_function=embeddings
)
def row_hash(values):
"""为数据行生成唯一哈希值。"""
return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()
def row_to_text(table, cols, row):
"""将SQLite的一行数据转换为可读的文本片段。"""
return f"Table: {table}n" + "n".join([f"{c}: {v}" for c, v in zip(cols, row)])
def index_table(conn, table):
"""将单个表的数据索引到向量存储中。"""
cur = conn.cursor()
# 获取表的列信息
cur.execute(f"PRAGMA table_info({table});")
cols = [c[1] for c in cur.fetchall()]
# 获取表的所有数据
cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
rows = cur.fetchall()
docs, ids, metas = [], [], []
for r in rows:
txt = row_to_text(table, cols, r)
pk = str(r[0]) # 假设第一列是主键
hid = row_hash(r)
ids.append(f"{table}:{pk}")
docs.append(txt)
metas.append({"table": table, "pk": pk, "hash": hid})
# 批量添加到Chroma
vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)
def main():
"""主索引流程。"""
conn = sqlite3.connect(SQLITE_PATH)
cur = conn.cursor()
# 获取所有用户表(排除系统表)
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [t[0] for t in cur.fetchall()]
for t in tqdm(tables, desc="Indexing tables"):
index_table(conn, t)
conn.close()
print("索引完成,已持久化到Chroma数据库。")
if __name__ == "__main__":
main()
将上述代码保存为 ingestion/index_sqlite.py,然后运行以下命令来生成索引:
python ingestion/index_sqlite.py
索引流程详解
- 提取Schema和数据:遍历数据库中的每张表,获取其列名和所有行数据。每一行数据都被格式化为一个描述性的文本“文档”(例如:
Table: customersnid: 1nname: Alice Johnsonnemail: alice@example.com)。 - 文本转向量:使用
HuggingFaceEmbeddings模型,将上一步生成的每个文本片段转换为一个高维向量。语义相近的文本(如包含“客户”和“姓名”的片段)会得到相似的向量。 - 存入向量数据库:将这些向量及其对应的原始文本、元数据(如表名、主键值、哈希ID)一起存储到Chroma数据库中。
- 启用语义搜索:当用户提出自然语言问题时,系统会将问题文本转换为向量,并在Chroma中进行相似性搜索,快速找到与问题最相关的数据库Schema或数据片段。
为什么索引至关重要
如果没有对Schema进行索引,大语言模型就如同在“黑暗中摸索”——它不了解数据库中有哪些表、列以及它们的数据类型和含义。通过对Schema进行Embedding和索引,我们为模型构建了一套“语义记忆库”。在生成SQL时,模型可以动态地从这套记忆库中检索出最相关的上下文信息,从而显著提高生成SQL的准确性和可靠性。
整个索引过程的流程图如下:
SQLite Database
↓
Extract Tables + Columns
↓
Convert to Embeddings (Hugging Face)
↓
Store in Chroma Vector DB
↓
Semantic Search During Query Time
RAG 查询流程
在构建好Schema索引之后,我们就可以利用检索增强生成(RAG)技术来处理用户查询。这个流程是系统的核心:接收用户问题、检索相关Schema上下文、生成SQL、执行并返回结果。
第 1 步:检索相关 Schema
当用户提出一个问题时(例如:“显示上个月加入的所有客户”),系统首先会将这个问题文本转换为向量。然后,利用Chroma的检索器(Retriever),在之前建立的向量索引中查找语义最相关的Schema片段。
例如,对于上面的问题,检索器可能会找到与 customers 表、created_at 列相关的文档。这些检索到的上下文为模型提供了关于数据库结构的精确“备忘单”。
一个简化的检索节点函数示例如下:
async def retriever_node(state: RAGState) -> RAGState:
# 使用检索器获取相关文档
docs = await retriever.ainvoke(state["question"])
# 将文档内容存入状态
state["retrieved_docs"] = [d.page_content for d in docs]
return state
第 2 步:通过 LLM 生成 SQL
接下来,我们将用户的原始问题(question)和上一步检索到的相关Schema上下文(context)组合在一起,构造一个提示词(Prompt),发送给大语言模型(如GPT-4、Claude或本地模型)。
提示词会明确指示模型扮演一个SQL生成器的角色,并基于给定的上下文生成一个单一的、只读的 SELECT 查询语句。
from langchain.prompts import PromptTemplate
sql_prompt = PromptTemplate.from_template("""
你是一个SQL生成器。请根据以下上下文信息,生成一个**单一的、只读的**SQLite SELECT查询语句(不要分号,不要多条语句)。
数据库上下文:
{context}
用户问题:
{question}
请只返回SQL SELECT语句。
""")
# 使用提示词和LLM生成SQL
llm_chain = sql_prompt | llm
sql_response = await llm_chain.ainvoke({"context": context, "question": question})
generated_sql = sql_response.content.strip()
通过这种方式,LLM生成的SQL不再是基于其固有知识的泛化猜测,而是紧密结合了当前数据库的具体结构,从而极大提升了准确性和适用性。
LLM 的输出会被清洗并校验,以确保其只包含纯净的 SQL 语句:
async def sql_generator_node(state: RAGState) -> RAGState:
context = "nn".join(state.get("retrieved_docs", []))
prompt_text = sql_prompt.format(context=context, question=state["question"])
out = await llm.ainvoke(prompt_text)
out = str(getattr(out, "content", out)).strip()
out = re.sub(r"```(?:sql)?n?", "", out, flags=re.I).replace("```", "").strip()
match = re.search(r"(selectb.*)", out, flags=re.I | re.DOTALL)
if match:
out = match.group(1).rstrip(";").strip()
state["generated_sql"] = out
return state
至此,聊天机器人已能实现“自然语言输入,SQL 输出”的核心功能。
第 3 步:SQL 安全校验
在执行查询前,必须进行安全验证以保护数据库并确保查询的正确性。
我们使用 sqlglot 库来实现以下校验:
* 拒绝任何非 SELECT 语句(如 DELETE、UPDATE 等)。
* 检查 SQL 语句是否只引用了允许访问的表。
* 安全地解析 SQL 语法。
def validate_sql(sql: str, allowed_tables: Set[str]) -> Tuple[bool, str]:
if ";" in sql:
return False, "semicolon or multiple statements not allowed"
for kw in DISALLOWED:
if f" {kw} " in f" {sql.lower()} ":
return False, f"disallowed keyword: {kw}"
try:
parsed = sqlglot.parse_one(sql, read="sqlite")
except Exception as e:
return False, f"sql parse error: {e}"
if parsed.key.lower() not in ALLOWED_STATEMENTS:
return False, "only SELECT statements allowed"
tables = extract_tables(sql)
if not tables.issubset(allowed_tables):
return False, f"disallowed tables used: {tables - allowed_tables}"
return True, "ok"
这一校验层确保了系统是“只读且安全”的。
第 4 步:在 SQLite 中执行查询
通过校验的 SQL 语句将在 SQLite 数据库中执行。我们还添加了行数限制等保护措施,以避免执行负载过重的查询。
def execute_sql(sql: str, row_limit: int = 1000, timeout=5.0) -> Tuple[List[str], List[Tuple[Any]]]:
sql = enforce_limit(sql, row_limit)
conn = open_ro_conn()
conn.execute(f"PRAGMA busy_timeout = {int(timeout*1000)};")
cur = conn.cursor()
cur.execute(sql)
cols = [c[0] for c in cur.description] if cur.description else []
rows = cur.fetchmany(row_limit)
conn.close()
return cols, rows
执行结果随后被封装成 JSON 格式返回给用户。
FastAPI 后端
当 RAG 管道构建并测试完成后,下一步是将其“封装为 API”,以便用户和前端能够实时发送问题、获取 SQL 结果并进行交互。
FastAPI 非常适合此类需求——它快速、类型安全、原生支持异步,是服务 AI 工作流的理想选择。
为什么选择 FastAPI?
FastAPI 提供以下优势:
* 高性能:基于异步 I/O,并自动生成交互式 API 文档(Swagger / ReDoc)。
* 易于集成:便于与 LangChain、Chroma 或本地模型对接。
* 数据校验:通过 Pydantic 确保输入参数符合规范。
* 可扩展性:可轻松部署到 Docker、Serverless 或本地私有化环境。
简而言之,它是封装 RAG Text-to-SQL 逻辑的理想框架。
API 设计概述
我们暴露一个主要的端点:
POST /query
请求体示例:
{
"question": "Show me all customers who joined last month",
"show_sql": true
}
响应示例:
{
"sql": "SELECT * FROM customers WHERE join_date >= '2025-09-01'",
"cols": ["id", "name", "join_date"],
"rows": [
{"id": 1, "name": "Alice", "join_date": "2025-09-05"},
{"id": 2, "name": "Bob", "join_date": "2025-09-12"}
]
}
完整代码示例:
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
# 导入核心 RAG 组件
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db
# 加载环境变量
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)
app = FastAPI(title="RAG Text->SQL API")
class QueryRequest(BaseModel):
question: str
show_sql: bool = True
@app.post("/query")
async def query(req: QueryRequest):
# 初始化对话状态
state = {"question": req.question, "messages": []}
# 1️⃣ 检索相关的数据库 Schema 信息
state = await retriever_node(state)
# 2️⃣ 使用 LLM 生成 SQL 查询
state = await sql_generator_node(state)
sql = state["generated_sql"]
# 3️⃣ 验证 SQL 安全性
ok, reason = validate_sql(sql, ALLOWED_TABLES)
if not ok:
raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")
# 4️⃣ 安全地执行 SQL
cols, rows = execute_sql(sql)
result = [dict(zip(cols, r)) for r in rows]
# 5️⃣ 返回 JSON 响应
return {
"sql": sql if req.show_sql else None,
"cols": cols,
"rows": result
}
处理流程分解
- 接收问题:用户通过 POST 请求发送自然语言问题。
- 检索 Schema 上下文:
retriever_node从 Chroma 向量数据库中检索最相关的表和列信息。 - 生成 SQL:
sql_generator_node利用检索到的上下文和用户问题,生成SELECT语句。 - 安全校验:
validate_sql()检查查询格式,并确保只访问允许的表。 - 执行并返回:
execute_sql()在只读模式的 SQLite 数据库上执行查询,并返回格式化的结果。
设计优势
- 模块化:检索、生成、校验、执行等组件均可独立替换或扩展。
- 安全性:通过白名单机制,仅允许执行
SELECT语句。 - 异步支持:整个管道支持并发运行,易于根据用户负载进行扩展。
- 易于部署:可轻松打包为 Docker 镜像,快速部署到云平台(如 AWS/GCP)或 Hugging Face Spaces。
[用户 / Streamlit 前端]
↓ (POST /query)
FastAPI 后端
├── retriever_node() → Chroma 向量数据库
├── sql_generator_node() → LLM (OpenAI / Gemini / Mistral)
├── validate_sql() → SQLGlot 解析器
└── execute_sql() → SQLite 数据库
↓
JSON 响应
这种模块化的架构使得后端系统健壮、透明,并具备生产就绪度。
Streamlit 前端
在 FastAPI 后端运行起来之后,我们可以为其构建一个“可交互”的 Web 应用界面。目标是创建一个简单的网页,让任何用户都能输入自然语言问题、查看生成的 SQL 语句,并实时看到查询结果——在几秒钟内完成整个交互闭环。
完整代码
import streamlit as st
import requests
API_URL = "http://localhost:8000/query"
st.set_page_config(page_title="RAG Text→SQL Demo", layout="centered")
st.title("RAG Text → SQL (SQLite replica)")
with st.form("query_form"):
question = st.text_input(
"Ask a natural language question (about the DB)",
value="Show total orders per customer"
)
show_sql = st.checkbox("Show generated SQL", value=True)
submitted = st.form_submit_button("Submit")
if submitted:
# Ensure types are correct
question = str(question).strip()
show_sql = bool(show_sql)
if not question:
st.warning("Please enter a question.")
else:
payload = {"question": question, "show_sql": show_sql}
with st.spinner("Querying..."):
try:
resp = requests.post(API_URL, json=payload, timeout=60)
resp.raise_for_status()
data = resp.json()
if show_sql:
st.subheader("🧠 Generated SQL")
st.code(data.get("sql", ""), language="sql")
st.subheader("📊 Query Results")
rows = data.get("rows", [])
if rows:
st.dataframe(rows)
else:
st.info("No rows returned for this query.")
except requests.exceptions.HTTPError as http_err:
st.error(f"HTTP error occurred: {http_err} - {resp.text}")
except requests.exceptions.ConnectionError:
st.error("Could not connect to the API. Make sure FastAPI is running on localhost:8000.")
except requests.exceptions.Timeout:
st.error("Request timed out. Try again later.")
except Exception as e:
st.error(f"Unexpected error: {e}")
工作原理
- 用户输入:用户在 Streamlit 文本框中输入自然语言问题。
- 表单提交:Streamlit 将问题及
show_sql标志作为 JSON 负载,发送至 FastAPI 的/query端点。 - 后端处理:FastAPI 后端运行完整的 RAG 管道:
- 检索相关的数据库 schema 片段。
- 利用 LLM 生成 SQL 查询。
- 在 SQLite 数据库上校验并执行生成的 SQL。
- 结果展示:Streamlit 接收 JSON 响应后:
- 当
show_sql=True时,显示生成的 SQL 代码。 - 使用交互式表格展示查询结果。
- 当
+----------------------------------------------------------+
| 🧠 RAG Text → SQL (SQLite Replica) |
| ------------------------------------------------------ |
| Ask a natural language question: |
| [ Show total orders per customer ] [Submit] |
| |
| 🧠 Generated SQL |
| SELECT customer_id, COUNT(*) AS total_orders |
| FROM orders GROUP BY customer_id |
| |
| 📊 Query Results |
| ┌──────────────┬───────────────┐ |
| │ customer_id │ total_orders │ |
| ├──────────────┼───────────────┤ |
| │ 1 │ 12 │ |
| │ 2 │ 8 │ |
| └──────────────┴───────────────┘ |
+----------------------------------------------------------+
这为用户构建了一个“即时反馈”闭环——从自然语言输入到 SQL 生成,再到数据可视化。
运行应用:
# 终端 1:启动 FastAPI 后端
uvicorn main:app --reload
# 终端 2:启动 Streamlit 前端
streamlit run app.py
演示
问题:Show total orders per customer

问题:Total revenue from completed orders

下一步:增量 Embeddings 与可扩展更新
至此,你已经构建了一个完整的 RAG 驱动 Text-to-SQL 管道,涵盖了从 schema 嵌入与检索到 SQL 生成、校验和执行的完整流程。然而,在真实生产环境中,数据库是持续演变的:表结构会变更、新记录会插入、列定义也会随时间调整。
那么,如何确保嵌入索引(以及整个 RAG 系统)与数据库保持同步呢?
以下是将项目推向生产环境的一些关键方向:
1. 处理动态数据库
在原型中,我们在系统启动时仅对数据库进行一次全量索引。这对于静态数据集是可行的,但生产系统需要在以下场景中支持“增量嵌入更新”:
* 表结构变更(新增或删除列)。
* 新增对语义有实质性影响的数据行。
* schema 文档或元数据发生变化。
核心策略是仅嵌入发生变化的部分,而非每次都进行全量重建。
增量索引策略
- 变更追踪:使用时间戳或行哈希值来追踪数据库的更新。
- 元数据比对:将变更与向量数据库(如 Chroma)中已存储的元数据(例如哈希 ID)进行比较。
- 定向重嵌:仅对发生修改的数据行或新增的表进行重新嵌入。
- 增量更新:利用向量数据库(如 Chroma 的
add_texts()API)持久化地增量更新索引。
这种方法能最大限度地降低计算成本、缩短更新时间并减少冗余,在处理大规模数据时尤为重要。
2. 使用后台任务实现自动化
重新索引或更新嵌入向量的操作不应阻塞用户的实时查询。可以将这些操作委托给后台任务处理:
* Celery:配合 Redis 或 RabbitMQ,实现分布式任务调度。
* FastAPI BackgroundTasks:利用 FastAPI 内置的后台任务功能进行轻量级的异步更新。
以下是使用 FastAPI BackgroundTasks 的示例:
from fastapi import BackgroundTasks
@app.post("/update_embeddings")
async def update_embeddings(background_tasks: BackgroundTasks):
background_tasks.add_task(reindex_changed_tables)
return {"status": "update scheduled"}
通过这种方式,主 API(如 /query)可以保持高响应性,而索引更新则在后台异步执行。
3. 未来增强方向
- 模型微调:基于历史查询日志,对 SQL 生成模型进行微调,使其更贴合特定业务场景。
- 查询缓存:为常见问题添加缓存层,提升响应速度并降低 API 调用成本。
- 高级可视化:在 Streamlit 前端集成图表库,动态地将查询结果转化为可视化图表。
- 扩展向量数据库:面向更大规模的数据集,可迁移至 Pinecone、Qdrant 等专为生产环境设计的向量数据库。
- 反馈闭环:允许用户对生成的 SQL 进行纠正,并将这些反馈用于持续改进模型。
这些改进将推动你的系统向一个“具备自我学习能力”的数据助手演进,使其能够更好地理解不断演进的数据库 schema 和业务逻辑。
关键收获
你已经成功构建了一个强大的、基于 RAG 的完整 Text-to-SQL 系统,能够将自然语言转化为可操作的数据库洞察。回顾整个项目,你取得了以下成果:
1. 掌握了用于 Text-to-SQL 的 RAG 技术
你学会了如何运用 RAG 来弥合自然语言与结构化数据库 schema 之间的鸿沟,从而显著提升 SQL 生成的准确性并减少“幻觉”。
* 将数据库 schema 和样本数据行转化为嵌入向量,为 LLM 提供上下文。
* 在生成查询前,先检索最相关的 schema 片段。
* 将检索增强与 LLM 推理相结合,产生可靠的 SQL 输出。
2. 构建了模块化、可扩展的架构
你使用 LangChain 和 FastAPI 搭建了一个具备生产级结构的 AI 服务:
* 数据层:SQLite 作为结构化数据源。
* 检索层:Hugging Face Embeddings + Chroma 实现语义检索。
* 编排层:LangChain 协调检索器、提示模板和 LLM。
* 服务层:FastAPI 提供轻量、异步的 RESTful 后端 API。
* 交互层:Streamlit 提供简洁、友好的前端界面。
各层之间高度解耦,允许你独立替换模型或数据库,而不会破坏其他组件。
3. 部署了交互式聊天界面
你使用 Streamlit 将所有功能封装成一个直观的应用,使用户能够:
* 直接输入自然语言问题。
* 查看系统生成的 SQL 代码(可选)。
* 即时在动态、交互式的数据表格中查看查询结果。
这本质上是将原本属于开发者的“编写 SQL”工作,转变为一种“与数据进行自然对话”的体验。
最后的思考
本项目展示了我们与数据交互方式的一次深刻变革:人工智能正在使数据库访问民主化,让任何不具备 SQL 专业知识的人都能提出复杂的数据问题并获取洞察。
通过结合检索增强生成(RAG)、现代嵌入技术与对话式界面,我们构建的工具旨在将数据能力直接赋予用户,使数据洞察的获取更快速、便捷且易于触及。
我们期待听到您的反馈、想法,以及您在构建 Text-to-SQL 系统过程中的实践经验。
您可以在 GitHub 查看完整项目代码并立即开始实践:https://github.com/dharampatel/ConvertQueryToSQL/tree/master
关注“鲸栖”小程序,掌握最新AI资讯
本文由鲸栖原创发布,未经许可,请勿转载。转载请注明出处:http://www.itsolotime.com/archives/13704
