Java并发编程框架Disruptor(2)

public class Main {
 
    public static void main(String[] args) throws InterruptedException {
 
        //创建订单工厂
        OrderFactory orderFactory = new OrderFactory();
 
        //ringbuffer的大小
        int RINGBUFFER_SIZE = 1024;
 
        //创建disruptor
        Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
 
        //设置事件处理器 即消费者
        disruptor.handleEventsWith(new OrderHandler());
 
        disruptor.start();
 
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
 
        //-------------生产数据
        for(int i = 0 ; i < 3 ; i++){
 
            long sequence = ringBuffer.next();
 
            Order order = ringBuffer.get(sequence);
 
            order.setId(i);
 
            ringBuffer.publish(sequence);
            System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
        }
 
        Thread.sleep(1000);
 
        disruptor.shutdown();
    }
 
}

运行结果:

Java并发编程框架Disruptor

说明:

其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。

另外在构造Disruptor的时候,在3.3.6之前使用的是API:

Java并发编程框架Disruptor

到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。

构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。

Java并发编程框架Disruptor

单独使用RingBuffer:WorkerPool

如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。

public static void main(String[] args) throws InterruptedException {
 
    ExecutorService executor = Executors.newFixedThreadPool(3);
    RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());
    WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());
 
    workerPool.start(executor);
 
    //-------------生产数据
    for(int i = 0 ; i < 30 ; i++){
 
        long sequence = ringBuffer.next();
 
        Order order = ringBuffer.get(sequence);
        order.setId(i);
 
        ringBuffer.publish(sequence);
 
        System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
    }
 
    Thread.sleep(1000);
   
    workerPool.halt();
    executor.shutdown();
 
}

实际上是利用WorkerPool辅助连接消费者。

一个生产者+多个消费者

Java并发编程框架Disruptor

public static void main(String[] args) throws InterruptedException {
 
    //创建订单工厂
    OrderFactory orderFactory = new OrderFactory();
 
    //ringbuffer的大小
    int RINGBUFFER_SIZE = 1024;
 
    //创建disruptor
    Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
 
    //设置事件处理器 即消费者
    EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());
    eventHandlerGroup.then(new OrderHandler3());
    disruptor.start();
 
    RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
 
    //-------------生产数据
    for(int i = 0 ; i < 3 ; i++){
 
        long sequence = ringBuffer.next();
 
        Order order = ringBuffer.get(sequence);
 
        order.setId(i);
 
        ringBuffer.publish(sequence);
        System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
    }
 
    Thread.sleep(1000);
    disruptor.shutdown();
}

运行结果:

Java并发编程框架Disruptor

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/a4b34e967540869213649a219a3c7a92.html