嘘!异步事件这样用真的好么? (2)

通过 DefaultFuture.received() 反馈执行结果。

public static void received(String id, boolean result) { DefaultFuture future = FUTURES.get(id); if (future != null) { // 累加失败任务数量 if (!result) { future.feedbackFailResultCount.incrementAndGet(); } // 累加执行完成任务数量 future.feedbackResultCount.incrementAndGet(); if (future.isDone()) { FUTURES.remove(id); future.doReceived(); } } } private void doReceived() { lock.lock(); try { if (done != null) { // 唤醒阻塞的线程 done.signal(); } } finally { lock.unlock(); } }

下面我们来总结整个流程:

收到 MQ 消息,组装成 DefaultFuture,通过 get 方法获取执行结果,未执行完的时候此方法阻塞。

通过切面切入加了 EventListener 的方法,判断是否有异常来判断任务的执行结果。

通过 DefaultFuture.received() 反馈结果。

反馈时计算是否全部完成,全部完成则唤醒阻塞的线程。DefaultFuture.get()就能获取到结果。

是否要进行 ACK 操作。

需要注意的是每个 EventListener 内部消费的逻辑都要做幂等控制。

源码地址:https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyxxpp.html