Python数据分析案例——KYC客户风险等级评分模型
本文展示了一个完整的KYC客户风险等级评分模型构建流程,基于7791条模拟客户数据。项目从数据读取、探索性分析(包括可视化)、数据预处理(异常值处理)开始,通过特征工程创建新特征。随后比较了逻辑回归、决策树、随机森林和梯度提升树四种模型,采用交叉验证和网格搜索优化随机森林参数。最终模型在测试集上表现良好,并进行了特征重要性分析和风险评分转换。该流程全面规范,可作为机器学习案例模板,方便替换数据用于
·
机器学习全流程,读取数据,探索性分析,可视化,预处理,特征工程,异常值处理。建模,模型对比,交叉验证,超参数搜索,模型评估,特征变量重要性。
首先对7791条客户数据,很好的学习机器学习的案例,流程很全面充足,自己换个数据也很方便套用。可以用于课程作业。
KYC 客户风险等级评分模型构建全流程
1. 项目背景与目标
KYC(了解你的客户)是金融机构风险管理的重要环节,通过对客户信息的分析评估其风险等级,有助于防范欺诈、洗钱等金融犯罪。本项目旨在构建一个客户风险等级评分模型,通过机器学习算法预测客户的风险等级,为金融机构提供决策支持。
# 导入必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
import warnings
warnings.filterwarnings('ignore')
# 设置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# ------------------------------------------------------
# 1. 数据读取
# ------------------------------------------------------
# 生成模拟数据(7791条客户数据)
np.random.seed(42)
n_samples = 7791
# 生成特征
data = pd.DataFrame({
# 基本信息
'年龄': np.random.randint(18, 70, n_samples),
'性别': np.random.choice(['男', '女'], n_samples),
'婚姻状况': np.random.choice(['已婚', '未婚', '离异', '丧偶'], n_samples),
'教育程度': np.random.choice(['小学', '初中', '高中', '大专', '本科', '研究生及以上'], n_samples),
# 财务信息
'月收入': np.random.lognormal(10, 0.8, n_samples).astype(int),
'账户余额': np.random.lognormal(10, 1.0, n_samples).astype(int),
'信用卡数量': np.random.randint(0, 6, n_samples),
'贷款次数': np.random.randint(0, 10, n_samples),
# 行为信息
'交易频率': np.random.randint(1, 30, n_samples),
'大额交易次数': np.random.randint(0, 10, n_samples),
'异地交易比例': np.random.uniform(0, 1, n_samples),
'职业稳定性': np.random.choice(['稳定', '一般', '不稳定'], n_samples),
# 历史风险信息
'逾期次数': np.random.randint(0, 5, n_samples),
'违约记录': np.random.choice([0, 1], n_samples, p=[0.9, 0.1]),
# 目标变量:风险等级(0-低风险,1-中风险,2-高风险)
'风险等级': np.where(
np.random.rand(n_samples) < 0.7, 0,
np.where(np.random.rand(n_samples) < 0.6, 1, 2)
)
})
# 为了使数据更真实,添加一些关联关系
data.loc[data['逾期次数'] > 2, '风险等级'] = data.loc[data['逾期次数'] > 2, '风险等级'].apply(
lambda x: max(x, 1)
)
data.loc[data['违约记录'] == 1, '风险等级'] = data.loc[data['违约记录'] == 1, '风险等级'].apply(
lambda x: max(x, 2)
)
print(f"数据集形状: {data.shape}")
print("\n前5行数据:")
print(data.head())
# ------------------------------------------------------
# 2. 探索性数据分析(EDA)
# ------------------------------------------------------
print("\n\n===== 探索性数据分析 =====")
# 2.1 数据基本信息
print("\n数据类型和缺失值情况:")
print(data.info())
# 2.2 描述性统计
print("\n数值型变量统计描述:")
print(data.describe())
# 2.3 目标变量分布
print("\n风险等级分布:")
risk_distribution = data['风险等级'].value_counts(normalize=True)
print(risk_distribution)
# 可视化目标变量分布
plt.figure(figsize=(8, 5))
sns.countplot(x='风险等级', data=data)
plt.title('客户风险等级分布')
plt.xlabel('风险等级 (0-低风险, 1-中风险, 2-高风险)')
plt.ylabel('客户数量')
plt.show()
# 2.4 数值型变量相关性分析
numeric_cols = data.select_dtypes(include=['int64', 'float64']).columns
corr_matrix = data[numeric_cols].corr()
plt.figure(figsize=(12, 10))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt='.2f', linewidths=0.5)
plt.title('数值型变量相关性矩阵')
plt.show()
# 2.5 数值型变量与目标变量的关系
plt.figure(figsize=(15, 10))
for i, col in enumerate(['年龄', '月收入', '账户余额', '逾期次数']):
plt.subplot(2, 2, i+1)
sns.boxplot(x='风险等级', y=col, data=data)
plt.title(f'{col}与风险等级的关系')
plt.tight_layout()
plt.show()
# 2.6 分类型变量与目标变量的关系
plt.figure(figsize=(15, 10))
for i, col in enumerate(['性别', '婚姻状况', '教育程度', '职业稳定性']):
plt.subplot(2, 2, i+1)
crosstab = pd.crosstab(data[col], data['风险等级'], normalize='index')
crosstab.plot(kind='bar', stacked=True, ax=plt.gca())
plt.title(f'{col}与风险等级的关系')
plt.ylabel('比例')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# ------------------------------------------------------
# 3. 数据预处理
# ------------------------------------------------------
print("\n\n===== 数据预处理 =====")
# 3.1 划分特征和目标变量
X = data.drop('风险等级', axis=1)
y = data['风险等级']
# 3.2 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"训练集大小: {X_train.shape}, 测试集大小: {X_test.shape}")
# 3.3 识别数值型和分类型特征
numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_features = X.select_dtypes(include=['object']).columns.tolist()
print(f"数值型特征: {numeric_features}")
print(f"分类型特征: {categorical_features}")
# 3.4 异常值处理(使用IQR方法处理数值型特征)
def handle_outliers(df, columns):
df_copy = df.copy()
for col in columns:
q1 = df_copy[col].quantile(0.25)
q3 = df_copy[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# 截断异常值
df_copy[col] = df_copy[col].clip(lower_bound, upper_bound)
return df_copy
# 仅对训练集处理异常值,避免数据泄露
X_train_numeric = X_train[numeric_features]
X_train_numeric_cleaned = handle_outliers(X_train_numeric, numeric_features)
# 替换训练集中的数值型特征
X_train_cleaned = X_train.copy()
X_train_cleaned[numeric_features] = X_train_numeric_cleaned
# ------------------------------------------------------
# 4. 特征工程
# ------------------------------------------------------
print("\n\n===== 特征工程 =====")
# 4.1 创建新特征
def create_features(df):
df_new = df.copy()
# 财务比率特征
df_new['月收入/账户余额'] = df_new['月收入'] / (df_new['账户余额'] + 1) # 避免除零
df_new['贷款/信用卡比率'] = df_new['贷款次数'] / (df_new['信用卡数量'] + 1)
# 风险行为特征
df_new['风险行为指数'] = df_new['逾期次数'] * 2 + df_new['违约记录'] * 5
df_new['高风险交易指数'] = df_new['大额交易次数'] * df_new['异地交易比例']
# 年龄分组特征
df_new['年龄分组'] = pd.cut(
df_new['年龄'],
bins=[17, 30, 45, 60, 100],
labels=['青年', '中年', '中老年', '老年']
)
return df_new
# 为训练集和测试集创建新特征
X_train_fe = create_features(X_train_cleaned)
X_test_fe = create_features(X_test)
# 更新特征列表
numeric_features = X_train_fe.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_features = X_train_fe.select_dtypes(include=['object', 'category']).columns.tolist()
print(f"添加新特征后 - 数值型特征: {len(numeric_features)}个")
print(f"添加新特征后 - 分类型特征: {len(categorical_features)}个")
# 4.2 特征预处理管道
# 数值型特征处理:填充缺失值 + 标准化
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# 分类型特征处理:填充缺失值 + 独热编码
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
# 组合所有特征的处理
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# ------------------------------------------------------
# 5. 模型构建与评估
# ------------------------------------------------------
print("\n\n===== 模型构建与评估 =====")
# 5.1 定义要比较的模型
models = {
'逻辑回归': LogisticRegression(max_iter=1000, multi_class='multinomial'),
'决策树': DecisionTreeClassifier(random_state=42),
'随机森林': RandomForestClassifier(random_state=42),
'梯度提升树': GradientBoostingClassifier(random_state=42)
}
# 5.2 交叉验证比较模型
print("\n各模型交叉验证准确率:")
cv_results = {}
for name, model in models.items():
# 创建包含预处理和模型的管道
pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('classifier', model)
])
# 5折交叉验证
cv_scores = cross_val_score(pipeline, X_train_fe, y_train, cv=5, scoring='accuracy')
cv_results[name] = cv_scores
print(f"{name}: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
# 可视化交叉验证结果
plt.figure(figsize=(10, 6))
sns.boxplot(data=pd.DataFrame(cv_results).T)
plt.title('各模型交叉验证准确率分布')
plt.ylabel('准确率')
plt.xticks(rotation=45)
plt.show()
# 5.3 选择表现较好的随机森林进行超参数调优
print("\n===== 超参数调优 =====")
rf_pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(random_state=42))
])
# 定义参数网格
param_grid = {
'classifier__n_estimators': [100, 200, 300],
'classifier__max_depth': [None, 10, 20, 30],
'classifier__min_samples_split': [2, 5, 10],
'classifier__min_samples_leaf': [1, 2, 4]
}
# 网格搜索
grid_search = GridSearchCV(
estimator=rf_pipeline,
param_grid=param_grid,
cv=5,
n_jobs=-1,
scoring='accuracy',
verbose=1
)
grid_search.fit(X_train_fe, y_train)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证准确率: {grid_search.best_score_:.4f}")
# 5.4 最佳模型评估
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test_fe)
y_pred_proba = best_model.predict_proba(X_test_fe)
print("\n===== 最佳模型测试集评估 =====")
print("混淆矩阵:")
print(confusion_matrix(y_test, y_pred))
print("\n分类报告:")
print(classification_report(y_test, y_pred))
# 可视化混淆矩阵
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix(y_test, y_pred), annot=True, fmt='d', cmap='Blues',
xticklabels=['低风险', '中风险', '高风险'],
yticklabels=['低风险', '中风险', '高风险'])
plt.title('混淆矩阵')
plt.xlabel('预测风险等级')
plt.ylabel('实际风险等级')
plt.show()
# ------------------------------------------------------
# 6. 特征重要性分析
# ------------------------------------------------------
print("\n\n===== 特征重要性分析 =====")
# 获取特征名称(处理后)
cat_features = best_model.named_steps['preprocessor'].transformers_[1][1]\
.named_steps['onehot'].get_feature_names_out(categorical_features)
all_features = list(numeric_features) + list(cat_features)
# 获取特征重要性(仅适用于树模型)
if hasattr(best_model.named_steps['classifier'], 'feature_importances_'):
importances = best_model.named_steps['classifier'].feature_importances_
indices = np.argsort(importances)[::-1]
# 打印前10个重要特征
print("前10个最重要的特征:")
for i in range(min(10, len(all_features))):
print(f"{all_features[indices[i]]}: {importances[indices[i]]:.4f}")
# 可视化特征重要性
plt.figure(figsize=(12, 8))
plt.barh(range(min(15, len(all_features))), importances[indices[:15]][::-1])
plt.yticks(range(min(15, len(all_features))), [all_features[i] for i in indices[:15]][::-1])
plt.title('特征重要性(前15名)')
plt.xlabel('重要性得分')
plt.tight_layout()
plt.show()
# ------------------------------------------------------
# 7. 模型应用:客户风险评分
# ------------------------------------------------------
print("\n\n===== 客户风险评分示例 =====")
# 计算风险概率并转换为评分(0-100分,分数越高风险越低)
risk_proba = best_model.predict_proba(X_test_fe)
risk_score = 100 - (np.argmax(risk_proba, axis=1) * 50 + np.max(risk_proba, axis=1) * 50)
# 随机选择5个客户展示结果
sample_indices = np.random.choice(len(X_test_fe), 5, replace=False)
sample_results = pd.DataFrame({
'客户ID': sample_indices,
'预测风险等级': np.argmax(risk_proba[sample_indices], axis=1),
'风险概率': np.max(risk_proba[sample_indices], axis=1).round(4),
'风险评分(0-100)': risk_score[sample_indices].round(1)
})
sample_results['风险等级说明'] = sample_results['预测风险等级'].map({
0: '低风险', 1: '中风险', 2: '高风险'
})
print(sample_results)
3. 流程说明
-
数据读取与准备:
- 生成了 7791 条模拟客户数据,包含基本信息、财务信息、行为信息等特征
- 目标变量为客户风险等级(0 - 低风险,1 - 中风险,2 - 高风险)
-
探索性数据分析 (EDA):
- 分析数据基本结构和缺失值情况
- 研究目标变量分布和特征相关性
- 通过可视化分析数值型和分类型特征与风险等级的关系
-
数据预处理:
- 划分训练集和测试集(8:2 比例)
- 使用 IQR 方法处理数值型特征的异常值
- 区分数值型和分类型特征,为后续处理做准备
-
特征工程:
- 创建有业务意义的新特征(如风险行为指数、高风险交易指数等)
- 构建特征预处理管道,包括缺失值填充、标准化和独热编码
-
模型构建与评估:
- 比较四种经典分类模型(逻辑回归、决策树、随机森林、梯度提升树)
- 使用 5 折交叉验证评估模型性能
- 对表现较好的随机森林进行超参数调优
- 在测试集上评估最佳模型,生成混淆矩阵和分类报告
-
特征重要性分析:
- 提取并可视化模型的特征重要性
- 识别对客户风险等级影响最大的因素
-
模型应用:
- 将模型预测结果转换为风险评分(0-100 分)
- 展示客户风险评分示例
4. 扩展说明
- 数据替换:您可以将代码中的模拟数据替换为实际的客户数据,只需确保特征含义与处理逻辑匹配
- 模型扩展:可以添加更多模型(如 XGBoost、LightGBM 等)进行比较
- 评分调整:风险评分的计算方式可以根据业务需求进行调整
- 特征优化:可以根据特征重要性结果筛选更有价值的特征,简化模型
更多推荐
所有评论(0)