
将非结构化会议记录转化为可查询的知识图谱,并支持增量更新——无需每次进行全量重处理。
会议记录是组织智能的金矿,其中记录了决策、行动项、参与者信息以及人与任务之间的关系。然而,多数组织仍将其视为静态文档,仅能进行基础的全文检索。
试想一下,能够像查询数据库一样查询你的会议记录:
- “谁参加过主题为‘预算规划’的会议?”
- “Sarah 在所有会议中被分配了哪些任务?”
- “展示第四季度所有涉及工程团队的决策。”
这正是知识图谱的用武之地。通过将非结构化会议记录抽取为结构化信息并构建图表示,可以解锁强大的基于关系的查询能力,这是传统文档存储无法实现的。
本文将构建一个实用的 CocoIndex 处理流水线,实现以下功能:
- 从 Google Drive 读取 Markdown 格式的会议记录。
- 使用大型语言模型抽取结构化实体(会议、参与者、任务)。
- 将数据以知识图谱的形式写入 Neo4j 数据库。
- 仅在源文档发生变化时自动更新图谱。
完整源码可在 GitHub 获取:https://github.com/cocoindex-io/meeting-notes-knowledge-graph

架构总览
该流水线遵循清晰的数据流,并在每个阶段内置了增量处理逻辑:
Google Drive(带变更追踪的文档)
→ 识别变更的文档
→ 按会议拆分内容
→ 使用 LLM 抽取结构化数据(仅处理变更的文档)
→ 收集节点与关系
→ 导出到 Neo4j(带 upsert 逻辑)
前置条件
- Neo4j:安装并启动 Neo4j 数据库。本文示例使用默认本地地址
http://localhost:7474,凭证为用户名neo4j,密码cocoindex。 - OpenAI API Key:配置你的 OpenAI API 密钥。
- Google Drive 准备:
- 创建一个 Google Cloud 服务账号并下载其 JSON 凭证文件。
- 将包含会议记录的源文件夹共享给该服务账号的邮箱。
- 获取你希望导入的根文件夹 ID。
- 详细设置步骤请参考“Google Drive 设置指南”。
环境变量
设置以下环境变量:
export OPENAI_API_KEY=sk-...
export GOOGLE_SERVICE_ACCOUNT_CREDENTIAL=/absolute/path/to/service_account.json
export GOOGLE_DRIVE_ROOT_FOLDER_IDS=folderId1,folderId2
说明:
* GOOGLE_DRIVE_ROOT_FOLDER_IDS 接受以逗号分隔的多个文件夹 ID。
* 该流程会轮询最近变更,并定期刷新数据。
下面将逐一拆解流水线的各个组件。
流程定义
概览

添加 Source 和 Collector
python
@cocoindex.flow_def(name="MeetingNotesGraph")
def meeting_notes_graph_flow(
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
"""
定义一个从文件中抽取三元组并构建知识图谱的示例流程。
"""
credential_path = os.environ["GOOGLE_SERVICE_ACCOUNT_CREDENTIAL"]
root_folder_ids = os.environ["GOOGLE_DRIVE_ROOT_FOLDER_IDS"].split(",")
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.GoogleDrive(
service_account_credential_path=credential_path,
root_folder_ids=root_folder_ids,
recent_changes_poll_interval=datetime.timedelta(seconds=10),
),
refresh_interval=datetime.timedelta(minutes=1),
)
流水线首先通过服务账号连接到 Google Drive。CocoIndex 内置的 Source 连接器负责身份验证,并提供增量变更检测功能。recent_changes_poll_interval 参数设定为每 10 秒检查一次新增或修改的文件,而 refresh_interval 参数则决定了整个流程的重新运行频率(每分钟一次)。

这正是 CocoIndex 的核心能力之一:具备自动变更跟踪的增量处理。该框架不会每次都重新处理全部文档,而是:
- 从 Google Drive 列出所有文件及其最后修改时间。
- 仅识别自上次成功运行以来新增或修改的文件。
- 完全跳过未发生变更的文件。
- 只将变更的文档传递到下游进行处理。
效果如何?在一个日均变更率为 1% 的企业环境中,只有 1% 的文档会触发下游处理。未变更的文件不会调用 LLM API,不会生成 Neo4j 查询,也不会消耗额外的计算资源。
添加 Collector
python
meeting_nodes = data_scope.add_collector()
attended_rels = data_scope.add_collector()
decided_tasks_rels = data_scope.add_collector()
assigned_rels = data_scope.add_collector()
随后,流水线使用不同的 Collector 来收集不同类型实体和关系的数据:
- 会议节点:存储会议本身的信息,包含日期和记录内容。
- 参会关系:捕捉谁参加了会议,以及其是否为会议组织者。
- 任务决策关系:将会议与会议中决定的各项任务关联起来。
- 任务分配关系:将具体的任务分配给相应的人员。
处理每个文档
抽取会议
会议文档通常在同一文件中包含多场会议。本步骤基于 Markdown 标题(## 或 #,且前有空行)进行拆分,将每个标题段落视作一场独立会议。代码中的 keep_separator="RIGHT" 表示分隔符(标题)将保留在右侧段落,以确保后续处理的上下文完整性。
python
with data_scope["documents"].row() as document:
document["meetings"] = document["content"].transform(
cocoindex.functions.SplitBySeparators(
separators_regex=[r"nn##? "], keep_separator="RIGHT"
)
)

抽取单场会议
定义 Meeting Schema
为确保 LLM 能够进行结构化、可靠的抽取,我们首先定义清晰的数据模式(Schema)。这比让 LLM 输出自由格式文本更有效,能直接生成适用于构建知识图谱的结构化数据。
“`python
@dataclass
class Person:
name: str
@dataclass
class Task:
description: str
assigned_to: list[Person]
@dataclass
class Meeting:
time: datetime.date
note: str
organizer: Person
participants: list[Person]
tasks: list[Task]
“`
抽取并收集关系
接下来,我们使用 LLM 根据上述 Schema 从单场会议文本中抽取结构化信息。此步骤启用了缓存机制:只要输入(文本、模型、输出类型定义)未变,就会复用缓存结果,避免不必要的 LLM 重复调用,这对于处理大量数据至关重要。
python
with document["meetings"].row() as meeting:
parsed = meeting["parsed"] = meeting["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI, model="gpt-5"
),
output_type=Meeting,
)
)

收集关系
在 CocoIndex 中,Collectors 充当内存缓冲区。我们为不同实体和关系声明各自的收集器,并在处理每个会议时填充数据。以下代码从解析结果中收集节点与关系,为后续构建知识图谱做准备:
- 会议节点:收集会议的核心信息。
- 参与关系:将参与者(包括组织者)与他们参加的会议相连(
ATTENDED)。 - 决策关系:将会议与会议中决定的任务相连(
DECIDED)。 - 分配关系:将任务与负责该任务的人员相连(
ASSIGNED_TO)。
“`python
meeting_key = {“note_file”: document[“filename”], “time”: parsed[“time”]}
meeting_nodes.collect(**meeting_key, note=parsed[“note”])
attended_rels.collect(
id=cocoindex.GeneratedField.UUID,
**meeting_key,
person=parsed[“organizer”][“name”],
is_organizer=True,
)
with parsed[“participants”].row() as participant:
attended_rels.collect(
id=cocoindex.GeneratedField.UUID,
**meeting_key,
person=participant[“name”],
)
with parsed[“tasks”].row() as task:
decided_tasks_rels.collect(
id=cocoindex.GeneratedField.UUID,
meeting_key,
description=task[“description”],
)
with task[“assigned_to”].row() as assigned_to:
assigned_rels.collect(
id=cocoindex.GeneratedField.UUID,
meeting_key,
task=task[“description”],
person=assigned_to[“name”],
)
“`
映射到图数据库
概览
我们将构建一个属性图,包含上文收集的节点与关系。关于属性图的更多信息,可参阅 CocoIndex 文档中的 “Property Graph Targets” 部分。
映射会议节点
以下代码将收集到的会议节点数据映射并导出到 Neo4j 数据库,为其设置 Meeting 标签,并指定复合主键。

python
meeting_nodes.export(
"meeting_nodes",
cocoindex.targets.Neo4j(
connection=conn_spec, mapping=cocoindex.targets.Nodes(label="Meeting")
),
primary_key_fields=["note_file", "time"],
)
声明 Person 与 Task 节点
使用 flow_builder.declare 方法,我们预先声明了知识图谱中两种核心实体的节点结构。
python
flow_builder.declare(
cocoindex.targets.Neo4jDeclaration(
connection=conn_spec,
nodes_label="Person",
primary_key_fields=["name"],
)
)
flow_builder.declare(
cocoindex.targets.Neo4jDeclaration(
connection=conn_spec,
nodes_label="Task",
primary_key_fields=["description"],
)
)
* Person 节点:代表会议参与者或组织者,以 name 字段作为唯一标识。
* Task 节点:代表会议中产生的任务项,以 description 字段作为唯一标识。
映射 ATTENDED 关系
此步骤定义了人员与会议之间的参与关系。
python
attended_rels.export(
"attended_rels",
cocoindex.targets.Neo4j(
connection=conn_spec,
mapping=cocoindex.targets.Relationships(
rel_type="ATTENDED",
source=cocoindex.targets.NodeFromFields(
label="Person",
fields=[
cocoindex.targets.TargetFieldMapping(
source="person", target="name"
)
],
),
target=cocoindex.targets.NodeFromFields(
label="Meeting",
fields=[
cocoindex.targets.TargetFieldMapping("note_file"),
cocoindex.targets.TargetFieldMapping("time"),
],
),
),
),
primary_key_fields=["id"],
)
* 关系语义:创建 ATTENDED 类型的边,连接 Person 节点与 Meeting 节点,表示“某人参加了某次会议”。
* 映射逻辑:
* 源节点 (Person):使用数据中的 person 字段匹配已声明的 Person 节点的 name 主键。
* 目标节点 (Meeting):使用 note_file(纪要文件标识)和 time(会议时间)共同匹配 Meeting 节点。
* 作用与优势:
* 支持高效的图查询,例如“Alice 参加了哪些会议?”或“某次会议有哪些参与者?”。
* 通过主键映射确保节点唯一性,避免创建重复的 Person 或 Meeting 节点。
* 关系自身拥有唯一 id,结合一致的节点映射策略,保障了知识图谱在增量更新时的稳定性和幂等性,避免产生重复边。
映射 DECIDED 关系
此步骤定义了会议与任务之间的决策关系。
python
decided_tasks_rels.export(
"decided_tasks_rels",
cocoindex.targets.Neo4j(
connection=conn_spec,
mapping=cocoindex.targets.Relationships(
rel_type="DECIDED",
source=cocoindex.targets.NodeFromFields(
label="Meeting",
fields=[
cocoindex.targets.TargetFieldMapping("note_file"),
cocoindex.targets.TargetFieldMapping("time"),
],
),
target=cocoindex.targets.NodeFromFields(
label="Task",
fields=[
cocoindex.targets.TargetFieldMapping("description"),
],
),
),
),
primary_key_fields=["id"],
)
* 关系语义:创建 DECIDED 类型的边,连接 Meeting 节点与 Task 节点,表示“某次会议决定了某项任务”。
* 映射逻辑:
* 源节点 (Meeting):使用 note_file 和 time 字段匹配 Meeting 节点。
* 目标节点 (Task):使用 description 字段匹配已声明的 Task 节点的 description 主键。
* 作用与优势:
* 支持追溯任务来源和会议产出,例如“某次会议决定了哪些任务?”或“这个任务是在哪场会议上提出的?”。
* 通过主键确保 Task 节点的唯一性,相同的任务描述不会被重复创建。
* 同样具备幂等性,保障增量更新时图的稳定性。
映射 ASSIGNED_TO 关系
ASSIGNED_TO 关系
python
assigned_rels.export(
"assigned_rels",
cocoindex.targets.Neo4j(
connection=conn_spec,
mapping=cocoindex.targets.Relationships(
rel_type="ASSIGNED_TO",
source=cocoindex.targets.NodeFromFields(
label="Person",
fields=[
cocoindex.targets.TargetFieldMapping(
source="person", target="name"
),
],
),
target=cocoindex.targets.NodeFromFields(
label="Task",
fields=[
cocoindex.targets.TargetFieldMapping(
source="task", target="description"
),
],
),
),
),
primary_key_fields=["id"],
)
最终图谱
运行该流水线后,你的 Neo4j 数据库中将形成一个结构丰富、便于查询的知识图谱:
节点类型:
Meeting—— 表示单场会议,包含日期、记录等属性。Person—— 表示参与会议的个人。Task—— 表示在会议中决定的可执行事项。
关系类型:
ATTENDED—— 连接人员与他们参加的会议。DECIDED—— 连接会议与其决定的任务。ASSIGNED_TO—— 连接人员与他们被分配负责的任务。
关键的一点是,在最终导出到知识图谱的步骤中,CocoIndex 同样采用增量更新机制。它只会对发生变化的节点或关系进行更新,而对未变更的部分则不做任何操作。这种方式有效避免了目标数据库的无效震荡,并最大限度地降低了写入成本。
运行指南
构建/更新图谱
安装项目依赖:bash
pip install -e .
更新索引(运行一次流程来构建或更新图谱):bash
cocoindex update main
浏览知识图谱
打开 Neo4j Browser:http://localhost:7474。
以下是一些示例 Cypher 查询:
“`cypher
// 查看所有关系
MATCH p=()–>() RETURN p
// 查询谁参加了哪些会议(包含组织者)
MATCH (p:Person)-[:ATTENDED]->(m:Meeting)
RETURN p, m
// 查询会议中决定的任务
MATCH (m:Meeting)-[:DECIDED]->(t:Task)
RETURN m, t
// 查询任务分配情况
MATCH (p:Person)-[:ASSIGNED_TO]->(t:Task)
RETURN p, t
“`

企业级真实场景
上述模式的应用远不止于会议纪要解析:
- 研究论文分析 —— 从组织知识库中抽取论文,在成千上万份文档中构建概念与引用的知识图谱,并追踪引用关系与概念的更新。
- 客户支持工单 —— 抽取问题、解决方案以及工单与客户之间的关系;在处理频繁的编辑与状态更新的同时,从海量工单中识别模式。
- 邮件线程摘要 —— 在数百万封邮件中构建沟通模式与决策结果的图谱;应对团队转发、编辑并引用历史讨论的复杂场景。
- 合规文档管理 —— 从政策文档中抽取监管要求;追踪政策变更并通过图结构级联其影响;维护文档版本的审计轨迹。
- 竞争情报分析 —— 从公开文档与新闻中抽取数据;构建竞争对手关系、产品与市场定位的知识图谱,并处理信息的持续更新。
关注“鲸栖”小程序,掌握最新AI资讯
本文来自网络搜集,不代表鲸林向海立场,如有侵权,联系删除。转载请注明出处:http://www.itsolotime.com/archives/20036
