// Spark secondary-sort example (blog-paste artifacts removed; original header was a
// "click here to collapse/expand" code-fold widget caption, not part of the source).
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
public class SecondarySort {
-
static List<Tuple2<Integer,Integer>> iterableToList(Iterable<Tuple2<Integer,Integer>> iterable) {
-
List<Tuple2<Integer,Integer>> list = new ArrayList<Tuple2<Integer,Integer>>();
-
for (Tuple2<Integer,Integer> item : iterable) {
-
list.add(item);
-
}
-
return list;
-
}
-
-
public static class SparkTupleComparator
-
implements Comparator<Tuple2<Integer, Integer>>, Serializable {
-
-
public static final SparkTupleComparator INSTANCE = new SparkTupleComparator();
-
-
private SparkTupleComparator() {
-
}
-
-
@Override
-
public int compare(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2){
-
return t1._1.compareTo(t2._1);
-
}
-
}
-
public static void main(String []args){
-
-
SparkConf sparkConf = new SparkConf();
-
sparkConf.setMaster("local").setAppName("SecondarySort");
-
-
JavaSparkContext sc = new JavaSparkContext(sparkConf);
-
JavaRDD<String> lines = sc.textFile("E:\\tmp\\input\\secondarysort.txt");
-
-
//生成键值对
-
JavaPairRDD<String, Tuple2<Integer, Integer>> pairRDD = lines.mapToPair(new PairFunction<String, String, Tuple2<Integer, Integer>>() {
-
@Override
-
public Tuple2<String, Tuple2<Integer, Integer>> call(String s) throws Exception {
-
String[] values = s.split(",");
-
Tuple2<Integer, Integer> tupleValue = new Tuple2<Integer, Integer>(Integer.parseInt(values[1]), Integer.parseInt(values[2]));
-
Tuple2<String, Tuple2<Integer, Integer>> tuple = new Tuple2<String, Tuple2<Integer, Integer>>(values[0], tupleValue);
-
return tuple;
-
}
-
});
-
-
List<Tuple2<String, Tuple2<Integer, Integer>>> tuple2List = pairRDD.collect();
-
System.out.println("k-v list:");
-
for (Tuple2<String, Tuple2<Integer, Integer>> tuple:tuple2List){
-
System.out.println(tuple._1() +" "+ tuple._2()._1() +" "+ tuple._2()._2() );
-
}
-
-
//根据key进行group,并按key进行排序
-
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groupBySortByKeyRDD=pairRDD.groupByKey().sortByKey();
-
List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> groupList = groupBySortByKeyRDD.collect();
-
System.out.println("---------------sort by key result---------------");
-
for (Tuple2<String,Iterable<Tuple2<Integer, Integer>>> tuple:groupList){
-
for (Tuple2<Integer, Integer> value:tuple._2()){
-
System.out.println("key:" +tuple._1() +" value: " +value._1() +" "+ value._2());
-
}
-
}
-
-
//对value进行排序
-
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> sortRDD = groupBySortByKeyRDD.mapValues(
-
new Function<Iterable<Tuple2<Integer, Integer>>, Iterable<Tuple2<Integer, Integer>>>() {
-
@Override
-
public Iterable<Tuple2<Integer, Integer>> call(Iterable<Tuple2<Integer, Integer>> v1) throws Exception {
-
List<Tuple2<Integer, Integer>> newList = new ArrayList<Tuple2<Integer, Integer>>(SecondarySort.iterableToList(v1));
-
Collections.sort(newList, SparkTupleComparator.INSTANCE);
-
return newList;
-
}
-
});
-
-
-
List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> sortList = sortRDD.collect();
-
System.out.println("---------------result---------------");
-
for (Tuple2<String,Iterable<Tuple2<Integer, Integer>>> tuple:sortList){
-
for (Tuple2<Integer, Integer> value:tuple._2()){
-
System.out.println( " key:" +tuple._1() +" value: "+value._1() +" "+ value._2());
-
}
-
}
-
- }