构建智能数据库对话助手:基于RAG的Text-to-SQL聊天机器人实战

构建智能数据库对话助手:基于RAG的Text-to-SQL聊天机器人实战

本项目构建了一个由 AI 驱动的聊天机器人,能够将自然语言问题转换为 SQL 查询,并直接从 SQLite 数据库中检索答案。该应用结合了 LangChain、Hugging Face Embeddings 和 Chroma 向量存储,通过检索增强生成(RAG)工作流,将非结构化的用户输入与结构化数据库连接起来,并配备了 FastAPI 后端与 Streamlit 前端界面。

引言:为什么是 Text-to-SQL

设想一个场景:在会议中,经理突然提问:

“我们能看看上个月新加入的所有客户吗?”

你看向 SQL 编辑器,意识到需要现场编写查询、确认表名,甚至可能需要调试一个缺失的 JOIN 语句。与此同时,另一个人直接向 AI 聊天机器人提出了同样的问题,并立即获得了格式清晰的答案。

这正是 Text-to-SQL 的魅力所在——将自然语言转化为数据库查询。

面临的挑战

SQL 是数据分析与数据工程的基石。它功能强大、灵活且精确,但对许多人而言并不“友好”。大多数业务用户、分析师,甚至部分开发者,都认为 SQL 语法令人望而生畏,或者在需要快速获取洞察时效率低下。

这在许多团队中造成了鸿沟:
* 数据是可用的,但无法“开口即问”。
* 洞察是存在的,却被 SQL 专业技能所“锁定”。

解决方案:Text-to-SQL

Text-to-SQL 旨在弥合这一鸿沟。它允许任何人——无论技术背景如何——直接提问,例如:

“上个季度我们最畅销的五款产品是什么?”

并直接从数据库中获得答案,AI 则在背后完成所有的翻译工作。

在幕后,一个典型的 Text-to-SQL 系统需要完成四项关键任务:
1. 检索相关的数据库模式(Schema)与上下文(了解有哪些表及其关系)。
2. 生成与自然语言问题对应的有效 SQL 查询。
3. 安全地校验和执行生成的 SQL 语句。
4. 以易于阅读的格式返回查询结果。

理解 RAG 方法

我们已经看到,Text-to-SQL 允许用户用自然语言提问并获取数据库结果。但 AI 是如何“知道”你的具体表名、列名和表间关系的呢?

答案是:RAG(检索增强生成)。

什么是 RAG?

本质上,RAG 是一种结合了“检索”与“生成”的混合式 AI 方法:
1. 检索:系统从知识源中抓取相关信息(在本场景中,即数据库模式、表名、表间关系等)。
2. 生成:大型语言模型(LLM)根据检索到的上下文,生成准确且有依据的输出(即 SQL 查询)。

你可以将 RAG 理解为:在 LLM 开始编写 SQL 之前,先为它提供一份“备考小抄”——即它需要了解的准确数据库结构信息。

为什么不直接使用 LLM?

如果直接向一个通用的大型语言模型(如 GPT 或 Claude)提问:

“列出上个月加入的所有客户。”

它可能会生成如下 SQL:

SELECT * FROM users WHERE signup_date >= '2025-09-01';

这看起来似乎正确,但问题在于:你的数据库中可能根本没有名为 users 的表。实际的表名可能是 customer_datacrm_clients

这就是核心问题:当缺乏具体上下文时,LLM 容易产生“幻觉”。它会猜测表名、遗漏 JOIN 关系或误用列名,因为它默认并不知晓你的特定数据库结构。

RAG 如何解决此问题

RAG 通过让模型基于“真实、可检索的知识”进行推理,使其“脚踏实地”。

在一个 Text-to-SQL 的 RAG 流程中,会发生以下步骤:
1. 检索模式上下文:系统在你的模式索引(如向量数据库)中搜索相关的表名、列描述、关系等信息。
2. 生成 SQL:LLM 同时接收用户的自然语言问题和检索到的模式上下文,据此生成有效的 SQL 查询。
3. 执行并返回结果:生成的查询经过校验后,在真实数据库中执行,并将结果返回给用户。

这确保了每个 SQL 查询都基于“了解模式后的推理”,而非模型的凭空猜测。

RAG 流程概览

一个简化的 RAG 工作流循环如下:

[用户提问]
      ↓
[检索模式信息]
      ↓
[生成 SQL (LLM)]
      ↓
[校验 + 执行]
      ↓
[返回结果]
      ↺

每一轮交互都确保模型在生成查询前,拥有“最新且准确”的模式知识,从而减少幻觉并提升可靠性。

系统架构

理解了 Text-to-SQL 为何依赖 RAG 方法后,我们来剖析系统的内部架构。

为了使讨论更具体,假设我们正在构建一个聊天机器人,用于回答关于公司客户数据库的自然语言问题,例如“上个月谁加入了?”或“本季度我们的平均订单金额是多少?”。

下图展示了系统的大致结构 👇

1. SQLite 数据库——结构化数据源

每个 Text-to-SQL 系统都始于一个数据源。为简化原型开发和测试,我们使用轻量级的、基于文件的 SQLite 数据库。

这里存放着你的真实业务数据,例如 customersordersproducts 等表。当用户提问时,我们的目标是将问题转换为能在此数据库上执行的有效 SQL 查询。

2. 嵌入层——将模式转化为向量

在模型生成 SQL 之前,它需要“理解”数据库的结构——包括表名、列名及其含义。

我们通过嵌入来实现这一点:嵌入是对文本的数字化(向量)表示,能够捕捉语义信息。借助 Hugging Face 的嵌入模型(如 all-MiniLM-L6-v2),我们将数据库模式的元数据转换为向量:

import os
import sqlite3
import hashlib
from tqdm import tqdm
from dotenv import load_dotenv
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

# 加载环境变量
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")

# ✅ 初始化 HuggingFace 嵌入模型
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)

# ✅ 创建/加载 Chroma 向量存储
vectorstore = Chroma(
    collection_name="sqlite_docs",
    persist_directory=CHROMA_DIR,
    embedding_function=embeddings
)

def row_hash(values):
    """为一行数据生成唯一哈希值。"""
    return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()

def row_to_text(table, cols, row):
    """将 SQLite 数据行转换为可读的文本块。"""
    return f"Table: {table}n" + "n".join([f"{c}: {v}" for c, v in zip(cols, row)])

def index_table(conn, table):
    """将单个表索引到向量存储中。"""
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table});")
    cols = [c[1] for c in cur.fetchall()]
    cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
    rows = cur.fetchall()

    docs, ids, metas = [], [], []
    for r in rows:
        txt = row_to_text(table, cols, r)
        pk = str(r[0])
        hid = row_hash(r)
        ids.append(f"{table}:{pk}")
        docs.append(txt)
        metas.append({"table": table, "pk": pk, "hash": hid})

    # 添加到 Chroma 向量存储
    vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)

def main():
    """主索引流程。"""
    conn = sqlite3.connect(SQLITE_PATH)
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [t[0] for t in cur.fetchall()]

    for t in tqdm(tables, desc="Indexing tables"):
        index_table(conn, t)

    conn.close()
    print("索引完成,已持久化到 Chroma。")

if __name__ == "__main__":
    main()

现在,每张表和每个字段都在高维空间中拥有了向量表示,为后续的智能检索奠定了基础。

3. 向量存储(Chroma)——定位相关模式

接下来,我们将这些嵌入向量存储到向量数据库中,这里选用 Chroma。

当用户提问时,系统会:
1. 使用相同的嵌入模型将用户问题向量化。
2. 在 Chroma 中检索与问题向量“最相似”的模式元素(如表、列)。

例如,当用户询问“最近有哪些用户注册?”时,检索器可能会找到 customers.signup_datecustomers.name 等相关的模式项。这确保了模型在生成 SQL 时,只看到与之相关的数据库上下文,从而让输出“脚踏实地”。

4. LangChain 组件——系统的“大脑”

LangChain 作为框架,将各个环节串联起来,提供了用于检索、推理和 SQL 生成的模块化组件:
* 检索器:从 Chroma 向量存储中提取相关的模式片段。
* :定义执行步骤的序列,例如:检索 → 生成 → 校验 → 执行。
* LLM:基于用户问题和检索到的模式上下文,生成最终的 SQL 查询语句。

4. LangGraph 与 LangChain —— 编排 RAG 流水线

LangChain 作为核心编排框架,负责将检索、生成、验证和执行等步骤串联成一个完整的 RAG 工作流。它确保了用户问题能够顺畅地走完从自然语言到 SQL 查询结果的整个回路。

示例代码展示了关键组件的初始化,包括向量数据库、嵌入模型、大语言模型以及用于生成 SQL 的提示词模板。

from typing import TypedDict, List
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings
from dotenv import load_dotenv
import os

load_dotenv()

# 定义工作流状态的数据结构
class RAGState(TypedDict, total=False):
    question: str
    retrieved_docs: List[str]
    generated_sql: str
    validated_sql: str
    sql_result: List[dict]
    messages: List[dict]


# 初始化向量数据库、嵌入模型和 LLM
CHROMA_DIR = os.getenv("CHROMA_DIR", "./chroma_persist")
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")
TOP_K = int(os.getenv("TOP_K", "8"))

embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
vectordb = Chroma(persist_directory=CHROMA_DIR, embedding_function=embeddings, collection_name="sqlite_docs")
retriever = vectordb.as_retriever(search_kwargs={"k": TOP_K})

llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)

sql_prompt = PromptTemplate.from_template("""
        You are a SQL generator. Based on the following context, generate a SINGLE READ-ONLY SQLite SELECT query (no semicolons, no multiple statements).
        Context:
        {context}

        Question:
        {question}

        Return only the SQL SELECT statement.
        """)


async def retriever_node(state: RAGState) -> RAGState:
    docs = await retriever.ainvoke(state["question"])
    state["retrieved_docs"] = [d.page_content for d in docs]
    return state



import re
from typing import Any

async def sql_generator_node(state: RAGState) -> RAGState:
    """
    Generate SQL from the retrieved documents and user question.
    Cleans LLM output, removes markdown/code fences, and ensures only SELECT statements remain.
    """
    # 1. Combine retrieved documents
    context = "nn".join(state.get("retrieved_docs", []))

    # 2. Format the prompt
    prompt_text = sql_prompt.format(context=context, question=state["question"])

    # 3. Call the LLM asynchronously
    out = await llm.ainvoke(prompt_text)

    # 4. Extract text content if output is an AIMessage or ChatResult
    if hasattr(out, "content"):
        out = out.content
    out = str(out).strip()

    # 5. Remove code fences ``` or ```sql and any leading/trailing whitespace
    out = re.sub(r"```(?:sql)?n?", "", out, flags=re.IGNORECASE).replace("```", "").strip()

    # 6. Ensure the SQL starts with SELECT (case-insensitive)
    match = re.search(r"(selectb.*)", out, flags=re.IGNORECASE | re.DOTALL)
    if match:
        out = match.group(1).strip()
    else:
        # fallback if no SELECT found
        out = ""

    # 7. Optional: remove trailing semicolon if present
    out = out.rstrip(";").strip()

    # 8. Save cleaned SQL back to state
    state["generated_sql"] = out
    return state

5. FastAPI 后端——封装流水线

为了让整个系统能够通过 API 交互,我们使用 FastAPI 将 RAG 流水线封装起来。FastAPI 以其简洁和高性能的特性,能够高效地将内部处理流程暴露为 RESTful 端点。

当用户通过 POST 请求提交一个自然语言问题时,后端 API 会依次执行以下步骤:
1. 检索:根据问题检索相关的数据库 Schema 信息。
2. 生成与校验:利用检索到的上下文生成 SQL 语句,并进行安全性和语法校验。
3. 执行:在 SQLite 数据库中执行校验通过的 SQL 查询。
4. 返回:将查询结果格式化为 JSON 返回给前端。

这一层是整个 Text-to-SQL 聊天机器人的核心“引擎室”。

import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db

load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)

app = FastAPI(title="RAG Text->SQL API")

class QueryRequest(BaseModel):
    question: str
    show_sql: bool = True

@app.post("/query")
async def query(req: QueryRequest):
    state = {"question": req.question, "messages": []}
    # 1. retrieve
    state = await retriever_node(state)
    # 2. generate SQL
    state = await sql_generator_node(state)
    sql = state["generated_sql"]
    ok, reason = validate_sql(sql, ALLOWED_TABLES)
    if not ok:
        raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")
    # 3. execute
    cols, rows = execute_sql(sql)
    # format result rows
    result = [dict(zip(cols, r)) for r in rows]
    return {"sql": sql if req.show_sql else None, "cols": cols, "rows": result}

6. Streamlit 前端——交互式聊天界面

最后,我们使用 Streamlit 构建一个轻量级的前端 Web 界面,为用户提供直观的交互入口。在这个界面中,用户可以:
* 输入自然语言问题。
* 选择是否查看后端生成的 SQL 语句。
* 实时查看查询返回的结构化数据结果。

Streamlit 以其简单易用的特性,非常适合快速构建 AI 应用的原型界面。只需少量 Python 代码,即可获得一个能够“与数据对话”的交互式仪表盘。

构建智能数据库对话助手:基于RAG的Text-to-SQL聊天机器人实战

端到端架构整合

整个系统的数据流和组件交互如下图所示,清晰地展示了一条查询从用户输入到最终结果返回的完整生命周期。

User (Streamlit UI)
        ↓
FastAPI Backend
        ↓
LangChain RAG Pipeline
   ├── Retriever (Chroma Vector DB)
   ├── Embedding Layer (Hugging Face)
   ├── LLM (SQL Generator)
        ↓
SQLite Database (Execute SQL)
        ↓
Results → Back to UI

环境准备

  • 安装项目依赖:
    bash
    pip install -r requirements.txt

SQLite 数据库准备

  • 准备一个 SQLite 数据库文件,并明确其 Schema 结构(例如包含 customersorders 等表)。

构建示例数据库

首先,我们需要一个用于演示的数据库。以下Python脚本创建了一个SQLite数据库,包含customers(客户)和orders(订单)两张表,并插入了示例数据。

import sqlite3
import os

# 创建数据库目录和文件
os.makedirs("sample_db", exist_ok=True)
DB = "sample_db/sample.db"
conn = sqlite3.connect(DB)
cur = conn.cursor()

# 创建 customers 表
cur.execute("""
CREATE TABLE IF NOT EXISTS customers (
  id INTEGER PRIMARY KEY,
  name TEXT,
  email TEXT,
  created_at TEXT
);
""")

# 创建 orders 表,并关联 customers 表
cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
  id INTEGER PRIMARY KEY,
  customer_id INTEGER,
  total_amount REAL,
  status TEXT,
  created_at TEXT,
  notes TEXT,
  FOREIGN KEY(customer_id) REFERENCES customers(id)
);
""")

# 插入客户数据
customers = [
    (1, "Alice Johnson", "alice@example.com", "2024-12-01"),
    (2, "Bob Lee", "bob@example.com", "2024-12-05"),
    (3, "Carol Singh", "carol@example.com", "2024-12-10"),
    (4, "David Kim", "david.kim@example.com", "2024-12-12"),
]

# 插入订单数据
orders = [
    (1, 1, 120.50, "completed", "2025-01-03", "First order"),
    (2, 1, 15.00, "pending", "2025-01-07", "Gift wrap"),
    (3, 2, 250.00, "completed", "2025-02-10", "Bulk order"),
]

cur.executemany("INSERT OR REPLACE INTO customers VALUES (?,?,?,?)", customers)
cur.executemany("INSERT OR REPLACE INTO orders VALUES (?,?,?,?,?,?)", orders)

conn.commit()
conn.close()

将上述代码保存为 sample_db/create_sample_db.py,然后运行以下命令来创建数据库:

python sample_db/create_sample_db.py

Schema 的 Embedding 与索引

为了让AI模型能够理解数据库结构(如表、列及其含义),我们需要将Schema信息转换为机器可理解的格式。这通过Embeddings(向量嵌入)和语义索引技术实现。

什么是 Embeddings?

Embedding 是将文本(如单词、短语或句子)转换为高维空间中的数值向量(一组数字)。这种转换能捕捉文本的语义信息。例如,“客户姓名”和“用户全名”的向量在空间中会非常接近,因为它们含义相似。

通过为数据库中的表名、列名甚至样本数据创建向量表示,模型就能在用户提问时,快速检索到语义上相关的上下文信息。例如,当用户询问“显示最近注册的用户”时,系统可以匹配到 created_atsignup_date 等列。

技术栈

  • Hugging Face Embeddings: 用于将文本转换为高质量的向量。
  • Chroma Vector Store: 一个轻量级、高效的向量数据库,用于存储和检索这些向量。
  • SQLite: 作为我们被索引和查询的目标数据库。

索引脚本实现

以下脚本展示了如何将SQLite数据库的Schema和数据索引到Chroma向量数据库中。

import sqlite3
import hashlib
from tqdm import tqdm
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# 配置参数
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
SQLITE_PATH = "sample_db/sample.db"
CHROMA_DIR = "./chroma_db"

# 初始化Embedding模型
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)

# 创建/加载Chroma向量存储
vectorstore = Chroma(
    collection_name="sqlite_docs",
    persist_directory=CHROMA_DIR,
    embedding_function=embeddings
)

def row_hash(values):
    """为数据行生成唯一哈希值。"""
    return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()

def row_to_text(table, cols, row):
    """将SQLite的一行数据转换为可读的文本片段。"""
    return f"Table: {table}n" + "n".join([f"{c}: {v}" for c, v in zip(cols, row)])

def index_table(conn, table):
    """将单个表的数据索引到向量存储中。"""
    cur = conn.cursor()
    # 获取表的列信息
    cur.execute(f"PRAGMA table_info({table});")
    cols = [c[1] for c in cur.fetchall()]
    # 获取表的所有数据
    cur.execute(f"SELECT {', '.join(cols)} FROM {table}")
    rows = cur.fetchall()

    docs, ids, metas = [], [], []
    for r in rows:
        txt = row_to_text(table, cols, r)
        pk = str(r[0])  # 假设第一列是主键
        hid = row_hash(r)
        ids.append(f"{table}:{pk}")
        docs.append(txt)
        metas.append({"table": table, "pk": pk, "hash": hid})

    # 批量添加到Chroma
    vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)

def main():
    """主索引流程。"""
    conn = sqlite3.connect(SQLITE_PATH)
    cur = conn.cursor()
    # 获取所有用户表(排除系统表)
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [t[0] for t in cur.fetchall()]

    for t in tqdm(tables, desc="Indexing tables"):
        index_table(conn, t)

    conn.close()
    print("索引完成,已持久化到Chroma数据库。")

if __name__ == "__main__":
    main()

将上述代码保存为 ingestion/index_sqlite.py,然后运行以下命令来生成索引:

python ingestion/index_sqlite.py

索引流程详解

  1. 提取Schema和数据:遍历数据库中的每张表,获取其列名和所有行数据。每一行数据都被格式化为一个描述性的文本“文档”(例如:Table: customersnid: 1nname: Alice Johnsonnemail: alice@example.com)。
  2. 文本转向量:使用 HuggingFaceEmbeddings 模型,将上一步生成的每个文本片段转换为一个高维向量。语义相近的文本(如包含“客户”和“姓名”的片段)会得到相似的向量。
  3. 存入向量数据库:将这些向量及其对应的原始文本、元数据(如表名、主键值、哈希ID)一起存储到Chroma数据库中。
  4. 启用语义搜索:当用户提出自然语言问题时,系统会将问题文本转换为向量,并在Chroma中进行相似性搜索,快速找到与问题最相关的数据库Schema或数据片段。

为什么索引至关重要

如果没有对Schema进行索引,大语言模型就如同在“黑暗中摸索”——它不了解数据库中有哪些表、列以及它们的数据类型和含义。通过对Schema进行Embedding和索引,我们为模型构建了一套“语义记忆库”。在生成SQL时,模型可以动态地从这套记忆库中检索出最相关的上下文信息,从而显著提高生成SQL的准确性和可靠性。

整个索引过程的流程图如下:

SQLite Database
   ↓
Extract Tables + Columns
   ↓
Convert to Embeddings (Hugging Face)
   ↓
Store in Chroma Vector DB
   ↓
Semantic Search During Query Time

RAG 查询流程

在构建好Schema索引之后,我们就可以利用检索增强生成(RAG)技术来处理用户查询。这个流程是系统的核心:接收用户问题、检索相关Schema上下文、生成SQL、执行并返回结果。

第 1 步:检索相关 Schema

当用户提出一个问题时(例如:“显示上个月加入的所有客户”),系统首先会将这个问题文本转换为向量。然后,利用Chroma的检索器(Retriever),在之前建立的向量索引中查找语义最相关的Schema片段。

例如,对于上面的问题,检索器可能会找到与 customers 表、created_at 列相关的文档。这些检索到的上下文为模型提供了关于数据库结构的精确“备忘单”。

一个简化的检索节点函数示例如下:

async def retriever_node(state: RAGState) -> RAGState:
    # 使用检索器获取相关文档
    docs = await retriever.ainvoke(state["question"])
    # 将文档内容存入状态
    state["retrieved_docs"] = [d.page_content for d in docs]
    return state

第 2 步:通过 LLM 生成 SQL

接下来,我们将用户的原始问题(question)和上一步检索到的相关Schema上下文(context)组合在一起,构造一个提示词(Prompt),发送给大语言模型(如GPT-4、Claude或本地模型)。

提示词会明确指示模型扮演一个SQL生成器的角色,并基于给定的上下文生成一个单一的、只读的 SELECT 查询语句。

from langchain.prompts import PromptTemplate

sql_prompt = PromptTemplate.from_template("""
你是一个SQL生成器。请根据以下上下文信息,生成一个**单一的、只读的**SQLite SELECT查询语句(不要分号,不要多条语句)。

数据库上下文:
{context}

用户问题:
{question}

请只返回SQL SELECT语句。
""")

# 使用提示词和LLM生成SQL
llm_chain = sql_prompt | llm
sql_response = await llm_chain.ainvoke({"context": context, "question": question})
generated_sql = sql_response.content.strip()

通过这种方式,LLM生成的SQL不再是基于其固有知识的泛化猜测,而是紧密结合了当前数据库的具体结构,从而极大提升了准确性和适用性。

LLM 的输出会被清洗并校验,以确保其只包含纯净的 SQL 语句:

async def sql_generator_node(state: RAGState) -> RAGState:
    context = "nn".join(state.get("retrieved_docs", []))
    prompt_text = sql_prompt.format(context=context, question=state["question"])
    out = await llm.ainvoke(prompt_text)

    out = str(getattr(out, "content", out)).strip()
    out = re.sub(r"```(?:sql)?n?", "", out, flags=re.I).replace("```", "").strip()

    match = re.search(r"(selectb.*)", out, flags=re.I | re.DOTALL)
    if match:
        out = match.group(1).rstrip(";").strip()

    state["generated_sql"] = out
    return state

至此,聊天机器人已能实现“自然语言输入,SQL 输出”的核心功能。

第 3 步:SQL 安全校验

在执行查询前,必须进行安全验证以保护数据库并确保查询的正确性。

我们使用 sqlglot 库来实现以下校验:
* 拒绝任何非 SELECT 语句(如 DELETEUPDATE 等)。
* 检查 SQL 语句是否只引用了允许访问的表。
* 安全地解析 SQL 语法。

def validate_sql(sql: str, allowed_tables: Set[str]) -> Tuple[bool, str]:
    if ";" in sql:
        return False, "semicolon or multiple statements not allowed"
    for kw in DISALLOWED:
        if f" {kw} " in f" {sql.lower()} ":
            return False, f"disallowed keyword: {kw}"

    try:
        parsed = sqlglot.parse_one(sql, read="sqlite")
    except Exception as e:
        return False, f"sql parse error: {e}"

    if parsed.key.lower() not in ALLOWED_STATEMENTS:
        return False, "only SELECT statements allowed"

    tables = extract_tables(sql)
    if not tables.issubset(allowed_tables):
        return False, f"disallowed tables used: {tables - allowed_tables}"
    return True, "ok"

这一校验层确保了系统是“只读且安全”的。

第 4 步:在 SQLite 中执行查询

通过校验的 SQL 语句将在 SQLite 数据库中执行。我们还添加了行数限制等保护措施,以避免执行负载过重的查询。

def execute_sql(sql: str, row_limit: int = 1000, timeout=5.0) -> Tuple[List[str], List[Tuple[Any]]]:
    sql = enforce_limit(sql, row_limit)
    conn = open_ro_conn()
    conn.execute(f"PRAGMA busy_timeout = {int(timeout*1000)};")
    cur = conn.cursor()
    cur.execute(sql)
    cols = [c[0] for c in cur.description] if cur.description else []
    rows = cur.fetchmany(row_limit)
    conn.close()
    return cols, rows

执行结果随后被封装成 JSON 格式返回给用户。

FastAPI 后端

当 RAG 管道构建并测试完成后,下一步是将其“封装为 API”,以便用户和前端能够实时发送问题、获取 SQL 结果并进行交互。

FastAPI 非常适合此类需求——它快速、类型安全、原生支持异步,是服务 AI 工作流的理想选择。

为什么选择 FastAPI?

FastAPI 提供以下优势:
* 高性能:基于异步 I/O,并自动生成交互式 API 文档(Swagger / ReDoc)。
* 易于集成:便于与 LangChain、Chroma 或本地模型对接。
* 数据校验:通过 Pydantic 确保输入参数符合规范。
* 可扩展性:可轻松部署到 Docker、Serverless 或本地私有化环境。

简而言之,它是封装 RAG Text-to-SQL 逻辑的理想框架。

API 设计概述

我们暴露一个主要的端点:

POST /query

请求体示例:

{
  "question": "Show me all customers who joined last month",
  "show_sql": true
}

响应示例:

{
  "sql": "SELECT * FROM customers WHERE join_date >= '2025-09-01'",
  "cols": ["id", "name", "join_date"],
  "rows": [
    {"id": 1, "name": "Alice", "join_date": "2025-09-05"},
    {"id": 2, "name": "Bob", "join_date": "2025-09-12"}
  ]
}

完整代码示例:

import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv

# 导入核心 RAG 组件
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
from server.utils import allowed_tables_from_db

# 加载环境变量
load_dotenv()
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
ALLOWED_TABLES = allowed_tables_from_db(SQLITE_PATH)

app = FastAPI(title="RAG Text->SQL API")

class QueryRequest(BaseModel):
    question: str
    show_sql: bool = True

@app.post("/query")
async def query(req: QueryRequest):
    # 初始化对话状态
    state = {"question": req.question, "messages": []}

    # 1️⃣ 检索相关的数据库 Schema 信息
    state = await retriever_node(state)

    # 2️⃣ 使用 LLM 生成 SQL 查询
    state = await sql_generator_node(state)
    sql = state["generated_sql"]

    # 3️⃣ 验证 SQL 安全性
    ok, reason = validate_sql(sql, ALLOWED_TABLES)
    if not ok:
        raise HTTPException(status_code=400, detail=f"SQL validation failed: {reason}. SQL: {sql}")

    # 4️⃣ 安全地执行 SQL
    cols, rows = execute_sql(sql)
    result = [dict(zip(cols, r)) for r in rows]

    # 5️⃣ 返回 JSON 响应
    return {
        "sql": sql if req.show_sql else None,
        "cols": cols,
        "rows": result
    }

处理流程分解

  1. 接收问题:用户通过 POST 请求发送自然语言问题。
  2. 检索 Schema 上下文retriever_node 从 Chroma 向量数据库中检索最相关的表和列信息。
  3. 生成 SQLsql_generator_node 利用检索到的上下文和用户问题,生成 SELECT 语句。
  4. 安全校验validate_sql() 检查查询格式,并确保只访问允许的表。
  5. 执行并返回execute_sql() 在只读模式的 SQLite 数据库上执行查询,并返回格式化的结果。

设计优势

  • 模块化:检索、生成、校验、执行等组件均可独立替换或扩展。
  • 安全性:通过白名单机制,仅允许执行 SELECT 语句。
  • 异步支持:整个管道支持并发运行,易于根据用户负载进行扩展。
  • 易于部署:可轻松打包为 Docker 镜像,快速部署到云平台(如 AWS/GCP)或 Hugging Face Spaces。
[用户 / Streamlit 前端]
        ↓ (POST /query)
    FastAPI 后端
  ├── retriever_node()  →  Chroma 向量数据库
  ├── sql_generator_node()  →  LLM (OpenAI / Gemini / Mistral)
  ├── validate_sql()  →  SQLGlot 解析器
  └── execute_sql()  →  SQLite 数据库
        ↓
     JSON 响应

这种模块化的架构使得后端系统健壮、透明,并具备生产就绪度。

Streamlit 前端

在 FastAPI 后端运行起来之后,我们可以为其构建一个“可交互”的 Web 应用界面。目标是创建一个简单的网页,让任何用户都能输入自然语言问题、查看生成的 SQL 语句,并实时看到查询结果——在几秒钟内完成整个交互闭环。

完整代码

import streamlit as st
import requests

API_URL = "http://localhost:8000/query"

st.set_page_config(page_title="RAG Text→SQL Demo", layout="centered")
st.title("RAG Text → SQL (SQLite replica)")

with st.form("query_form"):
    question = st.text_input(
        "Ask a natural language question (about the DB)",
        value="Show total orders per customer"
    )
    show_sql = st.checkbox("Show generated SQL", value=True)
    submitted = st.form_submit_button("Submit")

if submitted:
    # Ensure types are correct
    question = str(question).strip()
    show_sql = bool(show_sql)

    if not question:
        st.warning("Please enter a question.")
    else:
        payload = {"question": question, "show_sql": show_sql}

        with st.spinner("Querying..."):
            try:
                resp = requests.post(API_URL, json=payload, timeout=60)
                resp.raise_for_status()
                data = resp.json()

                if show_sql:
                    st.subheader("🧠 Generated SQL")
                    st.code(data.get("sql", ""), language="sql")

                st.subheader("📊  Query Results")
                rows = data.get("rows", [])
                if rows:
                    st.dataframe(rows)
                else:
                    st.info("No rows returned for this query.")

            except requests.exceptions.HTTPError as http_err:
                st.error(f"HTTP error occurred: {http_err} - {resp.text}")
            except requests.exceptions.ConnectionError:
                st.error("Could not connect to the API. Make sure FastAPI is running on localhost:8000.")
            except requests.exceptions.Timeout:
                st.error("Request timed out. Try again later.")
            except Exception as e:
                st.error(f"Unexpected error: {e}")

工作原理

  1. 用户输入:用户在 Streamlit 文本框中输入自然语言问题。
  2. 表单提交:Streamlit 将问题及 show_sql 标志作为 JSON 负载,发送至 FastAPI 的 /query 端点。
  3. 后端处理:FastAPI 后端运行完整的 RAG 管道:
    • 检索相关的数据库 schema 片段。
    • 利用 LLM 生成 SQL 查询。
    • 在 SQLite 数据库上校验并执行生成的 SQL。
  4. 结果展示:Streamlit 接收 JSON 响应后:
    • show_sql=True 时,显示生成的 SQL 代码。
    • 使用交互式表格展示查询结果。
+----------------------------------------------------------+
|  🧠 RAG Text → SQL (SQLite Replica)                      |
|  ------------------------------------------------------  |
|  Ask a natural language question:                        |
|  [ Show total orders per customer             ] [Submit] |
|                                                          |
|  🧠 Generated SQL                                        |
|  SELECT customer_id, COUNT(*) AS total_orders            |
|  FROM orders GROUP BY customer_id                        |
|                                                          |
|  📊  Query Results                                       |
|  ┌──────────────┬───────────────┐                        |
|  │ customer_id  │ total_orders  │                        |
|  ├──────────────┼───────────────┤                        |
|  │ 1            │ 12            │                        |
|  │ 2            │ 8             │                        |
|  └──────────────┴───────────────┘                        |
+----------------------------------------------------------+

这为用户构建了一个“即时反馈”闭环——从自然语言输入到 SQL 生成,再到数据可视化。

运行应用

# 终端 1:启动 FastAPI 后端
uvicorn main:app --reload
# 终端 2:启动 Streamlit 前端
streamlit run app.py

演示

问题:Show total orders per customer

构建智能数据库对话助手:基于RAG的Text-to-SQL聊天机器人实战

问题:Total revenue from completed orders

构建智能数据库对话助手:基于RAG的Text-to-SQL聊天机器人实战

下一步:增量 Embeddings 与可扩展更新

至此,你已经构建了一个完整的 RAG 驱动 Text-to-SQL 管道,涵盖了从 schema 嵌入与检索到 SQL 生成、校验和执行的完整流程。然而,在真实生产环境中,数据库是持续演变的:表结构会变更、新记录会插入、列定义也会随时间调整。

那么,如何确保嵌入索引(以及整个 RAG 系统)与数据库保持同步呢?

以下是将项目推向生产环境的一些关键方向:

1. 处理动态数据库

在原型中,我们在系统启动时仅对数据库进行一次全量索引。这对于静态数据集是可行的,但生产系统需要在以下场景中支持“增量嵌入更新”:
* 表结构变更(新增或删除列)。
* 新增对语义有实质性影响的数据行。
* schema 文档或元数据发生变化。

核心策略是仅嵌入发生变化的部分,而非每次都进行全量重建。

增量索引策略

  • 变更追踪:使用时间戳或行哈希值来追踪数据库的更新。
  • 元数据比对:将变更与向量数据库(如 Chroma)中已存储的元数据(例如哈希 ID)进行比较。
  • 定向重嵌:仅对发生修改的数据行或新增的表进行重新嵌入。
  • 增量更新:利用向量数据库(如 Chroma 的 add_texts() API)持久化地增量更新索引。

这种方法能最大限度地降低计算成本、缩短更新时间并减少冗余,在处理大规模数据时尤为重要。

2. 使用后台任务实现自动化

重新索引或更新嵌入向量的操作不应阻塞用户的实时查询。可以将这些操作委托给后台任务处理:
* Celery:配合 Redis 或 RabbitMQ,实现分布式任务调度。
* FastAPI BackgroundTasks:利用 FastAPI 内置的后台任务功能进行轻量级的异步更新。

以下是使用 FastAPI BackgroundTasks 的示例:

from fastapi import BackgroundTasks

@app.post("/update_embeddings")
async def update_embeddings(background_tasks: BackgroundTasks):
    background_tasks.add_task(reindex_changed_tables)
    return {"status": "update scheduled"}

通过这种方式,主 API(如 /query)可以保持高响应性,而索引更新则在后台异步执行。

3. 未来增强方向

  • 模型微调:基于历史查询日志,对 SQL 生成模型进行微调,使其更贴合特定业务场景。
  • 查询缓存:为常见问题添加缓存层,提升响应速度并降低 API 调用成本。
  • 高级可视化:在 Streamlit 前端集成图表库,动态地将查询结果转化为可视化图表。
  • 扩展向量数据库:面向更大规模的数据集,可迁移至 Pinecone、Qdrant 等专为生产环境设计的向量数据库。
  • 反馈闭环:允许用户对生成的 SQL 进行纠正,并将这些反馈用于持续改进模型。

这些改进将推动你的系统向一个“具备自我学习能力”的数据助手演进,使其能够更好地理解不断演进的数据库 schema 和业务逻辑。

关键收获

你已经成功构建了一个强大的、基于 RAG 的完整 Text-to-SQL 系统,能够将自然语言转化为可操作的数据库洞察。回顾整个项目,你取得了以下成果:

1. 掌握了用于 Text-to-SQL 的 RAG 技术

你学会了如何运用 RAG 来弥合自然语言与结构化数据库 schema 之间的鸿沟,从而显著提升 SQL 生成的准确性并减少“幻觉”。
* 将数据库 schema 和样本数据行转化为嵌入向量,为 LLM 提供上下文。
* 在生成查询前,先检索最相关的 schema 片段。
* 将检索增强与 LLM 推理相结合,产生可靠的 SQL 输出。

2. 构建了模块化、可扩展的架构

你使用 LangChain 和 FastAPI 搭建了一个具备生产级结构的 AI 服务:
* 数据层:SQLite 作为结构化数据源。
* 检索层:Hugging Face Embeddings + Chroma 实现语义检索。
* 编排层:LangChain 协调检索器、提示模板和 LLM。
* 服务层:FastAPI 提供轻量、异步的 RESTful 后端 API。
* 交互层:Streamlit 提供简洁、友好的前端界面。

各层之间高度解耦,允许你独立替换模型或数据库,而不会破坏其他组件。

3. 部署了交互式聊天界面

你使用 Streamlit 将所有功能封装成一个直观的应用,使用户能够:
* 直接输入自然语言问题。
* 查看系统生成的 SQL 代码(可选)。
* 即时在动态、交互式的数据表格中查看查询结果。

这本质上是将原本属于开发者的“编写 SQL”工作,转变为一种“与数据进行自然对话”的体验。

最后的思考

本项目展示了我们与数据交互方式的一次深刻变革:人工智能正在使数据库访问民主化,让任何不具备 SQL 专业知识的人都能提出复杂的数据问题并获取洞察。

通过结合检索增强生成(RAG)、现代嵌入技术与对话式界面,我们构建的工具旨在将数据能力直接赋予用户,使数据洞察的获取更快速、便捷且易于触及。

我们期待听到您的反馈、想法,以及您在构建 Text-to-SQL 系统过程中的实践经验。

您可以在 GitHub 查看完整项目代码并立即开始实践:https://github.com/dharampatel/ConvertQueryToSQL/tree/master


关注“鲸栖”小程序,掌握最新AI资讯

本文由鲸栖原创发布,未经许可,请勿转载。转载请注明出处:http://www.itsolotime.com/archives/13704

(0)
上一篇 2025年11月4日 上午8:07
下一篇 2025年11月4日 上午11:15

相关推荐

  • Context Window终极掌控指南:如何避免AI编码代理的“健忘症”与性能下滑

    Context Window 终极掌控指南 关于AI编码代理(coding agents)的讨论往往两极分化。一方认为“AI编码糟透了,我试过,没用”,另一方则反驳“不,是你用错了,这是技能问题”。 双方都有一定道理。但对于大多数开发者而言,在使用AI编码代理时最容易“翻车”的技能问题,往往源于对Context Window的理解不足——这是决定编码代理如何…

    2025年11月11日
    300
  • Python开发者必备:12个能解决大问题的小型库

    小工具,大作用。 Python 工具带:12 个能解决大问题的小型库 发现一打容易被忽视的 Python 库,它们安静地让开发更顺滑、更高效、更聪明——一次优雅的 import 就够。 如果你是有经验的 Python 开发者,你的工具箱里可能已经装满了 requests、pandas、flask 和 numpy 这样的“大腕”。但在这些明星库之下,还隐藏着一…

    2025年12月4日
    300
  • 实战指南:基于LangChain与FastAPI构建实时多工具AI智能体

    构建一个可用于生产的、工具增强型 LLM Agent,使其具备 Token 流式输出、代码执行、搜索能力,并利用 FastAPI 实现高性能 API 服务。 ChatGPT 的出现带来了震撼的体验,但开发者很快开始思考:如何超越“聊天”本身?我们能否构建一个能够实时推理、联网搜索、执行代码、查询数据,并像人类打字一样流式响应的智能体? 答案是肯定的。通过结合…

    2025年12月13日
    200
  • Vision Agents:开源框架革新实时视频AI,构建多模态智能体的终极解决方案

    如果你曾尝试构建一个能够“看见”、“听见”并即时“响应”的实时 AI 系统,就会知道其技术栈有多么复杂。 视频需要一个 SDK。 语音需要另一个。 目标检测需要另一个。 大语言模型(LLM)还需要一个。 之后,你仍需将所有组件集成起来,处理延迟问题,并设法让整个系统实时运行。 Vision Agents 改变了这一切。 这是一个开源框架,旨在帮助开发者构建能…

    4天前
    200
  • 从Jupyter到Web应用:用Python、FastAPI与LangChain构建可部署的AI工具

    从Jupyter到Web应用:用Python、FastAPI与LangChain构建可部署的AI工具(第1/2部分) 为何需要将AI脚本转化为Web应用 在Jupyter Notebook中成功验证一个AI模型(如问答或文本摘要)后,其价值往往受限于本地环境。团队无法协作,用户无法访问,模型的价值难以释放。 核心在于:AI的价值不仅在于模型本身,更在于其可访…

    2025年11月30日
    200

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注