Worked example: map/reduce queries in Hadoop (implemented in Python)

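Prerequisites: this assumes you already have a Hadoop cluster installed, with HDFS configured and running normally. The input logs are stored in HDFS as LZO-compressed files, each with a matching .lzo.index file: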


$ hadoop dfs -ls /data/dw/explorer
Found 1 items
drwxrwxrwx     - rsync supergroup                    0 2011-11-30 01:06 /data/dw/explorer/20111129


$ hadoop dfs -ls /data/dw/explorer/20111129
Found 4 items
-rw-r--r--     3 rsync supergroup     12294748 2011-11-29 21:10 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1520 2011-11-29 21:11 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo.index
-rw-r--r--     3 rsync supergroup     12337366 2011-11-29 22:09 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1536 2011-11-29 22:10 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo.index


A sample record looks like this:


20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>
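
To show what the mapper pulls out of the XML column, the snippet below parses the XML part of the sample record with Python's standard xml.etree.ElementTree. It is only a sketch of that one column; the date, version and other fields are separate tab-separated columns in the real input and are not handled here.

```python
import xml.etree.ElementTree as ElementTree

# The XML portion of the sample record above
xmlstr = ('<Log_Explorer ProductVer="5.05.1026.1111" '
          'UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}">'
          '<UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/>'
          '</Log_Explorer>')

dom = ElementTree.fromstring(xmlstr)    # root element: <Log_Explorer>
node = dom.find('UserDoubleClick')      # its direct child element
fields = {
    'uuid': dom.attrib['UUID'],
    'product_ver': dom.get('ProductVer'),
    'file_ext': node.get('FileExt'),
    'associate_key': node.get('AssociateKey'),
    'player': node.get('AssociateKey').split('.')[0],  # "Audio.mp3" -> "Audio"
    'count': int(node.get('Count')),
}
print(fields)
```

All attribute values come back as strings, which is why Count is converted with int() here; the mapper itself passes it through as text.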




1. Map script that extracts the fields: explorer_map.py


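#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# explorer_map.py: Hadoop streaming mapper (updated to Python 3)
import sys
import xml.etree.ElementTree as ElementTree

# Field offset for LZO input. When the job reads the .lzo files through an
# LZO input format, Hadoop streaming passes each record's byte-offset key and
# a tab in front of the line, so every field shifts right by one (lzo = 1).
# Set debug = True to run on plain text without that key (lzo = 0).
debug = False
lzo = 0 if debug else 1

for line in sys.stdin:
    try:
        # Streaming delivers records on stdin; fields are tab-separated.
        # Strip the trailing newline before splitting.
        flags = line.rstrip('\n').split('\t')
        if len(flags) != 11 + lzo:
            continue  # skip records with an unexpected number of fields
        stat_date = flags[0 + lzo]  # date, e.g. 20111129/23:59:54
        stat_date_bar = stat_date[:4] + '-' + stat_date[4:6] + '-' + stat_date[6:8]  # -> 2011-11-29
        version = flags[4 + lzo]
        xmlstr = flags[10 + lzo]
        # Parse the XML field; everything below only reads values from it
        dom = ElementTree.fromstring(xmlstr)
        uuid = dom.attrib['UUID']
        node = dom.find('UserDoubleClick')
        associate_key = node.get('AssociateKey')
        player = associate_key.split('.')[0]
        fileext = node.get('FileExt')
        count = node.get('Count')
        # Emit the map output. The mapper does no computation, it only extracts
        # and joins fields: the string before \t is the key passed to reduce,
        # and the count after \t is the value the reducer aggregates.
        print(stat_date_bar + ',' + version + ',' + fileext + ',' + player + ','
              + associate_key + '\t' + count)
    except Exception as e:
        # Report bad records on stderr; anything printed to stdout
        # would become part of the map output
        sys.stderr.write('skipped record: %s\n' % e)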
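
Because the mapper only reads stdin and writes stdout, its logic can be checked without a cluster. The sketch below wraps the same steps in a hypothetical map_record function and feeds it one made-up 12-column line: a "0" standing in for the LZO offset key, the sample date, placeholder "x" columns the mapper never reads, an assumed version value, and the sample XML. The real contents of the unused columns are not given in the article.

```python
import xml.etree.ElementTree as ElementTree

def map_record(line, lzo=1):
    """Return the mapper's output line for one record, or None if it is skipped.

    Hypothetical helper that mirrors explorer_map.py so it can be tested locally.
    """
    flags = line.rstrip('\n').split('\t')
    if len(flags) != 11 + lzo:
        return None
    stat_date = flags[0 + lzo]
    stat_date_bar = stat_date[:4] + '-' + stat_date[4:6] + '-' + stat_date[6:8]
    version = flags[4 + lzo]
    node = ElementTree.fromstring(flags[10 + lzo]).find('UserDoubleClick')
    if node is None:
        return None
    associate_key = node.get('AssociateKey')
    player = associate_key.split('.')[0]
    key = ','.join([stat_date_bar, version, node.get('FileExt'), player, associate_key])
    return key + '\t' + node.get('Count')

# Made-up record: "0" plays the LZO offset key, "x" fills unread columns,
# and the version column value is an assumption.
xml = ('<Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}">'
       '<UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>')
cols = ['0', '20111129/23:59:54', 'x', 'x', 'x', '5.05.1026.1111',
        'x', 'x', 'x', 'x', 'x', xml]
# prints the key "2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3", a tab, then "1"
print(map_record('\t'.join(cols) + '\n'))
```

Dropping the leading "0" column and calling map_record(..., lzo=0) gives the same result, which is what the debug switch in the script is for.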


2. Reduce script that computes and outputs the results: explorer_red.py
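
The reducer's code does not appear in this text. As a stand-in, here is a hypothetical minimal reducer (not the author's explorer_red.py) that assumes the goal is to total the count values per key. It relies on Hadoop streaming sorting map output by key (the text before the first tab) before the reduce phase, so equal keys arrive on consecutive lines and itertools.groupby can sum them in one pass.

```python
#!/usr/bin/env python3
# Hypothetical reducer sketch, not the original explorer_red.py:
# sums the count values for each key emitted by explorer_map.py.
import sys
from itertools import groupby

def parse(lines):
    """Yield (key, count) pairs from 'key\\tcount' lines, skipping malformed ones."""
    for line in lines:
        key, sep, value = line.rstrip('\n').partition('\t')
        if not sep:
            continue  # no tab: not a key/value line
        try:
            yield key, int(value)
        except ValueError:
            sys.stderr.write('bad count: %r\n' % line)

def reduce_counts(lines):
    """Sum counts per key; assumes equal keys are adjacent (sorted input)."""
    for key, group in groupby(parse(lines), key=lambda kv: kv[0]):
        yield key, sum(count for _, count in group)

def main(stream=sys.stdin, out=sys.stdout):
    for key, total in reduce_counts(stream):
        out.write('%s\t%d\n' % (key, total))

# Local demo on made-up, already-sorted mapper output.
# In the streaming job the script would simply call main().
main([
    '2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3\t1\n',
    '2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3\t1\n',
    '2011-11-29,5.05.1026.1111,mp4,Video,Video.mp4\t2\n',
])
```

Since both scripts only use stdin and stdout, the whole job can be approximated locally with a pipeline such as `cat sample.txt | python3 explorer_map.py | sort | python3 explorer_red.py`, with debug = True in the mapper when sample.txt is plain text without the offset column.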


Copyright notice: unless otherwise stated, all articles are original to this site.

When reposting, please credit the source: http://www.heiqu.com/psjjg.html