spark入门知识(二)
上传文件打开文件方法打开文件并进行操作---------transform---------mapfilterflatMapsample(withreplacemant,fraciton,seed) 采样:从100个元素中取20%的数据,随机选20%括号中为替换的数据union(otherdataset)distinct:去重group...
上传文件
打开文件方法
打开文件并进行操作
---------transform---------
map
filter
flatMap
sample(withreplacemant,fraciton,seed) 采样:从100个元素中取20%的数据,随机选20%
括号中为替换的数据
union(otherdataset)
distinct:去重
groupByKey 把相同的key 分到同一个组(相对于不同的节点来说)
(hello,5)
(hello,6)
变成=》(hello,[5,6])
reduceByKey(func,numtasks) //提供一种运算方法
sortByKey() 按key排序
join //两个RDD进行相交
eg:(hello,5)RDD1
(hello,6)RDD2
join的结果就是(hello,[5,6])
cogroup
cartestian 求笛卡尔集
scala:
val disFile =sc.textFile("README,md") //distFile is a collection of lines ,collecte尽量不要出现
distFile.map(1=>1.split(" ")).collect()
//l是形式参数,表示RDD的每一个元素,对其进行分割
//对文本中的每一行 按空格进行分割
distFile.flatMap(l=>l.split(" ")).collect() //把数据分成一对多的关系
//eg:hello world
//map =>hello
world
//flatMap =>{hello,world}
java 不能传递函数对象
----------action-------------//对数据进行实际操作
reduce(func)//两个参数 返回一个结果 ,满足交换律的操作
collect()把每个节点的运行结果传回给diver
count()统计RDD里面元素的个数
first()返回数据集中的第一个元素
take(n)返回数据集中的前n个元素
takesD //挑元素返回
countByKey()按照关键词进行计数 //耗时
foreach(func)对买一个数据元素 施加f()函数操作 //改操作结束后要写入磁盘,很耗时
//n个节点中 “hello”的个数,每个节点统计完后,将结果 交给diver
scala
val f=sc.textFile("Readme.md")
val words=f.flatMap(l=>l.split(" ").map(word
word.reduceByKey(_+_).collect.foreach(println)
广播:
val broadcastVar =sc.broadcast(Array(1,2,3))
broadcastVar.value
accumulators 做加一的运算,每个节点都有accumulators,最后汇总到diver
//accumulators 的结果,每个节点(worker)不能读,只能写。可以读的diver
scala:
val accum = sc.accumulator(0)
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)//worker代码
accum.value //diver代码
val pair =(a,b)
pair._1 //第一个元素
pair._2 //第二个元素
求π :圆内部元素与外部元素的比值
val n=100000
val count=spark.parallelize()
机器学习:
GraphX:
图中的顶点数据
图的边的数据
代表顶点2到顶点4有一条边(有向边),2是边属性
val nodeRDD: RDD[(Long, Peep)] = sc.parallelize(nodeArray) //把顶点对象变成RDD
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
//把边对象变成RDD
val g: Graph[Peep, Int] = Graph(nodeRDD, edgeRDD)
//定义一个名叫g的图,它也是一个RDD
val results = g.triplets.filter(t => t.attr > 7) for (triplet <- results.collect) {
println(s"${triplet.srcAttr.name} loves ${triplet.dstAttr.name}")
//把大于7的过滤出来(即删除)
//triplet既有顶点属性也有边的属性
//eg ((5,34)(4,23)9)==>称为triplet
重要程度代表权、权重*分数,才是有效分
val graph=GraphLoader.edgeListFile(" ") //把边加进去
val prGraph =graph.joinVertices(graph.outDegrees) //把顶点加到图里面
// Implement and Run PageRank
val pageRank =
prGraph.pregel(initialMessage = 0.0, iter = 10)(
(oldV, msgSum) => 0.15 + 0.85 * msgSum, //传入的三个函数,计算
triplet => triplet.src.pr / triplet.src.deg, //发送消息
(msgA, msgB) => msgA + msgB) //把两个消息汇总
// Get the top 20 pages
pageRank.vertices.top(20)(Ordering.by(_._2)).foreach(println)
//pregel ?
spark 机器学习库
1. Read data.
2. Preprocess or prepare data for processing.
3. Extract features.
4. Split data for training, validation, and testing.
5. Train a model with a training dataset.
6. Tune a model using cross-validation techniques.
7. Evaluate a model over a test dataset.
8. Deploy a model.
1。读取数据。
2。进行预处理或准备数据进行处理。
3。提取特征。
4。分离数据训练、验证和测试
5。火车模型的训练数据集。
6。使用交叉验证技术优化模型。
7。评估模型
找数据分割线
总误差为成本函数(目标是让成本函数最小,即求成本函数的最小值)
梯度(最陡的方向)▽表示
梯度下降法
wx-b-y = f(成本函数)
w*表达最佳值
训练集合(取80%)
得到模型的过程称为训练
学习 最好的地方是官网
把所有的训练的数据作用一次函数 叫做一次训练
评估:对预测结果进行评价
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
逻辑回归----分类器 ----求函数最优解
把握算法很关键
读取文件---分训练数据和测试数据---逻辑回归--跑训练数据--测试数据 来测试结果---计算精确度----保存模型
vectors.dense 生成一个矢量(数组)
MSE--评估方法
cluster聚类 //http://spark.apache.org/docs/latest/mllib-clustering.html
K-mean 一种重要的方法 //用k个类把数据划分出来
//k=3,有三个类。
步骤:1、猜,任选三个点
2、从剩下的里面找,计算他们和三个点的空间距离。和谁更近,就和谁归为一类。
3、新地址等于类中所有点的地址的平均值
4、以此类推,直到所有的点都在三个类里面
距离表示了相似的程度
al data = sc.textFile("data/mllib/kmeans_data.txt")val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()// Cluster the data into two classes using KMeansval numClusters = 2val numIterations = 20val clusters = KMeans.train(parsedData, numClusters, numIterations)// Evaluate clustering by computing Within Set Sum of Squared Errorsval WSSSE = clusters.computeCost(parsedData)println("Within Set Sum of Squared Errors = " + WSSSE)// Save and load modelclusters.save(sc, "myModelPath")val sameModel = KMeansModel.load(sc, "myModelPath")
//--------------------------------------------------------------------------------------------
val data = sc.textFile("data/mllib/gmm_data.txt")val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()// Cluster the data into two classes using GaussianMixtureval gmm = new GaussianMixture().setK(2).run(parsedData)// Save and load modelgmm.save(sc, "myGMMModel")val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")// output parameters of max-likelihood modelfor (i <- 0 until gmm.k) { println("weight=%f\nmu=%s\nsigma=\n%s\n" format (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))}
//----------------------------------------------------------------------------------
val data = sc.textFile("data/mllib/pic_data.txt")val similarities = data.map { line => val parts = line.split(' ') (parts(0).toLong, parts(1).toLong, parts(2).toDouble)}// Cluster the data into two classes using PowerIterationClusteringval pic = new PowerIterationClustering() .setK(2) .setMaxIterations(10)val model = pic.run(similarities)model.assignments.foreach { a => println(s"${a.id} -> ${a.cluster}")}
//--------------------------------------------------------------------------------------
推荐系统:Recommendation
数据挖掘最重要的是找特征:算法可以随手拈来
用户的数据就是用户特征
a1=(4,4,2)
a2=(1,3,2)
a3=(4,2,1)
a4=(1,2,2)
a5=(2,3,2)
a6=(4,3,1)
找用户的相似度,即计算用户矢量间的距离即可。根据相似可做推算
矩阵划分:一个矩阵可以分为多个矩阵之和
ALS:交替的最小二乘法
case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
把文件中的数据按规定的方式存好
rank = 10 基本能量保证为10
掌握linux和python
//----------------------------------------------------------------------------------------
关系数据库---关系的集合
表--是大量的数据的集合
spark里的管理者叫 sqlContext
..createDataFrame(data,!['name',!'age']) //创建一张表
df.join(df2,!'name') //name 相同的进行join操作
df.join(df2,'name','left_outer')! 左边表
更多推荐
所有评论(0)