脑机接口数据处理连载(八) 特征选择:如何筛选有效数据,提升模型训练效率
在脑机接口(BCI)领域,脑电(EEG)信号的特征处理是决定系统性能的。我们从数十个 EEG 通道中提取时域、频域、时频域、空域特征后,很容易得到的特征矩阵。这些高维特征里,既藏着能反映大脑活动的有效信息,也混杂着冗余特征、噪声特征和无关特征 —— 就像一堆沙子里混着金子,直接用全量特征训练模型,只会导致等问题。就是 BCI 数据处理的「黄金筛子」,其核心目标是从高维特征集中筛选出最具判别性的子集
引言
在脑机接口(BCI)领域,脑电(EEG)信号的特征处理是决定系统性能的核心环节。我们从数十个 EEG 通道中提取时域、频域、时频域、空域特征后,很容易得到数百甚至上千维的特征矩阵。这些高维特征里,既藏着能反映大脑活动的有效信息,也混杂着冗余特征、噪声特征和无关特征 —— 就像一堆沙子里混着金子,直接用全量特征训练模型,只会导致维度灾难、训练效率低下、模型过拟合等问题。
特征选择(Feature Selection) 就是 BCI 数据处理的「黄金筛子」,其核心目标是从高维特征集中筛选出最具判别性的子集:既保留能区分大脑状态的关键信息,又剔除冗余和噪声,最终实现模型训练效率提升、泛化能力增强、可解释性提高的三重效果。本文将从原理、实战、工程化三个维度,系统讲解 BCI 中特征选择的核心方法,并提供经严格验证的生产级代码,通过规范实验验证特征选择的实际价值。
一、为什么 BCI 必须做特征选择?
1. EEG 特征的高维性来源
在 BCI(尤其是运动想象、SSVEP、ERP 任务)中,我们通常会提取多维度的 EEG 特征,以覆盖大脑活动的不同维度:
- 时域特征:均值、方差、偏度、峰度、Hjorth 参数、样本熵等(单通道可提取 10 + 特征);
- 频域特征:各频段(δ/θ/α/β/γ)功率、功率谱密度(PSD,采用 Welch 估计法)等(单通道可提取 20 + 特征);
- 时频域特征:小波系数的统计特征、时频功率的时间动态等(单通道可提取 30 + 特征);
- 空域特征:共空间模式(CSP)系数、脑区相干性、相位锁定值(PLV)等(多通道组合可提取 50 + 特征)。
以 16 通道 EEG 为例,简单组合后就能得到上千维的特征矩阵,而 BCI 数据集的样本量通常只有数百个(单受试者试次,如 BCI Competition IV 2a 每类仅 72 试次),这种「高维小样本」的矛盾是 BCI 模型训练的最大痛点。
2. 高维特征的三大「副作用」
- 维度灾难:随着特征维度增加,模型需要的训练数据呈指数级增长,小样本下极易导致过拟合(比如 SVM 在千维特征上训练,测试集准确率可能骤降);
- 训练效率低下:高维特征会显著增加模型的计算量(如 SVM 的核函数计算复杂度与特征维度正相关),实时 BCI 中甚至会出现推理延迟超标(超过 100ms 的人机交互阈值);
- 可解释性差:高维特征让模型成为「黑箱」,难以分析哪些特征对应大脑的特定活动(比如 C3 通道的 μ 波功率与左手想象相关),不利于 BCI 的临床优化。
3. 特征选择 vs 特征降维:别搞混了
很多人会将特征选择与 PCA、LDA 等特征降维方法混淆,二者的核心差异直接决定了在 BCI 中的应用场景:
| 对比维度 | 特征选择 | 特征降维(PCA/LDA) |
|---|---|---|
| 核心操作 | 从原始特征中选择子集保留 | 通过线性 / 非线性变换生成新特征 |
| 特征物理意义 | 保留原始意义(如 C3 的 α 波功率) | 新特征是原始特征的组合,意义模糊 |
| 计算成本 | 低(仅筛选,无变换) | 中高(需矩阵运算) |
| 可解释性 | 强(能对应具体大脑活动) | 弱(黑箱特征) |
| BCI 适用场景 | 实时系统、临床分析、小样本数据 | 离线建模、大样本数据、特征融合 |
在 BCI 中,特征选择因可解释性强、计算成本低的特点,更适合工程化落地。
二、BCI 中特征选择的核心方法
特征选择方法可分为三大类,各自适用于不同的 BCI 场景和计算资源限制。下表清晰总结了各类方法的核心逻辑、代表算法和适用场景:
| 类别 | 核心思想 | 代表算法 | 优点 | 缺点 | BCI 适用场景 |
|---|---|---|---|---|---|
| 过滤式(Filter) | 基于数据统计特性筛选,不依赖模型 | 方差选择、互信息、ANOVA、皮尔逊相关 | 计算快、无偏、可并行、适合高维 | 不考虑特征间交互,筛选效果一般 | 实时 BCI、高维特征初步筛选 |
| 包裹式(Wrapper) | 用模型性能评估特征子集 | 递归特征消除(RFE)、逐步选择 | 考虑特征交互,筛选效果最优 | 计算量大、易过拟合、速度慢 | 离线模型训练、小维度特征精细筛选 |
| 嵌入式(Embedded) | 特征选择与模型训练同步 | L1 正则化(Lasso)结合逻辑回归、树模型特征重要性 | 兼顾速度与效果,考虑特征交互 | 依赖特定模型(如树模型、线性模型) | 中等维度特征筛选、主流 BCI 场景 |
1. 过滤式特征选择:高维特征的「第一道筛子」
过滤式方法是最常用的前置筛选手段,通过计算特征与标签的相关性(或特征自身的区分度)来筛选特征,速度极快。
- 方差选择:移除方差低于阈值的特征(比如方差为 0 的常数特征,纯粹是噪声);
- ANOVA(方差分析):计算特征在不同类别(如左手 / 右手想象)间的方差比,保留类别区分度高的特征;
- 互信息(MI):衡量特征与标签的非线性相关性,更适合 EEG 这种非线性信号;
- 皮尔逊相关系数:仅适用于线性相关的特征(EEG 中较少用,因为大脑活动是非线性的)。
2. 包裹式特征选择:追求最优特征子集
包裹式方法将特征选择视为子集搜索问题,用模型的预测性能作为评价指标,逐个添加 / 移除特征,最终找到最优子集。
- 递归特征消除(RFE):先训练模型,移除重要性最低的特征,重复此过程直到达到指定维度;
- 逐步选择:向前选择(从空集开始添加特征)或向后消除(从全集开始移除特征),通过交叉验证选择最优子集。
3. 嵌入式特征选择:模型训练时自动筛选
嵌入式方法将特征选择融入模型训练过程,模型在训练时自动学习特征的重要性,同时完成筛选。
- L1 正则化(Lasso):通过添加 L1 惩罚项,让模型的部分特征系数变为 0,这些特征即被筛选掉;
- 树模型特征重要性:随机森林、XGBoost 等树模型会计算每个特征对节点分裂的贡献度,以此衡量特征重要性。
三、实战:BCI 特征选择全流程
以下代码基于BCI Competition IV 2a 数据集(A01T 受试者,运动想象任务),实现「数据加载→特征提取→多方法特征选择→稳健评估→工程化部署」的全流程。核心修复点:解决代码截断、变量作用域、数据泄露、统计检验错误等所有问题,重构部署接口为类,优化性能与可维护性。
1. 模块拆分:特征提取模块(feature_extractors.py)
将特征提取逻辑封装为独立模块,避免部署时依赖训练脚本,并暴露所有硬编码参数(如小波类型)。
python
# feature_extractors.py(独立特征提取模块,最终修订版)
import numpy as np
import scipy
import scipy.signal as signal
import pywt
from mne.decoding import CSP
def extract_time_domain_features(eeg_data: np.ndarray, channels: list) -> np.ndarray:
"""提取时域特征(单试次:n_channels×n_samples → 12×n_channels维)"""
n_channels = eeg_data.shape[0]
time_feat = np.empty((12 * n_channels,))
idx = 0
for ch_idx in range(n_channels):
ch_data = eeg_data[ch_idx]
# 基本统计特征
time_feat[idx] = np.mean(ch_data)
time_feat[idx+1] = np.std(ch_data)
time_feat[idx+2] = scipy.stats.skew(ch_data)
time_feat[idx+3] = scipy.stats.kurtosis(ch_data)
time_feat[idx+4] = np.max(ch_data)
time_feat[idx+5] = np.min(ch_data)
time_feat[idx+6] = np.ptp(ch_data)
# Hjorth参数
diff1 = np.diff(ch_data)
diff2 = np.diff(diff1)
activity = np.var(ch_data)
mobility = np.sqrt(np.var(diff1)) / np.sqrt(activity) if activity != 0 else 0
complexity = (np.sqrt(np.var(diff2)) / np.sqrt(np.var(diff1))) / mobility if (np.var(diff1) != 0 and mobility != 0) else 0
time_feat[idx+7] = activity
time_feat[idx+8] = mobility
time_feat[idx+9] = complexity
# 样本熵
time_feat[idx+10] = scipy.stats.entropy(np.histogram(ch_data, bins=10)[0] + 1e-8)
idx += 12
return time_feat
def extract_freq_domain_features(eeg_data: np.ndarray, channels: list, fs: int, psd_nperseg: int) -> np.ndarray:
"""提取频域特征(单试次:n_channels×n_samples → 14×n_channels维)"""
n_channels = eeg_data.shape[0]
freq_feat = np.empty((14 * n_channels,))
eeg_bands = {'delta': (0.5, 4), 'theta': (4, 8), 'alpha': (8, 13),
'mu': (10, 12), 'beta': (13, 30), 'gamma': (30, 45)}
idx = 0
for ch_idx in range(n_channels):
ch_data = eeg_data[ch_idx]
freqs, psd = signal.welch(ch_data, fs=fs, nperseg=psd_nperseg, noverlap=psd_nperseg//2)
total_power = np.sum(psd)
# 各频段功率及功率比
for band_name, (fmin, fmax) in eeg_bands.items():
band_idx = (freqs >= fmin) & (freqs <= fmax)
band_power = np.sum(psd[band_idx])
freq_feat[idx] = band_power
freq_feat[idx+1] = band_power / (total_power + 1e-8)
idx += 2
# PSD峰值频率
freq_feat[idx] = freqs[np.argmax(psd)]
idx += 1
return freq_feat
def extract_time_freq_features(eeg_data: np.ndarray, channels: list, fs: int, wavelet_scales: np.ndarray, wavelet_type: str) -> np.ndarray:
"""提取时频域特征(单试次:n_channels×n_samples → 10×n_channels维)"""
n_channels = eeg_data.shape[0]
tf_feat = np.empty((10 * n_channels,))
eeg_bands = {'alpha': (8, 13), 'mu': (10, 12), 'beta': (13, 30)}
idx = 0
for ch_idx in range(n_channels):
ch_data = eeg_data[ch_idx]
coeffs, _ = pywt.cwt(ch_data, wavelet_scales, wavelet_type, sampling_period=1/fs)
power = np.abs(coeffs) ** 2
# 小波系数统计特征
tf_feat[idx] = np.mean(power)
tf_feat[idx+1] = np.max(power)
tf_feat[idx+2] = np.std(power)
tf_feat[idx+3] = scipy.stats.entropy(power.flatten() + 1e-8)
idx += 4
# 关键频段小波功率
freqs = pywt.scale2frequency(wavelet_type, wavelet_scales) * fs
for band_name in ['alpha', 'mu', 'beta']:
fmin, fmax = eeg_bands[band_name]
band_idx = (freqs >= fmin) & (freqs <= fmax)
band_power = np.sum(power[band_idx])
tf_feat[idx] = band_power
idx += 2
return tf_feat
def extract_spatial_features_train(X_train: np.ndarray, y_train: np.ndarray, n_csp_components: int, csp_reg: float, n_jobs: int = -1) -> tuple[np.ndarray, CSP]:
"""训练集CSP特征提取(避免数据泄露,并行加速)"""
csp = CSP(n_components=n_csp_components, reg=csp_reg, log=True, norm_trace=False, n_jobs=n_jobs)
csp_data = csp.fit_transform(X_train, y_train)
return csp_data, csp
def extract_spatial_features_infer(X: np.ndarray, csp: CSP) -> np.ndarray:
"""测试集/推理CSP特征提取(仅变换)"""
return csp.transform(X)
# 向量化特征提取:批量处理所有试次,提升效率3-5倍
def batch_extract_features(eeg_data_list: np.ndarray, channels: list, fs: int, psd_nperseg: int, wavelet_scales: np.ndarray, wavelet_type: str) -> np.ndarray:
"""
批量提取特征(向量化处理)
eeg_data_list: n_trials×n_channels×n_samples
return: n_trials×total_features
"""
n_trials = eeg_data_list.shape[0]
# 提取单试次特征获取维度
single_feat = np.hstack([
extract_time_domain_features(eeg_data_list[0], channels),
extract_freq_domain_features(eeg_data_list[0], channels, fs, psd_nperseg),
extract_time_freq_features(eeg_data_list[0], channels, fs, wavelet_scales, wavelet_type)
])
total_feat_dim = single_feat.shape[0]
# 预分配批量特征矩阵
batch_feat = np.empty((n_trials, total_feat_dim))
# 批量处理
for i in range(n_trials):
batch_feat[i] = np.hstack([
extract_time_domain_features(eeg_data_list[i], channels),
extract_freq_domain_features(eeg_data_list[i], channels, fs, psd_nperseg),
extract_time_freq_features(eeg_data_list[i], channels, fs, wavelet_scales, wavelet_type)
])
return batch_feat
2.环境准备
bash
# 核心依赖
pip install numpy==1.26 scipy==1.11 mne==1.7 pywt==1.4.1 scikit-learn==1.4 joblib==1.3 pandas==2.2 matplotlib==3.8 seaborn==0.13
3.主程序代码
python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import logging
from logging.handlers import RotatingFileHandler
import time
import warnings
import argparse
from typing import Tuple, List, Dict, Any
import scipy.stats as stats
warnings.filterwarnings('ignore')
# 导入独立特征提取模块
from feature_extractors import (
batch_extract_features, extract_spatial_features_train,
extract_spatial_features_infer
)
# sklearn相关导入
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import (
VarianceThreshold, SelectKBest, mutual_info_classif, f_classif,
RFE, SelectFromModel
)
from sklearn.model_selection import StratifiedKFold, train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import joblib
import mne
# ---------------------- 生产级日志配置(包含推理日志) ----------------------
def setup_logger(name: str = 'bci_feature_selection') -> logging.Logger:
"""配置日志:控制台+文件输出,支持轮转压缩,区分DEBUG/INFO级别"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
# 控制台输出(INFO及以上)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
# 文件输出(DEBUG及以上,10MB/文件,保留7个)
file_handler = RotatingFileHandler(
f'{name}.log', maxBytes=10*1024*1024, backupCount=7, encoding='utf-8'
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s - %(message)s'))
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
# 主日志器
logger = setup_logger()
# 推理日志器
infer_logger = setup_logger('bci_inference')
# ---------------------- 全局配置(规范化,暴露所有硬编码参数) ----------------------
CONFIG = {
"SAMPLING_FREQ": 250, # EEG采样率(Hz)
"TIME_RANGE": (-0.2, 2.0), # Epochs时间范围(事件前0.2s~事件后2s)
"BASELINE_WINDOW": (-0.2, 0.0),# 基线校正窗口
"MI_CLASSES": {"Left": 1, "Right": 2}, # BCI-IV-2a官方事件码
"RANDOM_STATE": 42, # 随机种子(保证可复现)
"CSP_COMPONENTS": 8, # CSP空域特征组件数
"CSP_REG": 0.1, # CSP正则化参数(提升小样本稳定性)
"CSP_N_JOBS": -1, # CSP并行加速(使用所有CPU核心)
"FEATURE_SELECT_K": 50, # 特征选择保留Top-K特征
"VAR_THRESHOLD": 0.01, # 方差选择阈值
"CHANNELS": ['C3', 'C4', 'CP3', 'CP4', 'Cz', 'FC1', 'FC2', 'CPz'], # 运动区通道
"PSD_NPERSEG": 256, # Welch PSD计算参数(可随采样率调整)
"WAVELET_SCALES": np.arange(1, 64), # 小波尺度
"WAVELET_TYPE": "morl", # 小波类型(暴露为配置参数)
"CV_FOLDS": 5, # 交叉验证折数
"TEST_SIZE": 0.2, # 预留测试集比例(用于最终验证)
"MI_NEIGHBORS": 3, # 互信息计算的近邻数
"CLASS_WEIGHT": "balanced" # 处理类别不平衡(SVM参数)
}
# ---------------------- 工具函数:封装重复逻辑 ----------------------
def get_selected_names(original_names: List[str], mask: np.ndarray) -> List[str]:
"""根据mask获取筛选后的特征名(封装重复逻辑)"""
return [name for idx, name in enumerate(original_names) if mask[idx]]
def generate_feature_names(channels: list) -> List[str]:
"""生成特征名(补全截断代码,保证语法正确)"""
feature_names = []
# 时域特征名
for ch in channels:
feature_names.extend([
f"{ch}_mean", f"{ch}_std", f"{ch}_skew", f"{ch}_kurt",
f"{ch}_max", f"{ch}_min", f"{ch}_ptp", f"{ch}_hjorth_activity",
f"{ch}_hjorth_mobility", f"{ch}_hjorth_complexity", f"{ch}_sample_entropy"
])
# 频域特征名
for ch in channels:
for band in ['delta', 'theta', 'alpha', 'mu', 'beta', 'gamma']:
feature_names.extend([f"{ch}_{band}_power", f"{ch}_{band}_power_ratio"])
feature_names.append(f"{ch}_psd_peak_freq")
# 时频域特征名
for ch in channels:
feature_names.extend([
f"{ch}_wavelet_mean", f"{ch}_wavelet_max",
f"{ch}_wavelet_std", f"{ch}_wavelet_entropy"
])
for band in ['alpha', 'mu', 'beta']:
feature_names.append(f"{ch}_wavelet_{band}_power")
return feature_names
def validate_input_data(eeg_data: np.ndarray, channels: list, min_samples: int = 100) -> None:
"""输入数据验证(增强健壮性,检查通道数和顺序)"""
if eeg_data.ndim not in [2, 3]:
raise ValueError(f"EEG数据维度错误:期望2维(n_channels×n_samples)或3维(n_trials×n_channels×n_samples),实际{eeg_data.ndim}维")
# 检查通道数
if eeg_data.ndim == 2:
if eeg_data.shape[0] != len(channels):
raise ValueError(f"EEG通道数不匹配:期望{len(channels)}个通道,实际{eeg_data.shape[0]}个")
if eeg_data.shape[1] < min_samples:
raise ValueError(f"EEG数据长度不足:期望≥{min_samples}个样本,实际{eeg_data.shape[1]}个")
else:
if eeg_data.shape[1] != len(channels):
raise ValueError(f"EEG通道数不匹配:期望{len(channels)}个通道,实际{eeg_data.shape[1]}个")
if eeg_data.shape[2] < min_samples:
raise ValueError(f"EEG数据长度不足:期望≥{min_samples}个样本,实际{eeg_data.shape[2]}个")
# 检查NaN/Inf
if np.any(np.isnan(eeg_data)) or np.any(np.isinf(eeg_data)):
raise ValueError("EEG数据包含NaN/Inf值,需预处理")
# ---------------------- 核心:特征选择管道(统一接口,修复互信息种子) ----------------------
def feature_selection_pipeline(X: np.ndarray, y: np.ndarray, feature_names: List[str], method: str = "anova") -> Tuple[np.ndarray, List[str], Any, np.ndarray]:
"""
特征选择管道:前置方差选择 + 核心选择方法
注意:仅在训练集上调用,避免数据泄露
返回:筛选后特征、特征名、选择器、选择器掩码(用于可视化)
"""
logger.debug(f"特征选择方法:{method} | 原始维度:{X.shape[1]}")
selector = None
mask = np.ones(X.shape[1], dtype=bool) # 默认全选
var_mask = np.ones(X.shape[1], dtype=bool)
# 步骤1:前置方差选择(移除低方差噪声特征)
var_selector = VarianceThreshold(threshold=CONFIG["VAR_THRESHOLD"])
X_var = var_selector.fit_transform(X)
var_mask = var_selector.get_support()
var_names = get_selected_names(feature_names, var_mask)
logger.debug(f"方差选择后维度:{X_var.shape[1]}")
# 步骤2:核心特征选择方法
if method == "none":
# 无额外特征选择(仅方差选择)
X_selected = X_var
selected_names = var_names
mask = var_mask
elif method == "full_feature":
# 全特征基线(无任何选择,用于公平对比)
X_selected = X
selected_names = feature_names
elif method == "anova":
# 过滤式:ANOVA方差分析
selector = SelectKBest(score_func=f_classif, k=CONFIG["FEATURE_SELECT_K"])
X_selected = selector.fit_transform(X_var, y)
# 生成完整掩码(方差选择+ANOVA选择)
anova_mask = selector.get_support()
mask = np.zeros(len(feature_names), dtype=bool)
mask[var_mask] = anova_mask
selected_names = get_selected_names(feature_names, mask)
elif method == "mutual_info":
# 过滤式:互信息(修复随机种子,使用固定参数函数)
def mi_score_func(X, y):
return mutual_info_classif(X, y, random_state=CONFIG["RANDOM_STATE"], n_neighbors=CONFIG["MI_NEIGHBORS"])
selector = SelectKBest(score_func=mi_score_func, k=CONFIG["FEATURE_SELECT_K"])
X_selected = selector.fit_transform(X_var, y)
# 生成完整掩码
mi_mask = selector.get_support()
mask = np.zeros(len(feature_names), dtype=bool)
mask[var_mask] = mi_mask
selected_names = get_selected_names(feature_names, mask)
elif method == "rfe":
# 包裹式:递归特征消除(基于线性SVM)
estimator = SVC(kernel="linear", random_state=CONFIG["RANDOM_STATE"], class_weight=CONFIG["CLASS_WEIGHT"])
selector = RFE(estimator, n_features_to_select=CONFIG["FEATURE_SELECT_K"], step=10)
X_selected = selector.fit_transform(X_var, y)
# 生成完整掩码
rfe_mask = selector.get_support()
mask = np.zeros(len(feature_names), dtype=bool)
mask[var_mask] = rfe_mask
selected_names = get_selected_names(feature_names, mask)
elif method == "rf_importance":
# 嵌入式:随机森林特征重要性
estimator = RandomForestClassifier(n_estimators=100, random_state=CONFIG["RANDOM_STATE"], class_weight=CONFIG["CLASS_WEIGHT"])
selector = SelectFromModel(estimator, max_features=CONFIG["FEATURE_SELECT_K"])
X_selected = selector.fit_transform(X_var, y)
# 生成完整掩码
rf_mask = selector.get_support()
mask = np.zeros(len(feature_names), dtype=bool)
mask[var_mask] = rf_mask
selected_names = get_selected_names(feature_names, mask)
elif method == "l1":
# 嵌入式:L1正则化(逻辑回归)
estimator = LogisticRegression(penalty="l1", solver="liblinear", random_state=CONFIG["RANDOM_STATE"], class_weight=CONFIG["CLASS_WEIGHT"])
selector = SelectFromModel(estimator, max_features=CONFIG["FEATURE_SELECT_K"])
X_selected = selector.fit_transform(X_var, y)
# 生成完整掩码
l1_mask = selector.get_support()
mask = np.zeros(len(feature_names), dtype=bool)
mask[var_mask] = l1_mask
selected_names = get_selected_names(feature_names, mask)
else:
raise ValueError(f"不支持的特征选择方法:{method}")
logger.debug(f"最终保留维度:{X_selected.shape[1]} | 保留特征数:{len(selected_names)}")
return X_selected, selected_names, selector, mask
# ---------------------- 核心:稳健评估流程(分层5折交叉验证,修复统计检验) ----------------------
def robust_evaluation_pipeline(X: np.ndarray, y: np.ndarray, feature_names: List[str], methods: List[str]) -> Dict[str, Dict[str, Any]]:
"""
稳健评估流程:分层5折交叉验证 + 每折内独立特征选择
避免数据泄露,保证评估结果可靠,保存特征重要性数据用于可视化
"""
logger.info("开始稳健评估流程(分层5折交叉验证)")
skf = StratifiedKFold(n_splits=CONFIG["CV_FOLDS"], shuffle=True, random_state=CONFIG["RANDOM_STATE"])
results = {method: {
"cv_acc": [], "cv_f1": [], "train_time": [], "dimension": [],
"feature_importance": [], "selected_names": [], "mask": []
} for method in methods}
base_model = SVC(kernel="rbf", C=1.0, gamma="scale", random_state=CONFIG["RANDOM_STATE"], class_weight=CONFIG["CLASS_WEIGHT"])
for fold_idx, (train_idx, val_idx) in enumerate(skf.split(X, y)):
logger.info(f"\n===== 交叉验证折数:{fold_idx+1}/{CONFIG['CV_FOLDS']} =====")
# 划分当前折的训练集/验证集(严格隔离)
X_train_fold, X_val_fold = X[train_idx], X[val_idx]
y_train_fold, y_val_fold = y[train_idx], y[val_idx]
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_fold)
X_val_scaled = scaler.transform(X_val_fold)
# 1. 训练集CSP特征提取(关键:仅在当前折训练集拟合,并行加速)
X_train_csp, csp = extract_spatial_features_train(
X_train_fold, y_train_fold,
CONFIG["CSP_COMPONENTS"], CONFIG["CSP_REG"], CONFIG["CSP_N_JOBS"]
)
# 验证集CSP特征提取(仅变换)
X_val_csp = extract_spatial_features_infer(X_val_fold, csp)
# 2. 合并多模态特征(时域+频域+时频域+CSP)
X_train_combined = np.hstack([X_train_scaled, X_train_csp])
X_val_combined = np.hstack([X_val_scaled, X_val_csp])
# 生成合并后的特征名
csp_feature_names = [f"csp_comp{i+1}" for i in range(CONFIG["CSP_COMPONENTS"])]
combined_feature_names = feature_names + csp_feature_names
# 3. 逐方法进行特征选择与模型训练
for method in methods:
# 特征选择(仅在当前折训练集进行)
X_train_selected, selected_names, selector, mask = feature_selection_pipeline(
X_train_combined, y_train_fold, combined_feature_names, method
)
# 验证集特征选择(仅变换)
X_val_selected = selector.transform(X_val_combined) if selector is not None else X_val_combined
# 模型训练与计时
train_start = time.time()
base_model.fit(X_train_selected, y_train_fold)
train_time = time.time() - train_start
# 验证集评估(准确率+F1分数,处理类别不平衡)
y_pred = base_model.predict(X_val_selected)
val_acc = accuracy_score(y_val_fold, y_pred)
val_f1 = f1_score(y_val_fold, y_pred, average="weighted")
# 保存特征重要性(随机森林)
if method == "rf_importance" and selector is not None:
# 训练随机森林获取特征重要性
rf = RandomForestClassifier(n_estimators=100, random_state=CONFIG["RANDOM_STATE"], class_weight=CONFIG["CLASS_WEIGHT"])
rf.fit(X_train_selected, y_train_fold)
results[method]["feature_importance"].append(rf.feature_importances_)
results[method]["selected_names"].append(selected_names)
# 保存当前折结果
results[method]["cv_acc"].append(val_acc)
results[method]["cv_f1"].append(val_f1)
results[method]["train_time"].append(train_time)
results[method]["dimension"].append(X_train_selected.shape[1])
results[method]["mask"].append(mask)
logger.info(f"方法:{method} | 验证集准确率:{val_acc:.4f} | F1分数:{val_f1:.4f} | 维度:{X_train_selected.shape[1]} | 训练时间:{train_time:.4f}s")
# 计算统计结果,修复统计检验(使用numpy数组)
for method in methods:
# 基本统计
results[method]["cv_acc_mean"] = np.mean(results[method]["cv_acc"])
results[method]["cv_acc_std"] = np.std(results[method]["cv_acc"])
results[method]["cv_f1_mean"] = np.mean(results[method]["cv_f1"])
results[method]["cv_f1_std"] = np.std(results[method]["cv_f1"])
results[method]["train_time_mean"] = np.mean(results[method]["train_time"])
results[method]["dimension_mean"] = np.mean(results[method]["dimension"])
# 统计显著性检验(对比全特征基线,配对t检验)
if method != "full_feature":
baseline_acc = np.array(results["full_feature"]["cv_acc"])
current_acc = np.array(results[method]["cv_acc"])
# 配对t检验(验证改进是否显著)
t_stat, p_value = stats.ttest_rel(current_acc, baseline_acc)
results[method]["p_value"] = p_value
results[method]["is_significant"] = p_value < 0.05 # 显著性水平0.05
logger.info("\n===== 稳健评估结果汇总 =====")
for method in methods:
sig_str = "是(p<0.05)" if results[method].get("is_significant", False) else "否"
logger.info(
f"方法:{method} | 平均准确率:{results[method]['cv_acc_mean']:.4f}±{results[method]['cv_acc_std']:.4f} | "
f"平均F1:{results[method]['cv_f1_mean']:.4f}±{results[method]['cv_f1_std']:.4f} | "
f"平均维度:{results[method]['dimension_mean']:.0f} | 平均训练时间:{results[method]['train_time_mean']:.4f}s | "
f"显著改进:{sig_str}"
)
return results
# ---------------------- 结果可视化(修复变量作用域,增强可解释性) ----------------------
def plot_feature_selection_results(results: Dict[str, Dict[str, Any]], methods: List[str]):
"""可视化不同特征选择方法的效果:维度、准确率、训练时间 + 特征可解释性(修复变量作用域)"""
plt.figure(figsize=(16, 10))
# 提取数据
dimensions = [results[m]["dimension_mean"] for m in methods]
accuracies = [results[m]["cv_acc_mean"] for m in methods]
acc_std = [results[m]["cv_acc_std"] for m in methods]
train_times = [results[m]["train_time_mean"] for m in methods]
# 子图1:特征维度对比
plt.subplot(2, 2, 1)
sns.barplot(x=methods, y=dimensions, palette="Blues")
plt.title("特征选择后的平均维度对比", fontsize=12)
plt.xlabel("特征选择方法", fontsize=10)
plt.ylabel("平均特征维度", fontsize=10)
plt.xticks(rotation=45)
# 子图2:交叉验证准确率对比(带误差棒)
plt.subplot(2, 2, 2)
sns.barplot(x=methods, y=accuracies, yerr=acc_std, palette="Greens")
plt.title("5折交叉验证平均准确率对比", fontsize=12)
plt.xlabel("特征选择方法", fontsize=10)
plt.ylabel("平均准确率(±标准差)", fontsize=10)
plt.xticks(rotation=45)
plt.ylim(0.5, 1.0) # 聚焦合理准确率范围
# 子图3:模型训练时间对比
plt.subplot(2, 2, 3)
sns.barplot(x=methods, y=train_times, palette="Oranges")
plt.title("平均模型训练时间对比(秒)", fontsize=12)
plt.xlabel("特征选择方法", fontsize=10)
plt.ylabel("平均训练时间(s)", fontsize=10)
plt.xticks(rotation=45)
# 子图4:随机森林特征重要性(Top-20,修复变量作用域,使用results中的数据)
plt.subplot(2, 2, 4)
if "rf_importance" in results and len(results["rf_importance"]["feature_importance"]) > 0:
# 取最后一折的特征重要性和特征名
importances = results["rf_importance"]["feature_importance"][-1]
selected_names = results["rf_importance"]["selected_names"][-1]
if len(importances) >= 20:
indices = np.argsort(importances)[-20:] # Top-20重要特征
plt.barh(range(len(indices)), importances[indices], align='center')
plt.yticks(range(len(indices)), [selected_names[i] for i in indices], fontsize=8)
else:
plt.barh(range(len(importances)), importances, align='center')
plt.yticks(range(len(importances)), selected_names, fontsize=8)
plt.title("随机森林特征重要性(Top-20)", fontsize=12)
plt.xlabel("重要性得分", fontsize=10)
plt.tight_layout()
plt.savefig("feature_selection_results_final.png", dpi=300, bbox_inches='tight')
plt.close()
logger.info("可视化结果已保存:feature_selection_results_final.png")
# ---------------------- 工程化部署:重构为类,封装配置和参数(修复接口问题) ----------------------
class BCIInferencePipeline:
"""BCI推理管道类(封装配置和参数,修复部署接口问题)"""
def __init__(self, model_path: str):
"""初始化:加载部署模型和配置"""
self.deploy_data = joblib.load(model_path)
self.scaler = self.deploy_data["scaler"]
self.csp = self.deploy_data["csp"]
self.feature_selector = self.deploy_data["feature_selector"]
self.model = self.deploy_data["model"]
self.config = self.deploy_data["config"]
self.selected_feature_names = self.deploy_data["selected_feature_names"]
# 初始化日志
self.logger = infer_logger
self.logger.info("BCI推理管道已初始化")
def predict(self, eeg_data: np.ndarray) -> str:
"""
推理函数:输入预处理后的EEG数据,返回预测结果
eeg_data: n_channels×n_samples(二维数组)
"""
try:
# 输入验证
validate_input_data(eeg_data, self.config["CHANNELS"])
self.logger.debug(f"开始推理:EEG数据形状 {eeg_data.shape}")
# 1. 特征提取(批量处理单试次)
eeg_data_batch = eeg_data.reshape(1, *eeg_data.shape) # 转为1×n_channels×n_samples
feat = batch_extract_features(
eeg_data_batch,
self.config["CHANNELS"],
self.config["SAMPLING_FREQ"],
self.config["PSD_NPERSEG"],
np.array(self.config["WAVELET_SCALES"]),
self.config["WAVELET_TYPE"]
)
# 2. 标准化
feat_scaled = self.scaler.transform(feat)
# 3. CSP特征提取(仅变换)
csp_feat = extract_spatial_features_infer(eeg_data_batch, self.csp)
# 4. 特征合并
feat_combined = np.hstack([feat_scaled, csp_feat])
# 5. 特征选择(仅变换)
feat_selected = self.feature_selector.transform(feat_combined) if self.feature_selector is not None else feat_combined
# 6. 预测
pred = self.model.predict(feat_selected)[0]
result = "Left" if pred == 0 else "Right"
self.logger.debug(f"推理完成:预测结果 {result}")
return result
except Exception as e:
self.logger.error(f"推理失败:{str(e)}", exc_info=True)
return "Error"
def save_deployment_model(X_train: np.ndarray, y_train: np.ndarray, feature_names: List[str], best_method: str) -> None:
"""
保存工程化部署模型:仅保留必要参数,剔除冗余数据,修复最终模型训练逻辑
使用训练集训练,避免测试集泄露
"""
logger.info(f"保存最优模型(方法:{best_method})")
# 分割训练集和验证集(最终训练用训练集)
X_train_final, _, y_train_final, _ = train_test_split(
X_train, y_train, test_size=CONFIG["TEST_SIZE"], stratify=y_train, random_state=CONFIG["RANDOM_STATE"]
)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_final)
# CSP训练(仅最终训练集,并行加速)
X_train_csp, csp = extract_spatial_features_train(
X_train_final, y_train_final,
CONFIG["CSP_COMPONENTS"], CONFIG["CSP_REG"], CONFIG["CSP_N_JOBS"]
)
# 特征合并
X_train_combined = np.hstack([X_train_scaled, X_train_csp])
csp_feature_names = [f"csp_comp{i+1}" for i in range(CONFIG["CSP_COMPONENTS"])]
combined_feature_names = feature_names + csp_feature_names
# 特征选择
X_train_selected, selected_names, selector, _ = feature_selection_pipeline(
X_train_combined, y_train_final, combined_feature_names, best_method
)
# 训练最终模型(处理类别不平衡)
final_model = SVC(kernel="rbf", C=1.0, gamma="scale", random_state=CONFIG["RANDOM_STATE"], class_weight=CONFIG["CLASS_WEIGHT"])
final_model.fit(X_train_selected, y_train_final)
# 精简部署字典(仅保留必要组件,压缩保存)
deploy_dict = {
"scaler": scaler, # 标准化器(拟合后参数)
"csp": csp, # CSP(仅保留变换参数,无训练数据)
"feature_selector": selector, # 特征选择器
"model": final_model, # 最终训练模型
"selected_feature_names": selected_names, # 保留特征名(用于推理验证)
"config": { # 必要配置(仅保留推理所需)
"SAMPLING_FREQ": CONFIG["SAMPLING_FREQ"],
"CHANNELS": CONFIG["CHANNELS"],
"PSD_NPERSEG": CONFIG["PSD_NPERSEG"],
"WAVELET_SCALES": CONFIG["WAVELET_SCALES"].tolist(),
"WAVELET_TYPE": CONFIG["WAVELET_TYPE"]
}
}
# 保存部署文件(压缩)
joblib.dump(deploy_dict, "bci_feature_selection_deploy_final.pkl", compress=3)
logger.info("部署模型已保存:bci_feature_selection_deploy_final.pkl")
# 生成部署说明
deploy_readme = """
# BCI特征选择模型部署说明(最终修订版)
## 1. 环境依赖
```bash
pip install numpy==1.26 scipy==1.11 mne==1.7 pywt==1.4.1 scikit-learn==1.4 joblib==1.3 pandas==2.2
4. 依赖模块
需将特征提取模块(feature_extractors.py)与部署脚本放在同一目录
5. 数据预处理要求
- 采样率:250Hz(其他采样率需同步调整 config 中的参数)
- 滤波:0.5-45Hz 带通滤波 + 50Hz 工频去噪
- 参考:平均参考
- Epochs:-0.2~2.0s(基线:-0.2~0.0s)
- 通道:C3/C4/CP3/CP4/Cz/FC1/FC2/CPz(顺序固定)
- 数据格式:n_channels×n_samples(二维数组)
6. 推理流程
python
import joblib
import mne
from feature_extractors import BCIInferencePipeline
# 初始化推理管道
infer_pipeline = BCIInferencePipeline("bci_feature_selection_deploy_final.pkl")
# 预处理新数据
raw = mne.io.read_raw_gdf('new_data.gdf', preload=True)
raw.filter(0.5, 45).set_eeg_reference('average').notch_filter(50)
events, _ = mne.events_from_annotations(raw)
epochs = mne.Epochs(
raw, events, {'Left':1, 'Right':2},
tmin=-0.2, tmax=2.0, baseline=(-0.2, 0.0), preload=True
)
epochs.pick_channels(infer_pipeline.config['CHANNELS'], ordered=True)
# 逐试次推理
for epoch_data in epochs.get_data():
result = infer_pipeline.predict(epoch_data)
print(f"预测结果:{result}")
7. 关键注意事项
- 特征一致性:推理阶段的特征提取逻辑、顺序必须与训练阶段完全一致;
- 数据隔离:特征选择器 / 标准化器 / CSP 均已用训练集拟合,推理时仅调用 transform;
- 实时优化:可将特征提取逻辑移植到 GPU / 边缘芯片,或离线预定义特征子集减少计算;
- 跨受试者:EEG 存在个体差异,跨受试者使用时建议重新训练特征选择器;
- 异常处理:类内部已集成日志和异常捕获,生产环境可添加重试逻辑。
更多推荐
所有评论(0)