同步器用来协助线程同步,具体有 CountDownLatch
、CyclicBarrier
、Semaphore
、Exchanger
。
CountDownLatch
通俗理解: CountDownLatch
可以理解为是一扇门,门上有若干把锁,当线程到达这个门时,如果门上锁的数量不为0,就会阻塞当前线程,直到门上锁的数量减为0,此时会唤醒被阻塞的线程。并且门打开以后不会在关闭。
具体实现: CountDownLatch
是基于AQS
实现的,锁的数量就是AQS
中的state
,通过构造函数设置state
初始值。当调用CountDownLatch
的await()
方法时,如果state
不为0,就会阻塞当前线程。如果为0就不会阻塞。当通过countDown()
将state
减为0时,就会唤醒之前被阻塞的线程。并且state
减为0后不会被重置。CountDownLatch
可以理解为:所有线程需要等待某个事件发生以后,才能继续执行,否则就会被阻塞。
部分源码分析
public CountDownLatch(int count)
通过构造函数设置锁的初始数量,也就是AQS
里的state
1 | public CountDownLatch(int count) { |
public void await()
会判断state
(锁)的数量是否为0,如果不为0就会被阻塞。
1 | public void await() throws InterruptedException { |
public void countDown()
释放锁(将state减1)
1 | public void countDown() { |
doReleaseShared()
1 | private void doReleaseShared() { |
应用场景
- 统计多个线程并发执行某个任务所需要的时间。首先是要保证所有线程都处于就绪状态。也就是说线程需要等待【所有线程都处于就绪状态】这个事件发生。然后并发执行任务。
1
2
3
4
5
6
7
8
9
10
11
12
13CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N); // N代表N个线程
ExecutorService threadPool = Executors.newFixedThreadPool(N);
for (int i = 0; i < N; i++)
threadPool.submit(()->{
startSignal.await(); // 等待【所有线程就绪】事件发生
doTask();
doneSignal().countDown();
});
long startTime = System.currentTimeMillis();
startSignal.countDown(); // 此时所有线程已经就绪,然后触发【所有线程就绪】事件发生
doneSignal().await(); // 等待【所有线程执行完毕】这个事件发生
long total = System.currentTimeMillis(); - startTime; // 统计时间
CyclicBarrier
CyclicBarrier
循环_栅栏(障碍物)。直到所有线程都到达以后,才会唤醒线程,否则提前到达的线程会被阻塞。当唤醒所有被阻塞的线程以后,会进入下一代,并且重置障碍,达到循环(Cyclic)的目的。CyclicBarrier
可以理解为:等待所有线程都到达以后,被阻塞的线程才能继续执行。CyclicBarrier
并没有直接依赖AQS实现,具体是通过ReentrantLock
和Condition
做的实现,因为要并发修改等待的线程数量。
部分源码解析
public CyclicBarrier(int parties)
parties
是需要等待的线程的数量,当等待的线程数量达到parties时,会唤醒这些等待的线程。barrierAction
是当最后一个线程到达栅栏时,需要执行的操作。count
是还需要等待几个线程,当需要等待的线程数量为0时,表示可以唤醒等待的线程
1 | public CyclicBarrier(int parties) { |
public int await()
将需要等待的线程数量减1,当减到0时就会唤醒所有等待的线程。
等待所有的线程到达,如果当前线程不是最后一个到达的,将会被阻塞,直到最后一个线程到达后被唤醒。具体的执行逻辑在dowait(boolean timed, long nanos)
里。
1 | public int await() throws InterruptedException, BrokenBarrierException { |
private void nextGeneration()
唤醒等待的线程,重置count
,进入下一分代
1 | private void nextGeneration() { |
private void breakBarrier()
打破分代,重置count
,并且唤醒其他线程。
1 | private void breakBarrier() { |
应用场景
- 将大任务分解成若干个子任务。现在要统计一个矩阵所有元素相加和。可以让每一个线程求每一行。最后一个线程执行完以后,调用
await()
执行之前注册的barrierCommand将每一个线程的结果汇总。代码来自Oracle doc
1 | class Solver { |
- 可以让固定数量的线程周期性的执行任务。比如周期性每次让10个线程并发执行
1 | CyclicBarrier cyclicBarrier = new CyclicBarrier(times, ()-> { |
Semaphore
Semaphore
可以理解为是一个资源池,线程需要从这个资源池中申请到资源才能继续运行,否则会被阻塞直到资源池中有可用资源。Semaphore
是基于AQS实现的,和CountDownLatch
类似。CountDownLatch
是当state
为0时,需要唤醒线程。而它将AQS中的state
表示为可用的资源数量,当state
为0时代表没有可用资源,会将阻塞当前线程,将其添加到队列中,当其它线程释放资源后,此时state
不为0,就会唤醒所有的被阻塞的线程。
部分源码分析
public void acquire(int permits)
非公平信号量获取许可过程:通过final int nonfairTryAcquireShared(int acquires)
获取state值,判断是否还有可用的许可。
- 如果有的话直接返回
- 如果没有可用的许可,将当前线程封装到队列中。
1 | public void acquire(int permits) throws InterruptedException { |
public void release()
释放许可操作:将状态变量state
加1,并唤醒阻塞的线程。注意(Java Doc):在释放许可时,不必提前获取许可
There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire. Correct usage of a semaphore is established by programming convention in the application.
1 | public void release() { |
应用场景
互斥锁
当维护的许可数量是1时,就是互斥锁
1 | Semaphore mutex = new Semaphore(1); |
控制线程提交的最大任务量
比如爬虫程序中,控制线程提交的最大任务量,将许可数量设置为线程池中最大线程数量 + 任务队列长度来保证提交的任务都能被执行,防止提交任务过多导致内存溢出或者执行拒绝策略。