用Python搞定NCDC气象数据:从FTP下载到Excel可视化全流程(附避坑代码)
用Python搞定NCDC气象数据从FTP下载到Excel可视化全流程附避坑代码气象数据分析在科研、农业、能源等领域具有广泛应用价值。美国国家海洋与大气管理局NOAA下属的国家气候数据中心NCDC提供了全球范围的气象观测数据这些数据以标准化格式存储在FTP服务器上供研究人员免费获取。本文将详细介绍如何用Python构建一个完整的自动化工作流从FTP下载原始数据到最终生成可视化Excel报告涵盖数据处理全流程中的关键技术要点和常见问题解决方案。1. 环境准备与数据获取在开始处理NCDC数据之前需要确保Python环境中安装了必要的库。推荐使用conda或pip安装以下依赖pip install pandas numpy requests ftplib openpyxl matplotlibNCDC的数据存储在FTP服务器上按年份和站点组织。我们可以使用Python的ftplib库来自动化下载过程。以下是一个健壮的FTP下载函数包含错误处理和断点续传功能import os import ftplib from tqdm import tqdm def download_ncdc_data(station_id, year, save_dir): 从NCDC FTP服务器下载指定站点和年份的气象数据 参数: station_id: 气象站ID (如592871) year: 年份 (如2022) save_dir: 本地保存目录 ftp ftplib.FTP(ftp.ncdc.noaa.gov) ftp.login() # 匿名登录 remote_path f/pub/data/noaa/isd-lite/{year}/ filename f{station_id}-99999-{year}.gz local_path os.path.join(save_dir, filename) try: ftp.cwd(remote_path) with open(local_path, wb) as f: size ftp.size(filename) with tqdm(totalsize, unitB, unit_scaleTrue, descfilename) as pbar: def callback(data): f.write(data) pbar.update(len(data)) ftp.retrbinary(fRETR {filename}, callback) print(f成功下载: {filename}) except Exception as e: print(f下载失败: {e}) if os.path.exists(local_path): os.remove(local_path) # 删除可能不完整的文件 finally: ftp.quit()常见问题处理FTP连接超时添加重试机制文件不存在提前检查远程文件列表网络中断实现断点续传功能2. 数据解析与清洗NCDC的原始数据采用固定格式的文本文件没有表头各列数据以空格分隔。我们需要先解压.gz文件然后解析数据内容。以下是完整的解析和清洗流程import gzip import pandas as pd import numpy as np def process_ncdc_file(filepath): 处理单个NCDC数据文件返回清洗后的DataFrame 参数: filepath: 本地文件路径 (.gz格式) # 列定义 (根据isd-lite-format.txt) columns [year, month, day, hour, temp, dew_point, pressure, wind_dir, wind_speed, cloud_cover, precip_1h, precip_6h] # 解压并读取数据 with gzip.open(filepath, rt) as f: data [line.strip().split() for line in f] # 转换为DataFrame df pd.DataFrame(data, columnscolumns).astype(float) # 处理缺失值 (原始数据中用-9999表示) df.replace(-9999, np.nan, inplaceTrue) # 单位换算 (原始数据需要除以10) scale_cols [temp, dew_point, pressure, wind_speed, precip_1h, precip_6h] df[scale_cols] df[scale_cols] / 10 # 创建时间索引 df[datetime] pd.to_datetime(df[[year, month, day, hour]]) df.set_index(datetime, inplaceTrue) df.drop([year, month, day, hour], axis1, inplaceTrue) return df数据质量检查要点检查时间序列的连续性验证数值范围是否合理统计缺失值比例检查单位换算是否正确3. 多站点数据合并与重采样实际应用中我们通常需要处理多个站点的数据并将高频数据如每小时转换为低频数据如每日。以下是实现这些功能的代码def resample_daily(df, station_id): 将小时数据重采样为日数据 参数: df: 原始DataFrame (小时数据) station_id: 站点ID (用于列命名) 返回: 包含日统计数据的DataFrame # 温度相关指标 temp_stats df[temp].resample(D).agg([mean, max, min]) temp_stats.columns [f{station_id}_temp_{x} for x in temp_stats.columns] # 降水相关指标 precip_stats df[precip_1h].resample(D).sum() precip_stats.name f{station_id}_precip_total # 合并结果 return pd.concat([temp_stats, precip_stats], axis1) def process_multiple_stations(station_ids, year, data_dir): 处理多个站点的数据并合并结果 参数: station_ids: 站点ID列表 year: 年份 data_dir: 数据存储目录 all_daily [] for sid in station_ids: try: # 下载数据 (如果不存在) filename f{sid}-99999-{year}.gz filepath os.path.join(data_dir, filename) if not os.path.exists(filepath): download_ncdc_data(sid, year, data_dir) # 处理数据 df process_ncdc_file(filepath) daily_df resample_daily(df, sid) all_daily.append(daily_df) except Exception as e: print(f处理站点 {sid} 时出错: {e}) continue # 合并所有站点数据 return pd.concat(all_daily, axis1)性能优化技巧使用多线程下载多个站点数据对大数据集使用Dask替代Pandas缓存中间结果避免重复计算使用HDF5格式存储临时数据4. 数据可视化与Excel导出最终我们需要将处理好的数据导出为Excel并添加可视化图表。以下是完整的导出函数def export_to_excel(df, output_path, station_ids): 将处理好的数据导出到Excel并添加可视化图表 参数: df: 要导出的DataFrame output_path: 输出文件路径 station_ids: 站点ID列表 (用于图表标题) # 创建Excel写入对象 writer pd.ExcelWriter(output_path, enginexlsxwriter) df.to_excel(writer, sheet_nameDaily Data) # 获取workbook和worksheet对象 workbook writer.book worksheet writer.sheets[Daily Data] # 添加温度趋势图 temp_chart workbook.add_chart({type: line}) for i, sid in enumerate(station_ids): col i * 4 1 # 每站点占4列(mean,max,min,precip) temp_chart.add_series({ name: f{sid} Mean Temp, categories: [Daily Data, 1, 0, len(df), 0], values: [Daily Data, 1, col, len(df), col], }) temp_chart.set_title({name: Daily Temperature Trends}) temp_chart.set_x_axis({name: Date}) temp_chart.set_y_axis({name: Temperature (°C)}) worksheet.insert_chart(H2, temp_chart) # 添加降水柱状图 precip_chart workbook.add_chart({type: column}) for i, sid in enumerate(station_ids): col i * 4 4 # 降水数据在第4列 precip_chart.add_series({ name: f{sid} Precipitation, categories: [Daily Data, 1, 0, len(df), 0], values: [Daily Data, 1, col, len(df), col], }) precip_chart.set_title({name: Daily Precipitation}) precip_chart.set_x_axis({name: Date}) precip_chart.set_y_axis({name: Precipitation (mm)}) worksheet.insert_chart(H20, precip_chart) # 保存Excel文件 writer.save()Excel输出优化建议添加数据验证和条件格式设置合理的数字格式添加数据说明工作表优化图表布局和样式添加自动筛选功能5. 完整工作流整合将上述各步骤整合为一个完整的自动化工作流def ncdc_workflow(station_ids, year, output_file): NCDC数据处理完整工作流 参数: station_ids: 站点ID列表 year: 年份 output_file: 输出Excel文件路径 # 创建临时目录 data_dir temp_ncdc_data os.makedirs(data_dir, exist_okTrue) try: # 处理所有站点数据 print(开始处理站点数据...) combined_df process_multiple_stations(station_ids, year, data_dir) # 导出到Excel print(导出到Excel...) export_to_excel(combined_df, output_file, station_ids) print(f处理完成! 结果已保存到: {output_file}) finally: # 清理临时文件 for f in os.listdir(data_dir): os.remove(os.path.join(data_dir, f)) os.rmdir(data_dir) # 示例用法 if __name__ __main__: stations [592871, 579720, 578660] # 示例站点ID ncdc_workflow(stations, 2022, ncdc_weather_data.xlsx)错误处理与日志记录添加详细的日志记录实现电子邮件通知功能保存处理状态以便恢复验证输入参数的有效性6. 高级应用与扩展对于更复杂的应用场景可以考虑以下扩展功能1. 自动化数据更新def auto_update_data(station_ids, output_file, yearsNone): 自动更新数据到最新版本 参数: station_ids: 站点ID列表 output_file: 输出文件路径 years: 要处理的年份列表 (None表示自动检测最新年份) if years is None: current_year pd.Timestamp.now().year years range(current_year - 5, current_year 1) # 最近5年当年 all_data [] for year in years: try: year_data process_multiple_stations(station_ids, year, temp_ncdc_data) year_data[Year] year all_data.append(year_data) except Exception as e: print(f处理 {year} 年数据时出错: {e}) combined pd.concat(all_data) combined.to_excel(output_file)2. 地理空间分析结合地理信息可以增强数据分析能力import geopandas as gpd from shapely.geometry import Point def add_spatial_info(df, station_info): 为数据添加空间信息 参数: df: 气象数据DataFrame station_info: 包含站点位置信息的GeoDataFrame # 创建几何列 geometry [Point(xy) for xy in zip(station_info[LON], station_info[LAT])] gdf gpd.GeoDataFrame(station_info, geometrygeometry, crsEPSG:4326) # 空间连接或其他空间分析 # ... return gdf3. 机器学习应用气象数据可用于各种预测模型from sklearn.ensemble import RandomForestRegressor from sklearn.model_selection import train_test_split def build_weather_model(df, targettemp_mean): 构建简单的天气预测模型 参数: df: 包含历史天气数据的DataFrame target: 要预测的目标变量 # 准备特征和目标变量 features df.drop(columns[target]) target df[target] # 划分训练测试集 X_train, X_test, y_train, y_test train_test_split( features, target, test_size0.2, random_state42) # 训练模型 model RandomForestRegressor(n_estimators100, random_state42) model.fit(X_train, y_train) # 评估模型 score model.score(X_test, y_test) print(f模型R^2分数: {score:.2f}) return model4. Web应用集成使用Dash或Streamlit创建交互式可视化import streamlit as st def create_weather_app(data): 创建简单的气象数据可视化应用 参数: data: 处理好的气象数据DataFrame st.title(NCDC气象数据可视化) # 站点选择 stations list(set(col.split(_)[0] for col in data.columns)) selected st.multiselect(选择站点, stations, defaultstations[:1]) # 变量选择 variables [temp, precip] var st.selectbox(选择变量, variables) # 时间范围选择 date_range st.slider(选择日期范围, min_valuedata.index.min(), max_valuedata.index.max(), value(data.index.min(), data.index.max())) # 过滤数据 filtered data.loc[date_range[0]:date_range[1]] # 绘制图表 fig, ax plt.subplots(figsize(10, 4)) for sid in selected: col f{sid}_{var}_mean if var temp else f{sid}_{var}_total filtered[col].plot(axax, labelsid) ax.set_title(f{var}变化趋势) ax.legend() st.pyplot(fig)