面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
前言
目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注?? ?
Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解CyclicBarrier,一起来看下吧?
CyclicBarrier
这个跟我们上节讲的CountDownLatch有点类似,从字面意思讲是相当于一个可循环的屏障,他与CountDownLatch不同的是它可以重复利用,下一步的操作以,依赖上一步是否完成,就像去银行办业务一样,排在你前面的人办好了才轮到你,我们继续通过上节的例子,来改写一下它,这里我偷个懒,实际业务中尽量用类编写,不要直接new Thread
public class CyclicBarrierTest {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(1);
IntStream.range(0, 10).forEach(i -> {
new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println(&34;worker 1------> &34; + i);
cyclicBarrier.await();
Thread.sleep(2000);
System.out.println(&34;worker 2------> &34; + i);
cyclicBarrier.await();
Thread.sleep(2000);
System.out.println(&34;worker 3------> &34; + i);
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
});
System.out.println(&34;completed !&34;);
}
}
实际输出:
completed !
worker 1------> 9
worker 1------> 0
worker 1------> 6
worker 1------> 7
worker 1------> 5
worker 1------> 4
worker 1------> 1
worker 1------> 3
worker 1------> 2
worker 1------> 8
worker 2------> 7
worker 2------> 6
worker 2------> 5
worker 2------> 2
worker 2------> 3
worker 2------> 1
worker 2------> 8
worker 2------> 0
worker 2------> 9
worker 2------> 4
worker 3------> 6
worker 3------> 3
worker 3------> 2
worker 3------> 5
worker 3------> 7
worker 3------> 8
worker 3------> 1
worker 3------> 0
worker 3------> 9
worker 3------> 4
可以看到在即使在多线程下,每个操作都需要上一个await任务之后执行,使用很简单,也很好理解。
知其然知其所以然 & 源码剖析
下面我们就一起探究一下,它是如何做到的?
同样的,我们先看构造函数
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
默认barrierAction是null, 这个参数是Runnable参数,当最后线程达到的时候执行的任务,刚刚的例子中没有演示,大家可以在初始化的时候传入一个,打印一下当前的线程名称,这样理解起来比较容易点,parties int型,它的意思是参与的线程数。
我们再看它的定义, 可以看到它没有继承任何类或实现任何接口
public class CyclicBarrier { .... }
await
我们重点看下这个方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
这个方法干嘛用的呢?等到所有各方都在此屏障上调用了await 。如果当前线程不是最后到达的,则出于线程调度目的将其禁用并处于休眠状态,除了以下情况:
- 最后一个线程到达;或者
- 其他一些线程中断当前线程;或者
- 其他一些线程中断了其他等待线程之一;或者
- 其他一些线程在等待屏障时超时;或者
- 其他一些线程在此屏障上调用reset 。
再看dowait(), 它是一个私有方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 全局锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 每次使用屏障都会生成一个实例
// private Generation generation = new Generation();
final Generation g = generation;
// broken字面意思破坏,如果被破坏了就抛异常
if (g.broken)
throw new BrokenBarrierException();
// 线程中断检测
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 剩余的等待线程数
int index = --count;
// 最后线程到达时
if (index == 0) { // tripped
// 标记任务是否被执行(就是传进入的runable参数)
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 执行任务
if (command != null)
command.run();
ranAction = true;
// 完成后 进行下一组 初始化 generation 初始化 count 并唤醒所有等待的线程
//
// private void nextGeneration() {
// // signal completion of last generation
// trip.signalAll();
// // set up next generation
// count = parties;
// generation = new Generation();
// }
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// index 不为0时 进入自旋
for (;;) {
try {
// 先判断超时 没超时就继续等着
if (!timed)
trip.await();
// 如果超出指定时间 调用 awaitNanos 超时了释放锁
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
// 中断异常捕获
} catch (InterruptedException ie) {
// 判断是否被破坏
if (g == generation && ! g.broken) {
// private void breakBarrier() {
// generation.broken = true;
// count = parties;
// trip.signalAll();
// }
breakBarrier();
throw ie;
} else {
// 否则的话中断当前线程
Thread.currentThread().interrupt();
}
}
// 被破坏抛异常
if (g.broken)
throw new BrokenBarrierException();
// 正常调用 就返回
if (g != generation)
return index;
// 超时了而被唤醒的情况 调用 breakBarrier()
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
如果被破坏了怎么恢复呢?来看下reset, 源码很简单,break之后重新生成新的实例,对应的会重新初始化count,在dowait里index==0也调用了nextGeneration,所以说它是可以循环利用的
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
结束语
cyclicBarrier源码相对简单一些,下节给大家讲下Phaser,它是增强版的CountDownLatch,它的实现相对更加复杂一点 ?
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 我的博客(阅读体验较佳)
项目源码(源码已更新 欢迎star??)
- java-thread-all
- 地址: http://github.com/origin/qiuChengleiy/java-thread-all.git
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star??)
- springboot-all
- 地址: http://github.com/origin/qiuChengleiy/springboot-all.git
- SpringBoot系列教程合集
- 一起来学SpringCloud合集