public class Main {
public static void main(String[] args) throws InterruptedException {
//创建订单工厂
OrderFactory orderFactory = new OrderFactory();
//ringbuffer的大小
int RINGBUFFER_SIZE = 1024;
//创建disruptor
Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
//设置事件处理器 即消费者
disruptor.handleEventsWith(new OrderHandler());
disruptor.start();
RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
//-------------生产数据
for(int i = 0 ; i < 3 ; i++){
long sequence = ringBuffer.next();
Order order = ringBuffer.get(sequence);
order.setId(i);
ringBuffer.publish(sequence);
System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
}
Thread.sleep(1000);
disruptor.shutdown();
}
}
运行结果:
说明:
其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。
另外在构造Disruptor的时候,在3.3.6之前使用的是API:
到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。
构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。
单独使用RingBuffer:WorkerPool
如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());
WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());
workerPool.start(executor);
//-------------生产数据
for(int i = 0 ; i < 30 ; i++){
long sequence = ringBuffer.next();
Order order = ringBuffer.get(sequence);
order.setId(i);
ringBuffer.publish(sequence);
System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
}
Thread.sleep(1000);
workerPool.halt();
executor.shutdown();
}
实际上是利用WorkerPool辅助连接消费者。
一个生产者+多个消费者
public static void main(String[] args) throws InterruptedException {
//创建订单工厂
OrderFactory orderFactory = new OrderFactory();
//ringbuffer的大小
int RINGBUFFER_SIZE = 1024;
//创建disruptor
Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());
//设置事件处理器 即消费者
EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());
eventHandlerGroup.then(new OrderHandler3());
disruptor.start();
RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
//-------------生产数据
for(int i = 0 ; i < 3 ; i++){
long sequence = ringBuffer.next();
Order order = ringBuffer.get(sequence);
order.setId(i);
ringBuffer.publish(sequence);
System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
}
Thread.sleep(1000);
disruptor.shutdown();
}
运行结果: