Connecting to Spark and a Spark cluster from Python 3 [personally tested]
1. Connecting to Spark
1.1. A simple connection to Spark
from pyspark.sql import SparkSession

# Create a session (or reuse an existing one) under the given application name.
spark = (
    SparkSession.builder
    .appName('my_first_app_name')
    .getOrCreate()
)
1.2. Connecting to a Spark cluster
from pyspark.sql import SparkSession

# Enable Hive support and connect to a standalone master.
# A standalone master URL needs the spark:// scheme; the address is a placeholder.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .master("spark://xxx.xxx.xxx.xxx:7077")
    .appName("my_first_app_name")
    .getOrCreate()
)
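For a standalone cluster, Spark expects the master URL in the form spark://host:port, and 7077 is the default port of a standalone master. The helper below is only a sketch: standalone_master_url is a hypothetical name, not part of PySpark, and it assumes a hostname or IPv4 address (IPv6 literals are not handled). It normalizes an address before it is passed to .master().

```python
def standalone_master_url(address, default_port=7077):
    """Return a spark://host:port URL for a standalone master.

    `address` may be "host", "host:port" or already "spark://host:port".
    Hypothetical helper for illustration; not a PySpark API.
    """
    prefix = "spark://"
    # Drop the scheme if the caller already supplied it.
    if address.startswith(prefix):
        address = address[len(prefix):]
    host, sep, port = address.partition(":")
    if not host:
        raise ValueError("master address must contain a host")
    # Fall back to the default port when none was given.
    port = int(port) if sep else default_port
    return "{}{}:{}".format(prefix, host, port)
```

With this in place, the builder call could read .master(standalone_master_url("xxx.xxx.xxx.xxx")), keeping the placeholder address from the example.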
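1.3. Python environment on the cluster
If the cluster has to run a specific Python version (the system default is 2.6), for example Python 3.5, you need to install Python 3.5 on every node and add its directory to Spark's environment variables.
Alternatively, you can set the interpreter from within the Python program.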
import os

# Set these before the session is created; Spark reads them at launch.
os.environ['SPARK_HOME'] = '/usr/local/workspace/spark-2.1.0-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.5'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'

from pyspark.sql import SparkSession

# A standalone master URL needs the spark:// scheme; the address is a placeholder.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .master("spark://xxx.xxx.xxx.xxx:7077")
    .appName("my_first_app_name")
    .getOrCreate()
)
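PySpark refuses to run when the workers use a different Python minor version than the driver, so it can help to set both variables in one place and to know which version the driver is running. A minimal sketch, assuming the interpreter path from the example above; set_pyspark_python and driver_python_version are hypothetical helpers, not PySpark functions, and defaulting the driver to the current interpreter is my own assumption.

```python
import os
import sys


def set_pyspark_python(worker_python, driver_python=None, env=None):
    """Point PySpark at specific interpreters.

    Call this before the SparkSession is created. `env` defaults to
    os.environ; passing a plain dict makes the helper easy to try out.
    Hypothetical helper for illustration; not a PySpark API.
    """
    env = os.environ if env is None else env
    env["PYSPARK_PYTHON"] = worker_python
    # Assumption: use the interpreter running this script for the driver
    # unless another one is named explicitly.
    env["PYSPARK_DRIVER_PYTHON"] = driver_python or sys.executable
    return env


def driver_python_version():
    """Return the driver's major.minor version, e.g. for comparing with the workers."""
    return "{}.{}".format(sys.version_info[0], sys.version_info[1])
```

Calling set_pyspark_python('/usr/local/bin/python3.5') at the top of the script would replace the two os.environ assignments for the interpreters; SPARK_HOME still has to be set separately.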
1.4. config parameters
# Additional parameters can be passed with config() when connecting to Spark.
from pyspark.sql import SparkSession

# A standalone master URL needs the spark:// scheme; the address is a placeholder.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .master("spark://xxx.xxx.xxx.xxx:7077")
    .appName("my_first_app_name")
    .config('spark.some.config.option', 'value')
    .config('spark.some.config.option', 'value')
    # ... further .config() calls as needed
    .getOrCreate()
)
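When there are many options, chaining .config() calls by hand gets repetitive. One possible approach, sketched below, is to keep the options in a dict and apply them in a loop. apply_configs is a hypothetical helper, not a PySpark function; it only relies on the builder's config(key, value) method returning a builder. The option values are assumptions chosen for illustration, not recommendations.

```python
def apply_configs(builder, options):
    """Call builder.config(key, value) for every option and return the builder.

    Works with SparkSession.builder, whose config() returns the builder
    so that calls can be chained. Hypothetical helper; not a PySpark API.
    """
    for key, value in options.items():
        builder = builder.config(key, value)
    return builder


# Example values are assumptions; tune them for your own cluster.
EXAMPLE_OPTIONS = {
    "spark.executor.memory": "4g",
    "spark.sql.shuffle.partitions": "200",
}
```

In a script this would be used as apply_configs(SparkSession.builder.appName("my_first_app_name"), EXAMPLE_OPTIONS).getOrCreate().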
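2. Submitting jobs
There are two ways to submit a job. One is the approach shown above: connect to Spark first, then use the Spark session for every operation. The other is to submit a .py file to the Spark cluster with spark-submit.
In particular, if you use a Python virtual environment, you can point Spark at its interpreter with PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON.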
# Submit a Spark job.
# --master can also be yarn-client or yarn-cluster (both deprecated since
# Spark 2.0 in favor of --master yarn with --deploy-mode).
PYSPARK_DRIVER_PYTHON=/opt/anaconda3/envs/xxljob/bin/python \
PYSPARK_PYTHON=/opt/anaconda3/envs/xxljob/bin/python \
/usr/local/workspace/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
    --master yarn \
    --queue ai \
    --num-executors 12 \
    --driver-memory 30g \
    --executor-cores 4 \
    --executor-memory 32G \
    /tmp/test_spark.py
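If the job is launched from another Python program (a scheduler, for instance), the same command can be assembled as an argument list instead of a shell string, which avoids quoting and line-continuation mistakes. This is a sketch under assumptions: build_spark_submit is a hypothetical helper, the paths and resource values are the ones from the command above, and nothing is executed until the result is handed to subprocess.

```python
import os


def build_spark_submit(spark_home, script, python_bin, queue,
                       num_executors, executor_cores,
                       driver_memory, executor_memory, master="yarn"):
    """Return (argv, env) describing a spark-submit call; nothing is run here.

    Hypothetical helper for illustration; not part of Spark.
    """
    argv = [
        os.path.join(spark_home, "bin", "spark-submit"),
        "--master", master,
        "--queue", queue,
        "--num-executors", str(num_executors),
        "--driver-memory", driver_memory,
        "--executor-cores", str(executor_cores),
        "--executor-memory", executor_memory,
        script,
    ]
    # Copy the current environment and add the interpreter settings.
    env = dict(os.environ,
               PYSPARK_DRIVER_PYTHON=python_bin,
               PYSPARK_PYTHON=python_bin)
    return argv, env


argv, env = build_spark_submit(
    spark_home="/usr/local/workspace/spark-2.1.0-bin-hadoop2.7",
    script="/tmp/test_spark.py",
    python_bin="/opt/anaconda3/envs/xxljob/bin/python",
    queue="ai", num_executors=12, executor_cores=4,
    driver_memory="30g", executor_memory="32G",
)
# To launch the job: subprocess.run(argv, env=env, check=True)
```

Passing a list to subprocess.run means no shell is involved, so a stray comment or missing backslash cannot silently cut the command short.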