J.U.C源码阅读笔记(三)同步器

同步器用来协助线程同步,具体有 CountDownLatchCyclicBarrierSemaphoreExchanger

CountDownLatch

通俗理解: CountDownLatch可以理解为是一扇门,门上有若干把锁,当线程到达这个门时,如果门上锁的数量不为0,就会阻塞当前线程,直到门上锁的数量减为0,此时会唤醒被阻塞的线程。并且门打开以后不会在关闭。
具体实现: CountDownLatch是基于AQS实现的,锁的数量就是AQS中的state,通过构造函数设置state初始值。当调用CountDownLatchawait()方法时,如果state不为0,就会阻塞当前线程。如果为0就不会阻塞。当通过countDown()state减为0时,就会唤醒之前被阻塞的线程。并且state减为0后不会被重置。
CountDownLatch可以理解为:所有线程需要等待某个事件发生以后,才能继续执行,否则就会被阻塞。

部分源码分析

public CountDownLatch(int count)

通过构造函数设置锁的初始数量,也就是AQS里的state

1
2
3
4
5
6
7
8
9
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

// 设置AQS中的state,Sync继承了AQS
Sync(int count) {
setState(count);
}

public void await()

会判断state(锁)的数量是否为0,如果不为0就会被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 获取共享模式下的锁,当state不为0时tryAcquireShared(arg)会返回-1 => 代表获取锁失败(具体到CountDownLatch,就是等待的事件没有发生)需要被阻塞。否则返回1
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// 当锁的数量(state)不为0时,返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将节点添加到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点
if (p == head) {
// 再次尝试获取共享锁(判断锁的数量是否为0了)
int r = tryAcquireShared(arg);
// 如果锁的数量为0了,就会进这个if
if (r >= 0) {
// 设置当前节点为头节点,并且唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 判断是否阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

public void countDown()

释放锁(将state减1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
// 判断将state减1后是否为0,如果为0,则唤醒被阻塞的线程
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

doReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void doReleaseShared() {
for (;;) {
Node h = head;
// h == null 代表还没有线程被阻塞,也就是说没有添加到阻塞队列里,所以不用唤醒
// h == tail 代表state == 0了,但是又一个线程之前判断state不为0,正在创建head节点,
// 也不用管(因为执行 int r = tryAcquireShared(arg); 时,state为0,所以就会返回)
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 因为释放了后继节点以后,后继节点对应线程执行setHeadAndPropagate(node, r)会修改head指针
// 当前节点中的线程执行最后一个 if(h == head)时,可能后继节点中的线程已经修改了head, 判断false后当前节点中的线程就会重新循环
// 所以当前节点对应的线程以及后继节点对应的线程同时走到这里后,并发修改head节点的waitstatus就可能会失败
// 为什么要这样,不太清楚。有博客说是为了提高吞吐量。也就说当前线程唤醒了后继节点中的线程后,还可能会帮助唤醒其它节点中的线程
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒头节点head的后继节点
unparkSuccessor(h);
}
// 当前节点的waitStatus为0, 是因为之前只有当前节点被阻塞,当前节点修改时,有新的节点入队了,此时需要被唤醒,所以当前线程就会重新进入循环。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 返回false后,代表由当前节点对应线程唤醒的后继节点中的线程修改了head指针,导致返回false
if (h == head) // loop if head changed
break;
}
}

应用场景

  • 统计多个线程并发执行某个任务所需要的时间。首先是要保证所有线程都处于就绪状态。也就是说线程需要等待【所有线程都处于就绪状态】这个事件发生。然后并发执行任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N); // N代表N个线程
    ExecutorService threadPool = Executors.newFixedThreadPool(N);
    for (int i = 0; i < N; i++)
    threadPool.submit(()->{
    startSignal.await(); // 等待【所有线程就绪】事件发生
    doTask();
    doneSignal().countDown();
    });
    long startTime = System.currentTimeMillis();
    startSignal.countDown(); // 此时所有线程已经就绪,然后触发【所有线程就绪】事件发生
    doneSignal().await(); // 等待【所有线程执行完毕】这个事件发生
    long total = System.currentTimeMillis(); - startTime; // 统计时间

CyclicBarrier

CyclicBarrier循环_栅栏(障碍物)。直到所有线程都到达以后,才会唤醒线程,否则提前到达的线程会被阻塞。当唤醒所有被阻塞的线程以后,会进入下一代,并且重置障碍,达到循环(Cyclic)的目的。
CyclicBarrier可以理解为:等待所有线程都到达以后,被阻塞的线程才能继续执行。
CyclicBarrier并没有直接依赖AQS实现,具体是通过ReentrantLockCondition做的实现,因为要并发修改等待的线程数量。

部分源码解析

public CyclicBarrier(int parties)

parties是需要等待的线程的数量,当等待的线程数量达到parties时,会唤醒这些等待的线程。
barrierAction是当最后一个线程到达栅栏时,需要执行的操作。
count是还需要等待几个线程,当需要等待的线程数量为0时,表示可以唤醒等待的线程

1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

public int await()

将需要等待的线程数量减1,当减到0时就会唤醒所有等待的线程。
等待所有的线程到达,如果当前线程不是最后一个到达的,将会被阻塞,直到最后一个线程到达后被唤醒。具体的执行逻辑在dowait(boolean timed, long nanos)里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取当前分代
final Generation g = generation;
// 判断当前分代是否由于超时、中断等被打破
if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 判断还需要等待的线程的数量
int index = --count;
// 如果需要等待的线程的数量为0,则表明当前线程是最后一个线程
// 判断是否需要执行barrierCommand,然后唤醒等待的线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 获取注册的操作
final Runnable command = barrierCommand;
// 如果操作不为null,则执行run()
if (command != null)
command.run();
ranAction = true;
// 进入下一分代,重置count
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
// 如果当前线程不是最后一个线程,将会被阻塞
for (;;) {
try {
// 没有设置超时等待,直接阻塞
if (!timed)
trip.await();
// 设置了超时等待,则等待指定时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果线程被中断了, 需要打破分代(设置Generation标志位)
if (g == generation && ! g.broken) {
// 设置标志位、唤醒等待的线程
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 当被唤醒的线程,发现分代被打破以后,直接抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 当被唤醒的线程,发现当前分代不等于新的分代(最后一个线程会重新创建generation进入下一分代)。直接返回
if (g != generation)
return index;
// 如果超时以后,直接打破分代、并且抛出超时异常。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

private void nextGeneration()

唤醒等待的线程,重置count,进入下一分代

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

private void breakBarrier()

打破分代,重置count,并且唤醒其他线程。

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

应用场景

  • 将大任务分解成若干个子任务。现在要统计一个矩阵所有元素相加和。可以让每一个线程求每一行。最后一个线程执行完以后,调用await()执行之前注册的barrierCommand将每一个线程的结果汇总。代码来自Oracle doc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;

class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}

public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N, () -> {mergeRows(...);});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
}
}
  • 可以让固定数量的线程周期性的执行任务。比如周期性每次让10个线程并发执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CyclicBarrier cyclicBarrier = new CyclicBarrier(times, ()-> {
System.out.println("================");
});
for (int i = 0; i < 10; i++) {
new Thread(()-> {
try {
while (true) {
cyclicBarrier.await();
// do something
System.out.println(Thread.currentThread().getName());
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}

Semaphore

Semaphore

  • Semaphore可以理解为是一个资源池,线程需要从这个资源池中申请到资源才能继续运行,否则会被阻塞直到资源池中有可用资源。
  • Semaphore是基于AQS实现的,和CountDownLatch类似。CountDownLatch是当state为0时,需要唤醒线程。而它将AQS中的state表示为可用的资源数量,当state为0时代表没有可用资源,会将阻塞当前线程,将其添加到队列中,当其它线程释放资源后,此时state不为0,就会唤醒所有的被阻塞的线程。

部分源码分析

public void acquire(int permits)

非公平信号量获取许可过程:通过final int nonfairTryAcquireShared(int acquires)获取state值,判断是否还有可用的许可。

  • 如果有的话直接返回
  • 如果没有可用的许可,将当前线程封装到队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public void acquire(int permits) throws InterruptedException {
// 传入的许可数量是负数,抛异常
if (permits < 0) throw new IllegalArgumentException();
// 尝试获取许可
sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 判断可用的许可数量是否满足需要,如果不满足会将当前线程封装到队列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取可用的许可
int available = getState();
// 判断可用许可数量是否满足需要
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将节点添加到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点
if (p == head) {
// 再次尝试获取共享锁(判断锁的数量是否为0了)
int r = tryAcquireShared(arg);
// 如果锁的数量为0了,就会进这个if
if (r >= 0) {
// 设置当前节点为头节点,并且唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 判断是否阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

public void release()

释放许可操作:将状态变量state加1,并唤醒阻塞的线程。注意(Java Doc):在释放许可时,不必提前获取许可

There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire. Correct usage of a semaphore is established by programming convention in the application.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 释放资源以后,唤醒之前被阻塞的所有线程,让它们竞争资源
if (tryReleaseShared(arg)) {
// 参照CountDownLatch注释
doReleaseShared();
return true;
}
return false;
}
// 通过CAS将state加1,因为释放操作是多个线程并发的。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

应用场景

互斥锁

当维护的许可数量是1时,就是互斥锁

1
Semaphore mutex = new Semaphore(1);

控制线程提交的最大任务量

比如爬虫程序中,控制线程提交的最大任务量,将许可数量设置为线程池中最大线程数量 + 任务队列长度来保证提交的任务都能被执行,防止提交任务过多导致内存溢出或者执行拒绝策略。