打破推理与训练割裂!Uni-Agent:统一框架让智能体规模化构建、运行与强化学习训练一气呵成

当前 AI Agent 领域面临着一个颇为尴尬的割裂局面:推理阶段依赖一套框架,训练阶段又得换用另一套框架。这两者之间的数据管道、环境抽象以及工具接口,几乎完全不兼容。

这意味着,研究人员在验证了一个 Agent 的推理能力后,若想借助强化学习进一步优化它,就不得不从头重写所有的交互逻辑。这个过程不仅耗时巨大,还极易引入各种意想不到的不一致性 Bug。

为了解决这一根本性矛盾,Uni-Agent 应运而生。它提出了一套统一交互栈(Unified Interaction Stack),确保 Agent 在执行推理和进行强化学习训练时,走的是完全相同的代码路径。

其核心设计哲学是三层解耦:model 负责思考,tool 负责感知与行动,env 负责状态保持。这三者既可以独立替换,又能实现无缝组合。凭借异步信号量的并发控制机制,Uni-Agent 已经在超过 1000 个并发任务的规模下稳定运行,并在 SWE-Bench-Verified 上取得了 67.7 的优异成绩

下图展示了 Uni-Agent 智能体系统的整体架构,它清晰地分为交互运行与模型训练两大模块。上层的 Agent Interaction System 构成了核心的执行闭环:Agent Chat Model(集成了多种主流大模型)调用 Tools Pool 中的开发工具、浏览器等能力,去识别或修改 Environment(如监控面板、云服务、容器环境)。环境的状态变化会反馈回模型,支撑其进行后续的决策迭代。下层的 Training System(以 verl 框架为例)则负责模型优化:任务完成后,Reward System 会生成奖励信号,通过 Message Queue 传递给 Training Engine,引擎基于这些数据更新 Agent 模型的权重,从而让智能体在持续交互与训练中不断提升决策能力,形成从执行到反馈再到迭代优化的完整链路。

Uni-Agent 的轻量级监控面板,顶部清晰呈现了任务的整体状态:共 500 个任务,其中运行中 64 个、排队 339 个、已完成 97 个,直观地反映了任务队列与执行进度。左侧面板列出了所有活跃任务的 ID 及当前执行步骤,中间与右侧的四格日志面板则分别展示不同智能体的完整交互链路。这些链路包含了“思考(Thought)”、“行动(Action,如执行 Python 脚本、运行 pytest 测试指令)”、“观测(Observation,如代码报错信息、弃用警告、Token 消耗数据)”等内容,完整还原了智能体的决策、指令执行与环境反馈过程,帮助开发者实时追踪智能体在开发调试任务中的运行状态,快速定位问题节点。更多信息可参考:Uni-Agent 轻量级监控面板[1]

unsetunset本文目录unsetunset

  • 快速上手
  • 一、项目结构全景
  • 二、架构总览与核心数据流
  • 2.1 三层解耦的设计哲学
  • 2.2 一次完整交互的数据流
  • 三、统一入口:UniAgentLoop 的编排逻辑
  • 3.1 并发控制与资源隔离
  • 3.2 核心执行流程
  • 3.3 异常轨迹的优雅降级
  • 四、交互引擎:AgentInteraction 的多轮循环
  • 4.1 StepOutput 数据模型
  • 4.2 单步交互的五阶段流水线
  • 4.3 超时预算机制
  • 五、Rollout Cache:训练数据的实时累积
  • 5.1 核心数据结构
  • 5.2 模型生成时的累积
  • 5.3 环境反馈时的累积
  • 5.4 Message Boundary Tokens 的精确计算
  • 六、环境抽象:AgentEnv 与多后端部署
  • 6.1 统一环境接口
  • 6.2 部署后端的类型判别联合体
  • 6.3 RemoteRuntime:可靠的 HTTP 通信层
  • 6.4 健壮的超时与中断处理
  • 七、工具系统:从注册到执行
  • 7.1 注册表模式
  • 7.2 工具安装流程
  • 7.3 Tool Call 翻译为 Bash
  • 7.4 多格式 Tool Parser
  • 八、Reward 系统:环境内验证
  • 8.1 注册表与抽象基类
  • 8.2 SWE-Bench Reward:真实测试用例驱动
  • 九、auto_await:同步异步无缝桥接
  • 十、双模型抽象:训练与纯推理的统一
  • 10.1 AgentChatModel vs OpenAICompatibleChatModel
  • 10.2 OpenAI 兼容模型的 Tool Call 处理
  • 十一、并发模型与性能工程
  • 11.1 全链路异步架构
  • 11.2 关键性能数据
  • 十二、结果保存与可观测性
  • 12.1 交互结果的持久化
  • 12.2 Dashboard 的状态推断
  • 总结与展望

unsetunset快速上手unsetunset

# 克隆仓库并初始化子模块
git clone https://github.com/verl-project/uni-agent.git
cd uni-agent
git submodule update --init --recursive

# 安装 verl(作为 Python 包)
pip install --no-deps -e ./verl

# 安装核心依赖
pip install swe-rex loguru pydantic pydantic_settings aiohttp

安装完成后,可以直接启动一个 Agent 环境并运行 demo 脚本。关于更多场景(如搜索 Agent 构建、并行推理、RL 训练)的详细配置,请参考官方文档[2]。

启动实时监控面板,执行以下命令:

python -m dashboard.server --log-dir /tmp/swebench_qwen3_coder --port 8765

关于 Dashboard 的远程端口转发与绑定配置细节,请查阅 dashboard/README.md[3] 文档。

一、全景项目结构解析

在深入代码细节前,我们先通过目录树来直观理解项目的整体组织逻辑:

uni-agent/  
├── uni_agent/                  # 核心 Python 包  
│   ├── agent_loop.py           # 统一入口:UniAgentLoop(连接 verl 训练)  
│   ├── async_logging.py        # 异步日志(按 run_id 隔离)  
│   ├── utils.py                # auto_await 等实用工具  
│   ├── interaction/            # 交互引擎层  
│   │   ├── interaction.py      #   多轮交互循环 AgentInteraction  
│   │   ├── model.py            #   模型抽象(AgentChatModel / OpenAI 兼容)  
│   │   ├── env.py              #   环境抽象 AgentEnv(动作执行)  
│   │   ├── tools_manager.py    #   工具管理与调用翻译  
│   │   └── tool_parser.py      #   工具调用格式解析器(XML/Hermes)  
│   ├── tools/                  # 具体工具实现  
│   │   ├── base.py             #   AbstractTool 基类  
│   │   ├── registry.py         #   工具注册表  
│   │   ├── execute_bash/       #   Bash 执行工具  
│   │   ├── str_replace_editor/ #   文件编辑工具  
│   │   ├── search/             #   代码搜索工具  
│   │   ├── search_arxiv/       #   arXiv 论文搜索  
│   │   ├── finish/             #   完成标记  
│   │   └── submit/             #   提交标记  
│   ├── deployment/             # 环境部署后端  
│   │   ├── config.py           #   统一部署配置(类型判别联合体)  
│   │   ├── remote_runtime.py   #   远程运行时 HTTP 客户端  
│   │   ├── host/               #   宿主机直执模式  
│   │   ├── local/              #   本地容器沙箱(Docker/Apptainer)  
│   │   ├── modal/              #   Modal 云平台  
│   │   └── vefaas/             #   火山引擎 FaaS  
│   └── reward/                 # 奖励计算模块  
│       ├── base.py             #   AbstractRewardSpec 基类  
│       ├── registry.py         #   奖励函数注册表  
│       ├── swe_bench.py        #   SWE-Bench 评测奖励  
│       ├── swe_bench_live.py   #   SWE-Bench Live 变体  
│       ├── swe_rebench.py      #   SWE-ReBench 变体  
│       ├── r2e_gym.py          #   R2E-Gym 训练奖励  
│       └── search.py           #   搜索 Agent 奖励  
├── examples/                   # 示例脚本  
│   ├── agent_env/              #   启动环境 demo  
│   ├── agent_interaction/      #   并行交互 demo  
│   ├── agent_train/            #   RL 训练脚本  
│   ├── search_agent/           #   搜索 Agent 训练  
│   ├── search_arxiv/           #   arXiv Agent demo  
│   └── data_preprocess/        #   数据预处理  
├── dashboard/                  # 实时监控面板(JS/CSS/Python server)  
├── verl/                       # Git 子模块 → volcengine/verl  
├── docs/                       # Sphinx 文档源码  
└── pyproject.toml              # 包元信息与依赖  

二、架构总览与核心数据流

2.1 三层解耦的设计理念

Uni-Agent 的整体设计可通过以下架构图来直观呈现:

图中清晰展示了三大核心模块的职责划分:

  • Model(推理引擎):该模块封装了与大语言模型(LLM)的推理交互。它负责维护 rollout cache(包含 token 序列、掩码 mask 以及 logprobs)。在训练模式下,它能无缝对接 verl 框架的 AsyncLLM server;而在纯推理场景中,则兼容任何遵循 OpenAI 接口标准的 API。
  • Tool(感知与执行层):此模块的核心功能是将模型生成的、结构化的工具调用指令(tool call)翻译为具体的 bash 命令。这些工具以可执行的脚本形式被安装到运行容器之中。
  • Env(运行时沙箱):该模块通过 SWE-ReX 协议与容器或远程沙箱进行通信。其关键特性在于能够支持四种不同的部署后端,并实现它们之间的无缝切换。

2.2 一次完整的交互数据流解析

在 Uni-Agent 的框架下,一次完整的智能体交互遵循一个清晰且可循环的数据流。整个流程从 UniAgentLoop 入口开始,经过 AgentInteraction 模块内部的多步迭代,最终产出可用于强化学习训练的结构化输出。

具体流程如下:
1. 查询模型 (Query Model):系统将当前的对话历史与任务指令作为输入,发送给 AgentChat Model 模块。
2. 解析工具调用 (Parse Tool Call):模型返回的响应中,如果包含工具调用指令,AgentInteraction 模块会将其解析为结构化的 Tool Call 请求。
3. 在环境中执行 (Execute in Env):解析后的工具调用被传递给 AgentEnv,在实际的容器、远程主机或本地环境中执行相应的操作(如运行 bash 命令)。
4. 观察与追加 (Observe & Append):环境执行完毕后,会将执行结果(如标准输出、错误信息)作为观察结果(Observation)返回。这个观察结果会被追加到对话历史中,作为下一次 Query Model 的输入。

上述步骤会形成一个循环,持续迭代,直到任务完成(done)或达到预设的最大轮次。当循环结束时,系统会进入以下阶段:

  • 奖励评定 (RewardSpec)RewardSpec 模块会根据环境内预设的评测标准,对智能体的整个交互过程进行打分和评估。
  • 输出格式化 (AgentLoopOutput):最后,AgentLoopOutput 模块会将整个交互过程中的 promptresponsemask 以及 logprobs 等关键数据打包成一个标准化的输出。这个输出可以直接喂给 verl 强化学习训练框架,用于模型的进一步优化。

让我们追踪一次 Agent 从接收任务到生成训练数据的完整运行链路:

用户 Prompt (messages)


┌─ prepare_rollout_cache ─┐ 将 messages 编码为 token 序列
│ prompt_ids = tokenize │ 初始化空的 response_mask/logprobs
└─────────┬───────────────┘

┌─────▼─────┐
│ STEP 1 │◀───────────────────────────────────────┐
├───────────┤ │
│ model.query() │
│ → response_ids (mask=1, logprobs记录) │
│ │
│ tools_manager.parse_action() │
│ → 提取 function name + arguments │
│ │
│ tools_manager.get_tool_bash_command() │
│ → 转换为 bash 命令字符串 │
│ │
│ env.run_action() │
│ → observation 字符串 │
│ │
│ model.append_messages_to_rollout_cache() │
│ → observation 编码追加 (mask=0) │
│ │
│ done? ──No──────────────────────────────────────────┘
│ │
│ Yes
└───┬───┘


┌─ reward_spec.compute_reward() ─┐ 在同一容器中运行测试
│ 例:SWE-Bench 测试脚本执行 │ 并解析测试报告
└────────────┬───────────────────┘


┌─ convert_to_agent_output() ──┐ 切分 prompt/response
│ 拼装 AgentLoopOutput │ 应用 mask_abnormal_exit
│ → 输出给 verl 做 PPO/GSPO │ 截断超长序列
└──────────────────────────────┘

这套流程中最精妙之处在于 rollout cache 的实时累积 ——它使得同一次交互既能完成“推理执行”,又能同步完成“训练数据采集”,完全无需额外的后处理步骤。

三、统一入口:UniAgentLoop 的编排逻辑

3.1 并发控制与资源隔离

UniAgentLoop 充当着 Uni-Agent 与 verl 训练框架之间的桥梁角色。它继承自 verl 的 AgentLoopBase,在强化学习训练的 rollout 阶段被调用:

# 来源:uni_agent/agent_loop.py  
class UniAgentLoop(AgentLoopBase):  
_semaphore: asyncio.Semaphore | None = None  # 类级别共享  

async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutput:  
config_dict = self._init_config(sampling_params, **kwargs)  
# 全局并发按 worker 数均摊  
global_concurrent = config_dict.get("concurrency", 512)  
num_workers = self.config.actor_rollout_ref.rollout.agent.num_workers  
worker_concurrent = max(global_concurrent // num_workers, 1)  
if UniAgentLoop._semaphore is None:  
UniAgentLoop._semaphore = asyncio.Semaphore(worker_concurrent)  

这里的 _semaphore 被定义为类变量,所有 UniAgentLoop 实例共享同一个信号量。假设全局并发上限设置为 512、系统中有 4 个 worker,那么每个 worker 上最多可同时运行 128 个环境。这种机制有效防止了资源过载引发的 OOM 错误或容器调度失败。

3.2 核心执行流程

# 来源:uni_agent/agent_loop.py  
async with self._semaphore:  
try:  
await self.env.start()                           # 启动沙箱  
self.chat_model.set_tools_schemas(...)           # 注入工具 schema  
await self.env.install_tools(self.tools_manager.tools)  # 安装工具到容器  

interaction_result = await self.interaction.run() # 核心交互循环  

# 环境内计算 reward(环境尚未关闭)  
if self.reward_spec is not None:  
reward_score, _ = await self.reward_spec.compute_reward(  
interaction_result=interaction_result,  
)  
interaction_result["reward_score"] = reward_score  

self._save_interaction_result(interaction_result)  
output = await self.convert_to_agent_output(interaction_result)  
except Exception as e:  
output = await self._build_empty_agent_output(exit_reason="agent_loop_failed")  
finally:  
await self.env.close()  # 无论成功失败都关闭环境  
return output  

此处 finally 块确保环境必定被清理——在千级并发场景中,未释放的容器会迅速耗尽集群资源。

3.3 异常轨迹的优雅降级

当 Agent 交互异常退出时,系统不会丢弃这条数据,而是生成一条“空轨迹”并打上遮罩标记:

# 来源:uni_agent/agent_loop.py  
async def _build_empty_agent_output(self, exit_reason: str) -> AgentLoopOutput:  
# 用 pad_token 或 eos_token 填充  
dummy_token_id = getattr(self.tokenizer, "pad_token_id", None)  
if dummy_token_id is None:  
dummy_token_id = getattr(self.tokenizer, "eos_token_id", None)  

extra_fields["traj_masked"] = 1          # 训练时跳过此轨迹  
extra_fields["traj_exit_reason"] = exit_reason  

return AgentLoopOutput(  
response_ids=[dummy_token_id] * dummy_response_length,  
response_mask=[0] * dummy_response_length,  # 全零 → 不参与梯度  
reward_score=0,  
...  
)  

这样一来,训练 batch 的形状一致性得以保证——无需动态 padding 或 batch 大小波动。

四、交互引擎:AgentInteraction 的多轮循环

4.1 StepOutput 数据模型

每一步交互都会产出一个结构化的 StepOutput

# 来源:uni_agent/interaction/interaction.py  
class StepOutput(BaseModel):  
step_idx: int  
response: str = ""        # 模型原始输出  
thought: str = ""         # 思考内容(tool_call 之前的文本)  
action: str = ""          # 转换后的 bash 命令  
observation: str = ""     # 环境返回  
execution_time: float | None = None  
done: bool = False  
exit_reason: str = ""     # finished/token_limit/timeout_error/...

### 4.2 单步交互的五阶段流水线

```python
# 来源:uni_agent/interaction/interaction.py  
async def step(self, step_idx: int):  
step_output = StepOutput(step_idx=step_idx)  

# ── 阶段1:模型推理 ──  
model_output, rollout_cache, generation_info = await self.model.query(  
messages=self.messages, rollout_cache=self.rollout_cache,  
)  

# ── 阶段2:解析 Tool Call ──  
self.messages.append({"role": "assistant", "content": model_output})  
structured_tool_calls = self.rollout_cache.get("extra_fields", {}).get("last_tool_calls", [])  
if structured_tool_calls:  
# OpenAI 原生 tool_calls 格式(推理场景)  
content, tool_calls = await self.tools_manager.parse_structured_action(  
content=model_output, tool_calls_data=structured_tool_calls,  
)  
else:  
# 从文本中解析 XML/Hermes 格式(训练场景)  
content, tool_calls = await self.tools_manager.parse_action(model_output=model_output)  

# ── 阶段3:转换为 Bash 命令 ──  
tool_call = tool_calls[0]  
action_cmd = self.tools_manager.get_tool_bash_command(tool_call)  

# ── 阶段4:环境执行 ──  
observation = await self.env.run_action(action_cmd, action_timeout=self.action_timeout)  
tool_message = {"role": "tool", "content": observation}  
self.messages.append(tool_message)  
self.rollout_cache = await self.model.append_messages_to_rollout_cache(  
[tool_message], self.rollout_cache  
)  

# ── 阶段5:判断终止条件 ──  
if tool_call.function.name in ["finish", "submit"]:  
step_output.done = True  
step_output.exit_reason = "finished"  

整个流程可以直观地表示为:

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  Model   │───▶│  Parse   │───▶│ Translate│───▶│ Execute  │───▶│  Judge   │
│  Query   │    │ ToolCall │    │ to Bash  │    │  in Env  │    │   Done?  │
└──────────┘    └──────────┘    └──────────┘    └──────────┘    └──────────┘
▲                                                               │
└───────────────────── loop if not done ────────────────────────┘

4.3 超时预算机制

一个精妙的容错设计体现在 timeout_budget 上:

# 来源:uni_agent/interaction/interaction.py  
except ActionTimeoutError as e:  
step_output.exit_reason = "timeout_error"  
if self.timeout_budget > 0:  
self.timeout_budget -= 1  
return step_output  # 继续下一步(给模型纠错机会)  
else:  
step_output.done = True  # 超时次数耗尽,强制终止  
return step_output  

系统默认允许发生 3 次超时——Agent 在前几次超时后会收到错误反馈,从而有机会选择更快速的命令。只有当超时连续发生多次,系统才会判定为“不可恢复”并终止交互。这一机制显著提升了长周期任务的成功率。

unsetunset五、Rollout Cache:训练数据的实时累积unsetunset

5.1 核心数据结构

Rollout Cache 是 Uni-Agent 区别于其他所有 Agent 框架的核心创新。它在交互过程中实时增长,数据结构如下:

rollout_cache = {  
"request_id":       "uuid-...",  
"prompt_ids":       [101, 2035, 8841, ...],   # 持续拼接的完整 token 序列  
"response_mask":    [0, 0, 0, ..., 1, 1, 1, ..., 0, 0, ...],  # 哪些是模型生成  
"response_logprobs": [0.0, 0.0, ..., -0.23, -1.1, ..., 0.0, ...],  
"routed_experts":   [...] | None,   # MoE 路由信息(可选)  
"metrics":          {"generate_sequences": 1.23, ...},  
"extra_fields":     {"traj_masked": 0, ...},  
}

用一个时间轴来理解 mask 的作用:

Token 序列:  [system_prompt] [user_msg] [asst_response_1] [tool_obs_1] [asst_response_2] ...  
Mask:         0 0 0 0 0 0 0   0 0 0 0    1 1 1 1 1 1 1 1   0 0 0 0 0    1 1 1 1 1 1 1 ...  
╰─── prompt ──╯            ╰── 模型决策 ──╯  ╰─ 环境返回 ╯ ╰── 模型决策 ──╯  

梯度计算时,仅针对 mask 值为 1 的位置(即模型真正“做出决策”的 token)生效。

### 5.2 模型生成阶段的累积

```python
# 来源:uni_agent/interaction/model.py
async def query(self, messages, rollout_cache, **kwargs):
prompt_ids = rollout_cache["prompt_ids"]

# 检查上下文长度是否超出限制
if len(prompt_ids) >= self.max_model_len:
raise MaxTokenExceededError(...)

# 调用 LLM 执行生成
token_output = await self.client.generate(
request_id=request_id, prompt_ids=prompt_ids, sampling_params=sampling_params,
)
response_ids = token_output.token_ids

# 累积过程:模型生成的所有 token,其 mask 均设为 1
rollout_cache["prompt_ids"] += response_ids
rollout_cache["response_mask"] += [1] * len(response_ids)
if token_output.log_probs is not None:
rollout_cache["response_logprobs"] += token_output.log_probs

5.3 环境反馈阶段的累积

# 来源:uni_agent/interaction/model.py
async def append_messages_to_rollout_cache(self, new_messages, rollout_cache):
tool_response_ids = await self._get_new_message_ids(new_messages)
# 环境返回的 token,其 mask 设为 0,不参与策略梯度计算
rollout_cache["prompt_ids"] += tool_response_ids
rollout_cache["response_mask"] += [0] * len(tool_response_ids)
if rollout_cache["response_logprobs"]:
rollout_cache["response_logprobs"] += [0.0] * len(tool_response_ids)
return rollout_cache

5.4 Message Boundary Tokens 的精确计算

有一个容易被忽略但极为关键的细节:当工具反馈被追加到 token 序列中时,模板中的分隔符(例如 <|im_start|>tool)同样需要被正确编码。message_boundary_tokens 通过差分法来实现对这些边界 token 的精确计算:

# 来源:uni_agent/interaction/model.py
@cached_property
def message_boundary_tokens(self) -> list[int]:
# 使用 dummy 消息计算“单独编码”与“带上下文编码”之间的差异
standalone_ids = apply_chat_template([dummy_next_message], ...)
with_boundary_ids = apply_chat_template(dummy_history + [dummy_next_message], ...)

# 两者的差集即为边界 token(位于 eos 之后、消息内容之前)
text_before_message_ids = with_boundary_ids[:-len(standalone_ids)]
for i in range(len(text_before_message_ids) - 1, -1, -1):
if text_before_message_ids[i] == eos_id:
return text_before_message_ids[i + 1:]
return []

这一设计确保了 rollout cache 中的 token 序列与“一次性编码整段对话”的结果完全等价,从而避免了训练与推理之间的不一致问题。

六、环境抽象:AgentEnv 与多后端部署

6.1 统一环境接口

AgentEnv 为上层应用屏蔽了底层运行时的差异,无论是本地 Docker、Modal 云环境还是火山引擎 FaaS,均可以通过同一套方法进行调用:

# 来源:uni_agent/interaction/env.py
class AgentEnv:
async def start(self, max_retries=5):    # 启动环境
async def run_action(self, cmd, timeout): # 执行命令并返回观测结果
async def install_tools(self, tools):     # 安装工具脚本
async def read_file(self, path):          # 读取容器内的文件
async def write_file(self, path, content):# 向容器内写入文件
async def close(self):                    # 关闭环境

6.2 部署后端的类型判别联合体

四种部署后端借助 Pydantic 的 discriminated union 机制,实现了零配置的灵活切换:

# 来源:uni_agent/deployment/config.py  
DeployConfig: TypeAlias = Annotated[  
VefaasDeploymentConfig | LocalDeploymentConfig | HostDeploymentConfig | ModalDeploymentConfig,  
Field(discriminator="type"),  
]  

开发者只需在 YAML 配置文件中指定 type: localtype: modal,框架便会自动实例化相应的部署后端。下表对比了各后端的关键参数:

参数 host local modal vefaas
容器镜像 N/A python:3.12 python:3.11 自定义
运行时协议 本地进程 SWE-ReX HTTP SWE-ReX HTTP SWE-ReX HTTP
默认超时 60s 60s 60s 60s
启动超时 120s 180s 180s 120s
容器运行时 N/A apptainer Modal veFaaS

6.3 RemoteRuntime:可靠的 HTTP 通信层

所有远程后端最终都通过 RemoteRuntime 与沙箱进行通信。其 _request 方法内置了面向生产环境的容错机制:

# 来源:uni_agent/deployment/remote_runtime.py  
async def _request(self, endpoint, payload, output_class, num_retries=0, client_error_retries=2):  
request_id = str(uuid.uuid4())  
headers["X-Request-ID"] = request_id  # 幂等性 key  

retry_delay = 2  
backoff_max = 30  
while num_retries >= 0 and client_error_retries >= 0:  
try:  
async with aiohttp.ClientSession(...) as session:  
async with session.post(request_url, json=payload.model_dump(), ...) as resp:  
await self._handle_response_errors(resp)  
return output_class(**await resp.json())  
except aiohttp.ClientError as e:  
client_error_retries -= 1  
await asyncio.sleep(retry_delay)  
retry_delay = min(retry_delay * 2, backoff_max)  # 指数退避  

该模块的关键设计要点包括:

  • 幂等性 Key:通过 X-Request-ID 请求头,服务端能够识别重复请求并直接返回缓存结果,避免重复处理。
  • 指数退避:初始等待时间为 2 秒,此后每次翻倍,上限为 30 秒,有效防止请求雪崩。
  • 异常透传:远端发生的 Python 异常通过 _ExceptionTransfer 机制序列化后传回本地,并在本地重新抛出,确保异常信息完整传递。

6.4 鲁棒的超时与中断处理机制

# 来源:uni_agent/interaction/env.py  
async def run_action(self, action_cmd, action_timeout, max_observation_length=100_000):  
try:  
observation = await self.communicate(input=action_cmd, timeout=action_timeout)  
# 清理 ANSI 转义码  
observation = re.sub(r"x1b[[0-9;]*m|r", "", observation)  
# 过长输出截断并提示  
if len(observation) > max_observation_length:  
observation = f"{observation[:max_observation_length]}<response clipped>..."  
except CommandTimeoutError:  
# 先尝试中断会话  
try:  
await self.interrupt_session()  
except Exception:  
# 中断失败 → 5 次健康探测  
for _ in range(5):  
probe = await self.communicate("echo 'terminal still alive'", check="ignore")  
if "terminal still alive" in probe:  
terminal_alive = True  
break  
if not terminal_alive:  
raise TerminalNotAliveError("Terminal did not respond")  
raise ActionTimeoutError(f"Command timed out after {action_timeout}s")  

这套“中断 → 探测 → 判活”的三级容错机制确保了即便单条指令失控,整个 Agent 会话也不会因此崩溃。


七、工具系统:从注册到执行

7.1 注册表模式

工具通过装饰器的方式注册至全局注册表:

# 来源:uni_agent/tools/registry.py  
TOOL_REGISTRY: dict[str, type[AbstractTool]] = {}  

def register_tool(name: str) -> type[AbstractTool]:  
def decorator(cls):  
if name in TOOL_REGISTRY and TOOL_REGISTRY[name] != cls:  
raise ValueError(f"Tool {name} already registered")  
TOOL_REGISTRY[name] = cls  
return cls  
return decorator  

7.2 工具安装流程

工具并非简单的 Python 函数调用,而是以可执行脚本的形式安装到容器的 PATH 路径中

# 来源:uni_agent/interaction/env.py  
async def install_tools(self, tools: list[AbstractTool]) -> None:  
install_dir = self.tool_install_dir  # 默认 /usr/local/bin  
await self.communicate(f"export PATH={install_dir}:$PATH", check="raise")  
for tool in tools:  
# 1. 上传脚本到容器  
await self.copy_to_container(src=tool.local_path, tgt=install_dir / tool.name)  
# 2. 赋予执行权限  
await self.communicate(f"chmod +x {container_tool_path}", check="raise")  
# 3. 运行可选的安装命令(如 pip install 依赖)  
if tool.get_install_command():  
await self.communicate(tool.get_install_command(), check="raise")  
# 4. 验证安装  
await self.communicate(f"which {tool.name}", check="raise")  

这意味着工具可以用任意编程语言实现(Python、Bash、Go 等),只要它是一个能够接收 --param value 类型参数的 CLI 程序即可。

7.3 将 Tool Call 转换为 Bash 命令

ToolsManager 负责将 OpenAI 的 function calling 格式转换成 shell 命令,具体实现如下:

# 来源:uni_agent/interaction/tools_manager.py  
def get_tool_bash_command(self, tool_call) -> str:  
func_name = tool_call.function.name  
func_params = tool_call.function.arguments  

# 特殊处理:execute_bash 直接执行原始命令  
if func_name == "execute_bash":  
return func_params.get("command", "")  

# 特殊处理:submit 输出终止标记  
if func_name == "submit":  
return "echo '<<<Finished>>>'"  

# 通用工具:转为 CLI 调用格式  
# 例:str_replace_editor --command view --path /tmp/foo.py  
cmd_parts = [shlex.quote(func_name)]  
for param_key, param_value in func_params.items():  
if isinstance(param_value, list | dict):  
param_str = json.dumps(param_value, ensure_ascii=False)  
else:  
param_str = str(param_value)  
cmd_parts.append(f"--{param_key}")  
cmd_parts.append(shlex.quote(param_str))  
return " ".join(cmd_parts)  

通过 shlex.quote 对所有参数进行转义处理,有效防范了 shell 注入风险。

7.4 多格式 Tool Parser

系统内建了两类解析器,用于适配不同模型的输出格式差异:

┌─────────────────────────────────────────────────────────────────────────────┐
│  XMLToolParser(Qwen3-Coder 格式)                                          │
│                                                                             │
│  模型输出:                                                                  │
│  I'll check the file structure first.                                       │
│  <tool_call>                                                                │
│  <function=execute_bash>                                                    │
│  <parameter=command>find /testbed -name "*.py" | head -20</parameter>       │
│  </function>                                                                │
│  </tool_call>                                                               │
│                                                                             │
│  解析结果:                                                                  │
│  content = "I'll check the file structure first."                           │
│  tool_call = {name: "execute_bash", arguments: {command: "find ..."}}       │
├─────────────────────────────────────────────────────────────────────────────┤
│  HermesToolParser(Hermes JSON 格式)                                        │
│                                                                             │
│  模型输出:                                                                  │
│  Let me search for the bug.                                                 │
│  <tool_call>                                                                │
│  {"name": "execute_bash", "arguments": {"command": "grep -r 'bug' ."}}      │
│  </tool_call>                                                               │
└─────────────────────────────────────────────────────────────────────────────┘

XML 解析器额外集成了参数类型推断功能,能够依据 tool schema 中定义的类型信息,自动将字符串转换为 int、bool、list 等 Python 原生对象。

八、Reward 系统:环境内验证

8.1 注册表与抽象基类

与奖励系统的设计理念相似,奖励机制同样采用了注册表模式:

来源:uni_agent/reward/base.py

class AbstractRewardSpec(ABC):
@abstractmethod
def compute_reward(self) -> AgentLoopOutput:
“””在交互环境中计算 reward”””

8.2 SWE-Bench 奖励机制:基于真实测试用例的驱动

我们以 SWEBenchRewardSpec 为例,来说明环境内评测的完整执行流程:

来源:uni_agent/reward/swe_bench.py

@register_reward_spec(“swe_bench”)
class SWEBenchRewardSpec(AbstractRewardSpec):
async def compute_reward(self, **kwargs) -> tuple[dict | None, bool]:
instance = self.metadata

1. 构建评测脚本(激活 conda、应用 test patch、运行测试)

eval_script_list = _make_eval_script_list(
instance=instance, specs=specs,
env_name=”testbed”, repo_directory=”/testbed”,
base_commit=instance[“base_commit”],
test_patch=instance[“test_patch”],
)
eval_script = “n”.join([“#!/bin/bash”, “set -uxo pipefail”] + eval_script_list)

2. 写入容器并执行(超时 300s)

await self.env.write_file(eval_script_container, eval_script)
output = await self.env.communicate(f”bash {eval_script_container}”,
timeout=self.eval_timeout)

3. 解析测试报告

eval_report = self._get_eval_report(output)
return eval_report[“resolved”], result

关键的 _get_eval_report 方法会执行以下步骤:

  1. 从输出结果中提取 START_TEST_OUTPUTEND_TEST_OUTPUT 之间的内容
  2. 利用对应仓库的日志解析器(例如 pytest/unittest 等)来分析测试状态
  3. 对比 FAIL_TO_PASS(即本应被修复的测试用例)和 PASS_TO_PASS(即不应被破坏的测试用例)
  4. 如果所有 FAIL_TO_PASS 都通过,且 PASS_TO_PASS 没有出现回归,则最终判定为 RESOLVED

与“事后验证”相比,这种“环境内验证”模式的优势在于:无需额外创建评测容器,在大规模并发(千级)场景下能显著降低容器的调度开销。

unsetunset九、auto_await:同步与异步的无缝桥接unsetunset

Uni-Agent 中包含一个精巧的工具函数 auto_await,它使得同一个 async 方法既能够通过 await 来调用,也能在同步上下文中被直接触发:

来源:uni_agent/utils.py

def auto_await(func):
@functools.wraps(func)
def wrapper(args, kwargs):
coro = func(
args, **kwargs)
if not inspect.iscoroutine(coro):
return coro

try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

如果没有事件循环 → 使用 asyncio.run()

if loop is None:
return asyncio.run(coro)

如果有事件循环且调用者是协程 → 返回 coro 让调用者自行 await

caller_frame = inspect.currentframe().f_back
caller_is_async = (caller_frame.f_code.co_flags & inspect.CO_COROUTINE) != 0
if caller_is_async:
return coro

如果有事件循环但调用者是同步 → 使用线程池执行(避免死锁)

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, coro)
return future.result()
return wrapper

这个设计解决了一个实际痛点:奖励规范(reward spec)或环境操作可能会被强化学习训练框架的同步回调调用,也可能在异步交互循环中被 await 调用。auto_await 使得同一份代码能够完美适配这两种不同的调用场景。

unsetunset十、双模型抽象:训练与纯推理的统一unsetunset

10.1 AgentChatModel 与 OpenAICompatibleChatModel 的对比

┌────────────────────────────────────────────────────────────────────────┐  
│                         model.py 中的两套实现                          │  
├────────────────────────────┬───────────────────────────────────────────┤  
│     AgentChatModel         │     OpenAICompatibleChatModel             │  
├────────────────────────────┼───────────────────────────────────────────┤  
│ 适用场景:RL 训练 rollout  │ 适用场景:纯推理/评测                     │  
│ 后端引擎:verl AsyncLLM    │ 后端引擎:任意 OpenAI API                 │  
│ 输出内容:token_ids + logprobs │ 输出内容:文本 + tool_calls           │  
│ rollout_cache:存储完整 token  │ rollout_cache:仅存储 api_messages    │  
│ mask:精确到每个 token     │ mask:无(无需训练)                      │  
└────────────────────────────┴───────────────────────────────────────────┘  

这两个实现采用了完全一致的方法签名,包括 queryprepare_rollout_cacheappend_messages_to_rollout_cache。这种设计让 AgentInteraction 在调用时完全无需关心底层具体使用的是哪一类模型。

10.2 针对 OpenAI 兼容模型的工具调用处理

在纯推理场景中,OpenAI API 会直接以结构化的 tool_calls 格式返回结果,避免了文本解析的麻烦:

# 来源:uni_agent/interaction/model.py (OpenAICompatibleChatModel)  
async def query(self, messages, rollout_cache, **kwargs):  
chat_completion = await self.client.chat.completions.create(  
model=self.model_name, messages=api_messages, tools=self.tools_schemas, ...  
)  
response_tool_calls = list(response_message.tool_calls or [])  
if response_tool_calls:  
# 序列化后存入 rollout_cache,供 tools_manager.parse_structured_action 使用  
rollout_cache["extra_fields"]["last_tool_calls"] = serialized_tool_calls  

十一、并发模型与性能优化

11.1 全链路异步架构

┌─────────────────────────────────────────────────────────────┐  
│                    asyncio Event Loop                        │  
│                                                             │  
│  ┌──────────┐  ┌──────────┐  ┌──────────┐       ┌──────┐  │  
│  │ Agent #1  │  │ Agent #2  │  │ Agent #3  │  ...  │#1000 │  │  
│  └────┬─────┘  └────┬─────┘  └────┬─────┘       └──┬───┘  │  
│       │              │              │                │      │  
│       ▼              ▼              ▼                ▼      │  
│  ┌───────────────────────────────────────────────────────┐  │  
│  │          asyncio.Semaphore(worker_concurrent)         │  │  
│  │          ← 控制同时活跃的环境数 →                      │  │  
│  └───────────────────────────────────────────────────────┘  │  
│       │              │              │                │      │  
│       ▼              ▼              ▼                ▼      │  
│  ┌──────────────────────────────────────────────────────┐   │  
│  │            aiohttp connection pool                   │   │  
│  │       (force_close=True 避免连接泄漏)                │   │  
│  └──────────────────────────────────────────────────────┘   │  
└─────────────────────────────────────────────────────────────┘  

11.2 关键性能数据

根据 README 中公布的结果:

模型 Benchmark 分数
Qwen3-Coder-30B SWE-Bench-Verified 48.8 (1-Attempt, Avg@4)
Qwen3-Coder-480B SWE-Bench-Verified 64.2
Qwen3-Coder-Next SWE-Bench-Verified 67.7
Qwen3-30B-A3B (RL) R2E-Gym 22.2 → 36.8
Qwen3-Coder-30B-A3B (RL) R2E-Gym 46.2 → 52.0

在 RL 训练过程中,采用了 GSPO、Fully-Async 与 Partial Rollout 相结合的方案。其中,“Fully-Async”本质上就是 Uni-Agent 所实现的异步交互机制——其核心思想在于,无需等待所有 Agent 全部完成,即可开始更新策略参数。

unsetunset十二、结果保存与可观测性unsetunset

12.1 交互结果的持久化

# 来源:uni_agent/agent_loop.py  
def _save_interaction_result(self, interaction_result: dict):  
self.output_dir.mkdir(parents=True, exist_ok=True)  
# 二进制 pickle:rollout_cache(用于训练重放)  
with (self.output_dir / "rollout_cache.pkl").open("wb") as f:  
pickle.dump(interaction_result["rollout_cache"], f, protocol=pickle.HIGHEST_PROTOCOL)  
# JSON:trajectory + messages(用于人工审查)  
save_content = {  
"trajectory": [s.model_dump() for s in interaction_result["trajectory"]],  
"execution_time": interaction_result["execution_time"],  
"messages": interaction_result["messages"],  
"metrics": interaction_result.get("metrics", {}),  
}  
(self.output_dir / "interaction_result.json").write_text(json.dumps(save_content, ...))  

每个运行实例的输出目录结构如下:

/tmp/swebench_qwen3_coder/{run_id}/  
├── run.log                  # 实时日志(Dashboard 流式读取)  
├── rollout_cache.pkl        # 训练数据(token 级别)  
└── interaction_result.json  # 人类可读的交互轨迹  

12.2 Dashboard 的状态推断

Dashboard 通过检测特定文件的存在来判断运行状态:

  • run.log 存在且无运行标记 → 状态为 queued
  • 环境启动/交互步骤开始 → 状态为 running
  • reward 计算阶段 → 状态为 verify
  • interaction_result.json 存在 → 状态为 completed

unsetunset总结与展望unsetunset

Uni-Agent 的核心贡献可以归纳为三个层面:

  • 架构层面:它提出了一种 model-tool-env 三层解耦,并结合 rollout cache 实时累积的设计范式。这使得推理路径与训练路径真正统一在同一条代码路径上,彻底消除了传统方案中因数据转换而带来的开销以及语义不一致问题。
  • 工程层面:通过类级信号量、指数退避重试、幂等性 Key、超时预算以及异常轨迹降级等一系列机制,实现了在千级并发环境下的生产级稳定性。其中,auto_await 装饰器巧妙地弥合了同步调用与异步调用之间的鸿沟。
  • 生态层面:工具以可执行脚本形式安装,部署可通过 discriminated union 无缝切换,reward 则通过注册表进行扩展。整个系统的每一个扩展点都被设计成“即插即用”的模式。

从 Roadmap 来看,项目正在向 GUI 工具支持、多模态模型、DeepSeek 适配以及更多云部署后端发展

随着 Agent RL 训练从学术原型走向工程实践,Uni-Agent 这种“一栈到底”的框架设计将展现出越来越大的价值——正如深度学习时代的 PyTorch 统一了研究与生产的训练流程一样,Uni-Agent 正在为 Agent 时代扮演着相同的角色。

参考资料[1]

Uni-Agent轻量级监控面板: https://github.com/verl-project/uni-agent/blob/b09f167c35a069f8839c1f19f859d58dacbeeef9/dashboard/README.md

[2]

官方文档: https://uni-agent.readthedocs.io/en/latest/index.html

[3]

dashboard/README.md: https://github.com/verl-project/uni-agent/blob/main/dashboard/README.md


关注“鲸栖”小程序,掌握最新AI资讯

本文来自网络搜集,不代表鲸林向海立场,如有侵权,联系删除。转载请注明出处:https://www.itsolotime.com/archives/35465

(0)
上一篇 1小时前
下一篇 52分钟前

相关推荐