Spark学习笔记--计算平均值

1600阅读 0评论2018-11-04 zpf1218
分类:大数据

需要注意:
1、计数类需要实现Serializable接口以支持序列化(Spark需要在driver和executor之间传输初始值和各分区的中间结果)
2、聚合函数aggregate需要3个参数:第一个参数是初始值(zeroValue);第二个参数(seqOp)在每个分区内把元素逐个累加到计数对象中(个数加1,数值加到总和上);第三个参数(combOp)把各个分区的计数对象合并(个数相加,总和相加)

点击(此处)折叠或打开

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaRDD;
  3. import org.apache.spark.api.java.JavaSparkContext;
  4. import org.apache.spark.api.java.function.Function2;

  5. import java.io.Serializable;
  6. import java.util.Arrays;

  7. public class BasicAvg {
  8.     public static class CountAvg implements Serializable {
  9.         private int num=0;
  10.         private int total;
  11.        public CountAvg(int num,int total){
  12.             this.num=num;
  13.             this.total=total;
  14.         }
  15.         public int avg(){
  16.             return total/num;
  17.         }
  18.         public int getTotal() {
  19.             return total;
  20.         }

  21.         public void setTotal(int total) {
  22.             this.total = total;
  23.         }

  24.         public int getNum() {
  25.             return num;
  26.         }

  27.         public void setNum(int num) {
  28.             this.num = num;
  29.         }

  30.     }
  31.     public static void main(String []args){
  32.         SparkConf conf = new SparkConf().setMaster("local").setAppName("BasicAvg");
  33.         JavaSparkContext sc = new JavaSparkContext(conf);

  34.         JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));

  35.         CountAvg countAvg = new CountAvg(0, 0);
  36.         CountAvg avg = rdd.aggregate(countAvg, new Function2<CountAvg, Integer, CountAvg>() {
  37.              @Override
  38.              public CountAvg call(CountAvg countAvg, Integer integer) throws Exception {
  39.                  countAvg.num = countAvg.num + 1;
  40.                  countAvg.total = countAvg.total + integer;
  41.                  return countAvg;
  42.              }
  43.          }, new Function2<CountAvg, CountAvg, CountAvg>() {
  44.              @Override
  45.              public CountAvg call(CountAvg countAvg, CountAvg countAvg2) throws Exception {
  46.                  countAvg.num = countAvg2.num + countAvg.num ;
  47.                  countAvg.total = countAvg2.total + countAvg.total;
  48.                  return countAvg;
  49.              }
  50.          });

  51.         System.out.println(avg.avg());

  52.     }

上一篇:Spark学习笔记--wordcount
下一篇:Spark学习笔记-累计器和广播变量