Python机器学习全流程实战:从数据探索到模型部署

摘要

本文全面介绍了使用Python进行机器学习的完整流程,涵盖了数据预处理、特征工程、模型训练、评估优化和部署应用等关键环节。通过实际案例演示,结合详细的代码实现和可视化分析,为读者提供了一套可复现的机器学习实践框架。文章包含超过20个完整的代码示例,30余张可视化图表,深入探讨了Scikit-learn、Pandas、Matplotlib、Seaborn、XGBoost等主流库的应用技巧,并引入了模型解释性、自动化机器学习等前沿话题。

第一章:引言与背景

1.1 机器学习的发展现状

机器学习作为人工智能的核心分支,在过去十年中取得了突破性进展。根据Gartner的技术成熟度曲线,机器学习技术已从期望膨胀期进入实质生产的高原期,在各行各业展现出强大的应用价值。Python凭借其简洁的语法、丰富的生态库和强大的社区支持,已成为机器学习领域的首选编程语言。

1.2 Python机器学习生态系统

Python的机器学习生态系统呈现出多层次、模块化的特点:

  1. 基础科学计算库:NumPy、SciPy提供高效的数值计算能力
  2. 数据处理库:Pandas、Dask是处理结构化数据的利器
  3. 可视化库:Matplotlib、Seaborn、Plotly实现数据可视化
  4. 机器学习框架:Scikit-learn提供经典算法实现
  5. 深度学习框架:TensorFlow、PyTorch支持神经网络模型
  6. 自动化工具:以TPOT为代表的AutoML(自动化机器学习)工具可简化模型构建流程

1.3 本文结构与目标

本文采用"理论-实践-应用"的三段式结构,旨在帮助读者:

  1. 掌握机器学习的基本原理和算法思想
  2. 熟练使用Python进行数据分析和模型构建
  3. 理解模型评估和优化的方法论
  4. 学会将机器学习模型部署到生产环境

第二章:环境配置与数据准备

2.1 Python环境搭建

# 环境配置与依赖管理
import sys
import platform
import subprocess

def _version_tuple(version_text):
    """Return the leading numeric components of a version string as ints.

    Parsing stops at the first dot-separated component that does not start
    with a digit, so ``'1.5.0rc1'`` yields ``(1, 5, 0)`` and a string with no
    leading number yields ``()``.
    """
    numbers = []
    for piece in str(version_text).split('.'):
        digits = ''
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        if not digits:
            break
        numbers.append(int(digits))
    return tuple(numbers)


def _meets_minimum(installed, minimum):
    """Return True when ``installed`` is at least ``minimum``.

    A version that cannot be parsed at all is treated as acceptable, because
    there is nothing to compare against.
    """
    have = _version_tuple(installed)
    need = _version_tuple(minimum)
    if not have:
        return True
    # Pad to equal length so that '1.21' compares equal to '1.21.0'.
    width = max(len(have), len(need))
    have += (0,) * (width - len(have))
    need += (0,) * (width - len(need))
    return have >= need


def check_environment():
    """Print the interpreter, platform and key library versions.

    For every required library the installed version is printed; a note is
    appended when it is older than the minimum listed below, and libraries
    that cannot be imported are reported as not installed.
    """
    import importlib

    print(f"Python版本: {sys.version}")
    print(f"操作系统: {platform.system()} {platform.release()}")
    print(f"处理器架构: {platform.machine()}")

    # Distribution name -> minimum version expected by the examples.
    required_libraries = {
        'numpy': '1.21.0',
        'pandas': '1.3.0',
        'scikit-learn': '1.0.0',
        'matplotlib': '3.4.0',
        'seaborn': '0.11.0',
        'xgboost': '1.5.0'
    }

    # Distribution names that differ from the importable module name.
    # 'scikit-learn' is not a valid module name; the package imports as
    # 'sklearn'.
    import_names = {'scikit-learn': 'sklearn'}

    for lib, min_version in required_libraries.items():
        try:
            module = importlib.import_module(import_names.get(lib, lib))
        except ImportError:
            print(f"{lib}: 未安装")
            continue

        version = getattr(module, '__version__', '未知')
        if _meets_minimum(version, min_version):
            print(f"{lib}: {version}")
        else:
            print(f"{lib}: {version} (低于最低要求版本 {min_version})")

# 创建虚拟环境(示例命令)
def create_virtual_env():
    """Print the shell commands suggested for creating a virtual environment.

    Nothing is executed; the commands are only listed for the reader to run.
    """
    setup_steps = (
        'python -m venv ml_env',
        'source ml_env/bin/activate',  # Linux/macOS activation
        # Windows activation would be: 'ml_env\\Scripts\\activate'
        'pip install --upgrade pip',
        'pip install numpy pandas scikit-learn matplotlib seaborn xgboost jupyter',
    )

    print("建议执行以下命令创建环境:")
    print("\n".join(f"  {step}" for step in setup_steps))

# Script entry point: report the environment, then print the setup commands.
if __name__ == "__main__":
    check_environment()
    create_virtual_env()

2.2 数据获取与加载

import pandas as pd
import numpy as np
from sklearn.datasets import fetch_california_housing, load_iris, load_wine
import requests
import zipfile
import io
import os

class DataLoader:
    """Load datasets from scikit-learn, remote CSV files or remote zip archives.

    Files fetched over HTTP are stored under ``data_dir``, which is created on
    construction if it does not exist yet.
    """

    # Public dataset name -> name of the loader function in sklearn.datasets.
    _SKLEARN_LOADERS = {
        'california_housing': 'fetch_california_housing',
        'iris': 'load_iris',
        'wine': 'load_wine',
        'diabetes': 'load_diabetes',
        'breast_cancer': 'load_breast_cancer',
    }

    def __init__(self, data_dir='./data'):
        """Remember ``data_dir`` and create it (with parents) if missing."""
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)

    def load_sklearn_dataset(self, dataset_name='california_housing'):
        """Return a scikit-learn dataset as a DataFrame with a ``target`` column.

        Args:
            dataset_name: One of the keys of ``_SKLEARN_LOADERS``.

        Returns:
            DataFrame whose columns are the dataset's feature names followed
            by ``target``.

        Raises:
            ValueError: If ``dataset_name`` is not a supported dataset.
        """
        if dataset_name not in self._SKLEARN_LOADERS:
            raise ValueError(f"不支持的数据集: {dataset_name}")

        # Resolve the loader by name at call time. This avoids relying on
        # names that the module-level import does not provide
        # (load_diabetes / load_breast_cancer) and keeps the name validation
        # above independent of scikit-learn being importable.
        from sklearn import datasets as sklearn_datasets

        loader = getattr(sklearn_datasets, self._SKLEARN_LOADERS[dataset_name])
        data = loader()

        df = pd.DataFrame(data.data, columns=data.feature_names)
        df['target'] = data.target

        print(f"数据集 '{dataset_name}' 加载成功")
        print(f"数据形状: {df.shape}")
        print(f"特征列: {list(df.columns[:-1])}")
        print(f"目标列: {df.columns[-1]}")

        return df

    def load_csv_from_url(self, url, filename=None, timeout=30):
        """Download a CSV file (once) into ``data_dir`` and load it.

        The download is skipped when the target file already exists.

        Args:
            url: Location of the CSV file.
            filename: Local file name; defaults to the last path segment of
                ``url``.
            timeout: Seconds to wait for the HTTP request.

        Returns:
            DataFrame parsed from the local copy of the file.

        Raises:
            ValueError: If no file name can be derived from ``url``.
            Exception: If the server answers with a non-200 status code.
        """
        if filename is None:
            # Use only the path component so that query strings and
            # fragments do not leak into the local file name.
            from urllib.parse import urlsplit

            filename = os.path.basename(urlsplit(url).path)
            if not filename:
                raise ValueError(f"无法从URL推断文件名: {url}")

        filepath = os.path.join(self.data_dir, filename)

        if not os.path.exists(filepath):
            print(f"从 {url} 下载数据...")
            response = requests.get(url, timeout=timeout)

            if response.status_code != 200:
                raise Exception(f"下载失败,状态码: {response.status_code}")

            with open(filepath, 'wb') as f:
                f.write(response.content)
            print(f"数据已保存到 {filepath}")

        df = pd.read_csv(filepath)
        print(f"CSV数据加载成功,形状: {df.shape}")
        return df

    def load_compressed_data(self, url, extract_dir=None, timeout=30):
        """Download a zip archive and extract it.

        Args:
            url: Location of the zip archive.
            extract_dir: Extraction directory; defaults to ``data_dir``.
            timeout: Seconds to wait for the HTTP request.

        Returns:
            List of member names contained in the archive.

        Raises:
            Exception: If the server answers with a non-200 status code.
        """
        if extract_dir is None:
            extract_dir = self.data_dir

        print(f"下载并解压数据: {url}")
        response = requests.get(url, timeout=timeout)

        if response.status_code != 200:
            raise Exception(f"下载失败,状态码: {response.status_code}")

        # The archive is unpacked from memory; it is never written to disk.
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            z.extractall(extract_dir)
            print(f"数据解压到: {extract_dir}")

            # Return the list of extracted members.
            return z.namelist()

# 示例:加载多个数据集
def load_example_datasets():
    """Load the three demo datasets and return them keyed by a short name.

    Returns:
        Dict with the keys ``'housing'``, ``'iris'`` and ``'wine'``, each
        mapped to the DataFrame produced by ``DataLoader``.
    """
    loader = DataLoader()

    # Short result key -> dataset name understood by the loader.
    # Insertion order fixes the loading order: housing, iris, wine.
    wanted = {
        'housing': 'california_housing',
        'iris': 'iris',
        'wine': 'wine',
    }

    return {
        key: loader.load_sklearn_dataset(dataset_name)
        for key, dataset_name in wanted.items()
    }

# Script entry point: load the demo datasets and print an overview of each.
if __name__ == "__main__":
    datasets = load_example_datasets()

    # Show basic information about every loaded dataset.
    for name, df in datasets.items():
        print(f"\n{name}数据集:")
        # DataFrame.info() prints its summary itself and returns None, so it
        # must not be wrapped in print() (that would emit a stray "None").
        df.info()
        print(f"缺失值统计:\n{df.isnull().sum()}")
        print(f"描述性统计:\n{df.describe()}")

2.3 数据质量评估

import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

class DataQualityAnalyzer:
    """Collect and plot basic data-quality metrics for a DataFrame.

    Results are accumulated in ``self.report``, a dict with the sections
    ``'missing_values'``, ``'data_types'``, ``'statistics'``, ``'outliers'``
    and ``'duplicates'``.
    """

    def __init__(self, df):
        """Store the DataFrame to analyse and start with an empty report."""
        self.df = df
        self.report = {}

    def generate_quality_report(self):
        """Run every check and return the complete report dict."""
        self._check_missing_values()
        self._check_data_types()
        self._check_statistical_properties()
        self._check_outliers()
        self._check_duplicates()

        return self.report

    def _check_missing_values(self):
        """Record missing-value counts and percentages.

        ``percentage`` is per column (missing rows / total rows), while
        ``total_percentage`` is the share of missing cells among all cells.
        """
        missing_info = self.df.isnull().sum()
        n_rows = len(self.df)
        total_cells = self.df.size

        if n_rows:
            missing_percentage = (missing_info / n_rows) * 100
        else:
            # No rows: avoid dividing by zero and report 0% everywhere.
            missing_percentage = missing_info * 0.0

        total_missing = missing_info.sum()

        self.report['missing_values'] = {
            'count': missing_info.to_dict(),
            'percentage': missing_percentage.to_dict(),
            'total_missing': total_missing,
            # Summing the per-column percentages would overstate the figure
            # (it can exceed 100%), so the total is computed over all cells.
            'total_percentage': (total_missing / total_cells) * 100 if total_cells else 0.0
        }

    def _check_data_types(self):
        """Record column dtypes and the numeric / categorical column lists."""
        dtype_info = self.df.dtypes
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns.tolist()
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns.tolist()

        self.report['data_types'] = {
            'dtypes': dtype_info.to_dict(),
            'numeric_columns': numeric_cols,
            'categorical_columns': categorical_cols,
            'total_columns': len(self.df.columns)
        }

    def _check_statistical_properties(self):
        """Record mean, std, min, max, skewness and kurtosis per numeric column."""
        numeric_df = self.df.select_dtypes(include=[np.number])

        stats_info = {
            'mean': numeric_df.mean().to_dict(),
            'std': numeric_df.std().to_dict(),
            'min': numeric_df.min().to_dict(),
            'max': numeric_df.max().to_dict(),
            # Missing values are dropped per column before the scipy calls.
            'skewness': numeric_df.apply(lambda x: stats.skew(x.dropna())).to_dict(),
            'kurtosis': numeric_df.apply(lambda x: stats.kurtosis(x.dropna())).to_dict()
        }

        self.report['statistics'] = stats_info

    def _check_outliers(self):
        """Count outliers per numeric column using the 1.5 * IQR rule."""
        numeric_df = self.df.select_dtypes(include=[np.number])
        n_rows = len(numeric_df)
        outliers_info = {}

        for col in numeric_df.columns:
            series = numeric_df[col]
            Q1 = series.quantile(0.25)
            Q3 = series.quantile(0.75)
            IQR = Q3 - Q1

            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            outlier_count = int(((series < lower_bound) | (series > upper_bound)).sum())
            outliers_info[col] = {
                'count': outlier_count,
                'percentage': (outlier_count / n_rows) * 100 if n_rows else 0.0,
                'lower_bound': lower_bound,
                'upper_bound': upper_bound
            }

        self.report['outliers'] = outliers_info

    def _check_duplicates(self):
        """Record the number and percentage of fully duplicated rows."""
        duplicate_count = self.df.duplicated().sum()
        n_rows = len(self.df)

        self.report['duplicates'] = {
            'count': duplicate_count,
            'percentage': (duplicate_count / n_rows) * 100 if n_rows else 0.0
        }

    def visualize_quality_report(self):
        """Draw a 2x3 dashboard summarising the data quality.

        The report is generated first if any section the dashboard reads is
        still missing, so this method can be called on a fresh analyzer.
        """
        needed_sections = {'missing_values', 'duplicates', 'data_types'}
        if not needed_sections.issubset(self.report):
            self.generate_quality_report()

        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        fig.suptitle('数据质量分析报告', fontsize=16)
        (ax_missing, ax_dtypes, ax_hist), (ax_box, ax_corr, ax_summary) = axes

        # 1. Heat map of missing cells.
        sns.heatmap(self.df.isnull(), cbar=False, cmap='viridis', ax=ax_missing)
        ax_missing.set_title('缺失值分布热图')

        # 2. Share of each dtype among the columns.
        dtype_counts = self.df.dtypes.value_counts()
        ax_dtypes.pie(dtype_counts.values,
                      labels=[str(dtype) for dtype in dtype_counts.index],
                      autopct='%1.1f%%')
        ax_dtypes.set_title('数据类型分布')

        numeric_cols = self.df.select_dtypes(include=[np.number]).columns

        if len(numeric_cols) > 0:
            # 3. Histogram of the first numeric column.
            sample_col = numeric_cols[0]
            self.df[sample_col].hist(bins=30, ax=ax_hist)
            ax_hist.set_title(f'{sample_col}分布直方图')

            # 4. Box plot of the first three numeric columns.
            self.df[numeric_cols[:3]].boxplot(ax=ax_box)
            ax_box.set_title('异常值检测(箱线图)')
            ax_box.tick_params(axis='x', rotation=45)

        # 5. Correlation heat map (needs at least two numeric columns).
        if len(numeric_cols) > 1:
            correlation = self.df[numeric_cols].corr()
            sns.heatmap(correlation, annot=True, fmt='.2f', cmap='coolwarm',
                        square=True, ax=ax_corr)
            ax_corr.set_title('特征相关性热图')

        # 6. Text panel with the headline metrics.
        quality_metrics = [
            f"总样本数: {len(self.df)}",
            f"总特征数: {len(self.df.columns)}",
            f"缺失值比例: {self.report['missing_values']['total_percentage']:.2f}%",
            f"重复值比例: {self.report['duplicates']['percentage']:.2f}%",
            f"数值特征: {len(self.report['data_types']['numeric_columns'])}",
            f"分类特征: {len(self.report['data_types']['categorical_columns'])}"
        ]

        ax_summary.text(0.1, 0.5, '\n'.join(quality_metrics),
                        fontsize=12, verticalalignment='center')
        ax_summary.axis('off')
        ax_summary.set_title('数据质量指标汇总')

        plt.tight_layout()
        plt.show()

# 示例:分析数据质量
def analyze_data_quality_example():
    """Run the data-quality analysis on the California housing dataset.

    Prints a short summary, shows the dashboard and returns the report dict.
    """
    housing_data = DataLoader().load_sklearn_dataset('california_housing')

    analyzer = DataQualityAnalyzer(housing_data)
    report = analyzer.generate_quality_report()

    missing = report['missing_values']
    summary_lines = [
        "数据质量报告摘要:",
        f"缺失值总数: {missing['total_missing']}",
        f"缺失值比例: {missing['total_percentage']:.2f}%",
        f"重复值数量: {report['duplicates']['count']}",
        f"数值特征数量: {len(report['data_types']['numeric_columns'])}",
    ]
    for line in summary_lines:
        print(line)

    # Show the dashboard after the textual summary.
    analyzer.visualize_quality_report()

    return report

# Script entry point: run the data-quality demo and keep the resulting report.
if __name__ == "__main__":
    quality_report = analyze_data_quality_example()

第三章:探索性数据分析与可视化

3.1 单变量分析

class UnivariateAnalyzer:
    """Per-column statistics and distribution plots for a DataFrame."""

    def __init__(self, df):
        """Store ``df`` and pre-compute its numeric and categorical columns."""
        self.df = df
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns
        self.categorical_cols = df.select_dtypes(include=['object', 'category']).columns

    def analyze_numeric_features(self, columns=None):
        """Compute summary statistics for numeric columns.

        Args:
            columns: Columns to analyse; defaults to all numeric columns.
                Names that are not in the DataFrame are skipped.

        Returns:
            Dict mapping each column name to a dict of statistics (count,
            mean, std, quartiles, skewness, kurtosis and missing-value info).
        """
        if columns is None:
            columns = self.numeric_cols

        n_rows = len(self.df)
        analysis_results = {}

        for col in columns:
            if col not in self.df.columns:
                continue

            data = self.df[col].dropna()
            missing = self.df[col].isnull().sum()

            # The dict must not be named ``stats``: that would shadow the
            # scipy ``stats`` module used for skewness and kurtosis below.
            summary = {
                'count': len(data),
                'mean': data.mean(),
                'std': data.std(),
                'min': data.min(),
                '25%': data.quantile(0.25),
                'median': data.median(),
                '75%': data.quantile(0.75),
                'max': data.max(),
                'skewness': stats.skew(data),
                'kurtosis': stats.kurtosis(data),
                'missing': missing,
                'missing_percentage': (missing / n_rows) * 100 if n_rows else 0.0
            }

            analysis_results[col] = summary

        return analysis_results

    def analyze_categorical_features(self, columns=None):
        """Compute frequency statistics for categorical columns.

        Args:
            columns: Columns to analyse; defaults to all object/category
                columns. Names that are not in the DataFrame are skipped.

        Returns:
            Dict mapping each column name to a dict with the non-null count,
            number of unique values, most frequent value and its frequency,
            missing-value info and the full value counts.
        """
        if columns is None:
            columns = self.categorical_cols

        analysis_results = {}

        for col in columns:
            if col not in self.df.columns:
                continue

            data = self.df[col]
            modes = data.mode()
            value_counts = data.value_counts()
            missing = data.isnull().sum()

            summary = {
                'count': len(data.dropna()),
                'unique': data.nunique(),
                'top': modes.iloc[0] if not modes.empty else None,
                'freq': value_counts.iloc[0] if not value_counts.empty else 0,
                'missing': missing,
                'missing_percentage': (missing / len(data)) * 100 if len(data) else 0.0,
                'value_counts': value_counts.to_dict()
            }

            analysis_results[col] = summary

        return analysis_results

    def visualize_numeric_distributions(self, columns=None, figsize=(15, 10)):
        """Plot a histogram with KDE for each numeric column.

        Args:
            columns: Columns to plot; defaults to the first six numeric ones.
            figsize: Size of the whole figure in inches.
        """
        if columns is None:
            columns = self.numeric_cols[:6]  # show at most six features

        columns = list(columns)
        if not columns:
            # Nothing to plot; a 0-column grid cannot be created.
            return

        n_cols = min(3, len(columns))
        n_rows = (len(columns) + n_cols - 1) // n_cols

        # squeeze=False always yields a 2-D array, so flatten() works for
        # every grid shape, including a single row or a single axes.
        fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
        axes = axes.flatten()

        for ax, col in zip(axes, columns):
            data = self.df[col].dropna()

            # Histogram with a density curve.
            sns.histplot(data, kde=True, ax=ax, bins=30)
            ax.set_title(f'{col}分布')
            ax.set_xlabel(col)
            ax.set_ylabel('频数')

            # Annotate the panel with the key statistics.
            stats_text = f'均值: {data.mean():.2f}\n标准差: {data.std():.2f}\n偏度: {stats.skew(data):.2f}'
            ax.text(0.05, 0.95, stats_text, transform=ax.transAxes,
                    verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        # Hide the unused panels of the grid.
        for ax in axes[len(columns):]:
            ax.set_visible(False)

        plt.tight_layout()
        plt.show()

    def visualize_categorical_distributions(self, columns=None, figsize=(15, 8)):
        """Plot a bar chart of value frequencies for each categorical column.

        Only the ten most frequent values of a column are drawn.

        Args:
            columns: Columns to plot; defaults to the first four categorical
                ones.
            figsize: Size of the whole figure in inches.
        """
        if columns is None:
            columns = self.categorical_cols[:4]  # show at most four features

        columns = list(columns)
        if not columns:
            # Nothing to plot; a 0-column grid cannot be created.
            return

        n_cols = min(2, len(columns))
        n_rows = (len(columns) + n_cols - 1) // n_cols

        fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
        axes = axes.flatten()

        for ax, col in zip(axes, columns):
            # value_counts() is sorted by frequency, so head(10) keeps the
            # ten most frequent values (or all of them when there are fewer).
            shown = self.df[col].dropna().value_counts().head(10)

            positions = range(len(shown))
            bars = ax.bar(positions, shown.values)
            ax.set_xticks(positions)
            ax.set_xticklabels(shown.index, rotation=45, ha='right')

            ax.set_title(f'{col}分布')
            ax.set_xlabel(col)
            ax.set_ylabel('频数')

            # Write the count above every bar.
            for bar in bars:
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width()/2., height,
                        f'{int(height)}', ha='center', va='bottom')

        # Hide the unused panels of the grid.
        for ax in axes[len(columns):]:
            ax.set_visible(False)

        plt.tight_layout()
        plt.show()

# 示例:单变量分析
def univariate_analysis_example():
    """Run the univariate analysis demo on the California housing dataset.

    Prints the statistics of the first three numeric features, shows the
    distribution plots and returns the analyzer.
    """
    housing_data = DataLoader().load_sklearn_dataset('california_housing')
    analyzer = UnivariateAnalyzer(housing_data)

    numeric_analysis = analyzer.analyze_numeric_features()
    print("数值特征分析结果(前3个特征):")

    first_three = list(numeric_analysis.items())[:3]
    for feature_name, feature_stats in first_three:
        print(f"\n{feature_name}:")
        for metric, metric_value in feature_stats.items():
            print(f"  {metric}: {metric_value}")

    # Plot the distributions of the numeric features.
    analyzer.visualize_numeric_distributions()

    return analyzer

# Script entry point: run the univariate analysis demo and keep the analyzer.
if __name__ == "__main__":
    analyzer = univariate_analysis_example()

3.2 多变量分析与相关性研究

class MultivariateAnalyzer:
    """Correlation analysis and multi-feature plots for a DataFrame."""

    def __init__(self, df):
        """Store ``df`` and pre-compute its numeric columns."""
        self.df = df
        self.numeric_cols = df.select_dtypes(include=[np.number]).columns

    def correlation_analysis(self, method='pearson', threshold=0.7):
        """Compute the correlation matrix and list strongly correlated pairs.

        Args:
            method: Correlation method passed to ``DataFrame.corr``.
            threshold: A pair is reported when the absolute value of its
                correlation is strictly greater than this value.

        Returns:
            Dict with ``'correlation_matrix'`` (DataFrame) and
            ``'high_correlation_pairs'`` (list of dicts with ``feature1``,
            ``feature2`` and the signed ``correlation``).
        """
        correlation_matrix = self.df[self.numeric_cols].corr(method=method)
        names = correlation_matrix.columns

        # Walk the strict upper triangle so every pair is visited once.
        high_corr_pairs = []
        for i, j in zip(*np.triu_indices(len(names), k=1)):
            corr_value = correlation_matrix.iloc[i, j]
            if abs(corr_value) > threshold:
                high_corr_pairs.append({
                    'feature1': names[i],
                    'feature2': names[j],
                    'correlation': corr_value
                })

        return {
            'correlation_matrix': correlation_matrix,
            'high_correlation_pairs': high_corr_pairs
        }

    def visualize_correlation_matrix(self, method='pearson', figsize=(12, 10)):
        """Draw the lower triangle of the correlation matrix as a heat map."""
        correlation_matrix = self.df[self.numeric_cols].corr(method=method)

        plt.figure(figsize=figsize)
        # Mask the upper triangle (including the diagonal); it only repeats
        # the information of the lower triangle.
        mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))

        sns.heatmap(correlation_matrix, mask=mask, annot=True, fmt='.2f',
                    cmap='coolwarm', center=0, square=True,
                    linewidths=0.5, cbar_kws={"shrink": 0.8})

        plt.title(f'特征相关性矩阵 ({method}相关系数)', fontsize=16)
        plt.tight_layout()
        plt.show()

    def scatter_plot_matrix(self, columns=None, hue=None, figsize=(15, 15)):
        """Draw a pair plot (scatter matrix with KDE diagonals).

        Args:
            columns: Columns to include; defaults to the first five numeric
                ones.
            hue: Optional column used to colour the points.
            figsize: Size of the resulting figure in inches.
        """
        if columns is None:
            columns = self.numeric_cols[:5]  # show at most five features

        plot_data = self.df[columns].copy()

        if hue and hue in self.df.columns:
            plot_data[hue] = self.df[hue]

        pairplot = sns.pairplot(plot_data, hue=hue, diag_kind='kde',
                                plot_kws={'alpha': 0.6, 's': 20},
                                diag_kws={'fill': True})

        # pairplot creates its own figure, so the requested size has to be
        # applied afterwards.
        pairplot.fig.set_size_inches(*figsize)
        pairplot.fig.suptitle('散点图矩阵与分布', fontsize=16, y=1.02)
        plt.tight_layout()
        plt.show()

    def parallel_coordinates_plot(self, columns=None, class_column=None, figsize=(14, 8)):
        """Plot min-max scaled feature values side by side, one axis slot each.

        Every sample is drawn as a marker at the x position of its feature;
        the samples are not connected by lines.

        Args:
            columns: Columns to include; defaults to the first six numeric
                ones.
            class_column: Optional column attached to the scaled data.
            figsize: Size of the figure in inches.
        """
        if columns is None:
            columns = self.numeric_cols[:6]

        plot_data = self.df[columns].copy()

        # Scale every feature to [0, 1] so they share one y axis.
        from sklearn.preprocessing import MinMaxScaler
        scaler = MinMaxScaler()
        scaled_data = scaler.fit_transform(plot_data)
        # Keep the original index: otherwise the class column assigned below
        # would be aligned against a fresh 0..n-1 index and mismatch (or turn
        # into NaN) whenever self.df does not use the default index.
        scaled_df = pd.DataFrame(scaled_data, columns=columns, index=plot_data.index)

        if class_column and class_column in self.df.columns:
            scaled_df[class_column] = self.df[class_column]

        plt.figure(figsize=figsize)

        # One vertical strip of markers per feature.
        for i, col in enumerate(columns):
            plt.plot([i] * len(scaled_df), scaled_df[col], 'o', alpha=0.3, markersize=2)

        plt.xticks(range(len(columns)), columns, rotation=45)
        plt.xlabel('特征')
        plt.ylabel('标准化值')
        plt.title('平行坐标图(多变量关系可视化)')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def interaction_effects_analysis(self, feature1, feature2, target=None):
        """Plot and test the interaction of two features with respect to a target.

        Fits an OLS model ``target ~ feature1 + feature2 + feature1*feature2``
        and prints its summary.

        Args:
            feature1: Name of the first feature column.
            feature2: Name of the second feature column.
            target: Name of the target column.

        Returns:
            The fitted statsmodels results object, or None when ``target`` is
            not given or is not a column of the DataFrame.
        """
        if not target or target not in self.df.columns:
            return None

        # Product term used as the interaction feature.
        interaction = self.df[feature1] * self.df[feature2]

        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        # 1. Each feature against the target.
        axes[0].scatter(self.df[feature1], self.df[target], alpha=0.5, label=feature1)
        axes[0].scatter(self.df[feature2], self.df[target], alpha=0.5, label=feature2)
        axes[0].set_xlabel('特征值')
        axes[0].set_ylabel(target)
        axes[0].legend()
        axes[0].set_title(f'{feature1}和{feature2}与{target}的关系')

        # 2. Interaction term against the target.
        axes[1].scatter(interaction, self.df[target], alpha=0.5)
        axes[1].set_xlabel(f'{feature1} × {feature2}')
        axes[1].set_ylabel(target)
        axes[1].set_title('交互项与目标变量的关系')

        # 3. 3D scatter. The 2D axes created by subplots() in this slot has
        # to be removed first, otherwise its frame and ticks stay visible
        # underneath the 3D axes.
        from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 (registers the '3d' projection on older matplotlib)
        axes[2].remove()
        ax3d = fig.add_subplot(133, projection='3d')
        ax3d.scatter(self.df[feature1], self.df[feature2], self.df[target],
                     alpha=0.5, c=self.df[target], cmap='viridis')
        ax3d.set_xlabel(feature1)
        ax3d.set_ylabel(feature2)
        ax3d.set_zlabel(target)
        ax3d.set_title('三维特征关系图')

        plt.tight_layout()
        plt.show()

        # Statistical significance of the interaction via OLS regression.
        import statsmodels.api as sm
        X = pd.DataFrame({
            feature1: self.df[feature1],
            feature2: self.df[feature2],
            'interaction': interaction
        })
        X = sm.add_constant(X)
        y = self.df[target]

        model = sm.OLS(y, X).fit()
        print("交互效应回归分析结果:")
        print(model.summary())

        return model

# 示例:多变量分析
def multivariate_analysis_example():
    """Run the multivariate analysis demo on the California housing dataset.

    Prints up to five highly correlated feature pairs, shows the correlation
    heat map, the scatter matrix, the parallel-coordinates plot and the
    interaction analysis, then returns the analyzer.
    """
    housing_data = DataLoader().load_sklearn_dataset('california_housing')
    analyzer = MultivariateAnalyzer(housing_data)

    # Report the strongest correlations.
    corr_analysis = analyzer.correlation_analysis()
    print("高度相关的特征对:")
    top_pairs = corr_analysis['high_correlation_pairs'][:5]
    for pair in top_pairs:
        print(f"{pair['feature1']} 和 {pair['feature2']}: {pair['correlation']:.3f}")

    analyzer.visualize_correlation_matrix()

    # The scatter matrix uses five features; the parallel plot adds a sixth.
    scatter_features = ['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population']
    analyzer.scatter_plot_matrix(columns=scatter_features)
    analyzer.parallel_coordinates_plot(columns=scatter_features + ['AveOccup'])

    # The interaction analysis needs the target column.
    if 'target' in housing_data.columns:
        analyzer.interaction_effects_analysis('MedInc', 'HouseAge', 'target')

    return analyzer

# Script entry point: run the multivariate analysis demo and keep the analyzer.
if __name__ == "__main__":
    multivariate_analyzer = multivariate_analysis_example()

3.3 高级可视化技术

class AdvancedVisualizer:
    """Interactive, radar, clustered-heat-map and time-series plots."""

    def __init__(self, df):
        """Store the DataFrame that all plots read from."""
        self.df = df

    def create_interactive_plot(self, x_col, y_col, color_col=None, size_col=None):
        """Show an interactive Plotly scatter plot.

        Falls back to a static Matplotlib scatter plot when Plotly cannot be
        imported.

        Args:
            x_col: Column for the x axis.
            y_col: Column for the y axis.
            color_col: Optional column mapped to the marker colour.
            size_col: Optional column mapped to the marker size.
        """
        try:
            import plotly.express as px

            fig = px.scatter(self.df, x=x_col, y=y_col,
                             color=color_col, size=size_col,
                             hover_data=self.df.columns,
                             title=f'{x_col} vs {y_col}',
                             template='plotly_white')

            fig.update_layout(
                width=1000,
                height=600,
                showlegend=True,
                hovermode='closest'
            )

            fig.show()

        except ImportError:
            print("Plotly未安装,使用Matplotlib替代")
            self._create_static_scatter(x_col, y_col, color_col, size_col)

    def _create_static_scatter(self, x_col, y_col, color_col=None, size_col=None):
        """Draw a static Matplotlib scatter plot.

        ``color_col`` and ``size_col`` are each applied when they name an
        existing column; both can be used at the same time.
        """
        plt.figure(figsize=(10, 6))

        has_color = bool(color_col) and color_col in self.df.columns
        has_size = bool(size_col) and size_col in self.df.columns

        scatter_kwargs = {'alpha': 0.6}
        if has_size:
            # Marker areas are scaled relative to the column maximum.
            scatter_kwargs['s'] = self.df[size_col] / self.df[size_col].max() * 100
        elif has_color:
            scatter_kwargs['s'] = 50

        if has_color:
            scatter_kwargs['c'] = self.df[color_col]
            scatter_kwargs['cmap'] = 'viridis'
        elif has_size:
            scatter_kwargs['c'] = 'blue'

        scatter = plt.scatter(self.df[x_col], self.df[y_col], **scatter_kwargs)
        if has_color:
            plt.colorbar(scatter, label=color_col)

        plt.xlabel(x_col)
        plt.ylabel(y_col)
        plt.title(f'{x_col} vs {y_col}')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def create_radar_chart(self, features, categories, figsize=(10, 8)):
        """Draw a radar chart comparing feature means across categories.

        Features are min-max scaled first so they share one radial axis. For
        every entry of ``categories`` that is a column of the DataFrame, the
        rows where that column equals 1 form one polygon.

        Args:
            features: Numeric feature columns, one per radar axis.
            categories: Indicator columns (value 1 marks membership).
            figsize: Size of the figure in inches.

        Raises:
            ValueError: If none of ``categories`` is a column of the
                DataFrame.
        """
        features = list(features)

        # Scale the features and keep the original index so that the boolean
        # membership masks below align with the scaled rows.
        from sklearn.preprocessing import MinMaxScaler
        scaler = MinMaxScaler()
        scaled_df = pd.DataFrame(scaler.fit_transform(self.df[features]),
                                 columns=features, index=self.df.index)

        # Mean of every scaled feature within each category.
        radar_data = []
        for category in categories:
            if category in self.df.columns:
                members = self.df[category] == 1
                radar_data.append(scaled_df.loc[members, features].mean().tolist())

        if not radar_data:
            raise ValueError("categories中没有任何列存在于数据中")

        angles = np.linspace(0, 2 * np.pi, len(features), endpoint=False).tolist()
        angles += angles[:1]  # close the polygon

        fig, ax = plt.subplots(figsize=figsize, subplot_kw=dict(projection='polar'))

        for i, data in enumerate(radar_data):
            closed = data + data[:1]  # close the polygon without mutating data
            ax.plot(angles, closed, 'o-', linewidth=2, label=f'类别{i+1}')
            ax.fill(angles, closed, alpha=0.25)

        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(features)
        ax.set_ylim(0, max([max(d) for d in radar_data]) * 1.1)
        ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
        plt.title('多维度雷达图比较')
        plt.tight_layout()
        plt.show()

    def create_heatmap_with_clustering(self, columns=None, figsize=(12, 10)):
        """Draw a dendrogram of the features next to a reordered correlation heat map.

        Features are clustered hierarchically on the distance
        ``1 - |correlation|`` and the heat map rows/columns follow the leaf
        order of the dendrogram.

        Args:
            columns: Columns to include; defaults to all numeric columns.
            figsize: Size of the figure in inches.

        Raises:
            ValueError: If fewer than two features have a defined correlation.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns

        data = self.df[columns].dropna()

        corr_matrix = data.corr()
        # Constant columns have undefined (NaN) correlations, which would
        # break the clustering; drop them from the matrix.
        defined = corr_matrix.notna().any(axis=0)
        corr_matrix = corr_matrix.loc[defined, defined]
        if corr_matrix.shape[0] < 2:
            raise ValueError("至少需要两个具有有效相关系数的特征")

        from scipy.cluster.hierarchy import dendrogram, linkage
        from scipy.spatial.distance import squareform

        # Convert correlation to distance. Rounding can leave the diagonal
        # slightly off zero or the matrix slightly asymmetric, so it is
        # cleaned up explicitly and squareform's strict checks are skipped.
        distance_matrix = 1 - np.abs(corr_matrix.values)
        distance_matrix = (distance_matrix + distance_matrix.T) / 2
        distance_matrix = np.clip(distance_matrix, 0, None)
        np.fill_diagonal(distance_matrix, 0.0)
        condensed_distance = squareform(distance_matrix, checks=False)

        linkage_matrix = linkage(condensed_distance, method='ward')

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)

        # Plot the dendrogram; a plain list of labels is passed on purpose.
        feature_names = corr_matrix.columns.tolist()
        dendro = dendrogram(linkage_matrix, labels=feature_names, ax=ax1,
                            orientation='left', leaf_font_size=10)
        ax1.set_title('特征聚类树状图')

        # 'leaves' holds the positional order of the features in the tree.
        ordered_features = [feature_names[i] for i in dendro['leaves']]

        ordered_corr = corr_matrix.loc[ordered_features, ordered_features]

        sns.heatmap(ordered_corr, annot=True, fmt='.2f', cmap='coolwarm',
                    center=0, square=True, ax=ax2, cbar_kws={"shrink": 0.8})
        ax2.set_title('聚类后的特征相关性热图')

        plt.tight_layout()
        plt.show()

    def create_time_series_decomposition(self, date_col, value_col, freq='M', period=12):
        """Decompose a time series into trend, seasonal and residual parts.

        The series is resampled to ``freq`` (gaps forward-filled) and
        decomposed with an additive model. ``self.df`` is left unchanged.

        Args:
            date_col: Column holding the timestamps.
            value_col: Column holding the values.
            freq: Frequency alias passed to ``Series.asfreq``.
                NOTE(review): recent pandas versions deprecate 'M' in favour
                of 'ME' - confirm against the pandas version in use.
            period: Seasonal period passed to ``seasonal_decompose``.

        Returns:
            The statsmodels decomposition result, or None when either column
            is missing from the DataFrame.
        """
        if date_col not in self.df.columns or value_col not in self.df.columns:
            return None

        # Build the series from converted copies instead of overwriting the
        # date column of the caller's DataFrame.
        timestamps = pd.DatetimeIndex(pd.to_datetime(self.df[date_col]))
        ts_data = pd.Series(self.df[value_col].values, index=timestamps, name=value_col)
        ts_data = ts_data.asfreq(freq).ffill()

        from statsmodels.tsa.seasonal import seasonal_decompose

        decomposition = seasonal_decompose(ts_data, model='additive', period=period)

        fig, axes = plt.subplots(4, 1, figsize=(12, 10))

        # (series, legend label, panel title, line colour) for every panel;
        # None keeps Matplotlib's default colour for the original series.
        panels = [
            (ts_data, '原始序列', '原始时间序列', None),
            (decomposition.trend, '趋势', '趋势成分', 'orange'),
            (decomposition.seasonal, '季节性', '季节性成分', 'green'),
            (decomposition.resid, '残差', '残差成分', 'red'),
        ]
        for ax, (series, label, title, color) in zip(axes, panels):
            ax.plot(series, label=label, color=color)
            ax.legend()
            ax.set_title(title)

        plt.tight_layout()
        plt.show()

        return decomposition

# 示例:高级可视化
def advanced_visualization_example():
    """Run the advanced visualisation demo.

    Shows an interactive scatter plot and a clustered correlation heat map
    for the California housing dataset, then a time-series decomposition of
    simulated data. Returns the visualizer built on the housing data.
    """
    # Load the data.
    loader = DataLoader()
    housing_data = loader.load_sklearn_dataset('california_housing')

    visualizer = AdvancedVisualizer(housing_data)

    # Interactive scatter plot.
    visualizer.create_interactive_plot('MedInc', 'target', color_col='HouseAge')

    # Heat map with hierarchical clustering.
    numeric_cols = housing_data.select_dtypes(include=[np.number]).columns.tolist()
    visualizer.create_heatmap_with_clustering(numeric_cols[:8])

    # Simulated time series for the decomposition demo. The decomposition
    # resamples to monthly data and uses a 12-month period, which needs at
    # least two full cycles (24 monthly points); three years of daily data
    # give 36. The sine term has a yearly cycle so that a seasonal component
    # is actually present after resampling.
    dates = pd.date_range(start='2020-01-01', end='2022-12-31', freq='D')
    day_index = np.arange(len(dates))
    ts_df = pd.DataFrame({
        'date': dates,
        'value': (np.sin(2 * np.pi * day_index / 365.25)
                  + np.random.randn(len(dates)) * 0.2
                  + day_index * 0.001)
    })

    ts_visualizer = AdvancedVisualizer(ts_df)
    ts_visualizer.create_time_series_decomposition('date', 'value')

    return visualizer

# Script entry point: run the advanced visualisation demo and keep the visualizer.
if __name__ == "__main__":
    advanced_visualizer = advanced_visualization_example()

第四章:数据预处理与特征工程

4.1 数据清洗与缺失值处理

class DataPreprocessor:
    """数据预处理器"""
    
    def __init__(self, df):
        self.df = df.copy()
        self.preprocessing_steps = []
    
    def handle_missing_values(self, strategy='mean', custom_values=None):
        """处理缺失值"""
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
        
        missing_report = self.df.isnull().sum()
        cols_with_missing = missing_report[missing_report > 0].index.tolist()
        
        print(f"处理缺失值前,缺失值统计:")
        print(missing_report[missing_report > 0])
        
        for col in cols_with_missing:
            if col in numeric_cols:
                if strategy == 'mean':
                    fill_value = self.df[col].mean()
                elif strategy == 'median':
                    fill_value = self.df[col].median()
                elif strategy == 'mode':
                    fill_value = self.df[col].mode()[0]
                elif strategy == 'constant' and custom_values:
                    fill_value = custom_values.get(col, 0)
                else:
                    fill_value = 0
                
                missing_count = self.df[col].isnull().sum()
                self.df[col].fillna(fill_value, inplace=True)
                
                self.preprocessing_steps.append({
                    'step': 'missing_value_imputation',
                    'column': col,
                    'strategy': strategy,
                    'fill_value': fill_value,
                    'missing_count': missing_count
                })
                
            elif col in categorical_cols:
                if strategy == 'mode':
                    fill_value = self.df[col].mode()[0] if not self.df[col].mode().empty else 'Unknown'
                elif strategy == 'constant' and custom_values:
                    fill_value = custom_values.get(col, 'Unknown')
                else:
                    fill_value = 'Unknown'
                
                missing_count = self.df[col].isnull().sum()
                self.df[col].fillna(fill_value, inplace=True)
                
                self.preprocessing_steps.append({
                    'step': 'missing_value_imputation',
                    'column': col,
                    'strategy': strategy,
                    'fill_value': fill_value,
                    'missing_count': missing_count
                })
        
        print(f"处理缺失值后,剩余缺失值: {self.df.isnull().sum().sum()}")
        return self
    
    def remove_duplicates(self, subset=None, keep='first'):
        """移除重复值"""
        before_count = len(self.df)
        duplicate_count = self.df.duplicated(subset=subset).sum()
        
        if duplicate_count > 0:
            self.df = self.df.drop_duplicates(subset=subset, keep=keep)
            after_count = len(self.df)
            
            self.preprocessing_steps.append({
                'step': 'remove_duplicates',
                'subset': subset,
                'keep': keep,
                'removed_count': duplicate_count,
                'before_count': before_count,
                'after_count': after_count
            })
            
            print(f"移除了 {duplicate_count} 个重复行")
            print(f"数据形状从 {before_count} 变为 {after_count}")
        
        return self
    
    def detect_and_handle_outliers(self, method='iqr', threshold=1.5):
        """检测和处理异常值"""
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        
        outlier_report = {}
        
        for col in numeric_cols:
            data = self.df[col].dropna()
            
            if method == 'iqr':
                Q1 = data.quantile(0.25)
                Q3 = data.quantile(0.75)
                IQR = Q3 - Q1
                
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR
                
                outliers = self.df[(self.df[col] < lower_bound) | (self.df[col] > upper_bound)][col]
                outlier_count = len(outliers)
                
                # 处理异常值:用边界值替换
                self.df.loc[self.df[col] < lower_bound, col] = lower_bound
                self.df.loc[self.df[col] > upper_bound, col] = upper_bound
                
            elif method == 'zscore':
                from scipy import stats
                z_scores = np.abs(stats.zscore(data))
                outliers = data[z_scores > threshold]
                outlier_count = len(outliers)
                
                # 处理异常值:用中位数替换
                median_value = data.median()
                outlier_indices = outliers.index
                self.df.loc[outlier_indices, col] = median_value
            
            outlier_report[col] = {
                'method': method,
                'outlier_count': outlier_count,
                'percentage': (outlier_count / len(data)) * 100
            }
        
        self.preprocessing_steps.append({
            'step': 'outlier_handling',
            'method': method,
            'threshold': threshold,
            'report': outlier_report
        })
        
        print("异常值处理报告:")
        for col, report in outlier_report.items():
            if report['outlier_count'] > 0:
                print(f"  {col}: 处理了 {report['outlier_count']} 个异常值 ({report['percentage']:.2f}%)")
        
        return self
    
    def convert_data_types(self, type_mapping):
        """转换数据类型"""
        for col, dtype in type_mapping.items():
            if col in self.df.columns:
                original_dtype = str(self.df[col].dtype)
                try:
                    self.df[col] = self.df[col].astype(dtype)
                    self.preprocessing_steps.append({
                        'step': 'data_type_conversion',
                        'column': col,
                        'from': original_dtype,
                        'to': dtype
                    })
                    print(f"将列 '{

 

 

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐