条件,假设你已经装好了Hadoop集群,配好了hdfs并可以正常运行。
$hadoop dfs -ls /data/dw/explorer
Found 1 items
drwxrwxrwx - rsync supergroup 0 2011-11-30 01:06 /data/dw/explorer/20111129
$ hadoop dfs -ls /data/dw/explorer/20111129
Found 4 items
-rw-r--r-- 3 rsync supergroup 12294748 2011-11-29 21:10 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo
-rw-r--r-- 3 rsync supergroup 1520 2011-11-29 21:11 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo.index
-rw-r--r-- 3 rsync supergroup 12337366 2011-11-29 22:09 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo
-rw-r--r-- 3 rsync supergroup 1536 2011-11-29 22:10 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo.index
数据格式如下
20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>
1.map脚本取数据explorer_map.py
#!/usr/bin/Python
#-*-coding:UTF-8 -*-
import sys
import cElementTree
debug = False#设置lzo文件偏移位
if debug:
lzo = 0
else:
lzo = 1
for line in sys.stdin:
try:
flags = line[:-1].split('\t')
#hadoop查询走标准输入,数据以\t分隔,去掉每行中的\n
if len(flags) == 0:
break
if len(flags) != 11+lzo:
#hadoop采用lzo则偏移位+1,lzo设置为False则+1
continue
stat_date=flags[0+lzo]#日期
stat_date_bar = stat_date[:4]+"-"+stat_date[4:6]+'-'+stat_date[6:8]#拼成2011-11-29格式
version = flags[4+lzo]
xmlstr = flags[10+lzo]
#xmlstr=line
dom = cElementTree.fromstring(xmlstr)
#xml字段对象,以下均为取值操作
uuid = dom.attrib['UUID']
node = dom.find('UserDoubleClick')
associateKey=node.get('AssociateKey')
associateKeys=associateKey.split('.')
player = associateKeys[0]
fileext=node.get('FileExt')
count=node.get('Count')
print stat_date_bar+','+version+','+fileext+','+player+','+associateKey+'\t'+count
#输出map后的数据,这里map不对数据做任何处理,只做取值,拼接操作
#将\t前的字符串作为key输入reduce,\t后的count作为reduce计算用的value
except Exception,e:
print e
#抛出异常
2.reduce脚本计算结果并输出explorer_red.py