Spark Installation and Getting-Started Notes
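Introduction to Spark
Spark is a general-purpose cluster computing framework. It distributes computation over large datasets across many machines and provides efficient in-memory computing. If you are familiar with Hadoop, you know that a distributed computing framework has to solve two problems: how to distribute the data and how to distribute the computation. Hadoop uses HDFS for distributed data and the MapReduce paradigm for distributed computation. Similarly, Spark offers functional programming APIs in several languages and provides many more operators than just map and reduce; these operations act on a distributed data abstraction called resilient distributed datasets (RDDs).
In essence, an RDD is a programming abstraction that represents a read-only collection of objects partitioned across machines. An RDD can be rebuilt from its lineage (which makes it fault tolerant), is accessed through parallel operations, can be read from and written to distributed storage such as HDFS or S3, and, most importantly, can be cached in the memory of worker nodes for immediate reuse. Because RDDs can be cached in memory, Spark is especially effective for iterative applications, where the same data is reused throughout the computation. Most machine learning and optimization algorithms are iterative, which makes Spark a very effective tool for data science. In addition, Spark is fast enough to be used interactively from a command-line prompt similar to the Python REPL.
The Spark distribution itself includes libraries that cover most big data applications: SQL-like queries over large datasets, machine learning and graph algorithms, and even real-time stream processing.
The core components are:
· Spark Core: the basic functionality of Spark, in particular the API that defines RDDs and the transformations and actions on them. All other Spark libraries are built on top of RDDs and Spark Core.
· Spark SQL: an API for interacting with Spark through HiveQL, the SQL dialect of Apache Hive. Each database table is treated as an RDD, and Spark SQL queries are translated into Spark operations. Anyone familiar with Hive and HiveQL can use it right away.
· Spark Streaming: processing and control of real-time data streams. Many real-time systems (such as Apache Storm) can process live data; Spark Streaming lets a program handle live data the same way it handles an ordinary RDD.
· MLlib: a library of common machine learning algorithms, implemented as Spark operations on RDDs. It includes scalable learning algorithms such as classification and regression that need to iterate over large datasets. Mahout, previously the usual choice of machine learning library for big data, is moving to Spark for its future implementations.
· GraphX: a collection of algorithms and tools for manipulating graphs and for parallel graph operations and computation. GraphX extends the RDD API with operations for manipulating graphs, creating subgraphs, and accessing all vertices on a path.
After the Spark 2.0 release, the official site lists third-party Spark packages under Libraries: https://spark-packages.org/, which includes machine learning packages such as spark-als and mllib-grid-search.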
Spark MLlib
MLlib is Spark's scalable machine learning library. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as the underlying optimization primitives.
· Data types
· Basic statistics
o summary statistics
o correlations
o stratified sampling
o hypothesis testing
o random data generation
· Classification and regression (classification generally applies to discrete targets and regression to continuous ones; the two are essentially the same kind of problem)
o linear models (SVMs, logistic regression, linear regression)
o naive Bayes
o decision trees
o ensembles of trees (Random Forests and Gradient-Boosted Trees)
· Collaborative filtering
o alternating least squares (ALS)
· Clustering
o k-means
· Dimensionality reduction
o singular value decomposition (SVD)
o principal component analysis (PCA)
· Feature extraction and transformation
· Optimization (developer)
o stochastic gradient descent
o limited-memory BFGS (L-BFGS) (a quasi-Newton method, used for nonlinear optimization problems)
Spark provides programming APIs in four languages:
Finally, full API documentation is available in Scala, Java, Python and R.
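RDD concepts:
An RDD (Resilient Distributed Dataset) is a fault-tolerant, parallel data structure that lets the user explicitly persist data to disk or memory and control how the data is partitioned. RDDs also provide a rich set of operations (operators) on this data. Operators are divided into transformations (such as filter and map) and actions. Transformations such as map, flatMap, and filter follow the monad pattern and fit well with Scala's collection operations. Beyond these, RDDs provide more convenient operations such as join, groupBy, and reduceByKey to support common data processing (note that reduceByKey is a transformation, not an action).
An RDD can be thought of as an array whose elements are distributed over the machines of the cluster. RDDs are what Spark operates on: each operator transforms an RDD, and when an action is reached, all the preceding operators are assembled into a directed acyclic graph (DAG), the RDD graph, which Spark turns into a job and submits to the cluster for execution.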
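The "nothing runs until an action is called" behaviour described above can be pictured with a small toy model. This is only a sketch in plain Python, not Spark code: MiniRDD, lineage() and the double helper are names made up for illustration, and real RDDs are distributed and far more involved.

```python
class MiniRDD:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, source, lineage=()):
        self._source = source            # the original in-memory data
        self._lineage = tuple(lineage)   # recorded (name, function) steps, not yet executed

    # Transformations: return a new MiniRDD with one more recorded step.
    def map(self, f):
        return MiniRDD(self._source, self._lineage + (("map", f),))

    def filter(self, f):
        return MiniRDD(self._source, self._lineage + (("filter", f),))

    def lineage(self):
        """Names of the recorded steps, in order."""
        return [name for name, _ in self._lineage]

    def _compute(self):
        # Replay the recorded steps over the source data.
        data = iter(self._source)
        for name, f in self._lineage:
            data = map(f, data) if name == "map" else filter(f, data)
        return data

    # Actions: trigger the computation and return a plain value.
    def collect(self):
        return list(self._compute())

    def count(self):
        return sum(1 for _ in self._compute())


calls = []

def double(x):
    calls.append(x)      # remember that the function really ran
    return x * 2

rdd = MiniRDD([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_action = list(calls)   # still empty: only the lineage was recorded
result = rdd.collect()              # the action replays the lineage
print(rdd.lineage(), calls_before_action, result)
```

Because the object keeps the source plus the recorded steps, the result can always be recomputed from scratch, which is the same idea that lets Spark rebuild a lost partition from its lineage.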
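RDDs are stored in units of partitions, analogous to blocks in HDFS. (This differs from partitions in Hive, which are logical partitions, for example by date.)
References: 《Spark大数据处理:技术、应用与性能优化》 (Spark Big Data Processing: Technology, Applications, and Performance Optimization)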
http://spark.apache.org/docs/latest/index.html
Installation
1. Install Scala
Download Scala from www.scala-lang.org
Extract: tar -zxvf scala-2.11.7.tgz
Configure the Scala environment variables: vi /etc/profile
#set the scala environment
export SCALA_HOME=/home/hadoop/Spark/scala-2.11.7
export PATH=${SCALA_HOME}/bin:$PATH
Apply the changes: source /etc/profile
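2. Install Spark
Installing and configuring Spark is fairly simple.
Download Spark from http://spark.apache.org/downloads.html
Here we download the build that bundles the Hadoop dependencies, spark-*-bin-hadoop2.6.tgz (spark-*-bin-without-hadoop is the prebuilt version without Hadoop, for which the Hadoop dependency jars must be copied in manually):
Extract: tar -zxvf spark-1.5.0-bin-hadoop2.6.tgz
Configure conf/spark-env.sh:
cp conf/spark-env.sh.template conf/spark-env.sh
vi conf/spark-env.sh
export SCALA_HOME=/home/hadoop/Spark/scala-2.11.7 # necessary
export SPARK_WORKER_MEMORY=1g
export SPARK_MASTER_IP=192.168.40.128
export MASTER=spark://192.168.40.128:7077
SPARK_WORKER_MEMORY sets the maximum memory available on each worker node. Increasing it lets more data be cached in memory, but be sure to leave enough memory for the slave's operating system and other services. The unit can be m or g, and the number must be an integer (a fractional value causes an error). MASTER must be configured as well; otherwise the slaves fail to register with the master.
Configure the slaves file:
cp conf/slaves.template conf/slaves
vi conf/slaves
ubuntu2
The Spark master needs passwordless SSH login to the slaves, so make sure SSH is installed and configured: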
[ubuntu1]: ssh ubuntu2
Welcome to Ubuntu 14.04.3 LTS (GNU/Linux 3.19.0-26-generic x86_64)
* Documentation: https://help.ubuntu.com/
System information disabled due to load higher than 1.0
Last login: Fri Sep 25 18:07:58 2015 from 192.168.40.1
hadoop@ubuntu2:~$ jps
6377 Jps
hadoop@ubuntu2:~$ ssh ubuntu1
hadoop@ubuntu1's password:
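Install and configure Spark on all workers
Now that this directory is configured on the master, copy it to every worker. Note that the Spark directory must be at the same path on all three machines, because the master logs in to each worker to run commands and assumes the worker's Spark path is the same as its own. (The command below is only an example; substitute your own Spark directory, user, and host.)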
$ cd
$ scp -r spark-0.7.2 dev@slave01:~
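3. Start the cluster
Start Spark:
./sbin/start-all.sh
Stop Spark:
./sbin/stop-all.sh
Check that the installation succeeded:
Problem encountered: a slave node reports "JAVA_HOME is not set", even though the variable is set when checked with:
echo $JAVA_HOME
The fix is to add the export JAVA_HOME statement to spark-env.sh.
Logs are in the .out files under logs.
After Hadoop has been started:
Check the cluster status through the web UI: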
Interactive Analysis with the Spark Shell
Reference: http://spark.apache.org/docs/latest/quick-start.html (Programming Guides → Quick Start)
First upload README.md to the /user/hadoop/ directory in HDFS:
hadoop fs -put README.md /user/hadoop
spark-shell help
Run ./spark-shell --help to see the spark-shell command help:
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of local jars to include on the driver
and executor classpaths.
--packages Comma-separated list of maven coordinates of jars to include
on the driver and executor classpaths. Will search the local
maven repo, then maven central and any additional remote
repositories given by --repositories. The format for the
coordinates should be groupId:artifactId:version.
--exclude-packages Comma-separated list of groupId:artifactId, to exclude while
resolving the dependencies provided in --packages to avoid
dependency conflicts.
--repositories Comma-separated list of additional remote repositories to
search for the maven coordinates given with --packages.
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps.
--files FILES Comma-separated list of files to be placed in the working
directory of each executor.
--conf PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-java-options Extra Java options to pass to the driver.
--driver-library-path Extra library path entries to pass to the driver.
--driver-class-path Extra class path entries to pass to the driver. Note that
jars added with --jars are automatically included in the
classpath.
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
--proxy-user NAME User to impersonate when submitting the application.
--help, -h Show this help message and exit
--verbose, -v Print additional debug output
--version, Print the version of current Spark
Spark standalone with cluster deploy mode only:
--driver-cores NUM Cores for driver (Default: 1).
Spark standalone or Mesos with cluster deploy mode only:
--supervise If given, restarts the driver on failure.
--kill SUBMISSION_ID If given, kills the driver specified.
--status SUBMISSION_ID If given, requests the status of the driver specified.
Spark standalone and Mesos only:
--total-executor-cores NUM Total cores for all executors.
Spark standalone and YARN only:
--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
or all available cores on the worker in standalone mode)
YARN-only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").
--num-executors NUM Number of executors to launch (Default: 2).
--archives ARCHIVES Comma separated list of archives to be extracted into the
working directory of each executor.
--principal PRINCIPAL Principal to be used to login to KDC, while running on
secure HDFS.
--keytab KEYTAB The full path to the file that contains the keytab for the
principal specified above. This keytab will be copied to
the node running the Application Master via the Secure
Distributed Cache, for renewing the login tickets and the
delegation tokens periodically.
Python Spark shell
Start the Python Spark shell:
./bin/pyspark
Run:
>>> textFile = sc.textFile("README.md")  # by default read from the /user/hadoop directory
>>> textFile.count()
15/09/28 16:18:42 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/28 16:18:49 INFO spark.SparkContext: Starting job: count at <stdin>:1
15/09/28 16:18:49 INFO scheduler.DAGScheduler: Got job 0 (count at <stdin>:1) with 2 output partitions
15/09/28 16:18:49 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(count at <stdin>:1)
15/09/28 16:18:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/09/28 16:18:49 INFO scheduler.DAGScheduler: Missing parents: List()
15/09/28 16:18:49 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at count at <stdin>:1), which has no missing parents
15/09/28 16:18:49 INFO storage.MemoryStore: ensureFreeSpace(6064) called with curMem=145215, maxMem=560497950
15/09/28 16:18:49 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.9 KB, free 534.4 MB)
15/09/28 16:18:49 INFO storage.MemoryStore: ensureFreeSpace(3559) called with curMem=151279, maxMem=560497950
15/09/28 16:18:49 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.5 KB, free 534.4 MB)
15/09/28 16:18:49 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.40.128:60254 (size: 3.5 KB, free: 534.5 MB)
15/09/28 16:18:49 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
15/09/28 16:18:49 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
15/09/28 16:18:49 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 16:18:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.40.129, ANY, 2157 bytes)
15/09/28 16:18:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.40.129:42679 (size: 3.5 KB, free: 534.5 MB)
15/09/28 16:18:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.40.129:42679 (size: 14.1 KB, free: 534.5 MB)
15/09/28 16:19:00 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.40.129, ANY, 2157 bytes)
15/09/28 16:19:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 10722 ms on 192.168.40.129 (1/2)
15/09/28 16:19:00 INFO scheduler.DAGScheduler: ResultStage 0 (count at <stdin>:1) finished in 10.779 s
15/09/28 16:19:00 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 109 ms on 192.168.40.129 (2/2)
15/09/28 16:19:00 INFO scheduler.DAGScheduler: Job 0 finished: count at <stdin>:1, took 10.941706 s
15/09/28 16:19:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
98
>>> textFile.first()
Programming Guides → Spark Programming Guide
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
to add code.py to the search path (in order to later be able to import code), use:
$ ./bin/pyspark --master local[4] --py-files code.py
At the end of each programming guide there is a note on what to do next (Where to go from here). Next, run a Python example:
For Python examples, use spark-submit instead:
./bin/spark-submit examples/src/main/python/pi.py
Scala Spark shell
Enter the interactive Spark shell:
./bin/spark-shell --master local[2]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
16/01/29 18:39:13 WARN Utils: Your hostname, zhangwj-OptiPlex-3020 resolves to a loopback address: 127.0.1.1; using 10.12.167.46 instead (on interface eth0)
16/01/29 18:39:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
16/01/29 18:39:15 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/01/29 18:39:15 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/01/29 18:39:18 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/01/29 18:39:18 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/01/29 18:39:19 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/01/29 18:39:20 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/01/29 18:39:22 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/01/29 18:39:22 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/03/03 17:08:09 INFO SessionState: Created local directory: /tmp/d1025668-13e4-4780-8b79-b97a1370525e_resources
16/03/03 17:08:09 INFO SessionState: Created HDFS directory: /tmp/hive/root/d1025668-13e4-4780-8b79-b97a1370525e
16/03/03 17:08:09 INFO SessionState: Created local directory: /tmp/root/d1025668-13e4-4780-8b79-b97a1370525e
16/03/03 17:08:09 INFO SessionState: Created HDFS directory: /tmp/hive/root/d1025668-13e4-4780-8b79-b97a1370525e/_tmp_space.db
16/03/03 17:08:09 INFO HiveContext: default warehouse location is /user/hive/warehouse
16/03/03 17:08:09 INFO HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
SQL context available as sqlContext.
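The startup output shows some defaults: sc and sqlContext are available directly in spark-shell, and some path settings are also printed.
Spark-shell
Official documentation: https://spark.apache.org/docs/latest/quick-start.html
All Spark operations work on RDDs. Some RDD operations: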
RDD CREATE
scala> val textFile = sc.textFile("~/Applications/spark-1.6.0-bin-hadoop2.6/README.md")
16/03/03 17:37:30 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 208.5 KB, free 1205.7 KB)
16/03/03 17:37:30 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 19.3 KB, free 1225.0 KB)
16/03/03 17:37:30 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:60573 (size: 19.3 KB, free: 511.0 MB)
16/03/03 17:37:30 INFO SparkContext: Created broadcast 6 from textFile at <console>:27
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:27
RDD ACTION
scala> textFile.count // Number of items in this RDD
scala> textFile.first // First item in this RDD
RDD TRANSFORM
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
Transform+Action:
scala> textFile.filter(_.contains("Spark")).count() // How many lines contain "Spark"? //95
scala> textFile.map(_.split(" ").size).reduce( (a,b)=> if(a>b)a else b ) //14
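The statement above is equivalent to the following. (Run against the same file, the two forms return the same value; the 14 and 15 recorded here must come from different copies of README.md.)
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15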
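Word count program
scala> val wordCounts = textFile.flatMap(_.split(" ")).map( word=>(word,1) ).reduceByKey( (a,b)=>a+b )
flatMap splits each line into words and flattens the results into a single RDD of words, map turns each word into the tuple (word,1), and reduceByKey repeatedly applies the function to the values that share a key.
scala> wordCounts.collect // return the results to the driver as an Array
scala> linesWithSpark.cache() //pulling data sets into a cluster-wide in-memory cache
scala> :paste
// Entering paste mode (ctrl-D to finish) // paste .scala code here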
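To see what each stage of the word-count pipeline above produces, the same steps can be mimicked on a local list. This is a plain-Python sketch and does not use Spark; the three input lines and the reduce_by_key helper are hypothetical, made up for illustration.

```python
from functools import reduce

# Hypothetical input standing in for the lines of README.md.
lines = ["Apache Spark", "Spark is fast", "hello Spark"]

# flatMap: split every line and flatten the pieces into one list of words.
words = [word for line in lines for word in line.split(" ")]

# map: turn each word into a (word, 1) pair.
pairs = [(word, 1) for word in words]

def reduce_by_key(pairs, func):
    """Merge the values of equal keys with func, like reduceByKey does per key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# reduceByKey: add up the 1s of each word.
word_counts = reduce_by_key(pairs, lambda a, b: a + b)

# The "longest line" query: map each line to its word count, then reduce with max.
max_words = reduce(lambda a, b: a if a > b else b,
                   [len(line.split(" ")) for line in lines])

print(word_counts)  # {'Apache': 1, 'Spark': 3, 'is': 1, 'fast': 1, 'hello': 1}
print(max_words)    # 3
```

In Spark the pairs live in different partitions, so reduceByKey first merges values inside each partition and then across partitions; the local dictionary here only shows the final result.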
Running a JAR
Spark-submit
1. Submit and run the examples jar that ships with Spark
The jar is in the lib/mllib directory:
spark-examples-1.6.0-hadoop2.6.0.jar
Find the path of a class inside the jar:
lib$ jar -ft spark-examples-1.6.0-hadoop2.6.0.jar | grep DecisionTreeClassificationExample
org/apache/spark/examples/ml/DecisionTreeClassificationExample.class // use this class path
org/apache/spark/examples/ml/DecisionTreeClassificationExample$.class
org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.class
org/apache/spark/examples/mllib/DecisionTreeClassificationExample.class
org/apache/spark/examples/mllib/DecisionTreeClassificationExample$$anonfun$1.class
org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample$2.class
org/apache/spark/examples/mllib/DecisionTreeClassificationExample$.class
org/apache/spark/examples/mllib/DecisionTreeClassificationExample$$anonfun$2.class
org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample$1.class
org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.class
For jar usage, see: jar -h
$ ../bin/spark-submit --master local[2] --class org.apache.spark.examples.ml.DecisionTreeClassificationExample spark-examples-1.6.0-hadoop2.6.0.jar
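This prints the Spark execution output. It may fail partway with an error that a data file cannot be found; copy the files under data/mllib to the path given in the error.
2. Create a Spark code project and submit it for execution:
The examples module in the Spark source code contains a wide variety of sample programs; the code here is based on them.
Create a Maven project, TestSpark, and add Scala support (Add Framework Support --> Scala); you can then create new Scala classes. Following the decision tree example in the source (DecisionTreeClassificationExample.scala), write a simple decision tree classification program:
(1) Add the Spark dependencies: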
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
(2) Create an object, testHello, and copy the code from DecisionTreeClassificationExample.scala into its main function; the required packages are imported automatically:
package test001_hello
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkConf, SparkContext}
object testHello {
def main(args:Array[String]): Unit ={
val conf = new SparkConf().setAppName("DecisionTreeClassificationExample")
val sc = new SparkContext(conf)
// $example on$
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println("Test Error = " + testErr) // print the error rate
println("Learned classification tree model:\n" + model.toDebugString) // print the model
// Save the model
model.save(sc, "./target/tmp/myDecisionTreeClassificationModel")
// Load the saved model back from disk
val sameModel = DecisionTreeModel.load(sc, "./target/tmp/myDecisionTreeClassificationModel")
}
}
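Also copy the required data file into the resources directory. The data file format is:
0 128:51 129:159 130:253 131:159
1 159:124 160:253 161:255 162:63
The first column is the label; each remaining column has the form index:value (a feature index and the value of that feature).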
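The line format above is easy to take apart by hand. The following plain-Python sketch only illustrates the layout; parse_libsvm_line is a hypothetical helper, not part of MLlib, and it keeps the indices exactly as written in the file (MLUtils.loadLibSVMFile, by contrast, returns labeled points with sparse feature vectors).

```python
def parse_libsvm_line(line):
    """Split one 'label index:value index:value ...' line into (label, {index: value})."""
    parts = line.split()
    label = float(parts[0])              # first column: the label
    features = {}
    for item in parts[1:]:               # remaining columns: index:value pairs
        index, value = item.split(":")
        features[int(index)] = float(value)
    return label, features


label, features = parse_libsvm_line("0 128:51 129:159 130:253 131:159")
print(label)     # 0.0
print(features)  # {128: 51.0, 129: 159.0, 130: 253.0, 131: 159.0}
```

Only the non-zero features are listed on each line, which is why the format suits sparse data such as the image pixels in sample_libsvm_data.txt.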
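(3) Run mvn package to package the code. The resulting jar turns out to contain no class files.
Note: by default, mvn clean package (which includes compile) only compiles and packages Java sources and ignores Scala.
To build a single class, you can compile xxx.scala directly to produce the class files and then package. To compile all Scala files, add the following Scala plugin to the Maven pom file; the jar name is set to test: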
<build>
<finalName>test</finalName>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
(4) Submit the jar to Spark. Create a test directory under the Spark installation directory and copy the jar into it: spark-1.6.0-bin-hadoop2.6/test$ \cp /home/zhangwj/MyProjects/TestSpark/target/test.jar .
Run: ../bin/spark-submit --master local[2] --class test001_hello.testHello test.jar
This fails with: Input path does not exist: file:~/Applications/spark-1.6.0-bin-hadoop2.6/test/sample_libsvm_data.txt. The data file must be copied into the test directory.
(5) Run ../bin/spark-submit --master local[2] --class test001_hello.testHello test.jar>>result.txt again. The program now finishes normally, with the output redirected to result.txt:
Test Error = 0.03125 // an error rate of 3.125%
Learned classification tree model: // the trained model
DecisionTreeModel classifier of depth 2 with 5 nodes
If (feature 434 <= 0.0)
If (feature 99 <= 0.0)
Predict: 0.0
Else (feature 99 > 0.0)
Predict: 1.0
Else (feature 434 > 0.0)
Predict: 1.0
A target folder is created under the test directory; it holds the trained model. Note: if the output directory target already exists, the program fails with an error.
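The "Test Error" printed by the program is just the fraction of test points whose prediction differs from the label. A plain-Python sketch of that arithmetic follows; error_rate is a made-up helper, and the 32 (label, prediction) pairs are hypothetical, chosen only so that the ratio matches the 0.03125 above (the notes do not record the real test-set size).

```python
def error_rate(label_and_preds):
    """Fraction of (label, prediction) pairs that disagree."""
    mismatches = sum(1 for label, pred in label_and_preds if label != pred)
    return mismatches / len(label_and_preds)


# Hypothetical results: 32 test points, exactly one of them misclassified.
pairs = [(1.0, 1.0)] * 20 + [(0.0, 0.0)] * 11 + [(0.0, 1.0)]
err = error_rate(pairs)
print(err)  # 0.03125, i.e. 1 / 32
```

Since randomSplit is called without a seed in the program, the split and therefore the error rate can change from run to run.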
spark-submit usage:
>bin/spark-submit -h:
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. // local[2] means local mode with two threads.
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of local jars to include on the driver
and executor classpaths.
--packages Comma-separated list of maven coordinates of jars to include
on the driver and executor classpaths. Will search the local
maven repo, then maven central and any additional remote
repositories given by --repositories. The format for the
coordinates should be groupId:artifactId:version.
--exclude-packages Comma-separated list of groupId:artifactId, to exclude while
resolving the dependencies provided in --packages to avoid
dependency conflicts.
--repositories Comma-separated list of additional remote repositories to
search for the maven coordinates given with --packages.
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps.
--files FILES Comma-separated list of files to be placed in the working
directory of each executor.
--conf PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-java-options Extra Java options to pass to the driver.
--driver-library-path Extra library path entries to pass to the driver.
--driver-class-path Extra class path entries to pass to the driver. Note that
jars added with --jars are automatically included in the
classpath.
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
--proxy-user NAME User to impersonate when submitting the application.
--help, -h Show this help message and exit
--verbose, -v Print additional debug output
--version, Print the version of current Spark
Spark standalone with cluster deploy mode only:
--driver-cores NUM Cores for driver (Default: 1).
Spark standalone or Mesos with cluster deploy mode only:
--supervise If given, restarts the driver on failure.
--kill SUBMISSION_ID If given, kills the driver specified.
--status SUBMISSION_ID If given, requests the status of the driver specified.
Spark standalone and Mesos only:
--total-executor-cores NUM Total cores for all executors.
Spark standalone and YARN only:
--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
or all available cores on the worker in standalone mode)
YARN-only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").
--num-executors NUM Number of executors to launch (Default: 2).
--archives ARCHIVES Comma separated list of archives to be extracted into the
working directory of each executor.
--principal PRINCIPAL Principal to be used to login to KDC, while running on
secure HDFS.
--keytab KEYTAB The full path to the file that contains the keytab for the
principal specified above. This keytab will be copied to
the node running the Application Master via the Secure
Distributed Cache, for renewing the login tickets and the
delegation tokens periodically.
Changing the log output:
log4j.properties lives in the conf directory: cp log4j.properties.template log4j.properties
Change: log4j.rootCategory=INFO, console, FILE
Append at the end:
log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.File=../logs/log4jtest.log
log4j.appender.FILE.Threshold=INFO
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%5p] - %c -%F(%L) -%m%n
log4j.appender.FILE.MaxFileSize=10MB
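Running and debugging a Spark application locally in IDEA
(1) Add the jar
Before running, the required jar must be configured in Libraries. The jar is spark-assembly-1.6.0-hadoop2.6.0.jar in the lib directory under spark_home. Create a lib directory in the project, copy the jar into it, then right-click --> Add as Library
(it can also be added under Project Structure --> Global Libraries).
(2) Change the connection settings in the code: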
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkConf, SparkContext}
object testDecisionTree {
def main(args:Array[String]): Unit ={
val conf = new SparkConf().setAppName("DecisionTreeClassificationExample")
conf.setMaster("local")
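.setSparkHome(System.getenv("SPARK_HOME")) // Spark installation directory on the master
// setMaster sets the Spark master; setSparkHome sets spark_home. On submit, the jar is distributed to the worker nodes according to the configuration files under spark_home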
val sc = new SparkContext(conf)
// $example on$
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "src/main/resources/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification tree model:\n" + model.toDebugString)
// Save and load model
model.save(sc, "./target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "./target/tmp/myDecisionTreeClassificationModel")
// $example off$
}
}
(3) Run.
Running produces the following error:
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
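A web search shows that this is caused by a version incompatibility between Spark and Scala: the code was using Scala 2.11.7 as installed on the system, whereas the pom file in the Spark source shows that Spark uses Scala 2.10.5. So the Scala version must be switched first:
Project Structure --> Global Libraries:
Select the 2.10.5 version provided by Maven, click Apply, and delete the 2.11.7 version.
Run again; the program now executes normally.
To debug the Spark source directly in IDEA, open a Spark .class file and click Attach Sources to attach the downloaded Spark source (tested and working).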
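Setting the master in SparkConf
1) Local mode:
conf.setMaster("local[2]") // local mode, 2 threads
2) Spark standalone mode:
conf.setMaster("spark://zhangwj-OptiPlex3020:7077") // the Spark server must be running; the master host must match the hostname Spark uses
3) YARN mode:
conf.setMaster("yarn-client") // requires Hadoop environment variables such as HADOOP_CONF_DIR
conf.setMaster("yarn-cluster")
Submitting a Spark job to YARN:
The HADOOP_CONF_DIR environment variable must be set; the YARN configuration is read from that path.
Two modes: yarn-cluster and yarn-client
In yarn-cluster mode, the Spark driver runs inside the AM (Application Master) managed by YARN; the client that submits the job only initiates it and can disconnect afterwards.
In yarn-client mode, the Spark driver runs on the client, and the AM is used only to request resources from YARN.
Usage: ./bin/spark-submit --class path.to.your.Class --master yarn-cluster [options] <app jar> [app options]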
Example:
$ ./bin/spark-submit --class GraphX.test001_Hello.testHello \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
../test/spark-examples*.jar
Spark+HDFS
Uses a pseudo-distributed Hadoop setup; see: Hadoop安装_单机_伪分布式配.html
Upload sample_libsvm_data.txt to the HDFS directory /user/hadoop/mllib; only one line of the code needs to change:
val data = MLUtils.loadLibSVMFile(sc, "hdfs://localhost:9000/user/hadoop/mllib/sample_libsvm_data.txt") //read hdfs files
Then run.
Official documentation
Spark programming guide (spark shell)
https://spark.apache.org/docs/latest/quick-start.html
Spark document (root)
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
RDD
For the RDD API, see: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
RDD transformations: https://spark.apache.org/docs/latest/programming-guide.html#transformations
RDD actions: https://spark.apache.org/docs/latest/programming-guide.html#actions
Creating RDDs
1. From external storage (sc.textFile); the resulting data is of type RDD[T]:
val data = sc.textFile("src/main/resources/test.txt")
val data = MLUtils.loadLibSVMFile(sc, "hdfs://localhost:9000/user/hadoop/mllib/sample_libsvm_data.txt")
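sc.textFile can read:
A file: sc.textFile("test.txt")
A directory: sc.textFile("testDir/")
All files under a directory, via a wildcard: sc.textFile("testDir/*")
2. From a collection (sc.parallelize, sc.makeRDD):
To create an RDD from a collection, Spark provides two functions, parallelize and makeRDD. makeRDD has two overloads: the first takes exactly the same arguments as parallelize and is implemented the same way; the second can attach location preferences to the data.
Code: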
scala> val rdd_list = sc.parallelize( List(1,2,3) )
rdd_list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val rdd_list2 = sc.makeRDD( List(1,2,3) ) // first overload: same as parallelize, uses the default number of partitions
rdd_list2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:27
scala> val seq = List((1, List("iteblog.com", "sparkhost1.com", "sparkhost2.com")),(2,List("iteblog.com", "sparkhost2.com")))
seq: List[(Int, List[String])] = List((1,List(iteblog.com, sparkhost1.com, sparkhost2.com)), (2,List(iteblog.com, sparkhost2.com)))
scala> val rdd_2 = sc.makeRDD(seq) // second overload: one partition per element of seq, each with its preferred locations
rdd_2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:29
scala> rdd_2.preferredLocations(rdd_2.partitions(0))
res0: Seq[String] = List(iteblog.com, sparkhost1.com, sparkhost2.com)
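How a local collection ends up in partitions can be sketched in plain Python. The index arithmetic below is my assumption of how an even split by position works, modeled on the behaviour of parallelize rather than copied from the Spark source, and both function names are made up for illustration.

```python
def slice_collection(seq, num_slices):
    """Cut seq into num_slices contiguous chunks of nearly equal size."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(seq)
    return [seq[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


def make_rdd_with_locations(seq):
    """Model of the second makeRDD overload: one partition per (value, hosts) element."""
    partitions = [[value] for value, _ in seq]
    preferred = [list(hosts) for _, hosts in seq]
    return partitions, preferred


parts = slice_collection(list(range(1, 11)), 3)
print(parts)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

seq = [(1, ["iteblog.com", "sparkhost1.com", "sparkhost2.com"]),
       (2, ["iteblog.com", "sparkhost2.com"])]
partitions, preferred = make_rdd_with_locations(seq)
print(partitions)    # [[1], [2]]
print(preferred[0])  # ['iteblog.com', 'sparkhost1.com', 'sparkhost2.com']
```

The second function matches what the session above shows: partition 0 of rdd_2 holds the first element and reports the host list given for it.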
RDD提供的方法
除了查看RDD文档,还可以查看源码来看方法的具体调用及实现:
data.map: 将RDD中类型为T的元素一对一地映射为类型为U的元素
data.flatMap: 将集合中的每一个元素进行一对多转换
data.collect/toArray: Return an array that contains all of the elements in this RDD.以集合形式返回s。
data.take(num): 返回集合中0到num-1下标的元素
data.top(num):按默认或指定的排序规则,返回前num个元素
data.takeOrdered(num):以与top相反的规则,返回前num个元素
data.++(x): Return the union of this RDD and another one.
data.Intersection(x): Return the intersection of this RDD and another one.
data,substract(x): Return an RDD with the elements from this that are not in other.
data.zip(x): Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc.
data.checkPoint: 将RDD持久化到HDFS中,会切断此RDD之前的依赖关系
data.persist: Persist this RDD with the default storage level (MEMORY_ONLY). 源码可知调用的是persist(StorageLevel),它可以设置
Storage level
data.cache: 缓存的内存/磁盘Persist this RDD with the default storage level (MEMORY_ONLY).
通过观察RDD.scala源代码即可知道cache实际调用的就是persist方法。
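data.first: returns the first element
data.count: returns the number of elements
data.reduce: Reduces the elements of this RDD using the specified commutative and associative binary operator, for example data.reduce(_+_). (Compare Scala's reduceLeft.)
data.filter(line => line.contains("Spark")): keeps only the elements for which the predicate returns true
data.partitions.size: shows the number of partitions
data.preferredLocations(p): returns the IPs or host names of the machines that hold the data blocks of partition p
data.dependencies: shows the dependencies: OneToOneDependency (narrow) or ShuffleDependency (wide)
rdd.reduceByKey( (a,b)=>a+b ): for (k,v) pairs, sums the values that share the same key (compare Scala's reduceLeft)
rdd.groupByKey(): for (k,v) pairs, collects the values that share the same key into a CompactBuffer (a Seq subclass)
rdd.groupByKey(new org.apache.spark.HashPartitioner(3)): groups with an explicit partitioner (this can effectively reduce shuffling in a later join)
rdd.sortByKey(true): sorts (k,v) pairs by key; true means ascending
rdd.partitionBy(new org.apache.spark.HashPartitioner(3)): repartitions the pair RDD with the given partitioner, producing a ShuffledRDD
mapPartitions: applies a map function to each partition as a whole
zipPartitions:
repartition: redistributes the RDD into a new number of partitions, always with a shuffle
coalesce: changes the number of partitions; by default it avoids a shuffle, so it is mainly used to reduce the partition count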
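The notes above compare reduceByKey with Scala's reduceLeft. As a rough illustration of what the by-key operations compute, here is a sketch on ordinary local Scala collections. It does not use Spark at all: ByKeySketch and its two functions are hypothetical stand-ins, and a real RDD would do the same work in parallel across partitions.

```scala
object ByKeySketch {
  // Local stand-in for rdd.groupByKey(): collect all values that share a key.
  def groupByKey[K, V](pairs: Seq[(K, V)]): Map[K, Seq[V]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Local stand-in for rdd.reduceByKey(f): combine the values of each key with f.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    groupByKey(pairs).map { case (k, vs) => k -> vs.reduceLeft(f) }

  def main(args: Array[String]): Unit = {
    val pairs = Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5))
    println(groupByKey(pairs))         // values gathered per key
    println(reduceByKey(pairs)(_ + _)) // values summed per key
  }
}
```

In Spark itself reduceByKey is usually preferred over groupByKey followed by a reduce, because it can combine values within each partition before the shuffle.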
RDD and DataFrame
RDD provides the low-level API; DataFrame provides a higher-level API, which is what ML pipelines build on.
val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()
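parallelize turns the collection of Record objects into an RDD. An RDD is a typed distributed collection; for example, RDD[Double] holds a distributed collection of Double values.
The toDF function converts the RDD into a DataFrame.
DataFrame offers many operations, such as SQL queries; these are generally higher-level calls built on top of RDDs.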
PairRDDFunctions
Functions for RDDs of (k,v) pairs are in PairRDDFunctions: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
reduceByKey:
join:
randomSplit (defined on RDD itself rather than in PairRDDFunctions):
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
SparkContext
API documentation: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
sc.broadcast(index) // sends a read-only copy of the data to the other nodes
sc.textFile("src/main/resources/people.txt").map(_.split(",")).map(p=>(p(0),p(1)))
// p is the Array obtained by splitting one line; p(index) accesses its elements.
MLlib
Spark MLlib official documentation: https://spark.apache.org/docs/latest/mllib-guide.html
MLlib is divided into two packages:
· spark.mllib contains the original API built on top of RDDs.
· spark.ml provides a higher-level API built on top of DataFrames for constructing ML pipelines (recommended).
Data Types - MLlib
Local vector(integer or double)
Two kinds of local vector are supported: dense and sparse.
import org.apache.spark.mllib.linalg.{Vector, Vectors} // import MLlib's Vector explicitly, otherwise Scala's default Vector is used
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
// (3, [0, 2], [1.0, 3.0]): 3 is the vector size, [0, 2] are the indices of the nonzero entries, [1.0, 3.0] are their values
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
Labeled point (double: classification,regression)
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
Sparse data
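Sparse data is stored as text in LIBSVM format:
label index1:value1 index2:value2 ...
Each line represents one labeled sparse feature vector. Indices in the file are one-based and in ascending order; after loading, they are converted to zero-based.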
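To make the text format concrete, the following sketch parses one such line in plain Scala, without Spark. LibSvmSketch, SparsePoint and parseLine are hypothetical names for illustration only; the real loader is MLUtils.loadLibSVMFile. The sketch assumes the LIBSVM convention that indices in the file are one-based, and shifts them to zero-based.

```scala
object LibSvmSketch {
  // One parsed record: label, vector size, zero-based indices and their values.
  final case class SparsePoint(label: Double, size: Int, indices: Array[Int], values: Array[Double]) {
    // Expand to the dense form, filling absent entries with 0.0.
    def toDense: Array[Double] = {
      val dense = Array.fill(size)(0.0)
      indices.zip(values).foreach { case (i, v) => dense(i) = v }
      dense
    }
  }

  // Parse "label index1:value1 index2:value2 ...".
  // Assumption: indices in the text are one-based, so they are shifted to zero-based.
  def parseLine(line: String, numFeatures: Int): SparsePoint = {
    val tokens = line.trim.split("\\s+")
    val pairs = tokens.tail.map { item =>
      val parts = item.split(":")
      (parts(0).toInt - 1, parts(1).toDouble)
    }
    SparsePoint(tokens.head.toDouble, numFeatures, pairs.map(_._1), pairs.map(_._2))
  }

  def main(args: Array[String]): Unit = {
    // The same vector (1.0, 0.0, 3.0) used in the examples above, with a positive label.
    val point = parseLine("1 1:1.0 3:3.0", numFeatures = 3)
    println(point.indices.mkString("[", ", ", "]"))
    println(point.toDense.mkString("(", ", ", ")"))
  }
}
```

The parsed indices and values correspond to the arguments of Vectors.sparse(size, indices, values) shown earlier.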
MLUtils
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.util.MLUtils$
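MLUtils provides many file-related helpers (load, save and k-fold):
loadLibSVMFile (6 overloads)
loadVectors (2 overloads)
loadLabeledPoints (2 overloads)
loadLabeledData (1 overload; loads (label, features) pairs)
kFold
saveLabeledData
Training a model with an MLlib method: model = Method.train(trainingData, boostingStrategy)
Saving the model: model.save(sc, path)
Loading it again later: MethodModel.load(sc, path)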
ML
Programming guide: https://spark.apache.org/docs/latest/ml-guide.html
The spark.ml package aims to provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines. Spark ML standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow.
Main concepts in Pipelines
DataFrame: Spark ML uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions (a split operation is another example). A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns.
Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm (what is usually just called "the algorithm") is an Estimator which trains on a DataFrame and produces a model.
Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer (in practice, you call fit() on an estimator to train a model).
Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow. In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:
· Split each document’s text into words.
· Convert each document’s words into a numerical feature vector.
· Learn a prediction model using the feature vectors and labels.
Spark ML represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order.
Parameter: Spark ML Estimators and Transformers use a uniform API for specifying parameters.
A Param is a named parameter with self-contained documentation. A ParamMap is a set of (parameter, value) pairs.
There are two main ways to pass parameters to an algorithm:
1. Set parameters for an instance (using setter methods). E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations. This API resembles the API used in spark.mllib package.
2. Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.
A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator.
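The relationship between Transformer, Estimator and Pipeline can be sketched with a toy model in plain Scala. This is not Spark's API: ToyTransformer, ToyEstimator, Row, Tokenize, KeywordLearner, fitPipeline and applyModel are all hypothetical names, and the "learning" step is deliberately trivial. The sketch only shows the control flow: fitting a pipeline walks the stages in order, keeps transformers as they are, and replaces each estimator with the model that fit() returns.

```scala
object ToyPipelineSketch {
  // One row of a toy "DataFrame"; words and prediction are columns added by later stages.
  final case class Row(text: String, label: Double,
                       words: List[String] = Nil, prediction: Option[Double] = None)
  type Frame = List[Row]

  sealed trait Stage
  trait ToyTransformer extends Stage { def transform(df: Frame): Frame }
  trait ToyEstimator extends Stage { def fit(df: Frame): ToyTransformer }

  // Transformer: appends a "words" column by splitting the text.
  object Tokenize extends ToyTransformer {
    def transform(df: Frame): Frame =
      df.map(r => r.copy(words = r.text.toLowerCase.split("\\s+").toList))
  }

  // Estimator: "learns" the words that occur only in rows labeled 1.0
  // and returns a model (a Transformer) that appends a "prediction" column.
  object KeywordLearner extends ToyEstimator {
    def fit(df: Frame): ToyTransformer = {
      val (pos, neg) = df.partition(_.label == 1.0)
      val keywords: Set[String] = pos.flatMap(_.words).toSet -- neg.flatMap(_.words).toSet
      new ToyTransformer {
        def transform(in: Frame): Frame =
          in.map(r => r.copy(prediction = Some(if (r.words.exists(keywords)) 1.0 else 0.0)))
      }
    }
  }

  // Fitting a pipeline: run the stages in order; estimators are fit on the data
  // produced by the stages before them and replaced by the resulting model.
  def fitPipeline(stages: List[Stage], training: Frame): List[ToyTransformer] = {
    var current = training
    stages.map {
      case t: ToyTransformer =>
        current = t.transform(current)
        t
      case e: ToyEstimator =>
        val model = e.fit(current)
        current = model.transform(current)
        model
    }
  }

  // A fitted pipeline contains only transformers and is applied stage by stage.
  def applyModel(model: List[ToyTransformer], df: Frame): Frame =
    model.foldLeft(df)((acc, t) => t.transform(acc))

  def main(args: Array[String]): Unit = {
    val training = List(Row("spark is fast", 1.0), Row("hadoop map reduce", 0.0))
    val model = fitPipeline(List(Tokenize, KeywordLearner), training)
    applyModel(model, List(Row("I like Spark", 0.0), Row("hello world", 0.0)))
      .foreach(r => println(s"${r.text} -> ${r.prediction}"))
  }
}
```

In Spark ML the corresponding real classes are Pipeline and PipelineModel, with stages such as Tokenizer, HashingTF and LogisticRegression.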
Spark SQL connects naturally with ML: the result of a SQL query is a DataFrame (backed by an RDD), which can be used directly as ML input.
Spark SQL
DataFrames
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
In short, a DataFrame is a distributed collection of data, equivalent to a table in a relational database, with a rich set of operations.
Datasets
A Dataset is a new experimental interface added in Spark 1.6 that tries to provide the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
The unified Dataset API can be used both in Scala and Java. Python does not yet have support for the Dataset API, but due to its dynamic nature many of the benefits are already available (i.e. you can access the field of a row by name naturally row.columnName). Full python support will be added in a future release.
SQLContext
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SQLContext.html
Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application. The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.
(1)Inferring the Schema Using Reflection
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
Use a case class to infer the schema:
case class Person(name: String, age: Int) // the case class defines the schema
import sqlContext.implicits._
val people = sc.textFile("src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF
implicits is an object (defined as: object implicits extends SQLImplicits), and SQLImplicits is an abstract class. SQLImplicits provides the method implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder, which converts an RDD into a DataFrameHolder. DataFrameHolder in turn provides the toDF method, which returns a DataFrame.
(2) Programmatically Specifying the Schema
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.
1. Create an RDD of Rows from the original RDD;
2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
3. Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.
In other words: build a schema, then apply it to the RDD.
Data Sources
Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on as normal RDDs and can also be registered as a temporary table. Registering a DataFrame as a table allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark Data Sources and then goes into specific options that are available for the built-in data sources.
Generic Load/Save Functions
In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.
The default format for loading and saving data is parquet.
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
Manually Specifying Options
Specify the file format manually:
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
Run SQL on files directly
Run SQL directly on a file:
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save Modes
A save mode (SaveMode) can be set:
df_parquet.write.format("parquet").mode("overwrite").save("target/tmp/sql/people.parquet")
| Scala/Java | Any Language | Meaning |
| --- | --- | --- |
| SaveMode.ErrorIfExists (default) | "error" (default) | When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. |
| SaveMode.Append | "append" | When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. |
| SaveMode.Overwrite | "overwrite" | Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. |
| SaveMode.Ignore | "ignore" | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL. |
Saving to Persistent Tables
With a HiveContext, DataFrames provide the saveAsTable method, which writes a table to the metastore.
Parquet Files
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
Parquet is a columnar storage format. Spark SQL can read and write Parquet files and automatically preserves the schema of the data.
import sqlContext.implicits._
val people =sc.textFile("src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF
people.write.mode("overwrite").parquet("src/main/resources/people.parquet")
val parquetFile = sqlContext.read.parquet("src/main/resources/people.parquet") // the result is still a DataFrame
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
// once registered as a temp table, it can be queried directly with SQL
val teenagers = sqlContext.sql("select name from parquetFile where age>=18 and age <=25") // the query result is again a DataFrame
teenagers.map( p=> "name:"+p(0)).collect().foreach(println)
HiveContext
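HiveContext uses the Hive metastore for metadata and can store the data of a DataFrame in Hive with saveAsTable. A plain SQLContext cannot do this.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// the Hive dependency must be on the classpath, otherwise a class-not-found error is raised
// this can be tried in spark-shell, where the default sqlContext is already a HiveContext
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
val data = sqlContext.sql("SELECT key, value FROM src") // LOAD DATA itself returns no rows, so query the table to get the data
data.write.saveAsTable("tmp.data")
// if the table already exists in Hive this fails; set a save mode to change that, see the DataFrameWriter API:
// http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
data.write.mode("overwrite").saveAsTable("tmp.data")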
Debugging
Option 1:
Debug inside spark-shell: sudo ../bin/spark-shell --master local
In local mode, Spark's default Hive storage directory is:
scala> sqlContext.sql("use default");
scala> sqlContext.sql("create table people(name string, age int)")
scala> sqlContext.sql("insert into people select count(*) from people")
// running insert into people values("aaa",12) fails, because Hive does not support row-level updates
scala> sqlContext.sql("select * from people")
Option 2:
Package the code above and submit it with spark-submit:
./spark-submit --master local[4] --class SQLTest.test003_sql.testSql test.jar
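Note that the Spark server must be running when you submit.
Spark on hive
Configure Spark to connect to the Hive data source:
(1) Check that Hive can connect to MySQL correctly and that the hive/lib directory contains the mysql-connector-java*.jar driver.
(2) Edit $HIVE_HOME/conf/hive-site.xml and add the following: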
<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083</value>
<description>Thrift uri for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
(3) Copy or symlink $HIVE_HOME/lib/mysql-connector-java-5.1.12.jar into $SPARK_HOME/lib/
ln -s ../../apache-hive-1.2.1-bin/conf/hive-site.xml .
(4) Start the Hive metastore: hive --service metastore &
(5) Start spark-sql: ./bin/spark-sql
sqlContext.sql("select * from testdb_001.employees sort by age limit 5")
Spark reads Hive's metadata and data; the query itself is executed by Spark, not by Hive's own execution engine.
UDF (combined with Hive)
UDAF group
Spark streaming
Programming guide
https://spark.apache.org/docs/latest/streaming-programming-guide.html
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs. (In short, DStream is Spark Streaming's abstraction of an input stream.)
Required dependencies
Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
</dependency>
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.10 to the dependencies. For example, some of the common ones are as follows.
| Source | Artifact |
| --- | --- |
| Kafka | spark-streaming-kafka_2.10 |
| Flume | spark-streaming-flume_2.10 |
| Kinesis | spark-streaming-kinesis-asl_2.10 [Amazon Software License] |
| Twitter | spark-streaming-twitter_2.10 |
| ZeroMQ | spark-streaming-zeromq_2.10 |
| MQTT | spark-streaming-mqtt_2.10 |
Supported data sources:
Spark Streaming provides two categories of built-in streaming sources.
Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, socket connections, and Akka actors.
Advanced sources: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section.
Streaming Context
API Streaming Context: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext
A StreamingContext can be created from either a SparkConf or a SparkContext:
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("testStreaming").setMaster("local[2]") // do not use local or local[1]
/* When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run */
// create a StreamingContext from the SparkConf
import org.apache.spark.streaming._
val ssc = new StreamingContext(conf,Seconds(3))
or
// A StreamingContext object can also be created from an existing SparkContext object.
val ssc2 = new StreamingContext(sc,Seconds(3))
After a context is defined, you have to do the following.
1. Define the input sources by creating input DStreams.
2. Define the streaming computations by applying transformation and output operations to DStreams.
3. Start receiving data and processing it using streamingContext.start().
4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
5. The processing can be manually stopped using streamingContext.stop().
Points to remember:
1. Once a context has been started, no new streaming computations can be set up or added to it.
2. Once a context has been stopped, it cannot be restarted.
3. Only one StreamingContext can be active in a JVM at the same time.
4. stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
6. When running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.
Discretized Streams (DStreams)
Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset. Each RDD in a DStream contains data from a certain interval.
Transformations on DStreams
As with RDDs, transformations allow the data of a DStream to be modified. DStreams support many of the transformations available on normal Spark RDDs. Some of the common ones are as follows.
| Transformation | Meaning |
| --- | --- |
| map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
| flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
| filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
| repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
| union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
| count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
| reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
| countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
| reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
| join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
| cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
| transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
| updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
The updateStateByKey function can accumulate streaming results across batches according to func.
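How updateStateByKey accumulates results can be sketched without a running stream. The plain-Scala simulation below treats each batch as a local Seq of (key, value) pairs and carries the state forward as a Map. UpdateStateSketch, updateCount and step are hypothetical names; only the shape of the update function (new values plus optional previous state, returning the optional new state) follows the description above.

```scala
object UpdateStateSketch {
  // Same shape as the function passed to updateStateByKey:
  // (new values for a key in this batch, previous state) => new state.
  // Returning None would drop the key from the state.
  def updateCount(newValues: Seq[Int], previous: Option[Int]): Option[Int] =
    Some(previous.getOrElse(0) + newValues.sum)

  // Apply the update function to one batch, given the state left by earlier batches.
  // Keys that already have state are updated even if the batch has no values for them.
  def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      updateCount(grouped.getOrElse(k, Seq.empty), state.get(k)).map(s => k -> s)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Three simulated batches of (word, 1) pairs.
    val batches = Seq(
      Seq("a" -> 1, "b" -> 1, "a" -> 1),
      Seq("b" -> 1),
      Seq("a" -> 1, "c" -> 1)
    )
    // Print the running word counts after each batch.
    batches.scanLeft(Map.empty[String, Int])(step).tail.foreach(println)
  }
}
```

In a real Spark Streaming job the state is kept by Spark between batches, which is why checkpointing has to be enabled on the StreamingContext before updateStateByKey is used.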
GraphX
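Spark GraphX is a distributed graph processing framework. Built on the Spark platform, it provides a concise, easy-to-use and rich set of interfaces for graph computation and graph mining, which makes distributed graph processing much more convenient, for example for the many relationship chains between people in a social network.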
The Property Graph
A Graph in Spark GraphX is in fact a Property Graph, meaning that every vertex and every edge carries properties.
The property graph is a directed multigraph with user defined objects attached to each vertex and edge. A directed multigraph is a directed graph with potentially multiple parallel edges sharing the same source and destination vertex.
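In the example graph, vertex 3 is named rxin and is a student (stu.), and vertex 5 is franklin, a professor (prof.). The edge from 5 to 3 says that 5 is the Advisor of 3. In the figure, blue marks the properties of the vertices and orange-yellow marks the properties of the edges. Both vertices and edges have IDs: a vertex has its own ID, while an edge has a SourceID and a DestinationID, two IDs that state where the edge starts and where it ends and therefore give its direction. This is how a Property Graph is represented. Expressed as tables, the Vertex Table row with Id 3 has the property (rxin, student), while in the Edge Table the edge from 3 to 7 has the property Collaborator and the edge from 2 to 5 the property Colleague. More importantly, a Property Graph and its tables can be converted into each other. All operations in GraphX are based on table operators and graph operators, which are built on Spark's RDD and operate on collections.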
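The table view of a property graph can be sketched in plain Scala as a vertex table and an edge table joined on the vertex IDs. This is not the GraphX API: PropertyGraphSketch, Vertex, Edge and triplets are hypothetical names. Vertices 3 and 5 and the three edges come from the description above; the names and roles given to vertices 7 and 2 are assumptions added so that every edge has both endpoints.

```scala
object PropertyGraphSketch {
  // Vertex table row: ID plus the vertex property (name, role).
  final case class Vertex(id: Long, name: String, role: String)
  // Edge table row: source ID, destination ID, and the edge property.
  final case class Edge(srcId: Long, dstId: Long, relation: String)

  val vertices: List[Vertex] = List(
    Vertex(3L, "rxin", "student"),
    Vertex(5L, "franklin", "prof"),
    Vertex(7L, "jgonzal", "postdoc"), // assumed name and role for vertex 7
    Vertex(2L, "istoica", "prof")     // assumed name and role for vertex 2
  )

  val edges: List[Edge] = List(
    Edge(5L, 3L, "Advisor"),
    Edge(3L, 7L, "Collaborator"),
    Edge(2L, 5L, "Colleague")
  )

  // Join each edge with the vertex rows of both endpoints,
  // which is the idea behind a "triplet" view of the graph.
  def triplets: List[String] = {
    val byId = vertices.map(v => v.id -> v).toMap
    edges.map(e => s"${byId(e.srcId).name} is the ${e.relation} of ${byId(e.dstId).name}")
  }

  def main(args: Array[String]): Unit =
    triplets.foreach(println)
}
```

In GraphX itself the two tables would be RDDs (a VertexRDD and an EdgeRDD), and the join above corresponds to what graph.triplets exposes.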