AI做真实世界研究:数据整理、质控与分析效率怎么一起提
真实世界研究的数据治理最容易卡在三个地方多源数据口径不一、缺失值无法解释、字段映射反复返工。本文只讨论技术架构和工程流程示例不提供诊断、治疗、分诊或用药建议文中的质控阈值和升级规则均为示例真实项目应由医疗专业人员和机构规范确认。问题背景AI分析前数据底座经常先掉链子在真实世界研究项目里数据通常来自多个业务系统、登记表、随访表和人工整理文件。开发侧拿到的不是“可直接建模的数据集”而是一批字段名不一致、编码体系不统一、日期粒度不同、缺失原因不明确的宽表或半结构化文件。我在做这类链路时会把“AI提效”拆成两层第一层是让数据整理、校验、映射尽量自动化第二层才是让分析脚本、特征生成和报告草稿提速。前者不稳定后者只会把错误放大。一个更稳的流程可以画成这样多源数据 - 标准字段映射 - Pandas 清洗 - DuckDB 汇总与落盘 - Great Expectations 质控 - Airflow 编排 - 分析数据集 ADS技术目标把清洗、质控、分析准备放到一条链路里这条链路的目标不是替代研究设计也不是自动判断医学结论而是把数据工程环节做成可复跑、可追踪、可审计。核心约束建议写清楚字段映射必须版本化不能只存在于开发者电脑里的 Excel。每次生成分析数据集时要记录输入文件、规则版本和运行时间。缺失、重复、异常编码不能只在日志里提示要形成质控结果。示例阈值只用于工程演示真实阈值需按项目方案和机构规则确认。AI可以辅助字段匹配、异常解释和代码生成但最终规则要人工确认。方案设计Pandas 负责清洗DuckDB 负责中间层Pandas 适合做逐字段清洗和规则化转换DuckDB 适合在本地或任务容器中做 SQL 汇总、Join 和 Parquet 落盘。两者结合比把所有逻辑都塞进 Pandas 更容易排查性能和口径问题。建议目录结构如下rwr-pipeline/ dags/ rwr_etl_dag.py configs/ field_mapping.yaml quality_rules.yaml data/ raw/ staging/ mart/ src/ extract_clean.py validate.py build_ads.py字段映射文件可以先保持简单patient_id:source:[pid,patient_no,subject_id]target:patient_idvisit_date:source:[visit_dt,date,followup_date]target:visit_datestatus_code:source:[status,outcome_code]target:status_codeAI在这里适合做“候选映射推荐”例如根据字段名、样例值、历史映射表给出建议。但不要让模型直接改生产规则推荐结果应进入人工审核流程。核心实现从多源 CSV 生成分析准备表下面示例用 Pandas 合并多源数据用 DuckDB 做去重和汇总。代码变量都围绕真实世界研究数据整理不涉及业务诊断逻辑。importpandasaspdimportduckdbfrompathlibimportPath RAW_DIRPath(data/raw)STAGING_DIRPath(data/staging)MART_DIRPath(data/mart)FIELD_MAP{pid:patient_id,patient_no:patient_id,subject_id:patient_id,visit_dt:visit_date,date:visit_date,followup_date:visit_date,status:status_code,outcome_code:status_code,}defnormalize_columns(df:pd.DataFrame)-pd.DataFrame:renamed{}forcolindf.columns:keycol.strip().lower()renamed[col]FIELD_MAP.get(key,key)returndf.rename(columnsrenamed)defclean_one_file(path:Path)-pd.DataFrame:dfpd.read_csv(path,dtypestr)dfnormalize_columns(df)required[patient_id,visit_date]forcolinrequired:ifcolnotindf.columns:raiseValueError(f{path.name}缺少必要字段:{col})df[patient_id]df[patient_id].str.strip()df[visit_date]pd.to_datetime(df[visit_date],errorscoerce)df[source_file]path.nameifstatus_codeindf.columns:df[status_code]df[status_code].str.strip().str.upper()returndfdefbuild_staging():frames[clean_one_file(p)forpinRAW_DIR.glob(*.csv)]mergedpd.concat(frames,ignore_indexTrue)STAGING_DIR.mkdir(parentsTrue,exist_okTrue)merged.to_parquet(STAGING_DIR/rwr_staging.parquet,indexFalse)defbuild_ads():MART_DIR.mkdir(parentsTrue,exist_okTrue)conduckdb.connect()con.execute( CREATE OR REPLACE TABLE staging AS SELECT * FROM read_parquet(data/staging/rwr_staging.parquet) )con.execute( COPY ( SELECT patient_id, MIN(visit_date) AS first_visit_date, MAX(visit_date) AS last_visit_date, COUNT(*) AS record_count, COUNT(DISTINCT source_file) AS source_count FROM staging WHERE patient_id IS NOT NULL AND visit_date IS NOT NULL GROUP BY patient_id ) TO data/mart/analysis_dataset.parquet (FORMAT PARQUET) )if__name____main__:build_staging()build_ads()print(ADS generated: data/mart/analysis_dataset.parquet)这段代码没有追求复杂而是先保证三件事字段统一、日期可解析、分析准备表可复跑。后续再把映射配置外置、异常行输出、任务元数据记录补上。质控规则不要只看任务是否成功ETL任务成功不代表数据可用。真实项目里最常见的问题是数据能跑完但关键字段缺失率突然升高或者某个来源的编码规则变了。Great Expectations 可以把质控规则沉淀成可执行资产。示例规则如下importgreat_expectationsasgximportpandasaspd dfpd.read_parquet(data/staging/rwr_staging.parquet)contextgx.get_context()validatorcontext.sources.pandas_default.read_dataframe(df)validator.expect_column_values_to_not_be_null(patient_id)validator.expect_column_values_to_not_be_null(visit_date,mostly0.95)validator.expect_column_values_to_be_in_set(status_code,[A,B,C,UNKNOWN],mostly0.98)resultvalidator.validate()print(success:,result.success)foriteminresult.results:ifnotitem.success:print(item.expectation_config.expectation_type)print(item.result)这里的0.95、0.98只是工程示例不代表任何行业标准。实际项目应把缺失率、枚举范围、异常升级规则写进研究数据管理计划并由项目负责人确认。Airflow编排让返工点变得可定位当数据源增加后手动运行脚本很快会失控。Airflow 的价值不在于“显得平台化”而在于把每个节点的输入、输出和失败原因拆开。fromairflowimportDAGfromairflow.operators.bashimportBashOperatorfromdatetimeimportdatetimewithDAG(dag_idrwr_data_quality_pipeline,start_datedatetime(2026,5,26),scheduleNone,catchupFalse,tags[rwr,data-quality],)asdag:cleanBashOperator(task_idclean_and_stage,bash_commandpython src/extract_clean.py)validateBashOperator(task_idrun_quality_check,bash_commandpython src/validate.py)build_adsBashOperator(task_idbuild_analysis_dataset,bash_commandpython src/build_ads.py)cleanvalidatebuild_ads我的经验是质控节点不要放到最后。清洗后先校验校验通过再生成分析准备表否则分析侧发现问题时往往已经不知道是哪一批原始数据或哪次映射修改引入的。优化点效率来自少返工而不只是跑得快这类链路的性能优化不应只盯 CPU 时间。更重要的是减少返工次数、减少人工核对范围、减少口径争议。可以优先做四个优化原始数据落盘只读清洗结果用 Parquet 保存避免重复解析 CSV。字段映射、枚举映射、缺失原因映射全部配置化并记录版本号。对失败质控输出明细样本让数据管理员可以直接定位问题行。对常见映射冲突使用 AI 生成候选解释但所有规则变更必须走审核。如果数据量达到千万级记录DuckDB 通常比纯 Pandas Join 更稳尤其适合本地批处理和容器化任务。若后续进入多团队协作再考虑迁移到更完整的数据湖或仓库架构。踩坑记录几个容易被低估的问题第一字段名相同不代表语义相同。比如status_code在不同来源里可能表示采集状态、随访状态或记录状态不能只按字段名自动合并。第二缺失值要区分“未采集”“不适用”“未知”“解析失败”。如果都写成空值后续分析准备阶段会丢失大量上下文。第三质控报告要给到可执行结果。只输出“校验失败”没有意义至少要包含失败字段、失败比例、样例记录和数据来源。第四AI生成的清洗代码需要测试集。可以维护一批脱敏样例数据覆盖日期格式、编码大小写、重复记录和缺失原因等常见情况。结论先把数据底座做稳再谈分析提效AI可以帮助真实世界研究的数据团队更快完成字段识别、规则草拟和异常解释但工程链路必须保持可复跑、可审计、可人工确认。Pandas、DuckDB、Airflow 和 Great Expectations 的组合适合从轻量项目起步逐步沉淀成稳定的数据治理流程。下一步建议先选一个数据源做端到端闭环字段映射、清洗落盘、质控报告、ADS生成全部跑通。数据底座稳定以后再把AI用于规则推荐、质控摘要和分析脚本生成效率提升才不会建立在不可靠的数据之上。本文文献检索、文献挖掘以及文献翻译采用的是【超能文献| AI文献检索|AI文档翻译】。