源码分析:Exchanger之数据交换器 (2)

另一个线程进入slotExchange()方法后,发现slot插槽已经被占用(已经有线程在等它交换数据了),取出slot插槽中的item数据(第一个线程的数据),并设置自己的数据到插槽的match项,然后唤醒另一个线程,成功换反交换到的数据。

被唤醒的线程成功获得match数据,并返回交换后的match数据

slotExchange方法返回null的2种情况:

线程被中断,会返回null

设置了超时时间,并且时间超时,会返回TIMED_OUT

第一个线程超时了,把slot从p置为null的同事第二个线程刚好调用CAS也在把slot从q修改为null,这时候第二个线程会修改失败,然后就会去初始化arena数组,然后第二个线程就可能返回null

arenaExchange()方法

从exchange()方法实现中可以看到,只有当slotExchange()方法返回null之后才会执行到arenaExchange()方法,而线程中断的情况是不会进入到该方法的,所以只有另一种情况,但是要进入的几率太小了,断点调试的话难以构造这种情况。

private final Object arenaExchange(Object item, boolean timed, long ns) { // 实质上就是个Node数组 Node[] a = arena; // 获取当前线程node节点对象 Node p = participant.get(); // p.index 访问插槽的索引位置,初始值为0 for (int i = p.index;;) { // access slot at i // j是原始数组偏移量 https://jinglingwang.cn int b, m, c; long j; // j is raw array offset // ABASE:返回Node数组中第一个元素的偏移地址+128; i << ASHIFT : i<<7 // getObjectVolatile:获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义 // q节点就是通过CAS获取arena数组偏移(i + 1) * 128个地址位上的node Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 如果获取到的节点不为空,并且再次吧j位置的q元素置为null if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 整个条件成立,代表线程获得了交换的数据 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) // 有阻塞的线程就唤醒 U.unpark(w); return v; // 返回交换的数据 } else if (i <= (m = (b = bound) & MMASK) && q == null) { // i 没有越界,并且q==null // 把当前线程的数据赋予给p节点的item p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { // 再使用CAS的方式把p节点安全的放入到数组的j位置上 // CAS 修改成功 // 计算超时时间 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait 当前线程 // 自旋 1024 for (int h = p.hash, spins = SPINS;;) { Object v = p.match; //交换的数据 if (v != null) { // 交换的数据不为null,说明有其他线程把交换的数据送进来了 U.putOrderedObject(p, MATCH, null); // 将match和item置为null p.item = null; // clear for next use p.hash = h; return v;// 返回数据 } else if (spins > 0) { // 异或移位 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash 初始化hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) // 减少自旋次数 Thread.yield(); // two yields per wait 让出CPU使用权 } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的类似 // 重置自旋次数 spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || // 超时时间设置 (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) U.park(false, ns); // 阻塞当前线程,等待被唤醒 p.parked = null; // 线程被唤醒了 U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // m会跟着bound变化,初始会是0 if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 修改b p.item = null; p.hash = h; // i = p.index无符号右移1位 i = p.index >>>= 1; // descend if (Thread.interrupted()) //线程被中断 return null; if (timed && m == 0 && ns <= 0L) // 超时,返回TIME_OUT return TIMED_OUT; break; // expired; restart } } } else // 使用CAS的方式把p节点安全的放入到数组的j位置上失败(可能有其他线程已经捷足先登),重置p节点的item p.item = null; // clear offer } else { // 上面两个if条件都没成立:比如q!=null,compareAndSwapObject失败,数组未越界 if (p.bound != b) { // stale; reset p.bound = b; // b变化了,重置bond p.collides = 0; // 当前 bound 的CAS 失败数 i = (i != m || m == 0) ? m : m - 1; // 确定索引i } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; // bound 的CAS 失败数+1 // 确定循环遍历i,继续回到上面最初的地方自旋 i = (i == 0) ? m : i - 1; // cyclically traverse } else // 此时表示bound值增加了SEQ+1 i = m + 1; // grow p.index = i; // 设置下标,继续自旋 } } } Exchanger总结:

Exchanger 可以以线程安全的方式完成两个线程之间数据的交换工作

By:

Exchanger 主要是使用了自旋和CAS来保证数据的原子性

一般情况下,slotExchange()方法即可完成数据交换的工作

JDK8 版本的Exchanger 使用了 @sun.misc.Contended注解来避免伪共享

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgsjs.html