源码分析:Exchanger之数据交换器

Exchanger是Java5 开始引入的一个类,它允许两个线程之间交换持有的数据。当Exchanger在一个线程中调用exchange方法之后,会阻塞等待另一个线程调用同样的exchange方法,然后以线程安全的方式交换数据,之后线程继续执行。

官方示例

在JDK的源码注释中,提供了一个简单的示例demo,稍加修改后就可以运行

public class FillAndEmpty { Exchanger<Integer> exchanger = new Exchanger<Integer>(); Integer initialEmptyBuffer = 1; Integer initialFullBuffer = 2; class FillingLoop implements Runnable { public void run() { Integer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != 2) { currentBuffer = exchanger.exchange(currentBuffer); } System.out.println("FillingLoop:"+currentBuffer); } catch (InterruptedException ex) { } } } class EmptyingLoop implements Runnable { public void run() { Integer currentBuffer = initialFullBuffer; try { while (currentBuffer != 1) { currentBuffer = exchanger.exchange(currentBuffer); } System.out.println("EmptyingLoop:"+currentBuffer); } catch (InterruptedException ex) { } } } void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } public static void main(String[] args){ FillAndEmpty f = new FillAndEmpty(); f.start(); } } 源码分析 内部类

Exchanger 中定义了两个内部类:Node、Participant

// 使用 @sun.misc.Contended 注解避免出现伪共享 @sun.misc.Contended static final class Node { int index; // Arena 中的索引 int bound; // Exchanger.bound的最后记录值 int collides; // 当前 bound 的CAS 失败数 int hash; // Pseudo-random for spins Object item; // 线程的当前数据项 volatile Object match; // 由释放线程提供的项目 volatile Thread parked; // 当阻塞(parked)时,设置此线程,否则为null } /** 继承了ThreadLocal,并初始化了Node对象 */ static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } } 重要的属性 /** 每个线程的状态 */ private final Participant participant; /** 消除数组;在启用(在slotExchange中)之前为空。元素访问使用volatile get和CAS */ private volatile Node[] arena; /** 在检测到争用之前一直使用的插槽,可以理解为先到的线程的数据项 */ private volatile Node slot; /** 每次更新时,将最大有效竞技场位置的索引与高位SEQ号进行“或”运算。 */ private volatile int bound; exchange()方法

等待另一个线程到达交换点(除非当前线程被中断),然后将给定的对象传递给它,作为回报接收另一个的对象。

public V exchange(V x) throws InterruptedException { // 交换后的对象v Object v; // item 为交换出去的对象,如果为null则换成NULL_ITEM对象 Object item = (x == null) ? NULL_ITEM : x; // translate null args // 1.1构造方法没有初始化arena,所以第一个进来的线程看见的arena肯定为null // 1.2第一个进来的线程继续调用slotExchange(item, false, 0L)方法 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && // 2.1 Thread.interrupted(): 检测线程是否有被中断 // 2.2 arenaExchange(item, false, 0L):slotExchange方法 返回了null时会进入到这个方法 ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }

arenaExchange()方法总结:

调用exchange方法的线程等待另一个线程到达交换点完成交换数据

如果交换的数据为null,会被转换成一个NULL_ITEM 的Object对象作为转换的数据项

构造方法未初始化arena对象,所以会先调用slotExchange方法借用slot插槽来交换对象

如果slotExchange方法成功返回了另一个交换到的对象,则直接返回交换到的数据项

如果slotExchange方法成功返回了null,会继续调用arenaExchange方法完成数据交换并返回

slotExchange()方法 /** * item:要交换的项目 * timed:是否有设置超时 * ns: 设置的超时时间 * return: 返回另一个线程的数据项;如果启用arena或线程在完成之前被中断,则为null;如果超时,则为TIMED_OUT */ private final Object slotExchange(Object item, boolean timed, long ns) { // 获取当前线程node节点对象 Node p = participant.get(); Thread t = Thread.currentThread(); // 当前线程 if (t.isInterrupted()) // preserve interrupt status so caller can recheck return null; // 自旋 for (Node q;;) { if ((q = slot) != null) { // 两个线程先到的线程,slot肯定为null,一般后到的线程会进入到这个if分支 // 如果在当前线程之前已经有线程调用了exchange方法,slot就肯定不为null,条件成立 if (U.compareAndSwapObject(this, SLOT, q, null)) {// 后来的线程会调用CAS吧slot再置为null // q.item 是较早的线程的数据项 Object v = q.item; // item 是当前线程的数据项;by: https://jinglingwang.cn q.match = item; // 之前阻塞(park)的线程 Thread w = q.parked; if (w != null) //可能另一个线程还在自旋,没有阻塞,所以这里可能会为null // 唤醒之前被阻塞的线程 U.unpark(w); // 返回之前的线程的数据项 return v; } // create arena on contention, but continue until slot null // 上面CAS修改slot失败后,会进入到这里;https://jinglingwang.cn // SEQ = MMASK + 1 = 256 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) // if条件成立,初始化arena数组 // 我8核的CPU,计算的length是 (4+2) << 7 == 768 arena = new Node[(FULL + 2) << ASHIFT]; } else if (arena != null) // 如果上面的if条件成立并且初始化了arena数组,会进入到arenaExchange方法 return null; // caller must reroute to arenaExchange else { p.item = item; // p节点的item设置为当前项item if (U.compareAndSwapObject(this, SLOT, null, p)) // CAS 修改slot的值,修改成功退出自旋 break; p.item = null; //CAS 修改失败没有退出自旋,重置p节点的item为null } } // 理论上第一个先到的线程会进入到下面,会阻塞自己,等待另一个线程的数据项到来 // await release int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; // 超时时间 // 根据CPU的核数确定自旋的次数1024 or 1 int spins = (NCPU > 1) ? SPINS : 1; Object v; while ((v = p.match) == null) { // 先到的线程 p.match 可能会为null,下面开始自旋等待另一个线程交换的数据设置到match if (spins > 0) { **// 至少先自旋 1024 次,等待match数据项,自旋后才阻塞自己** h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); // 重新计算hash else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // 减少自旋次数 Thread.yield(); // 让出CPU的使用权 } else if (slot != p) // 上面自旋次数已经减到0了,并且slot != p,没有冲突的话理论上slot 应该是等于 p 的 spins = SPINS; // 重置自旋次数 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); // 调用底层阻塞最早的线程 // 线程被唤醒了,回到上面再次判断while自旋,p.match理论上不会是null了,p.match是后到的线程的数据项,是需要返回给当前线程的项 p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 如果线程阻塞超时了,还是没等待要交换的数据项,会进入到这里,返回一个TIMED_OUT 对象或null v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 将 当前线程p 的 match 属性设置成 null U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; // 返回匹配后的数据项v return v; }

slotExchange()方法总结:

线程进入该方法后,会先拿到[Exchanger](https://jinglingwang.cn)的Participant,也就是Node数据节点p;

检查线程的状态,是否有被中断,如果是返回null,会进入到下面的arenaExchange方法逻辑

先调用slotExchange()方法的线程会使用CAS的方式线程安全的占用slot插槽

然后会自旋至少1024次并不断让出CPU使用权,期间如果成功等待到了另外的线程的数据项(p.match != null),则直接返回交换到的数据(v = p.match)

如果自旋后没有等到交换的数据项,调用U.park阻塞当前线程,等待另一个线程的到来将其唤醒或者超时

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgsjs.html