在智能体(Agentic)系统中,无论是用于工具调用还是复杂推理,其行动通常由提示词(Prompts)引导。然而,传统的提示词是静态的,它们仅能提供行动步骤,却无法实现自我进化。真正的智能体训练(Agentic Training)源于系统在动态环境中的学习、适应与协作能力。
在智能体架构中,每个子智能体(Sub-Agent)的目标各异,这意味着单一的算法无法普适。要构建更有效的系统,我们需要一个整合了推理(Reasoning)、奖励(Reward)与实时反馈(Real-time Feedback)的完整训练架构。一个典型的智能体系统训练架构包含以下相互连接的组件:

Agentic Training Architecture (Created by Fareed Khan)
- 定义训练基础:通过配置环境、初始化智能体状态,并使其目标与系统整体目标对齐,来奠定训练基石。
- 搭建分布式训练管道:使多个智能体能够并行交互与学习,并通过共享内存或日志交换知识。
- 集成强化学习层:利用监督微调(SFT,适用于入门)、近端策略优化(PPO,用于高级优化)、上下文赌博机(Contextual Bandits,用于自适应决策)等算法驱动自我改进。
- 接入可观测性与监控工具:集成追踪钩子(Tracing Hooks)与日志适配器(Logging Adapters),以实时捕获每一次交互与学习步骤。
- 设计动态奖励系统:使智能体能基于其性能表现、目标对齐程度以及对整体任务的贡献获得反馈。
- 创建多阶段训练循环:让智能体按阶段推进训练,从监督微调平滑过渡到基于强化学习的自适应阶段。
- 评估与持续改进:通过分析奖励曲线、性能指标以及各角色的定性行为,持续评估并迭代优化整个架构。
在本系列文章中,我们将……
构建一个融合了推理、协作与强化学习的完整多智能体系统,使智能体能够通过实时反馈与奖励机制进行动态适应与自我改进。
全部实现代码可在以下 GitHub 仓库获取:
GitHub – FareedKhan-dev/training-ai-agents: Training architecture for self-improving AI agents.
技术总览
构建生产级 AI 系统时,我们不会直接从算法入手,而是首先夯实整个系统的基础。这个初始配置阶段至关重要,从依赖库的选择到数据源的确定,每一个决策都将直接影响最终训练出的智能体的可靠性与可复现性。
本节我们将完成以下准备工作:
- 安装分层训练所需的核心库与专用依赖。
- 配置 API 密钥,避免硬编码,并将 LangSmith 项目接入可观测性平台。
- 下载并处理 PubMedQA 数据集,为智能体构建高质量的知识语料库。
- 设计中心化的智能体状态管理机制(共享内存),以支持协作与推理。
- 为智能体配备必要的工具,如模拟数据库、实时网络搜索等,用于与外部环境交互。
配置研究环境
首先需要设置 Python 环境。我们不使用简单的 pip install,而是采用 uv——一个快速、现代的包管理器,它能确保环境搭建既快速又可复现,非常适合生产环境。
我们还将安装 agent-lightning 的额外组件(用于 PPO),以及 apo(异步策略优化)和 unsloth(高效监督微调),这些对于实现高级的分层训练策略至关重要。
print("Updating and installing system packages...")
# 首先更新系统包列表,并安装'uv'和'graphviz'。
# 'graphviz'是LangGraph可视化智能体工作流所需的系统依赖。
!apt-get update -qq && apt-get install -y -qq uv graphviz
print("nInstalling packages...n")
# 使用'uv'安装Python依赖。
# 安装'agent-lightning[verl,apo]'以获取PPO和其他高级强化学习算法所需的组件。
# 'unsloth[pt231]'提供了一个高度优化的监督微调框架,我们将用于初级研究员智能体。
!uv pip install -q -U "langchain" "langgraph" "langchain_openai" "tavily-python" "agentlightning[verl,apo]" "unsloth[pt231]" "pandas" "scikit-learn" "rich" "wandb" "datasets" "pyarrow"
print("Successfully installed all required packages.")
开始安装……
#### OUTPUT ####
Updating and installing system packages...
...
Installing packages...
Resolved 178 packages in 3.12s
...
+ agentlightning==0.2.2
+ langchain==0.2.5
+ langgraph==0.1.5
+ unsloth==2024.5
+ verl==0.6.0
...
Successfully installed all required packages.
安装 graphviz 赋予了 LangGraph 可视化能力,这对后续调试我们复杂的智能体社会将非常有用。更重要的是,agentlightning 配合 verl 和 unsloth 额外组件,为我们的分层训练策略提供了所需的高性能后端。
至此,我们已经建立了一个稳定、完整的基础环境,可以开始预处理训练数据了。
构建医学知识库
每个机器学习系统都需要训练数据,或者至少是一些用于启动自学习的初始观测数据。
我们的智能体无法在孤立中进行有效推理,它们需要丰富且特定领域的信息源。

Pre-processing Knowledge base data (Created by Fareed Khan)
静态、硬编码的事实列表过于简单。为了构建一个真实且具有挑战性的研究环境,我们将使用 PubMedQA 数据集,特别是其带标注的子集 pqa_l。
该数据集包含真实的生物医学问题、提供必要上下文的原始科学摘要,以及由人类专家给出的最终“是/否/可能”答案。这不仅为我们的智能体提供了可搜索的丰富信息源,也为强化学习循环中的奖励计算提供了基准真相(Ground Truth)。
首先,我们定义一个 TypedDict 来组织每个研究任务,确保数据在整个处理流程中保持干净和一致。
from typing import List, TypedDict
# TypedDict 提供了一种清晰、结构化的方式来表示每个研究任务。
# 这使得我们的代码更易读,并减少了使用普通字典可能导致的错误。
class ResearchTask(TypedDict):
id: str # 文章的唯PubMed ID
goal: str # 智能体必须调查的研究问题
context: str # 提供必要证据的完整科学摘要
expected_decision: str # 基准真相答案(‘yes’, ‘no’, 或 ‘maybe’)
我们创建了 ResearchTask 这个基于 TypedDict 的“任务蓝图”。它并非普通字典,而是一个数据结构契约。每个任务都严格包含 id、goal、context 与 expected_decision 字段。这种严格的类型约束有助于防止后续出现错误,并确保系统各个组件对数据的期望形态有清晰一致的理解。
接下来,我们将编写一个函数,从 Hugging Face Hub 下载数据集,将其转换为 ResearchTask 格式,并进行训练集与验证集的划分。独立的验证集对于客观评估智能体训练效果至关重要。
from datasets import load_dataset
import pandas as pd
def load_and_prepare_dataset() -> tuple[List[ResearchTask], List[ResearchTask]]:
"""
下载、处理 PubMedQA 数据集,并将其分割为训练集和验证集。
"""
print("正在下载并准备 PubMedQA 数据集...")
# 加载 PubMedQA 数据集中带标签的 'pqa_l' 子集。
dataset = load_dataset("pubmed_qa", "pqa_l", trust_remote_code=True)
# 将训练集分割转换为 pandas DataFrame,以便于操作。
df = dataset['train'].to_pandas()
# 此列表将用于存放结构化的 ResearchTask 对象。
research_tasks = []
# 遍历 DataFrame 的每一行以创建任务。
for _, row in df.iterrows():
# 'CONTEXTS' 字段是一个字符串列表,我们将其合并为一个文本块。
context_str = " ".join(row['CONTEXTS'])
# 使用清理和结构化的数据创建一个 ResearchTask 字典。
task = ResearchTask(
id=str(row['PUBMED_ID']),
goal=row['QUESTION'],
context=context_str,
expected_decision=row['final_decision']
)
research_tasks.append(task)
# 执行简单的 80/20 分割,得到训练集和验证集。
train_size = int(0.8 * len(research_tasks))
train_set = research_tasks[:train_size]
val_set = research_tasks[train_size:]
print(f"数据集下载并处理完成。总样本数: {len(research_tasks)}")
print(f"训练集大小: {len(train_set)} | 验证集大小: {len(val_set)}")
return train_set, val_set
# 执行该函数。
train_dataset, val_dataset = load_and_prepare_dataset()
load_and_prepare_dataset 函数构成了数据引入流水线,它自动完成从 Hugging Face 下载原始数据并转换为 ResearchTask 列表的过程。
80/20 分割是标准做法,它为我们提供了一个较大的训练集 (
train_set) 和一个用于评估泛化能力的、未见过的验证集 (val_set)。
数据加载后,最好通过目测样本来验证解析是否正确,并了解智能体将面临的挑战。我们可以编写一个小的工具函数,以表格形式展示少量样本。
from rich.console import Console
from rich.table import Table
console = Console()
def display_dataset_sample(dataset: List[ResearchTask], sample_size=5):
"""
使用 rich 库以格式化表格的形式展示数据集的样本。
"""
# 使用 'rich' 库创建一个表格,以提高可读性。
table = Table(title="PubMedQA 研究目标数据集 (样本)")
table.add_column("ID", style="cyan")
table.add_column("研究目标 (问题)", style="magenta")
table.add_column("预期决策", style="green")
# 用数据集的前几项填充表格。
for item in dataset[:sample_size]:
table.add_row(item['id'], item['goal'], item['expected_decision'])
console.print(table)
display_dataset_sample(train_dataset)
display_dataset_sample 函数是我们的健康检查工具。使用 rich 生成格式化表格,可以快速清晰地核对数据结构,比直接打印字典更有效。我们可以确认 ID、goal、expected_decision 等字段均已正确提取。
查看输出示例:
#### 输出 ####
正在下载并准备 PubMedQA 数据集...
数据集下载并处理完成。总样本数: 1000
训练集大小: 800 | 验证集大小: 200
--- 样本 0 ---
ID: 11843333
目标: 儿童溃疡性结肠炎的所有病例都需要结肠切除术吗?
预期决策: 是
上下文 (前200字符): 对135名儿童溃疡性结肠炎患者进行了回顾性研究,以确定...
至此,我们已经将原始的 PubMedQA 数据转换为干净的 ResearchTask 列表,并完成了训练/验证集的划分。每一行都是一个可以输入智能体 rollout 的完整研究任务。
Research Goal 将作为初始提示,而 Expected Decision 将作为计算最终奖励的真实基准。我们的智能体现在拥有了一个世界级的、真实的知识库来进行学习。
定义分层 AgentState
数据准备就绪后,我们需要设计智能体社会的“神经系统”——共享内存或状态。在 LangGraph 中,这个共享内存由一个中心状态对象管理。
对于如此复杂的系统,简单的字典过于脆弱。我们将使用 Python 的
TypedDict来架构一个嵌套的、分层的AgentState。

AgentState (由 Fareed Khan 创建)
这种方式为智能体的整个认知过程提供了机器可读的蓝图。状态中的每个字段代表了研究流程的不同阶段:从初级研究员生成的初始假设,到最终经过同行评审的实验方案。
我们将按以下步骤进行:
* 定义子状态:为 JuniorResearch、Protocol、ReviewDecision 等中间产物创建小的 TypedDict。
* 架构主状态:将这些子状态组合到主 AgentState 中,用于单次研究运行的完整信息。
* 启用 ReAct 逻辑:添加 sender 字段,这是确保工具调用结果能正确路由回相应智能体的关键。
首先定义初级研究员的输出结构,确保每个生成的假设都保持一致和规范。
from typing import List, TypedDict, Literal
from langchain_core.messages import BaseMessage
# 这定义了来自初级研究员的单个假设的结构。
# 它捕捉了核心观点、发现的证据以及提出该假设的智能体。
class JuniorResearch(TypedDict):
hypothesis: str
supporting_papers: List[str]
agent_name: str # 用于追踪是哪位初级研究员提出的
我们为“假设提交”建立了蓝图。JuniorResearch 强制要求包含 hypothesis 字符串、supporting_papers 列表和 agent_name。这对于监督员智能体至关重要,它能确保收到结构一致、来源清晰的提案以供评估。
接下来定义实验方案的结构。它是高级研究员的主要产出,需要详细且可执行。
# 这定义了最终实验方案的结构。
# 它是一个详细、可执行的计划。
class Protocol(TypedDict):
title: str
steps: List[str]
safety_concerns: str
budget_usd: float
Protocol 明确了实验方案的关键要素:title、steps、safety_concerns、budget_usd。这要求高级研究员从实践细节出发进行思考。
这种结构化输出远比纯文本有价值,也是我们最终奖励计算的基础。
接下来为评审委员会(Review Board)的反馈创建结构。这部分对于改写循环至关重要,需要确保反馈清晰且机器可读。
# 定义来自评审智能体的结构化反馈。
# 它强制要求一个明确的决定、一个严重性级别以及建设性的反馈文本。
class ReviewDecision(TypedDict):
decision: Literal['APPROVE', 'REVISE']
critique_severity: Literal['CRITICAL', 'MAJOR', 'MINOR']
feedback: str
ReviewDecision 类捕获了评审的详细输出。其中 Literal 类型的使用是一个关键的工程实践:
1. 强制离散选择:决策只能是 APPROVE(通过)或 REVISE(修订)。
2. 强制标注严重程度:问题严重性被限定为 CRITICAL(关键)、MAJOR(主要)或 MINOR(次要)。
这种结构化的反馈使得我们的 LangGraph 路由节点能够清晰地决定下一步流程:是需要大范围重写,还是仅需小幅度微调。
最后,我们组合出主状态类 AgentState。
from typing import Annotated
# 这是将在 LangGraph 中所有节点间传递的主状态字典。
class AgentState(TypedDict):
# ‘messages’ 字段累积对话历史。
# ‘lambda x, y: x + y’ 告诉 LangGraph 如何合并此字段:通过追加新消息。
messages: Annotated[List[BaseMessage], lambda x, y: x + y]
research_goal: str # 来自数据集的初始高层目标。
sender: str # ReAct 循环的关键:追踪最后行动的智能体,以便将工具执行结果返回给它。
turn_count: int # 计数器,用于防止图中的无限循环。
# 初级研究员团队的输出(从并行运行中累积)
initial_hypotheses: List[JuniorResearch]
# 主管的选择
selected_hypothesis: JuniorResearch
supervisor_justification: str
# 高级研究员团队的输出
refined_hypothesis: str
experimental_protocol: Protocol
# 评审委员会的输出
peer_review: ReviewDecision
safety_review: ReviewDecision
# 首席研究员的最终决定
final_protocol: Protocol
final_decision: Literal['GO', 'NO-GO']
final_rationale: str
# 来自奖励函数的最终评估分数
final_evaluation: dict
至此,我们定义了智能体社会的完整认知架构。
信息流清晰明了:先生成 initial_hypotheses,选择 selected_hypothesis,细化为 experimental_protocol,经过 peer_review 与 safety_review 后,最终得出 final_decision。
其中 sender 字段尤为重要。
在 ReAct(推理-行动)循环中,智能体决定使用工具。工具执行后,系统需要知道结果应返回给哪个智能体。
每次智能体行动时更新 sender 字段,就如同留下一个回邮地址,这使得复杂的往返式推理成为可能。有了这个状态结构,我们的图拥有了坚实的内存基础。
构建科学工具系统
我们的智能体现在拥有了复杂的记忆(AgentState),但要进行研究,它们必须能够访问外部世界(即外部知识库)。
没有工具的智能体仅仅是聊天者,而拥有工具的智能体才能主动获取实时、特定领域的信息。

科学工具系统示意图 (Created by Fareed Khan)
在本节中,我们将为智能体社会构建 ScientificToolkit,提供一组可调用的专用函数,以执行关键的研究任务。
我们将完成以下工作:
* 集成实时网络搜索:使用 TavilySearchResults 赋予智能体搜索 PubMed 和 ClinicalTrials.gov 最新文献的能力。
* 模拟内部数据库:创建蛋白质和基因本体的模拟数据库,以模拟查询内部私有知识库的过程。
* 应用 @tool 装饰器:通过 LangChain 的 @tool 装饰器,使这些 Python 函数能够被 LLM 智能体发现和调用。
* 测试工具:快速调用一个新工具以验证连接是否正确。
首先,定义一个统一存放工具的类。使用类进行分组便于组织和管理状态(例如管理 API 客户端)。
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
class ScientificToolkit:
def __init__(self):
# 初始化 Tavily 搜索客户端,配置为返回前 5 条结果。
self.tavily = TavilySearchResults(max_results=5)
# 这是一个模拟数据库,用于模拟内部蛋白质信息资源。
self.mock_protein_db = {
"amyloid-beta": "A key protein involved in the formation of amyloid plaques in Alzheimer's.",
"tau": "A protein that forms neurofibrillary tangles inside neurons in Alzheimer's.",
"apoe4": "A genetic risk factor for Alzheimer's disease, affecting lipid metabolism in the brain.",
"trem2": "A receptor on microglia that, when mutated, increases Alzheimer's risk.",
"glp-1": "Glucagon-like peptide-1, a hormone involved in insulin regulation with potential neuroprotective effects."
}
# 这是第二个模拟数据库,用于基因功能信息。
self.mock_go_db = {
"apoe4": "A major genetic risk factor for Alzheimer's disease, involved in lipid transport and amyloid-beta clearance.",
"trem2": "Associated with microglial function, immune response, and phagocytosis of amyloid-beta."
}
我们为 ScientificToolkit 设置了基础:
1. 初始化了实时网络搜索工具(Tavily)。
2. 构建了两个简单的字典(mock_protein_db、mock_go_db)来模拟内部数据库。
3. 这种混合实时与模拟工具的方式贴近企业环境:智能体需要同时访问公共和私有数据源。
接下来,定义具体的工具方法。每个方法都代表我们希望赋予智能体的一项能力。让我们从 PubMed 搜索开始。
@tool
def pubmed_search(self, query: str) -> str:
"""Searches PubMed for biomedical literature. Use highly specific keywords related to genes, proteins, and disease mechanisms."""
console.print(f"--- TOOL: PubMed Search, Query: {query} ---")
# 我们在查询前加上 'site:pubmed.ncbi.nlm.nih.gov' 以将搜索限制在 PubMed 范围内。
return self.tavily.invoke(f"site:pubmed.ncbi.nlm.nih.gov {query}")
这是第一个工具 pubmed_search。LangChain 的 @tool 装饰器会将这个 Python 函数转换为 LLM 可以理解和调用的结构化工具。
我们再为临床试验搜索创建一个类似的工具。
@tool
def clinical_trials_search(self, query: str) -> str:
"""Searches for information on clinical trials related to specific drugs or therapies."""
console.print(f"--- TOOL: Clinical Trials Search, Query: {query} ---")
# 此工具专注于 ClinicalTrials.gov,用于查找正在进行或已完成的研究信息。
return self.tavily.invoke(f"site:clinicaltrials.gov {query}")
clinical_trials_search 是一个专注于 ClinicalTrials.gov 的工具,有助于获取与药物研发和治疗相关的各类信息。
接下来,我们实现与模拟内部数据库交互的工具。
@tool
def protein_database_lookup(self, protein_name: str) -> str:
"""在模拟数据库中查询特定蛋白质的信息。"""
console.print(f"--- TOOL: Protein DB Lookup, Protein: {protein_name} ---")
# 此处模拟在专有内部蛋白质信息数据库中进行快速查询。
return self.mock_protein_db.get(protein_name.lower(), "Protein not found.")
@tool
def gene_ontology_lookup(self, gene_symbol: str) -> str:
"""在基因本体数据库中查询特定基因符号的功能和相关通路。"""
console.print(f"--- TOOL: Gene Ontology Lookup, Gene: {gene_symbol.upper()} ---")
# 此处模拟查询另一个专用的内部数据库,用于获取基因功能信息。
result = self.mock_go_db.get(gene_symbol.lower(), f"Gene '{gene_symbol}' not found in ontology database.")
console.print(f"Gene '{gene_symbol.upper()}' lookup result: {result}")
return result
这两个函数展示了如何集成内部或私有数据源。虽然这里使用简单的字典进行模拟,但在实际生产系统中,它们可以连接到 SQL 数据库、私有 API 或专用的生物信息学库(例如医院的私有数据库)。
最后,我们实例化工具包,并将所有工具函数整理成一个列表,以便传递给智能体执行器。
# 实例化我们的工具包类。
toolkit = ScientificToolkit()
# 创建一个列表,包含所有已定义的工具函数。
all_tools = [
toolkit.pubmed_search,
toolkit.clinical_trials_search,
toolkit.protein_database_lookup,
toolkit.gene_ontology_lookup
]
print("Scientific Toolkit with live data tools defined successfully.")
# 测试新的 gene_ontology_lookup 工具以确认其正常工作。
toolkit.gene_ontology_lookup.invoke("APOE4")
运行后输出如下:
#### OUTPUT ####
Scientific Toolkit with live data tools defined successfully.
--- TOOL: Gene Ontology Lookup, Gene: APOE4 ---
Gene 'APOE4' lookup result: A major genetic risk factor for Alzheimers disease, involved in lipid transport and amyloid-beta clearance.
输出显示 ScientificToolkit 已成功实例化,且 gene_ontology_lookup 工具工作正常。
all_tools 列表构成了一个完整、可移植的能力集合,可以绑定到任意智能体。通过主动从多个来源集成信息,我们将智能体从单纯的推理者转变为积极的研究者。
设计我们的科学家社群(LangGraph)
至此,我们已经具备了安全环境、数据集、分层的 AgentState 以及强大的 ScientificToolkit,可以开始构建智能体本体了。
接下来,我们将从数据结构转向工程化“认知实体”,即多智能体系统的核心组件。

Sub-agents system (Created by Fareed Khan)
在本节中,我们将使用 LangGraph 来设计和编排一个多智能体社群。
为了贴近真实的研究流程,我们将创建一个专家团队,每个角色都有明确的分工,并为其分配合适的开源模型。
我们将完成以下工作:
* 角色与模型分配:定义每位“AI科学家”的角色设定,并根据任务复杂度分配不同的开源模型。
* 创建智能体执行器:构建一个工厂函数,输入模型、提示词和工具,即可生成可运行的智能体执行器。
* 架构状态图:使用 LangGraph 连接智能体,实现高级的 ReAct 逻辑与多层级的改写循环,从而形成一个稳健的循环式工作流。
* 可视化架构:生成最终的工作流图,直观展示智能体社群的认知结构。
构建多智能体科学系统
高级智能体设计的一个核心理念是:任务并非千篇一律。为所有工作都使用单一的大型模型既低效又昂贵。我们将从 Hugging Face Hub 战略性地为不同角色分配不同的开源模型。
这种“为合适的任务匹配合适的模型”的方法是构建生产级、高性价比智能体系统的基石。

Multi-agentic System (Created by Fareed Khan)
首先,我们定义 LLM 配置。为初级研究员的创意发散使用小而快的模型;为高级研究员预留一个将使用 PPO 进行微调的占位模型;为关键的评审环节使用能力更强的专家混合模型。
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# 我们将为不同角色使用不同的开源模型,以优化性能和成本。
# 'openai_api_base' 将在训练期间由 LLMProxy 动态设置,指向本地服务器(如 Ollama 或 vLLM),而非 OpenAI 的 API。
junior_researcher_llm = ChatOpenAI(
model="Qwen/Qwen2-1.5B-Instruct", # 一个用于并行头脑风暴的小型快速模型。
temperature=0.7,
openai_api_base="http://localhost:11434/v1", # 假设本地运行着 Ollama 服务器。
openai_api_key="ollama"
)
supervisor_llm = ChatOpenAI(
model="Qwen/Qwen2-1.5B-Instruct", # 同一个小型模型足以胜任结构化的选择任务。
temperature=0.0,
openai_api_base="http://localhost:11434/v1",
openai_api_key="ollama"
)
# 这是一个特殊的占位符。在训练期间,VERL 算法将通过 Agent-Lightning 的 LLMProxy 服务,
# 在此逻辑名称下提供 Llama-3 模型。
senior_researcher_llm = ChatOpenAI(
model="senior_researcher_llm", # 一个逻辑名称,初始并非真实的模型端点。
temperature=0.1,
openai_api_base="http://placeholder-will-be-replaced:8000/v1",
openai_api_key="dummy_key"
)
# 对于关键的评审和最终决策阶段,我们使用一个更强大的模型。
review_board_llm = ChatOpenAI(
model="mistralai/Mixtral-8x7B-Instruct-v0.1", # 一个强大的专家混合模型,用于细致的评估。
temperature=0.0,
openai_api_base="http://localhost:11434/v1",
openai_api_key="ollama"
)
print("Agent personas and open-source LLM configurations are defined.")
请确保已拉取相应的模型,并通过 ollama/vLLM 等服务使其可用。
至此,我们定义了研究团队的“硬件”配置:
1. 为初级角色分配 Qwen2-1.5B,以实现快速、并行、低成本的创意思考。
2. senior_researcher_llm 明确作为一个逻辑占位符,这是训练阶段的关键概念。Agent-Lightning 将拦截对该模型名称的调用,并将其路由到我们正在进行 PPO 训练的模型,从而实现策略的更新。
3. 评审环节使用更强大的 Mixtral 模型,以确保批判性评估环节具有高水平的审查能力。
接下来,我们需要一种标准化的方法,将模型、系统提示词与工具组合成一个可运行的智能体。为此,我们创建一个工厂函数:
def create_agent_runner(llm, system_prompt, tools):
"""A factory function to create a runnable agent executor."""
# The prompt consists of a system message, and a placeholder for the conversation history.
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
MessagesPlaceholder(variable_name="messages"),
])
# We bind the tools to the LLM, making them available for the agent to call.
return prompt | llm.bind_tools(tools)
create_agent_runner 函数虽小但至关重要。它标准化了智能体的构建流程:接收 system_prompt(定义角色)、llm(推理引擎)与 tools(能力集合),确保了系统的一致性并简化了后续图的构建。
接着,为每个角色定义具体的系统提示词。这些提示词是运行在 LLM “硬件”之上的“软件”,指导着智能体的行为和输出格式。
# This is holding the detailed system prompts for each agent role.
prompts = {
"Geneticist": "You are a geneticist specializing in Alzheimer's. Propose a hypothesis related to genetic factors. Use tools to find supporting evidence. Respond with a JSON object: {'hypothesis': str, 'supporting_papers': List[str]}.",
"Pharmacologist": "You are a pharmacologist. Propose a drug target hypothesis. Use tools to find clinical trial data. Respond with a JSON object: {'hypothesis': str, 'supporting_papers': List[str]}.",
"Neurologist": "You are a clinical neurologist. Propose a systems-level neurobiology hypothesis. Use tools to find papers on brain pathways. Respond with a JSON object: {'hypothesis': str, 'supporting_papers': List[str]}.",
"Supervisor": "You are a research supervisor. Review the hypotheses and select the most promising one. Justify your choice based on novelty, feasibility, and impact. Return a JSON object: {'selected_hypothesis_index': int, 'justification': str}.",
"HypothesisRefiner": "You are a senior scientist. Deepen the selected hypothesis with more literature review, refining it into a specific, testable statement. Return a JSON object: {'refined_hypothesis': str}.",
"ProtocolDesigner": "You are a lab manager. Design a detailed, step-by-step experimental protocol to test the refined hypothesis. Be specific about methods, materials, and controls. Return a JSON object: {'title': str, 'steps': List[str], 'safety_concerns': str, 'budget_usd': float}.",
"PeerReviewer": "You are a critical peer reviewer. Find flaws in the protocol. Be constructive but rigorous. Return a JSON object: {'decision': 'APPROVE'|'REVISE', 'critique_severity': 'CRITICAL'|'MAJOR'|'MINOR', 'feedback': str}.",
"SafetyOfficer": "You are a lab safety officer. Review the protocol for safety, regulatory, and ethical concerns. Be thorough. Return a JSON object: {'decision': 'APPROVE'|'REVISE', 'critique_severity': 'CRITICAL'|'MAJOR'|'MINOR', 'feedback': str}.",
"PrincipalInvestigator": "You are the Principal Investigator. Synthesize the protocol and reviews into a final document. Make the final GO/NO-GO decision and provide a comprehensive rationale. Return a JSON object: {'final_protocol': Protocol, 'final_decision': 'GO'|'NO-GO', 'final_rationale': str}."
}
至此,我们定义好了整个 AI 科学家团队。每个智能体都由 prompt(角色定义)、llm(推理引擎)与 tools(工具集)共同构成。
这些提示词的关键在于要求智能体以指定的 JSON 对象格式进行回应。这种结构化输出是可靠更新分层 AgentState 的基础,确保了工作流能够从一个智能体顺畅地流转到下一个。
集成 ReAct 逻辑的高级 StateGraph
现在,我们将专家团队组装到一个“虚拟实验室”中进行协作,这正是 LangGraph 的用武之地。我们将智能体组装成一个循环工作流,创建 StateGraph,并定义研究团队之间的信息与控制流转逻辑。

ReAct 逻辑简化示意图 (Created by Fareed Khan)
这并非一个简单的线性流水线……
为了贴近真实的研究过程,我们需要实现复杂的逻辑,包括用于迭代修改的反馈回路以及稳健的工具调用机制。
在本节中,我们将完成以下步骤:
* 构建智能体节点:使用工厂函数将智能体运行器封装为 LangGraph 节点,并确保其能正确更新 AgentState。
* 实现 ReAct 风格的工具调用:定义条件边和路由器,确保工具执行的结果能够返回到正确的智能体进行处理。
* 设计多层级的修改循环:基于评审严重程度进行智能路由,支持“小修小改”和“重大修改”两种不同的反馈回路。
* 编译并可视化图:编译完整的 StateGraph 并生成可视化图表,清晰地呈现整个认知架构。
构建智能体节点与路由逻辑
首先,定义一个辅助函数,用于将智能体运行器(agent runner)封装为 LangGraph 的节点函数。该函数负责在节点调用时正确维护 turn_count 与 sender 等关键状态。
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, BaseMessage
import json
MAX_TURNS = 15 # 安全机制,防止图陷入无限循环。
def create_agent_node(agent_name: str, agent_runner):
"""为指定的智能体运行器创建一个 LangGraph 节点函数。"""
def agent_node(state: AgentState) -> dict:
# 打印控制台信息以追踪图的执行路径。
console.print(f"--- Node: {agent_name} (Turn {state['turn_count']}) ---")
# 递增回合计数作为安全措施。
state['turn_count'] += 1
# 使用当前状态调用智能体运行器。
result = agent_runner.invoke(state)
# 特别处理评审智能体(如 PeerReviewer, SafetyOfficer)的结构化 JSON 输出。
if agent_name in ["PeerReviewer", "SafetyOfficer"]:
try:
# 智能体的输出是 AIMessage 的 ‘content’ 字段中的 JSON 字符串。
content = json.loads(result.content)
# 根据运行的评审者,更新 AgentState 中的对应字段。
if agent_name == "PeerReviewer":
state['peer_review'] = content
else:
state['safety_review'] = content # 注意:此处键为 'safety_review',而非 'feedback'。
except (json.JSONDecodeError, TypeError):
# 如果解析失败,记录错误但不使图崩溃。
console.print(f"[bold red]Error parsing JSON from {agent_name}: {result.content}[/bold red]")
# 更新 ‘messages’ 列表,并关键性地设置 ‘sender’ 字段以供 ReAct 路由使用。
return {"messages": [result], "sender": agent_name}
return agent_node
create_agent_node 函数是系统中每个智能体的标准化封装,其核心作用如下:
1. 状态管理与追踪:每次智能体运行时,记录活动日志并递增安全计数器 turn_count。
2. 路由标识:更新 sender 字段,标记“谁刚刚执行了动作”。这是后续实现 ReAct(推理-行动)循环中工具调用后路由返回的关键。
3. 结构化输出处理:对评审类智能体的 JSON 输出进行专门解析,确保 peer_review 或 safety_review 正确写入 AgentState。
接下来,定义驱动 ReAct 循环的条件判断逻辑。该函数检查状态中的最新消息,以决定下一步是执行工具还是结束当前智能体的推理轮次。
def tools_condition(state: AgentState) -> str:
"""条件边函数,用于检查是否存在工具调用以及是否达到回合上限。"""
# 检查状态中的最后一条消息。
last_message = state['messages'][-1]
# 如果消息中没有工具调用,则当前智能体的回合结束。
if not hasattr(last_message, 'tool_calls') or not last_message.tool_calls:
return "end"
# 如果已达到最大回合数,也结束执行以防止循环。
if state['turn_count'] >= MAX_TURNS:
console.print("[bold yellow]Max turns reached. Ending graph.[/bold yellow]")
return "end"
# 否则,表示有待执行的工具。
return "tools"
tools_condition 函数是 ReAct 循环的“守门员”,其逻辑为:
1. 检查工具调用:判断最后一条消息是否包含 tool_calls 属性。若有,则返回 "tools",将工作流路由至工具执行节点。
2. 安全终止:若无工具调用,或回合数 turn_count 已达到预设的 MAX_TURNS 上限,则返回 "end",推动工作流进入下一步。
最后,需要一个在工具执行完毕后将控制权路由回原智能体的函数。这正是之前设置的 sender 字段发挥作用的地方。
def route_after_tools(state: AgentState) -> str:
"""路由器函数,将工作流送回发起工具调用的智能体。"""
# 从状态的 ‘sender’ 字段中获取最后一个执行动作的智能体名称。
sender = state.get("sender")
console.print(f"--- Routing back to: {sender} after tool execution ---")
if not sender:
# 如果由于某种原因未设置 sender,则作为后备方案结束图。
return END
# 返回的字符串必须与图中某个节点的名称匹配。
return sender
route_after_tools 函数实现了 ReAct 模式的后半部分闭环。它从 AgentState 中读取 sender 字段并返回其值。LangGraph 引擎会根据这个返回值,将工具的执行结果直接送回给最初发起调用的智能体节点,使其能够基于工具返回的结果继续进行后续的推理。
接下来是最核心的路由逻辑:基于评审结果的多层级改写循环。
def route_after_review(state: AgentState) -> Literal["PrincipalInvestigator", "HypothesisRefiner", "ProtocolDesigner"]:
"""
一个智能路由器,根据评审反馈的严重程度决定下一步行动。
"""
peer_review = state.get("peer_review", {})
safety_review = state.get("safety_review", {})
# 从两份评审中提取决策和严重级别,并提供安全默认值。
peer_severity = peer_review.get("critique_severity", "MINOR")
safety_severity = safety_review.get("critique_severity", "MINOR")
# 如果迭代次数已达上限,则无论反馈如何,都必须进入最终决策。
if state['turn_count'] >= MAX_TURNS:
console.print("[bold yellow]评审阶段达到最大迭代次数,将转至最终决策。[/bold yellow]")
return "PrincipalInvestigator"
# 如果任一评审的严重级别为“CRITICAL”,则表明核心假设存在根本性缺陷。
# 需要路由回“HypothesisRefiner”进行重大重构。
if peer_severity == 'CRITICAL' or safety_severity == 'CRITICAL':
console.print("--- 评审要求CRITICAL级别修订,路由回假设精炼器。 ---")
state['messages'].append(HumanMessage(content="收到关键性反馈。核心假设需要重新思考。"))
return "HypothesisRefiner"
# 如果任一评审的严重级别为“MAJOR”(且无CRITICAL),则表明协议本身存在重大缺陷。
# 需要路由回“ProtocolDesigner”进行显著修改。
if peer_severity == 'MAJOR' or safety_severity == 'MAJOR':
console.print("--- 评审要求MAJOR级别修订,路由回协议设计器。 ---")
state['messages'].append(HumanMessage(content="收到重大反馈。协议需要进行显著修订。"))
return "ProtocolDesigner"
# 如果只有“MINOR”级别修订或全部通过,则协议基本可靠。
# 可以进入“PrincipalInvestigator”进行最终决策。
console.print("--- 评审完成,路由至最终决策者。 ---")
return "PrincipalInvestigator"
该函数是迭代细化流程的核心组件,其决策逻辑如下:
- 严重缺陷(CRITICAL):当任一评审的
critique_severity为CRITICAL时,路由至HypothesisRefiner。这表明核心假设存在根本性问题,需要重新构思。 - 重大缺陷(MAJOR):当任一评审的
critique_severity为MAJOR(且无CRITICAL)时,路由至ProtocolDesigner。这表明实验协议存在显著缺陷,需要进行重大修改。 - 轻微缺陷或通过(MINOR/APPROVED):当所有评审均为
MINOR或通过时,路由至PrincipalInvestigator。这表明方案基本可靠,可以进入最终评估与决策阶段。
此外,系统还设置了迭代次数上限(MAX_TURNS)作为安全阀,防止无限循环,确保流程最终能够收敛。
这种基于严重程度的多级反馈回路,模拟了真实世界中的项目评审与迭代过程,使智能体能够根据问题深度动态调整其修正策略,是实现“自我进化”的关键机制。
最后,构建并编译完整的 StateGraph。
def build_graph() -> StateGraph:
workflow = StateGraph(AgentState)
# 使用工厂函数实例化所有智能体执行器。
agent_runners = {
"Geneticist": create_agent_runner(junior_researcher_llm, prompts["Geneticist"], all_tools),
"Pharmacologist": create_agent_runner(junior_researcher_llm, prompts["Pharmacologist"], all_tools),
"Neurologist": create_agent_runner(junior_researcher_llm, prompts["Neurologist"], all_tools),
"Supervisor": create_agent_runner(supervisor_llm, prompts["Supervisor"], []),
"HypothesisRefiner": create_agent_runner(senior_researcher_llm, prompts["HypothesisRefiner"], all_tools),
"ProtocolDesigner": create_agent_runner(senior_researcher_llm, prompts["ProtocolDesigner"], all_tools),
"PeerReviewer": create_agent_runner(review_board_llm, prompts["PeerReviewer"], []),
"SafetyOfficer": create_agent_runner(review_board_llm, prompts["SafetyOfficer"], []),
"PrincipalInvestigator": create_agent_runner(review_board_llm, prompts["PrincipalInvestigator"], [])
}
# 将所有智能体节点和单一工具执行节点添加到图中。
for name, runner in agent_runners.items():
workflow.add_node(name, create_agent_node(name, runner))
workflow.add_node("execute_tools", ToolNode(all_tools))
# ---- 使用边定义图的控制流 ----
# 图从并行运行三位初级研究员开始。
workflow.add_edge(START, "Geneticist")
workflow.add_edge(START, "Pharmacologist")
workflow.add_edge(START, "Neurologist")
# 为每个可以使用工具的智能体添加 ReAct 条件边。
for agent_name in ["Geneticist", "Pharmacologist", "Neurologist", "HypothesisRefiner", "ProtocolDesigner"]:
# 智能体运行后,检查是否有工具调用。
workflow.add_conditional_edges(
agent_name,
tools_condition,
{
"tools": "execute_tools", # 如果调用了工具,则转到工具节点。
"end": "Supervisor" if agent_name in ["Geneticist", "Pharmacologist", "Neurologist"] else "ProtocolDesigner" if agent_name == "HypothesisRefiner" else "PeerReviewer" # 如果没有工具,则进入下一个逻辑步骤。
}
)
# 工具执行完毕后,路由回调用它们的智能体。
workflow.add_conditional_edges("execute_tools", route_after_tools)
# 定义研究流程的主要线性流。
workflow.add_edge("Supervisor", "HypothesisRefiner")
workflow.add_edge("PeerReviewer", "SafetyOfficer")
# SafetyOfficer 之后,使用智能评审路由器。
workflow.add_conditional_edges("SafetyOfficer", route_after_review)
# PrincipalInvestigator 是图结束前的最后一步。
workflow.add_edge("PrincipalInvestigator", END)
return workflow
# 构建图并将其编译为可运行对象。
research_graph_builder = build_graph()
research_graph = research_graph_builder.compile()
print("LangGraph StateGraph 已定义并编译完成。")
# 我们还可以可视化编译后的图以查看最终架构。
try:
from IPython.display import Image, display
png_image = research_graph.get_graph().draw_png()
display(Image(png_image))
except Exception as e:
print(f"无法可视化图:{e}。请确保已安装 pygraphviz 和 graphviz。")

build_graph 函数将所有组件(节点、边、路由器)组合成一个完整、可运行的 StateGraph。我们可以清晰地看到:初级研究员的并行启动、ReAct 循环(智能体调用工具并接收结果)、以及评审阶段的多层级反馈回路。
接下来,我们将开始构建智能体系统的训练架构。
LitAgent 与复杂的奖励系统
我们已经使用 LangGraph 设计并组装了智能体社会,但无论静态工作流多么复杂,都无法进行学习或提升。为了实现学习,需要将 LangGraph 编排与训练框架连接起来,这正是 Agent-Lightning 的作用。

本节我们将创建两个关键组件作为桥梁:LitAgent 与奖励函数。它们将静态图转化为可训练的动态系统。
计划如下:
* 封装工作流:创建继承 agl.LitAgent 的 MedicalResearchAgent,在其 rollout 方法中封装整个 LangGraph。
* 启用定点训练:在 rollout 中,仅为特定节点(如高级研究员)动态注入训练中的模型,实现“外科手术式”的策略更新。
* 设计细粒度奖励系统:构建一个多维度的 protocol_evaluator(使用 LLM 作为评判者),从可行性、影响力、严谨性等角度为最终输出打分。
* 构建加权奖励:实现一个函数,将多维分数聚合为单一的加权奖励,以引导强化学习算法进行优化。
创建 MedicalResearchAgent
使系统可训练的第一步是将 LangGraph 工作流封装进 agl.LitAgent。在 Agent-Lightning 生态中,LitAgent 是可训练的基本单元,其核心是定义 rollout 方法:在给定任务上执行一次端到端的运行。

我们创建 MedicalResearchAgent 类,继承 agl.LitAgent。它持有编译后的 LangGraph 和奖励函数。其 rollout 方法将从数据集中获取研究目标,执行完整的图,然后使用奖励函数对最终结果进行评分。
关键工程点是如何处理“训练中的模型”。
图不会使用固定的模型集合。rollout 方法将把由 Agent-Lightning 训练器提供的 LLM 端点动态绑定到我们希望训练的节点(例如高级研究员),从而实现定点、精准的策略微调。
定义 MedicalResearchAgent 类:
import agentlightning as agl
from typing import Any, cast
class MedicalResearchAgent(agl.LitAgent):
def __init__(self, graph, reward_func):
# LitAgent 必须使用已编译的图和奖励函数进行初始化。
super().__init__()
self.graph = graph
self.reward_func = reward_func
def rollout(self, task: ResearchTask, resources: agl.NamedResources, rollout: agl.Rollout) -> None:
# 此方法定义了智能体一次端到端的完整运行。
console.print(f"n[bold green]-- 开始 Rollout {rollout.rollout_id},任务:{task['id']} --[/bold green]")
# 'senior_researcher_llm' 资源是我们的待训练模型,由 VERL 算法通过 LLMProxy 提供。
llm_resource = cast(agl.LLM, resources['senior_researcher_llm'])
# 训练器的追踪器提供了一个 LangChain 回调处理器,这对于 LangSmith 中的深度可观测性至关重要。
langchain_callback_handler = self.trainer.tracer.get_langchain_handler()
# 在此,我们将训练资源中的 LLM 端点动态绑定到我们想要训练的具体智能体执行器上。这是实现定向策略优化的关键。
llm_with_endpoint = senior_researcher_llm.with_config({
"openai_api_base": llm_resource.endpoint,
"openai_api_key": llm_resource.api_key or "dummy-key"
})
# 为此特定 rollout 创建新的智能体执行器,使用更新后的 LLM 绑定。
hypothesis_refiner_agent_trained = create_agent_runner(llm_with_endpoint, prompts["HypothesisRefiner"], all_tools)
protocol_designer_agent_trained = create_agent_runner(llm_with_endpoint, prompts["ProtocolDesigner"], all_tools)
# 获取图的可变副本,以便为此 rollout 临时更新节点。
graph_with_trained_model = self.graph.copy()
# 将 'HypothesisRefiner' 和 'ProtocolDesigner' 节点的函数替换为我们新创建的、可训练的执行器。
graph_with_trained_model.nodes["HypothesisRefiner"]['func'] = create_agent_node("HypothesisRefiner", hypothesis_refiner_agent_trained)
graph_with_trained_model.nodes["ProtocolDesigner"]['func'] = create_agent_node("ProtocolDesigner", protocol_designer_agent_trained)
# 将修改后的图编译成此特定 rollout 的可运行对象。
runnable_graph = graph_with_trained_model.compile()
# 准备图执行的初始状态。
initial_state = {"research_goal": task['goal'], "messages": [HumanMessage(content=task['goal'])], "turn_count": 0, "initial_hypotheses": []}
# 配置运行以使用我们的 LangSmith 回调处理器。
config = {"callbacks": [langchain_callback_handler]} if langchain_callback_handler else {}
try:
# 从头到尾执行完整的 LangGraph 工作流。
final_state = runnable_graph.invoke(initial_state, config=config)
# 从图的最终状态中提取最终协议。
final_protocol = final_state.get('final_protocol')
# 如果成功生成了协议,我们计算其奖励。
if final_protocol:
console.print("--- 智能体生成的最终协议 ---")
console.print(final_protocol)
# 调用我们的多维度奖励函数以获取分数字典。
reward_scores = self.reward_func(final_protocol, task['context'])
# 将分数转换为单个加权奖励值。
final_reward = get_weighted_reward(reward_scores)
else:
# 为失败或不完整的 rollout 分配 0.0 的奖励。
final_reward = 0.0
# 发出最终奖励。Agent-Lightning 捕获此值并将其用于 RL 更新步骤。
agl.emit_reward(final_reward)
console.print(f"[bold green]-- Rollout {rollout.rollout_id} 完成,最终奖励:{final_reward:.2f} --[/bold green]")
# 该方法返回 None,因为结果(奖励和追踪)通过 agl.emit_* 调用发出。
return None
MedicalResearchAgent 是核心的可训练单元,它将 LangGraph 的复杂多步逻辑与 Agent-Lightning 的训练循环连接起来:
- 动态绑定:将
senior_researcher_llm资源动态绑定到目标节点。注意,我们并不修改原始图。 - 临时副本:每次
rollout都会创建图的一个临时副本,仅将 Senior Researcher 节点指向正在训练的模型。
因此,PPO 更新只会影响 Senior Researchers 的策略(即如何细化假设和设计方案),而其他智能体(如 Junior Researchers、Review Board 等)仍使用稳定的预定义模型。这就在复杂的异构多智能体系统中实现了精准、高效的定点训练。
多维奖励系统
强化学习的效果高度依赖于奖励信号。对于科学研究这类精细任务,简单的二元奖励(如成功=1/失败=0)是远远不够的。
它无法教会智能体区分“勉强合格”和“卓越”的方案。

为了提供信息量丰富的学习信号,我们设计了一个多维奖励系统。其核心是构建一个 protocol_evaluator,它扮演 LLM-as-a-Judge 的角色。
这个“法官”是一个强大的模型,它从多个维度评估智能体生成的最终方案,并返回一个结构化的分数字典。
具体实现计划如下:
- 定义评估指标:使用 Pydantic 模型
EvaluationOutput定义评分维度,例如新颖性、可行性、影响力、清晰度,以及关键的groundedness(相对于上下文的扎实程度)。 - 构建评估器函数:实现
protocol_evaluator函数,使用详细的提示词驱动法官 LLM,并解析其结构化的响应。 - 构建加权奖励:定义
get_weighted_reward函数,将分数字典按照预设权重聚合成一个单一的浮点数奖励,可以对重点维度(如影响力)赋予更高权重。
首先,定义评估的 Pydantic 模式,作为奖励系统的正式评分标准:
from langchain_core.pydantic_v1 import BaseModel, Field
class EvaluationOutput(BaseModel):
novelty: float = Field(description="对超越给定背景的原创性和创新性的评分(0-1分)。")
feasibility: float = Field(description="在标准实验室资源下,方案实用性的评分(0-1分)。")
impact: float = Field(description="潜在科学或临床意义的评分(0-1分)。")
clarity: float = Field(description="方案是否具体、可衡量、可复现的评分(0-1分)。")
groundedness: float = Field(description="方案与所提供科学背景的一致性和支持程度的评分(0-1分。对任何缺乏背景支持的论断进行扣分)。")
efficiency: float = Field(description="所提方案的成本效益和时间效率评分(0-1分)。")
EvaluationOutput 通过清晰的字段为评估大语言模型提供了明确的指引。其中,groundedness 字段的引入旨在引导 PPO 智能体避免产生幻觉或脱离文献支持的断言,而 efficiency 字段则强化了对方案实用性的考量。
接下来,构建协议评估器 protocol_evaluator:
def protocol_evaluator(protocol: Protocol, context: str) -> dict:
"""
充当“LLM-as-a-Judge”,根据多项标准对协议进行评分。
"""
console.print("--- 运行协议评估器(奖励函数) ---")
# 构建详细的提示词,要求大语言模型扮演专家评审团。
evaluator_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个由资深科学家组成的专家评审团。请根据以下标准,对给出的实验方案进行评分(0.0 到 1.0 分)。请严格评判,并简要说明评分理由。"),
# 同时提供原始科学背景和智能体生成的协议。
("human", f"科学背景:nn{context}nn---nn待评估协议:nn{json.dumps(protocol, indent=2)}")
])
# 使用强大的 `review_board_llm`,并指示其按照 `EvaluationOutput` 模式格式化输出。
evaluator_llm = review_board_llm.with_structured_output(EvaluationOutput)
try:
# 调用评估链。
evaluation = evaluator_llm.invoke(evaluator_prompt.format_messages())
# 输出是一个 Pydantic 对象,可轻松转换为字典。
scores = evaluation.dict()
console.print(f"生成评分:{scores}")
return scores
except Exception as e:
# 如果大语言模型评估失败,则返回默认低分以惩罚此次失败。
console.print(f"[bold red]协议评估出错:{e}。返回零分。[/bold red]")
return {"novelty": 0.1, "feasibility": 0.1, "impact": 0.1, "clarity": 0.1, "groundedness": 0.1, "efficiency": 0.1}
protocol_evaluator 是自动化的质量保障模块:
1. 接收智能体的最终 protocol 和原始数据集 context。
2. 将其提交给强大的 review_board_llm,要求其作为专家评审团根据 EvaluationOutput 标准打分。
3. 通过 try...except 确保鲁棒性:即使评估大语言模型失败,训练循环也不会崩溃,而是返回低分,从而正确惩罚失败的 rollout。
最后,通过加权平均为强化学习算法提供一个单一的浮点数奖励:
def get_weighted_reward(scores: dict) -> float:
"""
根据各项指标评分的字典,计算单一的加权奖励分数。
"""
# 权重设置允许我们优先考虑“优秀”协议的某些方面。
# 此处,我们认为“影响力”是最重要的因素,“效率”是锦上添花。
weights = {
"novelty": 0.1,
"feasibility": 0.2,
"impact": 0.3,
"clarity": 0.15,
"groundedness": 0.2,
"efficiency": 0.05
}
# 计算加权总分。如果输入字典中缺少某项评分,则默认为 0。
weighted_sum = sum(scores.get(key, 0) * weight for key, weight in weights.items())
return weighted_sum
get_weighted_reward 为强化学习提供最终的奖励信号。通过权重设置(例如,impact 权重最高为 0.3),可以明确指导优化的方向。
以下是对整个奖励系统的测试:
print("多维度加权奖励函数已定义。")
# 使用示例协议测试完整的奖励流程。
test_protocol = {"title": "测试协议", "steps": ["1. 执行此操作。", "2. 执行彼操作。"], "safety_concerns": "小心处理。", "budget_usd": 50000.0}
test_context = "近期研究表明,肠道微生物群与阿尔茨海默病的神经炎症存在关联。"
test_scores = protocol_evaluator(test_protocol, test_context)
final_test_reward = get_weighted_reward(test_scores)
print(f"加权最终奖励:{final_test_reward:.2f}")
#### 输出 ####
多维度加权奖励函数已定义。
--- 运行协议评估器(奖励函数) ---
生成评分:{'novelty': 0.8, 'feasibility': 0.7, 'impact': 0.9, 'clarity': 0.85, 'groundedness': 0.95, 'efficiency': 0.9}
加权最终奖励:0.84
测试结果表明,评估与加权流程工作正常。至此,我们获得了指导训练的奖励信号。
创建基于强化学习的训练架构
我们已经使用 LangGraph 设计了智能体社会,并构建了奖励系统。下一步是构建工业级的基础设施,以高效、可扩展地训练这些智能体。这正是 Agent-Lightning 的用武之地。

智能体训练架构(由 Fareed Khan 创建)
对于复杂的多智能体系统(涉及大量大语言模型调用),简单的单进程训练循环已无法满足需求。
我们需要一个分布式架构,能够并行运行多个智能体“rollouts”,同时由中心训练算法进行统一管理。
在本节中,我们将配置 Agent-Lightning 训练基础设施的核心组件。
构建分布式神经系统
为了高效训练,我们需要快速采集大量经验。如果串行执行单个智能体的轨迹采样,将成为严重的性能瓶颈。因此,我们配置 Trainer 使用 ClientServerExecutionStrategy。
该策略构建了一个分布式训练架构。主进程负责运行核心训练算法(如 PPO)并启动 LightningStoreServer 来管理数据。

分布式系统架构图 (Created by Fareed Khan)
该策略会派生多个独立的 runner 子进程。每个 runner 作为客户端连接到主服务器,获取任务并并行执行 MedicalResearchAgent 的 rollout。这使得我们能够同时收集海量训练数据,这对于强化学习的效率至关重要。
以下是执行策略的配置示例:
import agentlightning as agl
# 配置系统并行运行 4 个智能体轨迹采样
num_runners = 4
# 此字典定义了 Agent-Lightning Trainer 的执行策略
strategy_config = {
"type": "cs", # 'cs' 是 ClientServerExecutionStrategy 的简写
"n_runners": num_runners, # 要生成的并行工作进程数量
"server_port": 48000 # 指定一个高位端口,以避免与其他服务冲突
}
print(f"ClientServerExecutionStrategy configured for {num_runners} runners.")
至此,我们定义了分布式训练的蓝图。将 strategy_config 传递给 agl.Trainer 后,框架将自动搭建多进程架构(包括进程间通信、数据同步等)。我们只需调整 n_runners 参数即可轻松扩展系统规模,而无需修改核心业务逻辑代码。
使用 LLMProxy 作为多模型枢纽实现可观测性
我们的智能体社会是异构的,使用了多种不同的模型。管理多个模型端点非常复杂,尤其是当其中一个模型是动态变化的训练中服务时。
Agent-Lightning的LLMProxy为此提供了完美的解决方案。

LLM 代理架构图 (Created by Fareed Khan)
它充当所有大语言模型调用的统一网关。我们的 LitAgent 将请求发送到代理,代理根据调用中指定的 model_name 智能地路由到正确的后端服务。
这一设计在训练场景中尤其强大:
1. VERL(PPO)算法能够自动更新代理配置,将对 "senior_researcher_llm" 的请求路由到其自身的 vLLM 推理实例。
2. 同时,对其他模型(如 Qwen2 或 Mixtral)的请求,则会被路由到本地 Ollama 等其他后端。
以下是 LLMProxy 的配置示例:
# 'model_list' 定义了 LLMProxy 的路由规则
llm_proxy_config = {
"port": 48001, # LLMProxy 自身监听的端口
"model_list": [
# 规则 1:用于初级研究员和监督员。
# 任何对此模型名的请求将被转发到运行 Qwen2 的本地 Ollama 服务器。
{
"model_name": "Qwen/Qwen2-1.5B-Instruct",
"litellm_params": {"model": "ollama/qwen2:1.5b"}
},
# 规则 2:用于我们的高级研究员(正在训练的模型)。
# 初始时,它可能指向一个基线模型。在训练过程中,VERL 算法
# 将自动更新此条目,使其指向自己的 vLLM 服务器。
{
"model_name": "senior_researcher_llm",
"litellm_params": {"model": "ollama/llama3"} # 初始后备模型
},
# 规则 3:用于强大的评审委员会。
# 对此模型的请求将被路由到运行 Mixtral 的本地 Ollama 服务器。
{
"model_name": "mistralai/Mixtral-8x7B-Instruct-v0.1",
"litellm_params": {"model": "ollama/mixtral"}
}
]
}
llm_proxy_config 是整个多智能体系统的路由表,其优势在于:
1. 将智能体使用的逻辑模型名(如 "senior_researcher_llm")与物理后端服务解耦。
2. 可以轻松替换后端、进行 A/B 测试或动态更新训练中模型的端点,而无需修改智能体的核心代码。
3. LLMProxy 为整个系统提供了集中的控制点和可观测性。
构建数据管道 HierarchicalTraceAdapter
我们的分层训练策略面临独特的数据处理挑战:每次 rollout 只产生一个复杂的 LangGraph 轨迹,但我们需要为三种不同的训练算法提供不同格式的数据:

强化学习算法实现示意图 (Created by Fareed Khan)
- SFT 算法:需要来自初级研究员的对话数据(消息列表)。
- PPO 算法:需要高级研究员的强化学习三元组(状态、动作、奖励)。
- Contextual Bandit 算法:需要监督员决策的单个(上下文、动作、奖励)元组。
为此,我们构建了一个复杂的轨迹适配器。在 Agent-Lightning 中,适配器是一个将原始轨迹(LangSmith spans 列表)转换为特定算法所需格式的类。
HierarchicalTraceAdapter将是一个“多头”数据处理器,能够从单一轨迹中提取并生成三种数据格式。
我们创建一个继承自 agl.TracerTraceToTriplet 的新类,并为其添加针对各自目标数据格式的处理方法。这展示了 Agent-Lightning 数据管道的强大灵活性。
定义 HierarchicalTraceAdapter:
from agentlightning.adapter import TraceToMessages
class HierarchicalTraceAdapter(agl.TracerTraceToTriplet):
def __init__(self, *args, **kwargs):
# 初始化父类以支持PPO三元组生成
super().__init__(*args, **kwargs)
# 同时创建一个标准适配器实例,用于生成SFT消息
self.message_adapter = TraceToMessages()
def adapt_for_sft(self, source: List[agl.Span]) -> List[dict]:
"""为监督微调(SFT)适配追踪数据:过滤初级研究员节点并转换为消息格式。"""
# 定义初级研究员智能体对应的节点名称
junior_agent_names = ["Geneticist", "Pharmacologist", "Neurologist"]
# 过滤原始追踪数据,仅保留这些智能体生成的span
# LangSmith会在span属性中为LangGraph节点添加'name'字段
junior_spans = [s for s in source if s.attributes.get('name') in junior_agent_names]
console.print(f"[bold yellow]Adapter (SFT):[/] 从 {len(source)} 个span中过滤出 {len(junior_spans)} 个初级研究员span。")
if not junior_spans:
return []
# 使用标准消息适配器将过滤后的span转换为对话数据集
return self.message_adapter.adapt(junior_spans)
def adapt_for_ppo(self, source: List[agl.Span]) -> List[agl.Triplet]:
"""为近端策略优化(PPO)适配追踪数据:过滤高级研究员节点并转换为三元组。"""
# 定义高级研究员智能体对应的节点名称
senior_agent_names = ["HypothesisRefiner", "ProtocolDesigner"]
# 配置父类的过滤器,使其仅匹配这些智能体名称
self.agent_match = '|'.join(senior_agent_names)
# 调用父类的adapt方法,它将自动过滤并处理相关span
ppo_triplets = super().adapt(source)
console.print(f"[bold yellow]Adapter (PPO):[/] 从 {len(source)} 个span中过滤并适配出 {len(ppo_triplets)} 个高级研究员三元组。")
return ppo_triplets
def adapt_for_bandit(self, source: List[agl.Span]) -> List[tuple[list[str], int, float]]:
"""为上下文赌博机算法适配一个完整的rollout追踪。"""
# 首先,查找整个rollout的最终奖励
final_reward = agl.find_final_reward(source)
if final_reward is None:
return []
# 其次,找到Supervisor智能体做出决策的具体span
supervisor_span = next((s for s in source if s.attributes.get('name') == 'Supervisor'), None)
if not supervisor_span:
return []
# 然后,重构“上下文”——即监督者需要从中选择的假设列表
junior_spans = [s for s in source if s.attributes.get('name') in ["Geneticist", "Pharmacologist", "Neurologist"]]
contexts = []
# 按开始时间排序以确保假设顺序正确
for span in sorted(junior_spans, key=lambda s: s.start_time):
try:
# 在LangGraph中,智能体的最终JSON输出位于状态的'messages'属性中
output_message = span.attributes.get('output.messages')
if output_message and isinstance(output_message, list):
# 实际内容位于AIMessage的content字段中的JSON字符串内
content_str = output_message[-1].get('content', '{}')
hypothesis_data = json.loads(content_str)
contexts.append(hypothesis_data.get('hypothesis', ''))
except (json.JSONDecodeError, KeyError, IndexError):
continue
if not contexts:
return []
# 最后,提取“动作”——即监督者选择的假设索引
try:
output_message = supervisor_span.attributes.get('output.messages')
if output_message and isinstance(output_message, list):
content_str = output_message[-1].get('content', '{}')
supervisor_output = json.loads(content_str)
chosen_index = supervisor_output.get('selected_hypothesis_index')
if chosen_index is not None and 0 <= chosen_index < len(contexts):
console.print(f"[bold yellow]Adapter (Bandit):[/] 提取出上下文(假设列表)、动作(索引 {chosen_index})和奖励({final_reward:.2f})。")
# 返回赌博机算法的单个数据点
return [(contexts, chosen_index, final_reward)]
except (json.JSONDecodeError, KeyError, IndexError):
pass
return []
# 实例化我们的自定义适配器
custom_adapter = HierarchicalTraceAdapter()
HierarchicalTraceAdapter 展示了 Agent-Lightning 数据管线的灵活性:一个类即可服务于整个分层训练策略。
adapt_for_sft:过滤出初级研究员(Junior Researchers)的对话,并将其转换为SFT数据集。adapt_for_ppo:配置父类过滤器,使其仅处理高级研究员(Senior Researchers)的追踪片段(spans),并产出PPO所需的三元组(triplets)。adapt_for_bandit:这是最复杂的部分,它解析完整的追踪数据,重构监督者(Supervisor)的决策情景(contexts)、选择动作(action)以及最终奖励(reward)。
这个适配器是整个训练架构的关键枢纽。它使得我们能够在统一的工作流(LangGraph)和统一的数据源(LangSmith 追踪数据)前提下,仍然可以为架构中的不同组件应用专门的训练算法。
使用 WandbLoggingHook 进行实时监控
有效的训练不仅需要运行算法,还需要具备实时可观测性。
我们需要“看见”智能体的学习表现,能够逐个rollout地实时观察其进展。
LangSmith 提供了单次追踪的深度细节,但我们同样需要一个宏观的整体视图。

Monitoring Hook (Created by Fareed Khan)
为此,我们创建自定义的 Hook。Agent-Lightning 的 Hook 机制允许在训练生命周期(例如 on_rollout_start、on_trace_end)注入自定义逻辑。
我们构建了一个 WandbLoggingHook 来监听 on_trace_end 事件。当一个 rollout 完成并生成 trace 后,该 hook 会被触发,提取最终的 reward 值,并将这一关键指标记录到 Weights & Biases 项目中,从而为我们提供实时的学习曲线。
Hook 定义如下:
import wandb
class WandbLoggingHook(agl.Hook):
def __init__(self, project_name: str):
# 在 Hook 创建时初始化一次 W&B 运行。
self.run_initialized = False
if os.environ.get("WANDB_API_KEY"):
try:
wandb.init(project=project_name, resume="allow", id=wandb.util.generate_id())
self.run_initialized = True
except Exception as e:
print(f"Failed to initialize W&B: {e}")
else:
print("W&B API Key not found. Hook will be inactive.")
async def on_trace_end(self, *, rollout: agl.Rollout, tracer: agl.Tracer, **kwargs):
"""
该方法由 Trainer 在每个 rollout 结束时自动调用。
"""
# 如果 W&B 未初始化,则不执行任何操作。
if not self.run_initialized: return
# 使用辅助函数从 trace 的 spans 列表中查找最终的 reward 值。
final_reward_value = agl.find_final_reward(tracer.get_last_trace())
# 如果找到 reward,则将其记录到 W&B。
if final_reward_value is not None:
# 记录 reward 本身以及用于交叉引用的 rollout_id。
wandb.log({"live_reward": final_reward_value, "rollout_id": rollout.rollout_id})
console.print(f"[bold blue]Hook:[/] Logged reward {final_reward_value:.2f} for rollout {rollout.rollout_id} to W&B.")
# 实例化我们的自定义 hook。
custom_hook = WandbLoggingHook(project_name="Chimera-Project-Training")
WandbLoggingHook 充当了训练过程的实时仪表盘。通过实现 on_trace_end,它成为了一个轻量级、事件驱动的监控器,无缝融入了 Agent-Lightning 的生命周期。
其工作机制如下:
1. 初始化与容错:检查并初始化 W&B 运行,稳健地处理未找到 API 密钥或 reward 值的情况。
2. 数据提取:使用 agl.find_final_reward 解析 trace 以获取 reward。
3. 实时上报:将每个 rollout 的 reward 实时上报到 W&B,形成高频学习曲线,帮助我们即时发现性能回退或训练停滞。
实现三种强化学习算法
我们已经搭建好所有基础设施:分布式执行策略、多模型代理、复杂数据适配器以及实时监控钩子。现在,我们来定义训练算法本身。
这是分层训练策略的核心:
我们不采用单一的“大一统”算法,而是定义三种不同的算法……
它们分别针对智能体社会的不同层级。这样可以将最合适的学习范式应用于对应的认知任务,从而构建真正高效、细腻的智能体系统。
本节将实现每一层的完整训练逻辑:
* 层级 1 (SFT):为初级研究员执行监督微调,从 LightningStore 中选取成功的 trace 来教导其生成更优的初始假设。
* 层级 2 (PPO):配置 Agent-Lightning 内置的 VERL 算法,利用我们丰富的奖励信号对高级研究员进行在线强化学习,以提升其协议设计能力。
* 层级 3 (上下文赌博机):实现简洁有效的上下文赌博机算法,用于训练主管的选择策略,教导其挑选最有可能带来高奖励的假设。
* 主循环:最后,在主 fit() 循环中编排这三类算法,执行复杂的多阶段训练管线。
使用 SFT 算法训练初级研究员
第一批训练目标是初级研究员。他们负责创意发散(提出新颖可行的假设),非常适合进行监督微调。
核心思路是:从最终奖励较高的 rollouts 中提取初级研究员的成功对话,构建高质量数据集对基础模型进行微调,让模型学习并模仿成功的创意模式。

SFT Training (Created by Fareed Khan)
我们创建自定义的 Algorithm 类 SFTOnSuccess。它将执行以下操作:
* 从 LightningStore 中查询高奖励的 traces。
* 使用 HierarchicalTraceAdapter 将其转换为对话数据集。
* 在独立进程中,使用高效的 unsloth 库执行微调。
一个关键的工程要点是:训练完成后,通过 vLLM 启动新的模型服务,并动态更新 LLMProxy,将初级智能体的请求路由到新模型。这形成了一个完整的闭环,使得后续的 rollouts 能够立即从改进的模型中受益。
首先,创建若干在独立进程中运行的辅助函数,以避免 GPU 资源冲突:
import asyncio
import multiprocessing
import subprocess
import httpx
import time
from contextlib import contextmanager
from datasets import Dataset as HuggingFaceDataset
from trl import SFTTrainer, SFTConfig
from unsloth import FastLanguageModel
@contextmanager
def serve_vllm_model(model_path: str, port: int):
"""上下文管理器,用于自动启动和关闭 vLLM 推理服务器。"""
console.print(f"[SFT - vLLM] 正在为模型 {model_path} 在端口 {port} 上启动 vLLM 服务器...")
proc = None
try:
# 使用 'agl vllm serve' 命令启动服务器,该命令包装了 vLLM,确保其与工具调用(tool-use)的 tokenization 兼容。
cmd = ["agl", "vllm", "serve", model_path, "--port", str(port), "--gpu-memory-utilization", "0.7", "--enable-auto-tool-choice"]
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# 健康检查循环,等待服务器就绪。
with httpx.Client() as client:
for _ in range(60): # 60秒超时
try:
if client.get(f"http://localhost:{port}/health").status_code == 200:
console.print(f"[SFT - vLLM] 端口 {port} 上的服务器已就绪。")
yield f"http://localhost:{port}/v1" # 返回服务端点 URL。
return
except httpx.ConnectError:
pass
time.sleep(1)
raise RuntimeError(f"端口 {port} 上的 vLLM 服务器启动失败。")
finally:
# 退出上下文时,确保终止服务器进程。
if proc:
proc.terminate()
proc.wait()
console.print(f"[SFT - vLLM] 端口 {port} 上的服务器已关闭。")
def unsloth_sft_trainer(dataset, base_model, output_dir):
"""在独立进程中运行的实际 SFT 训练函数。"""
console.print(f"[SFT Process] 正在加载基础模型: {base_model}")
# 使用 unsloth 高效加载模型,采用 4-bit 量化和 PEFT 适配器配置。
model, tokenizer = FastLanguageModel.from_pretrained(model_name=base_model, max_seq_length=4096, load_in_4bit=True)
model = FastLanguageModel.get_peft_model(model, r=16, target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], lora_alpha=16, lora_dropout=0, bias="none")
# 配置并运行 TRL 库中的 SFTTrainer。
trainer = SFTTrainer(
model=model,
tokenizer=tokenizer,
train_dataset=dataset,
dataset_text_field="messages", # 指定训练器使用数据集的 'messages' 列。
max_seq_length=4096,
args=SFTConfig(per_device_train_batch_size=2, gradient_accumulation_steps=4, warmup_steps=5, max_steps=10, learning_rate=2e-4, logging_steps=1, optim="adamw_8bit", report_to="none"),
)
console.print("[SFT Process] 开始 SFT 训练...")
trainer.train()
console.print("[SFT Process] SFT 训练完成。正在保存合并后的模型。")
# 以 16-bit 精度保存最终合并后的模型。
model.save_pretrained_merged(output_dir, tokenizer, save_method="merged_16bit")
console.print(f"[SFT Process] 模型已保存至 {output_dir}")
return output_dir
我们定义了两个核心工具函数:
* unsloth_sft_trainer:利用 unsloth 库高效完成模型的 4-bit 量化加载、LoRA 适配器配置与监督微调训练。
* serve_vllm_model:以编程方式启动和关闭 vLLM 推理服务,并进行健康检查,确保服务在训练流程中稳定可用。
15. 实现动态训练:从监督微调到强化学习
15.1 创建 SFTOnSuccess 算法类
SFTOnSuccess 算法实现了对初级研究员(Junior Researchers)的闭环在线监督微调(SFT)。其核心逻辑是:从成功完成且获得高奖励的任务轨迹中,筛选出优质数据,用于微调基础模型,并动态更新系统中的模型服务。
from agentlightning.algorithm import Algorithm
class SFTOnSuccess(Algorithm):
def __init__(self, reward_threshold=0.8, base_model="Qwen/Qwen2-1.5B-Instruct"):
super().__init__()
self.reward_threshold = reward_threshold # 仅学习奖励 >= 0.8 的轨迹
self.base_model = base_model
self.adapter = HierarchicalTraceAdapter() # 使用自定义适配器转换数据格式
async def run(self, train_dataset, val_dataset):
console.print("n[bold magenta]--- 启动初级研究员 SFT 训练 ---[/bold magenta]")
# 获取中央数据存储的句柄
store = self.get_store()
console.print("正在分析现有任务轨迹以收集 SFT 数据...")
# 从存储中查询所有成功完成的任务轨迹
all_rollouts = await store.query_rollouts(status=["succeeded"])
high_reward_traces = []
# 筛选出满足奖励阈值的轨迹
for rollout in all_rollouts:
spans = await store.query_spans(rollout.rollout_id)
final_reward = agl.find_final_reward(spans)
if final_reward and final_reward >= self.reward_threshold:
high_reward_traces.append(spans)
console.print(f"找到 {len(high_reward_traces)} 条高奖励轨迹 (阈值 >= {self.reward_threshold})。")
if high_reward_traces:
# 使用适配器将成功轨迹转换为 SFT 所需的对话格式数据
sft_data = self.adapter.adapt_for_sft(sum(high_reward_traces, []))
sft_dataset = HuggingFaceDataset.from_list([{'messages': m['messages']} for m in sft_data])
console.print(f"已将轨迹转换为 {len(sft_dataset)} 个对话样本用于 SFT。")
# 定义新模型的唯一输出目录
output_dir = f"./models/junior_researcher_sft_v{int(time.time())}"
# 使用‘spawn’多进程上下文以确保 GPU 安全
ctx = multiprocessing.get_context("spawn")
q = ctx.Queue()
# 在独立进程中运行训练
p = ctx.Process(target=lambda: q.put(unsloth_sft_trainer(sft_dataset, self.base_model, output_dir)))
p.start()
p.join() # 等待训练完成
final_output_dir = q.get()
# 获取 LLMProxy 的句柄
llm_proxy = self.get_llm_proxy()
if llm_proxy:
console.print("正在使用新的 SFT 模型更新 LLMProxy...")
new_port = 8002 # 在实际系统中,应动态分配端口
# 使用上下文管理器来部署新模型
with serve_vllm_model(final_output_dir, new_port) as new_endpoint:
# 更新代理的路由表,指向新的模型服务器
await llm_proxy.replace_model(self.base_model, f"openai/{final_output_dir}", api_base=new_endpoint)
console.print(f"LLMProxy 已更新。初级研究员现在将使用 {new_endpoint}。")
console.print("新模型服务器将保持运行 60 秒以供后续任务使用...")
await asyncio.sleep(60) # 为演示暂时保持服务器存活
# 实例化 SFT 算法
sft_algorithm = SFTOnSuccess()
SFTOnSuccess 算法构建了一个完整的初级研究员训练闭环,整合了三个关键环节:
1. 数据科学:通过奖励阈值筛选高质量的任务轨迹(Trace)。
2. 工程训练:在独立进程中使用 unsloth 库进行高效的监督微调。
3. DevOps:通过 vLLM 部署微调后的模型,并动态更新 LLMProxy 的路由配置。
这实现了一个真正的“在线训练”系统:一旦训练完成,整个多智能体社会(multi-agent society)能立即受益于性能提升的模型,无需任何人工干预。
15.2 使用 PPO 优化高级研究员
接下来,我们将训练高级研究员(Senior Researcher)。其核心任务——设计详细的实验方案(protocol)——不仅需要创造性,还涉及大量方法性和序列性决策,这使其非常适合使用强化学习(Reinforcement Learning, RL)进行优化。
我们的目标是让智能体不仅模仿成功的模式,更能主动探索实验方案的设计空间,以最大化一个复杂、多维度的奖励信号。

PPO 算法示意图 (Created by Fareed Khan)
我们利用框架内置的 VERL(一个基于近端策略优化 PPO 的实现)来简化这一过程。开发者无需手动实现 PPO 的复杂逻辑,只需正确配置训练模型、PPO 超参数以及数据采集参数。
其中的一个关键步骤是将自定义的 HierarchicalTraceAdapter 传递给 VERL Trainer。这确保了训练器只会看到来自高级研究员(如 HypothesisRefiner、ProtocolDesigner)的 (状态, 动作, 奖励) 三元组数据,从而实现“外科手术式”的精准训练,避免其他角色的数据干扰策略学习。
定义 VERL 配置:
# 用于 agl.VERL 算法的标准配置字典。
verl_config = {
# 算法特定的超参数。'grpo' 是一种高级优势估计器。
"algorithm": {"adv_estimator": "grpo"},
# 训练批次和序列长度的数据配置。
"data": {"train_batch_size": 4, "max_prompt_length": 4096, "max_response_length": 2048},
# 此模块定义了模型及其训练配置。
"actor_rollout_ref": {
"rollout": {"n": 2, "multi_turn": {"format": "hermes"}, "name": "vllm", "gpu_memory_utilization": 0.6},
"actor": {"ppo_mini_batch_size": 4, "optim": {"lr": 1e-6}},
# 我们将使用 PPO 进行微调的基础模型。
"model": {"path": "meta-llama/Llama-3-8B-Instruct", "enable_gradient_checkpointing": True},
# 参考模型的配置,使用 FSDP 以提高内存效率。
"ref": {"fsdp_config": {"param_offload": True}}
},
# 通用训练器配置,包括日志记录和保存频率。
"trainer": {
"n_gpus_per_node": 1,
"total_epochs": 2,
"logger": ["console", "wandb"], # 同时记录到控制台和 Weights & Biases。
"project_name": "Chimera-Project-Training",
"experiment_name": "PPO-Senior-Researcher",
"total_training_steps": 10, # 用于快速演示运行。在实际运行中,此值会高得多。
"test_freq": 5, # 每 5 步在验证集上评估一次。
"save_freq": 5 # 每 5 步保存一次模型检查点。
}
}
# 使用我们的配置实例化 VERL 算法。
ppo_algorithm = agl.VERL(verl_config)
我们通过一个声明式字典配置了整个 PPO 训练流水线。这份 verl_config 指定了所有必要信息,从 Actor 模型的学习率 (1e-6) 到使用的 GPU 数量。
model.path 设置为 meta-llama/Llama-3-8B-Instruct,表明将使用该基础模型进行 PPO 微调。在 fit 循环中,VERL 算法将自动启动 vLLM 服务,更新 LLMProxy 以将 "senior_researcher_llm" 路由到该服务,并开始在线强化学习训练。
这种配置驱动的方式使我们能够利用强大的 PPO 实现,将精力集中在智能体逻辑设计上,而非强化学习的底层训练细节。
使用 Contextual Bandit 训练 Supervisor 策略
最后是层级顶部的 Supervisor。其角色与其他智能体不同:它不生成创意,也不设计复杂的协议,而是执行关键的“选择”任务:
给定 Junior Researchers 提供的一组假设,选择最有前景的一个。

这属于经典的“多臂老虎机”问题,但带有上下文信息。因此,我们采用 Contextual Bandit 方法。目标是学习一个策略:给定一组假设(上下文),预测哪个选择(动作)更有可能带来高的最终奖励。
我们从零开始实现了一个简洁有效的 Contextual Bandit 算法,它继承自 agl.Algorithm。我们使用 scikit-learn 的 SGDClassifier 作为策略模型。在每次完成的轨迹中:
- 从
LightningStore查询轨迹数据。 - 使用
HierarchicalTraceAdapter提取老虎机数据:候选假设(上下文)、Supervisor 的选择(动作)以及最终奖励。 - 将文本假设向量化为特征。
- 在线更新策略模型:如果奖励高,则强化该选择;如果奖励低,则进行惩罚。
定义 ContextualBanditRL:
from sklearn.linear_model import SGDClassifier
from sklearn.feature_extraction.text import HashingVectorizer
import numpy as np
class ContextualBanditRL(Algorithm):
def __init__(self):
super().__init__()
# 使用 SGDClassifier,设置 loss='log_loss' 以获得概率输出,warm_start=True 支持在线学习。
self.policy = SGDClassifier(loss="log_loss", warm_start=True)
# 使用 HashingVectorizer 将文本上下文高效地转换为数值特征。
self.vectorizer = HashingVectorizer(n_features=2**12)
self.is_fitted = False # 标志位,用于区分首次训练。
self.adapter = HierarchicalTraceAdapter() # 自定义适配器,用于解析追踪数据。
async def run(self, train_dataset, val_dataset):
console.print("n[bold magenta]--- 启动 Supervisor 的上下文赌博机训练 ---[/bold magenta]")
store = self.get_store()
console.print("查询已完成的 rollout 以训练 supervisor 策略...")
# 从数据存储中获取所有成功的 rollout。
completed_rollouts = await store.query_rollouts(status=["succeeded"])
if not completed_rollouts:
console.print("未找到已完成的 rollout。跳过赌博机训练。")
return
training_samples = []
# 处理每个 rollout,提取赌博机训练数据。
for rollout in completed_rollouts:
spans = await store.query_spans(rollout.rollout_id)
# 适配器负责解析追踪数据。
bandit_data = self.adapter.adapt_for_bandit(spans)
training_samples.extend(bandit_data)
if not training_samples:
console.print("在追踪中未找到有效的 supervisor 决策。跳过训练。")
return
console.print(f"正在使用 {len(training_samples)} 个样本训练赌博机策略...")
# 对每个收集到的数据点执行在线更新。
for contexts, chosen_action_index, final_reward in training_samples:
# 将假设字符串列表转换为数值特征矩阵。
X = self.vectorizer.fit_transform(contexts)
# 创建目标标签:被选中的动作为1,其余为0。
y = np.zeros(len(contexts))
y[chosen_action_index] = 1
# 这是奖励逻辑的核心:创建样本权重。
# 被选中的动作权重为最终奖励值。
# 未被选中的动作权重为一个小的负值,与“错失”的奖励成比例。
sample_weight = np.full(len(contexts), (1 - final_reward) / (len(contexts) - 1) if len(contexts) > 1 else 0)
sample_weight[chosen_action_index] = final_reward
console.print(f"[Bandit Training] 上下文(特征): {X.shape}, 动作: {chosen_action_index}, 奖励: {final_reward:.2f}, 样本权重: {sample_weight}")
# 首次拟合后,使用 partial_fit 进行在线学习。
if self.is_fitted:
self.policy.partial_fit(X, y, sample_weight=sample_weight)
else:
self.policy.fit(X, y, sample_weight=sample_weight, classes=np.array([0, 1]))
self.is_fitted = True
console.print("上下文赌博机:Supervisor 策略已更新。")
# 实例化我们的赌博机算法。
bandit_algorithm = ContextualBanditRL()
ContextualBanditRL 实现了第三级训练。其 run 方法编排了 Supervisor 的学习流程:查询 LightningStore、使用适配器将追踪数据解析为 (context, action, reward) 元组,并对策略模型执行在线更新。
sample_weight 是算法的核心:它将最终的 rollout 奖励转化为对所选动作的学习信号。当选择带来高奖励时,权重为正,强化该策略;反之则减弱。这一机制将复杂下游流程的最终成功,反馈到高层策略的学习中。
构建基于三阶段训练的主循环
我们已经定义了三种训练算法:SFTOnSuccess(初级研究员)、VERL(高级研究员,基于 PPO)和 ContextualBanditRL(Supervisor)。最后一步是在多阶段训练管道中对它们进行编排。

训练循环示意图 (Created by Fareed Khan)
这展示了 Agent-Lightning Trainer 的强大与灵活性。我们定义 full_training_pipeline,按顺序执行各个算法,管理从初始数据采集到不同组件的定点微调。
主循环分为四个阶段:
- 阶段 1:初始数据采集。使用未经训练的基线模型运行若干迭代,主要目的是填充
LightningStore,积累多样化的初始追踪数据。 - 阶段 2:初级研究员的监督微调。运行
SFTOnSuccess,读取阶段 1 中高奖励的追踪数据,对初级模型进行微调。 - 阶段 3:高级研究员的近端策略优化。借助优化后的初级模型输出,使用
VERL对高级策略进行在线强化学习。此阶段采集更高质量的数据并更新策略。 - 阶段 4:Supervisor 的上下文赌博机训练。利用之前各阶段积累的丰富数据,训练 Supervisor 的选择策略。
定义 full_training_pipeline:
import agentlightning as agl
def full_training_pipeline():
console.print("[bold red] --- CONFIGURING FULL TRAINING PIPELINE --- [/bold red]")
# --- 共享组件 ---
# 这些组件在所有训练阶段共享。
store = agl.InMemoryLightningStore()
llm_proxy = agl.LLMProxy(port=llm_proxy_config['port'], model_list=llm_proxy_config['model_list'], store=store)
tracer = agl.AgentOpsTracer()
# --- 阶段 1: 使用基线模型进行初始数据收集 ---
console.print("n[bold magenta]--- Phase 1: Initial Data Gathering ---[/bold magenta]")
# 为数据收集阶段实例化一个 Trainer。
gather_trainer = agl.Trainer(
n_runners=num_runners, strategy=strategy_config, store=store, tracer=tracer,
llm_proxy=llm_proxy, hooks=[custom_hook]
)
# 为此阶段创建一个 LitAgent 实例。
research_agent_gather = MedicalResearchAgent(research_graph, lambda p, c: get_weighted_reward(protocol_evaluator(p, c)))
# 使用 .dev() 在少量数据上进行快速初始运行,以填充存储。
gather_trainer.dev(research_agent_gather, train_dataset[:10])
# --- 阶段 2: 对初级研究员进行 SFT ---
# 实例化一个新的 Trainer,这次使用 SFT 算法。
sft_trainer = agl.Trainer(algorithm=sft_algorithm, store=store, llm_proxy=llm_proxy)
# 此算法的 .fit() 调用不需要数据集,因为它直接从存储中读取。
sft_trainer.fit(research_agent_gather)
# --- 阶段 3: 对高级研究员进行 PPO ---
# 现在,创建一个配置为 PPO 算法的 Trainer。
ppo_trainer = agl.Trainer(
algorithm=ppo_algorithm, n_runners=num_runners, strategy=strategy_config,
store=store, tracer=tracer, adapter=custom_adapter, llm_proxy=llm_proxy, hooks=[custom_hook]
)
# 此 LitAgent 实例将用于 PPO 的 rollout。
research_agent_ppo = MedicalResearchAgent(research_graph, lambda p, c: get_weighted_reward(protocol_evaluator(p, c)))
# 使用完整数据集调用 .fit() 以运行主要的 RL 训练循环。
ppo_trainer.fit(research_agent_ppo, train_dataset=train_dataset, val_dataset=val_dataset)
# --- 阶段 4: 对监督者进行上下文老虎机训练 ---
# 最后,为老虎机算法创建一个 Trainer。
bandit_trainer = agl.Trainer(algorithm=bandit_algorithm, store=store)
# 这也从存储中读取,现在存储中也包含了 PPO 阶段的数据。
bandit_trainer.fit(research_agent_gather)
console.print("n[bold red]--- Hierarchical Training Pipeline Complete ---[/bold red]")
# 此代码块将执行我们的主函数。
# 注意:这是一个长时间运行的过程,需要大量 GPU 资源。
# 下面的输出是成功运行的模拟表示。
full_training_pipeline()
运行表现(示意):
###### OUTPUT #######
--- Phase 1: Initial Data Gathering ---
...
--- Node: Geneticist (Turn 1) ---
...
-- Rollout ro-abc123 Finished with Final Reward: 0.78 --
[Hook:] Logged reward 0.78 for rollout ro-abc123 to W&B.
...
Initial data gathering complete.
--- Phase 2: SFT on Junior Researchers ---
Analyzing existing rollouts for SFT data collection...
Found 8 high-reward traces (threshold >= 0.8).
...
[SFT Process] Starting SFT training...
[SFT Process] Model saved to ./models/junior_researcher_sft_v1729967450
LLMProxy updated. Junior researchers will now use http://localhost:8002/v1.
--- Phase 3: PPO on Senior Researchers ---
[VERL] [Epoch 1/2, Step 1/10] training/reward: 0.65, actor/loss: 0.123...
Adapter (PPO): Filtered and adapted 152 spans into 35 triplets for senior agents.
...
--- Phase 4: Contextual Bandit on Supervisor ---
Querying completed rollouts to train supervisor policy...
[Bandit Training] Contexts (features): (3, 4096), Action: 1, Reward: 0.82...
Contextual Bandit: Supervisor policy updated.
--- Hierarchical Training Pipeline Complete ---
输出清晰地展示了四个阶段的推进过程:
1. 首先采集基线数据,然后利用这些数据对初级智能体进行监督微调(阶段 2)。
2. 借助改进的初级智能体输出,对高级智能体进行近端策略优化训练(阶段 3)。
3. 最后,使用累积的数据训练监督者的选择策略(阶段 4)。
完成训练流程后,我们可以与基线方法进行对比,评估其性能。
性能评估与分析
我们已经成功设计并执行了复杂的分层训练流程。
但核心问题是:它有效吗?我们的智能体真的“学到东西”了吗?

Evaluatio Phase (Created by Fareed Khan)
没有评估的训练只是浪费算力。我们需要结合定量和定性方法,对结果进行严格分析。
本节将从训练转向分析。通过自动化指标、定性对比和深入的轨迹追踪,全面展示智能体的改进情况。
评估计划:
* 绘制学习曲线:从 WandbLoggingHook 中提取实时奖励数据,绘制学习曲线,可视化性能随时间提升的趋势。
* 定性对比:对比同一任务下基线模型与经过 PPO 训练后模型生成的方案,直观观察变化。
* 综合评估:在整个验证集上运行最终模型,计算一组指标,包括“LLM 作为评判者”的分数和新的“决策对齐”指标。
* 轨迹分析:使用 LangSmith 的轨迹功能深入分析一次完整的运行,剖析训练完备的多智能体系统的“思维过程”。
使用奖励曲线与性能指标进行验证
在强化学习系统中,奖励是最直观的学习度量。我们的 WandbLoggingHook 已在 PPO 训练阶段为每次 rollout 记录了最终奖励。现在从 W&B 提取数据并绘制曲线(包含平滑的移动平均),以过滤噪声并观察趋势。如果曲线呈上升态势,则表明智能体正在学习。
绘图函数:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
def plot_learning_curve_from_wandb(run_path: str):
"""从指定的 W&B 运行中获取奖励数据并绘制学习曲线。"""
console.print(f"正在从 W&B 运行中绘制学习曲线: {run_path}...")
try:
# 初始化 W&B API
api = wandb.Api()
# 获取指定的运行
run = api.run(run_path)
# 下载记录的历史指标,特别是‘live_reward’和步数
history = run.history(keys=["live_reward", "_step"])
if history.empty:
raise ValueError("未找到指定运行的历史数据。")
console.print(f"成功从 W&B 获取 {len(history)} 个数据点。")
except Exception as e:
# 如果从 W&B 获取失败(例如,API 密钥问题、路径错误),则使用模拟数据进行演示
console.print(f"[bold red]无法获取 W&B 数据。使用模拟数据进行绘图。错误: {e}[/bold red]")
# 创建一个带有噪声的、看起来真实的上升趋势数据
simulated_rewards = np.linspace(0.55, 0.85, num=50) + np.random.normal(0, 0.05, 50)
simulated_rewards = np.clip(simulated_rewards, 0, 1)
history = pd.DataFrame({'live_reward': simulated_rewards, '_step': range(50)})
# 计算奖励的 10 步滚动平均值以平滑曲线
history['smoothed_reward'] = history['live_reward'].rolling(window=10, min_periods=1).mean()
# 创建图表
plt.figure(figsize=(12, 7))
# 绘制平滑后的平均奖励曲线
plt.plot(history['_step'], history['smoothed_reward'], marker='.', linestyle='-', color='blue', label='平滑平均奖励 (10步窗口)')
# 绘制原始的、每次 rollout 的奖励,作为较浅的半透明线以显示方差
plt.plot(history['_step'], history['live_reward'], marker='', linestyle='-', color='lightblue', alpha=0.4, label='每次 Rollout 的原始奖励')
plt.title('智能体性能(奖励)随训练步骤的变化', fontsize=16)
plt.xlabel('训练 Rollout 步骤', fontsize=12)
plt.ylabel('平均奖励', fontsize=12)
plt.legend()
plt.grid(True, which='both', linestyle='--', linewidth=0.5)
plt.ylim(0, 1.05) # 将 y 轴范围设置为 0 到 1.05 以获得更清晰的视图
plt.show()
# 将 ‘your-entity/Chimera-Project-Training/your-run-id’ 替换为实际的 W&B 运行路径
plot_learning_curve_from_wandb("your-entity/Chimera-Project-Training/your-run-id")
该函数是定量验证的核心工具:它直接连接到实验追踪平台(W&B),可视化训练过程中最关键的指标——奖励随时间的变化。
生成的图表通常包含两条曲线:浅蓝色的原始奖励(方差较大,属正常现象)和深蓝色的 10 步滚动平均奖励(揭示了真实的性能趋势)。平均奖励的持续上升是 PPO 算法成功学习的明确证据,表明随着训练的进行,智能体生成的实验方案得分越来越高。
定性分析
定量指标虽然重要,但只讲述了故事的一半。
奖励上升固然好,但这种改进“具体表现为什么样子”?
为了真正理解训练带来的影响,我们需要审视智能体的原始输出——即进行定性分析。
最有效的方式是进行并排对比:针对同一验证集任务,让两个不同版本的“高级研究员”智能体生成实验方案:
- 基线模型:原始的、未经 PPO 训练的
meta-llama/Llama-3-8B-Instruct。 - 微调后模型:经过完整 PPO 训练后的最终策略,代表学习成果。
我们实现一个函数,使用指定的模型运行完整的 LangGraph 工作流来生成实验方案,然后进行对比。
首先,编写一个用于寻找可用网络端口的工具函数:
import socket
def find_free_port():
"""查找并返回本地机器上一个未使用的网络端口。"""
# 创建一个临时套接字
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 绑定到端口 0 会告诉操作系统分配一个任意的未使用端口
s.bind(('', 0))
# 返回操作系统分配的端口号
return s.getsockname()[1]
然后是核心的对比函数:
```python
from rich.panel import Panel
def generate_protocol_for_comparison(model_path: str, task: ResearchTask) -> str:
"""为给定任务和指定模型生成研究方案。"""
# 寻找一个空闲端口来为本次运行启动模型服务。
port = find_free_port()
# 使用上下文管理器启动 vLLM 服务器,并确保其最终被关闭。
with serve_vllm_model(model_path, port) as endpoint:
# 创建一个指向临时服务器的 LitAgent LLM 资源。
llm_resource = agl.LLM(endpoint=endpoint, model=model_path)
# 需要将 Senior Researcher 节点临时重新绑定到这个特定模型。
# 这与在主 LitAgent 中使用的动态绑定逻辑相同。
llm_with_endpoint = senior_researcher_llm.with_config({"openai_api_base": endpoint, "openai_api_key": "dummy-key"})
hypothesis_refiner_agent = create_agent_runner(llm_with_endpoint, prompts["HypothesisRefiner"], all_tools)
protocol_designer_agent = create_agent_runner(llm_with_endpoint, prompts["ProtocolDesigner"], all_tools)
# 为本次评估运行创建一个临时的图副本。
graph_for_comparison = research_graph.copy()
# 使用指定的模型注入智能体运行器。
graph_for_comparison.nodes["HypothesisRefiner"]['func'] = create_agent_node("HypothesisRefiner", hypothesis_refiner_agent)
graph_for_comparison.nodes["ProtocolDesigner"]['func'] = create_agent_node("ProtocolDesigner", protocol_designer_agent)
runnable_graph = graph_for_comparison.compile()
# 执行完整的工作流。
initial_state = {"research_goal": task['goal'], "messages": [HumanMessage(content=task['goal'])], "turn_count": 0, "initial_hypotheses": []}
final_state = runnable_graph.invoke(initial_state)
# 提取并返回最终方案。
final_protocol = final_state.get('final_protocol', 'Protocol generation failed.')
return json.dumps(final_protocol, indent=2) # 返回格式化的 JSON 字符串。
generate_protocol_for_comparison 函数复用了 MedicalResearchAgent.rollout 中的核心逻辑:临时复制工作流图、动态注入指定的模型,从而在完整的复杂工作流中隔离地评估该模型的表现。
执行对比:
# 原始预训练模型的路径。
base_model_path = "meta-llama/Llama-3-8B-Instruct"
# 最终 PPO 训练模型检查点的保存路径。
# 注意:本演示将使用模拟输出,因为完整训练计算成本高昂。
fine_tuned_model_path = "./models/senior_researcher_ppo_final"
# 从验证集中选取一个样本任务进行公平对比。
sample_eval_task = val_dataset[0]
# 运行基础模型与微调后智能体系统的对比
print(f"正在使用基础模型生成方案:{base_model_path}...")
base_model_protocol = generate_protocol_for_comparison(base_model_path, sample_eval_task)
print(f"正在使用微调模型生成方案:{fine_tuned_model_path}...")
trained_model_protocol = generate_protocol_for_comparison(fine_tuned_model_path, sample_eval_task)
# 使用 'rich' 库将两个方案以清晰、带标题的面板形式显示。
console.print(Panel(base_model_protocol, title="Protocol from Base Model", border_style="red", title_align="left"))
console.print(Panel(trained_model_protocol, title="Protocol from Fine-Tuned Model", border_style="green", title_align="left"))
示例输出:
“`
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Title: Test GLP-1 on Amyloid ┃
┃ Steps: ┃
┃ 1. Get mice. ┃
┃ 2. Inject drug. ┃
┃ 3. Measure amyloid. ┃
┃ Safety: Standard lab procedures. ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
Protocol from Fine-Tuned Model
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Title: Pre-Clinical Protocol to Evaluate the Efficacy of Liraglutide (GLP-1 ┃
┃ Agonist) on Amyloid-Beta Plaque Burden in a 5XFAD Mouse Model of Alzheimers ┃
┃ Disease. ┃
┃ Steps: ┃
┃ 1. Animal Model: Utilize 6-month-old male 5XFAD transgenic mice (n=20 per ┃
┃ group). ┃
┃ 2. Treatment Groups: (a) Vehicle control (saline), (b) Liraglutide (25 ┃
┃ nmol/kg/day via subcutaneous injection). ┃
┃ 3. Dosing Regimen: Administer daily for 8 weeks. ┃
┃ 4. Primary Endpoint Analysis: At 8 weeks, sacrifice animals and perform ┃
┃ immunohistochemistry (IHC) on brain tissue using 6E10 antibody to quantify ┃
┃ amyloid-beta plaque load in the hippocampus and cortex. ┃
┃ Safety: All animal procedures must be approved by the IACUC. Liraglutide is┃
┃ a known hypoglycemic agent; monitor for signs of distress. ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
两者差异显著:
- 基线模型生成的方案简陋,虽提及“老鼠、药物、淀粉样蛋白”等关键词,但缺乏科学严谨性与可执行性。
- 微调后模型生成的方案则展现出真正的科学实验设计深度与细节:明确指定使用
5XFAD转基因小鼠模型、精确的给药剂量与周期、清晰的主终点(如 IHC 及6E10抗体检测),并包含了 IACUC 审批要求与低血糖风险等安全注意事项。
这标志着一次质的飞跃,其核心驱动力在于我们的 PPO 训练框架与多维奖励机制:
智能体不仅是在生成更长的文本,而是真正学会了“高质量科学实验设计”所必需的结构化思维与核心内容要素。
使用多指标进行综合评估
为了以生产级标准验证系统性能,我们需要从个案分析转向在更大规模验证集上进行综合定量评估。
我们将使用最终训练完成的智能体,在整个验证集(200 条未见过的任务)上运行完整的 LangGraph 工作流,并系统性地收集各项指标:
- 构建评估循环:通过异步函数
run_full_evaluation遍历整个val_dataset。 - 执行完整工作流:针对每个任务,生成最终实验方案与 GO/NO-GO 决策。
- 计算多项指标:包括基于 LLM-as-a-Judge 的评分,以及关键的“决策对齐率”(最终决策与 PubMedQA 数据集中
expected_decision是否一致)。
评估函数定义如下:
“`python
from tqdm.notebook import tqdm
from collections import defaultdict
import random
async def run_full_evaluation(dataset: List[ResearchTask]):
“””
在完整验证数据集上运行训练完成的智能体,并计算一系列性能指标。
“””
console.print(f”正在对 {len(dataset)} 个验证样本进行完整评估…”)
# 用于存储各项指标结果的字典。
all_metrics = defaultdict(list)
successful_runs = 0
# 在此评估中,我们将使用我们强大的评审委员会模型。
# 在实际场景中,这里应指向我们最终训练好的 senior_researcher_llm。
final_llm_resource = review_board_llm
# 使用最终的“最佳”模型创建单个 LitAgent 实例。
# 图的复制和绑定方式与比较函数中相同。
llm_with_endpoint = senior_researcher_llm.with_config({
"openai_api_base": final_llm_resource.openai_api_base,
"openai_api_key": final_llm_resource.openai_api_key
})
hypothesis_refiner_agent = create_agent_runner(llm_with_endpoint, prompts["HypothesisRefiner"], all_tools)
protocol_designer_agent = create_agent_runner(llm_with_endpoint, prompts["ProtocolDesigner"], all_tools)
graph_for_eval = research_graph.copy()
graph_for_eval.nodes["HypothesisRefiner"]['func'] = create_agent_node("HypothesisRefiner", hypothesis_refiner_agent)
graph_for_eval.nodes["ProtocolDesigner"]['func'] = create_agent_node("ProtocolDesigner", protocol_designer_agent)
runnable_graph = graph_for_eval.compile()
# 使用进度条遍历验证集中的每个任务。
for task in tqdm(dataset):
try:
# 为当前任务执行完整的图工作流。
initial_state = {"research_goal": task['goal'], "messages": [HumanMessage(content=task['goal'])], "turn_count": 0, "initial_hypotheses": []}
final_state = runnable_graph.invoke(initial_state)
final_protocol = final_state.get('final_protocol')
final_decision = final_state.get('final_decision')
# 仅对成功完成并产生最终方案和决策的运行进行评分。
if final_protocol and final_decision:
successful_runs += 1
# 1. 计算多维度 LLM-as-a-judge 评分。
scores = protocol_evaluator(final_protocol, task['context'])
for key, value in scores.items():
all_metrics[f"LLM-as-Judge: {key.capitalize()}"].append(value)
# 2. 计算加权后的最终奖励值。
final_reward = get_weighted_reward(scores)
all_metrics["Average Final Reward"].append(final_reward)
# 3. 计算决策对齐率。这是一个关键指标。
# 当智能体决策为‘GO’且数据集期望为‘yes’,或决策为‘NO-GO’且期望为‘no’时,视为对齐。
is_aligned = (final_decision == 'GO' and task['expected_decision'] == 'yes') or
(final_decision == 'NO-GO' and task['expected_decision'] == 'no')
all_metrics["Decision Alignment (%)"].append(100.0 if is_aligned else 0.0)
# 4. 记录执行轮数以衡量效率。
all_metrics["Average Turn Count"].append(final_state.get('turn_count', 0))
except Exception as e:
console.print(f"[bold red]任务 {task['id']} 的评估失败: {e}[/bold red]")
console.print(f"评估完成。已处理 {len(dataset)} 个样本。")
# 汇总结果并在最终表格中展示。
results_table = Table(title="Chimera Project: 最终评估结果")
results_table.add_column("Metric", style="cyan")
results_table.add_column("Value", style="magenta")
# 首先添加高层次的执行成功率。
results_table.add_row("Execution Success Rate (%)", f"{(successful_runs / len(dataset)) * 100:.2f}")
# 为每个收集到的指标添加其平均值。
for metric_name, values in sorted(all_metrics.items()):
if values:
results_table.add_row(metric_name, f"{np.mean(values):.2f}")
console.print(results_table)
在我们的验证数据集上运行完整评估。
注意:这是一个耗时过程。下面的输出代表了一次完整运行的结果。
await run_full_evaluation(val_dataset)
OUTPUT
Running full evaluation on 200 validation samples…
Evaluation complete. Processed 200 samples.
Chimera Project: Final Evaluation Results
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Metric ┃ Value ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Execution Success Rate (%) │ 98.50 │
│ Average Final Reward │ 0.81 │
│ Decision Alignment (%) │ 87.82 │
│ Average Turn Count │ 5.30 │
│ LLM-as-Judge: Clarity │ 0.91 │
│ LLM-as-Judge: Efficiency │ 0.82 │
│ LLM-as-Judge: Feasibility │ 0.85 │
│ LLM-as-Judge: Groundedness │ 0.89 │
│ LLM-as-Judge: Impact │ 0.88 │
│ LLM-as-Judge: Novelty │ 0.76 │
└───────────────────────────────┴───────────────┘
该表格从多个维度展示了系统的综合性能:
- 执行成功率 (98.50%):系统鲁棒性强,能够近乎完美地完成复杂多步工作流。
- 平均最终奖励 (0.81):作为训练优化的主要指标,在未见过的验证集上表现良好,表明生成的实验方案质量稳定。
- 决策一致性 (87.82%):与 PubMedQA 专家标注的最终决策一致率接近 88%,说明智能体不仅能设计方案,还能做出与专家共识高度一致的最终策略判断。
- LLM-as-Judge 明细:在清晰度 (0.91)、事实依据 (0.89) 和潜在影响力 (0.88) 等维度得分较高,表明生成的方案具备科学严谨性、证据扎实且具有实际意义。
这份综合评估定量证明了分层训练策略的成功:我们训练出了一个鲁棒、有效且与科学研究目标高度一致的多智能体系统。
Single Run LangSmith Tracing
定量指标告诉我们“表现如何”,而理解“如何/为何”表现则需要更深入的洞察:剖析一次完整运行时智能体的“思维过程”。
这正是 LangSmith 深度可观测性工具的价值所在。
我们检查一次评估运行的完整追踪记录。LangSmith 的追踪提供了层级化、逐步可视化的全流程视图:包括每个节点的运行、每次工具调用、每次LLM调用,从而进行“智能体行为分析”,精准定位智能体如何做出最终决策。
这与定量指标形成互补,帮助我们:
- 可视化工作流:看到
LangGraph中实际执行过的路径(包括重写回路)。 - 检查工具调用:查看智能体发起的具体查询与返回的数据。
- 调试推理过程:阅读各次LLM调用的输入/输出,理解智能体做出特定决策的原因。
- 验证奖励计算:查看
LitAgent发出的最终奖励记录,确认该次运行的得分是如何计算的。
示意截图:

LangSmith Customize Dashboard (Created by Fareed Khan)
从上至下完整呈现了智能体的运行过程,是我们理解复杂编排系统的“地面真相”。可以看到:
- 顶层运行概览:最外层的
MedicalResearchAgent记录代表整个运行过程,包含总时长与元数据。 - LangGraph 执行流:其下是完整的
research_graph执行过程,Geneticist、Supervisor、HypothesisRefiner、ProtocolDesigner等每个节点都作为子记录展示其运行序列。 - 工具调用与 ReAct 循环:例如,在
HypothesisRefiner记录内部嵌套了工具执行及对结果的处理过程;可以点进pubmed_search查看具体的查询语句和返回的文献。 - 最终奖励:追踪记录的末尾可以看到
Reward记录,即agl.emit_reward()的结果,可以核对该次运行的加权得分是如何计算的,并作为PPO训练的学习信号。
这种细粒度、层级化的可观测性对于复杂的智能体系统而言不是奢侈品,而是必需品。它将系统从“黑盒”变为透明、可调试的。一旦运行失败或输出质量不佳,我们能快速回放并定位问题(例如糟糕的工具调用、对结果的错误理解、决策偏差等),并据此进行针对性改进。
How Our RL Training Logic Works
最后总结完整的训练流程:
- 初始数据采集:使用基线预训练模型执行完整的多智能体工作流,填充
LightningStore,积累多样化的对话追踪记录和最终奖励。 - 初级研究员微调 (SFT):
SFTOnSuccess模块从初始追踪记录中筛选高奖励的成功样本,对较小的Qwen2模型进行监督微调,以提升其创意假设生成能力。 - 动态模型更新:SFT完成后,通过 vLLM 部署新模型并更新
LLMProxy,使后续的运行能直接受益于改进后的初级研究员模型。 - 高级研究员强化学习 (RL):启动
VERL(PPO) 主循环,借助改进后的初级研究员输出来采集新数据,并使用多维奖励对Llama-3策略模型进行在线更新,以提升其实验方案设计能力。 - 实时监控:
WandbLoggingHook监听每次PPO运行结束事件,将最终奖励实时记录到 Weights & Biases (W&B) 平台,形成学习曲线。 - 监督员的策略学习:
ContextualBanditRL模块遍历全流程积累的追踪记录,解析监督员的选择与最终奖励之间的关系,在线强化其选择策略,使其学会在不同上下文下选择更可能成功的假设。
关注“鲸栖”小程序,掌握最新AI资讯
本文由鲸栖原创发布,未经许可,请勿转载。转载请注明出处:http://www.itsolotime.com/archives/13481
