LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

LangChain 系统性能深度对比:Python 与 Go 在 AI 应用中的实战评测

Lovable,这家高速增长的 AI 公司,今年将其后端从 Python 迁移到 Go,将服务器使用和部署时间从约 15 分钟降至约 3 分钟。Go 正日益被视为现代 AI 系统的理想语言,从高吞吐的 RAG 管道到多工具的 Agent。

但若要真正评估 Go 在生产级 AI 系统中的表现,必须在真实负载下测试,因为数据摄取、向量嵌入、检索和流程编排各自都有其性能上限。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测 Go vs Python Benchmarking(作者 Fareed Khan)

这些组件都是潜在瓶颈,编程语言的选择可能决定一个系统是停留在迟缓的原型,还是成长为可扩展、可生产的高可用平台。我们将进行全面测试,重点覆盖以下五大领域:

  1. 核心 LLM 交互:单次、原始模型调用有多快?这是所有功能的基石,任何框架在这里引入的开销都会在每个请求上重复支付。
  2. 数据摄取与检索:任何 RAG 系统的核心。我们将测量从原始文档切分到分块嵌入,再到压力下的检索的完整管道。
  3. Agent 推理与工具使用:Agent “思考”的成本是什么?我们将测试单步、多跳和高频工具调用循环的开销。
  4. 并发与可扩展性:当 50 个用户同时访问服务时会发生什么?这测试了每种语言在不崩溃的情况下处理真实并行负载的能力。
  5. 运营效率与韧性:隐藏成本是什么?我们将衡量长时间运行的 Agent 的内存占用,以及加入必要可观测性后带来的性能损耗。

所有代码均在我的 GitHub 仓库:

GitHub – FareedKhan-dev/langchain-go-vs-python: Benchmarking RAG and agentic systems in Go vs…

代码库组织如下:

langchain-go-vs-python/
├── agent_multi_hop/
│ ├── agent.go
│ └── agent.py
├── concurrent_rag/
│ ├── rag.go
│ └── rag.py
├── data_embedding_batching/
│ ├── embedding.go
│ └── embedding.py
...
├── gpu_saturation/
│ ├── saturation.go
│ └── saturation.py
├── ingestion_throughput/
│ ├── ingestion.go
│ └── ingestion.py
...
├── resiliency_parsing_failure/
│ ├── parsing.go
│ └── parsing.py
└── workflow_transformation/
├── transformation.go
└── transformation.py

目录

  • 环境搭建
  • 第一部分:核心 LLM 性能基准测试
    • 单轮延迟测试
    • Time-to-First-Token(TTFT)流式测试
  • 第二部分:生产级 RAG 管道测试
    • 大文档切分:CPU 受限任务
    • 批量 Embedding:处理吞吐
    • 完整摄取管道
    • 检索延迟:信息能多快被找到?
    • 端到端 RAG:整合验证
  • 第三部分:Agentic 架构评估
    • 简单 Agent:单次工具调用
    • 多跳 Agent:串联多次工具调用
    • 高频工具使用:测试 Agent 开销
    • 工作流逻辑:数据转换与路由
  • 第四部分:生产就绪的压力测试
    • 并发 RAG 与 Agent 系统:应对真实负载
    • GPU 饱和:将模型服务推向极限
    • 处理超时、工具失败与解析错误
  • 第五部分:衡量真实运营成本
    • 内存占用:长会话对话
    • 可观测性开销:Tracing 的成本

环境搭建

开始基准测试前,我们需要先搭建测试环境。为公平比较 Go 与 Python,二者必须使用完全相同的后端服务。

我们的环境由三部分组成:

  1. 本地 LLM Server(Ollama):用于运行我们的 llama3:8b 模型。
  2. 本地向量数据库(ChromaDB):为 RAG 管道存储和管理向量嵌入。
  3. Go 与 Python 项目代码:我们将编写并运行的自定义 Go 与 Python 脚本。

首先访问 Ollama 官网,下载适合你操作系统(macOS、Linux 或 Windows)的应用。安装过程非常简单,一键即可。

Ollama 启动后,打开终端拉取 llama3:8b 模型。该模型非常适合本地测试,速度与推理能力均衡。

“`

下载 8B llama(你也可以选择 Ollama 支持的其他模型)

ollama pull llama3:8b
“`

为确认一切正常,可直接在终端运行模型:

“`

测试 Ollama 服务器是否正常运行

ollama run llama3:8b “Why is the sky blue?”

输出示例

The color of sky is ….
“`

你应能看到模型开始生成回复。对于 RAG 管道基准测试,我们需要一个向量数据库来存储文档嵌入。我们使用 ChromaDB,你也可以选用其他数据库,但本文使用它做本地嵌入存储。最简单的方式是用 Docker 运行。

“`yaml
version: ‘3.10’

services:
chroma:
image: chromadb/chroma
ports:
– “8000:8000”
volumes:
– chroma_data:/chroma/.chroma/

volumes:
chroma_data:
“`

该配置执行两项核心操作:拉取官方的 chromadb/chroma 镜像并启动容器,使其在 localhost:8000 上可用;同时创建一个名为 chroma_data 的 Docker 卷,以确保向量数据库的数据在容器停止或重启后得以持久化。

启动服务时,在终端中进入克隆的仓库根目录并运行:

bash
docker-compose up -d

此外,需要在本地安装 Go 环境,可从其官方网站下载安装。

项目中的每个基准测试目录(如 agent_single_toolconcurrent_rag)都是一个独立的 Go 模块。每个目录下都有一个 go.mod 文件,它是 Go 模块的核心配置文件,其作用类似于 Python 的 requirements.txt 或 Node.js 的 package.json

以下是一个典型的 go.mod 文件示例:

“`go
module agent_single_tool

go 1.22.0
“`

该文件仅声明了模块名称和所需的 Go 版本。依赖管理通过 go mod tidy 命令完成。

在 Python 生态中,LangChain、LangGraph 和 LangSmith 已形成一个成熟的技术栈。而在 Go 生态中,相关开发仍处于早期阶段。目前已有 LangChainGo 的实现可作为模块使用,这使我们无需从零开始构建所有功能,可以在必要时自行编码,其余部分则借助该实现来构建 Go 版本的解决方案。


第一部分:核心 LLM 性能基准测试

在构建复杂的 RAG 管道或智能体系统之前,必须从最基础的环节开始评估:与大型语言模型进行单次交互的原始速度。

所有高级功能都建立在这一核心操作之上。如果此环节存在延迟,整个系统的性能都会受到影响。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

本部分将测量两个最基本的性能指标:
1. 获取完整回答所需的总时间(延迟)。
2. 模型开始生成第一个词元的速度(Time-to-First-Token,TTFT)。

这些指标将为后续对比提供一个基准,清晰地展示不同框架在核心交互上引入的额外开销。

单轮延迟测试

与 LLM 最基础的交互形式是简单的单轮补全。这类似于向聊天机器人提问并等待其完整回复,也常用于分类、数据提取或简单问答等任务。

通过测量单轮延迟,可以量化各框架的原始开销:即有多少时间消耗在框架自身的逻辑处理上,而非实际的 LLM 推理过程。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

我们先编写 Go 版本。构建一个简单基准:只初始化一次 Ollama client,然后在循环中用相同 prompt 调用 LLM 200 次,测量每次耗时。

“`go
package main

import (
“context”
“fmt”
“log”
“time”
“github.com/tmc/langchaingo/llms”
“github.com/tmc/langchaingo/llms/ollama”
)

// — Configuration —
const (
numIterations = 200
modelName = “llama3:8b”
prompt = “Why is the sky blue?”
)

func main() {
fmt.Println(“— LangChainGo: Single-Turn Completion Latency Test (200 Iterations) —“)

// 1. Initialize the Ollama LLM client.
// This is created once and reused, which is standard practice.
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
    log.Fatalf("Failed to create Ollama client: %v", err)
}
latencies := make([]time.Duration, 0, numIterations)

ctx := context.Background()
// 2. Start the main benchmark loop.
for i := 0; i < numIterations; i++ {
    start := time.Now()

    // 3. Execute the LLM call. This is a blocking call.
    // The framework handles HTTP request creation, sending, and JSON parsing.
    _, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
    if err != nil {
        log.Printf("Warning: Iteration %d failed: %vn", i+1, err)
        continue
    }
    latency := time.Since(start)
    latencies = append(latencies, latency)
    fmt.Printf("Iteration %d: %vn", i+1, latency)
}

// 4. After all iterations, calculate and display the final statistics.
fmt.Println("n--- LangChainGo Benchmark Results ---")
calculateAndPrintStats(latencies)

}
“`

main 函数中,我们首先在循环外初始化一次 ollama.New client,以模拟真实应用场景。

基准测试的核心是一个运行 200 次的 for 循环。每次迭代记录开始时间,调用 llms.GenerateFromSinglePrompt 执行 LLM 请求,然后计算延迟。

llms.GenerateFromSinglePrompt 函数封装了完整的请求-响应周期:
1. 框架构造 HTTP 请求;
2. 将请求发送给 Ollama 服务器;
3. 等待完整响应并进行解析。我们测量的正是这个过程的耗时。


接着看 Python 的等效实现。其结构与 Go 版本非常相似,我们使用 langchain_community 连接 Ollama,并用 numpy 进行统计分析。

“`python
import time
import numpy as np
from langchain_community.llms import Ollama

— Configuration —

NUM_ITERATIONS = 200
MODEL_NAME = “llama3:8b”
PROMPT = “Why is the sky blue?”

def main():
“””Main function to run the benchmark.”””
print(“— LangChain Python: Single-Turn Completion Latency Test (200 Iterations) —“)

# 1. Initialize the Ollama LLM client.
llm = Ollama(model=MODEL_NAME)
latencies = []

# 2. Start the main benchmark loop.
for i in range(NUM_ITERATIONS):
    start_time = time.perf_counter()

    # 3. Execute the LLM call using the modern `.invoke()` method.
    llm.invoke(PROMPT)

    end_time = time.perf_counter()
    latency = end_time - start_time
    latencies.append(latency)
    print(f"Iteration {i + 1}: {latency:.4f}s")

# 4. Calculate and display statistics.
print("n--- LangChain Python Benchmark Results ---")
calculate_and_print_stats(latencies)

“`

与 Go 版本一样,Python 的 main 函数也只初始化一次 Ollama client。循环中使用 .invoke() 方法进行标准的同步 LLM 调用,并使用 time.perf_counter() 进行高精度计时。

运行两种基准:

“`bash

运行 Go 版本

go run latency.go

运行 Python 版本

python latency.py
“`

LangChainGo 基准测试结果
– 总迭代次数:200
– 总耗时:199.85s
– 最小延迟:980.1ms
– 最大延迟:1.15s
– 平均延迟:999.2ms
– 标准差:28.5ms

LangChain Python 基准测试结果
– 总迭代次数:200
– 总耗时:238.1512s
– 最小延迟:1152.34ms
– 最大延迟:1.48s
– 平均延迟:1190.76ms
– 标准差:89.31ms

在 200 次请求下,性能差异显著。LangChainGo 表现更优,平均延迟为 999.2ms,而 Python 版本为 1190.76ms。

这意味着在最基础的操作上,Go 实现了约 19% 的性能提升。

标准差数据也表明 Go 的延迟波动更小(28.5ms vs 89.31ms)。

这主要源于 Go 以编译后的二进制文件运行,几乎没有启动或解释开销;而 Python 每次执行都会引入少量延迟,在多次迭代中累计效应明显。

Time-to-First-Token(TTFT)流式测试

在构建用户应用(如聊天机器人)时,总延迟并非唯一的衡量标准。

真正影响用户体验的是用户看到第一个响应内容的速度。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测 TTFT(作者 Fareed Khan)

TTFT 度量的是从请求发出到第一个输出 token 出现的时间。TTFT 越低,应用给人的感觉就越“跟手”和响应迅速,即便生成完整回复仍需一定时间。

在 Go 实现中,我们利用并发原语对首次令牌时间进行精确测量。核心机制是创建一个带缓冲的 channel,在流式回调收到首块数据的瞬间发送信号。

“`go
func main() {
fmt.Println(“— LangChainGo: Streaming Time-to-First-Token (TTFT) Test (200 Iterations) —“)

llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
log.Fatalf(“Failed to create Ollama client: %v”, err)
}

ttfts := make([]time.Duration, 0, numIterations)
ctx := context.Background()

fmt.Printf(“Running %d iterations with model ‘%s’…nn”, numIterations, modelName)

for i := 0; i < numIterations; i++ {
// firstTokenCh 是一个信号 channel,用于协程间高效通知事件发生。
// 缓冲区大小为 1 可防止发送协程阻塞。
firstTokenCh := make(chan struct{}, 1)

// wg 确保主循环在流式调用完全完成前不会退出。
var wg sync.WaitGroup
wg.Add(1)

start := time.Now()

// 流式回调函数,每收到一个数据块时被调用。
streamingFunc := func(ctx context.Context, chunk []byte) error {
// select 语句是 TTFT 测量的核心,它尝试向 channel 发送信号。
select {
case firstTokenCh <- struct{}{}:
// 此 case 仅在收到第一个数据块时执行。
// 后续调用将因 channel 已满而进入 default 分支。
default:
// 对后续数据块不执行任何操作,这是一个高效的空操作。
}
return nil
}

// 在单独的协程中运行 LLM 调用,使主线程能立即开始等待首个令牌信号。
go func() {
defer wg.Done()
_, err := llms.GenerateFromSinglePrompt(
ctx,
llm,
longPrompt,
llms.WithStreamingFunc(streamingFunc),
)
if err != nil {
log.Printf(“Warning: Goroutine for iteration %d failed: %vn”, i+1, err)
// 发生错误时,仍须解除主线程的阻塞以避免死锁。
select {
case firstTokenCh <- struct{}{}:
default:
}
}
}()

// 主线程在此阻塞,直到从 firstTokenCh 收到信号。
// 从 start 到此处的时间即为 TTFT。
<-firstTokenCh
ttft := time.Since(start)
ttfts = append(ttfts, ttft)
fmt.Printf(“Iteration %d: TTFT = %vn”, i+1, ttft)

// 等待整个流式输出完成后再开始下一次迭代。
// 这可以防止后续测试受到残留的后台进程影响。
wg.Wait()
}

fmt.Println(“n— LangChainGo Benchmark Results —“)
calculateAndPrintStats(ttfts)
}
“`

上述代码的关键在于 streamingFunc 回调函数。它被传入 LLM 调用,每收到一个流式数据块就执行一次。

函数内部的 select 语句尝试向 firstTokenCh 发送信号。由于该 channel 的缓冲区大小为 1,发送操作仅在首次收到数据块时成功。后续的调用会因 channel 已满而直接进入 default 分支,不执行任何操作。

与此同时,主循环在后台协程中发起 LLM 调用,然后立即在 <-firstTokenCh 处阻塞等待。当首个数据块到达且回调函数成功发送信号的瞬间,该行解除阻塞并计算耗时,所得结果即为 TTFT。我们还使用了 sync.WaitGroup 来确保在开始下一次迭代前,当前的流式输出已完整结束。

Python 版本通过迭代器实现相同目标。.stream() 方法返回一个迭代器,仅在调用 next() 时才会发起实际的网络请求。

“`python
def main():
“””运行流式传输基准测试的主函数。”””
print(“— LangChain Python: 流式传输首令牌时间测试 (200 次迭代) —“)

llm = Ollama(model=MODEL_NAME)
ttfts = []

print(f"正在运行 {NUM_ITERATIONS} 次迭代,模型为 '{MODEL_NAME}'...n")

for i in range(NUM_ITERATIONS):
    try:
        start_time = time.perf_counter()

        # 1. `.stream()` 返回一个迭代器。此时并未发起网络调用,这是一个惰性操作。
        stream_iterator = llm.stream(LONG_PROMPT)

        # 2. 当我们首次尝试从迭代器获取项时,才会发起实际的网络请求。
        # 我们测量的 TTFT 就是这次 `next()` 调用所花费的时间。
        next(stream_iterator)

        first_token_time = time.perf_counter()

        # 3. 计算并存储 TTFT。
        ttft = first_token_time - start_time
        ttfts.append(ttft)
        print(f"迭代 {i + 1}: TTFT = {ttft * 1000:.2f}ms")

        # 4. 非常重要:必须消费整个迭代器以关闭连接。
        # 否则,底层的 HTTP 连接可能残留在连接池中,耗尽资源并导致后续测试失败或挂起。
        # 这是与 Go 回调模型在资源管理上的一个关键区别。
        for _ in stream_iterator:
            pass

    except Exception as e:
        print(f"警告: 迭代 {i + 1} 失败: {e}")

print("n--- LangChain Python 基准测试结果 ---")
calculate_and_print_stats(ttfts)

“`

在 Python 中,理解迭代器的惰性至关重要。调用 llm.stream(LONG_PROMPT) 仅返回迭代器对象,并未发出网络请求;直到执行 next(stream_iterator) 时请求才真正发送。代码首先启动计时器,然后调用 stream() 并立即调用 next(),程序会阻塞在 next() 调用处,直到服务器返回第一个令牌。此时记录的时间差即为 TTFT。最后,必须消费完整个迭代器以关闭底层 HTTP 连接,否则连接池资源可能被耗尽。

运行 TTFT 基准测试:

“`bash

运行 Go 版本

go run ttft.go

运行 Python 版本

python ttft.py
“`

“`
— LangChain Go 基准测试结果 —
总迭代次数: 200
总耗时: 28.15s
最小延迟: 135.2ms
最大延迟: 159.8ms
平均延迟: 140.75ms
标准差: 4.1ms

— LangChain Python 基准测试结果 —
总迭代次数: 200
总耗时: 42.6743s
最小延迟: 205.10ms
最大延迟: 251.56ms
平均延迟: 213.37ms
标准差: 15.88ms
“`

在 TTFT 评估中,Go 的表现同样更优。

Go 的平均 TTFT 为 140.75ms,相比 LangChain Python 的 213.37ms,速度提升约 51%。

Go 能够近乎即时地处理首个数据块并触发回调,而 Python 额外的抽象层使其稍慢一步。

对于任何实时、对话式 AI 应用,初始响应体验至关重要。更快的 TTFT 直接转化为更好的用户体验。

第二部分:生产级 RAG 管道测试

在建立了核心 LLM 性能基线之后,我们可以进入更复杂、也更贴近生产环境的场景:构建完整的检索增强生成(RAG)管道。

在生产环境中,Go 与 Python 在 RAG 实现上的差异将更加明显,尤其是在效率和吞吐量至关重要的场景下。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

RAG 系统的性能不仅取决于最终的 LLM 调用,更在于……

它是多个步骤性能的总和:数据加载、文档切分、向量嵌入、存储与检索。任何一步的瓶颈都可能拖累整体性能。

本部分我们将从零开始搭建完整的文档摄取与查询管道,并对每个关键步骤进行基准测试。

大文档切分:CPU 受限任务

在处理文档之前,必须先将其拆分为更小、更易于管理的片段。这是一个纯 CPU 受限的任务,涉及大量的字符串处理操作。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

这是摄取管道的第一大步骤,其效率直接影响处理新信息的速度。

一个低效的切分器在需要同时摄取成千上万文档时会成为严重瓶颈。

我们先看 Go 实现。代码加载一个 10MB 的大文件,并多次运行 RecursiveCharacterTextSplitter 以获得稳定的性能数据。

“`go
package main

// — Configuration —
const (
largeFilePath = “../data_large/large_document.txt”
chunkSize = 1000
chunkOverlap = 200
numIterations = 5 // 重复运行以获得稳定的平均值
)

func main() {
fmt.Println(“— LangChainGo: Text Splitter Throughput Test —“)

// 1. 将大文档加载到内存中。
// 此操作在循环外进行,以确保仅对文本分割进行基准测试,而非文件 I/O。
content, err := os.ReadFile(largeFilePath)
if err != nil {
    log.Fatalf("Failed to read large document file: %v", err)
}
doc := []schema.Document{{PageContent: string(content)}}
docSizeMB := float64(len(content)) / (1024 * 1024)
fmt.Printf("Loaded document of size: %.2f MBn", docSizeMB)

// 2. 初始化文本分割器。
splitter := textsplitter.NewRecursiveCharacter(
    textsplitter.WithChunkSize(chunkSize),
    textsplitter.WithChunkOverlap(chunkOverlap),
)

var latencies []time.Duration
var totalChunks int

// --- 性能分析设置 ---
var startMem, endMem runtime.MemStats
runtime.ReadMemStats(&startMem)

fmt.Printf("nRunning %d splitting iterations...n", numIterations)
totalStart := time.Now()

// 3. 运行基准测试循环。
for i := 0; i < numIterations; i++ {
    iterStart := time.Now()

    // 这是我们要测量的核心 CPU 密集型操作。
    chunks, err := textsplitter.SplitDocuments(splitter, doc)
    if err != nil {
        log.Fatalf("Iteration %d failed: %v", i+1, err)
    }

    latency := time.Since(iterStart)
    latencies = append(latencies, latency)
    totalChunks = len(chunks) // 每次应相同
    fmt.Printf("Iteration %d: Split into %d chunks in %vn", i+1, totalChunks, latency)
}

totalDuration := time.Since(totalStart)
runtime.ReadMemStats(&endMem)

// --- 计算并输出指标 ---
var totalLatency time.Duration
for _, l := range latencies {
    totalLatency += l
}
avgLatency := totalLatency / time.Duration(numIterations)
throughputMBps := (docSizeMB * float64(numIterations)) / totalDuration.Seconds()
throughputChunksps := float64(totalChunks*numIterations) / totalDuration.Seconds()

heapAlloc := endMem.Alloc - startMem.Alloc
totalAlloc := endMem.TotalAlloc - startMem.TotalAlloc
numGC := endMem.NumGC - startMem.NumGC
pauseTotalMs := float64(endMem.PauseTotalNs-startMem.PauseTotalNs) / 1_000_000

fmt.Println("n--- LangChainGo Splitting Results ---")
fmt.Printf("Average Latency per Run: %vn", avgLatency)
fmt.Printf("Throughput (Data Size):  %.2f MB/sn", throughputMBps)
fmt.Printf("Throughput (Chunks):     %.2f chunks/sn", throughputChunksps)
fmt.Println("n--- Memory & GC Metrics ---")
fmt.Printf("Peak Heap Increase:      %.2f KBn", float64(heapAlloc)/1024)
fmt.Printf("Total Alloc (Churn):     %.2f MBn", float64(totalAlloc)/(1024*1024))
fmt.Printf("Number of GC Runs:       %dn", numGC)
fmt.Printf("Total GC Pause Time:     %.2f msn", pauseTotalMs)

}
“`

此代码的设计旨在隔离并准确测量文本分割器本身的性能。[[IMAGE_X]]

  1. 仅加载一次磁盘文档,置于循环外,确保只测切分性能;
  2. 基准中重复调用 textsplitter.SplitDocuments(负责递归切分的重活);
  3. 记录每轮耗时,同时跟踪内存与 GC 指标,观察 Go 在 CPU 压力下资源管理效率。

再看 Python 实现对比:

“`python

— Configuration —

LARGE_FILE_PATH = “../data_large/large_document.txt”
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
NUM_ITERATIONS = 5

def get_cpu_time():
“””Gets the CPU time used by the current process.”””
process = psutil.Process(os.getpid())
return process.cpu_times().user

def get_memory_usage():
“””Gets the current memory usage (RSS) of the process in bytes.”””
process = psutil.Process(os.getpid())
return process.memory_info().rss

def main():
“””Main function to run the text splitter benchmark.”””
print(“— LangChain Python: Text Splitter Throughput Test —“)

# 1. Load the document.
loader = TextLoader(LARGE_FILE_PATH, encoding="utf-8")
doc = loader.load()
doc_size_mb = os.path.getsize(LARGE_FILE_PATH) / (1024 * 1024)
print(f"Loaded document of size: {doc_size_mb:.2f} MB")

# 2. Initialize the text splitter.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP
)

latencies = []
total_chunks = 0

# --- Profiling Setup ---
start_mem = get_memory_usage()
start_cpu = get_cpu_time()

print(f"nRunning {NUM_ITERATIONS} splitting iterations...n")
total_start_time = time.perf_counter()

# 3. Run the benchmark loop.
for i in range(NUM_ITERATIONS):
    iter_start_time = time.perf_counter()

    # This is the core CPU-bound operation.
    chunks = text_splitter.split_documents(doc)

    latency = time.perf_counter() - iter_start_time
    latencies.append(latency)
    total_chunks = len(chunks)
    print(f"Iteration {i + 1}: Split into {total_chunks} chunks in {latency:.4f}s")

total_duration = time.perf_counter() - total_start_time
end_mem = get_memory_usage()
end_cpu = get_cpu_time()

# --- Calculate and Print Metrics ---
avg_latency = np.mean(latencies)
throughput_mb_ps = (doc_size_mb * NUM_ITERATIONS) / total_duration
throughput_chunks_ps = (total_chunks * NUM_ITERATIONS) / total_duration

mem_increase = end_mem - start_mem
cpu_used = end_cpu - start_cpu

print("n--- LangChain Python Splitting Results ---")
print(f"Average Latency per Run: {avg_latency:.4f}s")
print(f"Throughput (Data Size):  {throughput_mb_ps:.2f} MB/s")
print(f"Throughput (Chunks):     {throughput_chunks_ps:.2f} chunks/s")
print("n--- Resource Metrics ---")
print(f"Memory Usage Increase (RSS): {mem_increase / 1024:.2f} KB")
print(f"Total CPU Time Used:         {cpu_used:.4f}s")

if name == “main“:
main()
“`

逻辑相同:

  1. TextLoader 加载文档,初始化 RecursiveCharacterTextSplitter
  2. 循环 NUM_ITERATIONS 次,计时 split_documents(doc)
  3. psutil 统计总 CPU 时间与内存增长(RSS)。

运行对比

“`bash

运行 Go 版本

go run splitter.go

运行 Python 版本

python splitter.py
“`

LangChainGo: TextSplitter 吞吐测试
Loaded document of size: 10.05MB

LangChainGo 切分结果
Average Latency per Run: 151.2ms
Throughput (Data Size): 66.47MB/s
Throughput (Chunks): 83088.62 chunks/s
内存与 GC 指标
Total Alloc (Churn): 95.15MB
Total GC Pause Time: 0.81ms

LangChain Python: TextSplitter 吞吐测试
Loaded document of size: 10.05MB

LangChain Python 切分结果
Average Latency per Run: 3.5476s
Throughput (Data Size): 2.83MB/s
Throughput (Chunks): 3543.83 chunks/s
资源指标
Total CPU Time Used: 17.5123s

结果差异显著。

LangChainGo 平均耗时 151ms 完成 10MB 文件处理,吞吐量达 66.47 MB/s;LangChain Python 平均耗时 3.5s,吞吐量仅为 2.83 MB/s。

Go 版本速度提升约 23 倍。对于字符串处理这类 CPU 密集型任务,Go 的编译型语言特性优势明显——直接编译为本地机器码;而 Python 的解释器与全局解释器锁(GIL)则引入了显著开销,这一点也体现在其较高的 Total CPU Time Used 上。

此外,Go 的垃圾回收器在内存分配与暂停时间方面也表现稳定。

批量 Embedding:处理吞吐

文档切分完成后,下一步是将其转换为向量嵌入(Embedding)。此过程既涉及 CPU(分词),也涉及网络 I/O(将批次数据发送至 Ollama)。效率的关键在于批处理(Batching)与并发(Concurrency)。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

我们将测试两种策略:
1. 顺序批量处理:单线程处理所有文本块。
2. 并发批量处理:多工作线程并行处理批次。

这将展示框架在并行网络请求与 CPU 资源管理方面的能力。

首先查看 Go 实现。我们使用 goroutine 构建一个工作池(worker pool)来进行并发测试。

“`go
package main

// — 配置 —
const (
largeFilePath = “../data_large/large_document.txt”
modelName = “llama3:8b”
chunkSize = 1000
chunkOverlap = 200
batchSize = 100 // 单个嵌入请求发送的文档数量
concurrencyLevel = 8 // 并发测试的并行工作线程数
)

func main() {
fmt.Println(“— LangChainGo: Embedding Batching Performance Test —“)

// 1. 准备数据:加载文档并切分为文本块。
chunks := prepareChunks()
fmt.Printf("Prepared %d text chunks for embedding.n", len(chunks))

// 2. 初始化嵌入器。
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
    log.Fatalf("Failed to create LLM for embedder: %v", err)
}
embedder, err := embeddings.NewEmbedder(llm)
if err != nil {
    log.Fatalf("Failed to create embedder: %v", err)
}

// --- 运行顺序批量测试 ---
runSequentialTest(embedder, chunks)

// --- 运行并发批量测试 ---
runConcurrentTest(embedder, chunks)

}

// prepareChunks 加载大文件并将其切分为文本块。
func prepareChunks() []schema.Document {
content, err := os.ReadFile(largeFilePath)
if err != nil {
log.Fatalf(“Failed to read large document file: %v”, err)
}
doc := []schema.Document{{PageContent: string(content)}}
splitter := textsplitter.NewRecursiveCharacter(
textsplitter.WithChunkSize(chunkSize),
textsplitter.WithChunkOverlap(chunkOverlap),
)
chunks, err := textsplitter.SplitDocuments(splitter, doc)
if err != nil {
log.Fatalf(“Failed to split documents: %v”, err)
}
return chunks
}
“`

然后编写顺序批处理函数,负责将 chunk 做 embedding:

“`go
// runSequentialTest 对文档块进行顺序批处理嵌入并执行基准测试。
func runSequentialTest(embedder embeddings.Embedder, docs []schema.Document) {
fmt.Println(“n— Starting Sequential Embedding Test —“)

docContents := make([]string, len(docs))
for i, doc := range docs {
    docContents[i] = doc.PageContent
}

start := time.Now()
var startMem runtime.MemStats
runtime.ReadMemStats(&startMem)

// langchaingo 中的 CreateEmbedding 函数会自动处理批处理。
_, err := embedder.CreateEmbedding(context.Background(), docContents)
if err != nil {
    log.Fatalf("Sequential embedding failed: %v", err)
}
duration := time.Since(start)
var endMem runtime.MemStats
runtime.ReadMemStats(&endMem)
throughput := float64(len(docs)) / duration.Seconds()
totalAlloc := endMem.TotalAlloc - startMem.TotalAlloc

fmt.Println("--- Go Sequential Results ---")
fmt.Printf("Total Time:         %vn", duration)
fmt.Printf("Throughput:         %.2f docs/secn", throughput)
fmt.Printf("Total Alloc (Churn): %.2f MBn", float64(totalAlloc)/(1024*1024))

}

// runConcurrentTest 使用多个并行工作协程对文档块进行并发嵌入并执行基准测试。
func runConcurrentTest(embedder embeddings.Embedder, docs []schema.Document) {
fmt.Println(“n— Starting Concurrent Embedding Test —“)

// 创建一个通道来向工作协程分发任务。
tasks := make(chan string, len(docs))
for _, doc := range docs {
    tasks <- doc.PageContent
}
close(tasks)
var wg sync.WaitGroup

start := time.Now()
var startMem runtime.MemStats
runtime.ReadMemStats(&startMem)

// 1. 启动并发工作协程。
for i := 0; i < concurrencyLevel; i++ {
    wg.Add(1)
    go func(workerID int) {
        defer wg.Done()

        batch := make([]string, 0, batchSize)

        // 每个工作协程从通道中拉取任务,直到通道为空。
        for task := range tasks {
            batch = append(batch, task)
            if len(batch) == batchSize {
                // 当一个批次填满时,对其进行嵌入。
                _, err := embedder.CreateEmbedding(context.Background(), batch)
                if err != nil {
                    log.Printf("Worker %d failed: %v", workerID, err)
                }
                // 重置批次。
                batch = make([]string, 0, batchSize)
            }
        }

        // 嵌入最后一个批次中剩余的任何项目。
        if len(batch) > 0 {
            _, err := embedder.CreateEmbedding(context.Background(), batch)
            if err != nil {
                log.Printf("Worker %d failed on final batch: %v", workerID, err)
            }
        }
    }(i)
}

// 2. 等待所有工作协程完成。
wg.Wait()
duration := time.Since(start)
var endMem runtime.MemStats
runtime.ReadMemStats(&endMem)
throughput := float64(len(docs)) / duration.Seconds()
totalAlloc := endMem.TotalAlloc - startMem.TotalAlloc
fmt.Println("--- Go Concurrent Results ---")
fmt.Printf("Total Time:         %vn", duration)
fmt.Printf("Throughput:         %.2f docs/secn", throughput)
fmt.Printf("Total Alloc (Churn): %.2f MBn", float64(totalAlloc)/(1024*1024))

}
“`

这里我们采用典型的 worker pool 模式:

  1. 创建 tasks channel 并填入所有待处理的文本块,启动 concurrencyLevel 个 goroutine 作为工作协程。
  2. 每个工作协程从 channel 中拉取任务,当累积的任务数量达到预设的 batchSize 后,统一调用 CreateEmbedding 接口进行批处理。

Go 语言轻量级的 goroutine 机制天然适合处理 I/O 密集型并发任务,这种模式有助于充分利用 Ollama 服务器的网络带宽和处理能力。LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

Python 版本使用 ThreadPoolExecutor 实现并发处理:

“`python

— Configuration —

LARGE_FILE_PATH = “../data_large/large_document.txt”
MODEL_NAME = “llama3:8b”
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
BATCH_SIZE = 100
CONCURRENCY_LEVEL = 8
logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)

def get_cpu_time():
“””Gets the CPU time used by the current process.”””
process = psutil.Process(os.getpid())
return process.cpu_times().user

def get_memory_usage():
“””Gets the current memory usage (RSS) of the process in bytes.”””
process = psutil.Process(os.getpid())
return process.memory_info().rss

def prepare_chunks():
“””Loads a large file and splits it into text chunks.”””
loader = TextLoader(LARGE_FILE_PATH, encoding=”utf-8″)
doc = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
chunks = text_splitter.split_documents(doc)
return [c.page_content for c in chunks]

def run_sequential_test(embedder, chunks):
“””Benchmarks embedding chunks one batch at a time.”””
print(“n— Starting Sequential Embedding Test —“)

start_time = time.perf_counter()
start_mem = get_memory_usage()
start_cpu = get_cpu_time()
try:
    embedder.embed_documents(chunks)
except Exception as e:
    logging.fatal(f"Sequential embedding failed: {e}")
    return
duration = time.perf_counter() - start_time
end_mem = get_memory_usage()
end_cpu = get_cpu_time()
throughput = len(chunks) / duration
mem_increase = end_mem - start_mem
cpu_used = end_cpu - start_cpu

print("--- Python Sequential Results ---")
print(f"Total Time:         {duration:.4f}s")
print(f"Throughput:         {throughput:.2f} docs/sec")
print(f"Memory Increase (RSS): {mem_increase / 1024:.2f} KB")
print(f"Total CPU Time Used:   {cpu_used:.4f}s")

def embed_batch(embedder, batch):
“””A helper function for a single worker to embed one batch.”””
try:
embedder.embed_documents(batch)
return len(batch)
except Exception as e:
logging.warning(f”A batch failed to embed: {e}”)
return 0

def run_concurrent_test(embedder, chunks):
“””Benchmarks embedding chunks using multiple parallel workers.”””
print(“n— Starting Concurrent Embedding Test —“)

start_time = time.perf_counter()
start_mem = get_memory_usage()
start_cpu = get_cpu_time()

with ThreadPoolExecutor(max_workers=CONCURRENCY_LEVEL) as executor:
    batches = [chunks[i:i + BATCH_SIZE] for i in range(0, len(chunks), BATCH_SIZE)]

    futures = [executor.submit(embed_batch, embedder, batch) for batch in batches]

    for future in as_completed(futures):
        future.result()
duration = time.perf_counter() - start_time
end_mem = get_memory_usage()
end_cpu = get_cpu_time()
throughput = len(chunks) / duration
mem_increase = end_mem - start_mem
cpu_used = end_cpu - start_cpu
print("--- Python Concurrent Results ---")
print(f"Total Time:         {duration:.4f}s")
print(f"Throughput:         {throughput:.2f} docs/sec")
print(f"Memory Increase (RSS): {mem_increase / 1024:.2f} KB")
print(f"Total CPU Time Used:   {cpu_used:.4f}s")

def main():
print(“— LangChain Python: Embedding Batching Performance Test —“)
chunks = prepare_chunks()
print(f”Prepared {len(chunks)} text chunks for embedding.”)

embedder = OllamaEmbeddings(model=MODEL_NAME)

run_sequential_test(embedder, chunks)
run_concurrent_test(embedder, chunks)

if name == “main“:
main()
“`

Python 使用标准库的 ThreadPoolExecutor 管理线程池。我们先将 chunk 切成 batches,再提交任务给线程池,并通过 as_completed 等待所有任务完成。

运行基准:

“`bash

running go version

go run embedding.go

running python version

python embedding.py
“`

“`
— LangChainGo:EmbeddingBatchingPerformanceTest —
— GoSequentialResults —
Total Time: 1m31s
Throughput: 138.02 docs/sec
— GoConcurrentResults —
Total Time: 22.5s
Throughput: 558.22 docs/sec

— LangChain Python:EmbeddingBatchingPerformanceTest —
— PythonSequentialResults —
Total Time: 128.1234s
Throughput: 98.03 docs/sec
— PythonConcurrentResults —
Total Time: 49.8123s
Throughput: 252.15 docs/sec
“`

结果表明两点:其一,并发对两种语言都有显著提升。Go 吞吐从 138 docs/sec 提升到 558 docs/sec(约 4 倍);Python 则从 98 docs/sec 提升至 252 docs/sec(约 2.6 倍)。

其二,Go 的并发实现更高效:

在 8 个并发 worker 下,LangChainGo 的吞吐为 558 docs/sec,是 LangChain Python(252 docs/sec)的 2.2 倍以上。

这是 Go 轻量级并发模型的优势所在。相比 Python 线程,goroutine 开销极低;Go 的调度器对 I/O 受限任务高度优化,能更有效地管理并行网络请求,从而获得更高总体吞吐。在需要实时对数据流做 embedding 的生产系统中,这是关键优势。

完整摄取管道

现在我们把上述两步结合起来,对完整的摄取管道进行基准:从磁盘加载原始文件,到 embedding 并存入 ChromaDB。这样可得到端到端的真实摄取性能衡量。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测 Full ingestion(作者 Fareed Khan)

Go 实现是前述组件的顺序拼接:加载、切分、存储。

“`go
package main

func main() {
fmt.Println(“— LangChainGo: RAG Ingestion Throughput Test —“)

totalStart := time.Now()

// 1. Initialize Embedder
ollamaLLM, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
    log.Fatalf("Failed to create Ollama client for embeddings: %v", err)
}
embedder, err := embeddings.NewEmbedder(ollamaLLM)
if err != nil {
    log.Fatalf("Failed to create embedder: %v", err)
}

// 2. Initialize Vector Store (ChromaDB)
store, err := chroma.New(
    chroma.WithChromaURL("http://localhost:8000"),
    chroma.WithNameSpace(collectionName),
    chroma.WithEmbedder(embedder),
)
if err != nil {
    log.Fatalf("Failed to create Chroma vector store: %v", err)
}

// 3. Load Documents from Directory
loadStart := time.Now()
loader := documentloaders.NewDirectory(dataDir)
docs, err := loader.Load(context.Background())
if err != nil {
    log.Fatalf("Failed to load documents: %v", err)
}
loadDuration := time.Since(loadStart)
fmt.Printf("Step 1: Loaded %d documents in %vn", len(docs), loadDuration)

// 4. Split Documents into Chunks
splitStart := time.Now()
splitter := textsplitter.NewRecursiveCharacter(
    textsplitter.WithChunkSize(chunkSize),
    textsplitter.WithChunkOverlap(chunkOverlap),
)
chunks, err := textsplitter.SplitDocuments(splitter, docs)
if err != nil {
    log.Fatalf("Failed to split documents: %v", err)
}
splitDuration := time.Since(splitStart)
fmt.Printf("Step 2: Split %d documents into %d chunks in %vn", len(docs), len(chunks), splitDuration)

// 5. Add Documents to Vector Store (Embedding + Storing)
storeStart := time.Now()
_, err = store.AddDocuments(context.Background(), chunks)
if err != nil {
    _ = store.RemoveCollection(context.Background())
    log.Fatalf("Failed to add documents to vector store: %v", err)
}
storeDuration := time.Since(storeStart)
fmt.Printf("Step 3: Embedded and stored %d chunks in %vn", len(chunks), storeDuration)
totalDuration := time.Since(totalStart)
fmt.Println("n--- LangChainGo Ingestion Results ---")
fmt.Printf("Total time to ingest %d documents: %vn", len(docs), totalDuration)

fmt.Println("Cleaning up ChromaDB collection...")
if err := store.RemoveCollection(context.Background()); err != nil {
    log.Printf("Warning: failed to remove collection '%s': %vn", collectionName, err)
}

}
“`

按顺序执行每个步骤并计时,其中 store.AddDocuments 会执行 embedding 并与 ChromaDB 通信,langchaingo 会在内部自动处理 batching。

“`python
def main():
“””Main function to run the ingestion benchmark.”””
print(“— LangChain Python: RAG Ingestion Throughput Test —“)

total_start = time.perf_counter()

# 1. Initialize Embedder
embeddings = OllamaEmbeddings(model=MODEL_NAME)

# 2. Initialize Vector Store (ChromaDB)
client = chromadb.HttpClient(host='localhost', port=8000)

try:
    client.delete_collection(name=COLLECTION_NAME)
    logging.info(f"Existing collection '{COLLECTION_NAME}' deleted for a clean test.")
except Exception:
    logging.info(f"Collection '{COLLECTION_NAME}' did not exist, creating new.")
    pass

# 3. Load Documents from Directory
load_start = time.perf_counter()
loader = DirectoryLoader(DATA_DIR, glob="**/*.md")
docs = loader.load()
load_duration = time.perf_counter() - load_start
print(f"Step 1: Loaded {len(docs)} documents in {load_duration:.4f}s")

# 4. Split Documents into Chunks
split_start = time.perf_counter()
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP
)
chunks = text_splitter.split_documents(docs)
split_duration = time.perf_counter() - split_start
print(f"Step 2: Split {len(docs)} documents into {len(chunks)} chunks in {split_duration:.4f}s")

# 5. Add Documents to Vector Store (Embedding + Storing)
store_start = time.perf_counter()
try:
    Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        collection_name=COLLECTION_NAME,
        client=client
    )
except Exception as e:
    logging.fatal(f"Failed to add documents to vector store: {e}")
    return
store_duration = time.perf_counter() - store_start
print(f"Step 3: Embedded and stored {len(chunks)} chunks in {store_duration:.4f}s")

total_duration = time.perf_counter() - total_start
print("n--- LangChain Python Ingestion Results ---")
print(f"Total time to ingest {len(docs)} documents: {total_duration:.4f}s")

print("Cleaning up ChromaDB collection...")
try:
    client.delete_collection(name=COLLECTION_NAME)
except Exception as e:
    logging.warning(f"Warning: failed to remove collection '{COLLECTION_NAME}': {e}")

if name == “main“:
main()
“`

Python 代码结构与此前展示的 Go 版本基本一致。其核心在于使用了 LangChain 提供的 Chroma.from_documents 高阶封装方法,该方法将文档块(chunks)的向量化(embedding)与写入向量数据库这两个步骤合并为一次原子操作,简化了开发流程。[[IMAGE_X]]

运行:

“`bash

running go version

go run ingestion.go

running python version

python ingestion.py
“`

“`
— LangChainGo: RAG Ingestion Throughput Test —
Step 1: Loaded 50 documents in 17.8ms
Step 2: Split 50 documents into 853 chunks in 45.1ms
Step 3: Embedded and stored 853 chunks in 1m 18s

— LangChainGo Ingestion Results —
Total time to ingest 50 documents: 1m 18s

— LangChain Python: RAG Ingestion Throughput Test —
Step 1: Loaded 50 documents in 0.1105s
Step 2: Split 50 documents into 853 chunks in 0.6158s
Step 3: Embedded and stored 853 chunks in 2m 15s

— LangChain Python Ingestion Results —
Total time to ingest 50 documents: 135.7263s
“`

整体结果与之前各步骤一致:CPU 受限的切分步骤,Go 远快于 Python(45ms vs 615ms);以 I/O 为主的 embedding 与存储步骤,Go 同样显著更快。

总体上,LangChainGo 完成 50 篇文档摄取耗时 78 秒,LangChain Python 需要 135 秒。

端到端摄取速度提升约 73%。对于需要持续更新知识库、不断摄取新文档的应用,这样的速度差异将转化为实际运营优势。

检索延迟:信息能多快被找到?

数据入库后,需要测试检索速度。检索延迟是 RAG 查询总时延的关键组成部分,包含两步:

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测 Retrieval latency(作者 Fareed Khan)

  1. 对用户查询进行 embedding;
  2. 使用该 embedding 到 ChromaDB 进行相似度搜索。

我们将运行 100 次以得到稳定的平均值。

Go 版本连接已有的 ChromaDB 集合,并创建一个 retriever 对象:

“`go
package main

// — Configuration —
const (
numIterations = 100
modelName = “llama3:8b”
collectionName = “langchaingo-ingestion-test”
query = “What is the main topic of these documents?”
topK = 5
)
func main() {
fmt.Println(“— LangChainGo: RAG Retrieval Latency Test —“)

// 1. Initialize Embedder.
ollamaLLM, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
    log.Fatalf("Failed to create Ollama client for embeddings: %v", err)
}
embedder, err := embeddings.NewEmbedder(ollamaLLM)
if err != nil {
    log.Fatalf("Failed to create embedder: %v", err)
}

// 2. Initialize Vector Store and connect to the existing collection.
store, err := chroma.New(
    chroma.WithChromaURL("http://localhost:8000"),
    chroma.WithNameSpace(collectionName),
    chroma.WithEmbedder(embedder),
)
if err != nil {
    log.Fatalf("Failed to create Chroma vector store: %v", err)
}

// 3. Create a retriever from the vector store.
retriever := vectorstores.ToRetriever(store, topK)
latencies := make([]time.Duration, 0, numIterations)
ctx := context.Background()
fmt.Printf("Running %d iterations to retrieve %d documents...nn", numIterations, topK)

// 5. Run the benchmark loop.
for i := 0; i < numIterations; i++ {
    start := time.Now()

    // 6. Execute the retrieval.
    docs, err := retriever.GetRelevantDocuments(ctx, query)
    if err != nil {
        log.Printf("Warning: Iteration %d failed: %vn", i+1, err)
        continue
    }
    latency := time.Since(start)
    latencies = append(latencies, latency)
    if len(docs) != topK {
        log.Printf("Warning: Expected %d docs, but got %dn", topK, len(docs))
    }
    fmt.Printf("Iteration %d: %vn", i+1, latency)
}
fmt.Println("n--- LangChainGo Retrieval Results ---")
calculateAndPrintStats(latencies)

}
“`

该基准测试的核心是 retriever.GetRelevantDocuments:它一并完成创建查询 embedding(调用 Ollama)与到 ChromaDB 查询 top-k 最相似文档。

“`python

— Configuration —

NUM_ITERATIONS = 100
MODEL_NAME = “llama3:8b”
COLLECTION_NAME = “langchaingo-ingestion-test”
QUERY = “What is the main topic of these documents?”
TOP_K = 5

logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)

def main():
“””Main function to run the retrieval benchmark.”””
print(“— LangChain Python: RAG Retrieval Latency Test —“)

# 1. Initialize Embedder.
embeddings = OllamaEmbeddings(model=MODEL_NAME)

# 2. Initialize Vector Store client.
client = chromadb.HttpClient(host='localhost', port=8000)

vector_store = Chroma(
    collection_name=COLLECTION_NAME,
    embedding_function=embeddings,
    client=client
)

# 3. Create a retriever from the vector store.
retriever = vector_store.as_retriever(search_kwargs={"k": TOP_K})
latencies = []
print(f"Running {NUM_ITERATIONS} iterations to retrieve {TOP_K} documents...n")

# 5. Run the benchmark loop.
for i in range(NUM_ITERATIONS):
    try:
        start_time = time.perf_counter()

        # 6. Execute the retrieval.
        docs = retriever.invoke(QUERY)

        end_time = time.perf_counter()
        latency = end_time - start_time
        latencies.append(latency)

        if len(docs) != TOP_K:
            logging.warning(f"Expected {TOP_K} docs, but got {len(docs)}")
        print(f"Iteration {i + 1}: {latency:.4f}s")
    except Exception as e:
        logging.warning(f"Warning: Iteration {i + 1} failed: {e}")
print("n--- LangChain Python Retrieval Results ---")
calculate_and_print_stats(latencies)

if name == “main“:
main()
“`

在此代码中,我们实例化 Chroma 向量库以连接现有集合,使用 .as_retriever() 方法创建检索器,并通过循环调用 .invoke() 方法进行计时。

性能测试结果如下:

“`
— LangChainGo: RAG Retrieval Latency Test —
— LangChainGo Retrieval Results —
Total Iterations: 100
Total Time: 23.78s
Min Latency: 230.1ms
Max Latency: 255.9ms
Average Latency: 237.8ms
Std Deviation: 5.9ms

— LangChain Python: RAG Retrieval Latency Test —
— LangChain Python Retrieval Results —
Total Iterations: 100
Total Time: 34.8521s
Min Latency: 330.43ms
Max Latency: 398.81ms
Average Latency: 348.52ms
Std Deviation: 18.55ms
“`

结果与之前的测试趋势一致:

LangChainGo 的平均检索延迟为 237.8ms,比 LangChain Python 的 348.52ms 快了约 46%。

性能优势的来源依然明确:Go 语言在发起 Embedding 网络调用以及处理请求/响应的框架开销上更低。更小的标准差也表明其表现更为稳定。

端到端 RAG:全流程整合验证

最后,我们对完整的端到端 RAG 查询流程进行基准测试,将检索与基于上下文的最终 LLM 回答生成步骤合并。这是对真实场景下查询性能的终极检验。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

Go 使用 chains.NewRetrievalQAFromLLM,将检索器(retriever)与大型语言模型(LLM)组装为标准 RAG 工作流。

“`go
package main

// — Configuration —
const (
numIterations = 20
modelName = “llama3:8b”
collectionName = “langchaingo-ingestion-test”
query = “Summarize the key themes from the documents in one paragraph.”
topK = 5
)

func main() {
fmt.Println(“— LangChainGo: End-to-End RAG Latency Test —“)

// 1. Initialize LLM for generation.
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
    log.Fatalf("Failed to create Ollama client: %v", err)
}

// 2. Initialize Embedder.
embedder, err := embeddings.NewEmbedder(llm)
if err != nil {
    log.Fatalf("Failed to create embedder: %v", err)
}

// 3. Initialize Vector Store.
store, err := chroma.New(
    chroma.WithChromaURL("http://localhost:8000"),
    chroma.WithNameSpace(collectionName),
    chroma.WithEmbedder(embedder),
)
if err != nil {
    log.Fatalf("Failed to create Chroma vector store: %v", err)
}

// 4. Create the full RAG chain.
ragChain := chains.NewRetrievalQAFromLLM(llm, vectorstores.ToRetriever(store, topK))
latencies := make([]time.Duration, 0, numIterations)
ctx := context.Background()
inputValues := map[string]any{"query": query}
fmt.Printf("Running %d end-to-end RAG iterations...nn", numIterations)

// 5. Run the benchmark loop.
for i := 0; i < numIterations; i++ {
    start := time.Now()

    // 6. Execute the entire RAG chain with a single call.
    _, err := chains.Call(ctx, ragChain, inputValues)
    if err != nil {
        log.Printf("Warning: Iteration %d failed: %vn", i+1, err)
        continue
    }
    latency := time.Since(start)
    latencies = append(latencies, latency)
    fmt.Printf("Iteration %d: %vn", i+1, latency)
}
fmt.Println("n--- LangChainGo End-to-End RAG Results ---")
calculateAndPrintStats(latencies)

}
“`

chains.Call 调用 ragChain 将整个流程编排好:
1. 调用检索器(retriever)获取相关文档,并将其与用户查询(query)一同填入提示词(prompt)模板。
2. 调用大型语言模型(LLM)生成最终答案。

“`python

— Configuration —

NUM_ITERATIONS = 20
MODEL_NAME = “llama3:8b”
COLLECTION_NAME = “langchaingo-ingestion-test”
QUERY = “Summarize the key themes from the documents in one paragraph.”
TOP_K = 5

logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)

def main():
“””Main function to run the end-to-end RAG benchmark.”””
print(“— LangChain Python: End-to-End RAG Latency Test —“)

# 1. Initialize LLM, Embedder, and Vector Store client.
llm = Ollama(model=MODEL_NAME)
embeddings = OllamaEmbeddings(model=MODEL_NAME)
client = chromadb.HttpClient(host='localhost', port=8000)

vector_store = Chroma(
    collection_name=COLLECTION_NAME,
    embedding_function=embeddings,
    client=client
)

# 2. Create the full RAG chain.
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vector_store.as_retriever(search_kwargs={"k": TOP_K}),
    return_source_documents=False
)
latencies = []
print(f"Running {NUM_ITERATIONS} end-to-end RAG iterations...n")

# 3. Run the benchmark loop.
for i in range(NUM_ITERATIONS):
    try:
        start_time = time.perf_counter()

        # 4. Execute the RAG chain.
        rag_chain.invoke({"query": QUERY})

        end_time = time.perf_counter()
        latency = end_time - start_time
        latencies.append(latency)
        print(f"Iteration {i + 1}: {latency:.4f}s")
    except Exception as e:
        logging.warning(f"Warning: Iteration {i + 1} failed: {e}")

print("n--- LangChain Python End-to-End RAG Results ---")
calculate_and_print_stats(latencies)

if name == “main“:
main()
“`

RetrievalQA.from_chain_type 是 LangChain 的标准构造器,其逻辑与 Go 版本保持一致。

端到端性能测试结果如下:

“`
— LangChainGo End-to-End RAG Results —
Total Iterations: 20
Total Time: 54.12s
Min Latency: 2.61s
Max Latency: 2.95s
Average Latency: 2.70s
Std Deviation: 95.4ms

— LangChain Python End-to-End RAG Results —
Total Iterations: 20
Total Time: 83.5432s
Min Latency: 4.0123s
Max Latency: 4.9521s
Average Latency: 4.1771s
Std Deviation: 0.2589s
“`

LangChainGo 的端到端 RAG 查询平均耗时 2.70 秒,比 LangChain Python 的 4.17 秒快 54%。

这一优势是前述各项优势的叠加体现:更低的网络调用开销、更高效的 CPU 处理与更精简的框架逻辑。对于面向用户的生产级 RAG 系统而言,54% 的响应时间缩短是极其显著的改进。

第三部分:智能体架构评估

在 RAG 管道上 Go 已展现出优势,接下来我们将评估场景升级至更复杂的智能体系统。

如果说 RAG 是让 LLM 访问知识,那么智能体则是赋予其行动与推理的能力。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

智能体不仅限于回答问题,它能够拆解问题、调用工具、串联多步操作以达成目标。这引入了“推理循环”(思考 -> 行动 -> 观察),其中框架开销会快速累积。

本部分我们将测试 LangChainGo 与 LangChain Python 如何应对这种复杂性。

简单智能体:单次工具调用

我们从最基础的智能体任务开始:回答一个仅需调用一次工具的问题。这是智能体的“Hello, World!”,用于测试框架是否能:

  1. 理解用户意图;
  2. 从工具列表中选择正确的工具;
  3. 执行工具并利用其输出生成最终答案。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

此测试旨在度量单次智能体推理循环的开销基线。

我们定义了数个简单的本地工具,以隔离框架性能(避免外部 API 速度的影响):WordLengthToolSimpleCalculatorCounterTool

“`go
package main

// — Configuration —
const (
modelName = “llama3:8b”
)

// — Custom Local Tools —
// These tools perform simple, local computations to isolate framework overhead.

// WordLengthTool calculates the length of a given word.
type WordLengthTool struct{}

func (t WordLengthTool) Name() string { return “WordLengthTool” }
func (t WordLengthTool) Description() string {
return “Calculates the character length of a single word. Input must be one word.”
}
func (t WordLengthTool) Call(_ context.Context, input string) (string, error) {
return strconv.Itoa(len(strings.TrimSpace(input))), nil
}

// SimpleCalculatorTool evaluates a simple mathematical expression.
type SimpleCalculatorTool struct{}

func (t SimpleCalculatorTool) Name() string { return “SimpleCalculator” }
func (t SimpleCalculatorTool) Description() string {
return “A simple calculator that can evaluate basic arithmetic expressions like ‘1+23′. Do not use for multiple operations.”
}
func (t SimpleCalculatorTool) Call(_ context.Context, input string) (string, error) {
// Note: This is a highly insecure way to implement a calculator in a real app!
// For this benchmark, it’s a stand-in for a CPU-bound tool.
if strings.Contains(input, “
“) {
parts := strings.Split(input, “*”)
if len(parts) == 2 {
a, _ := strconv.Atoi(strings.TrimSpace(parts[0]))
b, _ := strconv.Atoi(strings.TrimSpace(parts[1]))
return strconv.Itoa(a * b), nil
}
}
return “invalid expression”, nil
}

// CounterTool simulates a stateful operation.
var counter = 0

type CounterTool struct{}

func (t CounterTool) Name() string { return “CounterTool” }
func (t CounterTool) Description() string {
return “Increments a global counter by one and returns the new value.”
}
func (t CounterTool) Call(_ context.Context, input string) (string, error) {
counter++
return strconv.Itoa(counter), nil
}

// — Benchmark Runner —
func main() {
// Determine which test to run based on the command-line arguments.
if len(os.Args) < 2 {
fmt.Println(“Usage: go run . “)
fmt.Println(“Available tests: single, multi, high_freq”)
os.Exit(1)
}
testName := os.Args[1]

// 1. Initialize the LLM and the set of tools.
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
    log.Fatalf("Failed to create LLM: %v", err)
}

// The agent will have access to all these tools.
availableTools := []tools.Tool{
    WordLengthTool{},
    SimpleCalculatorTool{},
    CounterTool{},
}

// 2. Create the agent executor.
// We use a Zero-Shot ReAct agent, which is a standard choice for this kind of task.
agentExecutor, err := agents.Initialize(
    llm,
    availableTools,
    agents.ZeroShotReactDescription,
)
if err != nil {
    log.Fatalf("Failed to initialize agent: %v", err)
}

// 3. Define the prompts for each test scenario.
prompts := map[string]string{
    "single":    "What is the character length of the word 'phenomenon'?",
    "multi":     "What is 25 multiplied by 4, and what is the character length of the word 'knowledge'?",
    "high_freq": "Using the CounterTool, count from 1 to 5 by calling the tool for each number.",
}
prompt, ok := prompts[testName]
if !ok {
    log.Fatalf("Invalid test name: %s", testName)
}
fmt.Printf("--- LangChainGo: Agent Test '%s' ---n", testName)
fmt.Printf("Prompt: %snn", prompt)

// --- Profiling Setup ---
var startMem, endMem runtime.MemStats
runtime.ReadMemStats(&startMem) // Read memory stats before the run.

// 4. Run the agent and measure performance.
startTime := time.Now()
result, err := chains.Run(context.Background(), agentExecutor, prompt)
if err != nil {
    log.Fatalf("Agent execution failed: %v", err)
}
duration := time.Since(startTime)
runtime.ReadMemStats(&endMem) // Read memory stats after the run.

memAllocated := endMem.Alloc - startMem.Alloc
totalMemAllocated := endMem.TotalAlloc - startMem.TotalAlloc
fmt.Println("--- Agent Final Answer ---")
fmt.Println(result)
fmt.Println("--------------------------n")
fmt.Println("--- Performance Metrics ---")
fmt.Printf("End-to-End Latency: %vn", duration)
fmt.Printf("Memory Allocated (Heap): %d bytes (%.2f KB)n", memAllocated, float64(memAllocated)/1024)
fmt.Printf("Total Memory Allocated (Cumulative): %d bytes (%.2f MB)n", totalMemAllocated, float64(totalMemAllocated)/1024/1024)

}
“`

工具以实现了 tools.Tool 接口的结构体进行定义,其中必须包含 NameDescriptionCall 三个核心字段。在 main 函数中,首先初始化 Ollama LLM,随后通过 agents.Initialize 方法创建基于 ReAct 逻辑的 Agent。基准测试的核心逻辑位于 chains.Run 函数中,该函数负责启动并执行 Agent 的完整推理过程。

Python 使用 @tool 装饰器定义工具(更现代也更简洁):

“`python

— Configuration —

MODEL_NAME = “llama3:8b”

— Custom Local Tools —

Using the @tool decorator is the modern way to define tools in LangChain.

@tool
def WordLengthTool(word: str) -> int:
“””Calculates the character length of a single word. Input must be one word.”””
return len(word.strip())

@tool
def SimpleCalculator(expression: str) -> str:
“””A simple calculator that can evaluate basic arithmetic expressions like ‘1+23′. Do not use for multiple operations.”””
# This is an insecure eval for benchmark purposes only!
try:
if “
” in expression:
parts = [p.strip() for p in expression.split(“*”)]
if len(parts) == 2:
return str(int(parts[0]) * int(parts[1]))
return “invalid expression”
except:
return “error evaluating expression”

counter_val = 0

@tool
def CounterTool(placeholder: str = “”) -> int:
“””Increments a global counter by one and returns the new value. The input is ignored.”””
global counter_val
counter_val += 1
return counter_val

def get_memory_usage():
“””Gets the current memory usage of the process.”””
process = psutil.Process(os.getpid())
# rss: Resident Set Size, the non-swapped physical memory a process has used.
return process.memory_info().rss

def main():
“””Main function to run the agent benchmarks.”””
if len(sys.argv) < 2:
print(“Usage: python agent.py “)
print(“Available tests: single, multi, high_freq”)
sys.exit(1)

test_name = sys.argv[1]

# 1. Initialize LLM and tools.
llm = Ollama(model=MODEL_NAME)
available_tools = [WordLengthTool, SimpleCalculator, CounterTool]

# 2. Create the agent.
# We construct a ReAct prompt template. This defines the agent's reasoning process.
prompt_template = PromptTemplate.from_template(
    """Answer the following questions as best you can. You have access to the following tools:

{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
… (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {input}
Thought:{agent_scratchpad}”””
)

agent = create_react_agent(llm, available_tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=available_tools, verbose=True)

# 3. Define prompts for each test.
prompts = {
    "single": "What is the character length of the word 'phenomenon'?",
    "multi": "What is 25 multiplied by 4, and what is the character length of the word 'knowledge'?",
    "high_freq": "Using the CounterTool, count from 1 to 5 by calling the tool for each number.",
}
prompt_text = prompts.get(test_name)
if not prompt_text:
    print(f"Invalid test name: {test_name}")
    sys.exit(1)
print(f"--- LangChain Python: Agent Test '{test_name}' ---")
print(f"Prompt: {prompt_text}n")

# --- Profiling Setup ---
start_mem = get_memory_usage()

# 4. Run the agent and measure performance.
start_time = time.perf_counter()
result = agent_executor.invoke({"input": prompt_text})
duration = time.perf_counter() - start_time

end_mem = get_memory_usage()
mem_used = end_mem - start_mem

print("nn--- Agent Final Answer ---")
print(result.get('output'))
print("--------------------------n")
print("--- Performance Metrics ---")
print(f"End-to-End Latency: {duration:.4f}s")
print(f"Memory Usage Increase (RSS): {mem_used} bytes ({mem_used / 1024:.2f} KB)")

if name == “main“:
main()
“`

Python 代码实现相同功能,但结构略有不同。我们使用 @tool 装饰器自动处理工具的模式与描述,显式构建 ReAct 的 PromptTemplate,通过 create_react_agent 组合各组件,最终由 AgentExecutor 作为可运行对象。基准测试围绕 agent_executor.invoke 方法展开。

运行 “single” 测试:

“`bash

运行 Go 版本

go run single_tool.go

运行 Python 版本

python single_tool.py
“`

“`
— LangChainGo: Agent Test ‘single’ —
— Performance Metrics —
End-to-End Latency: 1.95s
Total Memory Allocated (Cumulative): 2.55 MB

— LangChain Python: Agent Test ‘single’ —
— Performance Metrics —
End-to-End Latency: 3.0513s
Memory Usage Increase (RSS): 412.00 KB
“`

即便是如此简单的单次循环任务,性能差异也已相当显著。

LangChainGo 用时 1.95s,LangChain Python 用时 3.05s,Go 快 56%。

Python 解释器、提示词构造、解析 LLM 的“思考”输出、调用工具函数等各环节的开销叠加,最终导致其比编译型的 Go 多出 1 秒以上的延迟。

多跳 Agent:串联多次工具调用

Agent 的真正价值在于多跳推理——将一个工具的输出作为下一步的输入。这要求 Agent 能够运行多次推理循环。

当一个问题需要 Agent 连续使用两个不同工具时,性能表现如何?

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

例如:“25 乘以 4 等于多少?并给出单词 knowledge 的字符长度。”

此测试的代码与单工具测试完全一致。我们使用同一组工具与同一个 Agent Executor,唯一变化是使用了更复杂的提示词。这很好地测试了 Agent 的推理能力以及框架在多轮循环下的效率。

“`
— LangChainGo: Agent Test ‘multi’ —
— Performance Metrics —
End-to-End Latency: 3.98s
Total Memory Allocated (Cumulative): 5.81 MB

— LangChain Python: Agent Test ‘multi’ —
— Performance Metrics —
End-to-End Latency: 6.8812s
Memory Usage Increase (RSS): 1024.00 KB
“`

性能差距进一步扩大。

LangChainGo 完成两步任务耗时 3.98s;LangChain Python 耗时 6.88s,Go 快 73%。

这是因为在单工具测试中的框架开销,在此场景下被“支付了两次”。每一个“思考 -> 行动 -> 观察”周期,Go 的实现都更轻、更快;串联两个循环后,Go 的初始优势产生复利效应,带来了更大的绝对时间节省。

高频工具使用:测试 Agent 开销

最后一个 Agent 测试旨在尽可能隔离框架开销:

如果给 Agent 一个需要多次调用“非常快速的本地工具”的任务,会怎样?

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

例如:“用 CounterTool 从 1 数到 5”,强制 Agent 运行 5 次推理循环,同时使每一步的 LLM “思考”时间最小化。这样可以将焦点直接放在框架处理循环机制的效率上。

我们使用相同的 Agent 代码,仅更换了高频调用的提示词。

“`
— LangChainGo: Agent Test ‘high_freq’ —
— Performance Metrics —
End-to-End Latency: 9.12s
Total Memory Allocated (Cumulative): 14.50 MB

— LangChain Python: Agent Test ‘high_freq’ —
— Performance Metrics —
End-to-End Latency: 18.5123s
Memory Usage Increase (RSS): 2856.00 KB
“`

结果再次凸显了 Go 的优势:

LangChainGo 完成 5 次循环耗时 9.12s,而 LangChain Python 为 18.51s。Go 速度超过 2 倍(提升 103%)。

该测试旨在尽量减少等待 LLM 的时间,最大化框架自身循环开销的占比。Python 的解释型特性意味着每一步(解析 LLM 输出、路由到工具、调用工具、格式化观察结果)都要付出微小开销;在五次快速循环中,这些开销成为总延迟的主要原因。而 Go 作为编译型语言,内存效率更高,在快速循环中代价更低。

工作流逻辑:数据转换与路由

实际应用往往需要更结构化的工作流,而非单个 ReAct 循环。工作流可能需要将查询路由至不同的专业链,或在 LLM 调用之间执行自定义数据转换。

这些“胶水代码”的效率,对整体性能同样至关重要。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

首先测试一个路由链(Router Chain)。该模式用于智能地将用户查询路由到最适合的处理链,从而节省时间与资源。我们创建两个专项链 mathChainproseChain,以及一个 routerChain 来决定使用哪一个。

“`go
// — Configuration —
const (
modelName = “llama3:8b”
mathQuery = “What is 15 * 12?”
proseQuery = “Write a haiku about a sunset.”
)

func main() {
fmt.Println(“— LangChainGo: Conditional (Router) Chain Test —“)
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
log.Fatalf(“Failed to create LLM: %v”, err)
}

// 2. Define the destination chains.
mathTemplate := prompts.NewPromptTemplate("Evaluate this math expression: {{.input}}", []string{"input"})
mathChain := chains.NewLLMChain(llm, mathTemplate)

proseTemplate := prompts.NewPromptTemplate("Write a creative piece based on this prompt: {{.input}}", []string{"input"})
proseChain := chains.NewLLMChain(llm, proseTemplate)

// 3. Define the Router Chain.
routerTemplate := prompts.NewPromptTemplate(
    `Given the user query, classify it as either "math" or "prose". Respond with only one of these two words.nQuery: {{.input}}`,
    []string{"input"},
)
routerChain := chains.NewLLMChain(llm, routerTemplate)

// --- Run Benchmarks ---
fmt.Println("n--- Testing Math Query ---")
runAndProfile(routerChain, mathChain, proseChain, mathQuery)
fmt.Println("n--- Testing Prose Query ---")
runAndProfile(routerChain, mathChain, proseChain, proseQuery)

}

func runAndProfile(router, math, prose chains.Chain, query string) {
totalStart := time.Now()
var totalLLMDuration time.Duration
ctx := context.Background()

// --- Step 1: Run the router ---
routerStart := time.Now()
routerResult, err := chains.Run(ctx, router, query)
llm1Duration := time.Since(routerStart)
totalLLMDuration += llm1Duration
destination := strings.TrimSpace(strings.ToLower(routerResult))

// --- Step 2: Route to the appropriate destination chain ---
destinationStart := time.Now()
switch destination {
case "math":
    _, err = chains.Run(ctx, math, query)
case "prose":
    _, err = chains.Run(ctx, prose, query)
}
llm2Duration := time.Since(destinationStart)
totalLLMDuration += llm2Duration
totalDuration := time.Since(totalStart)
frameworkOverhead := totalDuration - totalLLMDuration
// ... print results

}
“`

我们手工实现路由逻辑:

  1. 先运行 routerChain 得到分类(”math” 或 “prose”);
  2. 然后通过 switch 语句跳转到目标链。

这种底层控制方式使我们能够精确地区分框架逻辑耗时与 LLM 调用耗时。

Python 使用 LangChain 内置的 MultiPromptChain(更高阶的抽象)来实现:

“`python
import time
from langchain.chains import LLMChain, ConversationChain
from langchain.chains.router import MultiPromptChain
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
from langchain.prompts import PromptTemplate
from langchain_community.llms import Ollama

def main():
# 1. 初始化共享的 LLM。
llm = Ollama(model=MODEL_NAME)
# 2. 为目标链定义提示词模板。
prompt_infos = [
{“name”: “math”, “description”: “Good for answering questions about math”, //},
{“name”: “prose”, “description”: “Good for writing creative prose”, //},
]
# 3. 创建目标链。
destination_chains = { / … create LLMChains … / }

# 4. 创建路由链。
router_template_str = ("...")
router_prompt = PromptTemplate(/*...*/)
router_chain = LLMRouterChain.from_llm(llm, router_prompt)

# 5. 创建最终的 MultiPromptChain。
chain = MultiPromptChain(
    router_chain=router_chain,
    destination_chains=destination_chains,
    default_chain=ConversationChain(llm=llm, output_key="text"),
    verbose=True,
)
# --- 运行基准测试 ---
run_and_profile(chain, MATH_QUERY)
run_and_profile(chain, PROSE_QUERY)

def run_and_profile(chain, query):
total_start = time.perf_counter()
result = chain.invoke({“input”: query})
total_duration = time.perf_counter() – total_start
# … 打印结果
“`

由于前文已实现大部分组件,此处不再重复。Python 借助 MultiPromptChain 完成路由,它基于各目标链的 description 构造路由提示词。尽管方便,但抽象层的增加也会引入额外开销。

测试结果如下:

“`
— LangChainGo: Conditional (Router) Chain Test —
— Testing Math Query —
— Routing Performance Metrics —
Final Answer: 180
Total End-to-End Latency: 1.402s
Total LLM-Only Time: 1.400s
Framework Logic Overhead: 2.0ms
Correctness: Routed correctly

— LangChain Python: Conditional (Router) Chain Test —
— Testing Math Query —
— Routing Performance Metrics —
Final Answer: 180
Total End-to-End Latency: 2.1534s
Process CPU Time Used: 0.6521s
Correctness: Routed correctly
“`

Go 在路由逻辑上的框架开销仅为 2.0ms;而 LangChain Python 在同一任务上的 Process CPU Time Used 为 652ms。尽管指标不能严格一一对应,但这清晰地表明,Go 以编译型、最小化方式处理两次 LLM 调用的效率远高于 Python 的高层动态抽象。

对于拥有大量决策点、复杂路由逻辑的智能体系统,这种开销在 Python 中可能成为显著的性能瓶颈。当需要快速执行大量动作时,Go 的效率优势十分明显。

第四部分:生产就绪的压力测试

至此,我们已在可控、单一任务条件下完成了基准测试。然而,这与真实世界的复杂场景相去甚远。

生产级 AI 系统通常运行在混乱的环境中,需要持续处理并发请求、应对不可靠的网络,并保持 7×24 小时稳定运行而不崩溃。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

本部分我们将从“实验室”走向“实战”,通过模拟真实生产压力,测试哪个框架更能承受极限负载。

我们将对 LangChainGo 与 LangChain Python 进行三类压力测试,以衡量其可扩展性、饱和潜力以及在面对失败时的韧性。

并发 RAG 与智能体系统:应对真实负载

生产服务器不会一次只服务一个用户,而是需要同时处理成百上千的请求。高效管理大量并发请求是可扩展 AI 应用最重要的因素之一。

此测试旨在衡量各框架在高负载场景下的并发模型表现。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

我们将模拟 50 个用户同时查询 RAG 系统,以及 25 个用户同时运行多跳智能体任务,以测试框架处理并行 I/O 与 CPU 任务的能力。

“`go
package main

// — Configuration —
const (
concurrencyLevel = 50 // Number of concurrent requests to simulate
modelName = “llama3:8b”
collectionName = “langchaingo-ingestion-test”
topK = 3
)

// A sample list of queries to make the benchmark more realistic.
var queries = []string{
“What are the main themes?”,
“Tell me about the introduction.”,
“Is there a conclusion section?”,
“What is the most complex topic discussed?”,
}

func main() {
fmt.Printf(“— LangChainGo: Concurrent RAG Queries Test (Concurrency: %d) —n”, concurrencyLevel)

// 1. Initialize shared components ONCE.
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil { /* ... error handling ... */ }
embedder, err := embeddings.NewEmbedder(llm)
if err != nil { /* ... error handling ... */ }
store, err := chroma.New(
    chroma.WithChromaURL("http://localhost:8000"),
    chroma.WithNameSpace(collectionName),
    chroma.WithEmbedder(embedder),
)
if err != nil { /* ... error handling ... */ }
ragChain := chains.NewRetrievalQAFromLLM(llm, vectorstores.ToRetriever(store, topK))

// 2. Prepare for the concurrent benchmark.
var wg sync.WaitGroup
resultsChan := make(chan time.Duration, concurrencyLevel)
errorChan := make(chan error, concurrencyLevel)
fmt.Printf("Dispatching %d concurrent RAG queries...n", concurrencyLevel)
totalStart := time.Now()

// 3. Dispatch worker goroutines.
for i := 0; i < concurrencyLevel; i++ {
    wg.Add(1)
    go func(workerID int) {
        defer wg.Done()

        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        query := queries[workerID%len(queries)]
        inputValues := map[string]any{"query": query}

        start := time.Now()
        _, err := chains.Call(ctx, ragChain, inputValues)
        latency := time.Since(start)
        if err != nil {
            errorChan <- fmt.Errorf("worker %d failed: %w", workerID, err)
        } else {
            resultsChan <- latency
            fmt.Printf("Worker %d completed in %vn", workerID, latency)
        }
    }(i)
}

// 4. Wait for all goroutines to complete.
wg.Wait()
close(resultsChan)
close(errorChan)

// ... calculate and print results ...

}
“`

该测试代码展示了 Go 语言的原生并发能力。所有 LangChain 组件(LLM、嵌入器、向量存储)仅初始化一次,随后启动 50 个独立的 goroutine(每个模拟一个用户请求)并发执行完整的 RAG 查询流程。通过 sync.WaitGroup 同步并等待所有请求完成。这种基于 goroutine 和 channel 的模型是 Go 处理大规模 I/O 密集型并发任务的高效原生方式。[[IMAGE_X]]

Python 版本使用 asyncio 实现同样目标:

“`python

— Configuration —

CONCURRENCY_LEVEL = 50
MODEL_NAME = “llama3:8b”
COLLECTION_NAME = “langchaingo-ingestion-test”
TOP_K = 3
QUERIES = [
“What are the main themes?”, “Tell me about the introduction.”, “Is there a conclusion section?”,
“What is the most complex topic discussed?”,
]

logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)

async def run_rag_query(rag_chain, query, worker_id):
“””Asynchronously runs a single RAG query and returns its latency.”””
start_time = time.perf_counter()
try:
await rag_chain.ainvoke({“query”: query})
latency = time.perf_counter() – start_time
print(f”Worker {worker_id} completed in {latency:.4f}s”)
return latency, None
except Exception as e:
latency = time.perf_counter() – start_time
error_message = f”Worker {worker_id} failed after {latency:.4f}s: {e}”
logging.warning(error_message)
return None, error_message

async def main():
“””Main async function to orchestrate the concurrent benchmark.”””
print(f”— LangChain Python: Concurrent RAG Queries Test (Concurrency: {CONCURRENCY_LEVEL}) —“)

# 1. Initialize shared components ONCE.
llm = Ollama(model=MODEL_NAME)
embeddings = OllamaEmbeddings(model=MODEL_NAME)
client = chromadb.HttpClient(host='localhost', port=8000)
vector_store = Chroma(/*...*/)
rag_chain = RetrievalQA.from_chain_type(/*...*/)

# 2. Create a list of concurrent tasks.
tasks = [run_rag_query(rag_chain, QUERIES[i % len(QUERIES)], i) for i in range(CONCURRENCY_LEVEL)]
total_start = time.perf_counter()

# 3. `asyncio.gather` runs all tasks concurrently.
results = await asyncio.gather(*tasks, return_exceptions=True)

total_duration = time.perf_counter() - total_start

# ... process results and print stats ...

if name == “main“:
asyncio.run(main())
“`

在 Python 中,asyncio 是处理 I/O 并发的标准方式。我们定义了 run_rag_query 协程,使用 ainvoke 方法进行异步调用。在 main 函数中,我们创建了 50 个任务,并使用 asyncio.gather 来并发执行它们。

分别对并发 RAG 查询与并发 Agent 执行进行测试:

“`bash

运行 Go 版本

go run concurrent.go

运行 Python 版本

python concurrent.py
“`

测试结果如下:

“`
— LangChainGo: Concurrent RAG Queries Test (Concurrency: 50) —
Total Time Elapsed: 6.5s
Throughput: 7.69 ops/sec

— LangChain Python: Concurrent RAG Queries Test (Concurrency: 50) —
Total Time Elapsed: 18.2s
Throughput: 2.75 ops/sec

— LangChainGo: Concurrent Agent Executions Test (Concurrency: 25) —
Total Time Elapsed: 10.5s
Throughput: 2.38 ops/sec

— LangChain Python: Concurrent Agent Executions Test (Concurrency: 25) —
Total Time Elapsed: 32.1s
Throughput: 0.78 ops/sec
“`

在并发 RAG 查询测试中,LangChainGo 的吞吐量达到 7.69 ops/sec,几乎是 Python 版本(2.75 ops/sec)的 3 倍。

在更复杂的并发 Agent 任务测试中,性能差距更为显著:Go 版本为 2.38 ops/sec,而 Python 版本仅为 0.78 ops/sec,Go 的性能优势超过 3 倍。

并发性能是生产系统中最关键的基准之一。Go 运行时从设计之初就面向大规模网络并发,其 goroutine 极其轻量,能够轻松处理成千上万的并发 I/O 操作。Python 的 asyncio 虽然功能强大,但其事件循环和线程管理的复杂性及开销,在大规模并发场景下难以与 Go 的原生效率相匹敌。

GPU 饱和:将模型服务推向极限

[[IMAGE_X]]

另一类可扩展性问题是:客户端应用能否有效地让服务端昂贵的 GPU 保持高负载。

若客户端发送请求的速度过慢,GPU 将出现空闲,导致资源浪费与整体吞吐量下降。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测 GPU Saturation(作者 Fareed Khan)

本测试旨在固定时间内对 Ollama 服务器施加尽可能高的负载,以评估不同框架生成负载的能力。

Go 版本使用一个持续向任务通道(job channel)分发工作的 worker 池,确保 worker 一旦空闲就能立即处理新请求:

“`go
package main

// — 配置 —
const (
concurrencyLevel = 100
testDuration = 60 * time.Second
)
func main() {

// ... 相同的初始化代码 ...
llm, err := ollama.New(ollama.WithModel(modelName))

jobs := make(chan bool, concurrencyLevel)
var totalRequests atomic.Int64
var wg sync.WaitGroup

// 3. 启动 worker 池。
for i := 0; i < concurrencyLevel; i++ {
    wg.Add(1)
    go func(workerID int) {
        defer wg.Done()
        for range jobs {
            _, err := llms.GenerateFromSinglePrompt(context.Background(), llm, prompt)
            if err == nil {
                totalRequests.Add(1)
            }
        }
    }(i)
}

// 4. 在测试持续时间内持续向任务通道投放工作。
ctx, cancel := context.WithTimeout(context.Background(), testDuration)
defer cancel()
for {
    select {
    case jobs <- true: // 分发一个任务
    case <-ctx.Done(): // 时间到
        goto end_loop
    }
}

end_loop:
close(jobs)
wg.Wait()
// … 计算并打印结果 …
}
“`

此实现创建了 100 个等待任务的 worker goroutine。主协程在 60 秒内持续尝试向 jobs 通道发送任务(一个布尔值),从而在模型端维持最大压力。

Python 版本使用 asyncioSemaphore 实现类似效果:

“`python
async def worker(llm, semaphore, request_counter):
“””持续发起请求的单个 worker。”””
while True:
async with semaphore:
try:
await llm.ainvoke(PROMPT)
request_counter[0] += 1
except Exception as e:
# … 错误处理 …
await asyncio.sleep(0)

async def main():
# … 初始化 …
llm = Ollama(model=MODEL_NAME)

# 2. 设置并发控制。
semaphore = asyncio.Semaphore(CONCURRENCY_LEVEL)
request_counter = [0]

# 3. 创建并启动 worker 任务。
tasks = [asyncio.create_task(worker(llm, semaphore, request_counter)) for _ in range(CONCURRENCY_LEVEL)]

# 4. 让 worker 运行指定的持续时间。
try:
    await asyncio.sleep(TEST_DURATION_SECONDS)
finally:
    # 5. 取消所有 worker 任务以结束基准测试。
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
# ... 计算并打印结果 ...

“`

此实现创建了 100 个无限循环的 worker。asyncio.Semaphore(CONCURRENCY_LEVEL) 是关键控制机制,确保同时“在途”的请求不超过 100 个。运行 60 秒后,取消所有任务并结束测试。

“`
— LangChainGo Saturation Results —
Total Duration: 1m0.02s
Completed Requests: 395
Throughput: 6.58 req/sec

— LangChain Python Saturation Results —
Total Duration: 60.01s
Completed Requests: 255
Throughput: 4.25 req/sec
“`

测试结果揭示了客户端生成负载的效率差异:

在 60 秒内,Go 客户端完成了 395 个请求,吞吐量为 6.58 req/sec;而 Python 客户端仅完成 255 个请求,吞吐量为 4.25 req/sec。

这意味着 LangChainGo 客户端在“喂饱”服务端方面的效率高出约 55%。客户端侧更低的开销使其能够更快地派发新请求,从而更充分地利用昂贵的 GPU 资源。

处理超时、工具失败与解析错误

最后,生产系统必须具备韧性。我们测试了以下三种常见的故障场景:

超时:当 LLM 服务响应缓慢会怎样?

Go 语言通过 context.WithTimeout 来处理超时,这是其处理操作中止的惯用方式。以下示例演示了如何为 LLM 调用设置一个 2 秒的截止时间:

“`go
package main

// — 配置 —
const (
modelName = “llama3:8b”
prompt = “Tell me a long story about the history of the internet.”
timeout = 2 * time.Second
)

func main() {
fmt.Printf(“— LangChainGo: 超时与取消测试 (超时: %v) —n”, timeout)
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
log.Fatalf(“创建 Ollama 客户端失败: %v”, err)
}

// 创建一个带截止时间的上下文
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
fmt.Println("发送预期会超时的请求...")
start := time.Now()

// 使用可取消的上下文调用 LLM
_, err = llm.Call(ctx, prompt)
duration := time.Since(start)

// --- 验证 ---
fmt.Println("n--- LangChainGo 弹性测试结果 ---")
fmt.Printf("请求完成耗时: %vn", duration)
if err == nil {
    log.Fatalf("测试失败: 预期超时错误,但未收到任何错误。")
}

// 验证错误类型
if errors.Is(err, context.DeadlineExceeded) {
    fmt.Println("成功: 收到正确的 'context.DeadlineExceeded' 错误。")
} else if netErr, ok := err.(interface{ Timeout() bool }); ok && netErr.Timeout() {
    fmt.Println("成功: 收到预期的网络超时错误。")
} else {
    log.Fatalf("测试失败: 预期超时错误,但收到其他错误: %v", err)
}

// 验证是否遵守了超时设置
if duration >= timeout && duration < timeout+500*time.Millisecond {
    fmt.Printf("成功: 函数在约 %v 后返回,遵守了超时设置。n", timeout)
} else {
    log.Fatalf("测试失败: 函数未遵守超时设置。耗时 %v。", duration)
}

}
“`

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

Python 通常在 client 初始化时配置超时:

“`python

— Configuration —

MODEL_NAME = “llama3:8b”
PROMPT = “Tell me a long story about the history of the internet.”
TIMEOUT_SECONDS = 2.0

logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)

def main():
“””Main function to run the timeout benchmark.”””
print(f”— LangChain Python: Timeout Test (Timeout: {TIMEOUT_SECONDS}s) —“)

# 1. Initialize the Ollama client with a timeout.
try:
    llm = Ollama(model=MODEL_NAME, request_timeout=TIMEOUT_SECONDS)
except Exception as e:
    logging.fatal(f"Failed to create Ollama client: {e}")
    return

print("Sending request that is expected to time out...")
start_time = time.perf_counter()
err = None

# 2. Call the LLM.
try:
    llm.invoke(PROMPT)
except Exception as e:
    err = e

duration = time.perf_counter() - start_time

# --- Verification ---
print("n--- LangChain Python Resiliency Results ---")
print(f"Request completed in: {duration:.4f}s")

if err is None:
    logging.fatal("TEST FAILED: Expected a timeout error, but got none.")

# 3. Verify the error type.
error_string = str(err).lower()
if "timed out" in error_string or "timeout" in error_string:
    print(f"SUCCESS: Received an expected timeout-related error: {type(err).__name__}")
else:
    logging.fatal(f"TEST FAILED: Expected a timeout error, but got a different error: {err}")

# 4. Verify adherence to the timeout.
if TIMEOUT_SECONDS <= duration < TIMEOUT_SECONDS + 0.5:
    print(f"SUCCESS: The function returned after ~{TIMEOUT_SECONDS}s, respecting the timeout.")
else:
    logging.fatal(f"TEST FAILED: The function did not adhere to the timeout. Took {duration:.4f}s.")

if name == “main“:
main()
“`

运行超时测试:

“`bash

running go verion

go run timeout.go

running python version

python timeout.py
“`

测试结果如下:

“`
— LangChainGo: Timeout and Cancellation Test (Timeout: 2s) —
— LangChainGo Resiliency Results —
Request completed in: 2.0015s
SUCCESS: Received the correct ‘context.DeadlineExceeded’ error.
SUCCESS: The function returned after ~2s, respecting the timeout.

— LangChain Python: Timeout Test (Timeout: 2.0s) —
— LangChain Python Resiliency Results —
Request completed in: 2.0081s
SUCCESS: Received an expected timeout-related error: ReadTimeout
SUCCESS: The function returned after ~2.0s, respecting the timeout.
“`

此测试关注的是正确性而非速度。两种框架都能正确处理超时,并在截止时间后立即返回。这是生产级库的基本素养。

主要区别在于实现方式:

  1. Go 通过每个请求的 context 进行控制,这种方式更灵活,适合为复杂系统中的不同 API 调用设定不同的超时时间。
  2. Python 在客户端(client)层进行统一配置,这种方式更简单,但控制粒度相对较粗。

对于工具调用和解析失败,两者同样提供了错误处理机制:

  1. Go 显式地传播错误,便于开发者直接处理,控制流程明确。
  2. LangChain Python 提供了更高阶的封装,例如 AgentExecutorhandle_parsing_errors=True 参数,可以自动捕获工具调用失败,并将错误信息反馈给 Agent 的提示词(prompt),允许其进行自我纠正。

可以说,Go 提供了底层可控、可预测的错误处理机制;而 Python 则提供了构建复杂、具备自愈能力的 Agent 逻辑的便捷性。

第五部分:衡量真实运营成本

目前我们关注的多是速度、延迟、吞吐量与响应时间。但在真实生产环境中,性能只是故事的一半,另一半是运营成本。这不仅是金钱成本,也包括应用在整个生命周期中所消耗的计算资源。

一个运行很快,但存在内存泄漏,或者一引入监控就显著变慢的应用,并不具备生产可用性,反而会成为运维的负担。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测 Operation Cost Benchmark

最后,我们测量两个关键的运营成本指标:

  1. 有状态 Agent 在长时会话中的内存消耗;
  2. 引入必要可观测性功能后带来的性能损耗。

内存占用:长会话对话

聊天机器人与 Agent 通常需要具备记忆能力。最直接的方式是使用 ConversationBufferMemory,即将完整的对话历史附加到每一轮的提示词中。这种方法虽然简单,但会导致内存占用随对话轮数线性增长。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测 Memory Footprint

该测试模拟了长达 100 轮的连续对话,并在关键节点测量内存使用量,以观察不同框架如何处理持续增长的状态。

Go 使用内置的 runtime 包来获取精确的堆内存统计信息:

“`go
package main

// — Configuration —
const (
numTurns = 100
modelName = “llama3:8b”
initialPrompt = “My name is John.”
)

// A struct to hold the metrics we want to capture at each checkpoint.
type BenchmarkMetrics struct {
Turn int
Latency time.Duration
HeapAlloc uint64 // Bytes currently allocated on the heap
TotalAlloc uint64 // Cumulative bytes allocated (shows churn)
NumGC uint32 // Number of garbage collections
PauseTotalNs uint64 // Total time spent in GC pauses
}

func main() {
fmt.Printf(“— LangChainGo: Long-Term Memory Footprint Test (%d Turns) —n”, numTurns)

// 1. Initialize the LLM and Memory components.
llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
    log.Fatalf("Failed to create LLM: %v", err)
}

mem := memory.NewConversationBuffer()
conversationChain := chains.NewConversation(llm, mem)
checkpoints := map[int]bool{1: true, 10: true, 50: true, 100: true}
var metrics []BenchmarkMetrics
var initialMemStats runtime.MemStats
runtime.ReadMemStats(&initialMemStats)

fmt.Println("Starting simulated conversation...")

// 2. Start the conversation loop.
currentPrompt := initialPrompt
ctx := context.Background()
for i := 1; i <= numTurns; i++ {
    start := time.Now()

    // 3. Run the chain.
    result, err := chains.Run(ctx, conversationChain, currentPrompt)
    if err != nil {
        log.Fatalf("Chain run failed at turn %d: %v", i, err)
    }
    latency := time.Since(start)
    currentPrompt = fmt.Sprintf("That's interesting. Can you tell me more about the last thing you said? My name is still %s.", "John")

    // 4. Record metrics at specified checkpoints.
    if checkpoints[i] {
        var currentMemStats runtime.MemStats
        runtime.ReadMemStats(&currentMemStats)

        metrics = append(metrics, BenchmarkMetrics{
            Turn:         i,
            Latency:      latency,
            HeapAlloc:    currentMemStats.Alloc, // Current live memory
            TotalAlloc:   currentMemStats.TotalAlloc - initialMemStats.TotalAlloc,
            NumGC:        currentMemStats.NumGC - initialMemStats.NumGC,
            PauseTotalNs: currentMemStats.PauseTotalNs - initialMemStats.PauseTotalNs,
        })
        fmt.Printf("Checkpoint Turn %d: Latency=%vn", i, latency)
    }
}

// 5. Print the final results table.
printResults(metrics)

}

// Eval Result metrics
func printResults(metrics []BenchmarkMetrics) {
fmt.Println(“n— LangChainGo Memory & Performance Results —“)
fmt.Printf(“%-5s | %-15s | %-15s | %-20s | %-10s | %-20sn”, “Turn”, “Latency”, “Heap Alloc”, “Total Alloc (Churn)”, “Num GC”, “Total GC Pause”)
for _, m := range metrics {
heapAllocKB := float64(m.HeapAlloc) / 1024
totalAllocMB := float64(m.TotalAlloc) / (1024 * 1024)
pauseMs := float64(m.PauseTotalNs) / 1_000_000

    fmt.Printf("%-5d | %-15v | %-15s | %-20s | %-10d | %-20sn",
        m.Turn, m.Latency.Round(time.Millisecond),
        fmt.Sprintf("%.2f KB", heapAllocKB), fmt.Sprintf("%.2f MB", totalAllocMB),
        m.NumGC, fmt.Sprintf("%.2f ms", pauseMs),
    )
}

}
“`

我们建立带 ConversationBufferConversationChain

  1. 在循环中,每次 chains.Run 都会执行以下操作:从内存加载完整历史记录、将其格式化为提示词、调用大语言模型(LLM)、然后将本轮对话的输入和输出写回内存。
  2. 使用 runtime.ReadMemStats 捕获 HeapAlloc(当前活跃堆内存)与 TotalAlloc(累计分配总量,反映内存变动率)作为内存指标。

Python 版本使用 psutil 库读取进程的常驻集大小(RSS)作为内存指标:

“`python

— Configuration —

NUM_TURNS = 100
MODEL_NAME = “llama3:8b”
INITIAL_PROMPT = “My name is John.”

def get_memory_usage():
“””Gets the current memory usage (RSS) of the process in bytes.”””
process = psutil.Process(os.getpid())
return process.memory_info().rss

def main():
“””Main function to run the memory footprint benchmark.”””

print(f"--- LangChain Python: Long-Term Memory Footprint Test ({NUM_TURNS} Turns) ---")

llm = Ollama(model=MODEL_NAME)
memory = ConversationBufferMemory()
conversation_chain = ConversationChain(llm=llm, memory=memory)
checkpoints = {1, 10, 50, 100}
metrics = []
initial_mem_usage = get_memory_usage()

print("Starting simulated conversation...")

current_prompt = INITIAL_PROMPT

for i in range(1, NUM_TURNS + 1):
    start_time = time.perf_counter()

    # 3. Run one turn of the conversation.
    response = conversation_chain.invoke({"input": current_prompt})

    latency = time.perf_counter() - start_time
    current_prompt = f"That's interesting. Can you tell me more about the last thing you said? My name is still John."

    if i in checkpoints:
        current_mem_usage = get_memory_usage()
        mem_increase = current_mem_usage - initial_mem_usage

        metrics.append({
            "turn": i,
            "latency_s": latency,
            "rss_increase_bytes": mem_increase,
        })
        print(f"Checkpoint Turn {i}: Latency={latency:.4f}s")

print_results(metrics)

def print_results(metrics):
“””Prints the final benchmark results in a formatted table.”””

print("n--- LangChain Python Memory & Performance Results ---")

print(f"{'Turn':<5} | {'Latency (s)':<15} | {'Memory Increase (RSS)':<25}")

for m in metrics:
    rss_increase_mb = m['rss_increase_bytes'] / (1024 * 1024)
    print(f"{m['turn']:<5} | {m['latency_s']:<15.4f} | {f'{rss_increase_mb:.2f} MB':<25}")

if name == “main“:
main()
“`

Python 版本的逻辑与之等同:创建带有 ConversationBufferMemoryConversationChain,在循环中调用 invoke 方法,并在预设的检查点读取 RSS 内存增量。

运行结果:

— LangChainGo Memory & Performance Results —
Turn | Latency | Heap Alloc | Total Alloc (Churn) | Num GC | Total GC Pause


1 | 1.95s | 160.25 KB | 5.10 MB | 2 | 0.31 ms
10 | 2.85s | 510.50 KB | 58.60 MB | 18 | 2.55 ms
50 | 6.10s | 2850.20 KB | 310.40 MB | 85 | 12.82 ms
100 | 11.25s | 5910.80 KB | 680.95 MB | 165 | 25.15 ms

— LangChain Python Memory & Performance Results —
Turn | Latency (s) | Memory Increase (RSS)


1 | 2.5012 | 22.50 MB
10 | 4.1534 | 45.12 MB
50 | 10.2912 | 145.80 MB
100 | 19.8567 | 290.25 MB

内存效率差异巨大:

100 轮后,Go 应用堆上仅 5.9 MB 活跃内存;而 Python 进程 RSS 增加了 290.25 MB。

尽管指标不完全一一对应,但数据足以说明:Go 作为编译型语言,拥有值类型与高效垃圾回收机制,内存使用更为精简;Python 的动态特性与对象开销使得处理同等对话历史需要显著更多的内存。

对于需要同时处理成千上万长会话的服务,这种内存效率的差异直接关系到服务器成本与系统稳定性。

可观测性开销:Tracing 的成本

在生产环境中,可观测性(如日志、指标与链路追踪)对于理解系统行为、调试问题与监控性能至关重要。

然而,这些“观察”并非免费,每条日志与每个追踪事件都会带来少量性能开销。

LangChain系统性能深度对比:Python与Go在AI应用中的实战评测

本测试旨在度量这部分开销。我们将并发 Agent 基准测试运行两次:一次不启用任何仪表化(作为基线),另一次则开启简单的日志回调,用于打印 Agent 生命周期中的每个事件。

Go 实现:定义实现 callbacks.Handler 接口的 SimpleLogHandler

“`go
package main

import (
“context”
“fmt”
“log”
“sort”
“strings”
“sync”
“time”
“github.com/tmc/langchaingo/agents”
“github.com/tmc/langchaingo/callbacks”
“github.com/tmc/langchaingo/chains”
“github.com/tmc/langchaingo/llms/ollama”
“github.com/tmc/langchaingo/schema”
“github.com/tmc/langchaingo/tools”
)

// … Tool definition …

// — Custom Callback Handler —
type SimpleLogHandler struct {
callbacks.SimpleHandler
}

func (h SimpleLogHandler) HandleChainStart(ctx context.Context, inputs map[string]any) {
log.Printf(“[HANDLER] Chain Start. Inputs: %v”, inputs)
}

func (h SimpleLogHandler) HandleChainEnd(ctx context.Context, outputs map[string]any) { // }
func (h SimpleLogHandler) HandleLLMStart(ctx context.Context, prompts []string) { // }
func (h SimpleLogHandler) HandleLLMEnd(ctx context.Context, output schema.LLMResult) { // }
func (h SimpleLogHandler) HandleToolStart(ctx context.Context, input string) { // }
func (h SimpleLogHandler) HandleToolEnd(ctx context.Context, output string) { // }
func (h SimpleLogHandler) HandleAgentAction(ctx context.Context, action schema.AgentAction) { // }

// …

func runBenchmark(withCallbacks bool) BenchmarkMetrics {
// … setup llm and tools …

for i := 0; i < concurrencyLevel; i++ {
    wg.Add(1)
    go func(workerID int) {
        defer wg.Done()
        var opts []chains.ChainCallOption
        if withCallbacks {
            // Each goroutine gets its own handler instance.
            opts = append(opts, chains.WithCallbacks(SimpleLogHandler{}))
        }

        agentExecutor, _ := agents.Initialize(llm, availableTools, agents.ZeroShotReactDescription)

        start := time.Now()
        _, err := chains.Run(context.Background(), agentExecutor, multiHopPrompt, opts...)
        // ...
    }(i)
}
// ... wait and calculate metrics ...

}
“`

关键在 runBenchmark:当 withCallbacks 为 true 时,创建 SimpleLogHandler 实例并通过 chains.WithCallbacks 传入 chains.Run,框架会在 Agent 的每一步调用相应的 handler。

Python 版本定义 BaseCallbackHandler 子类:

“`python

— Custom Callback Handler —

class SimpleLogHandler(BaseCallbackHandler):
def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> Any:
print(f”[HANDLER] Chain Start. Inputs: {list(inputs.keys())}”)

def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
    print(f"[HANDLER] Chain End. Outputs: {list(outputs.keys())}")

def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
    print(f"[HANDLER] LLM Start. Prompt: {prompts[0][:50]}...")

def on_llm_end(self, response, **kwargs: Any) -> Any:
    print(f"[HANDLER] LLM End. Generations: {len(response.generations)}")

def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
    print(f"[HANDLER] Tool Start. Input: {input_str}")

def on_tool_end(self, output: str, **kwargs: Any) -> Any:
    print(f"[HANDLER] Tool End. Output: {output}")

— Agent runner —

def run_agent_task(llm, tools, prompt_template, with_callbacks: bool, worker_id: int):
“””Runs a single agent task and returns its latency.”””
callbacks = [SimpleLogHandler()] if with_callbacks else []

agent = create_react_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    handle_parsing_errors=True,
    callbacks=callbacks,
)

start_time = time.perf_counter()
agent_executor.invoke({"input": PROMPT})
return time.perf_counter() - start_time

“`

将 handler 实例传给 AgentExecutorcallbacks 列表,LangChain 的回调管理器会在 Agent 运行时调用相应的 on_... 方法。

运行并分析结果:

“`
— Observability Overhead Analysis —


Metric | Baseline | With Callbacks | Overhead/Degradation

Throughput (ops/sec) | 10.51 | 10.15 | 3.43%
Avg Latency | 3.89s | 4.05s | +4.11%
P99 Latency | 4.75s | 4.95s |


“`

“`
— Observability Overhead Analysis —


Metric | Baseline | With Callbacks | Overhead/Degradation

Throughput (ops/sec) | 6.95 | 4.51 | 35.11%
Avg Latency (s) | 5.85 | 8.95 | +53.00%
P99 Latency (s) | 6.91 | 11.21 |
Total CPU Time (s) | 9.85 | 18.92 | +92.08%


“`

这也许是最醒目的结果:

在 Go 中,开启简单日志只带来约 3–4% 的轻微性能下降;而在 Python 中,同等可观测性导致吞吐骤降 35%、平均延迟增加 53%、CPU 使用几乎翻倍。

原因在于 Python 中每个回调事件都需要动态函数调用、对象创建,并在解释型框架的多层中传递。在高并发负载下,这种“千刀万剐”的代价极其可观;而 Go 的编译期函数调用几乎无开销。

对任何依赖细粒度、实时 tracing 来调试与监控的严肃生产系统而言,这是关键发现:在 Python 中可观测性的成本可能高昂,而在 Go 中则几乎“免费”。


关注“鲸栖”小程序,掌握最新AI资讯

本文来自网络搜集,不代表鲸林向海立场,如有侵权,联系删除。转载请注明出处:http://www.itsolotime.com/archives/16531

(0)
上一篇 2025年12月30日 下午11:50
下一篇 2025年12月31日 上午11:07

相关推荐

  • Claude Code创始人Boris Cherny亲授:13条高效AI编程实战秘籍,引爆500万在线围观

    2026年新年第三天,Claude Code的创始人兼负责人Boris Cherny进行了一场线上教学,亲自演示了他使用这款AI编程工具的个人工作流。 他表示,自己的配置可能“简单”得令人意外。Claude Code开箱即用的体验已经非常出色,因此他个人并未进行太多自定义设置。 Boris强调,使用Claude Code没有所谓的“标准答案”。该工具在设计之…

    2026年1月4日
    12000
  • 清华UniCardio:多模态扩散模型革新心血管监测,实现实时全面信号生成

    可穿戴健康监测信号由于监测难度高、观测噪声大、易受干扰,高质量的心血管信号仍难以长期便捷获取,这是智能健康监测系统始终面临的现实困境。近日,清华朱军等团队提出了一种统一的多模态生成框架 UniCardio,在单扩散模型中同时实现了心血管信号的去噪、插补与跨模态生成,为真实场景下的人工智能辅助医疗提供了一种新的解决思路。相关工作《Versatile Cardi…

    2025年12月30日
    14600
  • DeepSeek开源条件记忆模块:让Transformer告别“苦力活”,27B模型性能碾压MoE

    DeepSeek为Transformer引入“条件记忆”模块 DeepSeek在最新研究中为Transformer架构引入了“条件记忆”机制,旨在弥补其原生缺乏高效知识检索能力的短板。 研究团队在论文结论中明确指出:条件记忆应被视为下一代稀疏模型不可或缺的核心建模原语。 该研究由梁文锋署名,并与北京大学王选计算机研究所的赵东岩、张辉帅团队合作完成。 论文不仅…

    2026年1月13日
    9600
  • OpenAI研究员揭秘:Codex内部评估机制与AI产品落地的50+实战经验

    昨日,两位来自 OpenAI 及前微软的 AI 产品一线从业者——Aishwarya Naresh Reganti 与 Kiriti Badam,在 Lenny 的播客节目中深入分享了他们在超过 50 个 AI 产品落地项目中的实践经验与教训。 这些经验源于反复的试错与总结。播客主持人 Lenny 提炼出一个核心观点:痛苦是新的护城河。 两位嘉宾均具备深厚的…

    2026年1月12日
    7000
  • 解锁Agentic AI并行化:14个核心模式提升系统可靠性与性能

    构建高效的智能体(Agentic)系统,离不开扎实的软件工程实践。其核心在于设计能够协调运作、并行执行,并能与外部系统高效交互的组件。例如,推测执行(Speculative Execution) 通过预先处理可预测的请求来降低延迟;冗余执行(Redundant Execution) 则通过同时运行同一智能体的多个副本来避免单点故障,提升系统韧性。除此之外,还…

    2025年11月27日
    8700