如何给Apache Pig自定义UDF函数?

近日由于工作所需,需要使用到Pig来分析线上的搜索日志数据,本人本打算使用hive来分析的,但由于种种原因,没有用成,而Pig(pig0.12-cdh)本人一直没有接触过,所以只能临阵磨枪了,花了两天时间,大致看完了pig官网的文档,在看文档期间,也是边实战边学习,这样以来,对pig的学习,会更加容易,当然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,本人打算介绍下如何在Pig中,使用用户自定义的UDF函数,关于学习经验,本人会在后面的文章里介绍。

一旦你学会了UDF的使用,就意味着,你可以以更加灵活的方式来使用Pig,使它扩展一些为我们的业务场景定制的特殊功能,而这些功能,在通用的pig里是没有的,举个例子:

你从HDFS上读取的数据格式,如果使用默认的PigStorage()来加载,存储可能只支持有限的数据编码和类型,如果我们定义了一种特殊的编码存储或序列化方式,那么当我们使用默认的Pig来加载的时候,就会发现加载不了,这时候我们的UDF就派上用场了,我们只需要自定义一个LoadFunction和一个StoreFunction就可以解决,这种问题。

本篇本人根据官方文档的例子,来实战一下,并在Hadoop集群上使用Pig测试通过:

我们先来看下定义一个UDF扩展类,需要几个步骤:

序号   步骤   说明  
1   在eclipse里新建一个java工程,并导入pig的核心包   java项目  
2   新建一个包,继承特定的接口或类,重写自定义部分   核心业务  
3   编写完成后,使用ant打包成jar   编译时需要pig依赖,但不用把pig的jar包打入UDF中  
4   把打包完成后的jar上传到HDFS上   pig运行时候需要加载使用  
5   在pig脚本里,注册我们自定义的udf的jar包   注入运行时环境  
6   编写我们的核心业务pig脚本运行   测试是否运行成功  

项目工程截图如下:

如何给Apache Pig自定义UDF函数?

核心代码如下:

package com.pigudf; 

 

import java.io.IOException; 

 

import org.apache.pig.EvalFunc; 

import org.apache.pig.data.Tuple; 

import org.apache.pig.impl.util.WrappedIOException; 

/** 

 * 自定义UDF类,对字符串转换大写 

 * @author qindongliang 

 * */ 

public class MyUDF extends EvalFunc<String> { 

 

    @Override 

    public String exec(Tuple input) throws IOException { 

         

         //判断是否为null或空,就跳过 

        if(input==null||input.size()==0){ 

            return null; 

        } 

        try{ 

            //获取第一个元素 

            String str=(String) input.get(0); 

            //转成大写返回 

            return str.toUpperCase(); 

             

        }catch(Exception e){ 

            throw WrappedIOException.wrap("Caught exception processing input row ",e); 

        } 

    } 

     

 



关于打包的ant脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件一定要上传到HDFS上,除非你是local模式): 

Java代码 

grunt> cat s.txt 

zhang san,12 

Song,34 

long,34 

abC,12 

grunt>   




我们在看下,操作文件和jar包是放在一起的: 

Java代码 

grunt> ls 

hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3>        1295 

hdfs://dnode1:8020/tmp/udf/s.txt<r 3>   36 

grunt>   



最后,我们看下pig脚本的定义: 

Pig代码 

--注册自定义的jar包 

REGISTER pudf.jar;   

--加载测试文件的数据,逗号作为分隔符 

a = load 's.txt' using PigStorage(',');     

--遍历数据,对name列转成大写 

b =  foreach a generate com.pigudf.MyUDF((chararray)$0);   

--启动MapReduce的Job进行数据分析 

dump b 


最后,我们看下结果,只要过程不出现异常和任务失败,就证明我们的udf使用成功: 

Java代码 

Counters: 

Total records written : 

Total bytes written : 64 

Spillable Memory Manager spill count : 

Total bags proactively spilled: 

Total records proactively spilled: 

 

Job DAG: 

job_1419419533357_0147 

 

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/3185b10fd5c6f765fee3dd57ff46f27c.html