import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyPairsJob {
-
public static class MyPairsMapper extends Mapper<Object, Text, Text, IntWritable>{
-
@Override
-
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
-
System.out.println("map input:"+ value.toString());
-
String []arrays = value.toString().split(",");
-
for (int i=0;i<arrays.length;i++){//逐个遍历输入数据
-
String curPrdId = arrays[i];
-
for (int j=i+1;j<arrays.length;j++){//依次遍历后边数据
-
String nextPrdId = arrays[j];
-
if(curPrdId.equals(nextPrdId)){//过滤相同商品ID
-
continue;
-
}
-
-
String strKey = "["+arrays[i]+","+arrays[j]+"]";//生成key
-
//按商品ID大小比较,避免将[1,2] 和[2,1]这两种相同的情况分发到不同的reduce中
-
if(Integer.parseInt(curPrdId)>Integer.parseInt(nextPrdId)){
-
strKey = "["+arrays[j]+","+arrays[i]+"]";
-
}
-
System.out.println("map out:"+ strKey+", 1");
-
context.write(new Text(strKey), new IntWritable(1));
-
}
-
}
-
}
-
}
-
-
public static class MyPairsReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
-
@Override
-
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
-
int sum=0;
-
for (IntWritable val:values){
-
sum=sum+val.get();
-
}
-
context.write(key,new IntWritable(sum));
-
}
-
}
-
-
public static void main(String []args){
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("MyPairsJob");
-
job.setJarByClass(MyPairsJob.class);
-
job.setMapperClass(MyPairsMapper.class);
-
job.setMapOutputKeyClass(Text.class);
-
job.setMapOutputValueClass(IntWritable.class);
-
-
job.setReducerClass(MyPairsReduce.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
-
job.setNumReduceTasks(1);
-
-
FileInputFormat.setInputPaths(job, new Path(args[0]));
-
FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
-
FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);
-
-
System.out.println(job.waitForCompletion(true));
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
-
}
- }