线程1在成功竞争CAS前,线程2将尾结点的next从指向线程1的结点改为指向线程2的结点,哪怕线程1执行CAS成功,队列的结构已经被破坏了。最好的办法是在修改尾结点的next指针前判断其是否已经指向其他结点,因而对next指针的修改必须原子性的进行。当然最简单省事的方法就是使用AtomicReference即原子引用,然而正如在自己动手构建无锁的并发容器(栈和队列)第3节中指出的,原子引用逻辑上可以当成个特殊的引用,但其本质上是个对象。Node结点在队列使用时会被大量创建,其内部的AtomicReference对象也将被大量创建,这在高并发环境下无疑会一定程度影响性能。真的需要为原子的更新一个成员变量而每次创建一个对象吗?其实没这个必要。所有的原子变量,其CAS算法底层都是通过sun.misc.Unsafe类中提供的本地方法实现的,尽管其被设计成不能直接被开发者使用,但是通过反射我们还是能够轻松绕过这层限制。Node类的定义中通过静态代码块在该类被加载时以反射的方式获得了Unsafe对象,并计算出了要进行原子更新的next指针域在Node对象中的相对偏移量,基于此可以实现对结点的next指针进行原子更新。这部分逻辑在casNext方法中定义,如下:
private boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } 2.3 队列的内部成员变量队列的成员变量定义如下
//不带任何信息的哨兵结点 Node<E> sentinel = new Node<>(null); //头指针指向哨兵结点 Node<E> head = sentinel; //尾指针,原子引用 AtomicReference<Node<E>> tail = new AtomicReference<>(sentinel);在创建队列的时候就会初始化一个sentinel表示的哨兵结点。头指针head从始至终都指向该哨兵结点,因不会被修改,故不需要用volatile修饰。尾指针使用AtomicReference保证原子的修改。队列的结构如下图所示
3. 基于CAS算法和双向链表构建无锁的并发队列 3.1 出队方法 /** * 将队列首元素从队列中移除并返回该元素,若队列为空则返回null * * @return */ @Override public E dequeue() { for (; ; ) { //队列首元素结点(哨兵结点之后的结点) Node<E> headed = head.next; //队列尾结点 Node<E> tailed = tail.get(); if (headed == null) { //队列为空,返回null return null; } else if (headed == tailed) { //队列中只含一个元素 //CAS方式修改哨兵结点的next指针指向null if (sentinel.casNext(headed, null)) { //cas方式修改尾指针指向哨兵结点,失败也没关系 tail.compareAndSet(tailed, sentinel); return headed.item; } } else if (sentinel.casNext(headed, headed.next)) { //队列中元素个数大于1 return headed.item; } } }出队逻辑相对简单,只需要分三种情况讨论即可
1.队列为空,直接返回null
2.队列中元素结点个数(不包括哨兵结点)大于1,只需要修改哨兵结点的next指针指向原队列首元素结点的下一个结点。
3.队列中只有1个元素结点(不包括哨兵结点),需要同时更新尾指针和哨兵结点的next指针(头指针head自始至终都指向哨兵结点,队列首元素结点由哨兵结点的next指针指向)。
这里分为了两步,首先以CAS方式更新哨兵结点的next指针指向null,如果执行成功再以CAS方式修改tail指针指向哨兵结点。当然这里第二部修改tail指针的操作可能会失败,也可能还没来得及执行就失去cpu执行权导致队列处于一种混乱状态。比如下面这种情形:
只考虑出队情形,出队时一般只需要修改哨兵结点的next指针即可,只有在队列含元素的结点个数恰好为1时才需要更新tail指针,即使该操作没有执行或执行失败,如上图所示,只要哨兵结点的next指针为null,已经可以表示队列为空,之后的出队操作将返回null。所以仅就出队情形而言,tail指针是否更新成功对出队的结果没有任何影响。对于入队的情形,只要在新结点入队之前能够正确判断出队列此时正处于这种中间状态,并对其进行修复就行。
3.2 入队方法 /** * 将元素加入队列尾部 * @param e 要入队的元素 * @return true:入队成功 false:入队失败 */ @Override public boolean enqueue(E e) { //构造新结点 Node<E> newNode = new Node<>(e); //死循环,保证入队 for (; ; ) { //当前尾结点 Node<E> tailed = tail.get(); //当前尾结点的下一个结点 Node<E> tailedNext = tailed.next; //判断队列此时正处于出队操作导致的中间状态 if (sentinel.next == null && tailed != sentinel) { //CAS方式使尾指针指向哨兵结点,失败也没关系 tail.compareAndSet(tailed, sentinel); } else if (tailed == tail.get()) { //尾指针尚未改变,即没有其他线程将结点插入队列 //其他线程正在执行入队,此时队列处于中间状态 if (tailedNext != null) { //替其他线程完成更新尾指针的操作 tail.compareAndSet(tailed, tailedNext); } else if (tailed.casNext(null, newNode)) { //没有其他线程正在执行入队,CAS更新尾原结点的next指针指向新结点 //CAS更新尾指针,失败也没关系 tail.compareAndSet(tailed, newNode); return true; } } } }