1、导入sparkcontext

from pyspark import SparkContext

2、生成sparkcontext对象

可以选择指定master,appName

sc = SparkContext()

3、生成rdd数据

3-1 将Python数据转为rdd

rdd_python = sc.parallelize([1, 2, 3, 4])

3-1 文件数据转为rdd

['hadoop,hive,spark,flink', 'sql,python,hadoop,spark', 'spark,java,scala,java']

rdd_file = sc.textFile('/data/data.txt')

4、调用transformation算子对数据转化计算

x 数结构rdd中每个元素数据,元素是是什么类型,就进行什么类型的计算操作

rdd_map_python = rdd_python.map(lambda x: x + 1)

对文件中的字符串数据进行切割获取每一个单词数据

rdd_map_file = rdd_file.map(lambda x: x.split(','))

将rdd中每个列表元素中单词拆分成一个一个单词,放入一个rdd中

rdd_flatMap_file = rdd_map_file.flatMap(lambda x: x)

对数据进行过滤 过滤字符串长度大于的数据

rdd_filter_file = rdd_flatMap_file.filter(lambda x: len(x) > 3)

数据去重

rdd_distinct_file = rdd_flatMap_file.distinct()

将数据转为 k-v

rdd_map_kv_file = rdd_flatMap_file.map(lambda x: (x, 1))

k-v数据处理

分组

rdd_groupByKey_file = rdd_map_kv_file.groupByKey()

x接受value值

rdd_mapValues_file = rdd_groupByKey_file.mapValues(lambda x: list(x))

排序

rdd_sortByKey_file = rdd_map_kv_file.sortByKey()

聚合

rdd_reduceByKey_file = rdd_map_kv_file.reduceByKey(lambda x, y: x + y)

sortBy

rdd_sortBy_file = rdd_reduceByKey_file.sortBy(lambda x:x[1])

5、调用action算执行计算任务

查看rdd的所有数据

print(rdd_map_python.collect())
print(rdd_file.collect())
print(rdd_map_file.collect())
print(rdd_flatMap_file.collect())
print(rdd_filter_file.collect())
print(rdd_distinct_file.collect())
print(rdd_map_kv_file.collect())
print(rdd_groupByKey_file.collect())
print(rdd_mapValues_file.collect())
print(rdd_sortByKey_file.collect())
print(rdd_reduceByKey_file.collect())
print(rdd_sortBy_file.collect())

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐