combineByKey 算子

函数功能:

聚合各分区的元素,而每个元素都是二元组。功能与基础RDD函数aggregate()差不多,可让用户返回与输入数据类型不同的返回值。

combineByKey函数的每个参数分别对应聚合操作的各个阶段。所以,理解此函数对Spark如何操作RDD会有很大帮助。

参数解析:

createCombiner:分区内 创建组合函数

mergeValue:分区内 合并值函数

mergeCombiners:多分区 合并组合器函数

partitioner:自定义分区数,默认为HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

示例:

假如现有 男,李四 男,张三 女,韩梅梅 女,李思思 男,马云 这样五对数据,我想把它统计成

男,([李四, 张三,马云],3) 女,([韩梅梅, 李思思],2) 这样格式的数据,我们最简便的就是用combineByKey算子了,废话不多说 上代码。

import cn.hutool.core.collection.CollectionUtil;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.List;

/**

* @author cao kangle

* @Type CombineByKey.java

* @Desc 使用不同的返回类型合并具有相同键的值

* @date 2020/5/25 16:21

*/

public class CombineByKey {

public static void main(String[] args) {

SparkConf conf=new SparkConf().setMaster("local").setAppName("CombineByKey");

JavaSparkContext sparkContext=new JavaSparkContext(conf);

JavaRDD> parallelize = sparkContext.parallelize(Arrays.asList(new Tuple2<>("男", "李四"), new Tuple2<>("男", "张三"), new Tuple2<>("女", "韩梅梅"), new Tuple2<>("女", "李思思"), new Tuple2<>("男", "马云")));

parallelize.foreach(new VoidFunction>() {

@Override

public void call(Tuple2 stringStringTuple2) throws Exception {

System.out.println(stringStringTuple2._1+","+stringStringTuple2._2);

}

});

JavaPairRDD pairRDD = parallelize.mapToPair(new PairFunction, String, String>() {

@Override

public Tuple2 call(Tuple2 xx) throws Exception {

return new Tuple2<>(xx._1, xx._2);

}

});

JavaPairRDD, Integer>> combineRDD = pairRDD.combineByKey(new Function, Integer>>() {

//createCombiner:分区内 创建组合函数 每个分区内 遇到的第一个之前没有遇到过的元素 走这个方法

@Override

public Tuple2, Integer> call(String s) throws Exception {

List list = new ArrayList<>();

list.add(s);

return new Tuple2<>(list, 1);

}

}, new Function2, Integer>, String, Tuple2, Integer>>() {

//分区内 合并值函数

@Override

public Tuple2, Integer> call(Tuple2, Integer> listIntegerTuple2, String s) throws Exception {

List list = listIntegerTuple2._1;

list.add(s);

int x = listIntegerTuple2._2 + 1;

return new Tuple2<>(list, x);

}

}, new Function2, Integer>, Tuple2, Integer>, Tuple2, Integer>>() {

//多分区 合并组合器函数

@Override

public Tuple2, Integer> call(Tuple2, Integer> listIntegerTuple2, Tuple2, Integer> listIntegerTuple22) throws Exception {

List list = listIntegerTuple2._1;

list.addAll(listIntegerTuple22._1);

int x = listIntegerTuple2._2 +listIntegerTuple22._2;

return new Tuple2<>(list, x);

}

},

//自定义分区数,默认为HashPartitioner

2);

combineRDD.foreach(new VoidFunction, Integer>>>() {

@Override

public void call(Tuple2, Integer>> stringTuple2Tuple2) throws Exception {

System.out.println( stringTuple2Tuple2._1+","+stringTuple2Tuple2._2);

}

});

}

}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐