第 31 行就是监听数据源被发现的事件,回调函数的参数类型是空接口(其实就是事件的名称)。在回调函数内,调用我随后要建立的 SubscribeToDataEvent 方法,把 eventData 转化为字符串传递进去。
下面看一看 SubscribeToDataEvent 方法:
该方法的参数是事件的名称。
第 39 行,对已注册的监听器进行遍历,如果传进来的事件名称已注册,return 即可。
否则的话,需要注册这个数据源,这个事件的名称是 MessageReceived_+Queue 的名称。
第 45 行的回调函数,我将传入一个立即执行的匿名函数,它会返回我们真正需要使用的回调函数,也就是闭包。这种做法的好处就是返回的函数可以捕获其被定义的作用域内的变量,这样的话真正的回调函数就可以拥有一些可持续的“状态”(也就是 prevTime 和 buf)。这里我的需求是至少要间隔 5 秒钟以上,才记录一次(到数据库)。
回调函数内其它的逻辑都很简单,就不逐行介绍了。
第 67 行,发布消息使用的是 Default Exchange,并路由到持久化的那个 Queue。
修改 queuelistener.go 里面的构造函数
让其传入 EventAggregator 作为参数并赋值给 QueueListener 的 ea 字段。
修改协调器的 main 函数
创建包级共享的 DatabaseConsumer 变量,在 main 里用构造函数进行创建并赋值。
创建 EventAggregator,并传递给 DatabaseConsumer 和 QueueListener,让他们共享同一个 EventAggregator。