通过 DefaultFuture.received() 反馈执行结果。
public static void received(String id, boolean result) { DefaultFuture future = FUTURES.get(id); if (future != null) { // 累加失败任务数量 if (!result) { future.feedbackFailResultCount.incrementAndGet(); } // 累加执行完成任务数量 future.feedbackResultCount.incrementAndGet(); if (future.isDone()) { FUTURES.remove(id); future.doReceived(); } } } private void doReceived() { lock.lock(); try { if (done != null) { // 唤醒阻塞的线程 done.signal(); } } finally { lock.unlock(); } }下面我们来总结整个流程:
收到 MQ 消息,组装成 DefaultFuture,通过 get 方法获取执行结果,未执行完的时候此方法阻塞。
通过切面切入加了 EventListener 的方法,判断是否有异常来判断任务的执行结果。
通过 DefaultFuture.received() 反馈结果。
反馈时计算是否全部完成,全部完成则唤醒阻塞的线程。DefaultFuture.get()就能获取到结果。
是否要进行 ACK 操作。
需要注意的是每个 EventListener 内部消费的逻辑都要做幂等控制。
源码地址:https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume