The Importance of Ensemble Methods
Ensemble learning is a core technique in machine learning: it combines the predictions of multiple base learners to improve overall performance. Ensemble methods effectively reduce overfitting, improve generalization, and perform well in many real-world applications. Python's sklearn library ships a rich set of ensemble tools. This article walks through best practices for ensemble methods in Python, from basic bagging all the way to stacking.
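As a quick taste of why this works, here is a minimal sketch, assuming nothing beyond scikit-learn (the dataset and sizes are arbitrary choices for illustration): averaging a hundred bootstrapped trees typically beats a single tree on the same data.

# A minimal sketch: compare one high-variance decision tree against a
# bagged ensemble of 100 such trees on the same synthetic data.
from sklearn.datasets import make_classification
from sklearn.ensemble import BaggingClassifier
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=500, n_features=20, random_state=0)
single_tree = DecisionTreeClassifier(random_state=0)
bagged_trees = BaggingClassifier(
    DecisionTreeClassifier(random_state=0), n_estimators=100, random_state=0
)
print(f"single tree:  {cross_val_score(single_tree, X, y, cv=5).mean():.3f}")
print(f"bagged trees: {cross_val_score(bagged_trees, X, y, cv=5).mean():.3f}")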
Bagging Methods
1. Basic Bagging
def bagging_demo():
    """Bagging demo."""
    print("=== Bagging Methods ===")
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.ensemble import BaggingClassifier
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.svm import SVC
    from sklearn.metrics import accuracy_score

    # Build a classification dataset
    X_class, y_class = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        n_classes=3,
        random_state=42
    )
    X_train, X_test, y_train, y_test = train_test_split(
        X_class, y_class, test_size=0.3, random_state=42
    )
    print(f"Training set shape: {X_train.shape}")
    print(f"Test set shape: {X_test.shape}")

    # 1. Basic bagging classifier
    print("\n1. Basic bagging classifier:")

    def basic_bagging_classifier():
        """Basic bagging classifier."""
        # Decision tree as the base learner. Note: the keyword is `estimator`
        # in scikit-learn >= 1.2 (the old `base_estimator` was removed in 1.4).
        bagging_clf = BaggingClassifier(
            estimator=DecisionTreeClassifier(random_state=42),
            n_estimators=100,
            max_samples=0.8,           # each base learner sees 80% of the samples
            max_features=0.8,          # each base learner sees 80% of the features
            bootstrap=True,            # sample instances with replacement
            bootstrap_features=False,  # sample features without replacement
            random_state=42,
            n_jobs=-1
        )
        # Train the model
        bagging_clf.fit(X_train, y_train)
        # Predict and evaluate
        y_pred = bagging_clf.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        print(f"  Bagging classifier accuracy: {accuracy:.3f}")
        # Cross-validated estimate
        cv_scores = cross_val_score(bagging_clf, X_train, y_train, cv=5)
        print(f"  Cross-validation accuracy: {cv_scores.mean():.3f} ± {cv_scores.std():.3f}")
        # How the number of base learners affects performance
        print("  Effect of the number of base learners:")
        n_estimators_range = [10, 50, 100, 200, 500]
        scores = []
        for n_est in n_estimators_range:
            bagging_temp = BaggingClassifier(
                DecisionTreeClassifier(random_state=42),
                n_estimators=n_est,
                random_state=42,
                n_jobs=-1
            )
            score = cross_val_score(bagging_temp, X_train, y_train, cv=3).mean()
            scores.append(score)
            print(f"    {n_est} base learners: {score:.3f}")
        return bagging_clf, scores

    bagging_clf, n_est_scores = basic_bagging_classifier()

    # 2. Bagging with different base learners
    print("\n2. Bagging with different base learners:")

    def different_base_estimators():
        """Bagging with different base learners."""
        base_estimators = {
            'DecisionTree': DecisionTreeClassifier(random_state=42),
            'SVM': SVC(probability=True, random_state=42),
        }
        bagging_results = {}
        for name, base_est in base_estimators.items():
            bagging = BaggingClassifier(
                estimator=base_est,
                n_estimators=50,
                random_state=42,
                n_jobs=-1
            )
            score = cross_val_score(bagging, X_train, y_train, cv=3).mean()
            bagging_results[name] = score
            print(f"  {name} base learner: {score:.3f}")
        return bagging_results

    bagging_results = different_base_estimators()
    return bagging_clf, bagging_results

bagging_clf, bagging_results = bagging_demo()
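One capability the demo above does not touch is out-of-bag (OOB) evaluation: because each bootstrap sample leaves out roughly 37% of the training instances, the left-out instances can score the ensemble without a separate validation set. A minimal sketch, with dataset settings mirroring the demo:

# Out-of-bag evaluation: each base learner is scored on the training
# instances its bootstrap sample never drew, giving a "free" estimate.
from sklearn.datasets import make_classification
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
                           n_redundant=5, random_state=42)
bagging_oob = BaggingClassifier(
    DecisionTreeClassifier(random_state=42),
    n_estimators=100,
    bootstrap=True,   # OOB estimation requires sampling with replacement
    oob_score=True,   # compute the OOB accuracy during fit
    random_state=42,
    n_jobs=-1
)
bagging_oob.fit(X, y)
print(f"OOB accuracy estimate: {bagging_oob.oob_score_:.3f}")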
2. Advanced Bagging Strategies
def advanced_bagging_demo():
    """Advanced bagging strategies demo."""
    print("\n=== Advanced Bagging Strategies ===")
    from sklearn.datasets import make_classification
    from sklearn.ensemble import BaggingClassifier
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.model_selection import cross_val_score

    # Build a dataset
    X, y = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        random_state=42
    )

    # 1. Instance and feature sampling strategies
    print("1. Instance and feature sampling strategies:")

    def sampling_strategies():
        """Compare sampling strategies."""
        strategies = {
            'standard bagging': {
                'bootstrap': True,
                'bootstrap_features': False,
                'max_samples': 1.0,
                'max_features': 1.0
            },
            'feature sampling': {
                'bootstrap': True,
                'bootstrap_features': True,
                'max_samples': 1.0,
                'max_features': 0.8
            },
            'instance sampling': {
                'bootstrap': True,
                'bootstrap_features': False,
                'max_samples': 0.8,
                'max_features': 1.0
            },
            'dual sampling': {
                'bootstrap': True,
                'bootstrap_features': True,
                'max_samples': 0.8,
                'max_features': 0.8
            }
        }
        results = {}
        for strategy_name, params in strategies.items():
            bagging = BaggingClassifier(
                DecisionTreeClassifier(random_state=42),
                n_estimators=50,
                random_state=42,
                n_jobs=-1,
                **params
            )
            score = cross_val_score(bagging, X, y, cv=3).mean()
            results[strategy_name] = score
            print(f"  {strategy_name}: {score:.3f}")
        return results

    sampling_results = sampling_strategies()

    # 2. Bagging for regression
    print("\n2. Bagging for regression:")

    def bagging_regression():
        """Bagging for regression."""
        from sklearn.datasets import make_regression
        from sklearn.ensemble import BaggingRegressor
        from sklearn.tree import DecisionTreeRegressor

        # Build a regression dataset
        X_reg, y_reg = make_regression(
            n_samples=1000,
            n_features=20,
            noise=0.1,
            random_state=42
        )
        bagging_reg = BaggingRegressor(
            DecisionTreeRegressor(random_state=42),
            n_estimators=100,
            random_state=42,
            n_jobs=-1
        )
        score = cross_val_score(bagging_reg, X_reg, y_reg, cv=3,
                                scoring='neg_mean_squared_error').mean()
        print(f"  Bagging regression MSE: {-score:.3f}")
        return bagging_reg

    bagging_reg = bagging_regression()
    return sampling_results, bagging_reg

advanced_bagging_results = advanced_bagging_demo()
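The sampling strategies above have established names in the literature: drawing only feature subsets is the random subspaces method, drawing instance subsets without replacement is pasting, and combining both is random patches. As a sketch, pasting just needs `bootstrap=False` with `max_samples` below 1.0:

# Pasting: draw instance subsets *without* replacement, so each base
# learner trains on a distinct random 80% slice of the data.
from sklearn.datasets import make_classification
from sklearn.ensemble import BaggingClassifier
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
pasting = BaggingClassifier(
    DecisionTreeClassifier(random_state=42),
    n_estimators=50,
    max_samples=0.8,
    bootstrap=False,  # without replacement -> pasting
    random_state=42,
    n_jobs=-1
)
print(f"Pasting accuracy: {cross_val_score(pasting, X, y, cv=3).mean():.3f}")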
Boosting Methods
1. AdaBoost
def adaboost_demo():
    """AdaBoost demo."""
    print("\n=== AdaBoost ===")
    import numpy as np
    from sklearn.datasets import make_classification
    from sklearn.ensemble import AdaBoostClassifier
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.model_selection import cross_val_score

    # Build a dataset
    X, y = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        random_state=42
    )

    # 1. Basic AdaBoost
    print("1. Basic AdaBoost:")

    def basic_adaboost():
        """Basic AdaBoost."""
        # Decision stumps (depth-1 trees) as base learners. The keyword is
        # `estimator` in scikit-learn >= 1.2, and the 'SAMME.R' algorithm
        # was removed in 1.6, so we rely on the default (SAMME).
        ada_clf = AdaBoostClassifier(
            estimator=DecisionTreeClassifier(max_depth=1),
            n_estimators=100,
            learning_rate=1.0,
            random_state=42
        )
        ada_clf.fit(X, y)
        score = cross_val_score(ada_clf, X, y, cv=3).mean()
        print(f"  AdaBoost accuracy: {score:.3f}")
        print(f"  Number of base learners: {ada_clf.n_estimators}")
        print(f"  Learning rate: {ada_clf.learning_rate}")
        # Feature importances
        feature_importance = ada_clf.feature_importances_
        top_features = np.argsort(feature_importance)[-5:]
        print(f"  Top 5 features: {top_features}")
        print(f"  Importance scores: {feature_importance[top_features]}")
        return ada_clf

    ada_clf = basic_adaboost()

    # 2. AdaBoost parameter tuning
    print("\n2. AdaBoost parameter tuning:")

    def adaboost_parameter_tuning():
        """AdaBoost parameter tuning."""
        # Effect of base-learner depth
        depths = [1, 2, 3, 4, 5]
        depth_scores = []
        print("  Effect of base-learner depth:")
        for depth in depths:
            ada = AdaBoostClassifier(
                DecisionTreeClassifier(max_depth=depth),
                n_estimators=50,
                random_state=42
            )
            score = cross_val_score(ada, X, y, cv=3).mean()
            depth_scores.append(score)
            print(f"    depth {depth}: {score:.3f}")
        # Effect of the learning rate
        learning_rates = [0.1, 0.5, 1.0, 1.5, 2.0]
        lr_scores = []
        print("  Effect of the learning rate:")
        for lr in learning_rates:
            ada = AdaBoostClassifier(
                DecisionTreeClassifier(max_depth=2),
                n_estimators=50,
                learning_rate=lr,
                random_state=42
            )
            score = cross_val_score(ada, X, y, cv=3).mean()
            lr_scores.append(score)
            print(f"    learning rate {lr}: {score:.3f}")
        return depth_scores, lr_scores

    depth_scores, lr_scores = adaboost_parameter_tuning()
    return ada_clf, depth_scores, lr_scores

adaboost_results = adaboost_demo()
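A useful complement to the tuning loops above is `staged_predict`, which replays the boosting sequence one round at a time so you can see where extra rounds stop paying off. A minimal sketch on a fresh synthetic dataset (the split and sizes are illustrative):

# Replay the boosting sequence round by round to find the sweet spot.
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import AdaBoostClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
ada = AdaBoostClassifier(
    estimator=DecisionTreeClassifier(max_depth=1),
    n_estimators=200,
    random_state=42
).fit(X_tr, y_tr)
# staged_predict yields predictions after 1, 2, ..., n_estimators rounds
stage_acc = [accuracy_score(y_te, pred) for pred in ada.staged_predict(X_te)]
best_round = int(np.argmax(stage_acc)) + 1
print(f"best test accuracy {max(stage_acc):.3f} after {best_round} rounds")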
2. Gradient Boosting
def gradient_boosting_demo():
    """Gradient boosting demo."""
    print("\n=== Gradient Boosting ===")
    import numpy as np
    from sklearn.datasets import make_classification, make_regression
    from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
    from sklearn.model_selection import cross_val_score

    # Build datasets
    X_class, y_class = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        random_state=42
    )
    X_reg, y_reg = make_regression(
        n_samples=1000,
        n_features=20,
        noise=0.1,
        random_state=42
    )

    # 1. Gradient boosting classification
    print("1. Gradient boosting classification:")

    def gradient_boosting_classification():
        """Gradient boosting classification."""
        gb_clf = GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            subsample=0.8,        # stochastic gradient boosting
            max_features='sqrt',
            random_state=42
        )
        gb_clf.fit(X_class, y_class)
        score = cross_val_score(gb_clf, X_class, y_class, cv=3).mean()
        print(f"  Gradient boosting accuracy: {score:.3f}")
        print(f"  Number of base learners: {gb_clf.n_estimators}")
        print(f"  Learning rate: {gb_clf.learning_rate}")
        print(f"  Max depth: {gb_clf.max_depth}")
        print(f"  Subsample fraction: {gb_clf.subsample}")
        # Feature importances
        feature_importance = gb_clf.feature_importances_
        top_features = np.argsort(feature_importance)[-5:]
        print(f"  Top 5 features: {top_features}")
        return gb_clf

    gb_clf = gradient_boosting_classification()

    # 2. Gradient boosting regression
    print("\n2. Gradient boosting regression:")

    def gradient_boosting_regression():
        """Gradient boosting regression."""
        gb_reg = GradientBoostingRegressor(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            subsample=0.8,
            max_features='sqrt',
            random_state=42
        )
        gb_reg.fit(X_reg, y_reg)
        score = cross_val_score(gb_reg, X_reg, y_reg, cv=3,
                                scoring='neg_mean_squared_error').mean()
        print(f"  Gradient boosting regression MSE: {-score:.3f}")
        return gb_reg

    gb_reg = gradient_boosting_regression()

    # 3. Early stopping
    print("\n3. Early stopping:")

    def early_stopping_demo():
        """Early stopping demo."""
        # Use early stopping to guard against overfitting
        gb_early = GradientBoostingClassifier(
            n_estimators=1000,        # generous upper bound on boosting rounds
            learning_rate=0.1,
            max_depth=3,
            subsample=0.8,
            validation_fraction=0.2,  # hold out 20% of the data for validation
            n_iter_no_change=10,      # stop after 10 rounds without improvement
            random_state=42
        )
        gb_early.fit(X_class, y_class)
        print(f"  Estimators actually fitted after early stopping: {gb_early.n_estimators_}")
        score = cross_val_score(gb_early, X_class, y_class, cv=3).mean()
        print(f"  Early-stopping accuracy: {score:.3f}")
        return gb_early

    gb_early = early_stopping_demo()
    return gb_clf, gb_reg, gb_early

gradient_boosting_results = gradient_boosting_demo()
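For larger datasets, scikit-learn's histogram-based variant, `HistGradientBoostingClassifier`, is usually a much faster drop-in that supports the same early-stopping idea. A sketch, with settings mirroring the early-stopping demo above (illustrative, not tuned):

# Histogram-based gradient boosting with built-in early stopping.
from sklearn.datasets import make_classification
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import cross_val_score

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
hgb = HistGradientBoostingClassifier(
    max_iter=1000,            # counterpart of n_estimators
    learning_rate=0.1,
    early_stopping=True,      # stop when the validation score plateaus
    validation_fraction=0.2,
    n_iter_no_change=10,
    random_state=42
)
print(f"HistGradientBoosting accuracy: {cross_val_score(hgb, X, y, cv=3).mean():.3f}")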
Stacking Methods
1. Basic Stacking
def stacking_demo():
    """Stacking demo."""
    print("\n=== Stacking ===")
    from sklearn.datasets import make_classification, make_regression
    from sklearn.ensemble import (
        StackingClassifier, StackingRegressor,
        RandomForestClassifier, RandomForestRegressor
    )
    from sklearn.linear_model import LogisticRegression, LinearRegression
    from sklearn.svm import SVC, SVR
    from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
    from sklearn.model_selection import cross_val_score

    # Build datasets
    X_class, y_class = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        random_state=42
    )
    X_reg, y_reg = make_regression(
        n_samples=1000,
        n_features=20,
        noise=0.1,
        random_state=42
    )

    # 1. Basic stacking classification
    print("1. Basic stacking classification:")

    def basic_stacking_classification():
        """Basic stacking classification."""
        # Base learners
        base_estimators = [
            ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
            ('svm', SVC(probability=True, random_state=42)),
            ('tree', DecisionTreeClassifier(random_state=42))
        ]
        # Stacking classifier
        stacking_clf = StackingClassifier(
            estimators=base_estimators,
            final_estimator=LogisticRegression(random_state=42),
            cv=5,                          # 5-fold CV to build out-of-fold features
            stack_method='predict_proba',  # stack predicted probabilities
            n_jobs=-1
        )
        stacking_clf.fit(X_class, y_class)
        score = cross_val_score(stacking_clf, X_class, y_class, cv=3).mean()
        print(f"  Stacking classifier accuracy: {score:.3f}")
        print(f"  Number of base learners: {len(stacking_clf.estimators_)}")
        print(f"  Final estimator: {type(stacking_clf.final_estimator_).__name__}")
        return stacking_clf

    stacking_clf = basic_stacking_classification()

    # 2. Basic stacking regression
    print("\n2. Basic stacking regression:")

    def basic_stacking_regression():
        """Basic stacking regression."""
        # Base learners
        base_estimators = [
            ('rf', RandomForestRegressor(n_estimators=50, random_state=42)),
            ('svr', SVR()),
            ('tree', DecisionTreeRegressor(random_state=42))
        ]
        # Stacking regressor
        stacking_reg = StackingRegressor(
            estimators=base_estimators,
            final_estimator=LinearRegression(),
            cv=5,
            n_jobs=-1
        )
        stacking_reg.fit(X_reg, y_reg)
        score = cross_val_score(stacking_reg, X_reg, y_reg, cv=3,
                                scoring='neg_mean_squared_error').mean()
        print(f"  Stacking regressor MSE: {-score:.3f}")
        return stacking_reg

    stacking_reg = basic_stacking_regression()

    # 3. Multi-level stacking
    print("\n3. Multi-level stacking:")

    def multi_level_stacking():
        """Multi-level stacking."""
        # First-level base learners
        level1_estimators = [
            ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
            ('svm', SVC(probability=True, random_state=42))
        ]
        # First-level stacking
        level1_stacking = StackingClassifier(
            estimators=level1_estimators,
            final_estimator=LogisticRegression(random_state=42),
            cv=3
        )
        # Second-level stacking: the first-level stack becomes a base learner
        level2_estimators = [
            ('level1', level1_stacking),
            ('tree', DecisionTreeClassifier(random_state=42))
        ]
        multi_stacking = StackingClassifier(
            estimators=level2_estimators,
            final_estimator=LogisticRegression(random_state=42),
            cv=3
        )
        multi_stacking.fit(X_class, y_class)
        score = cross_val_score(multi_stacking, X_class, y_class, cv=3).mean()
        print(f"  Multi-level stacking accuracy: {score:.3f}")
        return multi_stacking

    multi_stacking = multi_level_stacking()
    return stacking_clf, stacking_reg, multi_stacking

stacking_results = stacking_demo()
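One option the demos above leave at its default is `passthrough`: when set to `True`, the final estimator sees the original features alongside the base learners' predictions, which can help when the meta-learner can exploit the raw features directly. A minimal sketch:

# passthrough=True appends the raw features to the base learners'
# out-of-fold predictions before they reach the final estimator.
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
stack_pt = StackingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
        ('tree', DecisionTreeClassifier(random_state=42))
    ],
    final_estimator=LogisticRegression(max_iter=1000, random_state=42),
    passthrough=True,  # raw features + stacked predictions
    cv=3
)
print(f"Stacking with passthrough: {cross_val_score(stack_pt, X, y, cv=3).mean():.3f}")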
Voting Classifiers
1. Hard and Soft Voting
def voting_classifier_demo():
    """Voting classifier demo."""
    print("\n=== Voting Classifiers ===")
    from sklearn.datasets import make_classification
    from sklearn.ensemble import VotingClassifier, RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.svm import SVC
    from sklearn.model_selection import cross_val_score

    # Build a dataset
    X, y = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        random_state=42
    )

    # 1. Hard voting
    print("1. Hard voting:")

    def hard_voting():
        """Hard voting."""
        # Base learners
        estimators = [
            ('lr', LogisticRegression(random_state=42)),
            ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
            ('svm', SVC(random_state=42))
        ]
        # Hard-voting classifier: majority vote over predicted labels
        hard_voting_clf = VotingClassifier(
            estimators=estimators,
            voting='hard'
        )
        hard_voting_clf.fit(X, y)
        score = cross_val_score(hard_voting_clf, X, y, cv=3).mean()
        print(f"  Hard voting accuracy: {score:.3f}")
        # Performance of each base learner on its own
        print("  Individual base learners:")
        for name, estimator in hard_voting_clf.named_estimators_.items():
            individual_score = cross_val_score(estimator, X, y, cv=3).mean()
            print(f"    {name}: {individual_score:.3f}")
        return hard_voting_clf

    hard_voting_clf = hard_voting()

    # 2. Soft voting
    print("\n2. Soft voting:")

    def soft_voting():
        """Soft voting."""
        # Base learners (all must support probability estimates)
        estimators = [
            ('lr', LogisticRegression(random_state=42)),
            ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
            ('svm', SVC(probability=True, random_state=42))  # enable probabilities
        ]
        # Soft-voting classifier: average the predicted probabilities
        soft_voting_clf = VotingClassifier(
            estimators=estimators,
            voting='soft'
        )
        soft_voting_clf.fit(X, y)
        score = cross_val_score(soft_voting_clf, X, y, cv=3).mean()
        print(f"  Soft voting accuracy: {score:.3f}")
        return soft_voting_clf

    soft_voting_clf = soft_voting()

    # 3. Weighted voting
    print("\n3. Weighted voting:")

    def weighted_voting():
        """Weighted voting."""
        # Base learners and their weights
        estimators = [
            ('lr', LogisticRegression(random_state=42)),
            ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
            ('svm', SVC(probability=True, random_state=42))
        ]
        # Weighted soft-voting classifier
        weighted_voting_clf = VotingClassifier(
            estimators=estimators,
            voting='soft',
            weights=[0.3, 0.5, 0.2]  # custom weights
        )
        weighted_voting_clf.fit(X, y)
        score = cross_val_score(weighted_voting_clf, X, y, cv=3).mean()
        print(f"  Weighted voting accuracy: {score:.3f}")
        print("  Weights: [0.3, 0.5, 0.2]")
        return weighted_voting_clf

    weighted_voting_clf = weighted_voting()
    return hard_voting_clf, soft_voting_clf, weighted_voting_clf

voting_results = voting_classifier_demo()
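The weights in the last demo were picked by hand; in practice they can be searched like any other hyperparameter. A minimal sketch using `GridSearchCV` over a few illustrative candidate weight vectors:

# Search the soft-voting weights with GridSearchCV.
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
voter = VotingClassifier(
    estimators=[
        ('lr', LogisticRegression(random_state=42)),
        ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
        ('svm', SVC(probability=True, random_state=42))
    ],
    voting='soft'
)
grid = GridSearchCV(
    voter,
    param_grid={'weights': [[1, 1, 1], [1, 2, 1], [2, 1, 1], [1, 1, 2]]},
    cv=3
)
grid.fit(X, y)
print(f"best weights: {grid.best_params_['weights']}, "
      f"CV accuracy: {grid.best_score_:.3f}")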
Comparing Ensemble Method Performance
1. Method Comparison and Analysis
def ensemble_methods_comparison():
    """Compare ensemble methods."""
    print("\n=== Comparing Ensemble Methods ===")
    import time
    from sklearn.datasets import make_classification
    from sklearn.ensemble import (
        BaggingClassifier, AdaBoostClassifier, GradientBoostingClassifier,
        StackingClassifier, VotingClassifier, RandomForestClassifier
    )
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.svm import SVC
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import cross_val_score

    # Build a dataset
    X, y = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        random_state=42
    )

    # 1. Performance comparison
    print("1. Performance comparison:")

    def performance_comparison():
        """Performance comparison."""
        # The ensemble methods under test
        ensemble_methods = {
            'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42),
            'Bagging': BaggingClassifier(
                DecisionTreeClassifier(random_state=42),
                n_estimators=100, random_state=42
            ),
            'AdaBoost': AdaBoostClassifier(
                DecisionTreeClassifier(max_depth=2, random_state=42),
                n_estimators=100, random_state=42
            ),
            'GradientBoosting': GradientBoostingClassifier(
                n_estimators=100, random_state=42
            ),
            'Stacking': StackingClassifier(
                estimators=[
                    ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
                    ('svm', SVC(probability=True, random_state=42))
                ],
                final_estimator=LogisticRegression(random_state=42),
                cv=3
            ),
            'Voting': VotingClassifier(
                estimators=[
                    ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
                    ('svm', SVC(probability=True, random_state=42)),
                    ('lr', LogisticRegression(random_state=42))
                ],
                voting='soft'
            )
        }
        results = {}
        for name, method in ensemble_methods.items():
            print(f"  Evaluating {name}...")
            start_time = time.time()
            score = cross_val_score(method, X, y, cv=3).mean()
            elapsed = time.time() - start_time  # covers the full 3-fold CV run
            results[name] = {
                'score': score,
                'time': elapsed
            }
            print(f"    {name}: accuracy={score:.3f}, CV time={elapsed:.2f}s")
        # Ranking
        print("\n  Ranking by accuracy:")
        sorted_results = sorted(results.items(), key=lambda x: x[1]['score'], reverse=True)
        for i, (name, result) in enumerate(sorted_results, 1):
            print(f"    {i}. {name}: {result['score']:.3f}")
        return results

    comparison_results = performance_comparison()

    # 2. Choosing an ensemble method
    print("\n2. Choosing an ensemble method:")

    def ensemble_selection_guide():
        """Ensemble method selection guide."""
        print("  Selection guide:")
        print("  1. Bagging:")
        print("     - Best for: high-variance models; reduces overfitting")
        print("     - Pros: trains in parallel, very stable")
        print("     - Cons: may underfit")
        print("  2. Boosting:")
        print("     - Best for: high-bias models; reduces underfitting")
        print("     - Pros: high accuracy, feature importances")
        print("     - Cons: sequential training, prone to overfitting")
        print("  3. Stacking:")
        print("     - Best for: combining heterogeneous base learners")
        print("     - Pros: very flexible, strong performance")
        print("     - Cons: high computational cost")
        print("  4. Voting:")
        print("     - Best for: quick, simple ensembling")
        print("     - Pros: trivial to implement, easy to interpret")
        print("     - Cons: limited performance gains")
        return True

    selection_guide = ensemble_selection_guide()
    return comparison_results, selection_guide

ensemble_comparison_results = ensemble_methods_comparison()
Summary
Key takeaways for ensemble methods:
- Bagging: reduces variance through bootstrap and feature sampling; suits high-variance models
- Boosting: reduces bias through sequential training; suits high-bias models
- Stacking: combines base learners through a meta-learner; the most flexible option
- Voting: a simple, effective ensembling strategy; good for quick prototypes
- Parameter tuning: the number of base learners, the learning rate, and the sampling strategy are the key knobs
- Method selection: choose based on the data's characteristics and the available compute
- Evaluation: use cross-validation and learning curves to measure the ensemble's effect
Mastering these ensemble techniques can markedly improve the performance and stability of machine learning models, providing a solid foundation for complex data science projects.
Please credit when reposting: 周志洋的博客 » Python机器学习-集成方法详解


