Running XGBoost on Spark in distributed mode

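  • The dataset is the Kaggle Titanic training set (train.csv), with the columns PassengerId, Survived, Pclass, Name, Sex, Age, SibSp, Parch, Ticket, Fare, Cabin and Embarked.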
  • Complete program code
```python
# coding=UTF-8
import os

# Both XGBoost jars must be on the classpath; this must be set before Spark starts
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell'

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# For local testing: conf = SparkConf().setMaster("local").setAppName("My App")
conf = SparkConf().setMaster("yarn").setAppName("pyspark_xgboost_yarn")
# A single SparkSession built from this conf; no separate SparkContext or SQLContext is needed
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Optional: ship the sparkxgb Python wrapper with the job instead of installing it
# spark.sparkContext.addPyFile("hdfs:///tmp/rd/lp/sparkxgb.zip")

schema = StructType(
    [StructField("PassengerId", DoubleType()),
     StructField("Survived", DoubleType()),
     StructField("Pclass", DoubleType()),
     StructField("Name", StringType()),
     StructField("Sex", StringType()),
     StructField("Age", DoubleType()),
     StructField("SibSp", DoubleType()),
     StructField("Parch", DoubleType()),
     StructField("Ticket", StringType()),
     StructField("Fare", DoubleType()),
     StructField("Cabin", StringType()),
     StructField("Embarked", StringType())
     ])

# On YARN the relative path resolves to the user's HDFS home, e.g. /user/root/train.csv
df_raw = spark \
    .read \
    .option("header", "true") \
    .schema(schema) \
    .csv("train.csv")

df_raw.show(20)
# Replace nulls in the numeric columns with 0 (treated as missing by XGBoost below)
df = df_raw.na.fill(0)

# Combine the numeric columns into a single feature vector
vectorAssembler = VectorAssembler() \
    .setInputCols(["Pclass", "Age", "SibSp", "Parch", "Fare"]) \
    .setOutputCol("features")

# Python wrapper from sparkxgb.zip; it drives the Scala classes in the two jars
from sparkxgb import XGBoostClassifier

xgboost = XGBoostClassifier(
    featuresCol="features",
    labelCol="Survived",
    predictionCol="prediction",
    missing=0.0
)
pipeline = Pipeline(stages=[vectorAssembler, xgboost])
# randomSplit: randomly split into a training set (80%) and a test set (20%)
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)
trainDF.show(2)
print("************************Training started****************************")
# Fit the model
model = pipeline.fit(trainDF)

print("************************Training finished***************************")
print("************************Prediction started**************************")

predictions = model.transform(testDF)
predictions.select("PassengerId", "Survived", "prediction").show()

print("************************Prediction finished*************************")
# Show all output columns
predictions.show()
print("Program finished")
```
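The script fills every null with 0 (df_raw.na.fill(0)) and then tells XGBoost to treat 0.0 as missing (missing=0.0). The plain-Python sketch below only models that interaction; it is not XGBoost code, and the helper names fill_nulls and missing_mask are hypothetical. It shows a side effect worth checking: genuine zeros, such as SibSp = 0 or Parch = 0 (frequent in the Titanic data), are flagged as missing together with the filled-in nulls.

```python
from typing import List, Optional

MISSING = 0.0  # mirrors missing=0.0 passed to XGBoostClassifier in the script


def fill_nulls(row: List[Optional[float]], value: float = 0.0) -> List[float]:
    """Replace None with `value`, as df_raw.na.fill(0) does for numeric columns."""
    return [value if v is None else v for v in row]


def missing_mask(row: List[float], missing: float = MISSING) -> List[bool]:
    """True wherever a value equals the `missing` sentinel."""
    return [v == missing for v in row]


# Hypothetical row in VectorAssembler order: Pclass, Age, SibSp, Parch, Fare
raw = [3.0, None, 0.0, 0.0, 7.25]  # Age unknown, travelling without siblings or parents
filled = fill_nulls(raw)
print(filled)                # [3.0, 0.0, 0.0, 0.0, 7.25]
print(missing_mask(filled))  # [False, True, True, True, False]
```

The unknown Age and the real zeros in SibSp and Parch end up indistinguishable, so it is worth deciding deliberately whether that is acceptable for these features.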


  • Problems encountered

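1. Both jars above must be passed to spark-submit as submit arguments. In the script this is done by setting the following environment variable before the SparkContext is created:

```python
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell'
```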
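The PYSPARK_SUBMIT_ARGS value is one string: --jars, a single comma-separated list of jar paths without spaces, and pyspark-shell as the last token. A small helper can build it and catch comma or quoting mistakes that are easy to make when editing the string by hand. The function name build_submit_args is hypothetical; like the original line, it has to run before the SparkContext is created, because the variable is only read when the JVM is launched.

```python
import os
from typing import Iterable


def build_submit_args(jars: Iterable[str]) -> str:
    """Return a PYSPARK_SUBMIT_ARGS value that puts `jars` on the classpath."""
    jar_list = [j.strip() for j in jars if j.strip()]
    for j in jar_list:
        # --jars takes one comma-separated argument, so commas/spaces inside a path break it
        if "," in j or " " in j:
            raise ValueError(f"jar path must not contain commas or spaces: {j!r}")
    # pyspark-shell must come last when PySpark is launched from a plain Python process
    return f"--jars {','.join(jar_list)} pyspark-shell"


JAR_DIR = "/data/pycharm/zhanglong/pysparkxgboostnew"
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args([
    f"{JAR_DIR}/xgboost4j-spark-0.90.jar",
    f"{JAR_DIR}/xgboost4j-0.90.jar",
])
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

Rejecting paths with spaces is a deliberate simplification; quoting them would also work but makes the string harder to read.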

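2. Unzip sparkxgb.zip into the Python 3 package installation directory (site-packages) so that from sparkxgb import XGBoostClassifier works. On the Linux machine used here the default location is /usr/local/python3/lib/python3.6/site-packages.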
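Unzipping by hand works; the same step can also be scripted with the standard library. In this sketch, sysconfig.get_paths()["purelib"] reports the site-packages directory of the interpreter running it (which should be the same Python that PySpark uses), and zipfile extracts the archive there. The function name install_zip is hypothetical, and the sketch assumes sparkxgb.zip contains a top-level sparkxgb/ package directory.

```python
import sysconfig
import zipfile
from pathlib import Path
from typing import Optional


def install_zip(zip_path: str, target_dir: Optional[str] = None) -> Path:
    """Extract a zipped pure-Python package into site-packages (or into target_dir)."""
    target = Path(target_dir or sysconfig.get_paths()["purelib"])
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(target)  # creates target/<package>/... as needed
    return target


# Where install_zip would put the package by default for this interpreter
print(sysconfig.get_paths()["purelib"])
```

Usage would be install_zip("/data/pycharm/zhanglong/pysparkxgboostnew/sparkxgb.zip"); writing into a system site-packages usually needs the same permissions as pip install.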

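3. If you would rather not unzip sparkxgb.zip into the Python package directory, or hard-code the jar paths in the Python code, pass both on the spark-submit command line instead. First comment out this line in the script:

```python
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell'
```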

Then run the following spark-submit command in a Linux shell:

```shell
spark-submit --master yarn \
    --py-files /data/pycharm/zhanglong/pysparkxgboostnew/sparkxgb.zip \
    --jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar \
    /data/pycharm/zhanglong/pysparkxgboostnew/test.py
```

The zip and jar paths must point to where the files actually are.
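--py-files works because Spark places the listed .zip archives on the Python path, and Python can import a package straight from a zip archive on sys.path through the standard zipimport mechanism. The standalone sketch below reproduces that locally with a throwaway archive; the package name demo_sparkxgb is hypothetical and only stands in for sparkxgb, and inserting into sys.path is a simplified model of what Spark does, not its actual code.

```python
import importlib
import sys
import tempfile
import zipfile
from pathlib import Path


def make_demo_zip(directory: Path) -> Path:
    """Build a zip laid out like sparkxgb.zip: one top-level package directory."""
    zip_path = directory / "demo_sparkxgb.zip"
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr("demo_sparkxgb/__init__.py", "from .classifier import NAME\n")
        zf.writestr("demo_sparkxgb/classifier.py", "NAME = 'XGBoostClassifier'\n")
    return zip_path


tmp = Path(tempfile.mkdtemp())
zip_path = make_demo_zip(tmp)
sys.path.insert(0, str(zip_path))  # the archive itself becomes an import location
module = importlib.import_module("demo_sparkxgb")
print(module.NAME)  # XGBoostClassifier
```

This is also why the zip must keep the package directory at its top level: the import looks for demo_sparkxgb/__init__.py at the root of the archive.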

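4. On YARN, Spark resolves the relative path train.csv against the HDFS home directory of the submitting user (/user/root/ when running as root), so upload train.csv to that directory before running the job.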
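The rule in item 4 can be written down as a small function. This is a simplified model using posixpath, not Hadoop's implementation: it ignores scheme and authority handling, and resolve_hdfs_path is a hypothetical name. The upload itself can be done with hdfs dfs -put train.csv /user/root/.

```python
import posixpath


def resolve_hdfs_path(path: str, user: str) -> str:
    """Resolve `path` the way a relative HDFS path is resolved: against /user/<user>."""
    if "://" in path or path.startswith("/"):
        return path  # already absolute, with or without a scheme
    return posixpath.normpath(posixpath.join("/user", user, path))


print(resolve_hdfs_path("train.csv", "root"))       # /user/root/train.csv
print(resolve_hdfs_path("/tmp/train.csv", "root"))  # /tmp/train.csv
```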

If you need sparkxgb.zip, the two jars or the training set, contact QQ: 2316352792.
