MFlow03-数据模型解析
MFlow03-数据模型解析从生活故事到代码实现的完整思考路径源码地址https://github.com/FlowElement-ai/m_flow文章目录MFlow03-数据模型解析 第一部分用生活故事理解数据结构故事侦探事务所的记忆管理系统 场景一一个完整案件Episode 场景二案件的多个角度Facet 场景三具体的事实细节FacetPoint 场景四跨越案件的人Entity 第二部分从0到1的设计推导步骤1最简单的记录 ❌步骤2分类管理 ❌步骤3引入事件包概念 ✅步骤4事件需要分解为多个角度 ✅步骤5主题需要更细粒度的锚点 ✅步骤6跨事件关联问题 ✅步骤7基类抽象 ✅步骤8关系本身也有语义 ✅ 第三部分调用链路分析场景A用户调用 memorize() 将文档转化为记忆场景B用户调用 search() 检索记忆 第四部分数据流转图1️⃣ 写入流程memorize2️⃣ 检索流程search 第五部分设计模式和架构思想1️⃣ **继承 模板方法模式**2️⃣ **组合模式Composite Pattern**3️⃣ **适配器模式**4️⃣ **工厂模式**5️⃣ **策略模式**6️⃣ **边的语义化Semantic Edge**7️⃣ **锥形图拓扑Inverted Cone Topology**8️⃣ **最小成本路径Minimum Cost Path** 总结核心数据模型的关系 学习建议如果你想深入理解数据模型按这个顺序阅读可以暂时忽略的部分 第一部分用生活故事理解数据结构故事侦探事务所的记忆管理系统想象你经营一家侦探事务所每天处理大量案件。你需要建立一个记忆系统让新侦探能快速找到历史经验。 场景一一个完整案件Episode案件周一项目会议争论 ├─ 时间2024年1月15日 ├─ 参与者Maria、张经理、李工 ├─ 经过讨论数据库选型Maria和张经理发生争执 └─ 结果暂定PostgreSQL两周后再议这个案件就是一个Episode事件包—— 它是记忆的最高层单元包含一个完整的故事。 场景二案件的多个角度Facet当新侦探问“发生了什么” 你不会简单复述而是从不同角度组织信息周一项目会议争论Episode ├─ Facet 1: 决策讨论 │ ├─ 内容选型PostgreSQL vs MySQL │ └─ 关键点性能优先 ├─ Facet 2: 人际冲突 │ ├─ 内容Maria对截止日期不满 │ └─ 关键点沟通不畅 ├─ Facet 3: 技术评估 │ ├─ 内容P99延迟要求500ms │ └─ 关键点PostgreSQL更合适 └─ Facet 4: 后续计划 ├─ 内容两周后最终决定 └─ 关键点需要更多数据每个Facet主题是案件的一个维度或切面。 场景三具体的事实细节FacetPoint新侦探追问“P99延迟要求是什么”Facet: 技术评估 └─ FacetPoint: P99延迟目标必须低于500毫秒 ├─ 证据来源会议纪要第3页 └─ 相关实体PostgreSQL、性能指标FacetPoint是原子级的事实点—— 最精确的记忆锚点。 场景四跨越案件的人Entity一个月后又有一个关于Maria的案子案件A周一项目会议争论 └─ involves_entity → Maria角色后端负责人 案件B周五冲刺回顾 └─ involves_entity → Maria角色提出问题的人通过Entity实体你能跨案件追踪同一个人MariaEntity ├─ same_entity_as → Maria案件A中的描述 └─ same_entity_as → Maria案件B中的描述这样查询Maria参与的所有事件时系统能把相关案件都找出来。 第二部分从0到1的设计推导现在让我们像设计师一样从零开始推导这个数据结构。步骤1最简单的记录 ❌# 想法直接存储文本memory{text:周一开会讨论数据库Maria生气了}问题无法检索无法组织无法关联。步骤2分类管理 ❌# 想法像图书馆一样分类memory{category:技术会议,title:数据库选型讨论,content:...}问题一个会议可能同时是技术会议和人际冲突静态分类无法应对复杂场景无法表达关系Maria参加了会议步骤3引入事件包概念 ✅洞察人类记忆是以事件为单位组织的。# 设计Episode事件包classEpisode:name:str# 周一数据库选型会议summary:str# 讨论PostgreSQL vs MySQLMaria因截止日期问题不满status:str# open | closed为什么这样设计summary是可检索的字段会被向量化status表示事件是否已完结一个 Episode 是一个完整的语义单元步骤4事件需要分解为多个角度 ✅问题一个事件包含多个维度决策、冲突、技术、计划如何组织传统方案用子标题summary ## 决策 选型PostgreSQL ## 冲突 Maria不满 ## 技术 P99 500ms 缺点结构化信息丢失无法精确检索到技术评估这个维度LLM容易混淆不同主题M-flow方案引入 FacetclassFacet:name:str# 技术评估facet_type:str# metric | decision | risk ...search_text:str# 性能目标讨论简短用于检索description:str# 详细描述...Episode 和 Facet 的关系classEpisode:has_facet:List[tuple[Edge,Facet]]# 一个Episode包含多个Facet关键设计tuple[Edge, Facet]Edge携带关系的语义edge_text: “涉及技术评估”这让关系本身可检索步骤5主题需要更细粒度的锚点 ✅问题用户问P99延迟目标是什么这个问题太精确无法匹配 Facet.search_text性能目标讨论太宽泛。解决方案引入 FacetPointclassFacetPoint:name:str# P99延迟目标search_text:str# P99延迟必须低于500毫秒description:str# 详细解释...Facet 和 FacetPoint 的关系classFacet:has_point:List[tuple[Edge,FacetPoint]]# 一个Facet包含多个FacetPoint为什么需要三层Episode回答发生了什么Facet回答哪个方面FacetPoint回答具体是什么这就是锥形图的物理结构步骤6跨事件关联问题 ✅问题如何找到Maria参与的所有事件传统方案文本搜索Maria❌ 会漏掉用同义词提到的 Maria如后端负责人M-flow方案引入 EntityclassEntity:name:str# Mariacanonical_name:str# maria规范化用于跨文档匹配description:str# 后端负责人负责性能优化Episode 和 Entity 的关系classEpisode:involves_entity:List[tuple[Edge,Entity]]# 一个Episode涉及多个EntityEntity 之间的关联classEntity:same_entity_as:List[tuple[Edge,Entity]]# 跨Episode的实体关联示例Episode A: involves_entity → Maria描述后端负责人 Episode B: involves_entity → Maria描述提出问题的人 ↓ same_entity_as通过 canonical_name 自动关联这样查询Maria时两个Episode都会被找到。步骤7基类抽象 ✅观察Episode、Facet、FacetPoint、Entity 都有共同属性id唯一标识type类型名称created_at创建时间metadata索引配置设计模式继承 模板方法classMemoryNode(BaseModel):所有图节点的基类id:UUIDtype:str# 自动填充为类名version:intmetadata:dict# {index_fields: [字段名]}created_at:intupdated_at:intclassmethoddefextract_index_text(cls,node):拼接索引字段用于向量化# 实现...子类继承classEpisode(MemoryNode):name:strsummary:strmetadata:dict{index_fields:[summary]}# 只索引summary好处统一的字段管理统一的向量化逻辑统一的序列化/反序列化步骤8关系本身也有语义 ✅创新设计Edge 不只是连接符它携带可检索的语义classEdge:edge_text:str# 讨论了 / 涉及 / 基于weight:float# 权重可选relationship_type:str# 关系类型可选使用方式episode.has_facet[(Edge(edge_text重点讨论了),Facet(name技术评估,search_text性能目标讨论))]检索时Edge.edge_text 也会被向量化为什么重要查询会议争论了什么 → 匹配 edge_text“争论了”查询会议决定了什么 → 匹配 edge_text“决定了”关系本身参与相关性评分 第三部分调用链路分析场景A用户调用memorize()将文档转化为记忆用户代码 ↓ m_flow.api.v1.memorize.memorize() 【业务核心】接收文档协调整个处理流程 ↓ m_flow.pipeline.execute_workflow() 【业务核心】执行多阶段处理管线 ↓ m_flow.pipeline.tasks.Stage 【业务核心】定义5个处理阶段 ├─ Stage1: 文本分块 ├─ Stage2: 信息提取LLM ├─ Stage3: 构建Episode ├─ Stage4: 构建Facet └─ Stage5: 构建FacetPoint和Entity ↓ m_flow.memory.episodic.write_episodic_memories() 【业务核心】将提取的数据转化为Episode/Facet/FacetPoint/Entity对象 ↓ m_flow.storage.persist_memory_nodes() 【适配层-可延后学习】统一持久化入口 ↓ m_flow.adapters.graph.graph_db_interface.write_nodes() 【适配层-可延后学习】图数据库适配器接口 ↓ m_flow.adapters.graph.kuzu.adapter.write_nodes() 【适配层-可延后学习】Kuzu数据库实现 ↓ m_flow.adapters.vector.vector_db_interface.upsert() 【适配层-可延后学习】向量数据库适配器接口 ↓ m_flow.adapters.vector.chroma.adapter.upsert() 【适配层-可延后学习】Chroma向量数据库实现关键方法说明文件.方法作用类型memorize.py:memorize()接收文档启动处理流程返回处理结果业务核心pipeline.py:execute_workflow()协调5个阶段的顺序执行处理依赖关系业务核心write_episodic_memories.py:write_episodic_memories()核心数据转换逻辑从LLM提取结果 → Episode对象业务核心episode_builder.py:execute_step1()从文档片段创建Episode节点业务核心episode_builder.py:_build_has_facet_edges()构建Episode→Facet的边携带edge_text业务核心episode_builder.py:_build_involves_entity_edges()构建Episode→Entity的边业务核心persist_memory_nodes()统一的持久化入口处理节点和边工具代码graph_db_interface.py:write_nodes()图数据库抽象接口适配层-可延后学习kuzu.adapter.py:write_nodes()Kuzu数据库的具体实现适配层-可延后学习vector_db_interface.py:upsert()向量数据库抽象接口适配层-可延后学习场景B用户调用search()检索记忆用户代码 ↓ m_flow.api.v1.search.search() 【业务核心】接收查询文本协调检索流程 ↓ m_flow.search.methods.search() 【业务核心】处理权限检查选择检索模式 ↓ m_flow.search.methods.no_access_control_search() 【业务核心】执行无权限控制的检索或其他模式 ↓ m_flow.search.operations.get_recall_mode_tools() 【业务核心】根据RecallMode选择检索策略 ├─ VECTOR: 纯向量搜索 ├─ GRAPH: 纯图遍历 ├─ HYBRID: 混合搜索 └─ TRIPLET_COMPLETION: M-flow特有的锥形图检索 ↓ m_flow.adapters.graph.get_graph_provider() 【适配层-可延后学习】获取图数据库实例 ↓ m_flow.adapters.vector.get_vector_provider() 【适配层-可延后学习】获取向量数据库实例 ↓ m_flow.search.operations.execute_triplet_search() 【业务核心】执行M-flow核心检索算法 ├─ 向量搜索找到锚点Entity/FacetPoint/Facet/Episode ├─ 图遍历传播成本沿着边 ├─ 计算每个Episode的最低路径成本 └─ 返回排序后的Episode列表 ↓ m_flow.search.utils.prepare_search_result() 【业务核心】格式化检索结果 ↓ 返回给用户关键方法说明文件.方法作用类型search.py:search()API入口解析参数处理权限业务核心search.methods.search:search()选择检索模式调用底层检索业务核心get_recall_mode_tools()工厂方法根据RecallMode返回检索工具业务核心execute_triplet_search()【核心算法】锥形图路径成本传播业务核心graph_db_interface:execute_query()执行图查询Cypher/Gremlin等适配层-可延后学习vector_db_interface:search()执行向量搜索适配层-可延后学习prepare_search_result()将图节点转换为API返回格式业务核心 第四部分数据流转图1️⃣ 写入流程memorize输入原始文档 │ ▼ ┌─────────────────────────────────────────────────────┐ │ Stage 1: 文本分块 (TextChunker) │ │ 输入长文档 │ │ 输出List[TextChunk] - 按语义分割的文本块 │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Stage 2: LLM信息提取 (extract_graph) │ │ 输入List[TextChunk] │ │ 输出FragmentDigest - 提取的结构化信息 │ │ ├─ summaries: 摘要 │ │ ├─ entities: 实体列表 │ │ ├─ sections: 分段信息 │ │ └─ time_ranges: 时间范围 │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Stage 3: 构建Episode (episode_builder) │ │ 输入FragmentDigest │ │ 输出Episode对象列表 │ │ ├─ name: 数据库选型会议 │ │ ├─ summary: 讨论了PostgreSQL vs MySQL... │ │ ├─ has_facet: [] │ │ └─ involves_entity: [] │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Stage 4: 构建Facet (episode_builder) │ │ 输入FragmentDigest.sections │ │ 输出Facet对象列表 │ │ ├─ search_text: 性能目标讨论 │ │ ├─ facet_type: metric │ │ └─ anchor_text: 详细描述... │ │ 同时构建Episode→Facet的边携带edge_text │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Stage 5: 构建FacetPoint和Entity │ │ 输入FragmentDigest的细粒度信息 │ │ 输出FacetPoint对象列表 Entity对象列表 │ │ FacetPoint: │ │ ├─ search_text: P99延迟必须低于500ms │ │ └─ supported_by: ContentFragment │ │ Entity: │ │ ├─ name: Maria │ │ ├─ canonical_name: maria │ │ └─ description: 后端负责人 │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ 数据持久化 (persist_memory_nodes) │ │ 输入Episode Facet FacetPoint Entity Edge │ │ 输出写入图数据库 向量数据库 │ │ │ │ 图数据库存储 │ │ ├─ 节点Episode, Facet, FacetPoint, Entity │ │ ├─ 边has_facet, involves_entity, has_point │ │ └─ 边属性edge_text可检索的语义 │ │ │ │ 向量数据库存储 │ │ ├─ Episode_summary集合 │ │ ├─ Facet_search_text集合 │ │ ├─ Facet_anchor_text集合 │ │ ├─ FacetPoint_search_text集合 │ │ ├─ Entity_name集合 │ │ └─ Edge_edge_text集合 │ └─────────────────────────────────────────────────────┘2️⃣ 检索流程search输入用户查询 为什么Maria在周一站会上生气 │ ▼ ┌─────────────────────────────────────────────────────┐ │ 步骤1: 宽网向量搜索 (wide_search) │ │ 在7个向量集合中同时搜索 │ │ ├─ Episode_summary │ │ ├─ Facet_search_text │ │ ├─ Facet_anchor_text │ │ ├─ FacetPoint_search_text │ │ ├─ Entity_name │ │ ├─ Entity_canonical_name │ │ └─ Edge_edge_text │ │ 每个集合返回top-100候选 │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ 步骤2: 投影到知识图 (project_to_graph) │ │ 输入向量命中的节点ID列表 │ │ 操作 │ │ ├─ 从图数据库获取这些节点的完整信息 │ │ ├─ 获取相邻节点1-hop邻居 │ │ └─ 获取连接边包括edge_text │ │ 输出局部子图锚点 邻居 边 │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ 步骤3: 成本传播 (cost_propagation) 【核心算法】 │ │ │ │ 对子图中的每个Episode │ │ ┌─────────────────────────────────────────┐ │ │ │ 找到所有从锚点到该Episode的路径 │ │ │ │ │ │ │ │ 路径示例 │ │ │ │ 1. Entity → Episode │ │ │ │ 起始成本: 0.1Entity.name匹配度高 │ │ │ │ 边成本: 0.3edge_text相关性中等 │ │ │ │ 跳数惩罚: 0.2 │ │ │ │ 总成本: 0.6 │ │ │ │ │ │ │ │ 2. FacetPoint → Facet → Episode │ │ │ │ 起始成本: 0.05精确匹配 │ │ │ │ 边1成本: 0.1属于边 │ │ │ │ 边2成本: 0.2edge_text匹配 │ │ │ │ 跳数惩罚: 0.42跳 │ │ │ │ 总成本: 0.75 │ │ │ │ │ │ │ │ Episode得分 MIN(所有路径成本) 0.6 │ │ │ └─────────────────────────────────────────┘ │ │ │ │ 关键设计 │ │ ✓ 取最小成本一条强路径足够 │ │ ✓ 直接命中Episode会被惩罚防止泛化匹配 │ │ ✓ 边的edge_text参与成本计算 │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ 步骤4: 排序和组装 (rank_and_assemble) │ │ ├─ 按成本排序Episode │ │ ├─ 选择top-K │ │ └─ 根据display_mode组装输出 │ │ ├─ summary: 只返回Episode.summary │ │ ├─ detail: 返回EpisodeFacetEntity │ │ └─ highly_related: 只返回匹配的Facet相关段落 │ └─────────────────────────────────────────────────────┘ │ ▼ 输出List[SearchResult] 第五部分设计模式和架构思想1️⃣继承 模板方法模式体现所有节点继承自MemoryNodeMemoryNode(抽象基类)├─ Episode ├─ Facet ├─ FacetPoint ├─ Entity ├─ EntityType └─ Procedure好处统一字段管理id, created_at, metadata统一向量化逻辑extract_index_text统一序列化/反序列化Pydantic BaseModel2️⃣组合模式Composite Pattern体现Episode → Facet → FacetPoint 的层级结构Episode ├─ has_facet:List[Facet]│ └─ has_point:List[FacetPoint]└─ involves_entity:List[Entity]好处树形结构的统一处理递归遍历从Episode找到所有FacetPoint灵活扩展可以添加新的层级3️⃣适配器模式体现图数据库和向量数据库的适配层GraphProvider(接口)├─ KuzuAdapter ├─ Neo4jAdapter └─ NeptuneAdapter VectorProvider(接口)├─ ChromaAdapter ├─ PgVectorAdapter └─ OpenSearchAdapter好处解耦业务逻辑和具体存储切换数据库不需要修改业务代码支持多种存储后端4️⃣工厂模式体现get_graph_provider(),get_vector_provider()defget_graph_provider():backendconfig.GR_BACKEND# kuzu | neo4j | ...ifbackendkuzu:returnKuzuAdapter()elifbackendneo4j:returnNeo4jAdapter()# ...好处根据配置动态创建适配器隐藏实现细节统一入口5️⃣策略模式体现RecallMode检索策略classRecallMode(Enum):VECTORvector# 纯向量GRAPHgraph# 纯图HYBRIDhybrid# 混合TRIPLET_COMPLETIONtriplet# M-flow特有defget_recall_mode_tools(mode):ifmodeRecallMode.VECTOR:returnVectorSearchStrategy()elifmodeRecallMode.TRIPLET_COMPLETION:returnTripletSearchStrategy()# ...好处运行时切换检索算法每种策略独立实现易于添加新策略6️⃣边的语义化Semantic Edge创新设计Edge 不是简单的连接符而是携带可检索语义的一等公民Edge(edge_text重点讨论了,weight0.8)为什么重要查询争论了什么 → 匹配 edge_text“争论了”查询决定了什么 → 匹配 edge_text“决定了”边参与相关性评分过滤无关路径这是M-flow区别于传统图数据库的核心创新之一7️⃣锥形图拓扑Inverted Cone Topology架构思想从精确到抽象的倒金字塔Entity FacetPoint ← 尖端最精确的匹配点 │ │ └─────┬─────┘ │ Facet ← 中层主题维度 │ Episode ← 底座完整事件包检索逻辑向量搜索在尖端找到精确锚点图遍历沿着边向下传播到底座计算路径成本返回最相关的Episode为什么这样设计不同粒度的查询自然路由精确问题命中FacetPoint宽泛问题命中Episode跨文档关联通过Entity实现8️⃣最小成本路径Minimum Cost Path设计哲学一条强证据链足够证明相关性episode_scoreMIN(all_path_costs)# 而不是episode_scoreAVG(all_path_costs)# ❌为什么重要模仿人类记忆一个联想触发回忆防止无关Facet拉低相关性即使Episode有10个Facet只要1个相关就应被检索 总结核心数据模型的关系MemoryNode (基类) │ ├─ Episode (事件包) - 最顶层记忆单元 │ ├─ has_facet → List[Facet] │ ├─ involves_entity → List[Entity] │ └─ includes_chunk → List[ContentFragment] │ ├─ Facet (主题维度) - 事件的一个角度 │ ├─ search_text (索引字段) │ ├─ anchor_text (索引字段) │ └─ has_point → List[FacetPoint] │ ├─ FacetPoint (事实点) - 最细粒度的记忆 │ └─ search_text (索引字段) │ ├─ Entity (实体) - 跨事件的人/物 │ ├─ name (索引字段) │ ├─ canonical_name (索引字段) │ └─ same_entity_as → List[Entity] │ ├─ EntityType (实体类型) - 实体的分类标签 │ └─ name (索引字段) │ └─ Procedure (程序性记忆) - 方法/步骤/流程 ├─ summary (索引字段) ├─ has_context_point → List[ProcedureContextPoint] └─ has_key_point → List[ProcedureStepPoint] Edge (边) - 携带语义的关系 ├─ edge_text (索引字段) ← 核心创新 ├─ weight └─ relationship_type 学习建议如果你想深入理解数据模型按这个顺序阅读基础m_flow/core/models/MemoryNode.py30分钟理解基类的设计理解extract_index_text的逻辑核心节点m_flow/core/domain/models/Episode.py1小时理解Episode的结构理解has_facet和involves_entity的设计主题层m_flow/core/domain/models/Facet.py30分钟理解search_text和anchor_text的区别理解索引字段的配置事实层m_flow/core/domain/models/FacetPoint.py20分钟理解最细粒度的记忆单元实体层m_flow/core/domain/models/Entity.py30分钟理解canonical_name和same_entity_as的设计边的语义m_flow/core/models/Edge.py15分钟理解为什么Edge需要edge_text写入流程m_flow/memory/episodic/write_episodic_memories.py2小时理解如何从LLM输出构建Episode理解如何建立边的关系可以暂时忽略的部分❌m_flow/adapters/- 适配器层可延后学习❌m_flow/storage/- 持久化细节可延后学习❌m_flow/pipeline/tasks/- 具体的LLM提示词可延后学习最后的提醒数据模型是M-flow的骨架理解它需要✅ 从生活场景入手侦探事务所的故事✅ 理解设计者的推导思路从0到1✅ 追踪调用链路看数据如何流动✅ 识别设计模式继承、组合、适配器✅ 理解核心创新边的语义化、锥形图、最小成本路径不要一开始就陷入细节先理解为什么这样设计再深入怎么实现