1. 安装官方 Miniconda（替代）

```bash
curl -O https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-arm64.sh
bash Miniconda3-latest-MacOSX-arm64.sh -b -p $HOME/miniconda3
source $HOME/miniconda3/bin/activate
conda init zsh
```

2. 接受服务条款

接受 main 渠道的服务条款：

```bash
conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main
```

接受 r 渠道的服务条款：

```bash
conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r
```

执行后应该显示类似 `Terms of Service accepted` 的提示。

注：常用 conda 操作参考

| 操作 | 命令 |
| --- | --- |
| 列出所有环境 | `conda env list` 或 `conda info --envs` |
| 创建新环境 | `conda create -n 环境名 python=3.x -y` |
| 激活环境 | `conda activate 环境名` |
| 退出环境 | `conda deactivate` |
| 删除环境 | `conda remove -n 环境名 --all` |
| 安装包 | `conda install 包名` |
| 升级 conda | `conda update conda` |
| 查看已安装包 | `conda list` |

3. 建立虚拟环境

```bash
conda create -n cslstm python=3.10 -y
# 验证环境列表
conda env list
# 激活环境
conda activate cslstm
```

4. 安装 pytorch

```bash
# 默认从 PyPI 拉到 ARM64 版
pip install torch torchvision torchaudio
```

5. 安装深度学习常用包

```bash
# 数据加载和科学计算
pip install numpy pandas scikit-learn matplotlib
# 实验跟踪和可视化
pip install wandb tensorboard
# 数据处理
pip install opencv-python pillow scikit-image
# 深度学习工具
pip install timm datasets
# 科学计算 小波 数据工具
pip install tqdm
pip install PyWavelets  # 小波变换模块①
pip install scipy  # FFT
# 安装 pywt
pip install pywt PyWavelets
# 安装 TensorBoard
pip install tensorboard
```

注：numpy 要在 2 以下

a. 先查看当前 NumPy 版本

```bash
python -c "import numpy; print(numpy.__version__)"
```

b. 降级到 NumPy 1.x

```bash
pip install "numpy<2"
```

c. 验证降级成功

```bash
python -c "import numpy; print(numpy.__version__)"
# 预期输出：1.26.x 或 1.24.x
```

6. 验证 MPS 可用

```bash
python -c "import torch;print(torch.backends.mps.is_available())"
```

7. 官方代码仓库（可以手动下载）

```bash
git clone https://github.com/NESA-Lab/Contextual-and-Seasonal-LSTMs-for-TSAD.git
cd Contextual-and-Seasonal-LSTMs-for-TSAD
```

8. 修改代码（指定用 cpu 跑）

cs_lstms.py

```python
import torch
import numpy as np
from torch import nn
from time_series_decomp import Series_decop


class CSLSTMs(nn.Module):
    def __init__(
        self,
        hp,
    ):
        super(CSLSTMs, self).__init__()
        self.hp = hp
        self.lstm = nn.LSTM(2 * self.hp.contextual_window + 2, self.hp.d_model // 4,
                            batch_first=True, dropout=self.hp.dropout_rate)
        self.cycle_lstm = nn.LSTM(2 * self.hp.seasonal_window + 2, self.hp.d_model,
                                  batch_first=True, dropout=self.hp.dropout_rate)
        self.series_decop = Series_decop(level=int(np.round(np.log2(self.hp.window)) - 4), hp=self.hp)
        self.hour_linear = nn.Sequential(
            nn.Linear(self.hp.d_model // 4, self.hp.d_model // 4),
            nn.Tanh(),
            nn.Linear(self.hp.d_model // 4, self.hp.d_model // 4),
            nn.Tanh(),
            nn.Linear(self.hp.d_model // 4, self.hp.d_model // 4),
            nn.Tanh(),
            nn.Linear(self.hp.d_model // 4, self.hp.contextual_window + 2),
            nn.Tanh()
        )
        self.day_linear = nn.Sequential(
            nn.Linear(self.hp.d_model, self.hp.d_model),
            nn.Tanh(),
            nn.Linear(self.hp.d_model, self.hp.d_model),
            nn.Tanh(),
            nn.Linear(self.hp.d_model, self.hp.d_model),
            nn.Tanh(),
            nn.Linear(self.hp.d_model, self.hp.seasonal_window + 2),
            nn.Tanh()
        )
        self.hour_mu = nn.Linear(self.hp.contextual_window, self.hp.contextual_window)
        self.hour_log_var = nn.Linear(self.hp.contextual_window, self.hp.contextual_window)
        self.day_mu = nn.Linear(self.hp.seasonal_window, self.hp.seasonal_window)
        self.day_log_var = nn.Linear(self.hp.seasonal_window, self.hp.seasonal_window)

    def get_input(self, trend_signal):
        # 取最后 24 个元素，求一天内的影响
        hour_input = trend_signal.clone()
        hour_input = hour_input[:, :, -self.hp.contextual_window * 4:]
        hour_input = hour_input.unfold(dimension=2, size=self.hp.contextual_window, step=self.hp.step)
        hour_input = hour_input.squeeze(1)
        f_hour_input = hour_input.clone()
        f_hour_input[:, -1, -1] = 0
        f_hour_input = self.get_freq(f_hour_input)
        hour_input = torch.cat((hour_input, f_hour_input), dim=-1)

        # 整个窗口进行切分，求天与天之间的关系
        day_input = trend_signal.clone()
        day_input = day_input.unfold(dimension=2, size=self.hp.seasonal_window, step=self.hp.cycle)
        day_input = day_input.squeeze(1)
        f_day_input = day_input.clone()
        f_day_input[:, -1, -1] = 0
        f_day_input = self.get_freq(f_day_input)
        day_input = torch.cat((day_input, f_day_input), dim=-1)
        return day_input, hour_input

    def get_freq(self, input):
        """[修改] 将 FFT 计算强制在 CPU 上进行，避免 MPS 不支持复数操作"""
        original_device = input.device
        input_cpu = input.to("cpu")
        f_global = torch.fft.rfft(input_cpu, dim=-1)
        f_global = torch.cat((f_global.real, f_global.imag), dim=-1)
        return f_global.to(original_device)

    def encode(self, trend_signal):
        day_input, hour_input = self.get_input(trend_signal)

        # 计算 hour result
        result, h = self.lstm(hour_input)
        result_hour = self.hour_linear(result[:, -1, :]).unsqueeze(1)
        hour_real = result_hour[:, :, :(self.hp.contextual_window // 2 + 1)]
        hour_imag = result_hour[:, :, (self.hp.contextual_window // 2 + 1):]
        f_hour = torch.stack((hour_real, hour_imag), dim=-1)
        # [修改] 复数操作迁移到 CPU
        f_hour_cpu = f_hour.to("cpu")
        f_hour_cpu = torch.view_as_complex(f_hour_cpu)
        result_hour_cpu = torch.fft.irfft(f_hour_cpu)
        f_hour = result_hour_cpu.to(result_hour.device)
        hour_mu = self.hour_mu(f_hour)
        hour_log_var = self.hour_log_var(f_hour)

        # 计算 day result
        result, h = self.cycle_lstm(day_input)
        result_day = self.day_linear(result[:, -1, :]).unsqueeze(1)
        day_real = result_day[:, :, :(self.hp.seasonal_window // 2 + 1)]
        day_imag = result_day[:, :, (self.hp.seasonal_window // 2 + 1):]
        f_day = torch.stack((day_real, day_imag), dim=-1)
        # [修改] 复数操作迁移到 CPU
        f_day_cpu = f_day.to("cpu")
        f_day_cpu = torch.view_as_complex(f_day_cpu)
        result_day_cpu = torch.fft.irfft(f_day_cpu)
        f_day = result_day_cpu.to(result_day.device)
        day_mu = self.day_mu(f_day)
        day_log_var = self.day_log_var(f_day)
        return hour_mu, hour_log_var, day_mu, day_log_var

    def forward(self, input, input_normal, mode, mask):
        if mode == "train":
            loss = self.loss_func(input, input_normal, mask)
            return loss
        elif mode == "valid":
            loss = self.loss_func(input, input_normal, mask)
            return loss
        else:
            return self.reference(input, input_normal)

    def loss_func(self, input, input_normal, mask):
        trend_signal = self.series_decop(input)
        hour_mu, hour_log_var, day_mu, day_log_var = self.encode(input)
        trend_day = trend_signal[:, :, -self.hp.seasonal_window:].squeeze(1)
        trend_hour = trend_signal[:, :, -self.hp.contextual_window:].squeeze(1)
        input_day = input_normal[:, :, -self.hp.seasonal_window:].squeeze(1)
        input_hour = input_normal[:, :, -self.hp.contextual_window:].squeeze(1)
        mask_day = mask[:, -self.hp.seasonal_window:]
        mask_hour = mask[:, -self.hp.contextual_window:]
        hour_mu = hour_mu.squeeze(1)
        hour_log_var = hour_log_var.squeeze(1)
        day_mu = day_mu.squeeze(1)
        day_log_var = day_log_var.squeeze(1)
        input_day = input_day * mask_day + trend_day * torch.logical_not(mask_day)
        input_hour = input_hour * mask_hour + trend_hour * torch.logical_not(mask_hour)
        hour_recon_loss = torch.mean(
            0.5 * torch.mean((hour_log_var + (input_hour - hour_mu) ** 2 / torch.exp(hour_log_var)), dim=-1),
            dim=0
        )
        day_recon_loss = torch.mean(
            0.5 * torch.mean((day_log_var + (input_day - day_mu) ** 2 / torch.exp(day_log_var)), dim=-1),
            dim=0
        )
        recon_loss = day_recon_loss + hour_recon_loss
        loss = recon_loss
        if torch.isinf(loss):
            raise
        return loss

    def reference(self, input, input_normal):
        loss = 0
        input_hour = input_normal[:, :, -self.hp.contextual_window:]
        input_day = input_normal[:, :, -self.hp.seasonal_window:]
        hour_mu, hour_log_var, day_mu, day_log_var = self.encode(input)
        hour_loss = (hour_log_var + (input_hour - hour_mu) ** 2 / torch.exp(hour_log_var))
        hour_loss = hour_loss[:, :, -1].unsqueeze(2)
        cycle_loss = (day_log_var + (input_day - day_mu) ** 2 / torch.exp(day_log_var))
        cycle_loss = cycle_loss[:, :, -1].unsqueeze(2)
        loss = hour_loss + cycle_loss
        recon_x = (hour_mu[:, :, -1].unsqueeze(2) + day_mu[:, :, -1].unsqueeze(2)) / 2
        return loss, recon_x
```

train.py

```python
import os
# [新增] 强制使用 CPU，避免 MPS 的 ComplexFloat 问题
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
os.environ["TORCH.backends.mps.enabled"] = "0"
import logging
import numpy as np
import torch
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from model import MyCSLSTMs
from pytorch_lightning.loggers import TensorBoardLogger
import argparse
import time
import json
from collections import defaultdict

SEED = 8
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
np.random.seed(SEED)
logger = TensorBoardLogger(name="logs", save_dir="./")


def main(hparams):
    print("loading model...")
    model = MyCSLSTMs(hparams)
    print("model built")
    early_stop = EarlyStopping(
        monitor="val_loss_valid_epoch", patience=3, verbose=True, mode="min"
    )
    checkpoint = ModelCheckpoint(
        dirpath="./ckpt/",
        filename="{}".format(hparams.data_name),
        monitor="val_loss_valid_epoch",
        mode="min",
    )
    # 【修改】自动检测可用设备，优先 MPS (Mac)，其次是 CPU
    if torch.backends.mps.is_available():
        accelerator = "mps"
        devices = 1
        print("Using MPS (Apple Silicon GPU)")
    elif torch.cuda.is_available():
        accelerator = "gpu"
        devices = [hparams.gpu]
        print("Using CUDA GPU")
    else:
        accelerator = "cpu"
        devices = 1
        print("Using CPU only")
    trainer = Trainer(
        max_epochs=hparams.max_epoch,
        callbacks=[early_stop, checkpoint],
        logger=logger,
        accelerator=accelerator,
        devices=devices,
        check_val_every_n_epoch=1,
        gradient_clip_algorithm="value",
        gradient_clip_val=2,
    )
    print("fit start")
    train_loader = model.mydataloader("train")
    val_loader = model.mydataloader("valid")
    trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)
    start_time = time.time()
    trainer.test(model, dataloaders=model.mydataloader("test"))
    end_time = time.time()
    print("View tensorboard logs by running\ntensorboard --logdir %s" % os.getcwd())
    print("and going to http://localhost:6006 on your browser")
    print(end_time - start_time)


def compute_json_values_mean(json_list):
    """
    读取文件内包含的JSON数组，对每个字段求均值。
    假设所有JSON对象字段一致且值为数字。

    参数:
        json_file_path (str): JSON文件路径

    返回:
        dict: 每个字段的均值字典
    """
    if not json_list:
        return {}
    field_sums = defaultdict(float)
    field_counts = defaultdict(int)
    for json_obj in json_list:
        if isinstance(json_obj, dict):
            for key, value in json_obj.items():
                if isinstance(value, (int, float)):
                    field_sums[key] += value
                    field_counts[key] += 1
        else:
            print(f"Warning: Skipping non-dictionary item in list: {json_obj}")
    averages = {}
    for key in field_sums:
        if field_counts[key] > 0:
            averages[key] = field_sums[key] / field_counts[key]
    return averages


if __name__ == "__main__":
    parser = MyCSLSTMs.add_model_specific_args()
    hyperparams = parser.parse_args()
    print(f"RUNNING")
    if hyperparams.only_test == 1:
        model = MyCSLSTMs.load_from_checkpoint(checkpoint_path=hyperparams.ckpt_path)
        model.hp = hyperparams
        print(model.hp)
        # 【修改】自动检测可用设备，优先 CPU（兼容复数操作），其次 MPS
        if torch.cuda.is_available():
            accelerator = "gpu"
            devices = [hyperparams.gpu]
            print("Using CUDA GPU")
        elif torch.backends.mps.is_available():
            # 检查是否有复数操作需求，如果有则使用 CPU
            accelerator = "cpu"
            devices = 1
            print("MPS available but using CPU for ComplexTensor compatibility")
        else:
            accelerator = "cpu"
            devices = 1
            print("Using CPU only")
        trainer = Trainer(accelerator=accelerator, devices=devices)
        trainer.test(model, dataloaders=model.mydataloader("test"))
    else:
        main(hyperparams)
```

time_series_decomp.py

```python
import pywt
import torch.nn as nn
import torch
import numpy as np


class Series_decop(nn.Module):
    def __init__(self, wavlet="db4", level=5, hp=None):
        super().__init__()
        self.wavlet = wavlet
        self.level = level
        self.hp = hp

    def forward(self, x):
        signal = x.clone().cpu().detach().numpy()
        coeffs = pywt.wavedec(signal[:,:], self.wavlet, level=self.level)
        # 估计噪声阈值
        eps = 1e-10
        d = np.sqrt(2*np.log(signal.shape[-1]))
        for i in range(1, self.level+1):
            sigma = np.median(np.abs(coeffs[i])) / 0.6745
            sigma = max(sigma, eps)
            uthresh = sigma*d
            # 对细节系数应用软阈值
            coeffs[i] = pywt.threshold(coeffs[i], uthresh, mode="soft")
        # 【修改】自动检测可用设备
        device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
        trend_signal = torch.tensor(pywt.waverec(coeffs, self.wavlet)).to(device)
        return trend_signal
```

训练数据

```bash
# 修改数据集名：--data_name 与 --data_dir 中的 AIOPS
python train.py \
  --data_name AIOPS \
  --data_dir ./data/AIOPS \
  --window 240 \
  --seasonal_window 48 \
  --cycle 48 \
  --contextual_window 4 \
  --step 2 \
  --save_file ./result.txt \
  --learning_rate 0.0005 \
  --max_epoch 100 \
  --batch_size 64 \
  --gpu 0 \
  --num_workers 2
```