构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Agentic RAG 系统可以被视为一个高维度的决策空间,其中每个维度都对应一项关键设计选择,例如提示工程、智能体协同机制或检索策略。手动调整这些维度以找到最优组合不仅极其困难,而且系统上线后遇到的未知数据也常常会打破在测试环境中有效的配置。

因此,一个更优的解决方案是让系统具备“自我优化”的能力。一条典型的、可自我进化的 Agentic RAG 流水线遵循以下逻辑:

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Self Improving Agentic RAG System (Created by Fareed Khan)

  • 专家团队执行任务:一个由“专家型智能体”组成的协作团队,依据当前的标准作业程序,从多源信息中生成一份完整的文档。
  • 多维性能评估:一个“多维评价系统”对团队输出进行评分,从准确性、可行性、合规性等多个目标维度生成一个性能向量。
  • 诊断根因:一个“性能诊断智能体”分析该性能向量,像咨询顾问一样识别工作流中的主要薄弱环节并追溯其根本原因。
  • 流程迭代更新:一个“SOP 架构智能体”根据诊断洞察更新标准作业程序,提出专门用于修复已识别薄弱点的新变体。
  • 新版本测试:每个“新版本 SOP”都会在团队重复执行任务时进行测试,其输出再次被评估,生成对应的性能向量。
  • 帕累托优化与决策:系统识别所有已测试 SOP 中的“帕累托前沿”,即最优权衡组合,并将这些优化策略呈现给“人类决策者”,从而形成一个完整的进化闭环。

本文将聚焦于“医疗健康”领域。该领域的核心挑战在于:系统需要为输入查询或知识库考虑“多种可能性”,同时确保“最终决策权仍由人类掌握”。

我们将构建一条端到端、可自我改进的 Agentic RAG 流水线,用于生成 RAG 系统的不同设计模式。

完整代码可在 GitHub 仓库获取:

GitHub – FareedKhan-dev/autonomous-agentic-rag: Self improving agentic rag pipeline

目录

  • 医学 AI 的知识基础设施
    • 安装开源技术栈
    • 环境配置与依赖导入
    • 配置本地大语言模型
    • 准备知识库
  • 构建内部临床试验设计网络
    • 定义标准操作规程
    • 定义专业智能体
    • 使用 LangGraph 编排工作流
    • 完整运行工作流图
  • 多维度评价体系
    • 为每个参数构建自定义评估器
    • 创建聚合型 LangSmith 评估器
  • 进化引擎的外层循环
    • 管理配置
    • 构建主任级智能体
    • 运行完整的进化循环
  • 基于五维的帕累托分析
    • 识别帕累托前沿
    • 可视化前沿并做出决策
  • 理解认知工作流
    • 可视化智能体工作流时间线
    • 使用雷达图剖析输出结果
  • 将其转变为自主策略

医学 AI 的知识基础设施

在构建可自我进化的 Agentic RAG 系统之前,首先需要建立高质量的知识数据库,并搭建必要的技术架构。

一套生产级 RAG 系统通常包含多样化的数据源,既有敏感的内部组织数据,也包含开源数据,用以提升检索质量并弥补信息过时或不完整的问题。这一基础步骤至关重要……

因为数据源的质量将直接决定最终输出的质量。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Sourcing the knowledge base (Created by Fareed Khan)

本节我们将逐步组装系统的各个核心组件,计划如下:

  • 安装开源技术栈:搭建环境并安装必要的库,坚持本地、开源优先的原则。
  • 配置安全可观测性:安全加载 API 密钥,并配置 LangSmith,以便从一开始就能追踪和调试复杂的智能体交互。
  • 搭建本地 LLM 工坊:通过 Ollama 构建不同的开源模型组合,为不同任务匹配合适的模型,以优化性能与成本。
  • 获取并处理多模态数据:下载并准备四类真实数据源:PubMed 科学文献、FDA 监管指南、伦理原则,以及一个大型结构化临床数据集。
  • 索引知识库:最终,将原始数据处理为高效可检索的数据库:对非结构化文本使用 FAISS 向量库,对结构化临床数据使用 DuckDB。

安装开源技术栈

第一步是安装所需的 Python 库。一个可复现的环境是所有严肃项目的基石。我们选择业界标准的开源技术栈,以便对系统进行完全掌控。这包括用于核心智能体框架的 LangChain 和 LangGraph、与本地 LLM 交互的 Ollama,以及访问 PubMed 的 Biopython、进行高性能临床数据分析的 DuckDB 等专业库。

让我们安装所有必需的模块:

# 使用 pip 的“安静”和“升级”标志来安装所有必需的包。
# - langchain, langgraph 等:这些构成了我们构建和编排智能体的核心框架。
# - ollama:这个客户端库允许我们的 Python 代码与本地运行的 Ollama 服务器通信。
# - duckdb:一个极其快速、进程内的分析数据库,非常适合处理我们的结构化临床数据,无需繁重的服务器设置。
# - faiss-cpu:Facebook AI 的高效相似性搜索库,将为我们的 RAG 智能体提供向量存储支持。
# - sentence-transformers:一个便于使用最先进文本嵌入模型的库。
# - biopython, pypdf, beautifulsoup4:一套强大的工具集,用于下载和解析我们多样化的真实世界数据源。
%pip install -U langchain langgraph langchain_community langchain_openai langchain_core ollama pandas duckdb faiss-cpu sentence-transformers biopython pypdf pydantic lxml html2text beautifulsoup4 matplotlib -qqq

我们一次性准备好所有工具和“建筑材料”。每个库各司其职:从用 LangGraph 编排智能体工作流,到用 DuckDB 进行数据分析。

模块安装完成后,让我们逐一初始化它们。

环境配置与依赖导入

我们需要安全地配置环境。将 API 密钥硬编码在代码中既存在安全风险,也不利于代码共享。

我们使用 .env 文件来管理敏感信息,主要是 LangSmith 的 API 密钥。从一开始就配置 LangSmith 是必不可少的要求,它将为我们提供深度可观测性,以跟踪、调试并理解智能体之间的复杂交互。代码如下:

import os
import getpass
from dotenv import load_dotenv

# 这个来自 python-dotenv 库的函数会搜索 .env 文件,并将其中的键值对加载到操作系统的环境变量中,使脚本可以访问它们。
load_dotenv()

# 这是一个关键检查。我们验证脚本是否能够从环境中访问必要的 API 密钥。
if "LANGCHAIN_API_KEY" not in os.environ or "ENTREZ_EMAIL" not in os.environ:
    # 如果密钥缺失,我们打印错误信息并停止,因为应用无法继续运行。
    print("Required environment variables not set. Please set them in your .env file or environment.")
else:
    # 这个确认信息告诉我们,我们的密钥已安全加载并准备就绪。
    print("Environment variables loaded successfully.")

# 我们显式设置 LangSmith 项目名称。这是一个最佳实践,确保本项目生成的所有追踪记录
# 都能在 LangSmith 用户界面中自动分组在一起,便于分析。
os.environ["LANGCHAIN_PROJECT"] = "AI_Clinical_Trials_Architect"

load_dotenv() 是敏感凭据与代码之间的一座“安全桥梁”。它读取 .env 文件(切勿提交到版本库),并将密钥注入环境。

从现在起,我们使用 LangChain 或 LangGraph 的所有操作都会自动被采集并发送到指定的 LangSmith 项目中。

配置本地大语言模型

在生产级智能体系统中,“一刀切”的模型策略往往不是最佳选择。大型前沿模型计算开销大且速度慢,将其用于简单任务会浪费资源(尤其是在自托管 GPU 时)。然而,小型模型虽然快速,却可能缺乏执行关键决策所需的深度推理能力。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Configuring Local LLMs (Created by Fareed Khan)

关键在于将“合适的模型放在系统的合适位置”。我们将构建一个多模型组合(均由 Ollama 在本地服务,以确保隐私、可控性和成本效益),让每个模型在其擅长的特定角色中发挥所长。

首先,定义一个配置字典来集中管理所有选定的模型客户端,这便于后续的替换和统一管理。

from langchain_community.chat_models import ChatOllama
from langchain_community.embeddings import OllamaEmbeddings

# 此字典将作为所有LLM和嵌入模型客户端的中央注册表或“工厂”。
llm_config = {
    # 对于‘planner’,我们使用 Llama 3.1 8B。这是一个现代、能力强大的模型,擅长遵循指令。
    # 我们设置 `format='json'` 以利用 Ollama 内置的 JSON 模式,确保为这个关键任务提供可靠的结构化输出。
    "planner": ChatOllama(model="llama3.1:8b-instruct", temperature=0.0, format='json'),

    # 对于‘drafter’和‘sql_coder’,我们使用 Qwen2 7B。这是一个敏捷快速的模型,
    # 非常适合文本生成和代码补全等重视速度的任务。
    "drafter": ChatOllama(model="qwen2:7b", temperature=0.2),
    "sql_coder": ChatOllama(model="qwen2:7b", temperature=0.0),

    # 对于最高级别的策略代理‘director’,我们使用强大的 Llama 3 70B 模型。
    # 诊断性能和演进系统自身流程这一高风险任务,证明了使用更大、更强大模型的合理性。
    "director": ChatOllama(model="llama3:70b", temperature=0.0, format='json'),
    # 对于嵌入模型,我们使用‘nomic-embed-text’,这是一个顶级的、高效的开源模型。
    "embedding_model": OllamaEmbeddings(model="nomic-embed-text")
}

我们创建的 llm_config 字典,作为所有模型初始化的“中央枢纽”。通过为不同角色分配不同模型,我们构建了一套基于成本-性能权衡优化的层次结构。

  • 快速灵巧(7B–8B)plannerdraftersql_coder 处理频繁且定义清晰的任务。使用 Qwen2 7B 和 Llama 3.1 8B 能在保证低延迟和高性价比的同时,提供足够的指令跟随能力来生成计划、撰写文本或编写 SQL。
  • 深度策略(70B)director 需要分析多维性能数据并重写整个 SOP(标准操作程序),这要求较强的因果推理和全局理解能力。为这种“低频高风险”的任务分配 Llama 3 70B 是合理的。

可以打印配置以确认初始化状态:

# 打印配置以确认客户端已初始化且参数设置正确。
print("LLM clients configured:")
print(f"Planner ({llm_config['planner'].model}): {llm_config['planner']}")
print(f"Drafter ({llm_config['drafter'].model}): {llm_config['drafter']}")
print(f"SQL Coder ({llm_config['sql_coder'].model}): {llm_config['sql_coder']}")
print(f"Director ({llm_config['director'].model}): {llm_config['director']}")
print(f"Embedding Model ({llm_config['embedding_model'].model}): {llm_config['embedding_model']}")

输出示例如下:

#### OUTPUT ####
LLM clients configured:
Planner (llama3.1:8b-instruct): ChatOllama(model='llama3.1:8b-instruct', temperature=0.0, format='json')
Drafter (qwen2:7b): ChatOllama(model='qwen2:7b', temperature=0.2)
SQL Coder (qwen2:7b): ChatOllama(model='qwen2:7b', temperature=0.0)
Director (llama3:70b): ChatOllama(model='llama3:70b', temperature=0.0, format='json')
Embedding Model (nomic-embed-text): OllamaEmbeddings(model='nomic-embed-text')

这表明 ChatOllamaOllamaEmbeddings 客户端已按指定的模型和参数成功初始化。接下来,我们将连接知识库。

准备知识库

RAG 系统的“灵魂”在于一套丰富的多模态知识基座。面对临床试验设计这样的专业任务,通用的网页搜索远远不够。我们需要以权威、领域特定的信息作为根基。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Knowledge store creation (Created by Fareed Khan)

为此,我们将构建一个全面的“知识库”,从四类真实世界数据源中采集、下载并处理内容。多源信息融合对于帮助智能体进行综合判断至关重要,能使最终输出更加全面可靠。

首先,创建数据目录以组织文件:

import os

# 一个字典,用于存放不同类型数据的路径。这有助于保持文件管理的清晰和集中。
data_paths = {
    "base": "./data",
    "pubmed": "./data/pubmed_articles",
    "fda": "./data/fda_guidelines",
    "ethics": "./data/ethical_guidelines",
    "mimic": "./data/mimic_db"
}
# 遍历定义的路径,使用 os.makedirs() 创建所有尚不存在的目录。
# 这可以防止后续步骤中尝试向这些位置保存文件时出错。
for path in data_paths.values():
    if not os.path.exists(path):
        os.makedirs(path)
        print(f"Created directory: {path}")

这确保了项目从一开始就拥有一个干净、组织良好的文件结构。

接着,我们从 PubMed 获取真实的医学文献,为 Medical Researcher 提供核心知识。以下函数演示了如何通过 NCBI 的 Entrez API 下载文献摘要:

from Bio import Entrez
from Bio import Medline
import os

def download_pubmed_articles(query, max_articles=20):
    """
    根据查询从 PubMed 获取摘要,并保存为文本文件。
    NCBI API 要求提供邮箱进行身份识别,我们从环境变量中获取。
    """
    Entrez.email = os.environ.get("ENTREZ_EMAIL")
    print(f"Fetching PubMed articles for query: {query}")

    # 步骤 1:使用 Entrez.esearch 查找匹配查询的 PMIDs
    handle = Entrez.esearch(db="pubmed", term=query, retmax=max_articles, sort="relevance")
    record = Entrez.read(handle)
    id_list = record["IdList"]
    print(f"Found {len(id_list)} article IDs.")

    print("Downloading articles...")
    # 步骤 2:使用 Entrez.efetch 获取 MEDLINE 格式的完整记录
    handle = Entrez.efetch(db="pubmed", id=id_list, rettype="medline", retmode="text")
    records = Medline.parse(handle)

    count = 0
    # 步骤 3:遍历记录,解析并保存每篇摘要到文件
    for i, record in enumerate(records):
        pmid = record.get("PMID", "")
        title = record.get("TI", "No Title")
        abstract = record.get("AB", "No Abstract")
        if pmid:
            # 以 PMID 命名文件,便于引用和避免重复
            filepath = os.path.join(data_paths["pubmed"], f"{pmid}.txt")
            with open(filepath, "w") as f:
                f.write(f"Title: {title}nnAbstract: {abstract}")
            print(f"[{i+1}/{len(id_list)}] Fetching PMID: {pmid}... Saved to {filepath}")
            count += 1
    return count

该函数按三步连接 NCBI:检索符合布尔查询的 PMID、拉取 MEDLINE 记录,并将标题与摘要保存到本地文本文件。

执行示例:

# 定义一个具体的布尔查询,以查找与试验概念高度相关的文章
pubmed_query = "(SGLT2 inhibitor) AND (type 2 diabetes) AND (renal impairment)"
num_downloaded = download_pubmed_articles(pubmed_query)
print(f"PubMed download complete. {num_downloaded} articles saved.")

示例输出:

Fetching PubMed articles for query: (SGLT2 inhibitor) AND (type 2 diabetes) AND (renal impairment)
Found 20 article IDs.
Downloading articles...
[1/20] Fetching PMID: 38810260... Saved to ./data/pubmed_articles/38810260.txt
[2/20] Fetching PMID: 38788484... Saved to ./data/pubmed_articles/38788484.txt
...
PubMed download complete. 20 articles saved.

至此,Medical Researcher 具备了扎实、最新且领域特定的科学依据。

接下来,我们获取监管文件,供 Regulatory Specialist 使用。以下函数演示了如何从 URL 下载 PDF 并提取文本:

import requests
from pypdf import PdfReader
import io

def download_and_extract_text_from_pdf(url, output_path):
    """
    从 URL 下载 PDF,保存文件,并将其文本内容提取到单独的 .txt 文件中。
    """
    print(f"Downloading FDA Guideline: {url}")
    try:
        # 使用 requests 库执行 HTTP GET 请求以下载文件
        response = requests.get(url)
        response.raise_for_status()  # 如果下载失败(例如 404 错误),将引发异常

        # 保存原始 PDF 文件,便于归档
        with open(output_path, 'wb') as f:
            f.write(response.content)
        print(f"Successfully downloaded and saved to {output_path}")

        # 使用 pypdf 直接从内存中的响应内容读取 PDF
        reader = PdfReader(io.BytesIO(response.content))
        text = ""
        # 遍历 PDF 的每一页并追加提取的文本
        for page in reader.pages:
            text += page.extract_text() + "nn"

        # 将提取的干净文本保存到 .txt 文件,RAG 系统将实际使用此文件
        txt_output_path = os.path.splitext(output_path)[0] + '.txt'
        with open(txt_output_path, 'w') as f:
            f.write(text)
        return True
    except requests.exceptions.RequestException as e:
        print(f"Error downloading file: {e}")
        return False

运行以下代码下载 FDA 指南并提取文本:

# 此 URL 指向一份真实的 FDA 糖尿病药物开发指南文件
fda_url = "https://www.fda.gov/media/71185/download"
fda_pdf_path = os.path.join(data_paths["fda"], "fda_diabetes_guidance.pdf")
download_and_extract_text_from_pdf(fda_url, fda_pdf_path)

输出示例:

Downloading FDA Guideline: https://www.fda.gov/media/71185/download
Successfully downloaded and saved to ./data/fda_guidelines/fda_diabetes_guidance.pdf

现在,Regulatory Specialist 拥有了法律与监管文本的基础语料。

接着为 Ethics Specialist 准备一份精要文档(相当于 Belmont Report 的核心原则摘要),以确保其推理建立在最重要的伦理概念之上:

# 这份多行字符串包含了《贝尔蒙特报告》三大核心原则的摘要,
# 该报告是美国涉及人类受试者研究的伦理基础文件。
ethics_content = """  
Title: Summary of the Belmont Report Principles for Clinical Research  
1. Respect for Persons: This principle requires that individuals be treated as autonomous agents and that persons with diminished autonomy are entitled to protection. This translates to robust informed consent processes. Inclusion/exclusion criteria must not unduly target or coerce vulnerable populations, such as economically disadvantaged individuals, prisoners, or those with severe cognitive impairments, unless the research is directly intended to benefit that population.  
2. Beneficence: This principle involves two complementary rules: (1) do not harm and (2) maximize possible benefits and minimize possible harms. The criteria must be designed to select a population that is most likely to benefit and least likely to be harmed by the intervention. The risks to subjects must be reasonable in relation to anticipated benefits.  
3. Justice: This principle concerns the fairness of distribution of the burdens and benefits of research. The selection of research subjects must be equitable. Criteria should not be designed to exclude certain groups without a sound scientific or safety-related justification. For example, excluding participants based on race, gender, or socioeconomic status is unjust unless there is a clear rationale related to the drug's mechanism or risk profile.  
"""  

# 定义伦理文档的保存路径
ethics_path = os.path.join(data_paths["ethics"], "belmont_summary.txt")

# 以写入模式打开文件并保存内容
with open(ethics_path, "w") as f:
    f.write(ethics_content)
print(f"Created ethics guideline file: {ethics_path}")

最后是最复杂的数据源:来自 MIMIC-III 的结构化临床数据,为 Patient Cohort Analyst 提供真实世界人群数据,用以评估招募可行性。

import duckdb
import pandas as pd
import os

def load_real_mimic_data():
    """Loads real MIMIC-III CSVs into a persistent DuckDB database file, processing the massive LABEVENTS table efficiently."""
    print("Attempting to load real MIMIC-III data from local CSVs...")
    db_path = os.path.join(data_paths["mimic"], "mimic3_real.db")
    csv_dir = os.path.join(data_paths["mimic"], "mimiciii_csvs")

    # 定义所需的压缩CSV文件路径
    required_files = {
        "patients": os.path.join(csv_dir, "PATIENTS.csv.gz"),
        "diagnoses": os.path.join(csv_dir, "DIAGNOSES_ICD.csv.gz"),
        "labevents": os.path.join(csv_dir, "LABEVENTS.csv.gz"),
    }

    # 开始前,检查所有必要的源文件是否存在
    missing_files = [path for path in required_files.values() if not os.path.exists(path)]
    if missing_files:
        print("ERROR: The following MIMIC-III files were not found:")
        for f in missing_files: print(f"- {f}")
        print("nPlease download them as instructed and place them in the correct directory.")
        return None

    print("Required files found. Proceeding with database creation.")
    # 移除旧的数据库文件以确保从头构建
    if os.path.exists(db_path):
        os.remove(db_path)
    # 连接到DuckDB。如果数据库文件不存在,它将被创建。
    con = duckdb.connect(db_path)

    # 使用DuckDB强大的`read_csv_auto`函数,直接从gzip压缩的CSV文件加载数据到SQL表中
    print(f"Loading {required_files['patients']} into DuckDB...")
    con.execute(f"CREATE TABLE patients AS SELECT SUBJECT_ID, GENDER, DOB, DOD FROM read_csv_auto('{required_files['patients']}')")

    print(f"Loading {required_files['diagnoses']} into DuckDB...")
    con.execute(f"CREATE TABLE diagnoses_icd AS SELECT SUBJECT_ID, ICD9_CODE FROM read_csv_auto('{required_files['diagnoses']}')")

    # LABEVENTS表非常庞大。为了稳健处理,我们采用两阶段流程。
    print(f"Loading and processing {required_files['labevents']} (this may take several minutes)...")
    # 1. 将数据加载到一个临时的‘staging’表中,将所有列视为文本(`all_varchar=True`)。
    #    这可以防止混合数据类型导致的解析错误。同时,我们只筛选我们关心的实验室项目ID
    #    (50912代表肌酐,50852代表糖化血红蛋白),并使用正则表达式确保VALUENUM是数字。
    con.execute(f"""CREATE TABLE labevents_staging AS
                   SELECT SUBJECT_ID, ITEMID, VALUENUM
                   FROM read_csv_auto('{required_files['labevents']}', all_varchar=True)
                   WHERE ITEMID IN ('50912', '50852') AND VALUENUM IS NOT NULL AND VALUENUM ~ '^[0-9]+(\.[0-9]+)?$'
                """)
    # 2. 通过从staging表中选择并将列转换为正确的数字类型,来创建最终、干净的表。
    con.execute("CREATE TABLE labevents AS SELECT SUBJECT_ID, CAST(ITEMID AS INTEGER) AS ITEMID, CAST(VALUENUM AS DOUBLE) AS VALUENUM FROM labevents_staging")
    # 3. 删除临时staging表以节省空间。
    con.execute("DROP TABLE labevents_staging")
    con.close()
    return db_path

这里利用 DuckDB 直接从磁盘处理大型 CSV 文件,而不是用 pandas 全量读入内存。对于庞大的 LABEVENTS 表,采用两阶段清洗策略(先以 all_varchar 模式过滤,再强制转换类型),以稳健应对数据质量问题,最终得到清洁高效的查询表。

执行并检查:

# 执行函数以构建数据库
db_path = load_real_mimic_data()

# 如果数据库创建成功,则连接并检查其模式及部分样本数据
if db_path:
    print(f"n真实的 MIMIC-III 数据库已创建于: {db_path}")
    print("n正在测试数据库连接与模式...")
    con = duckdb.connect(db_path)
    print(f"数据库中的表: {con.execute('SHOW TABLES').df()['name'].tolist()}")
    print("n‘patients’ 表样例:")
    print(con.execute("SELECT * FROM patients LIMIT 5").df())
    print("n‘diagnoses_icd’ 表样例:")
    print(con.execute("SELECT * FROM diagnoses_icd LIMIT 5").df())
    con.close()

示例输出(略)显示三张表均已成功创建并可查询。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Pre-processing Step (Created by Fareed Khan)

最后,将所有非结构化文本数据索引为可供 RAG 系统检索的向量库:

from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

def create_vector_store(folder_path: str, embedding_model, store_name: str):
    """加载指定文件夹下所有 .txt 文件,将其分割为文本块,并创建内存中的 FAISS 向量存储。"""
    print(f"--- 正在创建 {store_name} 向量存储 ---")
    # 使用 DirectoryLoader 高效加载指定文件夹下的所有 .txt 文件
    loader = DirectoryLoader(folder_path, glob="**/*.txt", loader_cls=TextLoader, show_progress=True)
    documents = loader.load()

    if not documents:
        print(f"在 {folder_path} 中未找到文档,跳过向量存储创建。")
        return None

    # 使用 RecursiveCharacterTextSplitter 将大文档分割为较小的、1000字符的块,重叠部分为100字符。
    # 重叠有助于在块之间保持上下文。
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    texts = text_splitter.split_documents(documents)

    print(f"已加载 {len(documents)} 个文档,分割为 {len(texts)} 个文本块。")
    print("正在生成嵌入并索引到 FAISS...(这可能需要一些时间)")
    # FAISS.from_documents 是一个便捷函数,可一步完成文本块嵌入和高效 FAISS 索引的构建。
    db = FAISS.from_documents(texts, embedding_model)
    print(f"{store_name} 向量存储创建成功。")
    return db

def create_retrievers(embedding_model):
    """为所有非结构化数据源创建向量存储检索器,并整合所有知识存储。"""
    # 为每种文档类型创建独立的、专门的向量存储。
    pubmed_db = create_vector_store(data_paths["pubmed"], embedding_model, "PubMed")
    fda_db = create_vector_store(data_paths["fda"], embedding_model, "FDA")
    ethics_db = create_vector_store(data_paths["ethics"], embedding_model, "Ethics")

    # 返回一个包含所有已配置数据访问工具的字典。
    # ‘as_retriever’ 方法将向量存储转换为标准的 LangChain 检索器对象。
    # ‘search_kwargs’ 中的 ‘k’ 参数控制每次搜索返回的顶部文档数量。
    return {
        "pubmed_retriever": pubmed_db.as_retriever(search_kwargs={"k": 3}) if pubmed_db else None,
        "fda_retriever": fda_db.as_retriever(search_kwargs={"k": 3}) if fda_db else None,
        "ethics_retriever": ethics_db.as_retriever(search_kwargs={"k": 2}) if ethics_db else None,
        "mimic_db_path": db_path # 同时包含结构化 DuckDB 数据库的文件路径。
    }

create_vector_store 函数封装了“加载 -> 分割 -> 嵌入 -> 索引”的标准 RAG 构建流程;create_retrievers 则为每类语料构建独立的向量库并返回检索器字典。我们采用“分域向量库”而非“大一统”策略,以便各代理仅检索与其相关的知识源(例如 Regulatory Specialist 仅使用 fda_retriever)。

执行创建操作:

# 执行函数以创建所有检索器
knowledge_stores = create_retrievers(llm_config["embedding_model"])

print("n知识存储与检索器已成功创建。")

# 打印最终字典以确认所有组件均已就位
for name, store in knowledge_stores.items():
    print(f"{name}: {store}")

输出显示各检索器创建成功。

至此,数据(下载、处理、索引)与 LLM(配置)均已准备就绪,可以开始构建系统的第一个核心组件:试验设计公会。

构建内部临床试验设计网络

随着知识库准备完毕,现在开始构建系统核心。这并非一个简单的线性 RAG 链,而是一套基于 LangGraph 的协作式多代理工作流:一支由 AI 专家组成的团队,共同将高层次的试验概念转化为一份详细、有数据支撑的标准化文档。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Main Inner Loop RAG (Created by Fareed Khan)

整个架构的行为并非硬编码,而是由一个动态配置对象——标准作业程序(Standard Operating Procedure, GuildSOP)——来治理。

这个 SOP 是我们 RAG 流水线的“基因组”,也是外层的“AI Research Director”将要进化和优化的对象。

本节计划如下:
* 定义 RAG 基因组:创建 Pydantic 模型 GuildSOP,用于驱动整个工作流架构。
* 设计共享工作台:定义 GuildState,作为代理之间共享计划与发现的中央空间。
* 构建专家型代理:将 Planner、Researchers、SQL Analyst、Synthesizer 分别实现为 Python 函数,作为图中的节点。
* 编排协作:使用 LangGraph 将这些代理节点连接成完整的端到端工作流。
* 全量测试:使用基线 SOP 调用完整的公会图,观察其实际运行并生成第一版标准文档。

定义公会标准操作规程

首先定义控制整体流程行为的结构。我们使用 Pydantic BaseModel 创建 GuildSOP。通过强类型、数据验证和自文档化特性,确保 SOP 稳定且可进化。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Guild SOP Design (Created by Fareed Khan)

GuildSOP 是一个 Pydantic 模型,作为整个 RAG 工作流的动态配置中心。它定义了关键的操作参数,使外层的 AI Director 能够通过调整这些“策略杠杆”来优化系统性能。

from pydantic import BaseModel, Field
from typing import Literal

class GuildSOP(BaseModel):
    """临床试验设计公会的标准操作流程。此对象作为整个RAG工作流的动态配置。"""

    # Planner Agent 的系统提示词,用于定义其策略。
    planner_prompt: str = Field(description="Planner Agent 的系统提示词。")

    # 控制医学研究员(Medical Researcher)检索的文档数量,用于调节搜索广度。
    researcher_retriever_k: int = Field(description="医学研究员检索的文档数量。", default=3)

    # 最终合成器(Synthesizer Agent)的系统提示词。
    synthesizer_prompt: str = Field(description="标准合成器 Agent 的系统提示词。")

    # 动态指定最终起草阶段使用的LLM模型,在速度和质量之间进行权衡。
    synthesizer_model: Literal["qwen2:7b", "llama3.1:8b-instruct"] = Field(description="合成器使用的LLM。", default="qwen2:7b")

    # 布尔值作为“功能开关”,允许 Director 开启或关闭特定 Agent 的能力。
    use_sql_analyst: bool = Field(description="是否使用患者队列分析员 Agent。", default=True)
    use_ethics_specialist: bool = Field(description="是否使用伦理专家 Agent。", default=True)

GuildSOP 通过类型注解(如 Literal)确保配置参数的类型安全。我们可以基于它创建一个基线版本,作为初始的“手工工程”最佳猜测,供后续的 AI Director 进行优化。

import json

baseline_sop = GuildSOP(
    planner_prompt="""You are a master planner for clinical trial design...""",
    synthesizer_prompt="""You are an expert medical writer...""",
    researcher_retriever_k=3,
    synthesizer_model="qwen2:7b",
    use_sql_analyst=True,
    use_ethics_specialist=True
)

print("Baseline GuildSOP (v1.0):")
print(json.dumps(baseline_sop.dict(), indent=4))

定义专业智能体(Specialist Agents)

在定义了“规则书”(SOP)之后,接下来需要实现执行具体任务的智能体。在 LangGraph 框架中,每个智能体被实现为一个节点(Node),即一个 Python 函数,它接收当前图状态作为输入,并输出状态的增量更新。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Specialist Agents (Created by Fareed Khan)

首先,定义共享的协作状态 GuildState,它充当所有智能体共享的“工作台”,用于传递初始请求、计划、各专家的发现以及最终输出。

from typing import List, Dict, Any, Optional
from langchain_core.pydantic_v1 import BaseModel
from typing_extensions import TypedDict

class AgentOutput(BaseModel):
    """每个智能体发现结果的结构化输出。"""
    agent_name: str
    findings: Any

class GuildState(TypedDict):
    """临床试验设计公会工作流的状态,在所有节点间传递。"""
    initial_request: str
    plan: Optional[Dict[str, Any]]
    agent_outputs: List[AgentOutput]
    final_criteria: Optional[str]
    sop: GuildSOP

接着,实现 planner_agent。该智能体读取 SOP 中的 planner_prompt,并根据初始请求生成一个结构化的计划(JSON),用于指导后续所有专家智能体的工作。

def planner_agent(state: GuildState) -> GuildState:
    """接收初始请求,并为专家智能体创建结构化计划。"""
    print("--- EXECUTING PLANNER AGENT ---")

    sop = state['sop']

    planner_llm = llm_config['planner'].with_structured_output(schema={"plan": []})

    prompt = f"{sop.planner_prompt}nnTrial Concept: '{state['initial_request']}'"
    print(f"Planner Prompt:n{prompt}")

    response = planner_llm.invoke(prompt)
    print(f"Generated Plan:n{json.dumps(response, indent=2)}")

    return {**state, "plan": response}

然后,实现一个通用的“检索型代理”函数 retrieval_agent。该函数可被 Medical ResearcherRegulatory SpecialistEthics Specialist 等智能体复用,用于从指定的知识库中进行检索。

def retrieval_agent(task_description: str, state: GuildState, retriever_name: str, agent_name: str) -> AgentOutput:
    """一个通用的智能体函数,根据任务描述从指定的向量存储库执行检索。"""
    print(f"--- EXECUTING {agent_name.upper()} ---")
    print(f"Task: {task_description}")

    retriever = knowledge_stores[retriever_name]

    if agent_name == "Medical Researcher":
        retriever.search_kwargs['k'] = state['sop'].researcher_retriever_k
        print(f"Using k={state['sop'].researcher_retriever_k} for retrieval.")

    retrieved_docs = retriever.invoke(task_description)

    findings = "nn---nn".join([f"Source: {doc.metadata.get('source', 'N/A')}nn{doc.page_content}" for doc in retrieved_docs])
    print(f"Retrieved {len(retrieved_docs)} documents.")
    print(f"Sample Finding:n{findings[:500]}...")

    return AgentOutput(agent_name=agent_name, findings=findings)

`Patient Cohort Analyst` 是系统中最为复杂的代理,其核心功能是执行 Text-to-SQL 任务:将自然语言描述的患者筛选条件,转换为有效的 SQL 查询语句,并在 DuckDB 数据库上执行,最终给出可招募人群的估算数量。

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def patient_cohort_analyst(task_description: str, state: GuildState) -> AgentOutput:
    """Estimates cohort size by generating and then executing a SQL query against the MIMIC database."""
    print("--- EXECUTING PATIENT COHORT ANALYST ---")

    if not state['sop'].use_sql_analyst:
        print("SQL Analyst skipped as per SOP.")
        return AgentOutput(agent_name="Patient Cohort Analyst", findings="Analysis skipped as per SOP.")

    # 连接数据库并获取表结构信息
    con = duckdb.connect(knowledge_stores['mimic_db_path'])
    schema_query = """
    SELECT table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'main' ORDER BY table_name, column_name;
    """
    schema = con.execute(schema_query).df()
    con.close()

    # 构建 SQL 生成提示词,包含数据库 Schema 和关键医学概念映射
    sql_generation_prompt = ChatPromptTemplate.from_messages([
        ("system", f"You are an expert SQL writer specializing in DuckDB. ... schema:n{schema.to_string()}nnIMPORTANT: All column names ...nnKey Mappings:n- T2DM ... ICD9_CODE '25000'.n- Moderate renal impairment ... creatinine ... ITEMID 50912 ... VALUENUM 1.5-3.0.n- Uncontrolled T2D ... HbA1c ... ITEMID 50852 ... VALUENUM > 8.0."),
        ("human", "Please write a SQL query to count the number of unique patients who meet the following criteria: {task}")
    ])

    sql_chain = sql_generation_prompt | llm_config['sql_coder'] | StrOutputParser()

    print(f"Generating SQL for task: {task_description}")
    sql_query = sql_chain.invoke({"task": task_description})
    sql_query = sql_query.strip().replace("```sql", "").replace("```", "")
    print(f"Generated SQL Query:n{sql_query}")

    # 执行生成的 SQL 查询
    try:
        con = duckdb.connect(knowledge_stores['mimic_db_path'])
        result = con.execute(sql_query).fetchone()
        patient_count = result[0] if result else 0
        con.close()

        findings = f"Generated SQL Query:n{sql_query}nnEstimated eligible patient count from the database: {patient_count}."
        print(f"Query executed successfully. Estimated patient count: {patient_count}")
    except Exception as e:
        findings = f"Error executing SQL query: {e}. Defaulting to a count of 0."
        print(f"Error during query execution: {e}")

    return AgentOutput(agent_name="Patient Cohort Analyst", findings=findings)

最后是 criteria_synthesizer 代理,它的职责是将所有专家代理的结构化发现,汇总并整合为一份正式的“入排标准(Inclusion/Exclusion Criteria)”文档。该代理支持根据 SOP 动态切换用于合成的语言模型。

def criteria_synthesizer(state: GuildState) -> GuildState:
    """Synthesizes all the structured findings from the specialist agents into the final criteria document."""
    print("--- EXECUTING CRITERIA SYNTHESIZER ---")

    sop = state['sop']
    drafter_llm = ChatOllama(model=sop.synthesizer_model, temperature=0.2)

    # 汇集所有专家代理的输出作为上下文
    context = "nn---nn".join([f"**{out.agent_name} Findings:**n{out.findings}" for out in state['agent_outputs']])

    prompt = f"{sop.synthesizer_prompt}nn**Context from Specialist Teams:**n{context}"
    print(f"Synthesizer is using model '{sop.synthesizer_model}'.")

    response = drafter_llm.invoke(prompt)
    print("Final criteria generated.")

    return {**state, "final_criteria": response.content}

使用 LangGraph 进行工作流编排

我们将上述代理节点通过 LangGraph 进行编排,形成 Planner → 专家并行执行 → Synthesizer 的流程。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Guild with langgraph (Created by Fareed Khan)

首先定义一个“调度节点”,它根据 Planner 生成的计划,将任务分派给对应的专家代理。

from langgraph.graph import StateGraph, END

def specialist_execution_node(state: GuildState) -> GuildState:
    """This node acts as a dispatcher, executing all specialist tasks defined in the plan."""
    plan_tasks = state['plan']['plan']
    outputs = []

    for task in plan_tasks:
        agent_name = task['agent']
        task_desc = task['task_description']

        if "Regulatory" in agent_name:
            output = retrieval_agent(task_desc, state, "fda_retriever", "Regulatory Specialist")
        elif "Medical" in agent_name:
            output = retrieval_agent(task_desc, state, "pubmed_retriever", "Medical Researcher")
        elif "Ethics" in agent_name and state['sop'].use_ethics_specialist:
            output = retrieval_agent(task_desc, state, "ethics_retriever", "Ethics Specialist")
        elif "Cohort" in agent_name:
            output = patient_cohort_analyst(task_desc, state)
        else:
            continue

        outputs.append(output)
    return {**state, "agent_outputs": outputs}

接下来,构建并编译整个工作流图。

workflow = StateGraph(GuildState)

workflow.add_node("planner", planner_agent)
workflow.add_node("execute_specialists", specialist_execution_node)
workflow.add_node("synthesizer", criteria_synthesizer)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "execute_specialists")
workflow.add_edge("execute_specialists", "synthesizer")
workflow.add_edge("synthesizer", END)

guild_graph = workflow.compile()
print("Graph compiled successfully.")

至此,一个完整的“内循环(Inner Loop)”多代理 RAG 管线已搭建完毕。

完整运行公会工作流图

最后,使用基线 SOP 和一个真实的临床试验概念,对整个系统进行端到端测试。此举旨在验证所有代理、数据存储与编排逻辑能否正常协作,并产出首个“基线(baseline)”输出,为后续的评估与系统进化环路提供基础。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Run Workflow (Created by Fareed Khan)

test_request = "Draft inclusion/exclusion criteria for a Phase II trial of 'Sotagliflozin', a novel SGLT2 inhibitor, for adults with uncontrolled Type 2 Diabetes (HbA1c > 8.0%) and moderate chronic kidney disease (CKD Stage 3)."

print("Running the full Guild graph with baseline SOP v1.0...")
graph_input = {
    "initial_request": test_request,
    "sop": baseline_sop
}
final_result = guild_graph.invoke(graph_input)
print("nFinal Guild Output:")
print("---------------------")
print(final_result['final_criteria'])

系统执行日志会展示每个智能体的调用过程,最终生成一份结构化的临床试验入排标准文档。至此,我们成功构建并验证了一个基于真实数据源的多智能体RAG工作流。

多维度评价体系

一个能够自我进化的系统,必须具备评估自身表现的能力。这需要超越单一指标(如准确率),建立一套多维度的质量评估体系。我们将构建一个多维评估套件,依据“五大支柱”原则对Guild的输出进行评分,从而为外层的进化循环提供丰富且可操作的反馈信号。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Multi-dimension Eval (Created by Fareed Khan)

本节将实现以下评估模块:

  • LLM-as-a-Judge:使用 llama3:70b 模型构建三位“专家评委”,分别评估科学严谨性法规合规性伦理健全性
  • 程序化评估:编写两个快速、可靠、客观的函数,评估招募可行性操作简易性
  • 汇总评估器:将五个单项评估封装为一个总评函数,接收Guild的输出并生成一个五维性能向量,供AI Director进行决策。

为每个维度构建自定义评估器

首先,定义LLM评委的统一输出数据结构:

from langchain_core.pydantic_v1 import BaseModel, Field

class GradedScore(BaseModel):
    """用于结构化LLM-as-a-Judge评估器输出的Pydantic模型。"""
    score: float = Field(description="评分,范围从0.0到1.0")
    reasoning: str = Field(description="评分的简要理由。")
  1. 科学严谨性评估器
from langchain_core.prompts import ChatPromptTemplate

def scientific_rigor_evaluator(generated_criteria: str, pubmed_context: str) -> GradedScore:
    evaluator_llm = llm_config['director'].with_structured_output(GradedScore)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert clinical scientist. ..."),
        ("human", "Evaluate the following criteria:nn**Generated Criteria:**n{criteria}nn**Supporting Scientific Context:**n{context}")
    ])
    chain = prompt | evaluator_llm
    return chain.invoke({"criteria": generated_criteria, "context": pubmed_context})
  1. 法规合规性评估器
def regulatory_compliance_evaluator(generated_criteria: str, fda_context: str) -> GradedScore:
    evaluator_llm = llm_config['director'].with_structured_output(GradedScore)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert regulatory affairs specialist. ..."),
        ("human", "Evaluate the following criteria:nn**Generated Criteria:**n{criteria}nn**Applicable FDA Guidelines:**n{context}")
    ])
    chain = prompt | evaluator_llm
    return chain.invoke({"criteria": generated_criteria, "context": fda_context})
  1. 伦理健全性评估器
def ethical_soundness_evaluator(generated_criteria: str, ethics_context: str) -> GradedScore:
    evaluator_llm = llm_config['director'].with_structured_output(GradedScore)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert on clinical trial ethics. ..."),
        ("human", "Evaluate the following criteria:nn**Generated Criteria:**n{criteria}nn**Ethical Principles:**n{context}")
    ])
    chain = prompt | evaluator_llm
    return chain.invoke({"criteria": generated_criteria, "context": ethics_context})
  1. 招募可行性评估器(程序化)
def feasibility_evaluator(cohort_analyst_output: AgentOutput) -> GradedScore:
    findings_text = cohort_analyst_output.findings
    try:
        count_str = findings_text.split("database: ")[1].replace('.', '')
        patient_count = int(count_str)
    except (IndexError, ValueError):
        return GradedScore(score=0.0, reasoning="Could not parse patient count from analyst output.")

    IDEAL_COUNT = 150.0
    score = min(1.0, patient_count / IDEAL_COUNT)
    reasoning = f"Estimated {patient_count} eligible patients. Score is normalized against an ideal target of {int(IDEAL_COUNT)}."
    return GradedScore(score=score, reasoning=reasoning)
  1. 操作简易性评估器(程序化)
def simplicity_evaluator(generated_criteria: str) -> GradedScore:
    EXPENSIVE_TESTS = ["mri", "genetic sequencing", "pet scan", "biopsy", "echocardiogram", "endoscopy"]
    test_count = sum(1 for test in EXPENSIVE_TESTS if test in generated_criteria.lower())
    score = max(0.0, 1.0 - (test_count * 0.5))
    reasoning = f"Found {test_count} expensive/complex screening procedures mentioned."
    return GradedScore(score=score, reasoning=reasoning)

创建聚合型LangSmith评估器

定义总评结果模型与汇总函数:

class EvaluationResult(BaseModel):
    rigor: GradedScore
    compliance: GradedScore
    ethics: GradedScore
    feasibility: GradedScore
    simplicity: GradedScore
def run_full_evaluation(guild_final_state: GuildState) -> EvaluationResult:
    """Orchestrates the entire evaluation process, calling each of the five specialist evaluators."""
    print("--- RUNNING FULL EVALUATION GAUNTLET ---")

    final_criteria = guild_final_state['final_criteria']
    agent_outputs = guild_final_state['agent_outputs']

    pubmed_context = next((o.findings for o in agent_outputs if o.agent_name == "Medical Researcher"), "")
    fda_context = next((o.findings for o in agent_outputs if o.agent_name == "Regulatory Specialist"), "")
    ethics_context = next((o.findings for o in agent_outputs if o.agent_name == "Ethics Specialist"), "")
    analyst_output = next((o for o in agent_outputs if o.agent_name == "Patient Cohort Analyst"), None)

    print("Evaluating: Scientific Rigor...")
    rigor = scientific_rigor_evaluator(final_criteria, pubmed_context)
    print("Evaluating: Regulatory Compliance...")
    compliance = regulatory_compliance_evaluator(final_criteria, fda_context)
    print("Evaluating: Ethical Soundness...")
    ethics = ethical_soundness_evaluator(final_criteria, ethics_context)
    print("Evaluating: Recruitment Feasibility...")
    feasibility = feasibility_evaluator(analyst_output) if analyst_output else GradedScore(score=0, reasoning="Analyst did not run.")
    print("Evaluating: Operational Simplicity...")
    simplicity = simplicity_evaluator(final_criteria)

    print("--- EVALUATION GAUNTLET COMPLETE ---")
    return EvaluationResult(rigor=rigor, compliance=compliance, ethics=ethics, feasibility=feasibility, simplicity=simplicity)

对基线输出运行评估,示例结果显示在“可行性”维度得分明显偏低(0.39),这为外层的 AI Director 指明了明确的改进方向。

进化引擎的外层循环

接下来构建系统的“大脑”——“AI Research Director”(外层进化回路)。它的核心职责并非直接设计试验方案,而是优化“设计试验方案”这一过程本身:通过分析5D评估向量、诊断性能瓶颈的根本原因、并智能地改写 GuildSOP。这是系统实现学习与自适应的核心机制。

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

Outer Loop (Created by Fareed Khan)

本节将涵盖以下内容:
* 创建“基因池”:用于管理 SOP 的演化版本及其对应的评估分数,形成可追溯的“基因”历史。
* 设计主任级智能体:Performance Diagnostician 负责识别性能弱点;SOP Architect 负责提出改进方案。
* 架构进化循环:定义完整的单代进化流程:诊断 → 演化 → 评估。
* 运行一次全流程:展示系统如何自主发现“可行性”弱点,并生成新的 SOP 变体来修复它。

管理配置

定义 SOPGenePool 类,用于存储 SOP、其评估结果及“父版本”信息:

class SOPGenePool:
    def __init__(self):
        self.pool: List[Dict[str, Any]] = []
        self.version_counter = 0

    def add(self, sop: GuildSOP, eval_result: EvaluationResult, parent_version: Optional[int] = None):
        self.version_counter += 1
        entry = {
            "version": self.version_counter,
            "sop": sop,
            "evaluation": eval_result,
            "parent": parent_version
        }
        self.pool.append(entry)
        print(f"Added SOP v{self.version_counter} to the gene pool.")

    def get_latest_entry(self) -> Optional[Dict[str, Any]]:
        return self.pool[-1] if self.pool else None

构建主任级智能体

首先是 Performance Diagnostician,它分析5D评估向量并给出结构化诊断:

class Diagnosis(BaseModel):
    primary_weakness: Literal['rigor', 'compliance', 'ethics', 'feasibility', 'simplicity']
    root_cause_analysis: str = Field(...)
    recommendation: str = Field(...)

def performance_diagnostician(eval_result: EvaluationResult) -> Diagnosis:
    print("--- EXECUTING PERFORMANCE DIAGNOSTICIAN ---")
    diagnostician_llm = llm_config['director'].with_structured_output(Diagnosis)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a world-class management consultant ..."),
        ("human", "Please analyze the following performance evaluation report:nn{report}")
    ])
    chain = prompt | diagnostician_llm
    return chain.invoke({"report": eval_result.json()})

其次是 SOP Architect,它根据诊断结果和当前 SOP 生成多个改进后的 SOP 变体作为候选:

class EvolvedSOPs(BaseModel):
    mutations: List[GuildSOP]

def sop_architect(diagnosis: Diagnosis, current_sop: GuildSOP) -> EvolvedSOPs:
    print("--- EXECUTING SOP ARCHITECT ---")
    architect_llm = llm_config['director'].with_structured_output(EvolvedSOPs)
    prompt = ChatPromptTemplate.from_messages([
        ("system", f"You are an AI process architect. ... schema: {GuildSOP.schema_json()} ..."),
        ("human", "Here is the current SOP:n{current_sop}nnHere is the performance diagnosis:n{diagnosis}nnBased on the diagnosis, please generate 2-3 new, improved SOPs.")
    ])
    chain = prompt | architect_llm
    return chain.invoke({"current_sop": current_sop.json(), "diagnosis": diagnosis.json()})

运行完整的进化循环

封装一次完整的进化循环

以下代码封装了一次完整的进化循环,展示了系统如何基于诊断结果生成并评估新的SOP候选方案。

def run_evolution_cycle(gene_pool: SOPGenePool, trial_request: str):
    print("n" + "="*25 + " STARTING NEW EVOLUTION CYCLE " + "="*25)

    # 1. 获取当前最优SOP作为父代
    current_best_entry = gene_pool.get_latest_entry()
    parent_sop = current_best_entry['sop']
    parent_eval = current_best_entry['evaluation']
    parent_version = current_best_entry['version']
    print(f"Improving upon SOP v{parent_version}...")

    # 2. 诊断父代SOP的性能瓶颈
    diagnosis = performance_diagnostician(parent_eval)
    print(f"Diagnosis complete. Primary Weakness: '{diagnosis.primary_weakness}'. Recommendation: {diagnosis.recommendation}")

    # 3. 基于诊断结果,由架构师生成新的SOP候选方案
    new_sop_candidates = sop_architect(diagnosis, parent_sop)
    print(f"Generated {len(new_sop_candidates.mutations)} new SOP candidates.")

    # 4. 对每个候选SOP进行完整测试与评估
    for i, candidate_sop in enumerate(new_sop_candidates.mutations):
        print(f"n--- Testing SOP candidate {i+1}/{len(new_sop_candidates.mutations)} ---")
        guild_input = {"initial_request": trial_request, "sop": candidate_sop}
        final_state = guild_graph.invoke(guild_input)

        eval_result = run_full_evaluation(final_state)
        gene_pool.add(sop=candidate_sop, eval_result=eval_result, parent_version=parent_version)

    print("n" + "="*25 + " EVOLUTION CYCLE COMPLETE " + "="*26)

系统初始化基因池并加入基线SOP后,运行一轮进化循环。典型的输出结果显示:诊断模块识别出“可行性(Feasibility)”是主要弱项;架构师据此生成了两个候选SOP;测试后,其中一个候选SOP(v2)显著提升了可行性得分(例如从0.5提升至0.81),仅以微小的严谨性(Rigor)代价换取了巨大的实际收益;而另一个候选SOP(v3)则未能带来改进。

基于五维度的帕累托分析

完成一代进化循环后,需要对结果进行多目标优化分析。在多目标问题中,通常不存在单一的“最优”解,而是存在一个“帕累托前沿(Pareto Frontier)”。我们的目标是识别这一前沿,并将其呈现给人类决策者。

本节计划:
* 分析基因池:打印所有SOP及其五维评分的摘要,以观察变体的直接影响。
* 识别帕累托前沿:编写函数,程序化地识别基因池中的非支配解。
* 可视化前沿:使用并行坐标图展示五个维度之间的权衡关系,使取舍一目了然。

打印摘要的步骤在此省略。以下是识别帕累托前沿的核心函数:

import numpy as np

def identify_pareto_front(gene_pool: SOPGenePool) -> List[Dict[str, Any]]:
    pareto_front = []
    pool_entries = gene_pool.pool

    for i, candidate in enumerate(pool_entries):
        is_dominated = False
        # 提取当前候选SOP的五维评分数组
        cand_scores = np.array([s['score'] for s in candidate['evaluation'].dict().values()])

        # 与其他所有SOP进行比较
        for j, other in enumerate(pool_entries):
            if i == j: continue
            other_scores = np.array([s['score'] for s in other['evaluation'].dict().values()])
            # 判断当前解是否被“支配”:其他解在所有维度上都不差,且至少一个维度更好
            if np.all(other_scores >= cand_scores) and np.any(other_scores > cand_scores):
                is_dominated = True
                break
        # 如果未被任何其他解支配,则属于帕累托前沿
        if not is_dominated:
            pareto_front.append(candidate)
    return pareto_front

运行此函数后,通常会发现v1(基线)和v2(进化后)共同构成了帕累托前沿:v1代表“最大化严谨性”的策略,而v2代表“高可行性”的策略。在实际决策中,如何取舍取决于具体的业务优先级。

可视化帕累托前沿

为了更直观地理解权衡关系,我们使用二维散点图(严谨性 vs. 可行性)与五维并行坐标图进行可视化。

import matplotlib.pyplot as plt
import pandas as pd

def visualize_frontier(pareto_sops):
    if not pareto_sops:
        print("No SOPs on the Pareto front to visualize.")
        return
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 7))

    # 准备二维散点图数据
    labels = [f"v{s['version']}" for s in pareto_sops]
    rigor_scores = [s['evaluation'].rigor.score for s in pareto_sops]
    feasibility_scores = [s['evaluation'].feasibility.score for s in pareto_sops]

    ax1.scatter(rigor_scores, feasibility_scores, s=200, alpha=0.7, c='blue')
    for i, txt in enumerate(labels):
        ax1.annotate(txt, (rigor_scores[i], feasibility_scores[i]), xytext=(10,-10), textcoords='offset points', fontsize=14)
    ax1.set_title('Pareto Frontier: Rigor vs. Feasibility', fontsize=16)
    ax1.set_xlabel('Scientific Rigor Score', fontsize=14)
    ax1.set_ylabel('Recruitment Feasibility Score', fontsize=14)
    ax1.grid(True, linestyle='--', alpha=0.6)
    ax1.set_xlim(min(rigor_scores)-0.05, max(rigor_scores)+0.05)
    ax1.set_ylim(min(feasibility_scores)-0.1, max(feasibility_scores)+0.1)

    # 准备五维并行坐标图数据
    data = []
    for s in pareto_sops:
        eval_dict = s['evaluation'].dict()
        scores = {k.capitalize(): v['score'] for k, v in eval_dict.items()}
        scores['SOP Version'] = f"v{s['version']}"
        data.append(scores)

    df = pd.DataFrame(data)
    pd.plotting.parallel_coordinates(df, 'SOP Version', colormap=plt.get_cmap("viridis"), ax=ax2, axvlines_kwargs={"linewidth": 1, "color": "grey"})
    ax2.set_title('5D Performance Trade-offs on Pareto Front', fontsize=16)
    ax2.grid(True, which='major', axis='y', linestyle='--', alpha=0.6)
    ax2.set_ylabel('Normalized Score', fontsize=14)
    ax2.legend(loc='lower center', bbox_to_anchor=(0.5, -0.15), ncol=len(labels))
    plt.tight_layout()
    plt.show()

可视化结果直观地展示了v1与v2在各维度的差异:两者在合规性(Compliance)、伦理性(Ethics)和简洁性(Simplicity)上表现几乎一致,主要是在严谨性(Rigor)与可行性(Feasibility)之间形成了典型的权衡关系(在图上呈现为“交叉”形态)。

可视化前沿并做出决策

我们已经从宏观层面(进化循环、帕累托前沿)看到了系统如何自我改进。现在需要从微观层面理解一次“高表现”运行的内部过程:智能体如何协作?瓶颈在哪里?多维得分如何转化为可视化的性能剖面?

构建可自我进化的Agentic RAG系统:从医疗健康领域实践到通用设计模式

计划如下:
* 工作流仪表化:精确记录每个智能体任务的开始时间、结束时间和耗时。
* 可视化执行时间线:使用甘特图呈现整个工作流,清晰显示并行与串行执行阶段。
* 对比性能剖面:使用雷达图对比基线SOP与进化后SOP在五个维度的表现差异。

理解认知工作流

使用图的 .stream() 方法逐节点获取事件并记录时间戳:

import time
from collections import defaultdict

def invoke_with_timing(graph, sop, request):
    """调用 Guild 图并捕获每个节点的开始和结束时间。"""
    print(f"--- 为 SOP 启动图运行计时: {sop.dict()} ---")

    timing_data = []
    start_times = defaultdict(float)

    graph_input = {"initial_request": request, "sop": sop}

    for event in graph.stream(graph_input, stream_mode="values"):
        node_name = list(event.keys())[0]
        end_time = time.time()

        if node_name not in start_times:
            start_times[node_name] = end_time - 0.1

        start_time = end_time - duration
        timing_data.append({
            "node": node_name,
            "start_time": start_time,
            "end_time": end_time,
            "duration": duration
        })
        start_times[node_name] = start_time

    overall_start_time = min(d['start_time'] for d in timing_data)
    for data in timing_data:
        data['start_time'] -= overall_start_time
        data['end_time'] -= overall_start_time

    final_state = event[list(event.keys())[-1]]
    return final_state, timing_data

对 v2 版本执行并捕获时序数据,示例输出显示 execute_specialists 是主要耗时阶段,符合预期。

绘制甘特图以可视化执行流程:

import matplotlib.pyplot as plt

def plot_gantt_chart(timing_data: List[Dict[str, Any]], title: str):
    """根据时序数据绘制智能体工作流的甘特图。"""
    fig, ax = plt.subplots(figsize=(12, 4))

    labels = [d['node'] for d in timing_data]
    ax.barh(labels, [d['duration'] for d in timing_data], left=[d['start_time'] for d in timing_data], color='skyblue')

    ax.set_xlabel('时间 (秒)')
    ax.set_title(title, fontsize=16)
    ax.grid(True, which='major', axis='x', linestyle='--', alpha=0.6)
    ax.invert_yaxis()
    plt.show()

甘特图清晰地展示了顶层的串行流程以及内部的并行机会,提示性能优化应聚焦于 execute_specialists 阶段。

使用雷达图剖析输出结果

使用雷达图对比基线 v1 版本与进化后 v2 版本在五个维度上的表现剖面:

import pandas as pd
import numpy as np

def plot_radar_chart(eval_results: List[Dict[str, Any]], labels: List[str]):
    """创建雷达图以比较多个 SOP 在 5D 维度上的性能。"""
    categories = ['严谨性', '合规性', '伦理性', '可行性', '简洁性']
    num_vars = len(categories)
    angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
    angles += angles[:1]

    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True))

    for i, result in enumerate(eval_results):
        values = [res.score for res in result.dict().values()]
        values += values[:1]
        ax.plot(angles, values, linewidth=2, linestyle='solid', label=labels[i])
        ax.fill(angles, values, alpha=0.25)

    ax.set_yticklabels([])
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(categories, fontsize=12)
    ax.set_title('5D 性能剖面比较', size=20, color='blue', y=1.1)
    plt.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1))
    plt.show()

图中可见,两个版本在合规性、伦理性和简洁性上表现都很强;v1 在严谨性上略优,而 v2 在可行性上显著优越,清晰地呈现了设计权衡。

自主进化策略

至此,我们已经设计、构建并演示了一套可自我改进的智能体系统。这不仅是一个具体的解决方案,更是一套可扩展的基础架构,其核心包括:分层代理设计、动态 SOP、多维评估和自动进化。这些原则为未来的发展打开了广阔的空间:

  1. 持续运行进化循环:当前仅完成了一代进化,未来可连续迭代数百代,以发现更丰富、更多样化的帕累托前沿(即经过实战检验的 SOP)。
  2. 将 Director 的推理过程蒸馏为更小的策略模型:基于成功变体的历史数据进行训练,用更快、更便宜的专用模型替换 70B 参数的 Director,使进化过程更高效。
  3. 让 AI Director 动态调整 Guild 的结构:根据试验概念的需求,学习增加或删除专家(例如新增“生物统计学家”),实现团队层面的进化。
  4. 用实时 API 替换静态的 MIMIC-III 数据集:将 Patient Cohort Analyst 连接到安全的实时电子健康记录系统,使可行性评估基于最新的患者数据。
  5. 强化 SOP Architect 的进化操作符:引入“交叉”等遗传算法机制,融合不同成功 SOP 的优势,加速新策略的发现。
  6. 融合人类专家反馈:将临床科学家的评分接入评估回路,用专家判断作为最终的“奖励信号”,引导系统趋向于“技术最优 + 实践卓越”的方案。

关注“鲸栖”小程序,掌握最新AI资讯

本文由鲸栖原创发布,未经许可,请勿转载。转载请注明出处:http://www.itsolotime.com/archives/13448

(0)
上一篇 2025年11月18日 下午12:56
下一篇 2025年11月19日 上午11:48

相关推荐

  • 构建本体驱动GraphRAG:从数据填埋场到零噪声知识图谱的蜕变之路

    构建一个自我演进的知识图谱,它不仅能存储数据,更能理解、校验并持续演化。 gemini 在初次构建 GraphRAG 系统时,我遵循了多数教程的路径:将文档输入大语言模型(LLM),抽取实体,将生成的 JSON 导入 Neo4j,然后宣告完成。在演示环境中,一切运行完美。直到我将它应用于真实的医疗记录。 问题随之暴露。LLM 在一份报告中抽取了“John D…

    6天前
    400
  • 告别手动造数据:5款高效生成逼真测试数据的开发者利器

    几乎每位开发者都经历过因缺少数据而测试受阻的时刻。无论是测试一个API、一个表单还是一个数据看板,如果没有足够真实的数据输入,测试结果往往缺乏参考价值。手动编造假邮箱、手机号或地址,对付几行数据尚可,一旦需要成百上千条记录,就会变成一项耗时且枯燥的苦差事。 为了进行有效的测试,我们需要结构化且逼真的应用数据。无论是验证分页逻辑的稳健性,还是观察API在面对混…

    2025年12月5日
    200
  • AI在线强化学习实现“实践式学习”,斯坦福团队助力7B小模型性能大幅提升,表现超越GPT-4o

    斯坦福团队推出AgentFlow框架,通过在线强化学习让仅7B参数的小模型在流式协作中“边做边学”。该方法使模型在搜索、数学等10项任务中性能显著提升,部分表现甚至超越了GPT-4o等超大模型,证明了优化系统设计可突破模型规模限制。

    2025年10月24日
    20900
  • 构建智能数据库对话助手:基于RAG的Text-to-SQL聊天机器人实战

    本项目构建了一个由 AI 驱动的聊天机器人,能够将自然语言问题转换为 SQL 查询,并直接从 SQLite 数据库中检索答案。该应用结合了 LangChain、Hugging Face Embeddings 和 Chroma 向量存储,通过检索增强生成(RAG)工作流,将非结构化的用户输入与结构化数据库连接起来,并配备了 FastAPI 后端与 Stream…

    2025年11月4日
    500
  • LangGraph实战:单智能体与多智能体系统的性能对比与架构解析

    在 LangGraph 中基于结构化数据源构建 在 LangGraph 中构建不同的 agent 系统 | Image by author 对于希望构建不同智能体系统的开发者而言,一个有效的切入点是深入比较单智能体工作流与多智能体工作流,这本质上是评估系统设计的灵活性与可控性之间的权衡。 本文旨在阐明 Agentic AI 的核心概念,并演示如何利用 Lan…

    2025年11月2日
    200

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注