上传文件

 

打开文件方法

打开文件并进行操作

---------transform---------

map

filter

flatMap

sample(withreplacemant,fraciton,seed) 采样:从100个元素中取20%的数据,随机选20%

括号中为替换的数据

union(otherdataset)

distinct:去重

 

groupByKey 把相同的key 分到同一个组(相对于不同的节点来说)

(hello,5)

(hello,6)

变成=》(hello,[5,6])

 

reduceByKey(func,numtasks) //提供一种运算方法

sortByKey() 按key排序

join //两个RDD进行相交

eg:(hello,5)RDD1

(hello,6)RDD2

join的结果就是(hello,[5,6])

 

cogroup

cartestian 求笛卡尔集

 

scala:

val disFile =sc.textFile("README,md") //distFile is a collection of lines ,collecte尽量不要出现

distFile.map(1=>1.split(" ")).collect()

//l是形式参数,表示RDD的每一个元素,对其进行分割

//对文本中的每一行 按空格进行分割

distFile.flatMap(l=>l.split(" ")).collect() //把数据分成一对多的关系

//eg:hello world

//map =>hello

world

//flatMap =>{hello,world}

 

java 不能传递函数对象

 

 

----------action-------------//对数据进行实际操作

reduce(func)//两个参数 返回一个结果 ,满足交换律的操作

collect()把每个节点的运行结果传回给diver

count()统计RDD里面元素的个数

first()返回数据集中的第一个元素

take(n)返回数据集中的前n个元素

takesD //挑元素返回

countByKey()按照关键词进行计数 //耗时

foreach(func)对买一个数据元素 施加f()函数操作 //改操作结束后要写入磁盘,很耗时

//n个节点中 “hello”的个数,每个节点统计完后,将结果 交给diver

 

scala

val f=sc.textFile("Readme.md")

val words=f.flatMap(l=>l.split(" ").map(word

word.reduceByKey(_+_).collect.foreach(println)

 

广播:

val broadcastVar =sc.broadcast(Array(1,2,3))

broadcastVar.value

accumulators 做加一的运算,每个节点都有accumulators,最后汇总到diver

//accumulators 的结果,每个节点(worker)不能读,只能写。可以读的diver

 

scala:

val accum = sc.accumulator(0)

sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)//worker代码

accum.value //diver代码

 

val pair =(a,b)

pair._1 //第一个元素

pair._2 //第二个元素

 

求π :圆内部元素与外部元素的比值

val n=100000

val count=spark.parallelize()

 

机器学习:

GraphX:

 

图中的顶点数据

图的边的数据

代表顶点2到顶点4有一条边(有向边),2是边属性

 

val nodeRDD: RDD[(Long, Peep)] = sc.parallelize(nodeArray) //把顶点对象变成RDD

val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

//把边对象变成RDD

val g: Graph[Peep, Int] = Graph(nodeRDD, edgeRDD)

//定义一个名叫g的图,它也是一个RDD

val results = g.triplets.filter(t => t.attr > 7) for (triplet <- results.collect) {

println(s"${triplet.srcAttr.name} loves ${triplet.dstAttr.name}")

//把大于7的过滤出来(即删除)

//triplet既有顶点属性也有边的属性

//eg ((5,34)(4,23)9)==>称为triplet

 

 

 

重要程度代表权、权重*分数,才是有效分

val graph=GraphLoader.edgeListFile(" ") //把边加进去

val prGraph =graph.joinVertices(graph.outDegrees) //把顶点加到图里面

// Implement and Run PageRank

val pageRank =

prGraph.pregel(initialMessage = 0.0, iter = 10)(

(oldV, msgSum) => 0.15 + 0.85 * msgSum, //传入的三个函数,计算

triplet => triplet.src.pr / triplet.src.deg, //发送消息

(msgA, msgB) => msgA + msgB) //把两个消息汇总

// Get the top 20 pages

pageRank.vertices.top(20)(Ordering.by(_._2)).foreach(println)

//pregel ?

 

 

spark 机器学习库

1. Read data.

2. Preprocess or prepare data for processing.

3. Extract features.

4. Split data for training, validation, and testing.

5. Train a model with a training dataset.

6. Tune a model using cross-validation techniques.

7. Evaluate a model over a test dataset.

8. Deploy a model.

1。读取数据。

2。进行预处理或准备数据进行处理。

3。提取特征。

4。分离数据训练、验证和测试

5。火车模型的训练数据集。

6。使用交叉验证技术优化模型。

7。评估模型

 

 

找数据分割线

总误差为成本函数(目标是让成本函数最小,即求成本函数的最小值)

梯度(最陡的方向)▽表示

梯度下降法

wx-b-y = f(成本函数)

w*表达最佳值

训练集合(取80%)

得到模型的过程称为训练

 

学习 最好的地方是官网

把所有的训练的数据作用一次函数 叫做一次训练

 

评估:对预测结果进行评价

val metrics = new BinaryClassificationMetrics(scoreAndLabels)

val auROC = metrics.areaUnderROC()

 

逻辑回归----分类器 ----求函数最优解

把握算法很关键

读取文件---分训练数据和测试数据---逻辑回归--跑训练数据--测试数据 来测试结果---计算精确度----保存模型

 

vectors.dense 生成一个矢量(数组)

MSE--评估方法

 

cluster聚类 //http://spark.apache.org/docs/latest/mllib-clustering.html

K-mean 一种重要的方法 //用k个类把数据划分出来

//k=3,有三个类。

步骤:1、猜,任选三个点

2、从剩下的里面找,计算他们和三个点的空间距离。和谁更近,就和谁归为一类。

3、新地址等于类中所有点的地址的平均值

4、以此类推,直到所有的点都在三个类里面

距离表示了相似的程度

 

al data = sc.textFile("data/mllib/kmeans_data.txt")val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()// Cluster the data into two classes using KMeansval numClusters = 2val numIterations = 20val clusters = KMeans.train(parsedData, numClusters, numIterations)// Evaluate clustering by computing Within Set Sum of Squared Errorsval WSSSE = clusters.computeCost(parsedData)println("Within Set Sum of Squared Errors = " + WSSSE)// Save and load modelclusters.save(sc, "myModelPath")val sameModel = KMeansModel.load(sc, "myModelPath")

//--------------------------------------------------------------------------------------------

val data = sc.textFile("data/mllib/gmm_data.txt")val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()// Cluster the data into two classes using GaussianMixtureval gmm = new GaussianMixture().setK(2).run(parsedData)// Save and load modelgmm.save(sc, "myGMMModel")val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")// output parameters of max-likelihood modelfor (i <- 0 until gmm.k) { println("weight=%f\nmu=%s\nsigma=\n%s\n" format (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))}

//----------------------------------------------------------------------------------

val data = sc.textFile("data/mllib/pic_data.txt")val similarities = data.map { line => val parts = line.split(' ') (parts(0).toLong, parts(1).toLong, parts(2).toDouble)}// Cluster the data into two classes using PowerIterationClusteringval pic = new PowerIterationClustering() .setK(2) .setMaxIterations(10)val model = pic.run(similarities)model.assignments.foreach { a => println(s"${a.id} -> ${a.cluster}")}

 

//--------------------------------------------------------------------------------------

 

 

推荐系统:Recommendation

数据挖掘最重要的是找特征:算法可以随手拈来

用户的数据就是用户特征

a1=(4,4,2)

a2=(1,3,2)

a3=(4,2,1)

a4=(1,2,2)

a5=(2,3,2)

a6=(4,3,1)

找用户的相似度,即计算用户矢量间的距离即可。根据相似可做推算

 

矩阵划分:一个矩阵可以分为多个矩阵之和

ALS:交替的最小二乘法

 

case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)

把文件中的数据按规定的方式存好

rank = 10 基本能量保证为10

 

掌握linux和python

 

//----------------------------------------------------------------------------------------

关系数据库---关系的集合

表--是大量的数据的集合

 

spark里的管理者叫 sqlContext

..createDataFrame(data,!['name',!'age']) //创建一张表

 

df.join(df2,!'name') //name 相同的进行join操作

 

df.join(df2,'name','left_outer')! 左边表

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐