MapReduce Design Patterns -- Partitioning

2018-06-30 zpf1218
Category: Cloud Computing

The partitioning pattern categorizes records, but it does not care about their order. Its main purpose is to split similar records in a data set into separate, smaller data sets.

The core idea is to subclass Partitioner and implement the partition function public int getPartition(Text key, Text value, int numPartitions) according to the business requirements, so that records with the same key are routed to the same reduce call. Note that if NumReduceTasks is set to 1 in the driver, the partition function is never invoked. This makes sense: with only one reducer, there is nothing to partition.

The concrete implementation is as follows:


import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionJob {

    // Emits the first tab-separated field of each line as the key
    // and the whole line as the value.
    public static class PartitionMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text tmpKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            tmpKey.set(fields[0]);
            context.write(tmpKey, value);
        }
    }

    // Writes every record of its partition to the output unchanged.
    public static class PartitionReduce extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(NullWritable.get(), val);
            }
        }
    }

    // Routes records with equal keys to the same reducer. The sign-bit mask
    // is used instead of Math.abs(), which stays negative when hashCode()
    // returns Integer.MIN_VALUE.
    public static class Partition extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static void main(String[] args) {
        try {
            Job job = Job.getInstance();
            job.setJobName("PartitionJob");
            job.setJarByClass(PartitionJob.class);
            job.setMapperClass(PartitionMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setReducerClass(PartitionReduce.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setPartitionerClass(Partition.class);
            // With only one reduce task the partitioner would never be called.
            job.setNumReduceTasks(2);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Remove any previous output so the job can be rerun.
            FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);

            System.out.println(job.waitForCompletion(true));
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
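Stripped of the Hadoop machinery, the routing logic of getPartition can be sketched as a standalone method. This is only an illustration: the sample keys are made up, but the sign-bit mask mirrors what Hadoop's own HashPartitioner does, and it shows that equal keys always land in the same partition.

```java
public class PartitionDemo {
    // Same logic as the Partition class above: map a key to one of
    // numPartitions buckets; masking the sign bit keeps the result
    // non-negative even when hashCode() is Integer.MIN_VALUE.
    static int partition(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical keys; "alice" appears twice and must get the same bucket.
        String[] keys = {"alice", "bob", "alice", "carol"};
        for (String k : keys) {
            System.out.println(k + " -> reducer " + partition(k, 2));
        }
    }
}
```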
