简单地说,complete latency 表示了tuple 从emit 到被acked 经过的时间,可以认为是tuple以及该tuple 的后续子孙(形成一棵树)整个处理时间。其次spout 的emit 和transfered 还统计了spout和acker 之间内部的通信信息,比如对于可靠处理的spout 来说,会在emit 的时候同时发送一个_ack_init给acker,记录tuple id 到task id 的映射,以便ack 的时候能找到正确的acker task。
1.8关于Storm的ack和fail问题
在学习storm的过程中,有不少人对storm的Spout组件中的ack及fail相关的问题存在困惑,这里做一个简要的概述。
Storm保证每一个数据都得到有效处理,这是如何保证的呢?正是ack及fail机制确保数据都得到处理的保证,但是storm只是提供给我们一个接口,而具体的方法得由我们自己来实现。例如在spout下一个拓扑节点的bolt上,我们定义某种情况下为数据处理失败,则调用fail,则我们可以在fail方法中进行数据重发,这样就保证了数据都得到了处理。其实,通过读storm的源码,里面有讲到,有些类(BaseBasicBolt?)是会自动调用ack和fail的,不需要我们程序员去ack和fail,但是其他Bolt就没有这种功能了。
1.9关于IRichBolt与IBasicBolt接口的区别
首先从类的组成上进行分析可以看到,IBasicBolt接口只有execute方法和declareOutputFields方法,而IRichBolt接口上除了以上几个方法还有prepare方法和cleanup及map方法。而且其中execute方法是有些不一样的,其参数列表不同。
总体来说Rich方法比较完善,我们可以使用prepare方法进行该Bolt类的初始化工作,例如我们链接数据库时,需要进行一次数据库连接操作,我们就可以把该操作放入prepare中,只需要执行一次就可以了。而cleanup方法能在该类调用结束时进行收尾工作,往往在处理数据的时候用到,例如在写hdfs(Hadoop的文件系统)数据的时候,在结束时需要进行数据clear,则需要进行数据收尾。当然,根据官网及实际的测验,该方法往往是执行失败的。
2关于Topology发布
2.1发布topologies 到远程集群时,出现Nimbus host is not set 异常
原因是Nimbus 没有被正确启动起来,可能是storm.yaml 文件没有配置,或者配置有问题。
解决方法:打开storm.yaml 文件正确配置:nimbus.host: "xxx.xxx.xxx.xxx",重启nimbus后台程序即可。
2.2发布topology到远程集群时,出现AlreadyAliveException(msg: xxx is alreadyactive)异常
原因是提供的topology 与已经在运行的topology 重名。
解决方法:发布时换一个拓扑名称即可。
2.3启动Supervisor 时,出现java.lang.UnsatisfiedLinkError
具体信息:启动Supervisor 时,出现java.lang.UnsatisfiedLinkError:
/usr/local/lib/libjzmq.so.0.0.0: libzmq.so.1: cannot open sharedobject
file: No such file or directory 异常。
原因是未找到zmq 动态链接库。
解决方法1:配置环境变量 export LD_LIBRARY_PATH=/usr/local/lib
解决方法2:编辑/etc/ld.so.conf 文件,增加一行:/usr/local/lib。再执行
sudo ldconfig 命令,重启Supervisor。
2.4发布topologies 时,出现不能序列化log4j.Logger 的异常
原因是日志系统无法正确支付序列化。
解决方法:使用slf4j 代替log4j。
2.5bolt 在处理消息时,worker 的日志中出现Failing message
原因:可能是因为Topology 的消息处理超时所致。
解决方法:提交Topology 时设置适当的消息超时时间,比默认消息超时时间(30
秒)更长。比如:
conf.setMessageTimeoutSecs(60);
2.6在打包toplogy工程的时候, 如果采用assembly方式,对于相关的依赖的配置一般要这样:
Xml代码
1. <dependencySets>
2. <dependencySet>
3. <outputDirectory>/</outputDirectory>
4. <unpack>true</unpack>
5. <excludes>
6. <exclude>storm:storm</exclude>
7. </excludes>
8. </dependencySet>
9. </dependencySets>
wiki上说可以用<scope>compile</scope>。然后将storm依赖设置为runtime,貌似不行。 另外就是所有的依赖包将全部解压,然后将所有依赖的配置和class文件生成一个文件。这个是通过<unpack>true</unpack>参数来控制的。
2.7在提交topology的时候有时可能出现如下异常: