Java并发(一)——线程安全的容器(上) (3)

putFirst不支持插入null元素,首先新建一个Node对象,然后调用ReentrantLock的lock方法获取锁,插入操作通过boolean linkFirst(Node<E> node)实现,如果当前队列头已满,那么该线程等待(linkFirst方法在写入元素成功后会释放该锁信号),最后,在finally块中释放锁(ReentrantLock的使用)。

public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }

与putFirst类似,takeFirst首先获取锁,然后在try中解除尾元素对象的引用,如果unlinkFirst为空,表示队列为空,没有元素可删,那么该线程等待。同样,最后在finally块中释放锁。

那么问题来了,LinkedBlockingDeque为什么不使用LinkedBlockingQueue读写锁分离的方式呢?LinkedBlockingDeque与LinkedBlockingQueue的使用场景有什么区别呢?

1.3 DelayQueue

DelayQueue主要用于实现延时任务,比如:等待一段时间之后关闭连接,缓存对象过期删除,任务超时处理等等,这些任务的共同特点是等待一段时间之后执行(类似于TimerTask)。DelayQueue的实现包括三个核心特征:

延时任务:DelayQueue的泛型类需要继承自Delayed接口,而Delayed接口继承自Comparable<Delayed>,用于队列中优先排序的比较;

优先队列:DelayQueue的实现采用了优先队列PriorityQueue,即延迟时间越短的任务越优先(回忆下优先队列中二叉堆的实现)。

阻塞队列:支持并发读写,采用ReentrantLock来实现读写的锁操作。

因此,DelayQueue = Delayed + PriorityQueue + BlockingQueue。

public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); } public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null; private final Condition available = lock.newCondition(); }

接下来看下DelayQueue的读写操作如何实现延时任务的?

public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }

首先执行加锁操作,然后往优先队列中插入元素e,优先队列会调用泛型E的compareTo方法进行比较(具体关于二叉堆的操作,这里不再赘述,请参考数据结构部分相关分析),将延迟时间最短的任务添加到队头。最后检查下元素是否为队头,如果是队头的话,设置leader为空,唤醒所有等待的队列,释放锁。

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

首先执行加锁操作,然后取出优先队列的队头,如果对头为空,则该线程阻塞;

获得对头元素的延迟时间,如果延迟时间小于等于0,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素;

在延迟时间大于0时,首先释放元素first的引用(避免内存泄露),其次判断如果leader线程不为空,则该线程阻塞(表示已有线程在等待)。否则,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队头到达延迟时间,在finally块中释放leader元素的引用。循环后,取出对头元素,退出for循环。

最后,如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程,并执行解锁操作。

1.4 TransferQueue与LinkedTransferQueue

TransferQueue是一个继承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是TransferQueue接口的实现类,其定义为一个无界的队列,具有先进先出(FIFO)的特性。

TransferQueue接口主要包含以下方法:

public interface TransferQueue<E> extends BlockingQueue<E> { boolean tryTransfer(E e); void transfer(E e) throws InterruptedException; boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; boolean hasWaitingConsumer(); int getWaitingConsumerCount(); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwpds.html