IKAnalyzer 的实现
@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using Iknalyzer. Return list of words.", extended = "Example: select _FUNC_('我是测试字符串') from src limit 1;\n" + "[\"我\", \"是\", \"测试\", \"字符串\"]") public class IknalyzerSeg extends GenericUDF { private transient ObjectInspectorConverters.Converter[] converters; //用来存放停用词的集合 Set<String> stopWordSet = new HashSet<String>(); @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { if (arguments.length < 1 || arguments.length > 2) { throw new UDFArgumentLengthException( "The function AnsjSeg(str) takes 1 or 2 arguments."); } //读入停用词文件 BufferedReader StopWordFileBr = null; try { StopWordFileBr = new BufferedReader(new InputStreamReader(new FileInputStream(new File("stopwords/baidu_stopwords.txt")))); //初如化停用词集 String stopWord = null; for(; (stopWord = StopWordFileBr.readLine()) != null;){ stopWordSet.add(stopWord); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } converters = new ObjectInspectorConverters.Converter[arguments.length]; converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector); if (2 == arguments.length) { converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector); } return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector); } @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { boolean filterStop = false; if (arguments[0].get() == null) { return null; } if (2 == arguments.length) { IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get()); if (1 == filterParam.get()) filterStop = true; } Text s = (Text) converters[0].convert(arguments[0].get()); StringReader reader = new StringReader(s.toString()); IKSegmenter iks = 
new IKSegmenter(reader, true); List<Text> list = new ArrayList<>(); if (filterStop) { try { Lexeme lexeme; while ((lexeme = iks.next()) != null) { if (!stopWordSet.contains(lexeme.getLexemeText())) { list.add(new Text(lexeme.getLexemeText())); } } } catch (IOException e) { } } else { try { Lexeme lexeme; while ((lexeme = iks.next()) != null) { list.add(new Text(lexeme.getLexemeText())); } } catch (IOException e) { } } return list; } @Override public String getDisplayString(String[] children) { return "Usage: evaluate(String str)"; } } 第四步:编写测试用例GenericUDF 给我们提供了一些方法,这些方法可以用来构建测试需要的环境和参数,这样我们就可以测试这些代码了
/**
 * Smoke test for the Ansj-based UDF: initializes it with (java String, java int)
 * inspectors, evaluates it on a sample sentence with 0 as the second argument and
 * prints whatever comes back. No assertions are made on the tokens.
 */
@Test
public void testAnsjSegFunc() throws HiveException {
    AnsjSeg segmenter = new AnsjSeg();
    ObjectInspector textInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector flagInspector = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    segmenter.initialize(new ObjectInspector[] {textInspector, flagInspector});

    Text sample = new Text("我是测试字符串");
    GenericUDF.DeferredObject[] callArgs = {
            new GenericUDF.DeferredJavaObject(sample),
            new GenericUDF.DeferredJavaObject(0)
    };

    ArrayList<Object> tokens = (ArrayList<Object>) segmenter.evaluate(callArgs);
    System.out.println(tokens);
}

/**
 * Smoke test for the IK Analyzer UDF: same setup as above, with 0 as the second
 * argument (stop-word filtering is only enabled by 1), printing the resulting tokens.
 */
@Test
public void testIkSegFunc() throws HiveException {
    IknalyzerSeg segmenter = new IknalyzerSeg();
    ObjectInspector textInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector flagInspector = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    segmenter.initialize(new ObjectInspector[] {textInspector, flagInspector});

    Text sample = new Text("我是测试字符串");
    GenericUDF.DeferredObject[] callArgs = {
            new GenericUDF.DeferredJavaObject(sample),
            new GenericUDF.DeferredJavaObject(0)
    };

    ArrayList<Object> tokens = (ArrayList<Object>) segmenter.evaluate(callArgs);
    System.out.println(tokens);
}
我们看到加载停用词没有找到,但是整体还是跑起来了,因为读取不到HDFS 上的文件
但是我们的第二个样例不需要从 HDFS 上加载停用词信息,所以可以完美地测试运行