spark-rdd实例
x 数结构rdd中每个元素数据,元素是是什么类型,就进行什么类型的计算操作。可以选择指定master,appName。
1、导入sparkcontext
from pyspark import SparkContext
2、生成sparkcontext对象
可以选择指定master,appName
sc = SparkContext()
3、生成rdd数据
3-1 将Python数据转为rdd
rdd_python = sc.parallelize([1, 2, 3, 4])
3-1 文件数据转为rdd
['hadoop,hive,spark,flink', 'sql,python,hadoop,spark', 'spark,java,scala,java']
rdd_file = sc.textFile('/data/data.txt')
4、调用transformation算子对数据转化计算
x 数结构rdd中每个元素数据,元素是是什么类型,就进行什么类型的计算操作
rdd_map_python = rdd_python.map(lambda x: x + 1)
对文件中的字符串数据进行切割获取每一个单词数据
rdd_map_file = rdd_file.map(lambda x: x.split(','))
将rdd中每个列表元素中单词拆分成一个一个单词,放入一个rdd中
rdd_flatMap_file = rdd_map_file.flatMap(lambda x: x)
对数据进行过滤 过滤字符串长度大于的数据
rdd_filter_file = rdd_flatMap_file.filter(lambda x: len(x) > 3)
数据去重
rdd_distinct_file = rdd_flatMap_file.distinct()
将数据转为 k-v
rdd_map_kv_file = rdd_flatMap_file.map(lambda x: (x, 1))
k-v数据处理
分组
rdd_groupByKey_file = rdd_map_kv_file.groupByKey()
x接受value值
rdd_mapValues_file = rdd_groupByKey_file.mapValues(lambda x: list(x))
排序
rdd_sortByKey_file = rdd_map_kv_file.sortByKey()
聚合
rdd_reduceByKey_file = rdd_map_kv_file.reduceByKey(lambda x, y: x + y)
sortBy
rdd_sortBy_file = rdd_reduceByKey_file.sortBy(lambda x:x[1])
5、调用action算执行计算任务
查看rdd的所有数据
print(rdd_map_python.collect())
print(rdd_file.collect())
print(rdd_map_file.collect())
print(rdd_flatMap_file.collect())
print(rdd_filter_file.collect())
print(rdd_distinct_file.collect())
print(rdd_map_kv_file.collect())
print(rdd_groupByKey_file.collect())
print(rdd_mapValues_file.collect())
print(rdd_sortByKey_file.collect())
print(rdd_reduceByKey_file.collect())
print(rdd_sortBy_file.collect())
更多推荐
所有评论(0)