Java9 基于异步响应流的发布(2)

request() 方法添加n个数据项到当前未满的订阅请求中。如果n小于或等于0,订阅者的onError() 方法会被调用,并且抛出IllegalArgumentException 异常,此外,如果n大于0,订阅者就会在onNext() 方法的调用下接收到n个数据项,除非中间异常终止。 从Long.MAX_VALUE次到n次中间是无界的调用。

cancel() 用来终止订阅者接收数据项,它有一种尝试机制,也就是说,在调用它之后也有可能收到数据项。

最后,数据处理器(Processor)在不改变发布者与订阅者的情况下基于流做数据处理,可以在发布者与订阅者之间放多个数据处理器,成为一个处理器链,发布者与订阅者不依赖于数据处理,它们是单独的过程。JDK9中不提供具体的数据处理器,必须由开发者来通过实现无方法声明的Processor接口来自行构建。

SubmissionPublisher 实现自Flow.Publisher 接口,向当前订阅者异步提交非空的数据项,直到它被关闭。每个当前订阅者以一个相同的顺序接收新提交的数据项,除非数据项丢失或者遇到异常。SubmissionPublisher 允许数据项在丢失或阻塞的时候扮演发布者角色。

SubmissionPublisher 提供了三个构造方法来获取实例。无参的构造器依赖于 ForkJoinPool.commonPool() 方法来提交发布者,以此实现生产者向订阅者提供数据项的异步特性。

下面的程序演示了SubmissionPublisher 用法和这套发布-订阅框架的其他特性:

import java.util.Arrays; import java.util.concurrent.Flow.*; import java.util.concurrent.SubmissionPublisher; public class FlowDemo { public static void main(String[] args) { // Create a publisher. SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); // Create a subscriber and register it with the publisher. MySubscriber<String> subscriber = new MySubscriber<>(); publisher.subscribe(subscriber); // Publish several data items and then close the publisher. System.out.println("Publishing data items..."); String[] items = { "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec" }; Arrays.asList(items).stream().forEach(i -> publisher.submit(i)); publisher.close(); try { synchronized("A") { "A".wait(); } } catch (InterruptedException ie) { } } } class MySubscriber<T> implements Subscriber<T> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { System.out.println("Received: " + item); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); synchronized("A") { "A".notifyAll(); } } @Override public void onComplete() { System.out.println("Done"); synchronized("A") { "A".notifyAll(); } } }

Java9 基于异步响应流的发布

其中使用了wait()和notifyAll() 方法来使主线程等到onComplete() 的完成,否则是不会看到任何输出的。

下面是输出结果:

Publishing data items... Received: jan Received: feb Received: mar Received: apr Received: may Received: jun Received: jul Received: aug Received: sep Received: oct Received: nov Received: dec Done

Java9 基于异步响应流的发布

最后说一句,熟悉RxJava的同学可以会心一笑了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/5b9a120d30a33835c03ff6627a299e2b.html