Python机器学习全流程实战:从数据探索到模型部署
·
Python机器学习全流程实战:从数据探索到模型部署
摘要
本文全面介绍了使用Python进行机器学习的完整流程,涵盖了数据预处理、特征工程、模型训练、评估优化和部署应用等关键环节。通过实际案例演示,结合详细的代码实现和可视化分析,为读者提供了一套可复现的机器学习实践框架。文章包含超过20个完整的代码示例,30余张可视化图表,深入探讨了Scikit-learn、Pandas、Matplotlib、Seaborn、XGBoost等主流库的应用技巧,并引入了模型解释性、自动化机器学习等前沿话题。
第一章:引言与背景
1.1 机器学习的发展现状
机器学习作为人工智能的核心分支,在过去十年中取得了突破性进展。根据Gartner的技术成熟度曲线,机器学习技术已从期望膨胀期进入实质生产的高原期,在各行各业展现出强大的应用价值。Python凭借其简洁的语法、丰富的生态库和强大的社区支持,已成为机器学习领域的首选编程语言。
1.2 Python机器学习生态系统
Python的机器学习生态系统呈现出多层次、模块化的特点:
- 基础科学计算库:NumPy、SciPy提供高效的数值计算能力
- 数据处理库:Pandas、Dask是处理结构化数据的利器
- 可视化库:Matplotlib、Seaborn、Plotly实现数据可视化
- 机器学习框架:Scikit-learn提供经典算法实现
- 深度学习框架:TensorFlow、PyTorch支持神经网络模型
- 自动化工具:Auto-sklearn、TPOT等AutoML工具简化模型构建流程
1.3 本文结构与目标
本文采用"理论-实践-应用"的三段式结构,旨在帮助读者:
- 掌握机器学习的基本原理和算法思想
- 熟练使用Python进行数据分析和模型构建
- 理解模型评估和优化的方法论
- 学会将机器学习模型部署到生产环境
第二章:环境配置与数据准备
2.1 Python环境搭建
# 环境配置与依赖管理
import sys
import platform
import subprocess
def check_environment():
    """Print interpreter/platform details and the installed version of key libraries.

    For every library the function prints ``<pip name>: <version>``, or
    ``<pip name>: 未安装`` when the library cannot be imported.
    """
    import importlib

    print(f"Python版本: {sys.version}")
    print(f"操作系统: {platform.system()} {platform.release()}")
    print(f"处理器架构: {platform.machine()}")

    # Libraries used by the tutorial, keyed by pip name; the values are the
    # minimum versions the examples were written against (informational).
    required_libraries = {
        'numpy': '1.21.0',
        'pandas': '1.3.0',
        'scikit-learn': '1.0.0',
        'matplotlib': '3.4.0',
        'seaborn': '0.11.0',
        'xgboost': '1.5.0',
    }
    # The pip name is not always the importable module name: importing
    # 'scikit-learn' can never succeed, the module is called 'sklearn'.
    import_names = {'scikit-learn': 'sklearn'}

    for lib in required_libraries:
        try:
            module = importlib.import_module(import_names.get(lib, lib))
        except ImportError:
            print(f"{lib}: 未安装")
            continue
        version = getattr(module, '__version__', '未知')
        print(f"{lib}: {version}")
# 创建虚拟环境(示例命令)
def create_virtual_env():
    """Print the suggested shell commands for setting up a virtual environment.

    Nothing is executed; the commands are only shown to the user.
    """
    setup_steps = (
        'python -m venv ml_env',
        # Linux/macOS activation; on Windows use ml_env\Scripts\activate instead.
        'source ml_env/bin/activate',
        'pip install --upgrade pip',
        'pip install numpy pandas scikit-learn matplotlib seaborn xgboost jupyter',
    )
    print("建议执行以下命令创建环境:")
    print("\n".join(f" {step}" for step in setup_steps))
if __name__ == "__main__":
    # Script entry point: report the environment, then show the setup commands.
    check_environment()
    create_virtual_env()
2.2 数据获取与加载
import pandas as pd
import numpy as np
from sklearn.datasets import fetch_california_housing, load_iris, load_wine
import requests
import zipfile
import io
import os
class DataLoader:
    """Load tabular datasets from scikit-learn, remote CSV files, or zip archives.

    Downloaded files are stored under ``data_dir``, which is created on
    construction if it does not exist.
    """

    # Supported dataset name -> loader function name in ``sklearn.datasets``.
    # Names are resolved lazily so that a loader missing from the file-level
    # import list cannot break the whole method with a NameError.
    _SKLEARN_LOADERS = {
        'california_housing': 'fetch_california_housing',
        'iris': 'load_iris',
        'wine': 'load_wine',
        'diabetes': 'load_diabetes',
        'breast_cancer': 'load_breast_cancer',
    }

    def __init__(self, data_dir='./data'):
        """Remember ``data_dir`` and make sure the directory exists."""
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)

    def load_sklearn_dataset(self, dataset_name='california_housing'):
        """Load a built-in scikit-learn dataset as a DataFrame.

        The feature columns keep the dataset's feature names and the label is
        appended as the last column, named ``target``.

        Raises:
            ValueError: if ``dataset_name`` is not a supported dataset.
        """
        if dataset_name not in self._SKLEARN_LOADERS:
            raise ValueError(f"不支持的数据集: {dataset_name}")

        from sklearn import datasets as sklearn_datasets

        loader = getattr(sklearn_datasets, self._SKLEARN_LOADERS[dataset_name])
        data = loader()
        df = pd.DataFrame(data.data, columns=data.feature_names)
        df['target'] = data.target

        print(f"数据集 '{dataset_name}' 加载成功")
        print(f"数据形状: {df.shape}")
        print(f"特征列: {list(df.columns[:-1])}")
        print(f"目标列: {df.columns[-1]}")
        return df

    def load_csv_from_url(self, url, filename=None, timeout=30):
        """Download a CSV file (unless already cached) and read it with pandas.

        Args:
            url: Location of the CSV file.
            filename: Name of the cached file inside ``data_dir``; defaults to
                the last path segment of ``url``.
            timeout: Seconds to wait for the server before giving up.

        Raises:
            RuntimeError: if the server answers with a non-200 status code.
        """
        if filename is None:
            filename = url.split('/')[-1]
        filepath = os.path.join(self.data_dir, filename)

        if not os.path.exists(filepath):
            print(f"从 {url} 下载数据...")
            # Without a timeout a stalled server would block the caller forever.
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                raise RuntimeError(f"下载失败,状态码: {response.status_code}")
            with open(filepath, 'wb') as f:
                f.write(response.content)
            print(f"数据已保存到 {filepath}")

        df = pd.read_csv(filepath)
        print(f"CSV数据加载成功,形状: {df.shape}")
        return df

    def load_compressed_data(self, url, extract_dir=None, timeout=30):
        """Download a zip archive, extract it and return the member names.

        Args:
            url: Location of the zip archive.
            extract_dir: Target directory; defaults to ``data_dir``.
            timeout: Seconds to wait for the server before giving up.

        Raises:
            RuntimeError: if the server answers with a non-200 status code.
        """
        if extract_dir is None:
            extract_dir = self.data_dir

        print(f"下载并解压数据: {url}")
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            raise RuntimeError(f"下载失败,状态码: {response.status_code}")

        with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
            archive.extractall(extract_dir)
            print(f"数据解压到: {extract_dir}")
            return archive.namelist()
# 示例:加载多个数据集
def load_example_datasets():
    """Load the three demo datasets and return them keyed by a short alias.

    Returns:
        dict with keys ``housing``, ``iris`` and ``wine`` mapping to the
        DataFrames produced by ``DataLoader.load_sklearn_dataset``.
    """
    loader = DataLoader()
    # (alias in the returned dict, scikit-learn dataset name), in load order.
    sources = (
        ('housing', 'california_housing'),
        ('iris', 'iris'),
        ('wine', 'wine'),
    )
    return {alias: loader.load_sklearn_dataset(name) for alias, name in sources}
if __name__ == "__main__":
    datasets = load_example_datasets()
    # For each dataset print its structure, missing-value counts and
    # descriptive statistics.
    for name in datasets:
        df = datasets[name]
        print(f"\n{name}数据集:")
        print(df.info())
        print(f"缺失值统计:\n{df.isnull().sum()}")
        print(f"描述性统计:\n{df.describe()}")
2.3 数据质量评估
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
class DataQualityAnalyzer:
    """Collect data-quality diagnostics for a DataFrame and plot a dashboard.

    Each check stores its findings in ``self.report`` under one of the keys
    ``missing_values``, ``data_types``, ``statistics``, ``outliers`` and
    ``duplicates``.
    """

    def __init__(self, df):
        """Keep a reference to ``df`` (not copied) and start with an empty report."""
        self.df = df
        self.report = {}

    def generate_quality_report(self):
        """Run every check and return the populated report dict."""
        self._check_missing_values()
        self._check_data_types()
        self._check_statistical_properties()
        self._check_outliers()
        self._check_duplicates()
        return self.report

    def _check_missing_values(self):
        """Record per-column and overall missing-value counts and percentages."""
        missing_info = self.df.isnull().sum()
        missing_percentage = (missing_info / len(self.df)) * 100
        total_missing = missing_info.sum()
        total_cells = self.df.size
        self.report['missing_values'] = {
            'count': missing_info.to_dict(),
            'percentage': missing_percentage.to_dict(),
            'total_missing': total_missing,
            # Share of all cells that are missing. Summing the per-column
            # percentages would overstate it and can exceed 100%.
            'total_percentage': (total_missing / total_cells) * 100 if total_cells else 0.0,
        }

    def _check_data_types(self):
        """Record column dtypes and split columns into numeric and categorical."""
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns.tolist()
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns.tolist()
        self.report['data_types'] = {
            'dtypes': self.df.dtypes.to_dict(),
            'numeric_columns': numeric_cols,
            'categorical_columns': categorical_cols,
            'total_columns': len(self.df.columns),
        }

    def _check_statistical_properties(self):
        """Record mean, std, min, max, skewness and kurtosis of numeric columns."""
        numeric_df = self.df.select_dtypes(include=[np.number])
        self.report['statistics'] = {
            'mean': numeric_df.mean().to_dict(),
            'std': numeric_df.std().to_dict(),
            'min': numeric_df.min().to_dict(),
            'max': numeric_df.max().to_dict(),
            # Missing values are dropped before computing the shape statistics.
            'skewness': numeric_df.apply(lambda x: stats.skew(x.dropna())).to_dict(),
            'kurtosis': numeric_df.apply(lambda x: stats.kurtosis(x.dropna())).to_dict(),
        }

    def _check_outliers(self):
        """Count values outside the 1.5 * IQR fences for every numeric column."""
        numeric_df = self.df.select_dtypes(include=[np.number])
        n_rows = len(numeric_df)
        outliers_info = {}
        for col in numeric_df.columns:
            series = numeric_df[col]
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            n_outliers = int(((series < lower_bound) | (series > upper_bound)).sum())
            outliers_info[col] = {
                'count': n_outliers,
                'percentage': (n_outliers / n_rows) * 100 if n_rows else 0.0,
                'lower_bound': lower_bound,
                'upper_bound': upper_bound,
            }
        self.report['outliers'] = outliers_info

    def _check_duplicates(self):
        """Record the number and percentage of fully duplicated rows."""
        n_rows = len(self.df)
        duplicate_count = int(self.df.duplicated().sum())
        self.report['duplicates'] = {
            'count': duplicate_count,
            'percentage': (duplicate_count / n_rows) * 100 if n_rows else 0.0,
        }

    def visualize_quality_report(self):
        """Show a 2x3 dashboard summarising the data-quality findings.

        The report is generated first if it has not been produced yet, so the
        method can be called directly on a fresh analyzer.
        """
        needed = ('missing_values', 'duplicates', 'data_types')
        if any(key not in self.report for key in needed):
            self.generate_quality_report()

        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        fig.suptitle('数据质量分析报告', fontsize=16)
        (ax_missing, ax_dtypes, ax_hist), (ax_box, ax_corr, ax_summary) = axes
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns

        # 1. Missing-value heat map.
        sns.heatmap(self.df.isnull(), cbar=False, cmap='viridis', ax=ax_missing)
        ax_missing.set_title('缺失值分布热图')

        # 2. Share of each dtype among the columns.
        dtype_counts = self.df.dtypes.value_counts()
        ax_dtypes.pie(dtype_counts.values,
                      labels=[str(dtype) for dtype in dtype_counts.index],
                      autopct='%1.1f%%')
        ax_dtypes.set_title('数据类型分布')

        # 3. Histogram of the first numeric column.
        if len(numeric_cols) > 0:
            sample_col = numeric_cols[0]
            self.df[sample_col].hist(bins=30, ax=ax_hist)
            ax_hist.set_title(f'{sample_col}分布直方图')

        # 4. Box plots of the first three numeric columns.
        if len(numeric_cols) > 0:
            self.df[numeric_cols[:3]].boxplot(ax=ax_box)
        ax_box.set_title('异常值检测(箱线图)')
        ax_box.tick_params(axis='x', rotation=45)

        # 5. Correlation heat map (needs at least two numeric columns).
        if len(numeric_cols) > 1:
            correlation = self.df[numeric_cols].corr()
            sns.heatmap(correlation, annot=True, fmt='.2f', cmap='coolwarm',
                        square=True, ax=ax_corr)
        ax_corr.set_title('特征相关性热图')

        # 6. Text panel with the headline metrics.
        quality_metrics = [
            f"总样本数: {len(self.df)}",
            f"总特征数: {len(self.df.columns)}",
            f"缺失值比例: {self.report['missing_values']['total_percentage']:.2f}%",
            f"重复值比例: {self.report['duplicates']['percentage']:.2f}%",
            f"数值特征: {len(self.report['data_types']['numeric_columns'])}",
            f"分类特征: {len(self.report['data_types']['categorical_columns'])}",
        ]
        ax_summary.text(0.1, 0.5, '\n'.join(quality_metrics),
                        fontsize=12, verticalalignment='center')
        ax_summary.axis('off')
        ax_summary.set_title('数据质量指标汇总')

        plt.tight_layout()
        plt.show()
# 示例:分析数据质量
def analyze_data_quality_example():
    """Run the data-quality workflow on the California housing data.

    Loads the dataset, builds the quality report, prints a short summary,
    shows the dashboard and returns the report dict.
    """
    housing_data = DataLoader().load_sklearn_dataset('california_housing')
    analyzer = DataQualityAnalyzer(housing_data)
    report = analyzer.generate_quality_report()

    missing = report['missing_values']
    summary_lines = [
        "数据质量报告摘要:",
        f"缺失值总数: {missing['total_missing']}",
        f"缺失值比例: {missing['total_percentage']:.2f}%",
        f"重复值数量: {report['duplicates']['count']}",
        f"数值特征数量: {len(report['data_types']['numeric_columns'])}",
    ]
    print("\n".join(summary_lines))

    analyzer.visualize_quality_report()
    return report
if __name__ == "__main__":
    # Script entry point for the data-quality section.
    quality_report = analyze_data_quality_example()
第三章:探索性数据分析与可视化
3.1 单变量分析
class UnivariateAnalyzer:
    """Per-column summaries and distribution plots for a DataFrame."""

    def __init__(self, df):
        """Store ``df`` and pre-compute its numeric and categorical column lists."""
        self.df = df
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns
        self.categorical_cols = df.select_dtypes(include=['object', 'category']).columns

    def analyze_numeric_features(self, columns=None):
        """Return summary statistics for numeric columns.

        Args:
            columns: Columns to analyse; defaults to all numeric columns.
                Names that are not in the DataFrame are skipped.

        Returns:
            dict mapping column name to a dict of statistics computed on the
            non-missing values (plus the missing count and percentage).
        """
        if columns is None:
            columns = self.numeric_cols
        n_rows = len(self.df)
        analysis_results = {}
        for col in columns:
            if col not in self.df.columns:
                continue
            data = self.df[col].dropna()
            missing = self.df[col].isnull().sum()
            # This dict must not be named ``stats``: that would shadow the
            # scipy.stats module inside the function and make the skew and
            # kurtosis calls below fail with UnboundLocalError.
            summary = {
                'count': len(data),
                'mean': data.mean(),
                'std': data.std(),
                'min': data.min(),
                '25%': data.quantile(0.25),
                'median': data.median(),
                '75%': data.quantile(0.75),
                'max': data.max(),
                'skewness': stats.skew(data),
                'kurtosis': stats.kurtosis(data),
                'missing': missing,
                'missing_percentage': (missing / n_rows) * 100,
            }
            analysis_results[col] = summary
        return analysis_results

    def analyze_categorical_features(self, columns=None):
        """Return frequency summaries for categorical columns.

        Args:
            columns: Columns to analyse; defaults to all object/category
                columns. Names that are not in the DataFrame are skipped.

        Returns:
            dict mapping column name to a dict with counts, the most frequent
            value and the full value-count table.
        """
        if columns is None:
            columns = self.categorical_cols
        analysis_results = {}
        for col in columns:
            if col not in self.df.columns:
                continue
            data = self.df[col]
            counts = data.value_counts()
            modes = data.mode()
            missing = data.isnull().sum()
            analysis_results[col] = {
                'count': len(data.dropna()),
                'unique': data.nunique(),
                'top': modes.iloc[0] if not modes.empty else None,
                'freq': counts.iloc[0] if not counts.empty else 0,
                'missing': missing,
                'missing_percentage': (missing / len(data)) * 100,
                'value_counts': counts.to_dict(),
            }
        return analysis_results

    @staticmethod
    def _subplot_grid(n_items, max_cols, figsize):
        """Create a grid with at most ``max_cols`` columns and return (fig, flat axes)."""
        n_cols = min(max_cols, n_items)
        n_rows = (n_items + n_cols - 1) // n_cols
        # squeeze=False always yields a 2-D array, so flatten() also works for
        # 1x1 and 1xN layouts (wrapping the result in a list does not).
        fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
        return fig, axes.flatten()

    def visualize_numeric_distributions(self, columns=None, figsize=(15, 10)):
        """Plot a histogram with KDE for each numeric column.

        Args:
            columns: Columns to plot; defaults to the first six numeric columns.
            figsize: Size of the whole figure.
        """
        if columns is None:
            columns = self.numeric_cols[:6]
        columns = list(columns)
        if not columns:
            # Nothing to draw; an empty grid cannot be created.
            return

        fig, axes = self._subplot_grid(len(columns), 3, figsize)
        for ax, col in zip(axes, columns):
            data = self.df[col].dropna()
            sns.histplot(data, kde=True, ax=ax, bins=30)
            ax.set_title(f'{col}分布')
            ax.set_xlabel(col)
            ax.set_ylabel('频数')
            # Annotate the panel with mean, standard deviation and skewness.
            stats_text = f'均值: {data.mean():.2f}\n标准差: {data.std():.2f}\n偏度: {stats.skew(data):.2f}'
            ax.text(0.05, 0.95, stats_text, transform=ax.transAxes,
                    verticalalignment='top',
                    bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        # Hide the unused cells of the grid.
        for ax in axes[len(columns):]:
            ax.set_visible(False)
        plt.tight_layout()
        plt.show()

    def visualize_categorical_distributions(self, columns=None, figsize=(15, 8)):
        """Plot a bar chart of the (up to ten) most frequent values per column.

        Args:
            columns: Columns to plot; defaults to the first four categorical
                columns.
            figsize: Size of the whole figure.
        """
        if columns is None:
            columns = self.categorical_cols[:4]
        columns = list(columns)
        if not columns:
            # Nothing to draw; an empty grid cannot be created.
            return

        fig, axes = self._subplot_grid(len(columns), 2, figsize)
        for ax, col in zip(axes, columns):
            # head(10) keeps everything when there are ten or fewer categories.
            shown = self.df[col].dropna().value_counts().head(10)
            positions = range(len(shown))
            bars = ax.bar(positions, shown.values)
            ax.set_xticks(positions)
            ax.set_xticklabels(shown.index, rotation=45, ha='right')
            ax.set_title(f'{col}分布')
            ax.set_xlabel(col)
            ax.set_ylabel('频数')
            # Write the count above each bar.
            for bar in bars:
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width() / 2., height,
                        f'{int(height)}', ha='center', va='bottom')

        # Hide the unused cells of the grid.
        for ax in axes[len(columns):]:
            ax.set_visible(False)
        plt.tight_layout()
        plt.show()
# 示例:单变量分析
def univariate_analysis_example():
    """Demonstrate the univariate workflow on the California housing data.

    Prints the statistics of the first three numeric features, shows the
    distribution plots and returns the analyzer.
    """
    housing_data = DataLoader().load_sklearn_dataset('california_housing')
    analyzer = UnivariateAnalyzer(housing_data)

    numeric_analysis = analyzer.analyze_numeric_features()
    print("数值特征分析结果(前3个特征):")
    # Only the first three features are printed to keep the output short.
    for col in list(numeric_analysis)[:3]:
        print(f"\n{col}:")
        for key, value in numeric_analysis[col].items():
            print(f" {key}: {value}")

    analyzer.visualize_numeric_distributions()
    return analyzer
if __name__ == "__main__":
    # Script entry point for the univariate-analysis section.
    analyzer = univariate_analysis_example()
3.2 多变量分析与相关性研究
class MultivariateAnalyzer:
    """Correlation analysis and multi-feature visualisations for a DataFrame."""

    def __init__(self, df):
        """Store ``df`` and the list of its numeric columns."""
        self.df = df
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns

    def correlation_analysis(self, method='pearson', threshold=0.7):
        """Compute the correlation matrix and list strongly correlated pairs.

        Args:
            method: Correlation method passed to ``DataFrame.corr``.
            threshold: A pair is reported when the absolute correlation is
                strictly greater than this value.

        Returns:
            dict with ``correlation_matrix`` (DataFrame) and
            ``high_correlation_pairs`` (list of dicts with ``feature1``,
            ``feature2`` and the signed ``correlation``).
        """
        correlation_matrix = self.df[self.numeric_cols].corr(method=method)
        names = correlation_matrix.columns
        high_corr_pairs = []
        # Walk the upper triangle only so every pair is reported once.
        for i in range(len(names)):
            for j in range(i + 1, len(names)):
                value = correlation_matrix.iloc[i, j]
                if abs(value) > threshold:
                    high_corr_pairs.append({
                        'feature1': names[i],
                        'feature2': names[j],
                        'correlation': value,
                    })
        return {
            'correlation_matrix': correlation_matrix,
            'high_correlation_pairs': high_corr_pairs,
        }

    def visualize_correlation_matrix(self, method='pearson', figsize=(12, 10)):
        """Show the lower triangle of the correlation matrix as a heat map."""
        correlation_matrix = self.df[self.numeric_cols].corr(method=method)
        plt.figure(figsize=figsize)
        # Mask the upper triangle (including the diagonal): it is redundant.
        mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
        sns.heatmap(correlation_matrix, mask=mask, annot=True, fmt='.2f',
                    cmap='coolwarm', center=0, square=True,
                    linewidths=0.5, cbar_kws={"shrink": 0.8})
        plt.title(f'特征相关性矩阵 ({method}相关系数)', fontsize=16)
        plt.tight_layout()
        plt.show()

    def scatter_plot_matrix(self, columns=None, hue=None, figsize=(15, 15)):
        """Show a pair plot of the selected columns.

        Args:
            columns: Columns to plot; defaults to the first five numeric ones.
            hue: Optional column used to colour the points.
            figsize: Size of the whole figure, applied after seaborn built it.
        """
        if columns is None:
            columns = self.numeric_cols[:5]
        columns = list(columns)
        plot_data = self.df[columns].copy()
        if hue and hue in self.df.columns:
            plot_data[hue] = self.df[hue]
        pairplot = sns.pairplot(plot_data, hue=hue, diag_kind='kde',
                                plot_kws={'alpha': 0.6, 's': 20},
                                diag_kws={'fill': True})
        # pairplot() has no figsize argument, so resize the finished figure.
        pairplot.fig.set_size_inches(*figsize)
        pairplot.fig.suptitle('散点图矩阵与分布', fontsize=16, y=1.02)
        plt.tight_layout()
        plt.show()

    def parallel_coordinates_plot(self, columns=None, class_column=None,
                                  figsize=(14, 8), max_lines=500):
        """Show a parallel-coordinates plot of min-max scaled features.

        Every row becomes one polyline across the feature axes.

        Args:
            columns: Features to plot; defaults to the first six numeric ones.
            class_column: Optional column whose values colour the lines.
            figsize: Size of the figure.
            max_lines: Upper bound on the number of rows drawn; larger data is
                down-sampled with a fixed seed to keep the plot readable.
        """
        if columns is None:
            columns = self.numeric_cols[:6]
        columns = list(columns)
        plot_data = self.df[columns].copy()

        from sklearn.preprocessing import MinMaxScaler
        scaled_data = MinMaxScaler().fit_transform(plot_data)
        # Keep the original index so rows stay aligned with self.df.
        scaled_df = pd.DataFrame(scaled_data, columns=columns, index=plot_data.index)

        use_classes = bool(class_column) and class_column in self.df.columns
        if use_classes:
            scaled_df['_class'] = self.df[class_column].to_numpy()
        if len(scaled_df) > max_lines:
            scaled_df = scaled_df.sample(n=max_lines, random_state=0)

        positions = range(len(columns))
        plt.figure(figsize=figsize)
        if use_classes:
            palette = plt.cm.tab10
            for k, (label, group) in enumerate(scaled_df.groupby('_class')):
                color = palette(k % 10)
                # Passing a 2-D array draws one line per column, i.e. per row
                # of the data after transposing.
                plt.plot(positions, group[columns].T.values,
                         color=color, alpha=0.3, linewidth=0.8)
                # Empty proxy line so the legend shows one entry per class.
                plt.plot([], [], color=color, label=str(label))
            plt.legend()
        else:
            plt.plot(positions, scaled_df[columns].T.values,
                     color='steelblue', alpha=0.3, linewidth=0.8)

        plt.xticks(positions, columns, rotation=45)
        plt.xlabel('特征')
        plt.ylabel('标准化值')
        plt.title('平行坐标图(多变量关系可视化)')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def interaction_effects_analysis(self, feature1, feature2, target=None):
        """Plot and test the interaction of two features on a target.

        Shows three panels (individual scatter plots, interaction term versus
        target, 3D scatter) and fits ``target ~ feature1 + feature2 +
        feature1*feature2`` with OLS.

        Returns:
            The fitted statsmodels results object, or ``None`` when ``target``
            is not given or not a column of the DataFrame.
        """
        if not target or target not in self.df.columns:
            return None

        interaction = self.df[feature1] * self.df[feature2]

        # Axes are added one by one so the third slot can be a 3D axes
        # without leaving an empty 2D axes underneath it.
        fig = plt.figure(figsize=(15, 5))
        ax_single = fig.add_subplot(1, 3, 1)
        ax_inter = fig.add_subplot(1, 3, 2)
        # Importing Axes3D registers the '3d' projection on old matplotlib.
        from mpl_toolkits.mplot3d import Axes3D  # noqa: F401
        ax3d = fig.add_subplot(1, 3, 3, projection='3d')

        # 1. Each feature against the target.
        ax_single.scatter(self.df[feature1], self.df[target], alpha=0.5, label=feature1)
        ax_single.scatter(self.df[feature2], self.df[target], alpha=0.5, label=feature2)
        ax_single.set_xlabel('特征值')
        ax_single.set_ylabel(target)
        ax_single.legend()
        ax_single.set_title(f'{feature1}和{feature2}与{target}的关系')

        # 2. Interaction term against the target.
        ax_inter.scatter(interaction, self.df[target], alpha=0.5)
        ax_inter.set_xlabel(f'{feature1} × {feature2}')
        ax_inter.set_ylabel(target)
        ax_inter.set_title('交互项与目标变量的关系')

        # 3. Both features and the target in 3D.
        ax3d.scatter(self.df[feature1], self.df[feature2], self.df[target],
                     alpha=0.5, c=self.df[target], cmap='viridis')
        ax3d.set_xlabel(feature1)
        ax3d.set_ylabel(feature2)
        ax3d.set_zlabel(target)
        ax3d.set_title('三维特征关系图')

        plt.tight_layout()
        plt.show()

        # Statistical significance of the interaction term.
        import statsmodels.api as sm
        X = pd.DataFrame({
            feature1: self.df[feature1],
            feature2: self.df[feature2],
            'interaction': interaction,
        })
        X = sm.add_constant(X)
        y = self.df[target]
        # missing='drop' skips rows with NaN instead of failing the fit.
        model = sm.OLS(y, X, missing='drop').fit()
        print("交互效应回归分析结果:")
        print(model.summary())
        return model
# 示例:多变量分析
def multivariate_analysis_example():
    """Demonstrate the multivariate workflow on the California housing data.

    Prints up to five strongly correlated feature pairs, shows the correlation
    heat map, pair plot and parallel-coordinates plot, runs the interaction
    analysis when a ``target`` column exists, and returns the analyzer.
    """
    housing_data = DataLoader().load_sklearn_dataset('california_housing')
    analyzer = MultivariateAnalyzer(housing_data)

    strong_pairs = analyzer.correlation_analysis()['high_correlation_pairs']
    print("高度相关的特征对:")
    for pair in strong_pairs[:5]:
        print(f"{pair['feature1']} 和 {pair['feature2']}: {pair['correlation']:.3f}")

    analyzer.visualize_correlation_matrix()

    base_features = ['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population']
    analyzer.scatter_plot_matrix(columns=base_features)
    analyzer.parallel_coordinates_plot(columns=base_features + ['AveOccup'])

    if 'target' in housing_data.columns:
        analyzer.interaction_effects_analysis('MedInc', 'HouseAge', 'target')
    return analyzer
if __name__ == "__main__":
    # Script entry point for the multivariate-analysis section.
    multivariate_analyzer = multivariate_analysis_example()
3.3 高级可视化技术
class AdvancedVisualizer:
    """Interactive, radar, clustered-heat-map and time-series visualisations."""

    def __init__(self, df):
        """Keep a reference to ``df`` (not copied)."""
        self.df = df

    def create_interactive_plot(self, x_col, y_col, color_col=None, size_col=None):
        """Show an interactive Plotly scatter plot.

        Falls back to a static Matplotlib scatter plot when Plotly is not
        installed.
        """
        # Only the import is guarded: an ImportError raised while drawing
        # should not be mistaken for "Plotly is missing".
        try:
            import plotly.express as px
        except ImportError:
            print("Plotly未安装,使用Matplotlib替代")
            self._create_static_scatter(x_col, y_col, color_col, size_col)
            return

        fig = px.scatter(self.df, x=x_col, y=y_col,
                         color=color_col, size=size_col,
                         hover_data=self.df.columns,
                         title=f'{x_col} vs {y_col}',
                         template='plotly_white')
        fig.update_layout(
            width=1000,
            height=600,
            showlegend=True,
            hovermode='closest',
        )
        fig.show()

    def _create_static_scatter(self, x_col, y_col, color_col=None, size_col=None):
        """Show a Matplotlib scatter plot, optionally coloured and/or sized by columns."""
        has_color = bool(color_col) and color_col in self.df.columns
        has_size = bool(size_col) and size_col in self.df.columns

        scatter_kwargs = {'alpha': 0.6}
        if has_size:
            # Marker areas are scaled so the largest value maps to 100.
            scatter_kwargs['s'] = self.df[size_col] / self.df[size_col].max() * 100
        elif has_color:
            scatter_kwargs['s'] = 50
        if has_color:
            scatter_kwargs['c'] = self.df[color_col]
            scatter_kwargs['cmap'] = 'viridis'
        elif has_size:
            scatter_kwargs['c'] = 'blue'

        plt.figure(figsize=(10, 6))
        scatter = plt.scatter(self.df[x_col], self.df[y_col], **scatter_kwargs)
        if has_color:
            plt.colorbar(scatter, label=color_col)
        plt.xlabel(x_col)
        plt.ylabel(y_col)
        plt.title(f'{x_col} vs {y_col}')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def _radar_category_means(self, features, categories):
        """Return, per existing category column, the mean scaled value of each feature.

        Features are min-max scaled to [0, 1] over the whole DataFrame. A
        category is made of the rows where its indicator column equals 1.
        Category names that are not columns are skipped.
        """
        values = self.df[list(features)].astype(float)
        lowest = values.min()
        # A constant feature has zero range; dividing by 1 maps it to 0.
        span = (values.max() - lowest).replace(0, 1.0)
        scaled = (values - lowest) / span
        return [
            scaled[self.df[category] == 1].mean().tolist()
            for category in categories
            if category in self.df.columns
        ]

    def create_radar_chart(self, features, categories, figsize=(10, 8)):
        """Show a radar chart comparing categories across scaled features.

        Args:
            features: Numeric columns forming the radar axes.
            categories: Indicator (0/1) columns; one polygon per column.
            figsize: Size of the figure.

        Raises:
            ValueError: if none of ``categories`` is a column of the DataFrame.
        """
        features = list(features)
        radar_data = self._radar_category_means(features, categories)
        if not radar_data:
            raise ValueError("没有可用于绘制雷达图的类别列")

        angles = np.linspace(0, 2 * np.pi, len(features), endpoint=False).tolist()
        angles += angles[:1]  # repeat the first angle to close the polygon

        fig, ax = plt.subplots(figsize=figsize, subplot_kw=dict(projection='polar'))
        for i, values in enumerate(radar_data):
            closed = values + values[:1]  # new list; radar_data stays untouched
            ax.plot(angles, closed, 'o-', linewidth=2, label=f'类别{i+1}')
            ax.fill(angles, closed, alpha=0.25)

        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(features)
        peak = max(max(values) for values in radar_data)
        ax.set_ylim(0, peak * 1.1 if peak > 0 else 1.0)
        ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
        plt.title('多维度雷达图比较')
        plt.tight_layout()
        plt.show()

    def create_heatmap_with_clustering(self, columns=None, figsize=(12, 10)):
        """Show a feature dendrogram next to the correlation heat map in cluster order.

        Args:
            columns: Columns to include; defaults to all numeric columns.
                Rows with missing values in these columns are dropped.
            figsize: Size of the figure.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        columns = list(columns)
        data = self.df[columns].dropna()
        corr_matrix = data.corr()
        feature_names = list(corr_matrix.columns)

        from scipy.cluster.hierarchy import dendrogram, linkage
        from scipy.spatial.distance import squareform

        # Turn |correlation| into a distance. The diagonal is forced to
        # exactly zero and validation is skipped so floating-point noise
        # cannot make squareform reject the matrix.
        distance_matrix = 1 - np.abs(corr_matrix.to_numpy())
        np.fill_diagonal(distance_matrix, 0.0)
        condensed_distance = squareform(distance_matrix, checks=False)
        linkage_matrix = linkage(condensed_distance, method='ward')

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
        # One call both draws the tree and reports the leaf order.
        tree = dendrogram(linkage_matrix, labels=feature_names, ax=ax1,
                          orientation='left', leaf_font_size=10)
        ax1.set_title('特征聚类树状图')

        ordered_features = [feature_names[i] for i in tree['leaves']]
        ordered_corr = corr_matrix.loc[ordered_features, ordered_features]
        sns.heatmap(ordered_corr, annot=True, fmt='.2f', cmap='coolwarm',
                    center=0, square=True, ax=ax2, cbar_kws={"shrink": 0.8})
        ax2.set_title('聚类后的特征相关性热图')
        plt.tight_layout()
        plt.show()

    def create_time_series_decomposition(self, date_col, value_col, freq='M', period=12):
        """Decompose a series into trend, seasonal and residual parts and plot them.

        Args:
            date_col: Column holding the timestamps.
            value_col: Column holding the observations.
            freq: Pandas frequency the series is re-sampled to with ``asfreq``.
            period: Number of observations per seasonal cycle, passed to
                ``seasonal_decompose``.

        Returns:
            The decomposition result, or ``None`` when either column is missing.
        """
        if date_col not in self.df.columns or value_col not in self.df.columns:
            return None

        # Build the series from converted timestamps without writing the
        # conversion back into the caller's DataFrame.
        timestamps = pd.DatetimeIndex(pd.to_datetime(self.df[date_col]))
        ts_data = pd.Series(self.df[value_col].to_numpy(), index=timestamps, name=value_col)
        # Forward-fill gaps; back-fill covers a gap at the very start.
        ts_data = ts_data.asfreq(freq).ffill().bfill()

        from statsmodels.tsa.seasonal import seasonal_decompose
        decomposition = seasonal_decompose(ts_data, model='additive', period=period)

        fig, axes = plt.subplots(4, 1, figsize=(12, 10))
        panels = (
            (ts_data, '原始序列', '原始时间序列', None),
            (decomposition.trend, '趋势', '趋势成分', 'orange'),
            (decomposition.seasonal, '季节性', '季节性成分', 'green'),
            (decomposition.resid, '残差', '残差成分', 'red'),
        )
        for ax, (series, label, title, color) in zip(axes, panels):
            ax.plot(series, label=label, color=color)
            ax.legend()
            ax.set_title(title)
        plt.tight_layout()
        plt.show()
        return decomposition
# 示例:高级可视化
def advanced_visualization_example():
    """Demonstrate the advanced visualisations.

    Shows an interactive scatter plot and a clustered heat map for the
    California housing data, then decomposes a simulated daily time series.
    Returns the visualizer built on the housing data.
    """
    loader = DataLoader()
    housing_data = loader.load_sklearn_dataset('california_housing')
    visualizer = AdvancedVisualizer(housing_data)

    # Interactive scatter plot.
    visualizer.create_interactive_plot('MedInc', 'target', color_col='HouseAge')

    # Clustered heat map on the first eight numeric columns.
    numeric_cols = housing_data.select_dtypes(include=[np.number]).columns.tolist()
    visualizer.create_heatmap_with_clustering(numeric_cols[:8])

    # Simulated daily series: sine wave + noise + linear trend.
    dates = pd.date_range(start='2020-01-01', periods=100, freq='D')
    ts_df = pd.DataFrame({
        'date': dates,
        'value': np.sin(np.arange(100) * 0.1) + np.random.randn(100) * 0.2 + np.arange(100) * 0.01
    })
    ts_visualizer = AdvancedVisualizer(ts_df)
    # The data is daily, so it must be decomposed at daily frequency. With the
    # default monthly frequency only about four observations would remain,
    # fewer than the two full cycles the decomposition needs.
    decomposition = ts_visualizer.create_time_series_decomposition('date', 'value', freq='D')
    return visualizer
if __name__ == "__main__":
    # Script entry point for the advanced-visualisation section.
    advanced_visualizer = advanced_visualization_example()
第四章:数据预处理与特征工程
4.1 数据清洗与缺失值处理
class DataPreprocessor:
"""数据预处理器"""
def __init__(self, df):
self.df = df.copy()
self.preprocessing_steps = []
def handle_missing_values(self, strategy='mean', custom_values=None):
    """Fill missing values column by column and log each imputation.

    Args:
        strategy: ``'mean'``, ``'median'``, ``'mode'`` or ``'constant'``.
            Numeric columns fall back to 0 for any other value; categorical
            columns only honour ``'mode'`` and ``'constant'`` and otherwise
            use ``'Unknown'``.
        custom_values: Mapping column -> fill value, used with
            ``strategy='constant'``.

    Returns:
        ``self``, so calls can be chained.
    """
    numeric_cols = set(self.df.select_dtypes(include=[np.number]).columns)
    categorical_cols = set(self.df.select_dtypes(include=['object', 'category']).columns)

    missing_report = self.df.isnull().sum()
    missing_report = missing_report[missing_report > 0]
    print(f"处理缺失值前,缺失值统计:")
    print(missing_report)

    for col, missing_count in missing_report.items():
        column = self.df[col]
        if col in numeric_cols:
            if strategy == 'mean':
                fill_value = column.mean()
            elif strategy == 'median':
                fill_value = column.median()
            elif strategy == 'mode':
                modes = column.mode()
                fill_value = modes.iloc[0] if not modes.empty else 0
            elif strategy == 'constant' and custom_values:
                fill_value = custom_values.get(col, 0)
            else:
                fill_value = 0
            # An all-missing column has no mean/median; use 0 so the column
            # does not silently stay empty.
            if pd.isna(fill_value):
                fill_value = 0
        elif col in categorical_cols:
            if strategy == 'mode':
                modes = column.mode()
                fill_value = modes.iloc[0] if not modes.empty else 'Unknown'
            elif strategy == 'constant' and custom_values:
                fill_value = custom_values.get(col, 'Unknown')
            else:
                fill_value = 'Unknown'
            # A 'category' column rejects values outside its categories.
            if (isinstance(column.dtype, pd.CategoricalDtype)
                    and fill_value not in column.cat.categories):
                column = column.cat.add_categories([fill_value])
        else:
            # Neither numeric nor categorical (e.g. datetime): left untouched.
            continue

        # Assign the result back. fillna(inplace=True) on self.df[col] acts on
        # a temporary and does not update the frame under copy-on-write.
        self.df[col] = column.fillna(fill_value)
        self.preprocessing_steps.append({
            'step': 'missing_value_imputation',
            'column': col,
            'strategy': strategy,
            'fill_value': fill_value,
            'missing_count': missing_count,
        })

    print(f"处理缺失值后,剩余缺失值: {self.df.isnull().sum().sum()}")
    return self
def remove_duplicates(self, subset=None, keep='first'):
    """Drop duplicated rows and log how many were removed.

    Args:
        subset: Columns that define a duplicate; all columns when ``None``.
        keep: Passed to ``DataFrame.drop_duplicates`` (``'first'``,
            ``'last'`` or ``False`` to drop every member of a duplicate group).

    Returns:
        ``self``, so calls can be chained.
    """
    before_count = len(self.df)
    deduplicated = self.df.drop_duplicates(subset=subset, keep=keep)
    after_count = len(deduplicated)
    # Count what was actually dropped. Counting duplicated() with its default
    # keep='first' under-reports when keep=False removes whole groups.
    duplicate_count = before_count - after_count

    if duplicate_count > 0:
        self.df = deduplicated
        self.preprocessing_steps.append({
            'step': 'remove_duplicates',
            'subset': subset,
            'keep': keep,
            'removed_count': duplicate_count,
            'before_count': before_count,
            'after_count': after_count,
        })
        print(f"移除了 {duplicate_count} 个重复行")
        print(f"数据形状从 {before_count} 变为 {after_count}")
    return self
def detect_and_handle_outliers(self, method='iqr', threshold=1.5):
    """Detect outliers in every numeric column and replace them.

    Args:
        method: ``'iqr'`` clips values to ``[Q1 - threshold*IQR,
            Q3 + threshold*IQR]``; ``'zscore'`` replaces values whose absolute
            z-score exceeds ``threshold`` with the column median.
        threshold: IQR multiplier or z-score cut-off, depending on ``method``.

    Returns:
        ``self``, so calls can be chained.

    Raises:
        ValueError: if ``method`` is neither ``'iqr'`` nor ``'zscore'``.
    """
    if method not in ('iqr', 'zscore'):
        raise ValueError(f"不支持的异常值检测方法: {method}")

    numeric_cols = self.df.select_dtypes(include=[np.number]).columns
    outlier_report = {}
    for col in numeric_cols:
        values = self.df[col]
        data = values.dropna()
        if data.empty:
            # Nothing to analyse; also avoids dividing by zero below.
            continue

        if method == 'iqr':
            q1 = data.quantile(0.25)
            q3 = data.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - threshold * iqr
            upper_bound = q3 + threshold * iqr
            outlier_mask = (values < lower_bound) | (values > upper_bound)
            outlier_count = int(outlier_mask.sum())
            if outlier_count:
                # clip() upcasts integer columns as needed, unlike assigning
                # float bounds into them element-wise.
                self.df[col] = values.clip(lower=lower_bound, upper=upper_bound)
        else:
            # Population z-score (ddof=0). A constant column has zero spread
            # and therefore no outliers.
            spread = data.std(ddof=0)
            if spread > 0:
                outlier_mask = ((values - data.mean()).abs() / spread) > threshold
            else:
                outlier_mask = pd.Series(False, index=values.index)
            outlier_count = int(outlier_mask.sum())
            if outlier_count:
                self.df[col] = values.mask(outlier_mask, data.median())

        outlier_report[col] = {
            'method': method,
            'outlier_count': outlier_count,
            'percentage': (outlier_count / len(data)) * 100,
        }

    self.preprocessing_steps.append({
        'step': 'outlier_handling',
        'method': method,
        'threshold': threshold,
        'report': outlier_report,
    })
    print("异常值处理报告:")
    for col, report in outlier_report.items():
        if report['outlier_count'] > 0:
            print(f" {col}: 处理了 {report['outlier_count']} 个异常值 ({report['percentage']:.2f}%)")
    return self
def convert_data_types(self, type_mapping):
"""转换数据类型"""
for col, dtype in type_mapping.items():
if col in self.df.columns:
original_dtype = str(self.df[col].dtype)
try:
self.df[col] = self.df[col].astype(dtype)
self.preprocessing_steps.append({
'step': 'data_type_conversion',
'column': col,
'from': original_dtype,
'to': dtype
})
print(f"将列 '{
更多推荐
所有评论(0)