查查知识网

barrier volleyball

发布者:张同华
导读面试官: CyclicBarrier有了解过吗?说说看前言目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注?? ?Java提供了一些

面试官: CyclicBarrier有了解过吗?说说看(源码剖析)

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注?? ?

Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解CyclicBarrier,一起来看下吧?

CyclicBarrier

这个跟我们上节讲的CountDownLatch有点类似,从字面意思讲是相当于一个可循环的屏障,他与CountDownLatch不同的是它可以重复利用,下一步的操作以,依赖上一步是否完成,就像去银行办业务一样,排在你前面的人办好了才轮到你,我们继续通过上节的例子,来改写一下它,这里我偷个懒,实际业务中尽量用类编写,不要直接new Thread

public class CyclicBarrierTest {
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(1);

        IntStream.range(0, 10).forEach(i -> {
            new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println(&34;worker 1------> &34; + i);
                    cyclicBarrier.await();

                    Thread.sleep(2000);
                    System.out.println(&34;worker 2------> &34; + i);
                    cyclicBarrier.await();


                    Thread.sleep(2000);
                    System.out.println(&34;worker 3------> &34; + i);
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        });

        System.out.println(&34;completed !&34;);
    }
}

实际输出:

completed !
worker 1------> 9
worker 1------> 0
worker 1------> 6
worker 1------> 7
worker 1------> 5
worker 1------> 4
worker 1------> 1
worker 1------> 3
worker 1------> 2
worker 1------> 8
worker 2------> 7
worker 2------> 6
worker 2------> 5
worker 2------> 2
worker 2------> 3
worker 2------> 1
worker 2------> 8
worker 2------> 0
worker 2------> 9
worker 2------> 4
worker 3------> 6
worker 3------> 3
worker 3------> 2
worker 3------> 5
worker 3------> 7
worker 3------> 8
worker 3------> 1
worker 3------> 0
worker 3------> 9
worker 3------> 4

可以看到在即使在多线程下,每个操作都需要上一个await任务之后执行,使用很简单,也很好理解。

知其然知其所以然 & 源码剖析

下面我们就一起探究一下,它是如何做到的?

同样的,我们先看构造函数

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

默认barrierAction是null, 这个参数是Runnable参数,当最后线程达到的时候执行的任务,刚刚的例子中没有演示,大家可以在初始化的时候传入一个,打印一下当前的线程名称,这样理解起来比较容易点,parties int型,它的意思是参与的线程数。

我们再看它的定义, 可以看到它没有继承任何类或实现任何接口

public class CyclicBarrier { .... }

await

我们重点看下这个方法

 public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

这个方法干嘛用的呢?等到所有各方都在此屏障上调用了await 。如果当前线程不是最后到达的,则出于线程调度目的将其禁用并处于休眠状态,除了以下情况:

  • 最后一个线程到达;或者
  • 其他一些线程中断当前线程;或者
  • 其他一些线程中断了其他等待线程之一;或者
  • 其他一些线程在等待屏障时超时;或者
  • 其他一些线程在此屏障上调用reset 。

再看dowait(), 它是一个私有方法

 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 全局锁        
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 每次使用屏障都会生成一个实例
            // private Generation generation = new Generation();
            final Generation g = generation;

            // broken字面意思破坏,如果被破坏了就抛异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 线程中断检测
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 剩余的等待线程数
            int index = --count;
            // 最后线程到达时 
            if (index == 0) {  // tripped
                // 标记任务是否被执行(就是传进入的runable参数)
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 执行任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 完成后 进行下一组 初始化 generation 初始化 count 并唤醒所有等待的线程 
                    // 
                    // private void nextGeneration() {
                    //     // signal completion of last generation
                    //     trip.signalAll();
                    //     // set up next generation
                    //     count = parties;
                    //     generation = new Generation();
                    // }
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // index 不为0时 进入自旋
            for (;;) {
                try {
                    // 先判断超时 没超时就继续等着
                    if (!timed)
                        trip.await();
                        // 如果超出指定时间 调用 awaitNanos 超时了释放锁
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                        // 中断异常捕获
                } catch (InterruptedException ie) {
                    // 判断是否被破坏
                    if (g == generation && ! g.broken) {
                        //  private void breakBarrier() {
                        //     generation.broken = true;
                        //     count = parties;
                        //     trip.signalAll();
                        // }
                        breakBarrier();
                        throw ie;
                    } else {
                        // 否则的话中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }

                // 被破坏抛异常
                if (g.broken)
                    throw new BrokenBarrierException();

                // 正常调用 就返回 
                if (g != generation)
                    return index;

                // 超时了而被唤醒的情况 调用 breakBarrier()
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

如果被破坏了怎么恢复呢?来看下reset, 源码很简单,break之后重新生成新的实例,对应的会重新初始化count,在dowaitindex==0也调用了nextGeneration,所以说它是可以循环利用的

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

结束语

cyclicBarrier源码相对简单一些,下节给大家讲下Phaser,它是增强版的CountDownLatch,它的实现相对更加复杂一点 ?

往期并发编程内容推荐

  • Java多线程专题之线程与进程概述
  • Java多线程专题之线程类和接口入门
  • Java多线程专题之进阶学习Thread(含源码分析)
  • Java多线程专题之Callable、Future与FutureTask(含源码分析)
  • 面试官: 有了解过线程组和线程优先级吗
  • 面试官: 说一下线程的生命周期过程
  • 面试官: 说一下线程间的通信
  • 面试官: 说一下Java的共享内存模型
  • 面试官: 有了解过指令重排吗,什么是happens-before
  • 面试官: 有了解过volatile关键字吗 说说看
  • 面试官: 有了解过Synchronized吗 说说看
  • Java多线程专题之Lock锁的使用
  • 面试官: 有了解过ReentrantLock的底层实现吗?说说看
  • 面试官: 有了解过CAS和原子操作吗?说说看
  • Java多线程专题之线程池的基本使用
  • 面试官: 有了解过线程池的工作原理吗?说说看
  • 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
  • 面试官: 阻塞队列有了解过吗?说说看
  • 面试官: 阻塞队列的底层实现有了解过吗? 说说看
  • 面试官: 同步容器和并发容器有用过吗? 说说看
  • 面试官: CopyOnWrite容器有了解过吗? 说说看
  • 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
  • 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
  • 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
  • 我的博客(阅读体验较佳)

项目源码(源码已更新 欢迎star??)

  • java-thread-all
  • 地址: http://github.com/origin/qiuChengleiy/java-thread-all.git

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star??)

  • springboot-all
  • 地址: http://github.com/origin/qiuChengleiy/springboot-all.git
  • SpringBoot系列教程合集
  • 一起来学SpringCloud合集