解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

构建高效的智能体(Agentic)系统,离不开扎实的软件工程实践。其核心在于设计能够协调运作、并行执行,并能与外部系统高效交互的组件。例如,推测执行(Speculative Execution) 通过预先处理可预测的请求来降低延迟;冗余执行(Redundant Execution) 则通过同时运行同一智能体的多个副本来避免单点故障,提升系统韧性。除此之外,还有一系列模式可以显著增强现代智能体系统的可靠性与性能。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行机制示意(Created by Fareed Khan)

本文将深入探讨以下核心并行化模式:

  • 并行工具调用(Parallel Tool Use):当智能体需要执行彼此独立的 API 调用时,并行触发以隐藏 I/O 延迟。
  • 分层智能体团队(Hierarchical Agent Teams):由一个管理者将复杂任务拆解为子步骤,分发给多个执行者并行处理。
  • 竞争性智能体集成(Competitive Agent Ensembles):多个智能体并行处理同一问题,由系统挑选最佳结果。
  • 冗余执行(Redundant Execution):部署两个或多个智能体解决同一任务,用于错误检测和提升可靠性。
  • 并行检索与混合搜索(Parallel Retrieval and Hybrid Search):并行运行多种检索策略,以提高上下文信息的质量和相关性。
  • 多跳检索(Multi Hop Retrieval):智能体通过迭代的检索步骤,逐层深入,收集更相关、更深层的信息。

本文将逐一剖析这些最常用模式背后的核心流程与概念。我们将通过可视化图表阐明其设计目的,并提供简化的可运行代码示例,展示它们如何融入真实世界的智能体系统。

所有相关的理论和代码实现均可在 GitHub 仓库获取:

GitHub – FareedKhan-dev/agentic-parallelism: Core concepts – where to apply parallelism in agentic…

代码仓库结构如下:

agentic-parallelism/  
    ├── 01_parallel_tool_use.ipynb  
    ├── 02_parallel_hypothesis.ipynb  
    ...  
    ├── 06_competitive_agent_ensembles.ipynb  
    ├── 07_agent_assembly_line.ipynb  
    ├── 08_decentralized_blackboard.ipynb  
    ...  
    ├── 13_parallel_context_preprocessing.ipynb  
    └── 14_parallel_multi_hop_retrieval.ipynb

目录

  • Parallel Tool Use Hiding I/O Latency
  • Hypothesis Generation to Strategic Exploration
  • Parallel Evaluation for Robust Governance
  • Speculative Execution for Hyper-Responsive Agents
  • Hierarchical Agent Teams for Superior Quality
  • Competitive Agent Ensembles
  • Agent Assembly Line for High-Volume Processing
  • Decentralized Blackboard Collaboration
  • Redundant Execution for Fault Tolerance
  • Parallel Query Expansion to Maximize Recall
  • Sharded & Scattered Retrieval
  • Parallel Hybrid Search Fusion for High-Fidelity Context
  • Parallel Context Pre-processing for Accuracy Gains
  • Multi-Hop Retrieval for Deep Reasoning

Parallel Tool Use Hiding I/O Latency

在智能体系统中,最主要的性能瓶颈往往并非大语言模型(LLM)的“思考”时间,而是I/O延迟——即等待网络请求、数据库查询或外部API响应所消耗的时间。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行工具处理(Created by Fareed Khan)

例如,当一个智能体需要从多个独立来源收集信息(如“获取股票价格”和“搜索近期新闻”)时,采用朴素的串行执行方式会依次调用工具,效率低下。如果这些调用之间没有依赖关系,就没有理由不并行执行。

接下来,我们将构建一个智能体系统来演示此模式。该系统接收用户查询,识别出需要调用两个独立的实时API,并以并行方式执行。

首先,创建两个真实世界的工具。第一个是使用 yfinance 库获取实时股价的工具。

from langchain_core.tools import tool
import yfinance as yf

@tool
def get_stock_price(symbol: str) -> float:
    """Get the current stock price for a given stock symbol using Yahoo Finance."""
    # 添加打印语句以清晰指示工具执行时机
    print(f"--- [Tool Call] Executing get_stock_price for symbol: {symbol} ---")

    # 实例化 yfinance Ticker 对象
    ticker = yf.Ticker(symbol)

    # 获取股票信息。使用‘regularMarketPrice’作为可靠数据源,并提供备选方案。
    price = ticker.info.get('regularMarketPrice', ticker.info.get('currentPrice'))

    # 处理股票代码无效或数据不可用的情况
    if price is None:
        return f"Could not find price for symbol {symbol}"
    return price

LangChain@tool 装饰器将普通Python函数转换为智能体可用的工具。此函数用于获取指定股票代码的市场价格。

快速测试工具是否能连接实时API:

get_stock_price.invoke({"symbol": "NVDA"})

#### OUTPUT ####
--- [Tool Call] Executing get_stock_price for symbol: NVDA ---
121.79 ...

输出确认工具已正确连接并能访问外部 yfinance API。

接着,创建第二个工具,使用 Tavily 搜索API获取公司近期新闻。该API专为基于LLM的智能体优化。

from langchain_community.tools.tavily_search import TavilySearchResults

# 初始化基础的 Tavily 搜索工具,限制结果为最相关的5篇文章
tavily_search = TavilySearchResults(max_results=5)

@tool
def get_recent_company_news(company_name: str) -> list:
    """Get recent news articles and summaries for a given company name using the Tavily search engine."""
    # 添加打印语句以清晰记录工具执行
    print(f"--- [Tool Call] Executing get_recent_company_news for: {company_name} ---")

    # 构造更具体的搜索查询
    query = f"latest news about {company_name}"

    # 调用底层的 Tavily 工具
    return tavily_search.invoke(query)

这里将 TavilySearchResults 封装在自定义的 @tool 函数中,目的是根据用户查询获取最新新闻。

同样进行测试:

get_recent_company_news.invoke({"company_name": "NVIDIA"})

#### OUTPUT ####
--- [Tool Call] Executing get_recent_company_news for: NVIDIA ---
[{'url': 'https://www.reuters.com/technology/nvidia-briefly-surpasses-microsoft-most-valuable-company-2024-06-18/', 'content': 'Nvidia briefly overtakes Microsoft as most valuable company...'}, ...]

输出表明第二个工具也能正常工作。至此,智能体已具备两个真实的数据采集能力。

为了准确衡量性能提升,我们需要构建工作流并记录性能。首先定义图(graph)的状态,并加入性能日志字段。

from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    # ‘messages’ 保存对话历史
    messages: Annotated[List[BaseMessage], operator.add]
    # ‘performance_log’ 累积记录每个步骤执行时间的字符串
    # ‘operator.add’ 函数指示 LangGraph 追加到此列表而非替换
    performance_log: Annotated[List[str], operator.add]

这个 AgentState 类将作为本次智能体运行的“黑盒记录器”。通过对 performance_log 使用 Annotatedoperator.add 归约器,我们为每个执行节点创建了持久化日志。后续可据此分析总执行时间及各阶段耗时。

现在,我们创建第一个带监测的节点,即调用大语言模型(LLM)的智能体“大脑”。

import time

def call_model(state: AgentState):
    """智能体节点:调用LLM,测量其自身执行时间,并将结果记录到状态中。"""
    print("--- AGENT: 调用 LLM ---")
    start_time = time.time()

    # 从状态中获取当前的消息历史。
    messages = state['messages']

    # 调用我们具备工具调用能力的LLM。LLM将决定是直接回答还是需要调用工具。
    response = llm_with_tools.invoke(messages)

    end_time = time.time()
    execution_time = end_time - start_time

    # 创建一个包含性能数据的日志条目。
    log_entry = f"[AGENT] LLM 调用耗时 {execution_time:.2f} 秒。"
    print(log_entry)

    # 将LLM的响应和新的日志条目返回,以便添加到状态中。
    return {
        "messages": [response],
        "performance_log": [log_entry]
    }

call_model 是我们的第一个带监测的图节点。它使用 time.time() 包裹 llm_with_tools.invoke() 调用,精确测量LLM的“思考”时间,并将结果以易读的字符串格式写入状态。

接下来,我们创建执行工具的带监测节点。

from langchain_core.messages import ToolMessage
from langgraph.prebuilt import ToolExecutor

# ToolExecutor 是 LangGraph 的一个实用工具,可以接收一系列工具调用并执行它们。
tool_executor = ToolExecutor(tools)

def call_tool(state: AgentState):
    """工具节点:执行LLM规划的工具调用,测量性能,并记录结果。"""
    print("--- TOOLS: 执行工具调用 ---")
    start_time = time.time()

    # 智能体的最后一条消息将包含工具调用。
    last_message = state['messages'][-1]
    tool_invocations = last_message.tool_calls

    # ToolExecutor 可以批量执行工具调用。对于同步工具,其内部仍然是顺序执行,
    # 但这是管理执行的一种清晰方式。
    responses = tool_executor.batch(tool_invocations)

    end_time = time.time()
    execution_time = end_time - start_time

    # 为工具执行阶段创建一个日志条目。
    log_entry = f"[TOOLS] 在 {execution_time:.2f} 秒内执行了 {len(tool_invocations)} 个工具。"
    print(log_entry)

    # 我们将工具响应格式化为 ToolMessage,这是 LangGraph 期望的标准格式。
    tool_messages = [
        ToolMessage(content=str(response), tool_call_id=call['id'])
        for call, response in zip(tool_invocations, responses)
    ]

    # 返回工具消息和性能日志。
    return {
        "messages": tool_messages,
        "performance_log": [log_entry]
    }

call_model 类似,这里使用计时器包裹了核心逻辑 tool_executor.batch(tool_invocations)。记录 batch 的总耗时,使我们能够在后续与串行执行进行对比,从而量化并行化带来的收益。

定义好带监测的节点后,就可以使用 StateGraph 将它们连接起来。

from langgraph.graph import END, StateGraph

# 此函数充当条件边,根据智能体的最后一条消息来路由工作流。
def should_continue(state: AgentState) -> str:
    last_message = state['messages'][-1]
    # 如果最后一条消息包含工具调用,则路由到 ‘tools’ 节点。
    if last_message.tool_calls:
        return "tools"
    # 否则,智能体已完成推理,我们结束图。
    return END

# 定义图工作流。
workflow = StateGraph(AgentState)

# 添加我们已添加监测的节点。
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tool)

# 入口点是 ‘agent’ 节点。
workflow.set_entry_point("agent")

# 添加用于路由的条件边。
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})

# 添加从 tools 节点回到 agent 节点的边,形成循环。
workflow.add_edge("tools", "agent")

# 将图编译为可运行的应用程序。
app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行工具调用(Created by Fareed Khan)

我们定义了一个简单的循环:

  1. agent 进行思考。
  2. should_continue 判断是否需要采取行动;如果需要,则路由到 tools 节点执行,然后返回 agent 节点处理行动结果。
  3. compile() 将抽象定义转换为可运行的对象。

给 Agent 一个需要调用两个工具的查询,并以流式方式逐步查看其执行状态:

from langchain_core.messages import HumanMessage
import json

# 图的初始输入,包含用户查询
inputs = {
    "messages": [HumanMessage(content="What is the current stock price of NVIDIA (NVDA) and what is the latest news about the company?")],
    "performance_log": []
}
step_counter = 1
final_state = None

# 使用 .stream() 并设置 stream_mode='values',以在每个节点运行后获取完整的状态字典
for output in app.stream(inputs, stream_mode="values"):

    # 输出字典的键是刚刚运行的节点名称
    node_name = list(output.keys())[0]
    print(f"n{'*' * 100}")
    print(f"**Step {step_counter}: {node_name.capitalize()} Node Execution**")
    print(f"{'*' * 100}")

    # 美化打印状态以便详细检查
    state_for_printing = output[node_name].copy()
    if 'messages' in state_for_printing:
        # 将消息对象转换为更易读的字符串表示
        state_for_printing['messages'] = [msg.pretty_repr() for msg in state_for_printing['messages']]
    print("nCurrent State:")
    print(json.dumps(state_for_printing, indent=4))

    # 为每一步添加分析
    print(f"n{'-' * 100}")
    print("State Analysis:")

    if node_name == "agent":
        # 检查 Agent 的响应是否包含工具调用
        if "tool_calls" in state_for_printing['messages'][-1]:
            print("Agent 已处理输入。LLM 正确规划了并行工具调用。LLM 调用的执行时间已被记录。")
        else:
            print("Agent 已收到工具执行结果,并将其综合为连贯的最终答案。性能日志现已包含完整历史记录。")
    elif node_name == "tools":
        print("工具执行器收到了工具调用并执行了它们。结果现在以 ToolMessage 的形式存在于状态中。性能日志正在累积。")
    print(f"{'-' * 100}")
    step_counter += 1
    final_state = output[node_name]

运行代码以观察并行效果:

#### OUTPUT ####
****************************************************************************************************
**Step 1: Agent Node Execution**
****************************************************************************************************
--- AGENT: Invoking LLM ---
[AGENT] LLM call took 4.12 seconds.

Current State:
{
    "messages": [
        "HumanMessage(content='What is the current stock price of NVIDIA (NVDA) and what is the latest news about the company?')",
        "AIMessage(content='', tool_calls=[{'name': 'get_stock_price', 'args': {'symbol': 'NVDA'}, 'id': '...'}, {'name': 'get_recent_company_news', 'args': {'company_name': 'NVIDIA'}, 'id': '...'}])"
    ],
    "performance_log": [ "[AGENT] LLM call took 4.12 seconds." ]
}
----------------------------------------------------------------------------------------------------
State Analysis:
The agent has processed the input. The LLM correctly planned parallel tool calls. The execution time of the LLM call has been logged.
----------------------------------------------------------------------------------------------------

****************************************************************************************************
**Step 2: Tools Node Execution**
****************************************************************************************************
--- TOOLS: Executing tool calls ---
--- [Tool Call] Executing get_stock_price for symbol: NVDA ---
--- [Tool Call] Executing get_recent_company_news for: NVIDIA ---
[TOOLS] Executed 2 tools in 2.31 seconds.
Current State:
{
    "messages": [ ... ],
    "performance_log": [ "[AGENT] LLM call took 4.12 seconds.", "[TOOLS] Executed 2 tools in 2.31 seconds." ]
}
----------------------------------------------------------------------------------------------------
State Analysis:
The tool executor received the tool calls and executed them. The results are now in the state as ToolMessages. The performance log is accumulating.
----------------------------------------------------------------------------------------------------
...

流式输出清晰地展示了 Agent 的循环执行过程:

  • 第一步(Agent 节点):在 Agent 节点的首次运行中,AIMessage 显示 Llama 3 模型正确识别出需要两个独立的工具调用——get_stock_priceget_recent_company_news——并在单轮推理中完成了规划。这正是实现性能优化的“并行计划”步骤。
  • 第二步(Tools 节点):Tools 节点接收并执行了两个规划好的工具调用。日志中两个 [Tool Call] 的打印信息确认它们由 ToolExecutor 并行执行。性能日志 [TOOLS] Executed 2 tools in 2.31 seconds 是关键的性能数据点。
  • 第三步(Agent 节点):最终,Agent 节点收到包含执行结果的 ToolMessage,并将其综合为最终答案。

最后进行定量分析。我们解析完整的性能日志,计算时间节省:

print("Run Log:")
total_time = 0
tool_time = 0
for log in final_state['performance_log']:
    print(f" - {log}")
    # 从日志字符串中提取时间数值
    time_val = float(log.split(' ')[-2])
    total_time += time_val
    if "[TOOLS]" in log:
        tool_time = time_val
print("n" + "-"*60 + "n")
print(f"Total Execution Time: {total_time:.2f} secondsn")
print("Analysis:")

并行化如何有效降低延迟?让我们通过一个性能报告来具体分析。

#### 输出示例 ####
============================================================
              FINAL PERFORMANCE REPORT
============================================================
运行日志:
- [AGENT] LLM 调用耗时 4.12 秒。
- [TOOLS] 执行 2 个工具耗时 2.31 秒。
- [AGENT] LLM 调用耗时 5.23 秒。
------------------------------------------------------------

总执行时间:11.66 秒

工具执行步骤总耗时 2.31 秒。假设每个网络调用各耗时约 1.5 秒,串行总耗时约为 3.0 秒(1.5s + 1.5s)。

**并行化节省了约 0.7 秒**。虽然看似不多,但在更复杂的系统中,如果有 5–10 个独立的工具调用,每个耗时 2–3 秒,串行执行需要 10–30 秒,而并行执行仍只需 2–3 秒。这种差距将直接影响系统的可用性。

## 从假设生成到策略探索

在前一种模式中,智能体遵循单一的线性思路;如果初始路径存在偏差或并非最优,整个流程都会受到影响。

> 对于复杂或创意性任务而言,第一个想法往往不是最佳方案,这是一个显著的风险。

**并行假设生成**(Parallel Hypothesis Generation,也称为分支思考)是一种系统化的方法,旨在避免“单一思路依赖”。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能
分支思考(Created by Fareed Khan) 1. 系统不会只沿着一条推理路径前进,而是在开始时显式生成多个多样化的策略或“假设”。 2. 然后并行探索这些路径,为每条路径生成一个方案。 3. 最后,由一个**评审**智能体对所有候选方案进行评估,并选出最佳方案。这种方法更加稳健、更具创造力,也不易陷入次优路径。 我们将构建一个多智能体系统来处理一个创意营销任务。该系统由**规划者**、并行的**执行者**以及**评审者**组成。本节目标是清晰展示,相较于单一智能体,最终输出在质量上的显著提升。 首先,为了管理复杂的信息流,我们使用 Pydantic 定义结构化输出模型,这是多智能体系统之间的“粘合剂”。 ```python from langchain_core.pydantic_v1 import BaseModel, Field from typing import List class MarketingHypothesis(BaseModel): """用于探索的单一、独特的营销角度或策略的 Pydantic 模型。""" # 角度的简短、吸引人的名称(例如,“技术爱好者”)。 angle_name: str = Field(description="A short, catchy name for the marketing angle (e.g., 'The Tech Enthusiast').") # 目标受众和核心信息的简要描述。 description: str = Field(description="A one-sentence description of the target audience and core message for this angle.") class Plan(BaseModel): """规划者输出的 Pydantic 容器,包含多个假设。""" # 该列表将包含恰好 3 个不同的营销假设,以供并行探索。 hypotheses: List[MarketingHypothesis] = Field(description="A list of exactly 3 distinct marketing hypotheses to explore in parallel.") class Slogan(BaseModel): """单个文案执行者输出的 Pydantic 模型。""" slogan: str = Field(description="The generated marketing slogan.") class Evaluation(BaseModel): """评审者智能体最终结构化输出的 Pydantic 模型。""" # 对所有生成口号的详细评论。 critique: str = Field(description="A detailed critique of all slogans, explaining the pros and cons of each.") # 评审者选出的最佳口号。 best_slogan: str = Field(description="The single best slogan chosen from the list.")

这些 Pydantic 模型是智能体之间的正式“数据契约”。例如,Plan 确保规划者输出一个 MarketingHypothesis 列表;Evaluation 则保证评审者不仅输出获胜口号,还会提供详细的 critique

接着定义 GraphState。它比上一模式更复杂,需要跟踪初始计划以及多个并行执行者的结果。

from typing import TypedDict, Annotated, List, Dict
import operator

class GraphState(TypedDict):
    product_description: str
    plan: List[MarketingHypothesis]
    # ‘worker_results’ 是一个字典,键是角度名称,值是生成的口号。
    # ‘operator.update’ 归约器告诉 LangGraph 合并来自并行分支的字典,而不是替换它们。
    worker_results: Annotated[Dict[str, Slogan], operator.update]
    final_evaluation: Evaluation
    performance_log: Annotated[List[str], operator.add]

这里最关键的是 worker_results: Annotated[Dict[str, Slogan], operator.update]。当并行执行者完成时,每个执行者返回一个仅包含自己结果的小字典;operator.update 告诉 LangGraph 将这些小字典合并为一个完整的 worker_results,从而避免数据丢失。

下面定义规划者智能体,这是图的第一个节点。

def planner_node(state: GraphState):
    """规划者节点:生成包含多个不同假设的初始营销计划。"""
    print("--- AGENT: Planner is thinking... ---")
    start_time = time.time()

    # 创建一个链,将 planner_prompt 传递给 LLM,指示其输出一个 ‘Plan’ 对象。
    planner_chain = planner_prompt | llm.with_structured_output(Plan)
    plan = planner_chain.invoke({"product_description": state['product_description']})

    execution_time = time.time() - start_time
    log_entry = f"[Planner] Generated {len(plan.hypotheses)} hypotheses in {execution_time:.2f}s."
    print(log_entry)

    # 使用假设列表和性能日志更新状态。
    return {"plan": plan.hypotheses, "performance_log": [log_entry]}

planner_node 接收高层的 product_description,利用 LLM 将创意任务分解为三个可并行的子任务(MarketingHypothesis 对象)。这一步的“扇出”操作奠定了整个模式的基础。

接下来定义 Worker 节点。这个节点是具体的执行者,它会针对计划中的每一条假设并行运行一次。

def worker_node(state: GraphState, config):
    """Worker 节点:为一条特定的假设生成口号。此节点会为每条假设并行运行。"""
    # LangGraph 中的 `config` 参数用于传递运行时信息。
    # 我们从 `configurable` 字典中获取此工作实例对应的具体假设。
    hypothesis = config["configurable"]["hypothesis"]
    angle_name = hypothesis.angle_name

    print(f"--- AGENT: Worker for '{angle_name}' is thinking... ---")
    start_time = time.time()

    # 创建此 Worker 的链。
    worker_chain = worker_prompt | llm.with_structured_output(Slogan)
    result = worker_chain.invoke({
        "product_description": state['product_description'],
        "angle_name": angle_name,
        "description": hypothesis.description
    })

    execution_time = time.time() - start_time
    log_entry = f"[Worker-{angle_name}] Generated slogan in {execution_time:.2f}s."
    print(log_entry)

    # 输出是一个字典,键是角度名称。这使得 `operator.update` 归约器能够正确合并所有并行 Worker 的结果。
    return {
        "worker_results": {angle_name: result},
        "performance_log": [log_entry]
    }

worker_node 是具体的文案写手。它不从主 state 中获取自己对应的假设,而是通过 config 参数接收。这是 LangGraph 支持同一节点并行多实例、各自处理不同输入的关键方式。

然后,我们需要一个“条件边”函数,将任务 分发 给所有并行的执行者。

from langgraph.graph.graph import Send

def scatter_to_workers(state: GraphState) -> List[Send]:
    """一个特殊边函数,将计划分发给并行 Worker。"""
    print("--- ORCHESTRATOR: Scattering tasks to workers --- ")
    # 此函数返回一个 `Send` 对象列表。
    # 每个 `Send` 对象都是一个指令,告诉图向特定节点('worker')分派一个任务,
    # 并通过 `config` 参数传递特定的输入。
    tasks = [
        Send(
            "worker",
            config={"configurable": {"hypothesis": hypothesis}}
        )
        for hypothesis in state['plan']
    ]
    return tasks

scatter_to_workers 是实现动态并行的核心。它本身不是一个标准节点,而是作为条件边使用。它从 state 中读取 plan,为每条假设构建一个 Send 指令,从而告知 LangGraph 并行启动多个 worker 实例,每个实例都携带独立的配置。当返回一个 Send 列表时,LangGraph 会将其理解为“全部并行执行”。

最后是 Judge 节点,负责收集并评估所有 Worker 的结果。

def judge_node(state: GraphState):
    """Judge 节点:评估所有 Worker 的结果,提供评判,并选出最佳的一个。"""
    print("--- AGENT: Judge is evaluating... ---")
    start_time = time.time()

    # 将并行的 Worker 结果格式化为一个字符串,供 Judge 提示词使用。
    slogans_to_evaluate = ""
    for angle, slogan_obj in state['worker_results'].items():
        slogans_to_evaluate += f"Angle: {angle}nSlogan: {slogan_obj.slogan}nn"

    # 创建 Judge 的链。
    judge_chain = judge_prompt | llm.with_structured_output(Evaluation)
    evaluation = judge_chain.invoke({
        "product_description": state['product_description'],
        "slogans_to_evaluate": slogans_to_evaluate
    })

    execution_time = time.time() - start_time
    log_entry = f"[Judge] Evaluated {len(state['worker_results'])} slogans in {execution_time:.2f}s."
    print(log_entry)

    # 将最终评估结果更新到状态中。
    return {"final_evaluation": evaluation, "performance_log": [log_entry]}

judge_node 是“扇入”聚合步骤,它读取 worker_results 并进行综合评判。它对比各个创意,做出有理有据的最终选择,从而产出高质量的输出。

定义完所有节点和边之后,就可以组装并编译最终的执行图:

from langgraph.graph import StateGraph, END

# 使用我们定义的状态初始化一个新图。
workflow = StateGraph(GraphState)

# 添加代表智能体的节点。
workflow.add_node("planner", planner_node)
workflow.add_node("worker", worker_node)
workflow.add_node("judge", judge_node)

# 工作流的入口点是 Planner。
workflow.set_entry_point("planner")

# 在 Planner 之后,我们使用特殊的 `scatter_to_workers` 函数作为条件边,将工作扇出。
workflow.add_conditional_edges("planner", scatter_to_workers)

# 当所有 Worker 节点完成后,它们的结果会自动聚合,
# 然后我们定义一个静态边,将结果扇入到 Judge。
workflow.add_edge("worker", "judge")

# Judge 是图结束前的最后一步。
workflow.add_edge("judge", END)

# 将图编译为可运行的应用程序。
app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行假设(Created by Fareed Khan)

最后,我们通过性能日志进行定量分析,直观展示并行执行的优势。

total_time = 0
planner_time = 0
worker_times = []
judge_time = 0

# 解析性能日志,提取各阶段耗时
for log in final_state['performance_log']:
    time_val = float(log.split(' ')[-1].replace('s', ''))
    if "[Planner]" in log:
        planner_time = time_val
    elif "[Worker-" in log:
        worker_times.append(time_val)
    elif "[Judge]" in log:
        judge_time = time_val

# 并行步骤的总耗时取决于运行时间最长的任务
parallel_worker_time = max(worker_times) if worker_times else 0

# 整个工作流的总执行时间
total_execution_time = planner_time + parallel_worker_time + judge_time
print(f"Total Execution Time: {total_execution_time:.2f} secondsn")
print("Breakdown:")
print(f" - Planner: {planner_time:.2f} seconds")
print(f" - Parallel Workers (longest path): {parallel_worker_time:.2f} seconds")
print(f" - Judge: {judge_time:.2f} secondsn")

# 模拟串行工作流的耗时
sequential_worker_time = sum(worker_times)
time_saved = sequential_worker_time - parallel_worker_time

执行结果如下:

#### OUTPUT ####
============================================================
                      PERFORMANCE ANALYSIS
============================================================
Total Execution Time: 19.24 seconds

Breakdown:
 - Planner: 6.78 seconds
 - Parallel Workers (longest path): 5.31 seconds
 - Judge: 7.15 seconds

三个 Worker 的耗时分别为 5.31s、5.12s 和 4.98s。若采用串行执行,该阶段将需要 15.41s(5.31 + 5.12 + 4.98)。

通过并行化,该阶段仅耗时 5.31s(即最慢 Worker 的耗时)。

仅此一步就节省了超过 10 秒的时间。

更重要的是,最终输出质量得到了显著提升。 系统不再是生成单一方案,而是并行探索了三种策略,再通过推理步骤从中选择最优解。

并行评估以实现稳健治理

在前述模式中,即便生成了多个想法,评估环节仍依赖单一、线性的路径,这意味着智能体方案最终只接受了一个评判视角的审视。

然而,复杂的决策往往需要从多个不同维度进行评估。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行评估(Created by Fareed Khan)

并行评估(Parallel Evaluation,或称 Multi-Critic Reflection)模式摒弃了单一、集中式的评判机制。其核心流程如下:

  1. 建立一个由多个 AI 批评家(Critic)组成的评审小组。将待评估内容同时发送给所有批评家,让他们从各自的专业视角进行独立评估。
  2. 由最终的 编辑(Editor)智能体汇总这些并行的反馈,做出综合、全面的最终决策。

接下来,我们将构建一个内容审核系统作为示例:待审核的草稿会先交由并行的批评家团队审阅,然后由终审编辑根据集体意见做出最终决定。

首先,为确保批评家反馈格式一致且机器可读,我们定义其输出的 Pydantic 模式。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import Literal

class Critique(BaseModel):
    """来自单个专业批评家的结构化评审模型。"""
    # 关于内容是否符合该批评家特定标准的明确二元判定。
    is_compliant: bool = Field(description="内容是否符合此批评家的特定标准。")
    # 解释该判定的详细、可操作的反馈。
    feedback: str = Field(description="详细解释内容合规或不合规的原因。如不合规,请提供可操作的改进建议。")

这个 Critique 类充当了正式的通信协议,确保每位专家都提供明确的 is_compliant 判定和文字 feedback,从而使输出稳定可靠,便于最终编辑进行解析。

接着定义 GraphState,它需要跟踪待审查的内容以及并行评审小组的反馈。

from typing import TypedDict, Annotated, Dict
import operator

class GraphState(TypedDict):
    content_to_review: str

    # 'critiques' 是一个字典,键为批评家名称,值为其结构化的 Critique 对象。
    # 'operator.update' 归约器对于合并并行分支的输出至关重要。
    critiques: Annotated[Dict[str, Critique], operator.update]
    final_decision: dict # 简化为 dict 类型
    performance_log: Annotated[List[str], operator.add]

critiques 字典配合 operator.update 实现了并行结果的自动聚合。它会将每个并行分支生成的 Critique 对象汇集到一个完整的字典中,再交给最终的 Editor 进行处理。

现在定义核心的评审节点。以下实现一个评审节点(品牌声音分析师)和最终编辑节点,其他评审节点(事实核查员、风险评估员)模式相同,仅提示词不同。

import time

# 品牌声音分析师节点
def brand_voice_node(state: GraphState):
    """一个简单的评审器,根据预定义的品牌声音指南评估内容。"""
    print("--- 评审:品牌声音分析师正在审核... ---")
    start_time = time.time()

    # 一个简单的链:提示词 -> 大语言模型 -> 结构化输出。
    brand_chain = brand_voice_prompt | llm.with_structured_output(Critique)
    critique = brand_chain.invoke({"content_to_review": state['content_to_review']})

    execution_time = time.time() - start_time
    log_entry = f"[BrandVoice] 完成,耗时 {execution_time:.2f} 秒。"
    print(log_entry)

    return {"critiques": {"BrandVoice": critique}, "performance_log": [log_entry]}
# 主编节点(聚合与决策)
def chief_editor_node(state: GraphState):
    """最终节点:聚合所有评审意见,做出最终、有依据的决策。"""
    print("--- 编辑:主编正在做出决策... ---")
    start_time = time.time()

    # 将状态中的结构化评审意见格式化为单个字符串,供主编提示词使用。
    critiques_str = ""
    for critic_name, critique_obj in state['critiques'].items():
        critiques_str += f"- {critic_name} 评审意见:n  - 合规: {critique_obj.is_compliant}n  - 反馈: {critique_obj.feedback}nn"

    # 创建主编的链。
    editor_chain = chief_editor_prompt | llm.with_structured_output(dict) # 为简化使用 dict
    final_decision = editor_chain.invoke({
        "content_to_review": state['content_to_review'],
        "critiques": critiques_str
    })

    execution_time = time.time() - start_time
    log_entry = f"[ChiefEditor] 完成,耗时 {execution_time:.2f} 秒。"
    print(log_entry)

    return {"final_decision": final_decision, "performance_log": [log_entry]}

这两个节点体现了扇出扇入的逻辑。brand_voice_node 是并行专家的模板,独立运行;chief_editor_node 则汇总多角度反馈,做出统一、可执行的最终决策。

组装完整图,将入口设置为三个评审节点并行启动:

from langgraph.graph import StateGraph, END

# 初始化一个新图。
workflow = StateGraph(GraphState)

# 定义评审节点和编辑节点。
workflow.add_node("fact_checker", fact_checker_node)
workflow.add_node("brand_voice_analyst", brand_voice_node)
workflow.add_node("risk_assessor", risk_assessor_node)
workflow.add_node("chief_editor", chief_editor_node)

# 入口点是一个节点列表,这指示 LangGraph 并行运行它们。
workflow.set_entry_point(["fact_checker", "brand_voice_analyst", "risk_assessor"])

# 在所有评审节点完成后,它们的结果会被合并,
# 然后我们定义一个静态边,扇入到主编节点。
workflow.add_edge(["fact_checker", "brand_voice_analyst", "risk_assessor"], "chief_editor")

# 主编的决策是最后一步。
workflow.add_edge("chief_editor", END)

# 将图编译为可运行的应用程序。
app = workflow.compile()
print("图构建并编译成功。")

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行评审(Created by Fareed Khan)

分析并行工作流的性能收益:

critic_times = []
editor_time = 0

# 解析性能日志以提取计时数据。
for log in final_state['performance_log']:
    # 假设日志格式为 '[节点名] 完成,耗时 X.XX 秒。'
    time_val = float(log.split(' ')[-2].replace('秒。', ''))
    if "[ChiefEditor]" in log:
        editor_time = time_val
    else:
        critic_times.append(time_val)

# 并行阶段的耗时是最慢的评审节点的耗时。
parallel_critic_time = max(critic_times) if critic_times else 0

# 对于顺序执行的模拟,我们将它们的耗时相加。
sequential_critic_time = sum(critic_times)

# 我们工作流的总耗时。
total_time = parallel_critic_time + editor_time
time_saved = sequential_critic_time - parallel_critic_time
print(f"总执行时间: {total_time:.2f} 秒n")
print("详细分解:")
print(f" - 并行评审(最长路径): {parallel_critic_time:.2f} 秒")
print(f" - 主编: {editor_time:.2f} 秒n")

#### OUTPUT ####
=============================================================
                     FINAL GOVERNANCE DECISION
=============================================================

Final Decision: Request Revisions

Editors Summary:
The post is non-compliant across the board. The Fact-Checker found unsupported claims, the Risk Assessor identified significant legal and reputational risks with the terms 'guaranteed' and 'cures procrastination', and the Brand Voice Analyst noted that the tone is overly hyped and exaggerated.

Revision Instructions:
Please remove the word 'guaranteed'. Rephrase the '500% faster' claim to be more specific and verifiable, for example, 'up to 5x faster in specific benchmarks'. Remove the unsupported claim about curing procrastination entirely. Tone down the language to be more professional and focus on the practical benefits for the user.

============================================================
                     PERFORMANCE ANALYSIS
============================================================
Total Execution Time: 15.66 seconds

Breakdown:
 - Parallel Critics (longest path): 9.21 seconds
 - Chief Editor: 6.45 seconds

三个评审节点并行运行。若采用串行方式,该阶段将耗时 19.24 秒;而并行执行仅需 9.21 秒(由最慢的评审节点决定)。

> 此举节省了 10.03 秒,使评估阶段的延迟降低了超过 52%。更重要的是,评审质量远优于单一评估。

我们从三个不同视角获得了深入、专业的反馈,使首席编辑能够做出更全面、信息更充分的决策。

## 用于超响应智能体的推测执行

许多智能体工作流遵循以下顺序:`用户输入 -> 智能体思考(LLM调用)-> 智能体行动(工具调用)`。用户需要等待这两个阶段完成。**推测执行** 模式通过让这两个步骤重叠进行来优化此流程。

> 其核心思想是:在智能体“思考”的同时,系统对其下一步最可能的动作做出合理预测,并抢先启动该动作。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能
推测执行示意图(Created by Fareed Khan) 如果预测正确,工具调用的延迟就被 LLM 推理时间所“掩盖”,从用户视角看,响应几乎是即时的。 这对于任何高吞吐、面向用户的系统都至关重要。我们将构建一个客服智能体来拉取用户订单历史,展示此模式如何从用户感知上消除工具调用的延迟。 首先,我们需要一个“在现实中很慢”的工具。这里使用固定的人工延迟来模拟数据库查询。 ```python from langchain_core.tools import tool import time import json # 定义模拟数据库延迟的常量。 DATABASE_LATENCY_SECONDS = 3 @tool def get_order_history(user_id: str) -> str: """一个模拟的慢速工具,用于从数据库获取指定用户的订单历史。""" print(f"--- [DATABASE] Starting query for user_id: {user_id}. This will take {DATABASE_LATENCY_SECONDS} seconds. ---") # 使用 'time.sleep()' 模拟网络和数据库查询时间。 time.sleep(DATABASE_LATENCY_SECONDS) # 在此演示中使用模拟数据。 mock_db = { "user123": [ {"order_id": "A123", "item": "QuantumLeap AI Processor", "status": "Shipped"}, {"order_id": "B456", "item": "Smart Coffee Mug", "status": "Delivered"} ] } result = mock_db.get(user_id, []) print(f"--- [DATABASE] Query finished for user_id: {user_id}. ---") return json.dumps(result)

get_order_history 中的 time.sleep(DATABASE_LATENCY_SECONDS) 提供了可预测的 3 秒延迟。我们将使用 推测执行 来“隐藏”这部分延迟。

接着定义 GraphState,并添加一个特殊字段 prefetched_data 用于保存推测执行的结果。

from typing import TypedDict, Annotated, List, Optional
from langchain_core.messages import BaseMessage
import operator
from concurrent.futures import Future

class GraphState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    user_id: str
    # 'prefetched_data' 将持有一个 Python 'Future' 对象,代表后台工具调用的结果占位符。
    prefetched_data: Optional[Future]
    # 'agent_decision' 将保存 LLM 最终决定要执行的实际工具调用。
    agent_decision: Optional[BaseMessage]
    performance_log: Annotated[List[str], operator.add]

关键在于 prefetched_data: Optional[Future]Future 对象是一个未完成结果的占位符,它允许入口节点在启动后台任务后立即返回,并将这个 Future 传递给后续节点使用。

接下来是该模式的核心:entry_point 节点。它将同时启动两个过程:推测性的工具调用和主线的 LLM 推理。

from concurrent.futures import ThreadPoolExecutor

# 创建一个线程池来运行后台任务。
thread_pool = ThreadPoolExecutor(max_workers=5)

def entry_point(state: GraphState):
    """入口节点:并行启动推测性预取和主智能体推理。"""
    print("--- [ORCHESTRATOR] Entry point started. --- ")
    start_time = time.time()

    # 1. 使用线程池在后台线程中启动推测性预取。
    #    .submit() 方法会立即返回一个 'Future' 对象。
    print("--- [ORCHESTRATOR] Starting speculative pre-fetch of order history... ---")
    prefetched_data_future = thread_pool.submit(get_order_history.invoke, {"user_id": state['user_id']})

    # 2. 在工具于后台运行的同时,并行启动主智能体的 LLM 调用。
    print("--- [ORCHESTRATOR] Starting main agent LLM call... ---")
    agent_response = llm_with_tools.invoke(state['messages'])

    execution_time = time.time() - start_time
    log_entry = f"[Orchestrator] LLM reasoning completed in {execution_time:.2f}s."
    print(log_entry)

    # 节点返回 Future 对象和智能体的决策,以便添加到状态中。
    return {
        "prefetched_data": prefetched_data_future,
        "agent_decision": agent_response,
        "performance_log": [log_entry]
    }

entry_point 中,thread_pool.submit() 是非阻塞的:它在独立线程中发起耗时为 3 秒的 get_order_history 调用,并立即返回一个 Future 对象。代码无需等待工具调用完成,而是立即继续执行 llm_with_tools 的调用。这正是实现 时间重叠 的关键。

接下来是 tool_executor_node,它负责执行智能体选择的工具,并在执行前检查所需数据是否已被预取。

from langchain_core.messages import ToolMessage
import time

def tool_executor_node(state: GraphState):
    """执行智能体选择的工具,但首先检查数据是否已被预取。"""
    print("--- [TOOL EXECUTOR] Node started. --- ")
    start_time = time.time()

    agent_decision = state['agent_decision']
    tool_call = agent_decision.tool_calls[0]

    # 检查智能体想要调用的工具是否与我们推测执行的是同一个。
    if tool_call['name'] == "get_order_history":
        print("--- [TOOL EXECUTOR] Agent wants order history. Checking pre-fetch... ---")
        # 如果匹配,我们调用 Future 对象的 .result() 方法。
        # 这会阻塞直到后台线程完成,但如果已经完成,它会立即返回结果。
        prefetched_future = state['prefetched_data']
        tool_result = prefetched_future.result()
        print("--- [TOOL EXECUTOR] Pre-fetch successful! Using cached data instantly. ---")
    else:
        # 如果智能体做了不同的决定,我们将正常执行那个工具。
        print(f"--- [TOOL EXECUTOR] Speculation failed. Agent wants {tool_call['name']}. Executing normally. ---")
        # (在我们的演示中不会走这个分支。)
        tool_result = "Tool not implemented for this demo."

    tool_message = ToolMessage(content=tool_result, tool_call_id=tool_call['id'])

    execution_time = time.time() - start_time
    log_entry = f"[ToolExecutor] Resolved tool call in {execution_time:.2f}s."
    print(log_entry)

    return {"messages": [agent_decision, tool_message], "performance_log": [log_entry]}

tool_executor_node 的核心在于 prefetched_future.result() 的调用:
1. 命中预取:如果数据库查询在 LLM 思考期间已经完成,此调用会立即返回结果。从用户视角看,3 秒的延迟被完全消除。
2. 推测失败:如果推测错误,智能体选择了其他工具,则忽略这个 Future 对象,正常执行所需的工具。

接下来组装工作流图:

from langgraph.graph import StateGraph, END

# 这个条件边用于检查智能体的决策是否包含任何工具调用。
def should_call_tool(state: GraphState) -> str:
    if state['agent_decision'].tool_calls:
        return "execute_tool"
    return END

# 定义图。
workflow = StateGraph(GraphState)
workflow.add_node("entry_point", entry_point)
workflow.add_node("execute_tool", tool_executor_node)
workflow.add_node("final_answer", final_answer_node) # (假设 final_answer_node 已定义)

# 构建图的控制流。
workflow.set_entry_point("entry_point")
workflow.add_conditional_edges("entry_point", should_call_tool)
workflow.add_edge("execute_tool", "final_answer")
workflow.add_edge("final_answer", END)
app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

Speculative Execution(Created by Fareed Khan)

现在进行测试,对比推测式工作流与传统串行工作流的时间:

from langchain_core.messages import HumanMessage

inputs = {
    "messages": [HumanMessage(content="Hi, can you tell me the status of my recent orders?")],
    "user_id": "user123"
}

step_counter = 1
final_state = None

for output in app.stream(inputs, stream_mode="values"):
    node_name = list(output.keys())[0]
    print(f"n{'*' * 100}")
    print(f"**Step {step_counter}: {node_name.replace('_', ' ').title()} Node Execution**")
    print(f"{'*' * 100}")

    step_counter += 1

#### OUTPUT ####
[ORCHESTRATOR] Entry point started. ---
[ORCHESTRATOR] Starting speculative pre-fetch of order history... ---
[DATABASE] Starting query for user_id: user123. This will take 3 seconds. ---
...

为了量化性能提升,我们从运行完毕的 `final_state` 的性能日志中提取了关键计时数据,并进行了对比分析。

```python
# 从完整运行后的 final_state 性能日志中提取计时数据
llm_time_1 = float(final_state['performance_log'][0].split(' ')[-2])
resolution_time = float(final_state['performance_log'][1].split(' ')[-2])
llm_time_2 = float(final_state['performance_log'][2].split(' ')[-2])
db_time = DATABASE_LATENCY_SECONDS # 已知的数据库延迟

# 计算推测式执行的总耗时
speculative_total = llm_time_1 + resolution_time + llm_time_2
# 模拟计算顺序执行的总耗时
sequential_total = llm_time_1 + db_time + llm_time_2

time_saved = sequential_total - speculative_total
reduction_percent = (time_saved / sequential_total) * 100

运行上述代码后,我们得到了清晰的性能对比结果:

============================================================
                  PERFORMANCE SHOWDOWN
============================================================

------------------------------------------------------------
             SPECULATIVE EXECUTION WORKFLOW (Our Run)
------------------------------------------------------------
1. Agent Thinks (LLM Call 1):       4.21 seconds
   (Database Query: 3.00s ran in parallel, fully hidden)
2. Tool Result Resolution:          0.01 seconds (Instant cache hit)
3. Synthesize Answer (LLM Call 2):  3.55 seconds
------------------------------------------------------------
Total Time to Final Answer: 7.77 seconds

------------------------------------------------------------
             TRADITIONAL SEQUENTIAL WORKFLOW (Simulated)
------------------------------------------------------------
1. Agent Thinks (LLM Call 1):       4.21 seconds
2. Execute Tool (Database Query):   3.00 seconds (User waits)
3. Synthesize Answer (LLM Call 2):  3.55 seconds
------------------------------------------------------------
Simulated Total Time: 10.76 seconds

============================================================
                    CONCLUSION
============================================================
Time Saved: 2.99 seconds
Perceived Latency Reduction: 28%

关键结论:节省的时间几乎完全等于慢速工具调用(数据库查询)本身的耗时,实现了 28% 的总体响应时间减少。

在推测式执行工作流中,3秒的数据库查询与4.21秒的首次LLM调用并行执行。由于LLM推理更慢,数据库延迟被完全隐藏;tool_executor_node 几乎瞬间(0.01秒)获取到缓存结果。最终,用户感知的总延迟仅为 7.77秒

分层智能体团队:实现更优质量

至此,我们探讨了智能体如何并行生成想法并进行评估。然而,在复杂任务中,智能体通常需要自行决定“做什么”以及“何时做”,这会在规划与执行阶段之间引入额外的延迟。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

分层体系结构(Created by Fareed Khan)

专业化与任务分解 架构模式正是解决这一问题的关键。

  1. 编排者/管理者:将复杂任务交给一个高层级的编排者智能体。它不直接执行具体操作,而是专注于整体规划和任务分解。
  2. 工作者:编排者将任务拆解为定义清晰、更小的子任务,并将其分派给专门的工作者智能体。
  3. 并行执行与综合:工作者智能体通常可以并行执行。最终,由编排者综合所有工作者的结果,生成统一的最终输出。

接下来,我们将以“投资报告生成”任务为例,直接对比 单体智能体分层团队 的表现。我们将证明,分层方式不仅在速度上更快,其生成的报告在细节丰富度、结构清晰度和准确性方面也更胜一筹。

定义结构化数据模型,作为Agent之间通信的正式协议。结构化输出是多Agent系统的粘合剂,它确保了信息交换的可靠性和一致性。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import Optional, List

class FinancialData(BaseModel):
    """金融分析师Agent的结构化输出模型。"""
    price: float = Field(description="当前股价。")
    market_cap: int = Field(description="总市值。")
    pe_ratio: float = Field(description="市盈率。")
    volume: int = Field(description="平均交易量。")

class NewsAndMarketAnalysis(BaseModel):
    """新闻与市场分析师Agent的结构化输出模型。"""
    summary: str = Field(description="近期重要新闻和市场趋势的简明摘要。")
    competitors: List[str] = Field(description="公司主要竞争对手列表。")

class FinalReport(BaseModel):
    """首席分析师最终合成投资报告的结构化输出模型。"""
    company_name: str = Field(description="公司名称。")
    financial_summary: str = Field(description="关键财务数据的摘要段落。")
    news_and_market_summary: str = Field(description="新闻、市场趋势和竞争格局的摘要段落。")
    recommendation: str = Field(description="最终投资建议(如‘强力买入’、‘持有’、‘卖出’)及简要理由。")

这些Pydantic模型充当了专业Agent与编排器之间的正式契约。例如,FinancialData模型强制要求金融分析师必须提供四个具体的数值字段,这比非结构化的纯文本输出更可靠,也更易于后续的综合处理。

接下来,定义分层团队的GraphState,用于跟踪每个专家Agent的输出状态:

from typing import TypedDict, Annotated

class TeamGraphState(TypedDict):
    company_symbol: str
    company_name: str
    # 存放金融分析师的结构化输出。
    financial_data: Optional[FinancialData]
    # 存放新闻与市场分析师的结构化输出。
    news_analysis: Optional[NewsAndMarketAnalysis]
    # 存放合成器的最终产品。
    final_report: Optional[FinalReport]
    performance_log: Annotated[List[str], operator.add]

TeamGraphState是分析团队的共享工作台,其强类型字段(如financial_datanews_analysis)确保了在最终合成阶段,输入数据是干净且组织良好的。

现在定义专业的Worker Agent。每个Worker都是“自包含、会使用工具、提示词高度聚焦”的Agent。首先定义金融分析师节点:

from langchain.agents import create_tool_calling_agent, AgentExecutor
import time

# 为金融分析师创建一个自包含的Agent执行器。其提示词高度聚焦于单一任务。
financial_analyst_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一名金融专家。你的唯一工作是使用提供的工具获取公司的关键财务指标,并以结构化格式返回。"),
    ("human", "获取股票代码为 {symbol} 的公司的财务数据。")
])

# 此Agent仅能访问‘get_financial_data’工具。
financial_agent = create_tool_calling_agent(llm, [get_financial_data], financial_analyst_prompt)

# 链的最后一步强制将Agent的输出转换为我们的‘FinancialData’ Pydantic模型。
financial_executor = AgentExecutor(agent=financial_agent, tools=[get_financial_data]) | llm.with_structured_output(FinancialData)

def financial_analyst_node(state: TeamGraphState):
    """用于获取并结构化财务数据的专家节点。"""
    print("--- [金融分析师] 开始分析... ---")
    start_time = time.time()
    result = financial_executor.invoke({"symbol": state['company_symbol']})
    execution_time = time.time() - start_time
    log = f"[金融分析师] 在 {execution_time:.2f} 秒内完成。"
    print(log)
    return {"financial_data": result, "performance_log": [log]}

financial_analyst_node的提示词高度聚焦,且工具权限受到严格限制。通过限定Agent只执行一项任务,可以显著提高其输出的可靠性。末尾的.with_structured_output(FinancialData)是一个关键的质量检查点,确保交付的格式完全正确。

news_analyst_node遵循相同的模式,仅提示词和工具不同。

最后定义编排器 Agent:report_synthesizer_node。它读取并行Worker的结构化输出,并完成最终的综合报告。

# 创建合成器/编排器的链。
report_synthesizer_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是首席投资分析师。你的工作是将专家团队提供的结构化财务数据和市场分析,综合成一份最终、全面的投资报告,并给出有理由的建议。"),
    ("human", "请为 {company_name} 创建最终报告。nn财务数据:n{financial_data}nn新闻与市场分析:n{news_analysis}")
])
synthesizer_chain = report_synthesizer_prompt | llm.with_structured_output(FinalReport)

def report_synthesizer_node(state: TeamGraphState):
    """读取Worker的结构化输出并合成最终报告的编排器节点。"""
    print("--- [首席分析师] 正在合成最终报告... ---")
    start_time = time.time()

    # 此节点从状态中读取结构化数据。
    report = synthesizer_chain.invoke({
        "company_name": state['company_name'],
        "financial_data": state['financial_data'].json(),
        "news_analysis": state['news_analysis'].json()
    })

    execution_time = time.time() - start_time
    log = f"[首席分析师] 在 {execution_time:.2f} 秒内完成报告。"
    print(log)
    return {"final_report": report, "performance_log": [log]}

report_synthesizer_node是“管理者”角色,专注于综合任务,无需再调用任何工具。它接收干净的FinancialDataNewsAndMarketAnalysis结构化数据,从而可以专注于构建叙事和做出决策。

最后,使用“扇出-扇入”模式组装工作流图:

from langgraph.graph import StateGraph, END

# 初始化图
workflow = StateGraph(TeamGraphState)

# 添加两个专家工作节点和最终的合成器节点
workflow.add_node("financial_analyst", financial_analyst_node)
workflow.add_node("news_analyst", news_analyst_node)
workflow.add_node("report_synthesizer", report_synthesizer_node)

# 设置入口点为列表,指示LangGraph并行运行两个专家工作节点
workflow.set_entry_point(["financial_analyst", "news_analyst"])

# 从节点列表出发的边意味着图将等待列表中所有节点完成后才继续执行,这是“扇入”或同步步骤
workflow.add_edge(["financial_analyst", "news_analyst"], "report_synthesizer")

# 合成器是最终步骤
workflow.add_edge("report_synthesizer", END)

# 编译图
app = workflow.compile()

# 运行工作流
inputs = {
    "company_symbol": "TSLA",
    "company_name": "Tesla",
    "performance_log": []
}

start_time = time.time()
team_result = None
for output in app.stream(inputs, stream_mode="values"):
    team_result = output
end_time = time.time()
team_time = end_time - start_time

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

分层团队结构(Created by Fareed Khan)

最后,对两种架构的报告质量与性能进行对比:

print("="*60)
print("                AGENT OUTPUT COMPARISON")
print("="*60)
print("n" + "-"*60)
print("            MONOLITHIC AGENT REPORT")
print("-"*60 + "n")
print(f"'{monolithic_result['output']}'")

print("n" + "-"*60)
print("            HIERARCHICAL TEAM REPORT")
print("-"*60 + "n")
print(json.dumps(team_result['final_report'], indent=4, default=lambda o: o.dict()))
print("n" + "="*60)
print("                  ACCURACY & QUALITY ANALYSIS")
print("="*60 + "n")

print("="*60)
print("                  PERFORMANCE ANALYSIS")
print("="*60 + "n")
print(f"Monolithic Agent Total Time: {monolithic_time:.2f} seconds")
print(f"Hierarchical Team Total Time: {team_time:.2f} secondsn")
time_saved = monolithic_time - team_time
print(f"Time Saved: {time_saved:.2f} seconds ({time_saved/monolithic_time*100:.0f}% faster)n")
print("Analysis of Parallelism:")

# 从性能日志中提取各工作节点执行时间(如笔记本中所示)
worker_times = [6.89, 8.12]
parallel_worker_time = max(worker_times)
sequential_worker_time = sum(worker_times)

#### OUTPUT ####
============================================================
                    AGENT OUTPUT COMPARISON
============================================================

------------------------------------------------------------
            MONOLITHIC AGENT REPORT
------------------------------------------------------------
Tesla (TSLA) is currently trading at around $177.48. Recent news suggests the company is facing competition from other EV makers but is also expanding its Gigafactory network. The recommendation is to Hold the stock and monitor the competitive landscape.
------------------------------------------------------------
            HIERARCHICAL TEAM REPORT
------------------------------------------------------------
{
    "final_report": {
        "company_name": "Tesla",
        "financial_summary": "Tesla's current stock price is 177.48, with a total market capitalization of 566,310,215,680. It exhibits a trailing Price-to-Earnings (P/E) ratio of 45.4...",
        "news_and_market_summary": "Recent developments indicate that while Tesla remains a dominant force, the competitive landscape in the EV market is intensifying with rivals like BYD, Ford, and GM...",
        "recommendation": "Hold. While the company shows strong financials and market leadership, the high P/E ratio and increasing competition warrant a cautious approach..."
    }
}
============================================================
                    ACCURACY & QUALITY ANALYSIS
============================================================
1.  **结构与细节**:分层团队的报告明显更优。它遵循了Pydantic模式,具有清晰、独立的章节。单体代理仅生成了一段非结构化的文本。
2.  **事实深度**:分层团队的报告包含了具体的硬数据(市值、市盈率、成交量),因为财务分析师被明确要求查找这些信息。单体代理仅提及了价格。
3.  **完整性**:分层团队明确指出了竞争对手,并在综合多个数据点的基础上提供了更细致的建议。相比之下,单体代理的分析较为肤浅。
**结论**:任务的分解和专家代理的使用,直接产生了可证明的、更高质量和更准确的输出。层级结构和Pydantic模型强制进行了更严谨和详细的分析。
============================================================
                    PERFORMANCE ANALYSIS
============================================================
单体代理总耗时:18.34 秒
分层团队总耗时:13.57 秒
节省时间:4.77 秒(快 26%)
并行性分析:
两个专业工作器并行运行。如果串行执行,此阶段将耗时 15.01 秒。通过并行运行,该阶段仅耗时 8.12 秒(即最慢工作器的耗时)。这种并行性是更复杂、更高质量的分层系统反而显著更快的主要原因。

两个专业工作器并行执行:财务分析师耗时 6.89 秒,新闻分析师耗时 8.12 秒,最终报告更为详尽。若串行执行需 15.01 秒,并行仅需 8.12 秒(取决于最慢的工作器)。这正是高质量、复杂系统反而更快的原因。

## Competitive Agent Ensembles

在智能体(Agentic)方案中,每个AI智能体都有其独特的偏好、优势和短板。

> 通过构建一个多样化的智能体集合(Ensemble),可以降低因单个智能体产生次优或错误结果的风险。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能
集成输出(Created by Fareed Khan) 通过组合多种模型或提示策略,系统变得更稳健,能够避免单点故障,并更容易获得高质量输出。 这类似于“寻求第二意见”或竞赛式的设计流程。 我们将构建一个由三个文案智能体组成的集合,其任务是创建产品描述;随后,让一个评审智能体对并行生成的输出进行推理并选出最佳结果,以此展示质量控制的提升。 首先,集合的强大之处在于成员的多样性。我们使用两个不同的LLM家族来构建三种文案**角色**。 ```python from langchain_huggingface import HuggingFacePipeline from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline from langchain_google_vertexai import ChatVertexAI import torch # LLM 1: Llama 3 8B Instruct (开源,通过 Hugging Face 本地运行) # 此模型将驱动我们的两个角色。 model_id = "meta-llama/Meta-Llama-3-8B-Instruct" tokenizer = AutoTokenizer.from_pretrained(model_id) hf_model = AutoModelForCausalLM.from_pretrained( model_id, torch_dtype=torch.bfloat16, device_map="auto", load_in_4bit=True ) pipe = pipeline("text-generation", model=hf_model, tokenizer=tokenizer, max_new_tokens=1024, do_sample=True, temperature=0.7, top_p=0.9) llama3_llm = HuggingFacePipeline(pipeline=pipe) # LLM 2: Vertex AI 上的 Claude 3.5 Sonnet (专有,基于云) # 使用来自不同家族和训练方法的模型,引入了显著的多样性。 claude_sonnet_llm = ChatVertexAI(model_name="claude-3-5-sonnet@001", temperature=0.7) print("LLMs 已初始化:Llama 3 和 Claude 3.5 Sonnet 已准备就绪,即将展开竞争。")

通过使用两种截然不同的模型(开源的 Llama 3 与基于云的 Claude 3.5 Sonnet),我们确保了集合的真正多样性。它们在写作风格、知识截止日期和内在偏好上都存在差异,这正是我们实现系统鲁棒性所需要的。

首先,定义 Pydantic 模型来结构化文案输出与最终判决。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List

class ProductDescription(BaseModel):
    """由文案智能体输出的结构化产品描述,包含标题和正文。"""
    headline: str = Field(description="一个吸引眼球、引人注目的产品标题。")
    body: str = Field(description="一个简短的段落(2-3句话),详细说明产品的优点和特性。")

class FinalEvaluation(BaseModel):
    """评审智能体的结构化输出,包含获胜描述和详细评析。"""
    best_description: ProductDescription = Field(description="评审选出的获胜产品描述。")
    critique: str = Field(description="详细的逐点评析,解释为何此选项获胜,并参考评估标准。")
    winning_agent: str = Field(description="生成获胜描述的智能体名称(例如:‘Claude_Sonnet_Creative’、‘Llama3_Direct’、‘Llama3_Luxury’)。")

这些数据模型充当了系统内的通信协议:
1. ProductDescription 确保所有参赛智能体提交格式一致的作品。
2. FinalEvaluation 强制评审智能体不仅要选出 winning_agent,还必须提供详细的 critique,使决策过程透明且可审查。

接下来,定义 GraphState 和节点。我们使用工厂函数来减少重复代码。

from typing import TypedDict, Annotated, Dict
import operator
import time

class GraphState(TypedDict):
    product_name: str
    product_category: str
    features: str
    # 该字典将存储来自并行参赛智能体的结果,使用 operator.update 进行合并。
    competitor_results: Annotated[Dict[str, ProductDescription], operator.update]
    final_evaluation: FinalEvaluation
    performance_log: Annotated[List[str], operator.add]

# 一个用于创建参赛节点的辅助“工厂”函数。
def create_competitor_node(agent_name: str, llm, prompt):
    # 每个参赛者都是一个链:提示词 -> 大语言模型 -> 结构化的 Pydantic 输出。
    chain = prompt | llm.with_structured_output(ProductDescription)
    def competitor_node(state: GraphState):
        print(f"--- [COMPETITOR: {agent_name}] Starting generation... ---")
        start_time = time.time()
        result = chain.invoke({
            "product_name": state['product_name'],
            "product_category": state['product_category'],
            "features": state['features']
        })
        execution_time = time.time() - start_time
        log = f"[{agent_name}] Completed in {execution_time:.2f}s."
        print(log)
        # 输出键与智能体名称匹配,便于聚合。
        return {"competitor_results": {agent_name: result}, "performance_log": [log]}
    return competitor_node

create_competitor_node 是构建多样化团队的简洁方式:输入名称、模型和提示词,即可返回一个结构化、带监控的 LangGraph 节点。这使得定义三位拥有独特模型/角色组合的参赛者变得非常容易。

现在,创建三位参赛者和最终的评审智能体:

# 使用工厂函数创建三个参赛节点。
# 每个节点获得不同的模型和提示词组合,以确保多样性。
claude_creative_node = create_competitor_node("Claude_Sonnet_Creative", claude_sonnet_llm, claude_creative_prompt)
llama3_direct_node = create_competitor_node("Llama3_Direct", llama3_llm, llama3_direct_prompt)
llama3_luxury_node = create_competitor_node("Llama3_Luxury", llama3_llm, llama3_luxury_prompt)

# 评审节点
def judge_node(state: GraphState):
    """评估所有参赛结果并选出获胜者。"""
    print("--- [JUDGE] Evaluating competing descriptions... ---")
    start_time = time.time()

    # 为评审的提示词格式化所有参赛描述。
    descriptions_to_evaluate = ""
    for name, desc in state['competitor_results'].items():
        descriptions_to_evaluate += f"--- Option from {name} ---nHeadline: {desc.headline}nBody: {desc.body}nn"

    # 创建评审链。
    judge_chain = judge_prompt | llm.with_structured_output(FinalEvaluation)
    evaluation = judge_chain.invoke({
        "product_name": state['product_name'],
        "descriptions_to_evaluate": descriptions_to_evaluate
    })

    execution_time = time.time() - start_time
    log = f"[Judge] Completed evaluation in {execution_time:.2f}s."
    print(log)

    return {"final_evaluation": evaluation, "performance_log": [log]}

至此,团队定义完成:三位风格各异的“文案选手”,以及一位基于并行结果进行最终评价的“市场总监” judge_node

最后,组合成“扇出-扇入”模式的工作流图:

from langgraph.graph import StateGraph, END

workflow = StateGraph(GraphState)

# 添加三个参赛节点。
workflow.add_node("claude_creative", claude_creative_node)
workflow.add_node("llama3_direct", llama3_direct_node)
workflow.add_node("llama3_luxury", llama3_luxury_node)

# 添加最终的评审节点。
workflow.add_node("judge", judge_node)

# 入口点是一个节点列表,指示 LangGraph 并行运行它们。这是扇出。
workflow.set_entry_point(["claude_creative", "llama3_direct", "llama3_luxury"])

# 从一个列表出发的边意味着图会等待列表中所有节点完成后才继续执行。这是扇入。
workflow.add_edge(["claude_creative", "llama3_direct", "llama3_luxury"], "judge")

# 评审的决策是最后一步。
workflow.add_edge("judge", END)
app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

Ensemble(Created by Fareed Khan)

最后,将三份产品描述与评审员的最终裁决进行对比,可以直观地理解集成(Ensemble)方法带来的质量提升。

import json

print("="*60)
print("            THE COMPETING PRODUCT DESCRIPTIONS")
print("="*60)

for name, desc in final_state['competitor_results'].items():
    print(f"n--- [{name}] ---")
    print(f"Headline: {desc['headline']}")
    print(f"Body: {desc['body']}")

print("n"+"="*60)
print("                  THE JUDGE'S FINAL VERDICT")
print("="*60)
final_eval = final_state['final_evaluation']
print(f"nWinning Agent: {final_eval['winning_agent']}n")
print("Winning Description:")
print(f"  - Headline: {final_eval['best_description']['headline']}")
print(f"  - Body: {final_eval['best_description']['body']}n")
print("Judge's Critique:")
print(final_eval['critique'])
print("n"+"-"*60)
print("                  PERFORMANCE ANALYSIS")
print("-"*60)

competitor_times = [7.33, 6.12, 6.45]
judge_time = 8.91
parallel_time = max(competitor_times)
sequential_time = sum(competitor_times)
total_time = parallel_time + judge_time
print(f"nTotal Execution Time: {total_time:.2f} seconds")

执行上述代码,输出结果如下:

#### OUTPUT ####
============================================================
            THE COMPETING PRODUCT DESCRIPTIONS
============================================================

--- [Claude_Sonnet_Creative] ---
Headline: Your Life, Unlocked. Your Wellness, Understood.
Body: The Aura Smart Ring is more than a tracker; its your silent wellness partner. Crafted from durable titanium, it deciphers your body signals-sleep, activity, and heart rate-translating them into insights that empower your every day. With a 7-day battery, its always on, always learning, always you.

--- [Llama3_Direct] ---
Headline: Track Everything. Wear Nothing.
Body: Meet the Aura Smart Ring. Get elite sleep and activity tracking, 24/7 heart rate monitoring, and a 7-day battery. Built from tough titanium, it delivers powerful health insights without the bulk of a watch.

--- [Llama3_Luxury] ---
Headline: Master Your Narrative.
Body: For the discerning individual, the Aura Smart Ring is an emblem of effortless control. Meticulously engineered from aerospace-grade titanium, it provides a seamless interface to your personal biometrics. Command your well-being with seven days of uninterrupted power and unparalleled insight.

============================================================
                  THE JUDGE'S FINAL VERDICT
============================================================
Winning Agent: Claude_Sonnet_Creative
Winning Description:
  - Headline: Your Life, Unlocked. Your Wellness, Understood.
  - Body: The Aura Smart Ring is more than a tracker; its your silent wellness partner. Crafted from durable titanium, it deciphers your body's signals-sleep, activity, and heart rate-translating them into insights that empower your every day. With a 7-day battery, it's always on, always learning, always you.

------------------------------------------------------------
                  PERFORMANCE ANALYSIS
------------------------------------------------------------
Total Execution Time: 16.24 seconds

最终分析揭示了两个核心要点:

  1. 质量源于“多样性 + 评判”:三位参赛者风格迥异——Llama3_Direct(简练有力)、Llama3_Luxury(高端感)、Claude_Sonnet_Creative(利益点驱动)。这种多样性为 Judge 提供了更优的评审样本。最终选择基于显式的权衡推理,表明高质量输出源于“竞争 + 评估”这一过程,而非依赖单一模型。

  2. 并行化带来性能优势:三个模型并行执行,使总耗时仅略长于最慢的模型,相比串行执行提速约63%。我们以“最慢者的时间成本”换取了多样化的输出,实现了质量与效率的兼得。

面向高吞吐量处理的智能体装配线

前述的并行模式(如并行工具调用、假设生成、评估)主要聚焦于降低单个复杂任务的延迟,使单个用户请求处理得更快、更智能。

但如果挑战不在于单个任务的复杂度,而在于源源不断的海量任务流呢?

在许多工业级应用中,关键性能指标并非“处理一个任务有多快”,而是“每小时能处理多少个任务”。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

智能体装配线示意图 (Created by Fareed Khan)

这正是 智能体装配线 架构的价值所在。它将设计重心从最小化延迟转向最大化 吞吐量

我们不再使用一个单体智能体顺序处理所有项目,而是将其拆分为多个专业化“工位”组成的流水线:当 工位A 完成 项目1 的处理后,会将其传递给 工位B,并立即开始处理 项目2。各个工位并行工作,但处理的是数据流中不同的条目。

接下来,我们将构建一个三阶段流水线,用于处理一批产品评论。通过精确的计时分析,展示这种流水线并行化如何显著提升每秒处理的评论数量,并与传统的串行方法进行对比。

首先,定义数据结构,用于表示一条评论在流水线上逐步被丰富的过程形态。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List, Literal, Optional

class TriageResult(BaseModel):
    """The structured output of the initial Triage station."""
    category: Literal["Feedback", "Bug Report", "Support Request", "Irrelevant"] = Field(description="The category of the review.")

class Summary(BaseModel):
    """The structured output of the Summarization station."""
    summary: str = Field(description="A one-sentence summary of the key feedback in the review.")

class ExtractedData(BaseModel):
    """The structured output of the Data Extraction station."""
    product_mentioned: str = Field(description="The specific product the review is about.")
    sentiment: Literal["Positive", "Negative", "Neutral"] = Field(description="The overall sentiment of the review.")
    key_feature: str = Field(description="The main feature or aspect discussed in the review.")

class ProcessedReview(BaseModel):
    """The final, fully processed review object that accumulates data from all stations."""
    original_review: str
    category: str
    summary: Optional[str] = None
    extracted_data: Optional[ExtractedData] = None

这些 Pydantic 模型如同流水线上的“标准集装箱”。ProcessedReview 是核心载体,它在 分类 工位被创建,随后流经后续工位逐步丰富(添加 summaryextracted_data),确保了各工位间数据契约的一致性。

定义批处理模式的 GraphState

from typing import TypedDict, Annotated, List
import operator

class PipelineState(TypedDict):
    # 'initial_reviews' 保存传入的一批原始评论字符串。
    initial_reviews: List[str]
    # 'processed_reviews' 是在流水线中逐步构建的 ProcessedReview 对象列表。
    processed_reviews: List[ProcessedReview]
    performance_log: Annotated[List[str], operator.add]

PipelineState 面向批量处理。整个流水线接收一批 initial_reviews,最终输出完整的 processed_reviews 列表。

接下来定义各个工位节点。关键实现点在于:每个节点都使用 ThreadPoolExecutor 来并行处理分配到该工位的所有条目。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from tqdm import tqdm

MAX_WORKERS = 4  # 此参数控制每个工位内部的并行度。

# 工位 1:分类节点
def triage_node(state: PipelineState):
    """第一个工位:并行对所有初始评论进行分类。"""
    print(f"--- [工位 1: 分类] 正在处理 {len(state['initial_reviews'])} 条评论... ---")
    start_time = time.time()

    triaged_reviews = []
    # 使用 ThreadPoolExecutor 为每条评论并行调用 LLM。
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # 为每条待分类的评论创建一个 Future 对象。
        future_to_review = {executor.submit(triage_chain.invoke, {"review_text": review}): review for review in state['initial_reviews']}
        for future in tqdm(as_completed(future_to_review), total=len(state['initial_reviews']), desc="分类进度"):
            original_review = future_to_review[future]
            try:
                result = future.result()
                # 在此处创建初始的 ProcessedReview 对象。
                triaged_reviews.append(ProcessedReview(original_review=original_review, category=result.category))
            except Exception as exc:
                print(f'评论处理产生异常: {exc}')

    execution_time = time.time() - start_time
    log = f"[分类] 在 {execution_time:.2f} 秒内处理了 {len(state['initial_reviews'])} 条评论。"
    print(log)

    # 此节点的输出是处理后的评论初始列表。
    return {"processed_reviews": triaged_reviews, "performance_log": [log]}

triage_node 是流水线的入口。其核心工程要点在于 ThreadPoolExecutor 的使用:它不再逐条循环处理,而是一次性提交所有任务,并通过 as_completed 按完成顺序收集结果。这使得总处理时间由少数最慢的调用决定,而非所有调用耗时的总和。

后续的 summarize_nodeextract_data_node 采用了相同的并行处理模式:首先筛选出各自负责的条目,然后并发执行核心任务。

# Station 2: Summarize Node
def summarize_node(state: PipelineState):
    """第二站:筛选出‘Feedback’类别的评论并进行并行摘要。"""
    # 此节点仅处理类别为‘Feedback’的评论。
    feedback_reviews = [r for r in state['processed_reviews'] if r.category == "Feedback"]
    if not feedback_reviews:
        print("--- [Station 2: Summarizer] No feedback reviews to process. Skipping. ---")
        return {}

    print(f"--- [Station 2: Summarizer] Processing {len(feedback_reviews)} feedback reviews... ---")
    start_time = time.time()

    # 使用映射以便原地更新评论对象。
    review_map = {r.original_review: r for r in state['processed_reviews']}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(summarizer_chain.invoke, {"review_text": r.original_review}): r for r in feedback_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(feedback_reviews), desc="Summarizer Progress"):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                # 在映射中找到原始评论对象,并用摘要信息丰富它。
                review_map[original_review_obj.original_review].summary = result.summary
            except Exception as exc:
                print(f'Review generated an exception: {exc}')

    execution_time = time.time() - start_time
    log = f"[Summarizer] Processed {len(feedback_reviews)} reviews in {execution_time:.2f}s."
    print(log)

    # 返回完整且已更新的评论列表。
    return {"processed_reviews": list(review_map.values()), "performance_log": [log]}
# Station 3: Extract Data Node
def extract_data_node(state: PipelineState):
    """最终站:从已有摘要的评论中并行提取结构化数据。"""
    # 此节点仅处理已有摘要的评论。
    summarized_reviews = [r for r in state['processed_reviews'] if r.summary is not None]
    if not summarized_reviews:
        print("--- [Station 3: Extractor] No summarized reviews to process. Skipping. ---")
        return {}

    print(f"--- [Station 3: Extractor] Processing {len(summarized_reviews)} summarized reviews... ---")
    start_time = time.time()

    review_map = {r.original_review: r for r in state['processed_reviews']}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(extractor_chain.invoke, {"summary_text": r.summary}): r for r in summarized_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(summarized_reviews), desc="Extractor Progress"):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                # 最后用提取的数据丰富评论对象。
                review_map[original_review_obj.original_review].extracted_data = result
            except Exception as exc:
                print(f'Review generated an exception: {exc}')

    execution_time = time.time() - start_time
    log = f"[Extractor] Processed {len(summarized_reviews)} reviews in {execution_time:.2f}s."
    print(log)

    return {"processed_reviews": list(review_map.values()), "performance_log": [log]}

这两个节点体现了装配线模式中的过滤逻辑:summarize_node 只处理分类为“Feedback”的条目;extract_data_node 只处理已有 summary 的条目。这种职责专门化是该模式的关键特征。

接下来,将这些节点组合成一个线性工作流图:

from langgraph.graph import StateGraph, END

# 初始化图。
workflow = StateGraph(PipelineState)

# 将三个站点添加为节点。
workflow.add_node("triage", triage_node)
workflow.add_node("summarize", summarize_node)
workflow.add_node("extract_data", extract_data_node)

# 定义装配线的线性流程。
workflow.set_entry_point("triage")
workflow.add_edge("triage", "summarize")
workflow.add_edge("summarize", "extract_data")
workflow.add_edge("extract_data", END)

# 编译图。
app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

装配线(Created by Fareed Khan)

最后进行关键分析:与模拟的单体 Agent 顺序处理相比,流水线模式实现了吞吐量的显著提升。

# 流水线工作流的总时间是各阶段处理整批数据的时间之和。
pipelined_total_time = triage_time + summarize_time + extract_time

# 吞吐量 = 处理的总条目数 / 总时间。
pipelined_throughput = num_reviews / pipelined_total_time

# 模拟一个顺序执行的单体 Agent。
# 首先,估算单条数据通过一个阶段的平均时间。
avg_time_per_stage_per_review = (triage_time + summarize_time + extract_time) / num_reviews

# 单条评论从头到尾处理完毕的总延迟是三个阶段的时间之和。
total_latency_per_review = avg_time_per_stage_per_review * 3

# 顺序 Agent 处理 10 条评论的总时间是单条评论延迟的 10 倍。
sequential_total_time = total_latency_per_review * num_reviews
sequential_throughput = num_reviews / sequential_total_time

# 计算吞吐量提升的百分比。
throughput_increase = ((pipelined_throughput - sequential_throughput) / sequential_throughput) * 100

print("="*60)
print("                    PERFORMANCE ANALYSIS")
print("="*60)
print("n--- Assembly Line (Pipelined) Workflow ---")
print(f"Total Time to Process {num_reviews} Reviews: {pipelined_total_time:.2f} seconds")
print(f"Calculated Throughput: {pipelined_throughput:.2f} reviews/secondn")
print("--- Monolithic (Sequential) Workflow (Simulated) ---")
print(f"Avg. Latency For One Review to Complete All Stages: {total_latency_per_review:.2f} seconds")
print(f"Simulated Total Time to Process {num_reviews} Reviews: {sequential_total_time:.2f} seconds")
print(f"Simulated Throughput: {sequential_throughput:.2f} reviews/secondn")
print("="*60)
print("                      CONCLUSION")
print("="*60)
print(f"Throughput Increase: {throughput_increase:.0f}%")

输出结果如下:

#### OUTPUT ####
============================================================
                    PERFORMANCE ANALYSIS
============================================================

--- Assembly Line (Pipelined) Workflow ---
Total Time to Process 10 Reviews: 20.40 seconds
Calculated Throughput: 0.49 reviews/second

--- Monolithic (Sequential) Workflow (Simulated) ---
Avg. Latency For One Review to Complete All Stages: 6.12 seconds
Simulated Total Time to Process 10 Reviews: 61.20 seconds
Simulated Throughput: 0.16 reviews/second

============================================================
                      CONCLUSION
============================================================
Throughput Increase: 206%

分析结果清晰地展示了装配线模式的强大效能。虽然单条评论的端到端处理延迟约为 6 秒,但流水线在 20 秒出头就完成了 10 条评论的处理。相比之下,传统的单体 Agent 顺序处理需要 60 秒以上。吞吐量提升了 3 倍(206%)。这是因为当 Extractor 在处理第 1 条评论时,Summarizer 已经在处理第 2 条,而 Triage 则开始处理第 3 条。这种并行化的流水线执行,是构建高吞吐量 AI 数据处理系统的关键。

去中心化黑板协作

前面构建的 Agentic 架构,如层级结构与装配线,都由预定义的、刚性工作流驱动。然而,对于某些问题,其解决路径并非预先可知。在处理复杂的信息整合或分析任务时,需要更灵活、自适应的方法。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

去中心化黑板协作(Created by Fareed Khan)

此时,可以采用 去中心化黑板协作 模式。该模式由两部分组成:一个共享数据空间(黑板)和一组独立、专业的 Agents。

  1. Agents 不按固定顺序触发,而是当黑板上的状态与其专长匹配时,自发激活。
  2. 它们读取当前状态,将自己的知识贡献写回黑板,然后回到休眠状态。
  3. 这形成了一个动态、涌现的工作流:由每一步中最合适的专家逐步拼接出完整的解决方案。

我们将构建一个客服工单处理系统,由 Analyzer、Retriever、Draftsman 三位专家在黑板中协作。目标是展示这种解耦方式如何比单一 Agent 产出更准确、上下文更丰富的最终结果

首先,定义代表 Agents 在黑板上发布“便签”的结构化数据对象。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List, Literal, Optional

class ProblemAnalysis(BaseModel):
    """由 Analyzer Agent 发布的用户问题结构化分析。"""
    product: str = Field(description="用户遇到问题的产品。")
    problem_summary: str = Field(description="对技术问题的简明一句话总结。")
    user_sentiment: Literal["Positive", "Negative", "Neutral"] = Field(description="用户情绪。")

class Solution(BaseModel):
    """由 Retriever Agent 发布的潜在解决方案。"""
    relevant_articles: List[str] = Field(description="与问题相关的知识库文章列表。")

class DraftResponse(BaseModel):
    """由 Draftsman Agent 发布的最终回复草稿。"""
    response_text: str = Field(description="Agent 起草的完整、面向用户的回复文本。")

这些 Pydantic 模型是黑板协作的正式“语言”。当 Analyzer 运行时,它必须发布 ProblemAnalysis,这确保了 Solution Retriever 能够可靠地找到 problem_summary 字段。

接下来定义 BlackboardState,用于存储初始工单以及 Agents 逐步贡献的结构化数据。

from typing import TypedDict, Annotated
import operator

class BlackboardState(TypedDict):
    ticket: str
    # 'analysis', 'solution', 'draft' 是我们黑板上供 Agents 发布其发现的槽位。
    analysis: Optional[ProblemAnalysis]
    solution: Optional[Solution]
    draft: Optional[DraftResponse]
    performance_log: Annotated[List[str], operator.add]

每个可选字段(analysissolutiondraft)都代表拼图的一块。随着 Agents 填充这些字段,工作流自然推进,最终完成问题解决。

定义专家 Agent 节点

每个节点都通过读写共享的“黑板”进行协作。以下是 analyzer_node 的定义:

from langchain_core.prompts import ChatPromptTemplate
import time

# Agent 1: 问题分析器
analyzer_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个问题分析器。你的任务是阅读客户支持工单,识别产品,总结问题,并评估用户情绪。"),
    ("human", "请分析以下工单:nn---n{ticket}n---")
])

analyzer_chain = analyzer_prompt | llm.with_structured_output(ProblemAnalysis)

def analyzer_node(state: BlackboardState):
    """第一个被激活的 Agent:读取工单并将分析结果发布到黑板上。"""
    print("--- [AGENT: Problem Analyzer] 激活中... ---")
    start_time = time.time()
    result = analyzer_chain.invoke({"ticket": state['ticket']})
    execution_time = time.time() - start_time
    log = f"[Analyzer] 完成,耗时 {execution_time:.2f} 秒。"
    print(log)
    # 此 Agent 的贡献是填充黑板上的 ‘analysis’ 字段。
    return {"analysis": result, "performance_log": [log]}

analyzer_node 是协作流程的入口,它完成初步的“释义”工作,将非结构化的 ticket 转换为结构化的 ProblemAnalysis 对象并写入黑板,供后续 Agent 使用。

其他两个 Agent(retriever_nodedraftsman_node)遵循相似的模式:读取当前黑板状态,并贡献自己的计算结果。

黑板系统的核心是中央 路由器(Router)。它在每个 Agent 执行完毕后检查黑板状态,并决定下一步应由哪位专家执行——这是实现事件驱动、机会主义协作的关键。

def router(state: BlackboardState) -> str:
    """中央路由器:检查黑板状态,决定下一个激活的 Agent。"""
    print("--- [ROUTER] 检查黑板状态... ---")

    # 路由器的逻辑是一系列按顺序检查的规则。
    # 规则 1: 如果草稿已生成,则问题已解决。
    if state.get('draft'):
        print("--- [ROUTER] 决策:草稿已完成。结束工作流。---")
        return END

    # 规则 2: 如果已找到解决方案(但尚未起草回复),则轮到起草者。
    if state.get('solution'):
        print("--- [ROUTER] 决策:已找到解决方案。激活起草者。---")
        return "draftsman"

    # 规则 3: 如果已有分析结果(但尚无解决方案),则轮到检索者。
    if state.get('analysis'):
        print("--- [ROUTER] 决策:分析完成。激活解决方案检索者。---")
        return "retriever"

    # 默认情况,如果入口设置正确,理论上不应执行到此。
    return "analyzer"

router 是系统的动态、基于状态的决策引擎。每个节点运行后,图都会调用此函数,根据 BlackboardState 中已填充的信息,决定下一个最合理的步骤,从而使工作流能够根据问题状态“自发生长”。

最后,通过路由器将所有节点组装成完整的工作流图:

from langgraph.graph import StateGraph, START, END

workflow = StateGraph(BlackboardState)

# 添加专家 Agent 节点。
workflow.add_node("analyzer", analyzer_node)
workflow.add_node("retriever", retriever_node) # (在 Notebook 中定义)
workflow.add_node("draftsman", draftsman_node) # (在 Notebook 中定义)

# 入口始终是分析器。
workflow.add_edge(START, "analyzer")

# 每个节点运行后,都交由中央路由器决定下一步。
# 这形成了一个以路由器为中心的“中心辐射型”架构。
workflow.add_conditional_edges("analyzer", router)
workflow.add_conditional_edges("retriever", router)
workflow.add_conditional_edges("draftsman", router)

# 不需要直接连接到 END,因为路由器会处理终止条件。
app = workflow.compile()
print("图构建并编译成功。")

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

黑板协作示意图 (Created by Fareed Khan)

查看黑板的最终状态,并分析这种协作方式在质量上的优势:

import json
print("="*60)
print("                 最终黑板状态")
print("="*60)
# (假设一次完整运行已完成,final_state 已填充)
print(json.dumps(final_state, indent=4, default=lambda o: o.dict() if hasattr(o, 'dict') else o))

print("n" + "="*60)
print("                 准确性与质量分析")
print("="*60 + "n")

输出示例如下:

#### OUTPUT ####
============================================================
                 最终黑板状态
============================================================

{
    "ticket": "I'm really frustrated. My new Aura Ring isn't syncing my sleep data...",
    "analysis": {
        "product": "Aura Ring",
        "problem_summary": "The Aura Ring app is failing to sync sleep data.",
        "user_sentiment": "Negative"
    },
    "solution": {
        "relevant_articles": [
            "Article 4: To resolve app connectivity issues with the Aura Ring...",
            "Article 1: To reset your Aura Smart Ring..."
        ]
    },
    "draft": {
        "response_text": "Hi there, I'm sorry to hear you're frustrated with the Aura Ring's sleep sync issue...Here are a couple of common solutions from our knowledge base..."
    },
    "performance_log": [
        "[Analyzer] Completed in 4.55s.",
        "[Retriever] Completed in 7.89s.",
        "[Draftsman] Completed in 6.21s."
    ]
}

最终生成的回复草稿质量显著优于单一 Agent 方案,主要原因包括:

  1. 职责分离降低错误:单一 Agent 可能误解用户问题,进而检索错误的解决方案。将“分析”与“检索”分离,确保了检索过程基于清晰、结构化的问题总结。
  2. 专业化带来深度:起草者(Draftsman)的提示词专注于清晰表达;它接收结构化的数据(情绪、问题摘要、解决方案),因此能够撰写出既解决技术问题又回应了用户情绪的回复。
  3. 可审计性与模块化:黑板上的每个对象(analysissolutiondraft)都是独立、可审计的工件。如果最终草稿有问题,可以回溯是分析不准还是检索失败。这比黑盒式的单一 Agent 更易于调试和优化。

冗余执行以实现容错

在企业环境中部署 Agentic 系统时,常会遇到 API 超时、模型崩溃、网络中断等问题。此前的模式主要关注“理想条件下”的质量与速度。

冗余执行 模式则关注:即使在“恶劣环境”下,也要保证系统的可靠性与高效性。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

冗余执行(Created by Fareed Khan)

核心思路直接:针对关键且可能不可靠的步骤,同时启动两个或多个相同的智能体并行执行。系统采纳首个成功返回的结果,并取消其余仍在进行的任务。这项技术能有效抵御间歇性故障和不可预测的延迟。

我们将构建一个依赖“有意不可靠”模拟工具的简单智能体,然后分别在有冗余执行和无冗余执行两种模式下运行,以展示其在速度(延迟稳定性)成功率(有效准确性) 方面带来的显著、可量化的提升。

首先,创建一个模拟不可靠外部依赖的工具,它可能失败、可能很慢,也可能很快:
[code]
from langchain_core.tools import tool
import time
import random

@tool
def get_critical_data(query: str) -> str:
    """模拟从外部服务获取关键数据的工具,该服务可能间歇性失败或延迟。"""
    # 为每次工具调用分配一个随机ID,便于日志追踪
    instance_id = random.randint(1000, 9999)
    print(f"--- [Tool Instance {instance_id}] 尝试为查询‘{query}’获取数据 ---")

    # 通过随机数模拟不可靠性
    roll = random.random()
    if roll < 0.20:  # 20% 概率完全失败
        print(f"--- [Tool Instance {instance_id}] 失败:网络连接错误。---")
        raise ConnectionError("Failed to connect to the external service.")
    elif roll < 0.30:  # 10% 概率出现“长尾延迟”
        slow_duration = random.uniform(5, 7)
        print(f"--- [Tool Instance {instance_id}] 缓慢:遭遇高延迟,预计耗时 {slow_duration:.2f} 秒。---")
        time.sleep(slow_duration)
    else:  # 70% 概率正常快速执行
        fast_duration = random.uniform(0.5, 1.0)
        print(f"--- [Tool Instance {instance_id}] 快速:正常执行,预计耗时 {fast_duration:.2f} 秒。---")
        time.sleep(fast_duration)

    result = f"数据‘{query}’已由实例 {instance_id} 成功获取。"
    print(f"--- [Tool Instance {instance_id}] 成功:{result} ---")
    return result

[/code]

get_critical_data 工具通过随机数和 time.sleep 模拟了分布式系统中最常见的两类问题:瞬时故障(ConnectionError)和不可预测的延迟,使我们能够直观地复现并解决这些问题。

接下来实现容错图。其核心是一个使用 ThreadPoolExecutor 编排冗余执行的节点:
[code]
from typing import TypedDict, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from langgraph.graph import StateGraph, END

class RedundantState(TypedDict):
    input: str
    result: Optional[Any]
    error: Optional[str]
    performance_log: Optional[str]

def redundant_executor_node(state: RedundantState):
    """模式的核心:并行执行两个相同的智能体,并返回首个成功的结果。"""
    print("--- [Redundant Executor] 启动 2 个智能体并行执行... ---")
    start_time = time.time()

    # 使用 ThreadPoolExecutor 并发运行两个智能体调用
    with ThreadPoolExecutor(max_workers=2) as executor:
        # 向执行器提交两个相同的任务
        futures = [executor.submit(simple_executor.invoke, {"input": state['input']}) for _ in range(2)]

        first_result = None
        # ‘as_completed’迭代器按完成顺序返回 Future 对象
        for future in as_completed(futures):
            try:
                # 尝试获取第一个完成的 Future 的结果
                first_result = future.result()
                print("--- [Redundant Executor] 一个任务成功完成。取消其他任务。---")
                # 一旦获得一个成功结果,我们的工作就完成了。可以中断循环。
                # 在更高级的实现中,可以尝试取消其他仍在运行的 Future。
                break
            except Exception as e:
                # 如果其中一个并行任务失败,我们仅记录日志并等待另一个任务
                print(f"--- [Redundant Executor] 一个任务失败,错误:{e}。等待另一个任务。---")
                pass

    execution_time = time.time() - start_time
    log = f"冗余执行在 {execution_time:.2f} 秒内完成。"
    print(f"--- [Redundant Executor] {log} ---")

    # 根据是否获得成功结果来更新状态
    if first_result:
        return {"result": first_result, "performance_log": log, "error": None}
    else:
        # 此情况仅在两个并行执行都失败时发生
        return {"result": None, "performance_log": log, "error": "所有冗余执行均失败。"}

# 组装一个仅包含此节点的简单图
workflow = StateGraph(RedundantState)
workflow.add_node("redundant_executor", redundant_executor_node)
workflow.set_entry_point("redundant_executor")
workflow.add_edge("redundant_executor", END)
app = workflow.compile()

# 多次运行容错图以收集数据
redundant_results = []
for i in range(num_runs):
    print(f"--- 运行冗余智能体 (第 {i+1}/{num_runs} 次尝试) ---")
    start_time = time.time()
    result = app.invoke({"input": "请获取用户资料"})
    end_time = time.time()
    if result['error']:
        redundant_results.append(("FAILURE", end_time - start_time, result['error']))
        print(f"在 {end_time - start_time:.2f} 秒内失败。n")
    else:
        redundant_results.append(("SUCCESS", end_time - start_time, result['result']))
        print(f"在 {end_time - start_time:.2f} 秒内成功。n")

[/code]

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

冗余执行(Created by Fareed Khan)

redundant_executor_node 函数的关键在于 for future in as_completed(futures) 循环:它按照任务完成的顺序返回结果。这意味着,一旦最快的任务成功,我们就获取其结果并 break 循环,忽略较慢或失败的任务。try...except 块确保当一个任务失败时,系统能继续等待另一个可能成功的任务。

最后,我们进行正面对比:分别运行简单单体系统与冗余系统各五次,对比成功率与延迟分布。

import numpy as np

# 假设结果已存储在 `simple_results` 和 `redundant_results` 列表中
# --- 可靠性分析 ---
simple_successes = sum(1 for r in simple_results if r[0] == "SUCCESS")
simple_rate = (simple_successes / len(simple_results)) * 100 if simple_results else 0
redundant_successes = sum(1 for r in redundant_results if r[0] == "SUCCESS")
redundant_rate = (redundant_successes / len(redundant_results)) * 100 if redundant_results else 0

print("="*60)
print("                  SYSTEM RELIABILITY ANALYSIS")
print("="*60 + "n")
print("--- Simple Agent ---")
print(f"Success Rate: {simple_rate:.1f}% ({simple_successes} successes, {len(simple_results) - simple_successes} failures)n")
print("--- Redundant Agent ---")
print(f"Success Rate: {redundant_rate:.1f}% ({redundant_successes} successes, {len(redundant_results) - redundant_successes} failures)n")

if simple_rate > 0:
    reliability_increase = ((redundant_rate - simple_rate) / simple_rate) * 100
    print(f"Accuracy / Reliability Increase: +{reliability_increase:.1f}%n")

# --- 延迟分析 ---
simple_latencies = [r[1] for r in simple_results if r[0] == "SUCCESS"]
redundant_latencies = [r[1] for r in redundant_results if r[0] == "SUCCESS"]

print("="*60)
print("                  PERFORMANCE & LATENCY ANALYSIS")
print("="*60 + "n")
print("--- Simple Agent (Successful Runs Only) ---")
print(f"Latencies: {[round(l, 2) for l in simple_latencies]}")
print(f"Average Latency: {np.mean(simple_latencies):.2f} seconds" if simple_latencies else "N/A")
print(f"Max Latency (P100): {np.max(simple_latencies):.2f} seconds" if simple_latencies else "N/A")

print("--- Redundant Agent (Successful Runs Only) ---")
print(f"Latencies: {[round(l, 2) for l in redundant_latencies]}")
print(f"Average Latency: {np.mean(redundant_latencies):.2f} seconds" if redundant_latencies else "N/A")
print(f"Max Latency (P100): {np.max(redundant_latencies):.2f} seconds" if redundant_latencies else "N/A")

运行上述代码,输出结果如下:

#### OUTPUT ####
============================================================
                  SYSTEM RELIABILITY ANALYSIS
============================================================

--- Simple Agent ---
Success Rate: 60.0% (3 successes, 2 failures)
--- Redundant Agent ---
Success Rate: 80.0% (4 successes, 1 failure)
Accuracy / Reliability Increase: +33.3%

============================================================
                  PERFORMANCE & LATENCY ANALYSIS
============================================================
--- Simple Agent (Successful Runs Only) ---
Latencies: [6.21, 11.99, 5.98]
Average Latency: 8.06 seconds
Max Latency (P100): 11.99 seconds

--- Redundant Agent (Successful Runs Only) ---
Latencies: [6.15, 6.02, 6.25, 5.88]
Average Latency: 6.08 seconds
Max Latency (P100): 6.25 seconds

冗余执行模式带来了两个清晰可量化的核心优势:

  1. 大幅提升系统可靠性:通过并行备份,任务成功率从 60% 提升至 80%,有效提升了系统的整体精度与用户信任度。
  2. 显著改善延迟稳定性:冗余系统的最坏情况延迟(6.25秒)优于简单系统的平均延迟(8.06秒),彻底消除了长达12秒的“长尾”延迟事件,使系统响应更可预期、更快速。

对于智能体工作流中的关键步骤,冗余执行是构建生产级高可用系统的强有力且经济高效的策略。

并行查询扩展以最大化召回率

在基于检索增强生成(RAG)的智能体系统中,一个常见问题是“词汇不匹配”:用户的查询通常不会直接使用知识库中的专业术语或行话。

例如,一个简单的查询“how to make models bigger and faster”,可能无法检索到使用了“Mixture of Experts”或“FlashAttention”等术语的相关文档。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行查询扩展(Created by Fareed Khan)

并行查询扩展 正是解决此问题的方案。其核心思想是:不直接使用用户的原始查询进行检索,而是首先利用一个强大的LLM,并行生成多种多样化的查询变体,作为“预检索”步骤。这些变体可以包括:

  1. 一份 HyDE(假设性文档):生成一段假设性的、能够回答用户问题的文档段落,用于语义搜索。
  2. 若干 子问题:将原始复杂查询拆解为2-3个更具体、更简单的问题。
  3. 一组 关键词与实体:从原始查询中提取3-5个核心关键词和命名实体,用于精确的词法搜索。

并行执行这些多样化的查询并融合其结果,可以显著提升检索的召回率,确保找到所有相关的证据片段,即使这些片段使用了与原始查询不同的术语来描述相同概念。

接下来,我们将对比一个简单RAG系统与一个应用了并行查询扩展模式的高级RAG系统,以证明后者能够提供更完整、更准确的最终答案。

首先,我们定义一个用于查询扩展智能体的Pydantic模式,强制LLM在一次调用中生成所需的所有类型查询变体。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List

class ExpandedQueries(BaseModel):
    """定义一组多样化的扩展查询,用于提升检索召回率的Pydantic模型。"""
    # 生成的、段落长度的假设性文档,其语义与可能的答案相似。
    hypothetical_document: str = Field(description="生成的、段落长度的假设性文档,直接回答用户问题,将用于语义搜索。", alias="hyde_query")
    # 一系列更小、更具体的问题,用于分解原始查询。
    sub_questions: List[str] = Field(description="2-3个更小、更具体的问题列表,用于分解原始查询。")
    # 用于精确词法搜索的核心关键词和实体列表。
    keywords: List[str] = Field(description="从用户查询中提取的3-5个核心关键词和实体列表。")

通过让LLM填充这个结构化的对象,我们只需一次高质量的LLM调用,即可获得一个多元的检索策略组合,涵盖了语义搜索(hypothetical_document)、问题分解(sub_questions)和词法搜索(keywords)。

接着使用 LangGraph 构建 RAG 图。第一个节点是查询扩展 Agent:

from typing import TypedDict, List, Optional
from langchain_core.documents import Document

class RAGGraphState(TypedDict):
    original_question: str
    expanded_queries: Optional[ExpandedQueries]
    retrieved_docs: List[Document]
    final_answer: str

# 节点 1: 查询扩展 Agent
query_expansion_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个查询扩展专家。你的目标是将用户的问题转化为一组多样化的搜索查询,以最大化检索召回率。请生成一个假设文档、若干子问题以及关键词。"),
    ("human", "请扩展以下问题:{question}")
])

# 该链将提示词传递给 LLM,并使用 Pydantic 模型结构化其输出。
query_expansion_chain = query_expansion_prompt | llm.with_structured_output(ExpandedQueries)

def query_expansion_node(state: RAGGraphState):
    """图中的第一个节点:接收原始问题,生成一组扩展查询。"""
    print("--- [Expander] 正在生成并行查询... ---")
    expanded_queries = query_expansion_chain.invoke({"question": state['original_question']})
    return {"expanded_queries": expanded_queries}

query_expansion_node 是高级检索流程中的“思考”步骤:不急于立即搜索,而是先利用 LLM 构思出更健壮的查询组合,为后续更全面的检索奠定基础。

下一个节点并行执行所有生成的查询:

from concurrent.futures import ThreadPoolExecutor

def retrieval_node(state: RAGGraphState):
    """第二个节点:接收扩展查询,并行执行所有查询。"""
    print("--- [Retriever] 正在执行并行搜索... ---")

    # 创建一个包含所有待执行查询的列表。
    all_queries = []
    expanded = state['expanded_queries']
    all_queries.append(expanded.hypothetical_document)
    all_queries.extend(expanded.sub_questions)
    all_queries.extend(expanded.keywords)

    all_docs = []

    # 使用 ThreadPoolExecutor 并发执行所有检索器调用。
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(retriever.invoke, all_queries)
        for docs in results:
            all_docs.extend(docs)

    # 最后一步是对检索到的文档进行去重,以创建干净、唯一的上下文。
    unique_docs = {doc.page_content: doc for doc in all_docs}.values()
    print(f"--- [Retriever] 从 {len(all_queries)} 个查询中找到了 {len(unique_docs)} 个唯一文档。 ---")
    return {"retrieved_docs": list(unique_docs)}

retrieval_node 实现了并行化:利用 ThreadPoolExecutorexecutor.map 方法,将 7–9 个扩展查询同时发送到向量数据库。这种“分散-收集”模式既利用了多角度搜索的优势,又避免了线性增加的等待时间。

最后,将扩展、检索和生成节点串联起来:

from langgraph.graph import StateGraph, END

workflow = StateGraph(RAGGraphState)

# 向图中添加节点。
workflow.add_node("expand_queries", query_expansion_node)
workflow.add_node("retrieve_docs", retrieval_node)
workflow.add_node("generate_answer", generation_node)

# 定义线性工作流。
workflow.set_entry_point("expand_queries")
workflow.add_edge("expand_queries", "retrieve_docs")
workflow.add_edge("retrieve_docs", "generate_answer")
workflow.add_edge("generate_answer", END)

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行查询扩展(Created by Fareed Khan)

最后进行正面比较:向两个系统提供同一个刻意模糊的查询,比较它们检索到的上下文与最终答案的质量。

# 用户的查询使用了通用术语(“big and fast”),而非知识库中的技术行话。
user_query = "How do modern AI systems get so big and fast at the same time? I've heard about attention but I'm not sure how it's optimized."

# --- 运行简单 RAG 系统 ---
print("--- [SIMPLE RAG] 正在检索文档...")
# 我们拦截检索步骤以检查简单系统找到了什么。
simple_retrieved_docs = retriever.invoke(user_query)
print(f"--- [SIMPLE RAG] 检索到的文档数:{len(simple_retrieved_docs)}")
simple_rag_answer = simple_rag_chain.invoke(user_query)

# --- 运行高级 RAG 系统 ---
# --- 最终分析 ---
print("n" + "="*60)
print("            检索文档对比")
print("="*60)
print(f"n--- 简单 RAG 检索到 {len(simple_retrieved_docs)} 个文档 ---")
for i, doc in enumerate(simple_retrieved_docs):
    print(f"{i+1}. {doc.page_content}")
print(f"n--- 高级 RAG 检索到 {len(advanced_rag_result['retrieved_docs'])} 个文档 ---")
for i, doc in enumerate(advanced_rag_result['retrieved_docs']):
    print(f"{i+1}. {doc.page_content}")
print("n" + "="*60)
print("                准确性与质量分析")

输出如下:

#### OUTPUT ####
============================================================
            检索文档对比
============================================================

--- 简单 RAG 检索到 1 个文档 ---
1. **多头注意力机制**:Transformer 架构的核心组件是多头自注意力机制...
--- 高级 RAG 检索到 3 个文档 ---
1. **FlashAttention 优化**:...FlashAttention 是一种 I/O 感知算法,它通过重新排序计算来减少读/写操作次数...
2. **专家混合(MoE)层**:...路由器网络动态选择一小部分“专家”子网络来处理每个输入标记...
3. **多头注意力机制**:Transformer 架构的核心组件...

由此可见:

  1. query_expansion_node 的成功在于其弥合了语义差距的能力。它生成的假设性文档为目标子问题引入了缺失的关键技术术语(如 Mixture of ExpertsFlashAttentionscalingoptimization),从而显著提升了检索的精准度,成功获取了关键文档。
  2. 检索结果的对比证实了召回率的改善。高级系统成功定位了三篇核心文档,为后续的生成器提供了更完整、技术更准确、质量更高的上下文信息。

Sharded & Scattered Retrieval

当知识库的规模从数千扩展到数百万乃至数十亿文档时……

单一的巨型向量库会迅速成为系统瓶颈:检索延迟显著增加,索引的维护与更新也变得异常笨重。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

分片/散射检索(Created by Fareed Khan)

分片与散射检索 是一种架构层面的解决方案。其核心思想是,依据特定逻辑(如主题、时间、来源等)将庞大的知识库分片为多个小型、独立的向量库。当接收到查询请求时,中央协调器将查询散射到所有分片进行并行检索,随后聚合各分片返回的结果并进行重排序,最终得到全局最优的文档集合。

我们通过构建一个包含两个分片(工程域与营销域)的模拟系统,并与单一知识库方案进行对比,来量化其在延迟答案质量上的收益。

首先,创建两个知识分片:一组工程领域文档,一组营销领域文档。

from langchain_core.documents import Document

# 模拟“工程”知识库分片的文档列表
eng_docs = [
    Document(page_content="The QuantumLeap V3 processor utilizes a 3nm process node and features a dedicated AI accelerator core with 128 tensor units. API endpoint `/api/v3/status` provides real-time thermal throttling data.", metadata={"source": "eng-kb"}),
    Document(page_content="Firmware update v2.1 for the Aura Smart Ring optimizes the photoplethysmography (PPG) sensor algorithm for more accurate sleep stage detection. The update is deployed via the mobile app.", metadata={"source": "eng-kb"}),
    Document(page_content="The Smart Mug's heating element is a nickel-chromium coil controlled by a PID controller. It maintains temperature within +/- 1 degree Celsius. Battery polling is done via the `getBattery` function.", metadata={"source": "eng-kb"})
]
# 模拟“营销”知识库分片的文档列表
mkt_docs = [
    Document(page_content="Press Release: Unveiling the QuantumLeap V3, the AI processor that redefines speed. 'It's a game-changer for creative professionals,' says CEO Jane Doe. Available Q4.", metadata={"source": "mkt-kb"}),
    Document(page_content="Product Page: The Aura Smart Ring is your personal wellness companion. Crafted from aerospace-grade titanium, it empowers you to unlock your full potential by understanding your body's signals.", metadata={"source": "mkt-kb"}),
    Document(page_content="Blog Post: 'Five Ways Our Smart Mug Supercharges Your Morning Routine.' The perfect temperature, from the first sip to the last, means your coffee is always perfect.", metadata={"source": "mkt-kb"})
]

我们有意将知识领域划分得非常清晰:QuantumLeap V3 的技术规格仅存在于 eng_docs,而其市场定位信息仅存在于 mkt_docs。这有助于测试系统是否能从两个独立的来源中正确检索信息。

接下来,为这些文档创建两个独立的向量分片:

from langchain_community.vectorstores import FAISS

# 创建两个独立、互不干扰的 FAISS 向量存储
eng_vectorstore = FAISS.from_documents(eng_docs, embedding=embeddings)
mkt_vectorstore = FAISS.from_documents(mkt_docs, embedding=embeddings)

# 为每个分片创建对应的检索器
eng_retriever = eng_vectorstore.as_retriever(search_kwargs={"k": 2})
mkt_retriever = mkt_vectorstore.as_retriever(search_kwargs={"k": 2})
print(f"知识库分片创建完成:工程知识库 ({len(eng_docs)} 篇文档),营销知识库 ({len(mkt_docs)} 篇文档)。")

至此,我们拥有了两套检索器:eng_retrievermkt_retriever,每个检索器仅在其对应的小型索引内运行。

构建分片式 RAG 系统的核心在于并行地将查询散射到所有分片,并聚合结果。

from typing import TypedDict, List
from concurrent.futures import ThreadPoolExecutor
import time

class ShardedRAGState(TypedDict):
    question: str
    retrieved_docs: List[Document]
    final_answer: str

def parallel_retrieval_node(state: ShardedRAGState):
    """模式的核心:并行地将查询散射到所有分片,并收集结果。"""
    print("--- [元检索器] 并行散射查询至工程与营销分片... ---")

    # 使用 ThreadPoolExecutor 并发执行两个检索任务
    with ThreadPoolExecutor(max_workers=2) as executor:
        # 定义一个辅助函数,为每个分片检索模拟一个延迟,模拟真实场景下搜索较小索引所需的时间
        def p_retrieval(retriever):
            time.sleep(0.5)
            return retriever.invoke(state['question'])

        # 将两个检索任务提交给执行器
        futures = [executor.submit(p_retrieval, retriever) for retriever in [eng_retriever, mkt_retriever]]

        all_docs = []
        for future in futures:
            all_docs.extend(future.result())

    # “聚合”步骤:融合并去重来自所有分片的结果。在实际系统中,此处通常会进行更复杂的重排序。
    unique_docs = list({doc.page_content: doc for doc in all_docs}.values())
    print(f"--- [元检索器] 从 2 个分片聚合了 {len(unique_docs)} 篇唯一文档。 ---")
    return {"retrieved_docs": unique_docs}

# 组装包含生成节点的完整工作流图
from langgraph.graph import StateGraph, END

workflow = StateGraph(ShardedRAGState)
workflow.add_node("parallel_retrieval", parallel_retrieval_node)
workflow.add_node("generate_answer", generation_node)
workflow.set_entry_point("parallel_retrieval")
workflow.add_edge("parallel_retrieval", "generate_answer")
workflow.add_edge("generate_answer", END)
sharded_rag_app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

散射检索(Created by Fareed Khan)

在此实现中,ThreadPoolExecutor 负责执行散射操作:同时向 eng_retrievermkt_retriever 发送查询。随后,通过收集 future.result() 并执行去重来完成聚合,确保从分布式知识库中获得全面且统一的上下文信息。

最后,我们通过一个同时涉及两个分片知识的问题,来对比单库检索与分片检索的表现差异。

# 查询包含强营销关键词('game-changer')和一个具体的技术问题('API status endpoint')。
user_query = "I heard the new QuantumLeap V3 is a 'game-changer for creative professionals'. Can you tell me more about it, and is there an API endpoint to check its status?"

# --- 运行单库 RAG ---
print("--- [MONOLITHIC RAG] Starting run... ---")
start_time = time.time()

monolithic_answer = monolithic_rag_chain.invoke(user_query)
monolithic_time = time.time() - start_time

# --- 运行分片 RAG ---
print("n--- [SHARDED RAG] Starting run... ---")
start_time = time.time()
inputs = {"question": user_query}

sharded_time = time.time() - start_time

# --- 最终分析 ---
print("n" + "="*60)
print("                     ACCURACY & RECALL ANALYSIS")
print("="*60 + "n")


print("="*60)
print("                     PERFORMANCE ANALYSIS")
print("="*60 + "n")
print(f"Monolithic RAG Total Time: {monolithic_time:.2f} seconds")
print(f"Sharded RAG Total Time: {sharded_time:.2f} secondsn")
latency_improvement = ((monolithic_time - sharded_time) / monolithic_time) * 100
print(f"Latency Improvement: {latency_improvement:.0f}%n")

输出结果如下:

#### OUTPUT ####
============================================================
                     ACCURACY & RECALL ANALYSIS
============================================================

**Monolithic System:** Retrieved 3 documents. While it found the two correct documents, it also retrieved an irrelevant document about the 'Aura Smart Ring'. The strong semantic similarity of 'empowers you to unlock your full potential' to 'game-changer for creative professionals' pulled in this unrelated document. This noise can degrade the quality of the final answer.
**Sharded System:** Retrieved 2 documents. The parallel search was more precise. The marketing shard found the press release, and the engineering shard found the technical specs. It correctly ignored all irrelevant documents from other product lines. This resulted in a cleaner, more focused context for the generator.
**Conclusion:** The sharded architecture improved retrieval precision by isolating knowledge domains. This prevents context pollution from irrelevant but semantically similar documents, leading to a more accurate and trustworthy final answer.

============================================================
                     PERFORMANCE ANALYSIS
============================================================
Monolithic RAG Total Time: 6.89 seconds
Sharded RAG Total Time: 4.95 seconds
Latency Improvement: 28%

最终分析揭示了分片与分散检索的两大核心优势:

  1. 精准检索:查询中的营销内容在营销分片中准确命中,技术问题在工程分片中命中。单一索引暴露于全部语料,容易被语义相近的无关文本干扰,导致上下文质量下降。
  2. 性能提升:分片系统快 28%,因为并行查询在更小的领域索引上执行。这种设计具有良好的可扩展性:单库检索会随语料库增长而变慢,而分片系统的延迟主要受最大分片大小的影响。

并行混合搜索融合:获取高保真上下文

向量搜索 擅长理解查询的语义和概念,但可能遗漏包含特定关键词的文档。关键词搜索 善于匹配精确词汇,但无法理解概念间的关联。

并行混合搜索融合 结合了二者的优势:同时执行向量检索与关键词检索,然后将它们各自的结果“融合”为一个合并的结果集。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

混合检索示意图 (Created by Fareed Khan)

这对于处理混合文本场景(如叙述性文本中包含特定产品代码、错误码或法律条款编号)的 RAG 系统尤为重要。

我们将对比三套系统:纯向量、纯关键词、混合检索,展示混合检索如何获取更高保真度的上下文,从而生成更完整、准确的最终答案。

首先构建两种检索器:向量检索器与关键词检索器。

from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings

# 创建标准的 FAISS 向量库用于语义搜索。
vector_store = FAISS.from_documents(kb_docs, embedding=embeddings)
vector_retriever = vector_store.as_retriever(search_kwargs={"k": 2})

这是标准的语义搜索引擎。

接下来,使用 scikit-learnTfidfVectorizer 从头实现一个经典的关键词检索器:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from langchain_core.retrievers import BaseRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from typing import List
from langchain_core.documents import Document

class TfidfRetriever(BaseRetriever):
    """A custom LangChain retriever that uses TF-IDF for keyword-based search."""
    # 存储已拟合的向量化器和原始文档。
    vectorizer: TfidfVectorizer
    docs: List[Document]
    k: int = 2
    class Config:
        arbitrary_types_allowed = True
    def _get_relevant_documents(self, query: str, *, run_manager: CallbackManagerForRetrieverRun) -> List[Document]:
        # 将查询转换为 TF-IDF 向量。
        query_vec = self.vectorizer.transform([query])
        # 获取所有文档的预计算 TF-IDF 向量。
        doc_vectors = self.vectorizer.transform([doc.page_content for doc in self.docs])

        # 计算查询与所有文档之间的余弦相似度。
        similarities = cosine_similarity(query_vec, doc_vectors).flatten()

        # 获取相似度最高的前 'k' 个文档的索引。
        top_k_indices = np.argsort(similarities)[-self.k:][::-1]

        # 返回对应的 Document 对象。
        return [self.docs[i] for i in top_k_indices]

# 在知识库内容上拟合 TF-IDF 向量化器。
vectorizer = TfidfVectorizer().fit([doc.page_content for doc in kb_docs])

# 创建自定义检索器的实例。
keyword_retriever = TfidfRetriever(vectorizer=vectorizer, docs=kb_docs, k=2)

`TfidfRetriever` 是一个基于词频的检索器,它不依赖于语义理解,而是通过精确匹配查询中的字词来查找文档。这种词法检索方式尤其擅长定位包含特定、稀有或专业术语的文档。

接下来,我们构建一个混合检索增强生成(Hybrid RAG)系统。其核心在于一个并行检索节点,该节点同时运行向量检索器和关键词检索器,并将两者的结果融合。

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, List

class HybridRAGState(TypedDict):
    question: str
    retrieved_docs: List[Document]
    final_answer: str

def parallel_retrieval_node(state: HybridRAGState):
    """模式核心:并行运行向量检索和关键词检索,并融合结果。"""
    print("--- [Hybrid Retriever] Running Vector and Keyword searches in parallel... ---")

    # 使用相同的问题调用两个独立的检索器。在实际的多线程系统中,这两个调用可以并发执行。
    vector_docs = vector_retriever.invoke(state['question'])
    keyword_docs = keyword_retriever.invoke(state['question'])

    # “融合”步骤:合并两个文档列表并进行去重。
    all_docs = vector_docs + keyword_docs
    unique_docs = list({doc.page_content: doc for doc in all_docs}.values())

    print(f"--- [Hybrid Retriever] Fused results: Found {len(unique_docs)} unique documents. ---")
    return {"retrieved_docs": unique_docs}

# 组装包含生成节点的完整工作流图
workflow = StateGraph(HybridRAGState)
workflow.add_node("parallel_retrieval", parallel_retrieval_node)
workflow.add_node("generate_answer", generation_node)
workflow.set_entry_point("parallel_retrieval")
workflow.add_edge("parallel_retrieval", "generate_answer")
workflow.add_edge("generate_answer", END)
hybrid_rag_app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行混合检索(Created by Fareed Khan)

parallel_retrieval_node 函数同时调用 vector_retrieverkeyword_retriever。融合过程采用简单的去重策略:如果两个检索器返回了同一份文档,则只保留一份;如果返回的是不同文档,则全部保留。这样,最终形成的上下文既包含了与问题概念相似的文档,也包含了关键词精确匹配的文档。

最后,我们通过一个设计好的查询进行对比测试。这个查询同时包含语义概念(“power saving efforts”)和一个非常稀有的关键词(ERR_THROTTLE_900),旨在让单一类型的检索器失效,从而完整验证混合检索的优势。

# 查询同时包含概念性部分和具体的关键词部分。
user_query = "What are our company's power saving efforts, and what is the error code for QLeap-V4 overheating?"

# --- 运行仅向量检索的RAG ---
vector_answer = rag_chain_vector.invoke(user_query)
# --- 运行仅关键词检索的RAG ---
keyword_answer = rag_chain_keyword.invoke(user_query)
# --- 运行混合检索的RAG ---
hybrid_answer = hybrid_result['final_answer']

# --- 最终分析 ---
print("n" + "="*60)
print("                     ACCURACY & QUALITY ANALYSIS")
print("="*60 + "n")
print("用户目标:用户提出了两个独立的问题:1. 我们的节能措施是什么?(语义问题) 2. 过热的错误代码是什么?(具体/词法问题)。n")
print("-" * 60)
print("仅向量检索RAG表现:")
print("- 结果:未能完整回答。")
print("- 最终答案:", vector_answer)

print("仅关键词检索RAG表现:")
print("- 结果:未能完整回答。")
print("- 最终答案:", keyword_answer)

print("-" * 60)
print("混合检索RAG表现:")
print("- 最终答案:", hybrid_answer)

#### OUTPUT ####
============================================================
                        ACCURACY & QUALITY ANALYSIS
============================================================

**用户目标**:用户提出了两个独立的问题:1. 我们的节能举措是什么?(语义问题);2. 过热对应的错误代码是什么?(具体/词汇问题)。

**仅向量检索 (Vector-Only RAG) 表现**:
- **结果**:未能完整回答问题。
- **最终答案**:根据提供的上下文,公司的节能举措是一项名为“泰坦计划”的倡议,专注于开发节能硬件以降低数据中心功耗,是绿色计算战略的一部分。上下文中不包含 QLeap-V4 过热错误代码的信息。
- **原因**:它在处理语义部分表现出色,通过将“节能举措”的概念与“节能硬件”匹配,找到了关于“泰坦计划”的文档。然而,具体的错误代码“ERR_THROTTLE_900”在语义上与查询不够接近,因此未被检索到。代理正确地声明了无法找到答案。

**仅关键词检索 (Keyword-Only RAG) 表现**:
- **结果**:未能完整回答问题。
- **最终答案**:根据上下文,QLeap-V4 过热的错误代码是“ERR_THROTTLE_900”。“泰坦计划”是一项旨在降低功耗的倡议。
- **原因**:它在处理词汇部分表现出色,完美地将查询中的关键词“ERR_THROTTLE_900”与包含该代码的文档匹配。然而,它遗漏了第二个关于“泰坦计划”是“绿色计算战略”一部分的更概念性的文档,因为关键词重叠度不高。

**混合检索 (Hybrid Search RAG) 表现**:
- **结果**:成功。准确回答了问题的两个部分。
- **最终答案**:我们公司的节能举措名为“泰坦计划”,这是我们绿色计算战略的核心部分,旨在开发节能硬件以降低数据中心功耗。QLeap-V4 过热的官方错误代码是“ERR_THROTTLE_900”。
- **原因**:并行执行与融合步骤结合了两者的优势。向量搜索贡献了关于“泰坦计划”的两个文档,关键词搜索贡献了包含具体错误代码的文档。通过将这些独特的结果合并到一个丰富的上下文中,生成器获得了构建完整、正确答案所需的全部信息。

结论非常明确:
*   **仅向量检索**与**仅关键词检索**都给出了不完整的答案:一个只擅长概念匹配,一个只擅长精确词汇匹配。
*   **混合检索**同时捕获了关于“节能项目”的语义文档和关于“过热错误码”的精确词汇文档,通过并行的“散射-聚合”模式将两类独特发现拼接成完整上下文,使生成器能够轻松给出全面正确的答案。

## 并行上下文预处理以提升准确性

前述的 RAG 模式主要聚焦于提升检索阶段(召回更多正确文档)。**并行上下文预处理** 则关注检索之后。当我们为了提高召回率而将 `k` 值设置得很大(例如 10+)时,会面临一个新问题:

> 若直接将这组噪声较大的文档全部塞入生成器(LLM)的上下文窗口,会导致处理速度慢、成本高(token 消耗大),甚至可能因噪声淹没关键信息而降低答案准确性(即“迷失在中间”问题)。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能
并行上下文预处理(Created by Fareed Khan) 一个架构性的解决方案是引入一个中间的“蒸馏”步骤:在获取大量候选文档后,使用多个并行的小型 LLM 调用,逐个检查文档与问题的“相关性”。只有通过检查的文档才会被保留在最终的“蒸馏后上下文”中,再交给主生成器。 我们将对比两种 RAG 流程:一种使用大而嘈杂的上下文;另一种采用并行预处理,以展示可量化的改进。 首先定义用于蒸馏/过滤代理的 Pydantic 模式: ```python from langchain_core.pydantic_v1 import BaseModel, Field class RelevancyCheck(BaseModel): """用于蒸馏/过滤代理结构化输出的 Pydantic 模型。""" # 对文档相关性的明确二元判定。 is_relevant: bool = Field(description="如果文档包含直接有助于回答问题的信息,则为 True。") # 判定理由的简要说明。 brief_explanation: str = Field(description="解释文档为何相关或不相关的一句话说明。")

RelevancyCheck 契约确保每次并行调用只输出一个 is_relevant 布尔值,这使得调用快速且可靠;brief_explanation 字段则便于调试和理解文档被纳入或排除的原因。

定义 GraphState 与核心节点 distill_context_node,负责并行预处理:

from typing import TypedDict, List
from langchain_core.documents import Document
from concurrent.futures import ThreadPoolExecutor, as_completed

class RAGGraphState(TypedDict):
    question: str
    raw_docs: List[Document]
    distilled_docs: List[Document]
    final_answer: str

# 为每个文档并行运行的链
distiller_prompt = ChatPromptTemplate.from_template(
    "Given the user's question, determine if the following document is relevant for answering it. "
    "Provide a brief explanation.nn"
    "Question: {question}nnDocument:n{document}"
)

distiller_chain = distiller_prompt | llm.with_structured_output(RelevancyCheck)

def distill_context_node(state: RAGGraphState):
    """模式核心:并行扫描所有检索到的文档以过滤相关性。"""
    print(f"--- [Distiller] 正在并行预处理 {len(state['raw_docs'])} 个原始文档... ---")

    relevant_docs = []
    # 使用 ThreadPoolExecutor 在每个文档上并发运行 'distiller_chain'
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 为每个待检查的文档创建一个 future
        future_to_doc = {executor.submit(distiller_chain.invoke, {"question": state['question'], "document": doc.page_content}): doc for doc in state['raw_docs']}
        for future in as_completed(future_to_doc):
            doc = future_to_doc[future]
            try:
                result = future.result()
                # 如果蒸馏代理将文档标记为相关,则保留它
                if result.is_relevant:
                    print(f"  - 文档 '{doc.metadata['source']}' 相关。原因: {result.brief_explanation}")
                    relevant_docs.append(doc)
                else:
                    # 否则,丢弃它
                    print(f"  - 文档 '{doc.metadata['source']}' 不相关。原因: {result.brief_explanation}")
            except Exception as e:
                print(f"处理文档 {doc.metadata['source']} 时出错: {e}")

    print(f"--- [Distiller] 上下文已精炼至 {len(relevant_docs)} 个文档。 ---")
    return {"distilled_docs": relevant_docs}

distill_context_node 充当“质量检查站”。在高召回率检索产生大量 raw_docs 后,它利用 ThreadPoolExecutor 将每个文档送至独立的小型 LLM 调用进行“相关性判定”,其处理时间近似由最慢的几个并行调用决定;最终仅保留 is_relevant 的文档,形成更小、更干净的 distilled_docs,供主生成器使用。

distill 节点插入“检索→生成”之间:

from langgraph.graph import StateGraph, END

workflow = StateGraph(RAGGraphState)

# 添加流水线的三个节点
workflow.add_node("retrieve", retrieval_node)
workflow.add_node("distill", distill_context_node)
workflow.add_node("generate", generation_node)

# 定义线性工作流:检索 -> 蒸馏 -> 生成
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "distill")
workflow.add_edge("distill", "generate")
workflow.add_edge("generate", END)
advanced_rag_app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行上下文预处理(Created by Fareed Khan)

最后进行正面对比:在同一查询下,比较 Simple(大上下文)与 Advanced(蒸馏上下文)的准确性、延迟与 token 消耗。

import tiktoken

def count_tokens(text: str) -> int:
    """用于成本分析的辅助函数,用于统计 token 数量。"""
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))

# --- 分析设置 ---
context_tokens_simple = count_tokens(context_simple)
context_tokens_advanced = count_tokens(context_advanced)
token_improvement = (context_tokens_simple - context_tokens_advanced) / context_tokens_simple * 100
latency_improvement = (gen_time_simple - gen_time_advanced) / gen_time_simple * 100

# --- 打印结果 ---
print("="*60)
print("                  准确性与质量分析")
print("="*60 + "n")
print("**Simple RAG 的答案(来自庞大、嘈杂的上下文):**")
print(f'"{simple_answer}"n')
print("**Advanced RAG 的答案(来自精炼、聚焦的上下文):**")
print(f'"{advanced_answer}"n')

print("="*60)
print("                  延迟与成本(Token)分析")
print("="*60 + "n")
print("| 指标                          | Simple RAG (大上下文)      | Advanced RAG (精炼上下文)    | 改进幅度     |")
print("|-------------------------------|----------------------------|----------------------------------|-------------|")
print(f"| 上下文大小 (Tokens)           | {context_tokens_simple:<26} | {context_tokens_advanced:<32} | **-{token_improvement:.0f}%**      |")
print(f"| 最终生成时间                  | {gen_time_simple:<24.2f} 秒 | {gen_time_advanced:<32.2f} 秒 | **-{latency_improvement:.0f}%**      |")

#### OUTPUT ####
============================================================
                     ACCURACY & QUALITY ANALYSIS
============================================================

**Simple RAG Answer (from Large, Noisy Context):**
“Based on the context, a power supply unit of at least 1200W is recommended for the QLeap-V4 processor. The QLeap-V3 chip had a recommended power supply of 800W.”

**Advanced RAG Answer (from Distilled, Focused Context):**
“Based on the provided context, a power supply unit of at least 1200W is recommended for the QLeap-V4 processor.”

**Analysis:** The Simple RAG answer, while technically correct, includes irrelevant information about the previous-generation product (QLeap-V3). This occurs because the large, noisy context contains documents for both products. The Advanced RAG answer is **more accurate and precise**. The parallel distillation step correctly filters out the irrelevant QLeap-V3 document, providing a clean, focused context to the generator, which then produces a perfect, concise answer.
============================================================
                     LATENCY & COST (TOKEN) ANALYSIS
============================================================
| Metric                      | Simple RAG (Large Context) | Advanced RAG (Distilled Context) | Improvement |
|-----------------------------|----------------------------|----------------------------------|-------------|
| Context Size (Tokens)       | 284                        | 29                               | **-90%**    |
| Final Generation Time       | 7.89 seconds               | 2.15 seconds                     | **-73%**    |

数据驱动的结论清晰表明,并行上下文预处理带来了三重显著提升:

1.  **更高准确度**:高级系统输出更精准、聚焦。蒸馏步骤过滤掉旧型号文档,防止生成器混杂无关信息,直接提升答案质量。
2.  **更低成本**:Token 消耗大幅降低 **90%**。在海量请求的生产环境中,这将显著降低 LLM 推理成本。
3.  **更低延迟**:上下文大幅缩小直接减少了最终生成步骤的时间,**加速 73%**。尽管蒸馏本身有开销,但通常被最终长文本生成环节的大幅节省所抵消,整体耗时更短。

## Multi-Hop Retrieval for Deep Reasoning

许多复杂的用户查询并非单一问题,而是需要从多个不同文档中综合信息的多步骤研究任务。

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能
并行 Multi-Hop(Created by Fareed Khan) **并行多跳检索** 将 RAG 提升为真正的研究智能体。其流程模拟人类研究者的工作方式: 1. **分解**:高层 **元智能体** 首先分析复杂查询,将其拆解为多个更简单、相互独立的子问题。 2. **分发(并行检索)**:每个子问题被分配给一个独立的 **检索智能体** 执行标准 RAG 检索。所有智能体并行运行,各自寻找答案。 3. **汇总与综合**:元智能体收集所有子问题的答案,最终综合成对原始复杂问题的完整回答。 我们将对比简单 RAG 与多跳 RAG:给定一个“无法单跳回答”的比较性问题,证明只有多跳检索能够收集到充分证据,产出**准确且有洞见**的答案。 首先,为分解步骤定义 Pydantic 模式,以结构化元智能体的规划输出。 ```python from langchain_core.pydantic_v1 import BaseModel, Field from typing import List class SubQuestions(BaseModel): """A Pydantic model for the output of the Decomposer agent, holding a list of independent sub-questions.""" questions: List[str] = Field(description="A list of 2-3 simple, self-contained questions that, when answered together, will fully address the original complex query.")

SubQuestions 模式强制 LLM 将复杂问题拆解为简单、可独立回答的小问题,这是实现并行“分而治之”策略的基础。

接下来,使用 LangGraph 构建多跳系统。第一个节点是 分解器(即元智能体的规划模块):

from typing import TypedDict, List, Dict, Annotated
import operator

class MultiHopRAGState(TypedDict):
    original_question: str
    sub_questions: List[str]
    # This dictionary will store the answer to each sub-question, with the question as the key.
    sub_question_answers: Annotated[Dict[str, str], operator.update]
    final_answer: str

# Node 1: Decomposer (The Meta-Agent's first step)
decomposer_prompt = ChatPromptTemplate.from_template(
    "You are a query decomposition expert. Your job is to break down a complex question into simple, independent sub-questions that can be answered by a retrieval system. "
    "Do not try to answer the questions yourself.nn"
    "Question: {question}"
)

decomposer_chain = decomposer_prompt | llm.with_structured_output(SubQuestions)

def decomposer_node(state: MultiHopRAGState):
    """Takes the original complex question and decomposes it into a list of sub-questions."""
    print("--- [Meta-Agent] Decomposing complex question... ---")
    result = decomposer_chain.invoke({"question": state['original_question']})
    print(f"--- [Meta-Agent] Generated {len(result.questions)} sub-questions. ---")
    return {"sub_questions": result.questions}

decomposer_node 充当研究智能体的战略大脑。它不直接回答问题,而是专注于对复杂查询进行合理的拆解规划。

接下来编排并行执行:针对每个分解出的子问题,运行一次标准的 RAG 链。

from concurrent.futures import ThreadPoolExecutor, as_completed

# 这是一个标准的、自包含的 RAG 链,将作为并行检索智能体的“引擎”。
sub_question_rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | generator_prompt
    | llm
    | StrOutputParser()
)

def retrieval_agent_node(state: MultiHopRAGState):
    """第二个节点:并行地为每个子问题运行完整的 RAG 流程。"""
    print(f"--- [Retrieval Agents] 并行回答 {len(state['sub_questions'])} 个子问题... ---")

    answers = {}
    # 使用 ThreadPoolExecutor 并发地在每个子问题上运行 `sub_question_rag_chain`。
    with ThreadPoolExecutor(max_workers=len(state['sub_questions'])) as executor:
        # 为每个待回答的子问题创建一个 future 对象。
        future_to_question = {executor.submit(sub_question_rag_chain.invoke, q): q for q in state['sub_questions']}
        for future in as_completed(future_to_question):
            question = future_to_question[future]
            try:
                answer = future.result()
                answers[question] = answer
                print(f"  - 已找到子问题答案:'{question}'")
            except Exception as e:
                answers[question] = f"回答问题出错:{e}"
    # 将结果收集到 `sub_question_answers` 字典中。
    return {"sub_question_answers": answers}

retrieval_agent_node 是并行“分散-收集”模式的核心引擎:它使用 ThreadPoolExecutorsub_questions 并行送入各自的 RAG 链,完成后将结果聚合到 sub_question_answers 中。

最后是 综合器,作为元智能体的最终步骤,将并行结果合并成完整回答:

# 节点 3:综合器(元智能体的最终步骤)
synthesizer_prompt = ChatPromptTemplate.from_template(
    "你是一位综合专家。你的任务是将多个子问题的答案合并成一个针对用户原始复杂问题的、连贯且全面的单一答案。nn"
    "原始问题:{original_question}nn"
    "子问题答案:n{sub_question_answers}"
)

synthesizer_chain = synthesizer_prompt | llm | StrOutputParser()

def synthesizer_node(state: MultiHopRAGState):
    """接收子问题的答案,并将其综合成最终、全面的答案。"""
    print("--- [Meta-Agent] 正在综合最终答案... ---")

    # 为最终提示格式化收集到的子问题答案。
    sub_answers_str = "n".join([f"- Q: {q}n- A: {a}" for q, a in state['sub_question_answers'].items()])

    final_answer = synthesizer_chain.invoke({
        "original_question": state['original_question'],
        "sub_question_answers": sub_answers_str
    })
    return {"final_answer": final_answer}

synthesizer_node 执行关键的最终推理,不再进行检索,而是将已消化的信息编织成对原始复杂问题的完整回答。

将整个流程连接起来:分解 → 并行检索 → 综合。

from langgraph.graph import StateGraph, END

workflow = StateGraph(MultiHopRAGState)
workflow.add_node("decompose", decomposer_node)
workflow.add_node("retrieve_in_parallel", retrieval_agent_node)
workflow.add_node("synthesize", synthesizer_node)

workflow.set_entry_point("decompose")

workflow.add_edge("decompose", "retrieve_in_parallel")
workflow.add_edge("retrieve_in_parallel", "synthesize")
workflow.add_edge("synthesize", END)
multi_hop_rag_app = workflow.compile()

解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

并行多跳检索(Created by Fareed Khan)

最后进行直接对比:给出一个必须跨文档比较的问题,观察两个系统的表现。

“`python

该查询要求比较两个产品,其信息位于独立且不重叠的文档中。

user_query = “比较 QLeap-V4 和 Eco-AI-M2,重点关注它们的目标用例和功耗。”

— 运行简单 RAG —

print(“=”60)
print(” 简单 RAG 系统输出”)
print(“=”
60 + “n”)
print(f”最终答案:n{simple_answer}”)

— 运行多跳 RAG —

print(“n” + “=”60)
print(” 多跳 RAG 系统输出”)
print(“=”
60 + “n”)
print(“— 子问题答案 —“)
for i, (q, a) in enumerate(multi_hop_result[‘sub_question_answers’].items()):
print(f”{i+1}. Q: {q}n A: {a}”)
print(“n— 最终综合答案 —“)
print(multi_hop_result[‘final_answer’])

— 最终分析 —

print(“n” + “=”60)
print(” 准确性与质量分析”)
print(“=”
60 + “n”)
print(“简单 RAG 表现:“)
print(“- 结果:完全失败。”)
print(“- 原因:用户查询包含两个产品的术语。向量搜索找到了与整个查询平均语义最相似的文档,但只检索到了关于 Eco-AI-M2 的文档,完全未能检索到任何关于 QLeap-V4 的信息。缺乏两个产品的必要上下文,无法进行比较。n”)
print(“多跳 RAG 表现:“)
print(“- 结果:完全成功。”)
print(“- 原因:系统的智能体现在初始的分解步骤。元智能体将复杂的比较查询分解为两个简单、聚焦的子问题:1. 获取产品 A 的信息。2. 获取产品 B 的信息。并行检索智能体能够轻松回答这些简单问题,各自检索到正确且聚焦的上下文。最终的综合器智能体随后接收到关于两个产品的完整、完美的事实集合,使得最终比较变得轻而易举。”)

OUTPUT

============================================================
SIMPLE RAG SYSTEM OUTPUT
============================================================

Final Answer:
Based on the provided context, the Eco-AI-M2 chip is designed for edge computing and mobile devices, with a primary feature of low power consumption at only 15W under full load. The context does not contain information about the QLeap-V4, so I cannot provide a comparison.

============================================================
MULTI-HOP RAG SYSTEM OUTPUT
============================================================
— Sub-Question Answers —
1. Q: What is the target use case and power consumption of the QLeap-V4?
A: The QLeap-V4 processor is designed for maximum performance in data centers, with a primary use case of large-scale AI model training. It consumes 1200W of power under full load.
2. Q: What is the target use case and power consumption of the Eco-AI-M2?
A: The Eco-AI-M2 chip is designed for edge computing and mobile devices like drones and smart cameras. Its key feature is low power consumption, drawing only 15W under full load.
— Final Synthesized Answer —
The QLeap-V4 and the Eco-AI-M2 are designed for very different purposes, primarily distinguished by their target use case and power consumption.
QLeap-V4: This is a high-performance processor intended for data centers. Its main use case is large-scale AI model training, and it has a high power consumption of 1200W.
Eco-AI-M2: This is a low-power chip designed for edge computing and mobile devices. Its focus is on energy efficiency, consuming only 15W, making it suitable for applications like drones and smart cameras.

结论清晰地表明,这并非简单的精度差异,而是系统能力层级的根本区别。

  • 单跳检索的局限:面对比较型查询时,它无法化解歧义,仅检索到单一产品的文档,因缺乏另一半关键信息而无法完成比较任务。
  • 多跳检索的成功逻辑:其核心在于不试图一次性回答复杂问题,而是主动拆解问题。它将复杂查询分解为两个独立的子问题,并行分发给相应的RAG智能体,确保收集到所有必要的证据。最终的综合与对比便水到渠成。

关注“鲸栖”小程序,掌握最新AI资讯

本文由鲸栖原创发布,未经许可,请勿转载。转载请注明出处:http://www.itsolotime.com/archives/13324

(0)
上一篇 2025年11月26日 下午1:44
下一篇 2025年11月27日 上午11:40

相关推荐

  • 揭秘NVIDIA GT200微架构:通过微基准测试发现未公开的存储层级与同步机制

    本文不仅验证了CUDA编程指南[1]中记录的部分硬件特性,还揭示了一系列未在文档中公开的硬件结构,例如_控制流机制、缓存与TLB层级_。此外,在某些场景下,我们的发现与文档描述的特性存在差异(例如纹理缓存和常量缓存的行为)。 本文的核心价值在于介绍了一套用于GPU架构分析的方法论。我们相信,这些方法对于分析其他类型的GPU架构以及验证类GPU性能模型都将有所…

    15小时前
    700
  • DeepSeek 本地化部署:打造专属智能助手

    本文详细介绍了如何在本地使用Ollama框架部署DeepSeek模型,涵盖硬件要求、安装步骤、界面搭建及注意事项,帮助用户打造安全私密的个人智能助手。

    2025年10月15日
    10600
  • 别再把 AI 当“自动补全”了:代码智能体真正的用法被忽视了

    写出更简洁、更聪明的 Python 函数 许多开发者,包括经验丰富的老手,在编写 Python 函数时都会不自觉地陷入一些常见陷阱。这些做法短期内或许不会引发问题,但随着代码库的增长,它们会导致代码变得难以维护、效率低下。 如果你对 Python 函数的理解还停留在“能跑就行”,现在是时候升级你的认知了。了解这些常见误区并采用最佳实践,能让你的代码焕然一新。…

    2025年11月10日
    500
  • 探秘AI智能体设计模式:从ReAct到LATS,深入剖析智能体的“大脑”构建术

    AI智能体的设计模式围绕效率与灵活性展开:基础模式ReAct通过”思考-行动-观察”循环实现环境交互;Plan & Execute、ReWOO和LLM Compiler通过预规划和并行执行优化效率;反思架构赋予智能体自我改进能力;LATS实现多路径智能决策。这些模式为不同应用场景提供了关键设计思路。

    2025年10月14日
    15300
  • DeepSeek OCR:颠覆传统,用视觉压缩破解AI扩展的“十亿美元级”文档处理难题

    Part I: 文本的“隐形重量” 我们通常认为文本是“轻”的:易于存储、传输和计算。但在大语言模型时代,文本变得非常“重”。 处理一张发票的PDF扫描件,就可能消耗1,000至5,000个tokens。将这个数量级乘以企业日志、法律合同、监管文件和数字化档案,总token量将变得极其庞大——其中大部分是冗余、昂贵且处理缓慢的。虽然OpenAI的GPT-4-…

    2025年10月31日
    400

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注