Hadoop之使用python实现数据集合间join操作

2820阅读 0评论2016-03-17 levy-linux
分类:HADOOP

hadoop之steaming介绍

hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:
使用原生java语言实现Map-reduce程序

    hadoop准备好数据后,将数据传送给java的map程序
    java的map程序将数据处理后,输出O1
    hadoop将O1打散、排序,然后传给不同的reduce机器
    每个reduce机器将传来的数据传给reduce程序
    reduce程序将数据处理,输出最终数据O2

借助hadoop streaming使用python语言实现Map-reduce程序

    hadoop准备好数据后,将数据传送给java的map程序
    java的map程序将数据处理成“键/值”对,并传送给python的map程序
    python的map程序将数据处理后,将结果传回给java的map程序
    java的map程序将数据输出为O1
    hadoop将O1打散、排序,然后传给不同的reduce机器
    每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
    python的reduce程序将数据处理后,将结果返回给java的reduce程序
    java的reduce程序将数据处理,输出最终数据O2

上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。
hadoop之实现集合join的需求

hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。

比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:

(学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)

数据事例1-学生信息(data_info):

学号sno 姓名name
01 name1
02 name2
03 name3
04 name4

数据事例2:-学生成绩(data_grade):

学号sno 课程号courseno 成绩grade
01 01 80
01 02 90
02 01 82
02 02 95

期待的最终输出:

学号sno 姓名name 课程courseno 成绩grade
01 name1 01 80
01 name1 02 90
02 name2 01 82
02 name2 02 95

实现join的注意点和易踩坑总结

如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:

    实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
    每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
    每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空

第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。
hadoop实现join操作的思路

具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。

1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;

2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区

3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出
hadoop使用python实现join的map和reduce代码

将本地文件data_info与data_grade上传到HDFS目录
    
hadoop fs -mkdir /user/hdfs/jointest/input/
hadoop fs -put data_info data_grade /user/hdfs/jointest/input/

mapper.py的代码:

点击(此处)折叠或打开

  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. #Mapper.py
  4. import os
  5. import sys

  6. #mapper脚本
  7. def mapper():
  8.         #获取当前正在处理的文件的名字,这里我们有两个输入文件
  9.         #所以要加以区分
  10.         filepath = os.environ["map_input_file"]
  11.         filename = os.path.split(filepath)[-1]
  12.         for line in sys.stdin:
  13.                 if line.strip()=="":
  14.                         continue
  15.                 fields = line[:-1].split("\t")
  16.                 sno = fields[0]
  17.                 #以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记
  18.                 if filename == 'data_info':
  19.                         name = fields[1]
  20.                         #下面的数字'0'就是为数据源1加上的统一标记
  21.                         print '\t'.join((sno,'0',name))
  22.                 elif filename == 'data_grade':
  23.                         courseno = fields[1]
  24.                         grade = fields[2]
  25.                         #下面的数字'1'就是为数据源1加上的统一标记
  26.                         print '\t'.join((sno,'1',courseno,grade))

  27. if __name__=='__main__':
  28.         mapper()

reducer的代码:

点击(此处)折叠或打开

  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. #reducer.py
  4. import sys
  5. #为了记录和上一个记录的区别,用lastsno记录上个sno
  6. lastsno = ''
  7. for line in sys.stdin:
  8.     if line.strip()=="":
  9.         continue
  10.     fields = line.strip().split('\t')
  11.     sno = fields[0]
  12.     '''
  13.     处理思路:
  14.     遇见当前key与上一条key不同并且label=0,就记录下来name值,
  15.     当前key与上一条key相同并且label==1,则将本条数据的courseno、
  16.     grade联通上一条记录的name一起输出成最终结果
  17.     '''
  18.     if sno != lastsno:
  19.         name=''
  20.     #这里没有判断label==1的情况,
  21.     #因为sno!=lastno,并且label=1表示该条key没有数据源1的数据
  22.         if fields[1] == '0':
  23.             name = fields[2]
  24.     elif sno==lastsno:
  25.     #这里没有判断label==0的情况,
  26.     #因为sno==lastno并且label==0表示该条key没有数据源2的数据
  27.         if fields[1] == '1':
  28.             courseno = fields[2]
  29.             grade = fields[3]
  30.             if name:
  31.                 print '\t'.join((lastsno,name,courseno,grade))
  32.     lastsno = sno

使用shell脚本启动hadoop程序的方法:

点击(此处)折叠或打开

  1. #!/bin/bash
  2. #先删除输出目录
  3. hdfs dfs -rmr /user/hdfs/jointest/output

  4. hadoop jar /usr/hdp/2.3.0.0-2557/hadoop-mapreduce/hadoop-streaming.jar \
  5.         -D mapred.map.tasks=5 \
  6.         -D mapred.reduce.tasks=5 \
  7.         -D mapred.job.map.capacity=5 \
  8.         -D mapred.job.reduce.capacity=5 \
  9.         -D mapred.job.name="join--sno_name-sno_courseno_grade" \
  10.         -D num.key.fields.for.partition=1 \
  11.         -D stream.num.map.output.key.fields=2 \
  12.         -D stream.non.zero.exit.is.failure=false \
  13.         -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  14.         -input "/user/hdfs/jointest/input/data_info" \
  15.         -input "/user/hdfs/jointest/input/data_grade" \
  16.         -output "/user/hdfs/jointest/output" \
  17.         -mapper "python mapper.py" \
  18.         -reducer "python reducer.py" \
  19.         -file "mapper.py" \
  20.         -file "reducer.py"

更多需要注意的地方

hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:

影响类型
影响的范围

key字段数目:
1、启动脚本中num.key.fields.for.partition的配置
2、启动脚本中stream.num.map.output.key.fields的配置
3、map和reduce脚本中key的获取
4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型

1*1类型:
reduce中先记录第一个value,然后在下一条直接合并输出;

M*1类型:
将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;

M*N类型:
遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来

参考:

http://www.cnblogs.com/tychyg/p/5277494.html
上一篇:ambari迁移HistoryServer服务
下一篇:Ubuntu下安装numpy matplotlib scikit-learn