J.U.C源码阅读笔记(四)Lock

Lock接口下的锁是基于AQS实现的显式锁。具体有ReentrantLockReentrantReadWriteLock.ReadLockReentrantReadWriteLock.WriteLock。相对于synchronized隐式锁,这些锁更灵活。

ReentrantLock

1
public class ReentrantLock implements Lock, java.io.Serializable

介绍

实现了Lock接口,是一个显示可重入锁,并且提供了公平锁和非公平锁的实现。

非公平锁 new ReentrantLock();

lock()

非公平锁获取锁的过程:

  • 首先会尝试将锁状态state通过CAScompareAndSetState(0, 1)设置为独占模式(修改为1)
    • 如果修改成功,将会调用setExclusiveOwnerThread(Thread.currentThread())将独占线程变量exclusiveOwnerThread指向当前线程
    • 如果修改失败,将会调用acquire(1)尝试重新获取锁。原因是可能当前线程之前通过lock()已经获取到锁了,现在又调用lock(),所以会失败,所以会判断独占锁的线程是否是当前线程。如果不是的话或者重试以后还是没有获取到锁就会将当前线程封装成一个队列节点放到阻塞队列中,挂起当前线程。
1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

首先会再调用tryAcquire(arg)尝试通过CAS获取锁

  • 如果获取成功,直接返回
  • 如果获取失败,就会调用addWaiter(Node.EXCLUSIVE), arg)创建队列节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

// 获取非公平锁
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取锁状态
int c = getState();
// 如果当前锁没有被锁定,就会尝试再次通过CAS获取锁
if (c == 0) {
// 如果获取锁成功(通过CAS成功修改state为1),设置当前线程为独占锁线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获取锁的线程再次进入临界区,直接将锁状态加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 整数溢出
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置锁计数
setState(nextc);
return true;
}
// 尝试获取锁失败,返回false
return false;
}

将当前线程封装成队列节点,插入到队列尾部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private Node addWaiter(Node mode) {
// 定义一个关于当前线程的节点Node
Node node = new Node(Thread.currentThread(), mode);
// 先通过CAS将当前节点放到队列尾部,如果失败,在执行死循环保证将该节点放到队列尾部
// 优点是:如果成功放到队列尾部,就不用去执行死循环
// Try the fast path of enq; backup to full enq on failure
// 获取队尾节点
Node pred = tail;
// 如果队列已经存在
if (pred != null) {
// 设置当前节点的前驱节点为目前队尾节点
node.prev = pred;
// 通过CAS设置tail指向node
if (compareAndSetTail(pred, node)) {
// 设置tail指向node成功以后,将原来的队尾节点(pred目前指向的节点)后继节点指向node
pred.next = node;
return node;
}
}
// CAS设置失败,执行死循环保证将封装当前线程的节点放到队列中
enq(node);
return node;
}

// CAS设置失败,执行死循环保证将封装当前线程的节点放到队列中
private Node enq(final Node node) {
for (;;) {
// 获取队尾节点
Node t = tail;
// 如果队列还没创建
if (t == null) { // Must initialize
// 创建一个虚假节点new Node()队头head和队尾tail都指向该节点
if (compareAndSetHead(new Node()))
tail = head;
// 队列已经存在:通过CAS将当前节点入队即可
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

插入到队列以后,根据当前线程节点的前驱节点状态waitstatus决定当前线程是否应该被阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 直到获取锁,才会跳出死循环
for (;;) {
// 获取当前节点的前驱节点:为了在锁定之前决定是否会再次去尝试获取锁...
final Node p = node.predecessor();
// 如果当前节点的前驱节点是头节点,在锁定之前会再次去尝试获取锁...
if (p == head && tryAcquire(arg)) {
// 获取锁成功以后将此节点设置为头节点(head = node;),释放线程(node.thread = null;)
setHead(node);
// 将之前的头节点next指针置空
p.next = null; // help GC
failed = false;
return interrupted;
}
// 在挂起当前线程之前,还会再次判断当前线程是否应该阻塞(可能它前面的节点【都】已经取消了呢?(节点处于CANCELL状态),此时就会返回false)
// 如果应该阻塞当前线程,就会调用parkAndCheckInterrupt()阻塞当前线程并检查中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

封装当前线程的节点会根据它前驱节点的状态判断当前线程是否应该被阻塞

  • 如果当前线程节点的前驱节点状态是SIGNAL(-1),说明已经可以保证它的前驱节点在释放锁时唤醒它,所以当前线程可以安全的被阻塞了…
  • 如果当前线程节点的前驱节点状态大于0,说明前驱节点封装的线程被取消了,应该从队列中移除,然后判断前驱节点的前驱节点… 直到找到状态为小于0的节点
  • 如果当前线程节点的前驱节点状态等于0,说明前驱节点里的线程已经被阻塞了,此时会将前驱节点的状态改为SIGNAL,然后返回true,代表应该阻塞当前线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // SIGNAL == -1
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 当前节点的前驱节点已经被取消了(失效了,可能已经为null),需要重置当前节点的前驱节点。
// 节点操作因为超时或者对应的线程被interrupt。节点不应该留在此状态,一旦达到此状态将从CHL队列中踢出。
if (ws > 0) { // cancelled == 1 > 0
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 当head指向的是一个dummy节点时(当第一个节点插入到CLH队列中时),ws为0。 需要修改dummy节点的waitstatus
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

挂起当前线程。当此线程被唤醒时判断线程的中断状态,如果线程被中断过,就通过

1
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;

重新设置线程的中断状态

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

unlock()

首先将锁计数state减1

  • 如果锁计数state为0了,则释放当前锁,唤醒头节点的下一节点中封装的线程
  • 如果锁计数state不为0,则不释放当前锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public void unlock() {
sync.release(1);
}
// 1. 修改锁计数 2. 唤醒下一节点中封装的线程
public final boolean release(int arg) {
// 修改锁计数
if (tryRelease(arg)) {
Node h = head;
// 可能当前线程第一个获取锁的线程,此时head并没有被初始化
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// 尝试释放:将state减1,根据state的值决定是否应该释放锁
protected final boolean tryRelease(int releases) {
// 获取释放掉锁以后的锁计数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果锁计数为0,则释放当前线程持有的锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

// 唤醒下一节点中封装的线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取头节点线程的等待状态为SIGNAL==-1,表示下一节点不用再阻塞
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 下一节点可能已经取消,所以如果下一节点取消的话,就需要找到一个有效的下一节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒下一节点
if (s != null)
LockSupport.unpark(s.thread);
}

tryLock()

如果获取不到锁,直接返回false,不会阻塞当前线程。
tryLock()中尝试获取一次锁,如果没有成功,就不会再去获取或者阻塞当前线程,而是返回false
如果获取不到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

tryLock(long timeout, TimeUnit unit)

超时获取锁:在指定时间段内timeout会一直尝试获取锁,直到获取锁返回true,如果一直没有获取锁,超时以后会返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先尝试获取一次,如果失败,再执行超时获取
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 获取超时时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将当前线程封装成节点添加到队列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 死循环中尝试多次获取。注意:如果获取不到,就会阻塞线程【指定】时间。
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果超时,直接返回false,代表没有获取到锁
if (nanosTimeout <= 0L)
return false;
// 如果队列中有等待线程,此时应该阻塞当前线程【一段时间】
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

lockInterruptibly()

支持中断锁:如果一个线程在获取锁的过程中被中断了,会抛出InterruptedException。
Java Doc

If the current thread:
has its interrupted status set on entry to this method; or
is interrupted while acquiring the lock,
then InterruptedException is thrown and the current thread’s interrupted status is cleared.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程设置了中断,直接向上抛异常,并且清除中断状态,不会再去获取锁。
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 被阻塞的线程唤醒之后:如果检查到当前线程设置了中断,直接抛异常不会再去尝试获取锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

公平锁 new ReentrantLock(true);

lock()

公平锁获取锁的过程:

  • 首先会根据锁状态state来判断是否应该获取锁,而不是直接像非公平锁那样,直接尝试通过CAScompareAndSetState(0, 1)设置为独占模式(修改为1)来获取锁。
    • 如果此时可以获取锁(state为0),会进一步判断等待队列中是否存在等待获取锁的线程(和非公平锁的区别)
      • 如果此时队列中有等待线程,就会直接挂起当前线程。
      • 如果此时队列中没有等待线程,就会尝试通过CAScompareAndSetState(0, acquires)去尝试获取锁。
        • 如果修改成功,将会调用setExclusiveOwnerThread(Thread.currentThread())将独占线程变量exclusiveOwnerThread指向当前线程
        • 如果修改失败,将会调用acquire(1)
    • 如果此时不可以获取锁(state不为0),直接挂起当前线程。

注意:公平锁和非公平锁都重写了tryAcquire(int acquires)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
final void lock() {
acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 判断锁状态,是否可以获取
if (c == 0) {
// 如果此时可以获取锁,会进一步判断等待队列中是否有线程正在等待获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断此次是否是重入,如果重入,直接将锁计数加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 即:不能获取锁
return false;
}
// 判断队列中是否还存在因为没有获取到锁而被阻塞的线程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 如果h.next == null,说明其它线程刚创建了一个dummy节点,还没入队,并且此时tail为null,head指向dummy节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

Q&A

公平锁和非公平锁区别?

  • 处于非公平锁的线程直接尝试获取锁,不会去判断等待队列中是否已经有线程正在等待锁的释放。非公平锁会导致线程饥饿现象
  • 处于公平锁模式的线程按照请求锁的顺序决定获取锁的顺序(加锁时会判断等待队列中是否已经有线程正在等待锁,如果有的话就不会再去获取锁而是插到队列尾部)。

ReentrantLock和synchronized的区别?

  • 在用法上
    • ReentrantLock需要显示的获取锁和释放锁。(注意:为了安全,需要在finally中释放锁)。而synchronized会自动地获取锁和释放锁
    • ReentrantLock通过Condition实现了唤醒线程和使当前线程睡眠的API,没有使用Object的API。
    • ReentrantLock只能修饰代码块,并不能修饰方法。而synchronized既可以修饰方法也可以修饰代码块
  • 在特性上
    • ReentrantLock可以以非阻塞的形式来获取锁,如果没有获取锁,会返回false,而不会阻塞当前线程。 synchronized如果没有获取到锁,会阻塞当前线程
    • ReentrantLock可以以超时的形式来获取锁,如果在指定时间段内没有获取到锁,直接返回false,不会阻塞当前线程。
    • ReentrantLock实现了公平锁,防止了非公平锁可能产生的线程饥饿现象。
    • ReentrantLock实现了以中断形式来获取锁,如果线程在获取锁的过程中,被中断了,会直接抛出异常,不会再去尝试获取锁。