前言
网站的建设成都创新互联专注网站定制,经验丰富,不做模板,主营网站定制开发.小程序定制开发,H5页面制作!给你焕然一新的设计体验!已为成都展览展示等企业提供专业服务。
在JUC包中,除了一些常用的或者说常见的并发工具类(ReentrantLock,CountDownLatch,CyclicBarrier,Semaphore)等,还有一个不常用的线程同步器类 —— Exchanger。
Exchanger是适用在两个线程之间数据交换的并发工具类,它的作用是找到一个同步点,当两个线程都执行到了同步点(exchange方法)之后(有一个没有执行到就一直等待,也可以设置等待超时时间),就将自身线程的数据与对方交换。
Exchanger 是什么?
它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这个两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的中断时成对的线程使用exchange()方法,当有一对线程到达了同步点,就会进行交换数据,因此该工具类的线程对象是成对的。
线程可以在成对内配对和交换元素的同步点。每个线程在输入exchange方法时提供一些对象,与合作者线程匹配,并在返回时接收其合作伙伴的对象。交换器可以被视为一个的双向形式的SynchroniuzedQueue。交换器在诸如遗传算法和管道设计的应用中可能是有用的。
一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。
Exchanger 用法
应用场景
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。
Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致
Exchanger的典型应用场景是:一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。
案例说明
Exchanger 用于两个线程间交换数据,当然实际参与的线程可以不止两个,测试用例如下:
- private static void test1() throws InterruptedException {
- Exchanger
exchanger = new Exchanger<>(); - CountDownLatch countDownLatch = new CountDownLatch(5);
- for (int i = 0; i < 5; i++) {
- new Thread(() -> {
- try {
- String origMsg = RandomStringUtils.randomNumeric(6);
- // 先到达的线程会在此等待,直到有一个线程跟它交换数据或者等待超时
- String exchangeMsg = exchanger.exchange(origMsg,5, TimeUnit.SECONDS);
- System.out.println(Thread.currentThread().getName() + "\t origMsg:" + origMsg + "\t exchangeMsg:" + exchangeMsg);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }finally {
- countDownLatch.countDown();
- }
- },String.valueOf(i)).start();
- }
- countDownLatch.await();
- }
第5个线程因为没有匹配的线程而等待超时,输出如下:
- 0 origMsg:524053 exchangeMsg:098544
- 3 origMsg:433246 exchangeMsg:956604
- 4 origMsg:098544 exchangeMsg:524053
- 1 origMsg:956604 exchangeMsg:433246
- java.util.concurrent.TimeoutException
- at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
- at com.nuih.juc.ExchangerDemo.lambda$test1$0(ExchangerDemo.java:37)
- at java.lang.Thread.run(Thread.java:748)
上述测试用例是比较简单,可以模拟消息消费的场景来观察Exchanger的行为,测试用例如下:
- private static void test2() throws InterruptedException {
- Exchanger
exchanger = new Exchanger<>(); - CountDownLatch countDownLatch = new CountDownLatch(4);
- CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
- // 生产者
- Runnable producer = new Runnable() {
- @Override
- public void run() {
- try{
- cyclicBarrier.await();
- for (int i = 0; i < 5; i++) {
- String msg = RandomStringUtils.randomNumeric(6);
- exchanger.exchange(msg,5,TimeUnit.SECONDS);
- System.out.println(Thread.currentThread().getName() + "\t producer msg -> " + msg + " ,\t i -> " + i);
- }
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- countDownLatch.countDown();
- }
- }
- };
- // 消费者
- Runnable consumer = new Runnable() {
- @Override
- public void run() {
- try{
- cyclicBarrier.await();
- for (int i = 0; i < 5; i++) {
- String msg = exchanger.exchange(null,5,TimeUnit.SECONDS);
- System.out.println(Thread.currentThread().getName() + "\t consumer msg -> " + msg + ",\t" + i);
- }
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- countDownLatch.countDown();
- }
- }
- };
- for (int i = 0; i < 2; i++){
- new Thread(producer).start();
- new Thread(consumer).start();
- }
- countDownLatch.await();
- }
输出如下,上面生产者和消费者线程数是一样的,循环次数也是一样的,但是还是出现等待超时的情形:
- Thread-3 consumer msg -> null, 0
- Thread-1 consumer msg -> null, 0
- Thread-1 consumer msg -> null, 1
- Thread-2 producer msg -> 640010 , i -> 0
- Thread-2 producer msg -> 733133 , i -> 1
- Thread-3 consumer msg -> null, 1
- Thread-3 consumer msg -> 476520, 2
- Thread-1 consumer msg -> 640010, 2
- Thread-1 consumer msg -> null, 3
- Thread-0 producer msg -> 993414 , i -> 0
- Thread-0 producer msg -> 292745 , i -> 1
- Thread-2 producer msg -> 476520 , i -> 2
- Thread-2 producer msg -> 408446 , i -> 3
- Thread-3 consumer msg -> null, 3
- Thread-1 consumer msg -> 292745, 4
- Thread-2 producer msg -> 251971 , i -> 4
- Thread-0 producer msg -> 078939 , i -> 2
- Thread-3 consumer msg -> 251971, 4
- java.util.concurrent.TimeoutException
- at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
- at com.nuih.juc.ExchangerDemo$1.run(ExchangerDemo.java:70)
- at java.lang.Thread.run(Thread.java:748)
- Process finished with exit code 0
这种等待超时是概率出现的,这是为啥?
因为系统调度的不均衡和Exchanger底层的大量自旋等待导致这4个线程并不是调用exchanger成功的次数并不一致。另外从输出可以看出,消费者线程并没有像我们想的那样跟生产者线程一一匹配,生产者线程有时也充当来消费者线程,这是为啥?因为Exchanger匹配时完全不关注这个线程的角色,两个线程之间的匹配完全由调度决定的,即CPU同时执行来或者紧挨着执行来两个线程,这两个线程就匹配成功来。
源码分析
Exchanger 类图
其内部主要变量和方法如下:
成员属性
- // ThreadLocal变量,每个线程都有之间的一个副本
- private final Participant participant;
- // 高并发下使用的,保存待匹配的Node实例
- private volatile Node[] arena;
- // 低并发下,arena未初始化时使用的保存待匹配的Node实例
- private volatile Node slot;
- // 初始值为0,当创建arena后被负责SEQ,用来记录arena数组的可用最大索引,
- // 会随着并发的增大而增大直到等于最大值FULL,
- // 会随着并行的线程逐一匹配成功而减少恢复成初始值
- private volatile int bound;
还有多个表示字段偏移量的静态属性,通过static代码块初始化,如下:
- // Unsafe mechanics
- private static final sun.misc.Unsafe U;
- private static final long BOUND;
- private static final long SLOT;
- private static final long MATCH;
- private static final long BLOCKER;
- private static final int ABASE;
- static {
- int s;
- try {
- U = sun.misc.Unsafe.getUnsafe();
- Class> ek = Exchanger.class;
- Class> nk = Node.class;
- Class> ak = Node[].class;
- Class> tk = Thread.class;
- BOUND = U.objectFieldOffset
- (ek.getDeclaredField("bound"));
- SLOT = U.objectFieldOffset
- (ek.getDeclaredField("slot"));
- MATCH = U.objectFieldOffset
- (nk.getDeclaredField("match"));
- BLOCKER = U.objectFieldOffset
- (tk.getDeclaredField("parkBlocker"));
- s = U.arrayIndexScale(ak);
- // ABASE absorbs padding in front of element 0
- ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
- } catch (Exception e) {
- throw new Error(e);
- }
- if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
- throw new Error("Unsupported array scale");
- }
Exchanger 定义来多个静态变量,如下:
- // 初始化arena时使用, 1 << ASHIFT 是一个缓存行的大小,避免来不同的Node落入到同一个高速缓存行
- // 这里实际是把数组容量扩大来8倍,原来索引相邻的两个元素,扩容后中间隔来7个元素,从元素的起始地址上看就隔来8个元素,中间的7个都是空的,为来避免原来相邻的两个元素都落入到同一个缓存行中
- // 因为arena是对象数组,一个元素占8字节,8个就是64字节
- private static final int ASHIFT = 7;
- // arena 数组元素的索引最大值即255
- private static final int MMASK = 0xff;
- // arena 数组的最大长度即256
- private static final int SEQ = MMASK + 1;
- // 获取CPU核数
- private static final int NCPU = Runtime.getRuntime().availableProcessors();
- // 实际的数组长度,因为是线程两两配对的,所以最大长度是核数除以2
- static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
- // 自旋等待的次数
- private static final int SPINS = 1 << 10;
- // 如果交换的对象是null,则返回此对象
- private static final Object NULL_ITEM = new Object();
- // 如果等待超时导致交换失败,则返回此对象
- private static final Object TIMED_OUT = new Object();
内部类
Exchanger类中有两个内部类,一个Node,一个Participant。 Participant继承了ThreadLocal并且重写了其initialValue方法,返回一个Node对象。其定义如下:
- @sun.misc.Contended static final class Node {
- int index; // Arena index
- int bound; // Last recorded value of Exchanger.bound
- int collides; // Number of CAS failures at current bound
- int hash; // Pseudo-random for spins
- Object item; // This thread's current item
- volatile Object match; // Item provided by releasing thread
- volatile Thread parked; // Set to this thread when parked, else null
- }
- /** The corresponding thread local class */
- static final class Participant extends ThreadLocal
{ - public Node initialValue() { return new Node(); }
- }
其中Contended注解是为了避免高速缓存行导致的伪共享问题
重要方法
exchange()方法
- @SuppressWarnings("unchecked")
- public V exchange(V x) throws InterruptedException {
- Object v;
- Object item = (x == null) ? NULL_ITEM : x; // translate null args
- if ((arena != null || // 是null就执行后面的方法
- (v = slotExchange(item, false, 0L)) == null) &&
- // 如果执行slotExchange有结果就执行后面的,否则返回
- ((Thread.interrupted() || // 非中断则执行后面的方法
- (v = arenaExchange(item, false, 0L)) == null)))
- throw new InterruptedException();
- return (v == NULL_ITEM) ? null : (V)v;
- }
exchange 方法的执行步骤:
slotExchange()方法
slotExchange 是基于slot属性来完成交换的,调用soltExchange方法时,如果slot属性为null,当前线程会将slot属性由null修改成当前线程的Node,如果修改失败则下一次for循环走solt属性不为null的逻辑,如果修改成功则自旋等待,自旋一定次数后通过Unsafe的park方法当当前线程休眠,可以指定休眠的时间,如果没有指定则无限期休眠直到被唤醒;无论是因为线程中断被唤醒,等待超时被唤醒还是其它线程unpark唤醒的,都会检查当前线程的Node的属性释放为null,如果不为null说明交互成功,返回该对象;否则返回null或者TIME_OUT,在返回前会将item,match等属性置为null,保存之前自旋时计算的hash值,方便下一次调用slotExchange。
调用slotExchange方法时,如果slot属性不为null,则当前线程会尝试将其修改null,如果cas修改成功,表示当前线程与slot属性对应的线程匹配成功,会获取slot属性对应Node的item属性,将当前线程交换的对象保存到slot属性对应的Node的match属性,然后唤醒获取slot属性对应Node的waiter属性,即处理休眠状态的线程,至此交换完成,同样的在返回前需要将item,match等属性置为null,保存之前自旋时计算的hash置,方便下一次调用slotExchange;如果cas修改slot属性失败,说明有其它线程也在抢占slot,则初始化arena属性,下一次for循环因为arena属性不为null,直接返回null,从而通过arenaExchange完成交换。
- // arena 为null是会调用此方法,返回null表示交换失败
- // item是交换的对象,timed表示是否等待指定的时间,为false表示无限期等待,ns为等待时间
- private final Object slotExchange(Object item, boolean timed, long ns) {
- // 获取当前线程关联的participant Node
- Node p = participant.get();
- Thread t = Thread.currentThread();
- // 被中断,返回null
- if (t.isInterrupted()) // preserve interrupt status so caller can recheck
- return null;
- for (Node q;;) {
- if ((q = slot) != null) { // slot 不为null
- // 将slot置为null,slot对应的线程与当前线程匹配成功
- if (U.compareAndSwapObject(this, SLOT, q, null)) {
- Object v = q.item;
- // 保存item,即完成交互
- q.match = item;
- // 唤醒q对应的处于休眠状态的线程
- Thread w = q.parked;
- if (w != null)
- U.unpark(w);
- return v;
- }
- // slot修改失败,其它某个线程抢占来该slot,多个线程同时调用exchange方法会触发此逻辑
- // bound等于0表示未初始化,此处校验避免重复初始化
- if (NCPU > 1 && bound == 0 &&
- U.compareAndSwapInt(this, BOUND, 0, SEQ))
- arena = new Node[(FULL + 2) << ASHIFT];
- }
- else if (arena != null)
- return null; // carena不为null,通过arenaExchange交互
- else {
- // slot和arena都为null
- p.item = item;
- // 修改slot为p,修改成功则终止循环
- if (U.compareAndSwapObject(this, SLOT, null, p))
- break;
- // 修改失败则继续for循环,将otem恢复成null
- p.item = null;
- }
- }
- // 将slot修改为p后会进入此分支
- int h = p.hash; // hash初始为0
- long end = timed ? System.nanoTime() + ns : 0L;
- int spins = (NCPU > 1) ? SPINS : 1;
- Object v;
- // match保存着同其他线程交换的对象,如果不为null,说明交换成功了
- while ((v = p.match) == null) {
- // 执行自旋等待
- if (spins > 0) {
- h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
- if (h == 0)
- h = SPINS | (int)t.getId(); 初始化h
- // 只有生成的h小于0时才减少spins
- else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
- Thread.yield();
- }
- // slot被修改了,已经有匹配的线程,重新自旋,读取属性,因为是先修改slot再修改属性的,两者因为CPU调度的问题可能有时间差
- else if (slot != p)
- spins = SPINS;
- // 线程没有被中断且arena为null
- else if (!t.isInterrupted() && arena == null &&
- (!timed || (ns = end - System.nanoTime()) > 0L)) {
- U.putObject(t, BLOCKER, this);
- p.parked = t;
- if (slot == p)
- U.park(false, ns);
- // 线程被唤醒,继续下一次for循环
- // 如果是因为等待超时而被唤醒,下次for循环进入下没的else if分支,返回TIMED_OUT
- p.parked = null;
- U.putObject(t, BLOCKER, null);
- }
- // 将slot修改成p
- else if (U.compareAndSwapObject(this, SLOT, p, null)) {
- // timed为flase,无限期等待,因为中断被唤醒返回null
- // timed为ture,因为超时被唤醒,返回TIMED_OUT,因为中断被唤醒返回null
- v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
- break;
- }
- }
- // 修改match为null,item为null,保存h,下一次exchange是h就不是初始值为0了
- U.putOrderedObject(p, MATCH, null);
- // 重置 item
- p.item = null;
- // 保留伪随机数,供下次种子数字
- p.hash = h;
- // 返回
- return v;
- }
总结一下上面执行的逻辑:
在该方法中,会返回 2 种结果,一是有效的 item,二是 null 要么是线程竞争使用 slot 了,创建了 arena 数组,要么是线程中断了。
通过一副图来看看具体逻辑
arenaExchange() 方法
arenaExchange是基于arena属性完成交换的,整体逻辑比较复杂,有以下几个要点:
- // 抢占slot失败后进入此方法,arena不为空
- private final Object arenaExchange(Object item, boolean timed, long ns) {
- Node[] a = arena;
- Node p = participant.get();
- // index初始为0
- for (int i = p.index;;) { // access slot at i
- int b, m, c; long j; // j is raw array offset
- // 在创建arena时,将本来的数组容量 << ASHIFT,为了避免数组元素落到了同一个高速缓存行
- // 这里获取真实的数组元素索引时也需要 << ASHIFR
- Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
- // 如果q不为null,则将对应的数组元素置为null,表示当前线程和该元素对应的线程匹配l
- if (q != null && U.compareAndSwapObject(a, j, q, null)) {
- Object v = q.item; // release
- q.match = item; // 保存item,交互成功
- Thread w = q.parked;
- if (w != null) // 唤醒等待的线程
- U.unpark(w);
- return v;
- }
- // q为null 或者q不为null,cas抢占q失败了
- // bound初始化时时SEQ,SEQ & MMASK 就是0,即m的初始值就0,m为0时,i肯定为0
- else if (i <= (m = (b = bound) & MMASK) && q == null) {
- p.item = item; // offer
- if (U.compareAndSwapObject(a, j, null, p)) {
- long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
- Thread t = Thread.currentThread(); // wait
- for (int h = p.hash, spins = SPINS;;) {
- Object v = p.match;
- if (v != null) {
- U.putOrderedObject(p, MATCH, null);
- p.item = null; // clear for next use
- p.hash = h;
- return v;
- }
- else if (spins > 0) {
- h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
- if (h == 0) // initialize hash
- h = SPINS | (int)t.getId();
- else if (h < 0 && // approx 50% true
- (--spins & ((SPINS >>> 1) - 1)) == 0)
- Thread.yield(); // two yields per wait
- }
- else if (U.getObjectVolatile(a, j) != p)
- spins = SPINS; // releaser hasn't set match yet
- else if (!t.isInterrupted() && m == 0 &&
- (!timed ||
- (ns = end - System.nanoTime()) > 0L)) {
- U.putObject(t, BLOCKER, this); // emulate LockSupport
- &n
新闻名称:并发编程之Exchanger原理与使用
文章转载:http://www.shufengxianlan.com/qtweb/news46/534896.html网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联