ETDataset 数据集预处理实战:从原始CSV到PyTorch DataLoader的5个关键步骤 ETDataset 数据集预处理实战从原始CSV到PyTorch DataLoader的5个关键步骤电力变压器温度预测是能源管理领域的重要课题ETDataset作为业内广泛使用的时间序列数据集为研究者提供了丰富的实战素材。本文将手把手带你完成从原始CSV文件到可投入模型训练的PyTorch DataLoader的全流程重点解决实际工程中的五个核心挑战。1. 数据加载与初步探索任何数据预处理流程的第一步都是理解数据结构和内容。ETDataset通常以CSV格式存储包含分钟级(ETTm)和小时级(ETTh)两种时间粒度。我们先使用pandas加载数据并检查基本特征import pandas as pd # 加载小时级数据示例 df pd.read_csv(ETTh1.csv) print(f数据集形状: {df.shape}) print(df.head()) # 检查数据类型和缺失值 print(\n数据类型统计:) print(df.dtypes) print(\n缺失值统计:) print(df.isnull().sum())典型输出会显示8个特征列date: 时间戳HUFL/HULL: 高使用率有效/无效负载MUFL/MULL: 中等使用率有效/无效负载LUFL/LULL: 低使用率有效/无效负载OT: 油温预测目标注意实际项目中建议使用pathlib代替直接字符串路径增强代码可移植性。例如from pathlib import Path data_path Path(data/ETTh1.csv)时间序列数据的探索性分析(EDA)至关重要。快速绘制特征分布和趋势图import matplotlib.pyplot as plt df[OT].plot(titleOil Temperature Trend) plt.xlabel(Time) plt.ylabel(Temperature) plt.show()2. 时间戳解析与索引处理正确处理时间特征是时间序列分析的基础。ETDataset中的date列需要转换为datetime类型并设为索引df[date] pd.to_datetime(df[date]) df.set_index(date, inplaceTrue) # 检查时间序列连续性 time_diff df.index.to_series().diff() print(f时间间隔统计:\n{time_diff.value_counts()})对于存在不规则间隔的数据我们需要进行重采样。例如将分钟级数据统一为15分钟间隔# 只适用于ETTm数据集 df_resampled df.resample(15T).mean()处理大型数据集时可以考虑使用dask库进行分布式加载import dask.dataframe as dd ddf dd.read_csv(ETTh1.csv, parse_dates[date]) ddf ddf.set_index(date)3. 特征工程与归一化电力负载特征通常存在量纲差异需要进行标准化处理。我们使用Scikit-learn的ColumnTransformer实现差异化处理from sklearn.preprocessing import StandardScaler, MinMaxScaler from sklearn.compose import ColumnTransformer # 定义特征分组 load_features [HUFL, HULL, MUFL, MULL, LUFL, LULL] target_feature [OT] # 创建转换器 preprocessor ColumnTransformer( transformers[ (load, StandardScaler(), load_features), (temp, MinMaxScaler(), target_feature) ], remainderpassthrough ) # 应用转换 df_scaled pd.DataFrame( preprocessor.fit_transform(df), columnsload_features target_feature, indexdf.index )对于时间序列数据我们通常还需要创建滞后特征# 创建24小时滞后特征 for feature in load_features: df_scaled[f{feature}_lag24] df_scaled[feature].shift(24) # 删除因滞后产生的缺失值 df_scaled.dropna(inplaceTrue)4. 缺失值处理与异常检测尽管ETDataset已经过预处理实际项目中仍需处理数据质量问题。以下是常见的处理方法缺失值处理策略对比表方法适用场景优点缺点线性插值少量连续缺失保持趋势可能低估波动前向填充设备短暂故障简单快速延长异常影响季节性均值周期性数据保留周期特征忽略近期变化模型预测大量缺失精度高计算成本高异常值检测可以使用滑动窗口统计法def detect_anomalies(series, window24, sigma3): rolling_mean series.rolling(window).mean() rolling_std series.rolling(window).std() return series[(series - rolling_mean).abs() sigma * rolling_std] anomalies detect_anomalies(df_scaled[OT]) print(f检测到异常值数量: {len(anomalies)})5. 构建时序样本与DataLoader时间序列预测需要将数据转换为监督学习格式。我们实现一个灵活的滑动窗口生成器import torch from torch.utils.data import Dataset, DataLoader class ETDataset(Dataset): def __init__(self, data, window_size24, horizon12): self.data torch.FloatTensor(data.values) self.window_size window_size self.horizon horizon def __len__(self): return len(self.data) - self.window_size - self.horizon 1 def __getitem__(self, idx): x self.data[idx:idxself.window_size, :-1] # 除最后一列外的所有特征 y self.data[idxself.window_size:idxself.window_sizeself.horizon, -1] # 最后一列是目标 return x, y # 示例使用 dataset ETDataset(df_scaled, window_size24*7, horizon24) # 使用一周数据预测下一天 dataloader DataLoader(dataset, batch_size32, shuffleTrue) # 检查一个批次 for x, y in dataloader: print(f输入形状: {x.shape}, 输出形状: {y.shape}) break提示对于多GPU训练可以使用DistributedSampler优化数据加载from torch.utils.data.distributed import DistributedSampler sampler DistributedSampler(dataset) dataloader DataLoader(dataset, batch_size32, samplersampler)完整的数据处理流程还应包括数据集分割。时间序列需要按时间顺序划分# 按时间划分训练/验证/测试集 train_size int(0.7 * len(df_scaled)) val_size int(0.2 * len(df_scaled)) train_data df_scaled.iloc[:train_size] val_data df_scaled.iloc[train_size:train_sizeval_size] test_data df_scaled.iloc[train_sizeval_size:] print(f数据集划分: 训练集 {len(train_data)}, 验证集 {len(val_data)}, 测试集 {len(test_data)})高级技巧与性能优化处理大型时间序列数据集时内存和计算效率至关重要。以下是几个实用技巧内存映射技术对于超大型数据集可以使用numpy的memmap功能import numpy as np # 将数据保存为memmap格式 fp np.memmap(temp.mmap, dtypefloat32, modew, shapedf_scaled.values.shape) fp[:] df_scaled.values[:] del fp # 刷新到磁盘 # 后续加载 data np.memmap(temp.mmap, dtypefloat32, moder, shapedf_scaled.values.shape)并行预处理使用joblib加速特征工程from joblib import Parallel, delayed def process_feature(col): return df[col].rolling(24).mean() results Parallel(n_jobs4)(delayed(process_feature)(col) for col in load_features)自定义DataLoader实现更高效的时间序列采样class SequentialSampler(torch.utils.data.Sampler): def __init__(self, data_source, stride1): self.data_source data_source self.stride stride def __iter__(self): n len(self.data_source) return iter(range(0, n, self.stride)) def __len__(self): return len(range(0, len(self.data_source), self.stride)) sampler SequentialSampler(dataset, stride12) dataloader DataLoader(dataset, batch_size32, samplersampler)工程化部署考虑将预处理流程产品化时需要关注以下方面可复现性使用joblib保存预处理管道from joblib import dump dump(preprocessor, preprocessor.joblib)监控数据漂移定期检查特征分布变化from scipy import stats def detect_drift(reference, current, alpha0.05): pvalues [] for col in reference.columns: _, p stats.ks_2samp(reference[col], current[col]) pvalues.append(p alpha) return sum(pvalues) / len(pvalues)自动化测试为数据质量添加断言检查assert df_scaled.isnull().sum().sum() 0, 存在缺失值未处理 assert (df_scaled.index pd.Series(df_scaled.index).drop_duplicates()).all(), 索引不唯一性能基准记录各步骤耗时from time import perf_counter class Timer: def __enter__(self): self.start perf_counter() return self def __exit__(self, *args): self.end perf_counter() self.duration self.end - self.start with Timer() as t: preprocessor.fit_transform(df) print(f预处理耗时: {t.duration:.2f}秒)