构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

在智能体(Agentic)系统中,无论是用于工具调用还是复杂推理,其行动通常由提示词(Prompts)引导。然而,传统的提示词是静态的,它们仅能提供行动步骤,却无法实现自我进化。真正的智能体训练(Agentic Training)源于系统在动态环境中的学习、适应与协作能力。

在智能体架构中,每个子智能体(Sub-Agent)的目标各异,这意味着单一的算法无法普适。要构建更有效的系统,我们需要一个整合了推理(Reasoning)、奖励(Reward)与实时反馈(Real-time Feedback)的完整训练架构。一个典型的智能体系统训练架构包含以下相互连接的组件:

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

Agentic Training Architecture (Created by Fareed Khan)

  1. 定义训练基础:通过配置环境、初始化智能体状态,并使其目标与系统整体目标对齐,来奠定训练基石。
  2. 搭建分布式训练管道:使多个智能体能够并行交互与学习,并通过共享内存或日志交换知识。
  3. 集成强化学习层:利用监督微调(SFT,适用于入门)、近端策略优化(PPO,用于高级优化)、上下文赌博机(Contextual Bandits,用于自适应决策)等算法驱动自我改进。
  4. 接入可观测性与监控工具:集成追踪钩子(Tracing Hooks)与日志适配器(Logging Adapters),以实时捕获每一次交互与学习步骤。
  5. 设计动态奖励系统:使智能体能基于其性能表现、目标对齐程度以及对整体任务的贡献获得反馈。
  6. 创建多阶段训练循环:让智能体按阶段推进训练,从监督微调平滑过渡到基于强化学习的自适应阶段。
  7. 评估与持续改进:通过分析奖励曲线、性能指标以及各角色的定性行为,持续评估并迭代优化整个架构。

在本系列文章中,我们将……

构建一个融合了推理、协作与强化学习的完整多智能体系统,使智能体能够通过实时反馈与奖励机制进行动态适应与自我改进。

全部实现代码可在以下 GitHub 仓库获取:

GitHub – FareedKhan-dev/training-ai-agents: Training architecture for self-improving AI agents.


技术总览

构建生产级 AI 系统时,我们不会直接从算法入手,而是首先夯实整个系统的基础。这个初始配置阶段至关重要,从依赖库的选择到数据源的确定,每一个决策都将直接影响最终训练出的智能体的可靠性与可复现性。

本节我们将完成以下准备工作:

  • 安装分层训练所需的核心库与专用依赖。
  • 配置 API 密钥,避免硬编码,并将 LangSmith 项目接入可观测性平台。
  • 下载并处理 PubMedQA 数据集,为智能体构建高质量的知识语料库。
  • 设计中心化的智能体状态管理机制(共享内存),以支持协作与推理。
  • 为智能体配备必要的工具,如模拟数据库、实时网络搜索等,用于与外部环境交互。

配置研究环境

首先需要设置 Python 环境。我们不使用简单的 pip install,而是采用 uv——一个快速、现代的包管理器,它能确保环境搭建既快速又可复现,非常适合生产环境。

我们还将安装 agent-lightning 的额外组件(用于 PPO),以及 apo(异步策略优化)和 unsloth(高效监督微调),这些对于实现高级的分层训练策略至关重要。

print("Updating and installing system packages...")  
# 首先更新系统包列表,并安装'uv'和'graphviz'。
# 'graphviz'是LangGraph可视化智能体工作流所需的系统依赖。
!apt-get update -qq && apt-get install -y -qq uv graphviz  

print("nInstalling packages...n")  
# 使用'uv'安装Python依赖。
# 安装'agent-lightning[verl,apo]'以获取PPO和其他高级强化学习算法所需的组件。
# 'unsloth[pt231]'提供了一个高度优化的监督微调框架,我们将用于初级研究员智能体。
!uv pip install -q -U "langchain" "langgraph" "langchain_openai" "tavily-python" "agentlightning[verl,apo]" "unsloth[pt231]" "pandas" "scikit-learn" "rich" "wandb" "datasets" "pyarrow"  
print("Successfully installed all required packages.")  

开始安装……

#### OUTPUT ####  
Updating and installing system packages...  
...  
Installing packages...  
Resolved 178 packages in 3.12s  
...  
+ agentlightning==0.2.2  
+ langchain==0.2.5  
+ langgraph==0.1.5  
+ unsloth==2024.5  
+ verl==0.6.0  
...  
Successfully installed all required packages.  

安装 graphviz 赋予了 LangGraph 可视化能力,这对后续调试我们复杂的智能体社会将非常有用。更重要的是,agentlightning 配合 verlunsloth 额外组件,为我们的分层训练策略提供了所需的高性能后端。

至此,我们已经建立了一个稳定、完整的基础环境,可以开始预处理训练数据了。

构建医学知识库

每个机器学习系统都需要训练数据,或者至少是一些用于启动自学习的初始观测数据。

我们的智能体无法在孤立中进行有效推理,它们需要丰富且特定领域的信息源。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

Pre-processing Knowledge base data (Created by Fareed Khan)

静态、硬编码的事实列表过于简单。为了构建一个真实且具有挑战性的研究环境,我们将使用 PubMedQA 数据集,特别是其带标注的子集 pqa_l

该数据集包含真实的生物医学问题、提供必要上下文的原始科学摘要,以及由人类专家给出的最终“是/否/可能”答案。这不仅为我们的智能体提供了可搜索的丰富信息源,也为强化学习循环中的奖励计算提供了基准真相(Ground Truth)。

首先,我们定义一个 TypedDict 来组织每个研究任务,确保数据在整个处理流程中保持干净和一致。

from typing import List, TypedDict  

# TypedDict 提供了一种清晰、结构化的方式来表示每个研究任务。
# 这使得我们的代码更易读,并减少了使用普通字典可能导致的错误。
class ResearchTask(TypedDict):
    id: str                   # 文章的唯PubMed ID
    goal: str                 # 智能体必须调查的研究问题
    context: str              # 提供必要证据的完整科学摘要
    expected_decision: str    # 基准真相答案(‘yes’, ‘no’, 或 ‘maybe’)

我们创建了 ResearchTask 这个基于 TypedDict 的“任务蓝图”。它并非普通字典,而是一个数据结构契约。每个任务都严格包含 idgoalcontextexpected_decision 字段。这种严格的类型约束有助于防止后续出现错误,并确保系统各个组件对数据的期望形态有清晰一致的理解。

接下来,我们将编写一个函数,从 Hugging Face Hub 下载数据集,将其转换为 ResearchTask 格式,并进行训练集与验证集的划分。独立的验证集对于客观评估智能体训练效果至关重要。

from datasets import load_dataset
import pandas as pd

def load_and_prepare_dataset() -> tuple[List[ResearchTask], List[ResearchTask]]:
    """
    下载、处理 PubMedQA 数据集,并将其分割为训练集和验证集。
    """
    print("正在下载并准备 PubMedQA 数据集...")
    # 加载 PubMedQA 数据集中带标签的 'pqa_l' 子集。
    dataset = load_dataset("pubmed_qa", "pqa_l", trust_remote_code=True)
    # 将训练集分割转换为 pandas DataFrame,以便于操作。
    df = dataset['train'].to_pandas()

    # 此列表将用于存放结构化的 ResearchTask 对象。
    research_tasks = []
    # 遍历 DataFrame 的每一行以创建任务。
    for _, row in df.iterrows():
        # 'CONTEXTS' 字段是一个字符串列表,我们将其合并为一个文本块。
        context_str = " ".join(row['CONTEXTS'])
        # 使用清理和结构化的数据创建一个 ResearchTask 字典。
        task = ResearchTask(
            id=str(row['PUBMED_ID']),
            goal=row['QUESTION'],
            context=context_str,
            expected_decision=row['final_decision']
        )
        research_tasks.append(task)

    # 执行简单的 80/20 分割,得到训练集和验证集。
    train_size = int(0.8 * len(research_tasks))
    train_set = research_tasks[:train_size]
    val_set = research_tasks[train_size:]

    print(f"数据集下载并处理完成。总样本数: {len(research_tasks)}")
    print(f"训练集大小: {len(train_set)} | 验证集大小: {len(val_set)}")
    return train_set, val_set

# 执行该函数。
train_dataset, val_dataset = load_and_prepare_dataset()

load_and_prepare_dataset 函数构成了数据引入流水线,它自动完成从 Hugging Face 下载原始数据并转换为 ResearchTask 列表的过程。

80/20 分割是标准做法,它为我们提供了一个较大的训练集 (train_set) 和一个用于评估泛化能力的、未见过的验证集 (val_set)。

数据加载后,最好通过目测样本来验证解析是否正确,并了解智能体将面临的挑战。我们可以编写一个小的工具函数,以表格形式展示少量样本。

from rich.console import Console
from rich.table import Table

console = Console()

def display_dataset_sample(dataset: List[ResearchTask], sample_size=5):
    """
    使用 rich 库以格式化表格的形式展示数据集的样本。
    """
    # 使用 'rich' 库创建一个表格,以提高可读性。
    table = Table(title="PubMedQA 研究目标数据集 (样本)")
    table.add_column("ID", style="cyan")
    table.add_column("研究目标 (问题)", style="magenta")
    table.add_column("预期决策", style="green")

    # 用数据集的前几项填充表格。
    for item in dataset[:sample_size]:
        table.add_row(item['id'], item['goal'], item['expected_decision'])

    console.print(table)

display_dataset_sample(train_dataset)

display_dataset_sample 函数是我们的健康检查工具。使用 rich 生成格式化表格,可以快速清晰地核对数据结构,比直接打印字典更有效。我们可以确认 IDgoalexpected_decision 等字段均已正确提取。

查看输出示例:

#### 输出 ####
正在下载并准备 PubMedQA 数据集...
数据集下载并处理完成。总样本数: 1000
训练集大小: 800 | 验证集大小: 200

--- 样本 0 ---
ID: 11843333
目标: 儿童溃疡性结肠炎的所有病例都需要结肠切除术吗?
预期决策: 是
上下文 (前200字符): 对135名儿童溃疡性结肠炎患者进行了回顾性研究,以确定...

至此,我们已经将原始的 PubMedQA 数据转换为干净的 ResearchTask 列表,并完成了训练/验证集的划分。每一行都是一个可以输入智能体 rollout 的完整研究任务。

Research Goal 将作为初始提示,而 Expected Decision 将作为计算最终奖励的真实基准。我们的智能体现在拥有了一个世界级的、真实的知识库来进行学习。

定义分层 AgentState

数据准备就绪后,我们需要设计智能体社会的“神经系统”——共享内存或状态。在 LangGraph 中,这个共享内存由一个中心状态对象管理。

对于如此复杂的系统,简单的字典过于脆弱。我们将使用 Python 的 TypedDict 来架构一个嵌套的、分层的 AgentState

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

AgentState (由 Fareed Khan 创建)

这种方式为智能体的整个认知过程提供了机器可读的蓝图。状态中的每个字段代表了研究流程的不同阶段:从初级研究员生成的初始假设,到最终经过同行评审的实验方案。

我们将按以下步骤进行:
* 定义子状态:为 JuniorResearchProtocolReviewDecision 等中间产物创建小的 TypedDict
* 架构主状态:将这些子状态组合到主 AgentState 中,用于单次研究运行的完整信息。
* 启用 ReAct 逻辑:添加 sender 字段,这是确保工具调用结果能正确路由回相应智能体的关键。

首先定义初级研究员的输出结构,确保每个生成的假设都保持一致和规范。

from typing import List, TypedDict, Literal
from langchain_core.messages import BaseMessage

# 这定义了来自初级研究员的单个假设的结构。
# 它捕捉了核心观点、发现的证据以及提出该假设的智能体。
class JuniorResearch(TypedDict):
    hypothesis: str
    supporting_papers: List[str]
    agent_name: str # 用于追踪是哪位初级研究员提出的

我们为“假设提交”建立了蓝图。JuniorResearch 强制要求包含 hypothesis 字符串、supporting_papers 列表和 agent_name。这对于监督员智能体至关重要,它能确保收到结构一致、来源清晰的提案以供评估。

接下来定义实验方案的结构。它是高级研究员的主要产出,需要详细且可执行。

# 这定义了最终实验方案的结构。
# 它是一个详细、可执行的计划。
class Protocol(TypedDict):
    title: str
    steps: List[str]
    safety_concerns: str
    budget_usd: float

Protocol 明确了实验方案的关键要素:titlestepssafety_concernsbudget_usd。这要求高级研究员从实践细节出发进行思考。

这种结构化输出远比纯文本有价值,也是我们最终奖励计算的基础。

接下来为评审委员会(Review Board)的反馈创建结构。这部分对于改写循环至关重要,需要确保反馈清晰且机器可读。

# 定义来自评审智能体的结构化反馈。
# 它强制要求一个明确的决定、一个严重性级别以及建设性的反馈文本。
class ReviewDecision(TypedDict):
    decision: Literal['APPROVE', 'REVISE']
    critique_severity: Literal['CRITICAL', 'MAJOR', 'MINOR']
    feedback: str

ReviewDecision 类捕获了评审的详细输出。其中 Literal 类型的使用是一个关键的工程实践:
1. 强制离散选择:决策只能是 APPROVE(通过)或 REVISE(修订)。
2. 强制标注严重程度:问题严重性被限定为 CRITICAL(关键)、MAJOR(主要)或 MINOR(次要)。

这种结构化的反馈使得我们的 LangGraph 路由节点能够清晰地决定下一步流程:是需要大范围重写,还是仅需小幅度微调。

最后,我们组合出主状态类 AgentState

from typing import Annotated

# 这是将在 LangGraph 中所有节点间传递的主状态字典。
class AgentState(TypedDict):
    # ‘messages’ 字段累积对话历史。
    # ‘lambda x, y: x + y’ 告诉 LangGraph 如何合并此字段:通过追加新消息。
    messages: Annotated[List[BaseMessage], lambda x, y: x + y]
    research_goal: str  # 来自数据集的初始高层目标。
    sender: str         # ReAct 循环的关键:追踪最后行动的智能体,以便将工具执行结果返回给它。
    turn_count: int     # 计数器,用于防止图中的无限循环。

    # 初级研究员团队的输出(从并行运行中累积)
    initial_hypotheses: List[JuniorResearch]

    # 主管的选择
    selected_hypothesis: JuniorResearch
    supervisor_justification: str

    # 高级研究员团队的输出
    refined_hypothesis: str
    experimental_protocol: Protocol

    # 评审委员会的输出
    peer_review: ReviewDecision
    safety_review: ReviewDecision

    # 首席研究员的最终决定
    final_protocol: Protocol
    final_decision: Literal['GO', 'NO-GO']
    final_rationale: str

    # 来自奖励函数的最终评估分数
    final_evaluation: dict

至此,我们定义了智能体社会的完整认知架构。

信息流清晰明了:先生成 initial_hypotheses,选择 selected_hypothesis,细化为 experimental_protocol,经过 peer_reviewsafety_review 后,最终得出 final_decision

其中 sender 字段尤为重要。

在 ReAct(推理-行动)循环中,智能体决定使用工具。工具执行后,系统需要知道结果应返回给哪个智能体。

每次智能体行动时更新 sender 字段,就如同留下一个回邮地址,这使得复杂的往返式推理成为可能。有了这个状态结构,我们的图拥有了坚实的内存基础。

构建科学工具系统

我们的智能体现在拥有了复杂的记忆(AgentState),但要进行研究,它们必须能够访问外部世界(即外部知识库)。

没有工具的智能体仅仅是聊天者,而拥有工具的智能体才能主动获取实时、特定领域的信息。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

科学工具系统示意图 (Created by Fareed Khan)

在本节中,我们将为智能体社会构建 ScientificToolkit,提供一组可调用的专用函数,以执行关键的研究任务。

我们将完成以下工作:
* 集成实时网络搜索:使用 TavilySearchResults 赋予智能体搜索 PubMed 和 ClinicalTrials.gov 最新文献的能力。
* 模拟内部数据库:创建蛋白质和基因本体的模拟数据库,以模拟查询内部私有知识库的过程。
* 应用 @tool 装饰器:通过 LangChain 的 @tool 装饰器,使这些 Python 函数能够被 LLM 智能体发现和调用。
* 测试工具:快速调用一个新工具以验证连接是否正确。

首先,定义一个统一存放工具的类。使用类进行分组便于组织和管理状态(例如管理 API 客户端)。

from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults

class ScientificToolkit:
    def __init__(self):
        # 初始化 Tavily 搜索客户端,配置为返回前 5 条结果。
        self.tavily = TavilySearchResults(max_results=5)
        # 这是一个模拟数据库,用于模拟内部蛋白质信息资源。
        self.mock_protein_db = {
            "amyloid-beta": "A key protein involved in the formation of amyloid plaques in Alzheimer's.",
            "tau": "A protein that forms neurofibrillary tangles inside neurons in Alzheimer's.",
            "apoe4": "A genetic risk factor for Alzheimer's disease, affecting lipid metabolism in the brain.",
            "trem2": "A receptor on microglia that, when mutated, increases Alzheimer's risk.",
            "glp-1": "Glucagon-like peptide-1, a hormone involved in insulin regulation with potential neuroprotective effects."
        }
        # 这是第二个模拟数据库,用于基因功能信息。
        self.mock_go_db = {
            "apoe4": "A major genetic risk factor for Alzheimer's disease, involved in lipid transport and amyloid-beta clearance.",
            "trem2": "Associated with microglial function, immune response, and phagocytosis of amyloid-beta."
        }

我们为 ScientificToolkit 设置了基础:
1. 初始化了实时网络搜索工具(Tavily)。
2. 构建了两个简单的字典(mock_protein_dbmock_go_db)来模拟内部数据库。
3. 这种混合实时与模拟工具的方式贴近企业环境:智能体需要同时访问公共和私有数据源。

接下来,定义具体的工具方法。每个方法都代表我们希望赋予智能体的一项能力。让我们从 PubMed 搜索开始。

    @tool
    def pubmed_search(self, query: str) -> str:
        """Searches PubMed for biomedical literature. Use highly specific keywords related to genes, proteins, and disease mechanisms."""
        console.print(f"--- TOOL: PubMed Search, Query: {query} ---")
        # 我们在查询前加上 'site:pubmed.ncbi.nlm.nih.gov' 以将搜索限制在 PubMed 范围内。
        return self.tavily.invoke(f"site:pubmed.ncbi.nlm.nih.gov {query}")

这是第一个工具 pubmed_search。LangChain 的 @tool 装饰器会将这个 Python 函数转换为 LLM 可以理解和调用的结构化工具。

我们再为临床试验搜索创建一个类似的工具。

    @tool
    def clinical_trials_search(self, query: str) -> str:
        """Searches for information on clinical trials related to specific drugs or therapies."""
        console.print(f"--- TOOL: Clinical Trials Search, Query: {query} ---")
        # 此工具专注于 ClinicalTrials.gov,用于查找正在进行或已完成的研究信息。
        return self.tavily.invoke(f"site:clinicaltrials.gov {query}")

clinical_trials_search 是一个专注于 ClinicalTrials.gov 的工具,有助于获取与药物研发和治疗相关的各类信息。

接下来,我们实现与模拟内部数据库交互的工具。

@tool
def protein_database_lookup(self, protein_name: str) -> str:
    """在模拟数据库中查询特定蛋白质的信息。"""
    console.print(f"--- TOOL: Protein DB Lookup, Protein: {protein_name} ---")
    # 此处模拟在专有内部蛋白质信息数据库中进行快速查询。
    return self.mock_protein_db.get(protein_name.lower(), "Protein not found.")

@tool
def gene_ontology_lookup(self, gene_symbol: str) -> str:
    """在基因本体数据库中查询特定基因符号的功能和相关通路。"""
    console.print(f"--- TOOL: Gene Ontology Lookup, Gene: {gene_symbol.upper()} ---")
    # 此处模拟查询另一个专用的内部数据库,用于获取基因功能信息。
    result = self.mock_go_db.get(gene_symbol.lower(), f"Gene '{gene_symbol}' not found in ontology database.")
    console.print(f"Gene '{gene_symbol.upper()}' lookup result: {result}")
    return result

这两个函数展示了如何集成内部或私有数据源。虽然这里使用简单的字典进行模拟,但在实际生产系统中,它们可以连接到 SQL 数据库、私有 API 或专用的生物信息学库(例如医院的私有数据库)。

最后,我们实例化工具包,并将所有工具函数整理成一个列表,以便传递给智能体执行器。

# 实例化我们的工具包类。
toolkit = ScientificToolkit()
# 创建一个列表,包含所有已定义的工具函数。
all_tools = [
    toolkit.pubmed_search,
    toolkit.clinical_trials_search,
    toolkit.protein_database_lookup,
    toolkit.gene_ontology_lookup
]

print("Scientific Toolkit with live data tools defined successfully.")
# 测试新的 gene_ontology_lookup 工具以确认其正常工作。
toolkit.gene_ontology_lookup.invoke("APOE4")

运行后输出如下:

#### OUTPUT ####
Scientific Toolkit with live data tools defined successfully.
--- TOOL: Gene Ontology Lookup, Gene: APOE4 ---
Gene 'APOE4' lookup result: A major genetic risk factor for Alzheimers disease, involved in lipid transport and amyloid-beta clearance.

输出显示 ScientificToolkit 已成功实例化,且 gene_ontology_lookup 工具工作正常。

all_tools 列表构成了一个完整、可移植的能力集合,可以绑定到任意智能体。通过主动从多个来源集成信息,我们将智能体从单纯的推理者转变为积极的研究者。

设计我们的科学家社群(LangGraph)

至此,我们已经具备了安全环境、数据集、分层的 AgentState 以及强大的 ScientificToolkit,可以开始构建智能体本体了。

接下来,我们将从数据结构转向工程化“认知实体”,即多智能体系统的核心组件。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

Sub-agents system (Created by Fareed Khan)

在本节中,我们将使用 LangGraph 来设计和编排一个多智能体社群。

为了贴近真实的研究流程,我们将创建一个专家团队,每个角色都有明确的分工,并为其分配合适的开源模型。

我们将完成以下工作:
* 角色与模型分配:定义每位“AI科学家”的角色设定,并根据任务复杂度分配不同的开源模型。
* 创建智能体执行器:构建一个工厂函数,输入模型、提示词和工具,即可生成可运行的智能体执行器。
* 架构状态图:使用 LangGraph 连接智能体,实现高级的 ReAct 逻辑与多层级的改写循环,从而形成一个稳健的循环式工作流。
* 可视化架构:生成最终的工作流图,直观展示智能体社群的认知结构。

构建多智能体科学系统

高级智能体设计的一个核心理念是:任务并非千篇一律。为所有工作都使用单一的大型模型既低效又昂贵。我们将从 Hugging Face Hub 战略性地为不同角色分配不同的开源模型。

这种“为合适的任务匹配合适的模型”的方法是构建生产级、高性价比智能体系统的基石。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

Multi-agentic System (Created by Fareed Khan)

首先,我们定义 LLM 配置。为初级研究员的创意发散使用小而快的模型;为高级研究员预留一个将使用 PPO 进行微调的占位模型;为关键的评审环节使用能力更强的专家混合模型。

import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# 我们将为不同角色使用不同的开源模型,以优化性能和成本。
# 'openai_api_base' 将在训练期间由 LLMProxy 动态设置,指向本地服务器(如 Ollama 或 vLLM),而非 OpenAI 的 API。
junior_researcher_llm = ChatOpenAI(
    model="Qwen/Qwen2-1.5B-Instruct", # 一个用于并行头脑风暴的小型快速模型。
    temperature=0.7,
    openai_api_base="http://localhost:11434/v1", # 假设本地运行着 Ollama 服务器。
    openai_api_key="ollama"
)
supervisor_llm = ChatOpenAI(
    model="Qwen/Qwen2-1.5B-Instruct", # 同一个小型模型足以胜任结构化的选择任务。
    temperature=0.0,
    openai_api_base="http://localhost:11434/v1",
    openai_api_key="ollama"
)

# 这是一个特殊的占位符。在训练期间,VERL 算法将通过 Agent-Lightning 的 LLMProxy 服务,
# 在此逻辑名称下提供 Llama-3 模型。
senior_researcher_llm = ChatOpenAI(
    model="senior_researcher_llm", # 一个逻辑名称,初始并非真实的模型端点。
    temperature=0.1,
    openai_api_base="http://placeholder-will-be-replaced:8000/v1",
    openai_api_key="dummy_key"
)

# 对于关键的评审和最终决策阶段,我们使用一个更强大的模型。
review_board_llm = ChatOpenAI(
    model="mistralai/Mixtral-8x7B-Instruct-v0.1", # 一个强大的专家混合模型,用于细致的评估。
    temperature=0.0,
    openai_api_base="http://localhost:11434/v1",
    openai_api_key="ollama"
)
print("Agent personas and open-source LLM configurations are defined.")

请确保已拉取相应的模型,并通过 ollama/vLLM 等服务使其可用。

至此,我们定义了研究团队的“硬件”配置:
1. 为初级角色分配 Qwen2-1.5B,以实现快速、并行、低成本的创意思考。
2. senior_researcher_llm 明确作为一个逻辑占位符,这是训练阶段的关键概念。Agent-Lightning 将拦截对该模型名称的调用,并将其路由到我们正在进行 PPO 训练的模型,从而实现策略的更新。
3. 评审环节使用更强大的 Mixtral 模型,以确保批判性评估环节具有高水平的审查能力。

接下来,我们需要一种标准化的方法,将模型、系统提示词与工具组合成一个可运行的智能体。为此,我们创建一个工厂函数:

def create_agent_runner(llm, system_prompt, tools):
    """A factory function to create a runnable agent executor."""
    # The prompt consists of a system message, and a placeholder for the conversation history.
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
    ])
    # We bind the tools to the LLM, making them available for the agent to call.
    return prompt | llm.bind_tools(tools)

create_agent_runner 函数虽小但至关重要。它标准化了智能体的构建流程:接收 system_prompt(定义角色)、llm(推理引擎)与 tools(能力集合),确保了系统的一致性并简化了后续图的构建。

接着,为每个角色定义具体的系统提示词。这些提示词是运行在 LLM “硬件”之上的“软件”,指导着智能体的行为和输出格式。

# This is holding the detailed system prompts for each agent role.
prompts = {
    "Geneticist": "You are a geneticist specializing in Alzheimer's. Propose a hypothesis related to genetic factors. Use tools to find supporting evidence. Respond with a JSON object: {'hypothesis': str, 'supporting_papers': List[str]}.",
    "Pharmacologist": "You are a pharmacologist. Propose a drug target hypothesis. Use tools to find clinical trial data. Respond with a JSON object: {'hypothesis': str, 'supporting_papers': List[str]}.",
    "Neurologist": "You are a clinical neurologist. Propose a systems-level neurobiology hypothesis. Use tools to find papers on brain pathways. Respond with a JSON object: {'hypothesis': str, 'supporting_papers': List[str]}.",
    "Supervisor": "You are a research supervisor. Review the hypotheses and select the most promising one. Justify your choice based on novelty, feasibility, and impact. Return a JSON object: {'selected_hypothesis_index': int, 'justification': str}.",
    "HypothesisRefiner": "You are a senior scientist. Deepen the selected hypothesis with more literature review, refining it into a specific, testable statement. Return a JSON object: {'refined_hypothesis': str}.",
    "ProtocolDesigner": "You are a lab manager. Design a detailed, step-by-step experimental protocol to test the refined hypothesis. Be specific about methods, materials, and controls. Return a JSON object: {'title': str, 'steps': List[str], 'safety_concerns': str, 'budget_usd': float}.",
    "PeerReviewer": "You are a critical peer reviewer. Find flaws in the protocol. Be constructive but rigorous. Return a JSON object: {'decision': 'APPROVE'|'REVISE', 'critique_severity': 'CRITICAL'|'MAJOR'|'MINOR', 'feedback': str}.",
    "SafetyOfficer": "You are a lab safety officer. Review the protocol for safety, regulatory, and ethical concerns. Be thorough. Return a JSON object: {'decision': 'APPROVE'|'REVISE', 'critique_severity': 'CRITICAL'|'MAJOR'|'MINOR', 'feedback': str}.",
    "PrincipalInvestigator": "You are the Principal Investigator. Synthesize the protocol and reviews into a final document. Make the final GO/NO-GO decision and provide a comprehensive rationale. Return a JSON object: {'final_protocol': Protocol, 'final_decision': 'GO'|'NO-GO', 'final_rationale': str}."
}

至此,我们定义好了整个 AI 科学家团队。每个智能体都由 prompt(角色定义)、llm(推理引擎)与 tools(工具集)共同构成。

这些提示词的关键在于要求智能体以指定的 JSON 对象格式进行回应。这种结构化输出是可靠更新分层 AgentState 的基础,确保了工作流能够从一个智能体顺畅地流转到下一个。

集成 ReAct 逻辑的高级 StateGraph

现在,我们将专家团队组装到一个“虚拟实验室”中进行协作,这正是 LangGraph 的用武之地。我们将智能体组装成一个循环工作流,创建 StateGraph,并定义研究团队之间的信息与控制流转逻辑。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

ReAct 逻辑简化示意图 (Created by Fareed Khan)

这并非一个简单的线性流水线……

为了贴近真实的研究过程,我们需要实现复杂的逻辑,包括用于迭代修改的反馈回路以及稳健的工具调用机制。

在本节中,我们将完成以下步骤:
* 构建智能体节点:使用工厂函数将智能体运行器封装为 LangGraph 节点,并确保其能正确更新 AgentState
* 实现 ReAct 风格的工具调用:定义条件边和路由器,确保工具执行的结果能够返回到正确的智能体进行处理。
* 设计多层级的修改循环:基于评审严重程度进行智能路由,支持“小修小改”和“重大修改”两种不同的反馈回路。
* 编译并可视化图:编译完整的 StateGraph 并生成可视化图表,清晰地呈现整个认知架构。

构建智能体节点与路由逻辑

首先,定义一个辅助函数,用于将智能体运行器(agent runner)封装为 LangGraph 的节点函数。该函数负责在节点调用时正确维护 turn_countsender 等关键状态。

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, BaseMessage
import json

MAX_TURNS = 15  # 安全机制,防止图陷入无限循环。

def create_agent_node(agent_name: str, agent_runner):
    """为指定的智能体运行器创建一个 LangGraph 节点函数。"""
    def agent_node(state: AgentState) -> dict:
        # 打印控制台信息以追踪图的执行路径。
        console.print(f"--- Node: {agent_name} (Turn {state['turn_count']}) ---")
        # 递增回合计数作为安全措施。
        state['turn_count'] += 1
        # 使用当前状态调用智能体运行器。
        result = agent_runner.invoke(state)

        # 特别处理评审智能体(如 PeerReviewer, SafetyOfficer)的结构化 JSON 输出。
        if agent_name in ["PeerReviewer", "SafetyOfficer"]:
            try:
                # 智能体的输出是 AIMessage 的 ‘content’ 字段中的 JSON 字符串。
                content = json.loads(result.content)
                # 根据运行的评审者,更新 AgentState 中的对应字段。
                if agent_name == "PeerReviewer":
                    state['peer_review'] = content
                else:
                    state['safety_review'] = content  # 注意:此处键为 'safety_review',而非 'feedback'。
            except (json.JSONDecodeError, TypeError):
                # 如果解析失败,记录错误但不使图崩溃。
                console.print(f"[bold red]Error parsing JSON from {agent_name}: {result.content}[/bold red]")

        # 更新 ‘messages’ 列表,并关键性地设置 ‘sender’ 字段以供 ReAct 路由使用。
        return {"messages": [result], "sender": agent_name}
    return agent_node

create_agent_node 函数是系统中每个智能体的标准化封装,其核心作用如下:
1. 状态管理与追踪:每次智能体运行时,记录活动日志并递增安全计数器 turn_count
2. 路由标识:更新 sender 字段,标记“谁刚刚执行了动作”。这是后续实现 ReAct(推理-行动)循环中工具调用后路由返回的关键。
3. 结构化输出处理:对评审类智能体的 JSON 输出进行专门解析,确保 peer_reviewsafety_review 正确写入 AgentState


接下来,定义驱动 ReAct 循环的条件判断逻辑。该函数检查状态中的最新消息,以决定下一步是执行工具还是结束当前智能体的推理轮次。

def tools_condition(state: AgentState) -> str:
    """条件边函数,用于检查是否存在工具调用以及是否达到回合上限。"""
    # 检查状态中的最后一条消息。
    last_message = state['messages'][-1]

    # 如果消息中没有工具调用,则当前智能体的回合结束。
    if not hasattr(last_message, 'tool_calls') or not last_message.tool_calls:
        return "end"

    # 如果已达到最大回合数,也结束执行以防止循环。
    if state['turn_count'] >= MAX_TURNS:
        console.print("[bold yellow]Max turns reached. Ending graph.[/bold yellow]")
        return "end"

    # 否则,表示有待执行的工具。
    return "tools"

tools_condition 函数是 ReAct 循环的“守门员”,其逻辑为:
1. 检查工具调用:判断最后一条消息是否包含 tool_calls 属性。若有,则返回 "tools",将工作流路由至工具执行节点。
2. 安全终止:若无工具调用,或回合数 turn_count 已达到预设的 MAX_TURNS 上限,则返回 "end",推动工作流进入下一步。


最后,需要一个在工具执行完毕后将控制权路由回原智能体的函数。这正是之前设置的 sender 字段发挥作用的地方。

def route_after_tools(state: AgentState) -> str:
    """路由器函数,将工作流送回发起工具调用的智能体。"""
    # 从状态的 ‘sender’ 字段中获取最后一个执行动作的智能体名称。
    sender = state.get("sender")
    console.print(f"--- Routing back to: {sender} after tool execution ---")
    if not sender:
        # 如果由于某种原因未设置 sender,则作为后备方案结束图。
        return END
    # 返回的字符串必须与图中某个节点的名称匹配。
    return sender

route_after_tools 函数实现了 ReAct 模式的后半部分闭环。它从 AgentState 中读取 sender 字段并返回其值。LangGraph 引擎会根据这个返回值,将工具的执行结果直接送回给最初发起调用的智能体节点,使其能够基于工具返回的结果继续进行后续的推理。

接下来是最核心的路由逻辑:基于评审结果的多层级改写循环。

def route_after_review(state: AgentState) -> Literal["PrincipalInvestigator", "HypothesisRefiner", "ProtocolDesigner"]:
    """
    一个智能路由器,根据评审反馈的严重程度决定下一步行动。
    """
    peer_review = state.get("peer_review", {})
    safety_review = state.get("safety_review", {})

    # 从两份评审中提取决策和严重级别,并提供安全默认值。
    peer_severity = peer_review.get("critique_severity", "MINOR")
    safety_severity = safety_review.get("critique_severity", "MINOR")

    # 如果迭代次数已达上限,则无论反馈如何,都必须进入最终决策。
    if state['turn_count'] >= MAX_TURNS:
        console.print("[bold yellow]评审阶段达到最大迭代次数,将转至最终决策。[/bold yellow]")
        return "PrincipalInvestigator"

    # 如果任一评审的严重级别为“CRITICAL”,则表明核心假设存在根本性缺陷。
    # 需要路由回“HypothesisRefiner”进行重大重构。
    if peer_severity == 'CRITICAL' or safety_severity == 'CRITICAL':
        console.print("--- 评审要求CRITICAL级别修订,路由回假设精炼器。 ---")
        state['messages'].append(HumanMessage(content="收到关键性反馈。核心假设需要重新思考。"))
        return "HypothesisRefiner"

    # 如果任一评审的严重级别为“MAJOR”(且无CRITICAL),则表明协议本身存在重大缺陷。
    # 需要路由回“ProtocolDesigner”进行显著修改。
    if peer_severity == 'MAJOR' or safety_severity == 'MAJOR':
        console.print("--- 评审要求MAJOR级别修订,路由回协议设计器。 ---")
        state['messages'].append(HumanMessage(content="收到重大反馈。协议需要进行显著修订。"))
        return "ProtocolDesigner"

    # 如果只有“MINOR”级别修订或全部通过,则协议基本可靠。
    # 可以进入“PrincipalInvestigator”进行最终决策。
    console.print("--- 评审完成,路由至最终决策者。 ---")
    return "PrincipalInvestigator"

该函数是迭代细化流程的核心组件,其决策逻辑如下:

  1. 严重缺陷(CRITICAL):当任一评审的 critique_severityCRITICAL 时,路由至 HypothesisRefiner。这表明核心假设存在根本性问题,需要重新构思。
  2. 重大缺陷(MAJOR):当任一评审的 critique_severityMAJOR(且无 CRITICAL)时,路由至 ProtocolDesigner。这表明实验协议存在显著缺陷,需要进行重大修改。
  3. 轻微缺陷或通过(MINOR/APPROVED):当所有评审均为 MINOR 或通过时,路由至 PrincipalInvestigator。这表明方案基本可靠,可以进入最终评估与决策阶段。

此外,系统还设置了迭代次数上限(MAX_TURNS)作为安全阀,防止无限循环,确保流程最终能够收敛。

这种基于严重程度的多级反馈回路,模拟了真实世界中的项目评审与迭代过程,使智能体能够根据问题深度动态调整其修正策略,是实现“自我进化”的关键机制。

最后,构建并编译完整的 StateGraph

def build_graph() -> StateGraph:
    workflow = StateGraph(AgentState)

    # 使用工厂函数实例化所有智能体执行器。
    agent_runners = {
        "Geneticist": create_agent_runner(junior_researcher_llm, prompts["Geneticist"], all_tools),
        "Pharmacologist": create_agent_runner(junior_researcher_llm, prompts["Pharmacologist"], all_tools),
        "Neurologist": create_agent_runner(junior_researcher_llm, prompts["Neurologist"], all_tools),
        "Supervisor": create_agent_runner(supervisor_llm, prompts["Supervisor"], []),
        "HypothesisRefiner": create_agent_runner(senior_researcher_llm, prompts["HypothesisRefiner"], all_tools),
        "ProtocolDesigner": create_agent_runner(senior_researcher_llm, prompts["ProtocolDesigner"], all_tools),
        "PeerReviewer": create_agent_runner(review_board_llm, prompts["PeerReviewer"], []),
        "SafetyOfficer": create_agent_runner(review_board_llm, prompts["SafetyOfficer"], []),
        "PrincipalInvestigator": create_agent_runner(review_board_llm, prompts["PrincipalInvestigator"], [])
    }

    # 将所有智能体节点和单一工具执行节点添加到图中。
    for name, runner in agent_runners.items():
        workflow.add_node(name, create_agent_node(name, runner))
    workflow.add_node("execute_tools", ToolNode(all_tools))

    # ---- 使用边定义图的控制流 ----

    # 图从并行运行三位初级研究员开始。
    workflow.add_edge(START, "Geneticist")
    workflow.add_edge(START, "Pharmacologist")
    workflow.add_edge(START, "Neurologist")

    # 为每个可以使用工具的智能体添加 ReAct 条件边。
    for agent_name in ["Geneticist", "Pharmacologist", "Neurologist", "HypothesisRefiner", "ProtocolDesigner"]:
        # 智能体运行后,检查是否有工具调用。
        workflow.add_conditional_edges(
            agent_name,
            tools_condition,
            {
                "tools": "execute_tools", # 如果调用了工具,则转到工具节点。
                "end": "Supervisor" if agent_name in ["Geneticist", "Pharmacologist", "Neurologist"] else "ProtocolDesigner" if agent_name == "HypothesisRefiner" else "PeerReviewer" # 如果没有工具,则进入下一个逻辑步骤。
            }
        )
    # 工具执行完毕后,路由回调用它们的智能体。
    workflow.add_conditional_edges("execute_tools", route_after_tools)

    # 定义研究流程的主要线性流。
    workflow.add_edge("Supervisor", "HypothesisRefiner")
    workflow.add_edge("PeerReviewer", "SafetyOfficer")

    # SafetyOfficer 之后,使用智能评审路由器。
    workflow.add_conditional_edges("SafetyOfficer", route_after_review)

    # PrincipalInvestigator 是图结束前的最后一步。
    workflow.add_edge("PrincipalInvestigator", END)

    return workflow

# 构建图并将其编译为可运行对象。
research_graph_builder = build_graph()
research_graph = research_graph_builder.compile()
print("LangGraph StateGraph 已定义并编译完成。")

# 我们还可以可视化编译后的图以查看最终架构。
try:
    from IPython.display import Image, display
    png_image = research_graph.get_graph().draw_png()
    display(Image(png_image))
except Exception as e:
    print(f"无法可视化图:{e}。请确保已安装 pygraphviz 和 graphviz。")

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

build_graph 函数将所有组件(节点、边、路由器)组合成一个完整、可运行的 StateGraph。我们可以清晰地看到:初级研究员的并行启动、ReAct 循环(智能体调用工具并接收结果)、以及评审阶段的多层级反馈回路。

接下来,我们将开始构建智能体系统的训练架构。

LitAgent 与复杂的奖励系统

我们已经使用 LangGraph 设计并组装了智能体社会,但无论静态工作流多么复杂,都无法进行学习或提升。为了实现学习,需要将 LangGraph 编排与训练框架连接起来,这正是 Agent-Lightning 的作用。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

本节我们将创建两个关键组件作为桥梁:LitAgent 与奖励函数。它们将静态图转化为可训练的动态系统。

计划如下:
* 封装工作流:创建继承 agl.LitAgentMedicalResearchAgent,在其 rollout 方法中封装整个 LangGraph
* 启用定点训练:在 rollout 中,仅为特定节点(如高级研究员)动态注入训练中的模型,实现“外科手术式”的策略更新。
* 设计细粒度奖励系统:构建一个多维度的 protocol_evaluator(使用 LLM 作为评判者),从可行性、影响力、严谨性等角度为最终输出打分。
* 构建加权奖励:实现一个函数,将多维分数聚合为单一的加权奖励,以引导强化学习算法进行优化。

创建 MedicalResearchAgent

使系统可训练的第一步是将 LangGraph 工作流封装进 agl.LitAgent。在 Agent-Lightning 生态中,LitAgent 是可训练的基本单元,其核心是定义 rollout 方法:在给定任务上执行一次端到端的运行。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

我们创建 MedicalResearchAgent 类,继承 agl.LitAgent。它持有编译后的 LangGraph 和奖励函数。其 rollout 方法将从数据集中获取研究目标,执行完整的图,然后使用奖励函数对最终结果进行评分。

关键工程点是如何处理“训练中的模型”。

图不会使用固定的模型集合。rollout 方法将把由 Agent-Lightning 训练器提供的 LLM 端点动态绑定到我们希望训练的节点(例如高级研究员),从而实现定点、精准的策略微调。

定义 MedicalResearchAgent 类:

import agentlightning as agl
from typing import Any, cast

class MedicalResearchAgent(agl.LitAgent):
    def __init__(self, graph, reward_func):
        # LitAgent 必须使用已编译的图和奖励函数进行初始化。
        super().__init__()
        self.graph = graph
        self.reward_func = reward_func

    def rollout(self, task: ResearchTask, resources: agl.NamedResources, rollout: agl.Rollout) -> None:
        # 此方法定义了智能体一次端到端的完整运行。
        console.print(f"n[bold green]-- 开始 Rollout {rollout.rollout_id},任务:{task['id']} --[/bold green]")

        # 'senior_researcher_llm' 资源是我们的待训练模型,由 VERL 算法通过 LLMProxy 提供。
        llm_resource = cast(agl.LLM, resources['senior_researcher_llm'])

        # 训练器的追踪器提供了一个 LangChain 回调处理器,这对于 LangSmith 中的深度可观测性至关重要。
        langchain_callback_handler = self.trainer.tracer.get_langchain_handler()
        # 在此,我们将训练资源中的 LLM 端点动态绑定到我们想要训练的具体智能体执行器上。这是实现定向策略优化的关键。
        llm_with_endpoint = senior_researcher_llm.with_config({
            "openai_api_base": llm_resource.endpoint,
            "openai_api_key": llm_resource.api_key or "dummy-key"
        })

        # 为此特定 rollout 创建新的智能体执行器,使用更新后的 LLM 绑定。
        hypothesis_refiner_agent_trained = create_agent_runner(llm_with_endpoint, prompts["HypothesisRefiner"], all_tools)
        protocol_designer_agent_trained = create_agent_runner(llm_with_endpoint, prompts["ProtocolDesigner"], all_tools)

        # 获取图的可变副本,以便为此 rollout 临时更新节点。
        graph_with_trained_model = self.graph.copy()
        # 将 'HypothesisRefiner' 和 'ProtocolDesigner' 节点的函数替换为我们新创建的、可训练的执行器。
        graph_with_trained_model.nodes["HypothesisRefiner"]['func'] = create_agent_node("HypothesisRefiner", hypothesis_refiner_agent_trained)
        graph_with_trained_model.nodes["ProtocolDesigner"]['func'] = create_agent_node("ProtocolDesigner", protocol_designer_agent_trained)
        # 将修改后的图编译成此特定 rollout 的可运行对象。
        runnable_graph = graph_with_trained_model.compile()

        # 准备图执行的初始状态。
        initial_state = {"research_goal": task['goal'], "messages": [HumanMessage(content=task['goal'])], "turn_count": 0, "initial_hypotheses": []}
        # 配置运行以使用我们的 LangSmith 回调处理器。
        config = {"callbacks": [langchain_callback_handler]} if langchain_callback_handler else {}

        try:
            # 从头到尾执行完整的 LangGraph 工作流。
            final_state = runnable_graph.invoke(initial_state, config=config)
            # 从图的最终状态中提取最终协议。
            final_protocol = final_state.get('final_protocol')

        # 如果成功生成了协议,我们计算其奖励。
        if final_protocol:
            console.print("--- 智能体生成的最终协议 ---")
            console.print(final_protocol)
            # 调用我们的多维度奖励函数以获取分数字典。
            reward_scores = self.reward_func(final_protocol, task['context'])
            # 将分数转换为单个加权奖励值。
            final_reward = get_weighted_reward(reward_scores)
        else:
            # 为失败或不完整的 rollout 分配 0.0 的奖励。
            final_reward = 0.0

        # 发出最终奖励。Agent-Lightning 捕获此值并将其用于 RL 更新步骤。
        agl.emit_reward(final_reward)

        console.print(f"[bold green]-- Rollout {rollout.rollout_id} 完成,最终奖励:{final_reward:.2f} --[/bold green]")
        # 该方法返回 None,因为结果(奖励和追踪)通过 agl.emit_* 调用发出。
        return None

MedicalResearchAgent 是核心的可训练单元,它将 LangGraph 的复杂多步逻辑与 Agent-Lightning 的训练循环连接起来:

  1. 动态绑定:将 senior_researcher_llm 资源动态绑定到目标节点。注意,我们并不修改原始图。
  2. 临时副本:每次 rollout 都会创建图的一个临时副本,仅将 Senior Researcher 节点指向正在训练的模型。

因此,PPO 更新只会影响 Senior Researchers 的策略(即如何细化假设和设计方案),而其他智能体(如 Junior Researchers、Review Board 等)仍使用稳定的预定义模型。这就在复杂的异构多智能体系统中实现了精准、高效的定点训练。

多维奖励系统

强化学习的效果高度依赖于奖励信号。对于科学研究这类精细任务,简单的二元奖励(如成功=1/失败=0)是远远不够的。

它无法教会智能体区分“勉强合格”和“卓越”的方案。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

为了提供信息量丰富的学习信号,我们设计了一个多维奖励系统。其核心是构建一个 protocol_evaluator,它扮演 LLM-as-a-Judge 的角色。

这个“法官”是一个强大的模型,它从多个维度评估智能体生成的最终方案,并返回一个结构化的分数字典。

具体实现计划如下:

  • 定义评估指标:使用 Pydantic 模型 EvaluationOutput 定义评分维度,例如新颖性、可行性、影响力、清晰度,以及关键的 groundedness(相对于上下文的扎实程度)。
  • 构建评估器函数:实现 protocol_evaluator 函数,使用详细的提示词驱动法官 LLM,并解析其结构化的响应。
  • 构建加权奖励:定义 get_weighted_reward 函数,将分数字典按照预设权重聚合成一个单一的浮点数奖励,可以对重点维度(如影响力)赋予更高权重。

首先,定义评估的 Pydantic 模式,作为奖励系统的正式评分标准:

from langchain_core.pydantic_v1 import BaseModel, Field

class EvaluationOutput(BaseModel):
    novelty: float = Field(description="对超越给定背景的原创性和创新性的评分(0-1分)。")
    feasibility: float = Field(description="在标准实验室资源下,方案实用性的评分(0-1分)。")
    impact: float = Field(description="潜在科学或临床意义的评分(0-1分)。")
    clarity: float = Field(description="方案是否具体、可衡量、可复现的评分(0-1分)。")
    groundedness: float = Field(description="方案与所提供科学背景的一致性和支持程度的评分(0-1分。对任何缺乏背景支持的论断进行扣分)。")
    efficiency: float = Field(description="所提方案的成本效益和时间效率评分(0-1分)。")

EvaluationOutput 通过清晰的字段为评估大语言模型提供了明确的指引。其中,groundedness 字段的引入旨在引导 PPO 智能体避免产生幻觉或脱离文献支持的断言,而 efficiency 字段则强化了对方案实用性的考量。

接下来,构建协议评估器 protocol_evaluator

def protocol_evaluator(protocol: Protocol, context: str) -> dict:
    """
    充当“LLM-as-a-Judge”,根据多项标准对协议进行评分。
    """
    console.print("--- 运行协议评估器(奖励函数) ---")

    # 构建详细的提示词,要求大语言模型扮演专家评审团。
    evaluator_prompt = ChatPromptTemplate.from_messages([
        ("system", "你是一个由资深科学家组成的专家评审团。请根据以下标准,对给出的实验方案进行评分(0.0 到 1.0 分)。请严格评判,并简要说明评分理由。"),
        # 同时提供原始科学背景和智能体生成的协议。
        ("human", f"科学背景:nn{context}nn---nn待评估协议:nn{json.dumps(protocol, indent=2)}")
    ])

    # 使用强大的 `review_board_llm`,并指示其按照 `EvaluationOutput` 模式格式化输出。
    evaluator_llm = review_board_llm.with_structured_output(EvaluationOutput)

    try:
        # 调用评估链。
        evaluation = evaluator_llm.invoke(evaluator_prompt.format_messages())
        # 输出是一个 Pydantic 对象,可轻松转换为字典。
        scores = evaluation.dict()
        console.print(f"生成评分:{scores}")
        return scores
    except Exception as e:
        # 如果大语言模型评估失败,则返回默认低分以惩罚此次失败。
        console.print(f"[bold red]协议评估出错:{e}。返回零分。[/bold red]")
        return {"novelty": 0.1, "feasibility": 0.1, "impact": 0.1, "clarity": 0.1, "groundedness": 0.1, "efficiency": 0.1}

protocol_evaluator 是自动化的质量保障模块:
1. 接收智能体的最终 protocol 和原始数据集 context
2. 将其提交给强大的 review_board_llm,要求其作为专家评审团根据 EvaluationOutput 标准打分。
3. 通过 try...except 确保鲁棒性:即使评估大语言模型失败,训练循环也不会崩溃,而是返回低分,从而正确惩罚失败的 rollout。

最后,通过加权平均为强化学习算法提供一个单一的浮点数奖励:

def get_weighted_reward(scores: dict) -> float:
    """
    根据各项指标评分的字典,计算单一的加权奖励分数。
    """
    # 权重设置允许我们优先考虑“优秀”协议的某些方面。
    # 此处,我们认为“影响力”是最重要的因素,“效率”是锦上添花。
    weights = {
        "novelty": 0.1,
        "feasibility": 0.2,
        "impact": 0.3,
        "clarity": 0.15,
        "groundedness": 0.2,
        "efficiency": 0.05
    }

    # 计算加权总分。如果输入字典中缺少某项评分,则默认为 0。
    weighted_sum = sum(scores.get(key, 0) * weight for key, weight in weights.items())

    return weighted_sum

get_weighted_reward 为强化学习提供最终的奖励信号。通过权重设置(例如,impact 权重最高为 0.3),可以明确指导优化的方向。

以下是对整个奖励系统的测试:

print("多维度加权奖励函数已定义。")
# 使用示例协议测试完整的奖励流程。
test_protocol = {"title": "测试协议", "steps": ["1. 执行此操作。", "2. 执行彼操作。"], "safety_concerns": "小心处理。", "budget_usd": 50000.0}
test_context = "近期研究表明,肠道微生物群与阿尔茨海默病的神经炎症存在关联。"
test_scores = protocol_evaluator(test_protocol, test_context)
final_test_reward = get_weighted_reward(test_scores)
print(f"加权最终奖励:{final_test_reward:.2f}")

#### 输出 ####
多维度加权奖励函数已定义。
--- 运行协议评估器(奖励函数) ---
生成评分:{'novelty': 0.8, 'feasibility': 0.7, 'impact': 0.9, 'clarity': 0.85, 'groundedness': 0.95, 'efficiency': 0.9}
加权最终奖励:0.84

测试结果表明,评估与加权流程工作正常。至此,我们获得了指导训练的奖励信号。

创建基于强化学习的训练架构

我们已经使用 LangGraph 设计了智能体社会,并构建了奖励系统。下一步是构建工业级的基础设施,以高效、可扩展地训练这些智能体。这正是 Agent-Lightning 的用武之地。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

智能体训练架构(由 Fareed Khan 创建)

对于复杂的多智能体系统(涉及大量大语言模型调用),简单的单进程训练循环已无法满足需求。

我们需要一个分布式架构,能够并行运行多个智能体“rollouts”,同时由中心训练算法进行统一管理。

在本节中,我们将配置 Agent-Lightning 训练基础设施的核心组件。

构建分布式神经系统

为了高效训练,我们需要快速采集大量经验。如果串行执行单个智能体的轨迹采样,将成为严重的性能瓶颈。因此,我们配置 Trainer 使用 ClientServerExecutionStrategy

该策略构建了一个分布式训练架构。主进程负责运行核心训练算法(如 PPO)并启动 LightningStoreServer 来管理数据。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

分布式系统架构图 (Created by Fareed Khan)

该策略会派生多个独立的 runner 子进程。每个 runner 作为客户端连接到主服务器,获取任务并并行执行 MedicalResearchAgentrollout。这使得我们能够同时收集海量训练数据,这对于强化学习的效率至关重要。

以下是执行策略的配置示例:

import agentlightning as agl

# 配置系统并行运行 4 个智能体轨迹采样
num_runners = 4

# 此字典定义了 Agent-Lightning Trainer 的执行策略
strategy_config = {
    "type": "cs", # 'cs' 是 ClientServerExecutionStrategy 的简写
    "n_runners": num_runners, # 要生成的并行工作进程数量
    "server_port": 48000 # 指定一个高位端口,以避免与其他服务冲突
}
print(f"ClientServerExecutionStrategy configured for {num_runners} runners.")

至此,我们定义了分布式训练的蓝图。将 strategy_config 传递给 agl.Trainer 后,框架将自动搭建多进程架构(包括进程间通信、数据同步等)。我们只需调整 n_runners 参数即可轻松扩展系统规模,而无需修改核心业务逻辑代码。

使用 LLMProxy 作为多模型枢纽实现可观测性

我们的智能体社会是异构的,使用了多种不同的模型。管理多个模型端点非常复杂,尤其是当其中一个模型是动态变化的训练中服务时。

Agent-LightningLLMProxy 为此提供了完美的解决方案。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

LLM 代理架构图 (Created by Fareed Khan)

它充当所有大语言模型调用的统一网关。我们的 LitAgent 将请求发送到代理,代理根据调用中指定的 model_name 智能地路由到正确的后端服务。

这一设计在训练场景中尤其强大:
1. VERL(PPO)算法能够自动更新代理配置,将对 "senior_researcher_llm" 的请求路由到其自身的 vLLM 推理实例。
2. 同时,对其他模型(如 Qwen2Mixtral)的请求,则会被路由到本地 Ollama 等其他后端。

以下是 LLMProxy 的配置示例:

# 'model_list' 定义了 LLMProxy 的路由规则
llm_proxy_config = {
    "port": 48001, # LLMProxy 自身监听的端口
    "model_list": [
        # 规则 1:用于初级研究员和监督员。
        # 任何对此模型名的请求将被转发到运行 Qwen2 的本地 Ollama 服务器。
        {
            "model_name": "Qwen/Qwen2-1.5B-Instruct",
            "litellm_params": {"model": "ollama/qwen2:1.5b"}
        },
        # 规则 2:用于我们的高级研究员(正在训练的模型)。
        # 初始时,它可能指向一个基线模型。在训练过程中,VERL 算法
        # 将自动更新此条目,使其指向自己的 vLLM 服务器。
        {
            "model_name": "senior_researcher_llm",
            "litellm_params": {"model": "ollama/llama3"} # 初始后备模型
        },
        # 规则 3:用于强大的评审委员会。
        # 对此模型的请求将被路由到运行 Mixtral 的本地 Ollama 服务器。
        {
            "model_name": "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "litellm_params": {"model": "ollama/mixtral"}
        }
    ]
}

llm_proxy_config 是整个多智能体系统的路由表,其优势在于:
1. 将智能体使用的逻辑模型名(如 "senior_researcher_llm")与物理后端服务解耦。
2. 可以轻松替换后端、进行 A/B 测试或动态更新训练中模型的端点,而无需修改智能体的核心代码。
3. LLMProxy 为整个系统提供了集中的控制点和可观测性。

构建数据管道 HierarchicalTraceAdapter

我们的分层训练策略面临独特的数据处理挑战:每次 rollout 只产生一个复杂的 LangGraph 轨迹,但我们需要为三种不同的训练算法提供不同格式的数据:

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

强化学习算法实现示意图 (Created by Fareed Khan)

  1. SFT 算法:需要来自初级研究员的对话数据(消息列表)。
  2. PPO 算法:需要高级研究员的强化学习三元组(状态、动作、奖励)。
  3. Contextual Bandit 算法:需要监督员决策的单个(上下文、动作、奖励)元组。

为此,我们构建了一个复杂的轨迹适配器。在 Agent-Lightning 中,适配器是一个将原始轨迹(LangSmith spans 列表)转换为特定算法所需格式的类。

HierarchicalTraceAdapter 将是一个“多头”数据处理器,能够从单一轨迹中提取并生成三种数据格式。

我们创建一个继承自 agl.TracerTraceToTriplet 的新类,并为其添加针对各自目标数据格式的处理方法。这展示了 Agent-Lightning 数据管道的强大灵活性。

定义 HierarchicalTraceAdapter

from agentlightning.adapter import TraceToMessages

class HierarchicalTraceAdapter(agl.TracerTraceToTriplet):
    def __init__(self, *args, **kwargs):
        # 初始化父类以支持PPO三元组生成
        super().__init__(*args, **kwargs)
        # 同时创建一个标准适配器实例,用于生成SFT消息
        self.message_adapter = TraceToMessages()

    def adapt_for_sft(self, source: List[agl.Span]) -> List[dict]:
        """为监督微调(SFT)适配追踪数据:过滤初级研究员节点并转换为消息格式。"""
        # 定义初级研究员智能体对应的节点名称
        junior_agent_names = ["Geneticist", "Pharmacologist", "Neurologist"]
        # 过滤原始追踪数据,仅保留这些智能体生成的span
        # LangSmith会在span属性中为LangGraph节点添加'name'字段
        junior_spans = [s for s in source if s.attributes.get('name') in junior_agent_names]
        console.print(f"[bold yellow]Adapter (SFT):[/] 从 {len(source)} 个span中过滤出 {len(junior_spans)} 个初级研究员span。")
        if not junior_spans:
            return []
        # 使用标准消息适配器将过滤后的span转换为对话数据集
        return self.message_adapter.adapt(junior_spans)

    def adapt_for_ppo(self, source: List[agl.Span]) -> List[agl.Triplet]:
        """为近端策略优化(PPO)适配追踪数据:过滤高级研究员节点并转换为三元组。"""
        # 定义高级研究员智能体对应的节点名称
        senior_agent_names = ["HypothesisRefiner", "ProtocolDesigner"]
        # 配置父类的过滤器,使其仅匹配这些智能体名称
        self.agent_match = '|'.join(senior_agent_names)
        # 调用父类的adapt方法,它将自动过滤并处理相关span
        ppo_triplets = super().adapt(source)
        console.print(f"[bold yellow]Adapter (PPO):[/] 从 {len(source)} 个span中过滤并适配出 {len(ppo_triplets)} 个高级研究员三元组。")
        return ppo_triplets

    def adapt_for_bandit(self, source: List[agl.Span]) -> List[tuple[list[str], int, float]]:
        """为上下文赌博机算法适配一个完整的rollout追踪。"""
        # 首先,查找整个rollout的最终奖励
        final_reward = agl.find_final_reward(source)
        if final_reward is None:
            return []

        # 其次,找到Supervisor智能体做出决策的具体span
        supervisor_span = next((s for s in source if s.attributes.get('name') == 'Supervisor'), None)
        if not supervisor_span:
            return []

        # 然后,重构“上下文”——即监督者需要从中选择的假设列表
        junior_spans = [s for s in source if s.attributes.get('name') in ["Geneticist", "Pharmacologist", "Neurologist"]]
        contexts = []
        # 按开始时间排序以确保假设顺序正确
        for span in sorted(junior_spans, key=lambda s: s.start_time):
            try:
                # 在LangGraph中,智能体的最终JSON输出位于状态的'messages'属性中
                output_message = span.attributes.get('output.messages')
                if output_message and isinstance(output_message, list):
                    # 实际内容位于AIMessage的content字段中的JSON字符串内
                    content_str = output_message[-1].get('content', '{}')
                    hypothesis_data = json.loads(content_str)
                    contexts.append(hypothesis_data.get('hypothesis', ''))
            except (json.JSONDecodeError, KeyError, IndexError):
                continue

        if not contexts:
            return []

        # 最后,提取“动作”——即监督者选择的假设索引
        try:
            output_message = supervisor_span.attributes.get('output.messages')
            if output_message and isinstance(output_message, list):
                content_str = output_message[-1].get('content', '{}')
                supervisor_output = json.loads(content_str)
                chosen_index = supervisor_output.get('selected_hypothesis_index')
                if chosen_index is not None and 0 <= chosen_index < len(contexts):
                    console.print(f"[bold yellow]Adapter (Bandit):[/] 提取出上下文(假设列表)、动作(索引 {chosen_index})和奖励({final_reward:.2f})。")
                    # 返回赌博机算法的单个数据点
                    return [(contexts, chosen_index, final_reward)]
        except (json.JSONDecodeError, KeyError, IndexError):
            pass
        return []

# 实例化我们的自定义适配器
custom_adapter = HierarchicalTraceAdapter()

HierarchicalTraceAdapter 展示了 Agent-Lightning 数据管线的灵活性:一个类即可服务于整个分层训练策略。

  • adapt_for_sft:过滤出初级研究员(Junior Researchers)的对话,并将其转换为SFT数据集。
  • adapt_for_ppo:配置父类过滤器,使其仅处理高级研究员(Senior Researchers)的追踪片段(spans),并产出PPO所需的三元组(triplets)。
  • adapt_for_bandit:这是最复杂的部分,它解析完整的追踪数据,重构监督者(Supervisor)的决策情景(contexts)、选择动作(action)以及最终奖励(reward)。

这个适配器是整个训练架构的关键枢纽。它使得我们能够在统一的工作流(LangGraph)和统一的数据源(LangSmith 追踪数据)前提下,仍然可以为架构中的不同组件应用专门的训练算法。

使用 WandbLoggingHook 进行实时监控

有效的训练不仅需要运行算法,还需要具备实时可观测性。

我们需要“看见”智能体的学习表现,能够逐个rollout地实时观察其进展。

LangSmith 提供了单次追踪的深度细节,但我们同样需要一个宏观的整体视图。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

Monitoring Hook (Created by Fareed Khan)

为此,我们创建自定义的 HookAgent-Lightning 的 Hook 机制允许在训练生命周期(例如 on_rollout_starton_trace_end)注入自定义逻辑。

我们构建了一个 WandbLoggingHook 来监听 on_trace_end 事件。当一个 rollout 完成并生成 trace 后,该 hook 会被触发,提取最终的 reward 值,并将这一关键指标记录到 Weights & Biases 项目中,从而为我们提供实时的学习曲线。

Hook 定义如下:

import wandb

class WandbLoggingHook(agl.Hook):
    def __init__(self, project_name: str):
        # 在 Hook 创建时初始化一次 W&B 运行。
        self.run_initialized = False
        if os.environ.get("WANDB_API_KEY"):
            try:
                wandb.init(project=project_name, resume="allow", id=wandb.util.generate_id())
                self.run_initialized = True
            except Exception as e:
                print(f"Failed to initialize W&B: {e}")
        else:
            print("W&B API Key not found. Hook will be inactive.")
    async def on_trace_end(self, *, rollout: agl.Rollout, tracer: agl.Tracer, **kwargs):
        """
        该方法由 Trainer 在每个 rollout 结束时自动调用。
        """
        # 如果 W&B 未初始化,则不执行任何操作。
        if not self.run_initialized: return

        # 使用辅助函数从 trace 的 spans 列表中查找最终的 reward 值。
        final_reward_value = agl.find_final_reward(tracer.get_last_trace())

        # 如果找到 reward,则将其记录到 W&B。
        if final_reward_value is not None:
            # 记录 reward 本身以及用于交叉引用的 rollout_id。
            wandb.log({"live_reward": final_reward_value, "rollout_id": rollout.rollout_id})
            console.print(f"[bold blue]Hook:[/] Logged reward {final_reward_value:.2f} for rollout {rollout.rollout_id} to W&B.")

# 实例化我们的自定义 hook。
custom_hook = WandbLoggingHook(project_name="Chimera-Project-Training")

WandbLoggingHook 充当了训练过程的实时仪表盘。通过实现 on_trace_end,它成为了一个轻量级、事件驱动的监控器,无缝融入了 Agent-Lightning 的生命周期。

其工作机制如下:
1. 初始化与容错:检查并初始化 W&B 运行,稳健地处理未找到 API 密钥或 reward 值的情况。
2. 数据提取:使用 agl.find_final_reward 解析 trace 以获取 reward。
3. 实时上报:将每个 rollout 的 reward 实时上报到 W&B,形成高频学习曲线,帮助我们即时发现性能回退或训练停滞。

实现三种强化学习算法

我们已经搭建好所有基础设施:分布式执行策略、多模型代理、复杂数据适配器以及实时监控钩子。现在,我们来定义训练算法本身。

这是分层训练策略的核心:

我们不采用单一的“大一统”算法,而是定义三种不同的算法……

它们分别针对智能体社会的不同层级。这样可以将最合适的学习范式应用于对应的认知任务,从而构建真正高效、细腻的智能体系统。

本节将实现每一层的完整训练逻辑:
* 层级 1 (SFT):为初级研究员执行监督微调,从 LightningStore 中选取成功的 trace 来教导其生成更优的初始假设。
* 层级 2 (PPO):配置 Agent-Lightning 内置的 VERL 算法,利用我们丰富的奖励信号对高级研究员进行在线强化学习,以提升其协议设计能力。
* 层级 3 (上下文赌博机):实现简洁有效的上下文赌博机算法,用于训练主管的选择策略,教导其挑选最有可能带来高奖励的假设。
* 主循环:最后,在主 fit() 循环中编排这三类算法,执行复杂的多阶段训练管线。

使用 SFT 算法训练初级研究员

第一批训练目标是初级研究员。他们负责创意发散(提出新颖可行的假设),非常适合进行监督微调

核心思路是:从最终奖励较高的 rollouts 中提取初级研究员的成功对话,构建高质量数据集对基础模型进行微调,让模型学习并模仿成功的创意模式。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

SFT Training (Created by Fareed Khan)

我们创建自定义的 AlgorithmSFTOnSuccess。它将执行以下操作:
* 从 LightningStore 中查询高奖励的 traces。
* 使用 HierarchicalTraceAdapter 将其转换为对话数据集。
* 在独立进程中,使用高效的 unsloth 库执行微调。

一个关键的工程要点是:训练完成后,通过 vLLM 启动新的模型服务,并动态更新 LLMProxy,将初级智能体的请求路由到新模型。这形成了一个完整的闭环,使得后续的 rollouts 能够立即从改进的模型中受益。

首先,创建若干在独立进程中运行的辅助函数,以避免 GPU 资源冲突:

import asyncio
import multiprocessing
import subprocess
import httpx
import time
from contextlib import contextmanager
from datasets import Dataset as HuggingFaceDataset
from trl import SFTTrainer, SFTConfig
from unsloth import FastLanguageModel

@contextmanager
def serve_vllm_model(model_path: str, port: int):
    """上下文管理器,用于自动启动和关闭 vLLM 推理服务器。"""
    console.print(f"[SFT - vLLM] 正在为模型 {model_path} 在端口 {port} 上启动 vLLM 服务器...")
    proc = None
    try:
        # 使用 'agl vllm serve' 命令启动服务器,该命令包装了 vLLM,确保其与工具调用(tool-use)的 tokenization 兼容。
        cmd = ["agl", "vllm", "serve", model_path, "--port", str(port), "--gpu-memory-utilization", "0.7", "--enable-auto-tool-choice"]
        proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # 健康检查循环,等待服务器就绪。
        with httpx.Client() as client:
            for _ in range(60): # 60秒超时
                try:
                    if client.get(f"http://localhost:{port}/health").status_code == 200:
                        console.print(f"[SFT - vLLM] 端口 {port} 上的服务器已就绪。")
                        yield f"http://localhost:{port}/v1" # 返回服务端点 URL。
                        return
                except httpx.ConnectError:
                    pass
                time.sleep(1)
        raise RuntimeError(f"端口 {port} 上的 vLLM 服务器启动失败。")
    finally:
        # 退出上下文时,确保终止服务器进程。
        if proc:
            proc.terminate()
            proc.wait()
            console.print(f"[SFT - vLLM] 端口 {port} 上的服务器已关闭。")

def unsloth_sft_trainer(dataset, base_model, output_dir):
    """在独立进程中运行的实际 SFT 训练函数。"""
    console.print(f"[SFT Process] 正在加载基础模型: {base_model}")
    # 使用 unsloth 高效加载模型,采用 4-bit 量化和 PEFT 适配器配置。
    model, tokenizer = FastLanguageModel.from_pretrained(model_name=base_model, max_seq_length=4096, load_in_4bit=True)
    model = FastLanguageModel.get_peft_model(model, r=16, target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], lora_alpha=16, lora_dropout=0, bias="none")
    # 配置并运行 TRL 库中的 SFTTrainer。
    trainer = SFTTrainer(
        model=model,
        tokenizer=tokenizer,
        train_dataset=dataset,
        dataset_text_field="messages", # 指定训练器使用数据集的 'messages' 列。
        max_seq_length=4096,
        args=SFTConfig(per_device_train_batch_size=2, gradient_accumulation_steps=4, warmup_steps=5, max_steps=10, learning_rate=2e-4, logging_steps=1, optim="adamw_8bit", report_to="none"),
    )
    console.print("[SFT Process] 开始 SFT 训练...")
    trainer.train()
    console.print("[SFT Process] SFT 训练完成。正在保存合并后的模型。")
    # 以 16-bit 精度保存最终合并后的模型。
    model.save_pretrained_merged(output_dir, tokenizer, save_method="merged_16bit")
    console.print(f"[SFT Process] 模型已保存至 {output_dir}")
    return output_dir

我们定义了两个核心工具函数:
* unsloth_sft_trainer:利用 unsloth 库高效完成模型的 4-bit 量化加载、LoRA 适配器配置与监督微调训练。
* serve_vllm_model:以编程方式启动和关闭 vLLM 推理服务,并进行健康检查,确保服务在训练流程中稳定可用。

15. 实现动态训练:从监督微调到强化学习

15.1 创建 SFTOnSuccess 算法类

SFTOnSuccess 算法实现了对初级研究员(Junior Researchers)的闭环在线监督微调(SFT)。其核心逻辑是:从成功完成且获得高奖励的任务轨迹中,筛选出优质数据,用于微调基础模型,并动态更新系统中的模型服务。

from agentlightning.algorithm import Algorithm

class SFTOnSuccess(Algorithm):
    def __init__(self, reward_threshold=0.8, base_model="Qwen/Qwen2-1.5B-Instruct"):
        super().__init__()
        self.reward_threshold = reward_threshold  # 仅学习奖励 >= 0.8 的轨迹
        self.base_model = base_model
        self.adapter = HierarchicalTraceAdapter()  # 使用自定义适配器转换数据格式

    async def run(self, train_dataset, val_dataset):
        console.print("n[bold magenta]--- 启动初级研究员 SFT 训练 ---[/bold magenta]")
        # 获取中央数据存储的句柄
        store = self.get_store()

        console.print("正在分析现有任务轨迹以收集 SFT 数据...")
        # 从存储中查询所有成功完成的任务轨迹
        all_rollouts = await store.query_rollouts(status=["succeeded"])

        high_reward_traces = []
        # 筛选出满足奖励阈值的轨迹
        for rollout in all_rollouts:
            spans = await store.query_spans(rollout.rollout_id)
            final_reward = agl.find_final_reward(spans)
            if final_reward and final_reward >= self.reward_threshold:
                high_reward_traces.append(spans)

        console.print(f"找到 {len(high_reward_traces)} 条高奖励轨迹 (阈值 >= {self.reward_threshold})。")
        if high_reward_traces:
            # 使用适配器将成功轨迹转换为 SFT 所需的对话格式数据
            sft_data = self.adapter.adapt_for_sft(sum(high_reward_traces, []))
            sft_dataset = HuggingFaceDataset.from_list([{'messages': m['messages']} for m in sft_data])
            console.print(f"已将轨迹转换为 {len(sft_dataset)} 个对话样本用于 SFT。")

            # 定义新模型的唯一输出目录
            output_dir = f"./models/junior_researcher_sft_v{int(time.time())}"
            # 使用‘spawn’多进程上下文以确保 GPU 安全
            ctx = multiprocessing.get_context("spawn")
            q = ctx.Queue()
            # 在独立进程中运行训练
            p = ctx.Process(target=lambda: q.put(unsloth_sft_trainer(sft_dataset, self.base_model, output_dir)))
            p.start()
            p.join()  # 等待训练完成
            final_output_dir = q.get()

            # 获取 LLMProxy 的句柄
            llm_proxy = self.get_llm_proxy()
            if llm_proxy:
                console.print("正在使用新的 SFT 模型更新 LLMProxy...")
                new_port = 8002  # 在实际系统中,应动态分配端口
                # 使用上下文管理器来部署新模型
                with serve_vllm_model(final_output_dir, new_port) as new_endpoint:
                    # 更新代理的路由表,指向新的模型服务器
                    await llm_proxy.replace_model(self.base_model, f"openai/{final_output_dir}", api_base=new_endpoint)
                    console.print(f"LLMProxy 已更新。初级研究员现在将使用 {new_endpoint}。")
                    console.print("新模型服务器将保持运行 60 秒以供后续任务使用...")
                    await asyncio.sleep(60)  # 为演示暂时保持服务器存活

# 实例化 SFT 算法
sft_algorithm = SFTOnSuccess()

SFTOnSuccess 算法构建了一个完整的初级研究员训练闭环,整合了三个关键环节:
1. 数据科学:通过奖励阈值筛选高质量的任务轨迹(Trace)。
2. 工程训练:在独立进程中使用 unsloth 库进行高效的监督微调。
3. DevOps:通过 vLLM 部署微调后的模型,并动态更新 LLMProxy 的路由配置。

这实现了一个真正的“在线训练”系统:一旦训练完成,整个多智能体社会(multi-agent society)能立即受益于性能提升的模型,无需任何人工干预。

15.2 使用 PPO 优化高级研究员

接下来,我们将训练高级研究员(Senior Researcher)。其核心任务——设计详细的实验方案(protocol)——不仅需要创造性,还涉及大量方法性和序列性决策,这使其非常适合使用强化学习(Reinforcement Learning, RL)进行优化。

我们的目标是让智能体不仅模仿成功的模式,更能主动探索实验方案的设计空间,以最大化一个复杂、多维度的奖励信号。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

PPO 算法示意图 (Created by Fareed Khan)

我们利用框架内置的 VERL(一个基于近端策略优化 PPO 的实现)来简化这一过程。开发者无需手动实现 PPO 的复杂逻辑,只需正确配置训练模型、PPO 超参数以及数据采集参数。

其中的一个关键步骤是将自定义的 HierarchicalTraceAdapter 传递给 VERL Trainer。这确保了训练器只会看到来自高级研究员(如 HypothesisRefinerProtocolDesigner)的 (状态, 动作, 奖励) 三元组数据,从而实现“外科手术式”的精准训练,避免其他角色的数据干扰策略学习。

定义 VERL 配置:

# 用于 agl.VERL 算法的标准配置字典。
verl_config = {
    # 算法特定的超参数。'grpo' 是一种高级优势估计器。
    "algorithm": {"adv_estimator": "grpo"},

    # 训练批次和序列长度的数据配置。
    "data": {"train_batch_size": 4, "max_prompt_length": 4096, "max_response_length": 2048},

    # 此模块定义了模型及其训练配置。
    "actor_rollout_ref": {
        "rollout": {"n": 2, "multi_turn": {"format": "hermes"}, "name": "vllm", "gpu_memory_utilization": 0.6},
        "actor": {"ppo_mini_batch_size": 4, "optim": {"lr": 1e-6}},
        # 我们将使用 PPO 进行微调的基础模型。
        "model": {"path": "meta-llama/Llama-3-8B-Instruct", "enable_gradient_checkpointing": True},
        # 参考模型的配置,使用 FSDP 以提高内存效率。
        "ref": {"fsdp_config": {"param_offload": True}}
    },

    # 通用训练器配置,包括日志记录和保存频率。
    "trainer": {
        "n_gpus_per_node": 1,
        "total_epochs": 2,
        "logger": ["console", "wandb"], # 同时记录到控制台和 Weights & Biases。
        "project_name": "Chimera-Project-Training",
        "experiment_name": "PPO-Senior-Researcher",
        "total_training_steps": 10, # 用于快速演示运行。在实际运行中,此值会高得多。
        "test_freq": 5, # 每 5 步在验证集上评估一次。
        "save_freq": 5 # 每 5 步保存一次模型检查点。
    }
}

# 使用我们的配置实例化 VERL 算法。
ppo_algorithm = agl.VERL(verl_config)

我们通过一个声明式字典配置了整个 PPO 训练流水线。这份 verl_config 指定了所有必要信息,从 Actor 模型的学习率 (1e-6) 到使用的 GPU 数量。

model.path 设置为 meta-llama/Llama-3-8B-Instruct,表明将使用该基础模型进行 PPO 微调。在 fit 循环中,VERL 算法将自动启动 vLLM 服务,更新 LLMProxy 以将 "senior_researcher_llm" 路由到该服务,并开始在线强化学习训练。

这种配置驱动的方式使我们能够利用强大的 PPO 实现,将精力集中在智能体逻辑设计上,而非强化学习的底层训练细节。

使用 Contextual Bandit 训练 Supervisor 策略

最后是层级顶部的 Supervisor。其角色与其他智能体不同:它不生成创意,也不设计复杂的协议,而是执行关键的“选择”任务:

给定 Junior Researchers 提供的一组假设,选择最有前景的一个。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

这属于经典的“多臂老虎机”问题,但带有上下文信息。因此,我们采用 Contextual Bandit 方法。目标是学习一个策略:给定一组假设(上下文),预测哪个选择(动作)更有可能带来高的最终奖励。

我们从零开始实现了一个简洁有效的 Contextual Bandit 算法,它继承自 agl.Algorithm。我们使用 scikit-learnSGDClassifier 作为策略模型。在每次完成的轨迹中:

  1. LightningStore 查询轨迹数据。
  2. 使用 HierarchicalTraceAdapter 提取老虎机数据:候选假设(上下文)、Supervisor 的选择(动作)以及最终奖励。
  3. 将文本假设向量化为特征。
  4. 在线更新策略模型:如果奖励高,则强化该选择;如果奖励低,则进行惩罚。

定义 ContextualBanditRL

from sklearn.linear_model import SGDClassifier
from sklearn.feature_extraction.text import HashingVectorizer
import numpy as np

class ContextualBanditRL(Algorithm):
    def __init__(self):
        super().__init__()
        # 使用 SGDClassifier,设置 loss='log_loss' 以获得概率输出,warm_start=True 支持在线学习。
        self.policy = SGDClassifier(loss="log_loss", warm_start=True)
        # 使用 HashingVectorizer 将文本上下文高效地转换为数值特征。
        self.vectorizer = HashingVectorizer(n_features=2**12)
        self.is_fitted = False # 标志位,用于区分首次训练。
        self.adapter = HierarchicalTraceAdapter() # 自定义适配器,用于解析追踪数据。

    async def run(self, train_dataset, val_dataset):
        console.print("n[bold magenta]--- 启动 Supervisor 的上下文赌博机训练 ---[/bold magenta]")
        store = self.get_store()

        console.print("查询已完成的 rollout 以训练 supervisor 策略...")
        # 从数据存储中获取所有成功的 rollout。
        completed_rollouts = await store.query_rollouts(status=["succeeded"])

        if not completed_rollouts:
            console.print("未找到已完成的 rollout。跳过赌博机训练。")
            return

        training_samples = []
        # 处理每个 rollout,提取赌博机训练数据。
        for rollout in completed_rollouts:
            spans = await store.query_spans(rollout.rollout_id)
            # 适配器负责解析追踪数据。
            bandit_data = self.adapter.adapt_for_bandit(spans)
            training_samples.extend(bandit_data)

        if not training_samples:
            console.print("在追踪中未找到有效的 supervisor 决策。跳过训练。")
            return

        console.print(f"正在使用 {len(training_samples)} 个样本训练赌博机策略...")
        # 对每个收集到的数据点执行在线更新。
        for contexts, chosen_action_index, final_reward in training_samples:
            # 将假设字符串列表转换为数值特征矩阵。
            X = self.vectorizer.fit_transform(contexts)
            # 创建目标标签:被选中的动作为1,其余为0。
            y = np.zeros(len(contexts))
            y[chosen_action_index] = 1

            # 这是奖励逻辑的核心:创建样本权重。
            # 被选中的动作权重为最终奖励值。
            # 未被选中的动作权重为一个小的负值,与“错失”的奖励成比例。
            sample_weight = np.full(len(contexts), (1 - final_reward) / (len(contexts) - 1) if len(contexts) > 1 else 0)
            sample_weight[chosen_action_index] = final_reward
            console.print(f"[Bandit Training] 上下文(特征): {X.shape}, 动作: {chosen_action_index}, 奖励: {final_reward:.2f}, 样本权重: {sample_weight}")

            # 首次拟合后,使用 partial_fit 进行在线学习。
            if self.is_fitted:
                self.policy.partial_fit(X, y, sample_weight=sample_weight)
            else:
                self.policy.fit(X, y, sample_weight=sample_weight, classes=np.array([0, 1]))
                self.is_fitted = True

        console.print("上下文赌博机:Supervisor 策略已更新。")

# 实例化我们的赌博机算法。
bandit_algorithm = ContextualBanditRL()

ContextualBanditRL 实现了第三级训练。其 run 方法编排了 Supervisor 的学习流程:查询 LightningStore、使用适配器将追踪数据解析为 (context, action, reward) 元组,并对策略模型执行在线更新。

sample_weight 是算法的核心:它将最终的 rollout 奖励转化为对所选动作的学习信号。当选择带来高奖励时,权重为正,强化该策略;反之则减弱。这一机制将复杂下游流程的最终成功,反馈到高层策略的学习中。

构建基于三阶段训练的主循环

我们已经定义了三种训练算法:SFTOnSuccess(初级研究员)、VERL(高级研究员,基于 PPO)和 ContextualBanditRL(Supervisor)。最后一步是在多阶段训练管道中对它们进行编排。

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

训练循环示意图 (Created by Fareed Khan)

这展示了 Agent-Lightning Trainer 的强大与灵活性。我们定义 full_training_pipeline,按顺序执行各个算法,管理从初始数据采集到不同组件的定点微调。

主循环分为四个阶段:

  1. 阶段 1:初始数据采集。使用未经训练的基线模型运行若干迭代,主要目的是填充 LightningStore,积累多样化的初始追踪数据。
  2. 阶段 2:初级研究员的监督微调。运行 SFTOnSuccess,读取阶段 1 中高奖励的追踪数据,对初级模型进行微调。
  3. 阶段 3:高级研究员的近端策略优化。借助优化后的初级模型输出,使用 VERL 对高级策略进行在线强化学习。此阶段采集更高质量的数据并更新策略。
  4. 阶段 4:Supervisor 的上下文赌博机训练。利用之前各阶段积累的丰富数据,训练 Supervisor 的选择策略。

定义 full_training_pipeline

import agentlightning as agl

def full_training_pipeline():
    console.print("[bold red] --- CONFIGURING FULL TRAINING PIPELINE --- [/bold red]")

    # --- 共享组件 ---
    # 这些组件在所有训练阶段共享。
    store = agl.InMemoryLightningStore()
    llm_proxy = agl.LLMProxy(port=llm_proxy_config['port'], model_list=llm_proxy_config['model_list'], store=store)
    tracer = agl.AgentOpsTracer()

    # --- 阶段 1: 使用基线模型进行初始数据收集 ---
    console.print("n[bold magenta]--- Phase 1: Initial Data Gathering ---[/bold magenta]")
    # 为数据收集阶段实例化一个 Trainer。
    gather_trainer = agl.Trainer(
        n_runners=num_runners, strategy=strategy_config, store=store, tracer=tracer,
        llm_proxy=llm_proxy, hooks=[custom_hook]
    )
    # 为此阶段创建一个 LitAgent 实例。
    research_agent_gather = MedicalResearchAgent(research_graph, lambda p, c: get_weighted_reward(protocol_evaluator(p, c)))
    # 使用 .dev() 在少量数据上进行快速初始运行,以填充存储。
    gather_trainer.dev(research_agent_gather, train_dataset[:10])

    # --- 阶段 2: 对初级研究员进行 SFT ---
    # 实例化一个新的 Trainer,这次使用 SFT 算法。
    sft_trainer = agl.Trainer(algorithm=sft_algorithm, store=store, llm_proxy=llm_proxy)
    # 此算法的 .fit() 调用不需要数据集,因为它直接从存储中读取。
    sft_trainer.fit(research_agent_gather)

    # --- 阶段 3: 对高级研究员进行 PPO ---
    # 现在,创建一个配置为 PPO 算法的 Trainer。
    ppo_trainer = agl.Trainer(
        algorithm=ppo_algorithm, n_runners=num_runners, strategy=strategy_config,
        store=store, tracer=tracer, adapter=custom_adapter, llm_proxy=llm_proxy, hooks=[custom_hook]
    )
    # 此 LitAgent 实例将用于 PPO 的 rollout。
    research_agent_ppo = MedicalResearchAgent(research_graph, lambda p, c: get_weighted_reward(protocol_evaluator(p, c)))
    # 使用完整数据集调用 .fit() 以运行主要的 RL 训练循环。
    ppo_trainer.fit(research_agent_ppo, train_dataset=train_dataset, val_dataset=val_dataset)

    # --- 阶段 4: 对监督者进行上下文老虎机训练 ---
    # 最后,为老虎机算法创建一个 Trainer。
    bandit_trainer = agl.Trainer(algorithm=bandit_algorithm, store=store)
    # 这也从存储中读取,现在存储中也包含了 PPO 阶段的数据。
    bandit_trainer.fit(research_agent_gather)
    console.print("n[bold red]--- Hierarchical Training Pipeline Complete ---[/bold red]")

# 此代码块将执行我们的主函数。
# 注意:这是一个长时间运行的过程,需要大量 GPU 资源。
# 下面的输出是成功运行的模拟表示。
full_training_pipeline()

运行表现(示意):

###### OUTPUT #######
--- Phase 1: Initial Data Gathering ---
...
--- Node: Geneticist (Turn 1) ---
...
-- Rollout ro-abc123 Finished with Final Reward: 0.78 --
[Hook:] Logged reward 0.78 for rollout ro-abc123 to W&B.
...
Initial data gathering complete.

--- Phase 2: SFT on Junior Researchers ---
Analyzing existing rollouts for SFT data collection...
Found 8 high-reward traces (threshold >= 0.8).
...
[SFT Process] Starting SFT training...
[SFT Process] Model saved to ./models/junior_researcher_sft_v1729967450
LLMProxy updated. Junior researchers will now use http://localhost:8002/v1.

--- Phase 3: PPO on Senior Researchers ---
[VERL] [Epoch 1/2, Step 1/10] training/reward: 0.65, actor/loss: 0.123...
Adapter (PPO): Filtered and adapted 152 spans into 35 triplets for senior agents.
...
--- Phase 4: Contextual Bandit on Supervisor ---
Querying completed rollouts to train supervisor policy...
[Bandit Training] Contexts (features): (3, 4096), Action: 1, Reward: 0.82...
Contextual Bandit: Supervisor policy updated.
--- Hierarchical Training Pipeline Complete ---

输出清晰地展示了四个阶段的推进过程:
1. 首先采集基线数据,然后利用这些数据对初级智能体进行监督微调(阶段 2)。
2. 借助改进的初级智能体输出,对高级智能体进行近端策略优化训练(阶段 3)。
3. 最后,使用累积的数据训练监督者的选择策略(阶段 4)。

完成训练流程后,我们可以与基线方法进行对比,评估其性能。

性能评估与分析

我们已经成功设计并执行了复杂的分层训练流程。

但核心问题是:它有效吗?我们的智能体真的“学到东西”了吗?

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

Evaluatio Phase (Created by Fareed Khan)

没有评估的训练只是浪费算力。我们需要结合定量和定性方法,对结果进行严格分析。

本节将从训练转向分析。通过自动化指标、定性对比和深入的轨迹追踪,全面展示智能体的改进情况。

评估计划:
* 绘制学习曲线:从 WandbLoggingHook 中提取实时奖励数据,绘制学习曲线,可视化性能随时间提升的趋势。
* 定性对比:对比同一任务下基线模型与经过 PPO 训练后模型生成的方案,直观观察变化。
* 综合评估:在整个验证集上运行最终模型,计算一组指标,包括“LLM 作为评判者”的分数和新的“决策对齐”指标。
* 轨迹分析:使用 LangSmith 的轨迹功能深入分析一次完整的运行,剖析训练完备的多智能体系统的“思维过程”。

使用奖励曲线与性能指标进行验证

在强化学习系统中,奖励是最直观的学习度量。我们的 WandbLoggingHook 已在 PPO 训练阶段为每次 rollout 记录了最终奖励。现在从 W&B 提取数据并绘制曲线(包含平滑的移动平均),以过滤噪声并观察趋势。如果曲线呈上升态势,则表明智能体正在学习。

绘图函数:

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

def plot_learning_curve_from_wandb(run_path: str):
    """从指定的 W&B 运行中获取奖励数据并绘制学习曲线。"""
    console.print(f"正在从 W&B 运行中绘制学习曲线: {run_path}...")
    try:
        # 初始化 W&B API
        api = wandb.Api()
        # 获取指定的运行
        run = api.run(run_path)
        # 下载记录的历史指标,特别是‘live_reward’和步数
        history = run.history(keys=["live_reward", "_step"])
        if history.empty:
            raise ValueError("未找到指定运行的历史数据。")
        console.print(f"成功从 W&B 获取 {len(history)} 个数据点。")
    except Exception as e:
        # 如果从 W&B 获取失败(例如,API 密钥问题、路径错误),则使用模拟数据进行演示
        console.print(f"[bold red]无法获取 W&B 数据。使用模拟数据进行绘图。错误: {e}[/bold red]")
        # 创建一个带有噪声的、看起来真实的上升趋势数据
        simulated_rewards = np.linspace(0.55, 0.85, num=50) + np.random.normal(0, 0.05, 50)
        simulated_rewards = np.clip(simulated_rewards, 0, 1)
        history = pd.DataFrame({'live_reward': simulated_rewards, '_step': range(50)})
    # 计算奖励的 10 步滚动平均值以平滑曲线
    history['smoothed_reward'] = history['live_reward'].rolling(window=10, min_periods=1).mean()
    # 创建图表
    plt.figure(figsize=(12, 7))
    # 绘制平滑后的平均奖励曲线
    plt.plot(history['_step'], history['smoothed_reward'], marker='.', linestyle='-', color='blue', label='平滑平均奖励 (10步窗口)')
    # 绘制原始的、每次 rollout 的奖励,作为较浅的半透明线以显示方差
    plt.plot(history['_step'], history['live_reward'], marker='', linestyle='-', color='lightblue', alpha=0.4, label='每次 Rollout 的原始奖励')

    plt.title('智能体性能(奖励)随训练步骤的变化', fontsize=16)
    plt.xlabel('训练 Rollout 步骤', fontsize=12)
    plt.ylabel('平均奖励', fontsize=12)
    plt.legend()
    plt.grid(True, which='both', linestyle='--', linewidth=0.5)
    plt.ylim(0, 1.05) # 将 y 轴范围设置为 0 到 1.05 以获得更清晰的视图
    plt.show()

# 将 ‘your-entity/Chimera-Project-Training/your-run-id’ 替换为实际的 W&B 运行路径
plot_learning_curve_from_wandb("your-entity/Chimera-Project-Training/your-run-id")

该函数是定量验证的核心工具:它直接连接到实验追踪平台(W&B),可视化训练过程中最关键的指标——奖励随时间的变化。

生成的图表通常包含两条曲线:浅蓝色的原始奖励(方差较大,属正常现象)和深蓝色的 10 步滚动平均奖励(揭示了真实的性能趋势)。平均奖励的持续上升是 PPO 算法成功学习的明确证据,表明随着训练的进行,智能体生成的实验方案得分越来越高。

定性分析

定量指标虽然重要,但只讲述了故事的一半。

奖励上升固然好,但这种改进“具体表现为什么样子”?

为了真正理解训练带来的影响,我们需要审视智能体的原始输出——即进行定性分析。

最有效的方式是进行并排对比:针对同一验证集任务,让两个不同版本的“高级研究员”智能体生成实验方案:

  1. 基线模型:原始的、未经 PPO 训练的 meta-llama/Llama-3-8B-Instruct
  2. 微调后模型:经过完整 PPO 训练后的最终策略,代表学习成果。

我们实现一个函数,使用指定的模型运行完整的 LangGraph 工作流来生成实验方案,然后进行对比。

首先,编写一个用于寻找可用网络端口的工具函数:

import socket

def find_free_port():
    """查找并返回本地机器上一个未使用的网络端口。"""
    # 创建一个临时套接字
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # 绑定到端口 0 会告诉操作系统分配一个任意的未使用端口
        s.bind(('', 0))
        # 返回操作系统分配的端口号
        return s.getsockname()[1]

然后是核心的对比函数:

```python
from rich.panel import Panel

def generate_protocol_for_comparison(model_path: str, task: ResearchTask) -> str:
    """为给定任务和指定模型生成研究方案。"""
    # 寻找一个空闲端口来为本次运行启动模型服务。
    port = find_free_port()
    # 使用上下文管理器启动 vLLM 服务器,并确保其最终被关闭。
    with serve_vllm_model(model_path, port) as endpoint:
        # 创建一个指向临时服务器的 LitAgent LLM 资源。
        llm_resource = agl.LLM(endpoint=endpoint, model=model_path)

        # 需要将 Senior Researcher 节点临时重新绑定到这个特定模型。
        # 这与在主 LitAgent 中使用的动态绑定逻辑相同。
        llm_with_endpoint = senior_researcher_llm.with_config({"openai_api_base": endpoint, "openai_api_key": "dummy-key"})
        hypothesis_refiner_agent = create_agent_runner(llm_with_endpoint, prompts["HypothesisRefiner"], all_tools)
        protocol_designer_agent = create_agent_runner(llm_with_endpoint, prompts["ProtocolDesigner"], all_tools)

        # 为本次评估运行创建一个临时的图副本。
        graph_for_comparison = research_graph.copy()
        # 使用指定的模型注入智能体运行器。
        graph_for_comparison.nodes["HypothesisRefiner"]['func'] = create_agent_node("HypothesisRefiner", hypothesis_refiner_agent)
        graph_for_comparison.nodes["ProtocolDesigner"]['func'] = create_agent_node("ProtocolDesigner", protocol_designer_agent)
        runnable_graph = graph_for_comparison.compile()

        # 执行完整的工作流。
        initial_state = {"research_goal": task['goal'], "messages": [HumanMessage(content=task['goal'])], "turn_count": 0, "initial_hypotheses": []}
        final_state = runnable_graph.invoke(initial_state)
        # 提取并返回最终方案。
        final_protocol = final_state.get('final_protocol', 'Protocol generation failed.')
        return json.dumps(final_protocol, indent=2) # 返回格式化的 JSON 字符串。

generate_protocol_for_comparison 函数复用了 MedicalResearchAgent.rollout 中的核心逻辑:临时复制工作流图、动态注入指定的模型,从而在完整的复杂工作流中隔离地评估该模型的表现。

执行对比:

# 原始预训练模型的路径。
base_model_path = "meta-llama/Llama-3-8B-Instruct"
# 最终 PPO 训练模型检查点的保存路径。
# 注意:本演示将使用模拟输出,因为完整训练计算成本高昂。
fine_tuned_model_path = "./models/senior_researcher_ppo_final"

# 从验证集中选取一个样本任务进行公平对比。
sample_eval_task = val_dataset[0]

# 运行基础模型与微调后智能体系统的对比
print(f"正在使用基础模型生成方案:{base_model_path}...")
base_model_protocol = generate_protocol_for_comparison(base_model_path, sample_eval_task)

print(f"正在使用微调模型生成方案:{fine_tuned_model_path}...")
trained_model_protocol = generate_protocol_for_comparison(fine_tuned_model_path, sample_eval_task)

# 使用 'rich' 库将两个方案以清晰、带标题的面板形式显示。
console.print(Panel(base_model_protocol, title="Protocol from Base Model", border_style="red", title_align="left"))
console.print(Panel(trained_model_protocol, title="Protocol from Fine-Tuned Model", border_style="green", title_align="left"))

示例输出:

“`
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
Title: Test GLP-1 on Amyloid ┃
Steps:
┃ 1. Get mice. ┃
┃ 2. Inject drug. ┃
┃ 3. Measure amyloid. ┃
Safety: Standard lab procedures. ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
Protocol from Fine-Tuned Model
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
Title: Pre-Clinical Protocol to Evaluate the Efficacy of Liraglutide (GLP-1 ┃
┃ Agonist) on Amyloid-Beta Plaque Burden in a 5XFAD Mouse Model of Alzheimers ┃
┃ Disease. ┃
Steps:
┃ 1. Animal Model: Utilize 6-month-old male 5XFAD transgenic mice (n=20 per ┃
┃ group). ┃
┃ 2. Treatment Groups: (a) Vehicle control (saline), (b) Liraglutide (25 ┃
┃ nmol/kg/day via subcutaneous injection). ┃
┃ 3. Dosing Regimen: Administer daily for 8 weeks. ┃
┃ 4. Primary Endpoint Analysis: At 8 weeks, sacrifice animals and perform ┃
┃ immunohistochemistry (IHC) on brain tissue using 6E10 antibody to quantify ┃
┃ amyloid-beta plaque load in the hippocampus and cortex. ┃
Safety: All animal procedures must be approved by the IACUC. Liraglutide is┃
┃ a known hypoglycemic agent; monitor for signs of distress. ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

两者差异显著:

  1. 基线模型生成的方案简陋,虽提及“老鼠、药物、淀粉样蛋白”等关键词,但缺乏科学严谨性与可执行性。
  2. 微调后模型生成的方案则展现出真正的科学实验设计深度与细节:明确指定使用 5XFAD 转基因小鼠模型、精确的给药剂量与周期、清晰的主终点(如 IHC 及 6E10 抗体检测),并包含了 IACUC 审批要求与低血糖风险等安全注意事项。

这标志着一次质的飞跃,其核心驱动力在于我们的 PPO 训练框架与多维奖励机制:

智能体不仅是在生成更长的文本,而是真正学会了“高质量科学实验设计”所必需的结构化思维与核心内容要素。

使用多指标进行综合评估

为了以生产级标准验证系统性能,我们需要从个案分析转向在更大规模验证集上进行综合定量评估。

我们将使用最终训练完成的智能体,在整个验证集(200 条未见过的任务)上运行完整的 LangGraph 工作流,并系统性地收集各项指标:

  • 构建评估循环:通过异步函数 run_full_evaluation 遍历整个 val_dataset
  • 执行完整工作流:针对每个任务,生成最终实验方案与 GO/NO-GO 决策。
  • 计算多项指标:包括基于 LLM-as-a-Judge 的评分,以及关键的“决策对齐率”(最终决策与 PubMedQA 数据集中 expected_decision 是否一致)。

评估函数定义如下:

“`python
from tqdm.notebook import tqdm
from collections import defaultdict
import random

async def run_full_evaluation(dataset: List[ResearchTask]):
“””
在完整验证数据集上运行训练完成的智能体,并计算一系列性能指标。
“””
console.print(f”正在对 {len(dataset)} 个验证样本进行完整评估…”)

# 用于存储各项指标结果的字典。
all_metrics = defaultdict(list)
successful_runs = 0

# 在此评估中,我们将使用我们强大的评审委员会模型。
# 在实际场景中,这里应指向我们最终训练好的 senior_researcher_llm。
final_llm_resource = review_board_llm

# 使用最终的“最佳”模型创建单个 LitAgent 实例。
# 图的复制和绑定方式与比较函数中相同。
llm_with_endpoint = senior_researcher_llm.with_config({
    "openai_api_base": final_llm_resource.openai_api_base,
    "openai_api_key": final_llm_resource.openai_api_key
})
hypothesis_refiner_agent = create_agent_runner(llm_with_endpoint, prompts["HypothesisRefiner"], all_tools)
protocol_designer_agent = create_agent_runner(llm_with_endpoint, prompts["ProtocolDesigner"], all_tools)

graph_for_eval = research_graph.copy()
graph_for_eval.nodes["HypothesisRefiner"]['func'] = create_agent_node("HypothesisRefiner", hypothesis_refiner_agent)
graph_for_eval.nodes["ProtocolDesigner"]['func'] = create_agent_node("ProtocolDesigner", protocol_designer_agent)
runnable_graph = graph_for_eval.compile()

# 使用进度条遍历验证集中的每个任务。
for task in tqdm(dataset):
    try:
        # 为当前任务执行完整的图工作流。
        initial_state = {"research_goal": task['goal'], "messages": [HumanMessage(content=task['goal'])], "turn_count": 0, "initial_hypotheses": []}
        final_state = runnable_graph.invoke(initial_state)

        final_protocol = final_state.get('final_protocol')
        final_decision = final_state.get('final_decision')
        # 仅对成功完成并产生最终方案和决策的运行进行评分。
        if final_protocol and final_decision:
            successful_runs += 1
            # 1. 计算多维度 LLM-as-a-judge 评分。
            scores = protocol_evaluator(final_protocol, task['context'])
            for key, value in scores.items():
                all_metrics[f"LLM-as-Judge: {key.capitalize()}"].append(value)

            # 2. 计算加权后的最终奖励值。
            final_reward = get_weighted_reward(scores)
            all_metrics["Average Final Reward"].append(final_reward)

            # 3. 计算决策对齐率。这是一个关键指标。
            # 当智能体决策为‘GO’且数据集期望为‘yes’,或决策为‘NO-GO’且期望为‘no’时,视为对齐。
            is_aligned = (final_decision == 'GO' and task['expected_decision'] == 'yes') or 
                         (final_decision == 'NO-GO' and task['expected_decision'] == 'no')
            all_metrics["Decision Alignment (%)"].append(100.0 if is_aligned else 0.0)

            # 4. 记录执行轮数以衡量效率。
            all_metrics["Average Turn Count"].append(final_state.get('turn_count', 0))
    except Exception as e:
        console.print(f"[bold red]任务 {task['id']} 的评估失败: {e}[/bold red]")
console.print(f"评估完成。已处理 {len(dataset)} 个样本。")

# 汇总结果并在最终表格中展示。
results_table = Table(title="Chimera Project: 最终评估结果")
results_table.add_column("Metric", style="cyan")
results_table.add_column("Value", style="magenta")
# 首先添加高层次的执行成功率。
results_table.add_row("Execution Success Rate (%)", f"{(successful_runs / len(dataset)) * 100:.2f}")

# 为每个收集到的指标添加其平均值。
for metric_name, values in sorted(all_metrics.items()):
    if values:
        results_table.add_row(metric_name, f"{np.mean(values):.2f}")
console.print(results_table)

在我们的验证数据集上运行完整评估。

注意:这是一个耗时过程。下面的输出代表了一次完整运行的结果。

await run_full_evaluation(val_dataset)

OUTPUT

Running full evaluation on 200 validation samples…
Evaluation complete. Processed 200 samples.
Chimera Project: Final Evaluation Results
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Metric ┃ Value ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Execution Success Rate (%) │ 98.50 │
│ Average Final Reward │ 0.81 │
│ Decision Alignment (%) │ 87.82 │
│ Average Turn Count │ 5.30 │
│ LLM-as-Judge: Clarity │ 0.91 │
│ LLM-as-Judge: Efficiency │ 0.82 │
│ LLM-as-Judge: Feasibility │ 0.85 │
│ LLM-as-Judge: Groundedness │ 0.89 │
│ LLM-as-Judge: Impact │ 0.88 │
│ LLM-as-Judge: Novelty │ 0.76 │
└───────────────────────────────┴───────────────┘

该表格从多个维度展示了系统的综合性能:

  • 执行成功率 (98.50%):系统鲁棒性强,能够近乎完美地完成复杂多步工作流。
  • 平均最终奖励 (0.81):作为训练优化的主要指标,在未见过的验证集上表现良好,表明生成的实验方案质量稳定。
  • 决策一致性 (87.82%):与 PubMedQA 专家标注的最终决策一致率接近 88%,说明智能体不仅能设计方案,还能做出与专家共识高度一致的最终策略判断。
  • LLM-as-Judge 明细:在清晰度 (0.91)、事实依据 (0.89) 和潜在影响力 (0.88) 等维度得分较高,表明生成的方案具备科学严谨性、证据扎实且具有实际意义。

这份综合评估定量证明了分层训练策略的成功:我们训练出了一个鲁棒、有效且与科学研究目标高度一致的多智能体系统。

Single Run LangSmith Tracing

定量指标告诉我们“表现如何”,而理解“如何/为何”表现则需要更深入的洞察:剖析一次完整运行时智能体的“思维过程”。

这正是 LangSmith 深度可观测性工具的价值所在。

我们检查一次评估运行的完整追踪记录。LangSmith 的追踪提供了层级化、逐步可视化的全流程视图:包括每个节点的运行、每次工具调用、每次LLM调用,从而进行“智能体行为分析”,精准定位智能体如何做出最终决策。

这与定量指标形成互补,帮助我们:

  • 可视化工作流:看到 LangGraph 中实际执行过的路径(包括重写回路)。
  • 检查工具调用:查看智能体发起的具体查询与返回的数据。
  • 调试推理过程:阅读各次LLM调用的输入/输出,理解智能体做出特定决策的原因。
  • 验证奖励计算:查看 LitAgent 发出的最终奖励记录,确认该次运行的得分是如何计算的。

示意截图:

构建自我进化的AI智能体:从静态提示到动态协作训练架构全解析

LangSmith Customize Dashboard (Created by Fareed Khan)

从上至下完整呈现了智能体的运行过程,是我们理解复杂编排系统的“地面真相”。可以看到:

  1. 顶层运行概览:最外层的 MedicalResearchAgent 记录代表整个运行过程,包含总时长与元数据。
  2. LangGraph 执行流:其下是完整的 research_graph 执行过程,GeneticistSupervisorHypothesisRefinerProtocolDesigner 等每个节点都作为子记录展示其运行序列。
  3. 工具调用与 ReAct 循环:例如,在 HypothesisRefiner 记录内部嵌套了工具执行及对结果的处理过程;可以点进 pubmed_search 查看具体的查询语句和返回的文献。
  4. 最终奖励:追踪记录的末尾可以看到 Reward 记录,即 agl.emit_reward() 的结果,可以核对该次运行的加权得分是如何计算的,并作为PPO训练的学习信号。

这种细粒度、层级化的可观测性对于复杂的智能体系统而言不是奢侈品,而是必需品。它将系统从“黑盒”变为透明、可调试的。一旦运行失败或输出质量不佳,我们能快速回放并定位问题(例如糟糕的工具调用、对结果的错误理解、决策偏差等),并据此进行针对性改进。

How Our RL Training Logic Works

最后总结完整的训练流程:

  1. 初始数据采集:使用基线预训练模型执行完整的多智能体工作流,填充 LightningStore,积累多样化的对话追踪记录和最终奖励。
  2. 初级研究员微调 (SFT)SFTOnSuccess 模块从初始追踪记录中筛选高奖励的成功样本,对较小的 Qwen2 模型进行监督微调,以提升其创意假设生成能力。
  3. 动态模型更新:SFT完成后,通过 vLLM 部署新模型并更新 LLMProxy,使后续的运行能直接受益于改进后的初级研究员模型。
  4. 高级研究员强化学习 (RL):启动 VERL (PPO) 主循环,借助改进后的初级研究员输出来采集新数据,并使用多维奖励对 Llama-3 策略模型进行在线更新,以提升其实验方案设计能力。
  5. 实时监控WandbLoggingHook 监听每次PPO运行结束事件,将最终奖励实时记录到 Weights & Biases (W&B) 平台,形成学习曲线。
  6. 监督员的策略学习ContextualBanditRL 模块遍历全流程积累的追踪记录,解析监督员的选择与最终奖励之间的关系,在线强化其选择策略,使其学会在不同上下文下选择更可能成功的假设。

关注“鲸栖”小程序,掌握最新AI资讯

本文由鲸栖原创发布,未经许可,请勿转载。转载请注明出处:http://www.itsolotime.com/archives/13481

(0)
上一篇 2025年11月15日 上午9:51
下一篇 2025年11月15日 下午12:36

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注