J.U.C源码阅读笔记(二)并发容器

并发容器提供了线程安全的容器。比如线程安全的Map/Queue/List (ConcurrentHashMapBlockingQueueCopyOnWriteArrayList),其它的还有阻塞队列,比如ArrayBolckingQueueLinkedBlockingQueueDelayQueueSynchronousQueue

ConcurrentHashMap

1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable

介绍

ConcurrentHashMap是一个线程安全的Map

部分源码分析

public V put(K key, V value)

调用putVal(K key, V value, boolean onlyIfAbsent)

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

JDK8

在插入的时候会对数据进行预判断,不允许keyvaluenull,否则抛空指针。
由于内部维护的数组tab是懒加载,当tab没初始化时,会将tab初始化。通过轮询由volatile修饰的变量来保证只能有一个线程初始化数组。
当哈希表正在进行扩容时,当前线程会尝试帮助扩容。因为在扩容时会将ForwardingNode 放到数组下标,而ForwardingNode的hash为MOVED。
插入以后会将记录键值对数量的值尝试用CAS加1,如果没有成功,就将这个变化值存到一个数组中。最终键值对的数量是记录键值对数量的值加上记录变化的数组中的值。
扩容时会将原哈希表中的键值对放到一个大小是原哈希表二倍的新的哈希表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 预处理
if (key == null || value == null) throw new NullPointerException();
// 为了分布均匀对key的hashcode再hash
int hash = spread(key.hashCode());
// 根据binCount通过计算链表长度,判断链表是否修改过,如果链表的数量超过设定的阈值,插入完成之后需要变成红黑树
int binCount = 0;
// 初始化tab,死循环直到插入成功
for (Node<K,V>[] tab = table;;) {
// 指向目标节点
Node<K,V> f;
int n, i, fh;
// 延迟加载:如果没有初始化进行初始化,并且通过轮询volatile变量保证只有一个线程执行哈希表的初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果数组下标对应的元素为null,通过CAS放到当前数组下标处
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果当前节点的hash值为MOVED常量,则表示这个ConcurrentHashMap正在进行扩容
// (在转移当前桶时,将将当前数组下表设置为ForwardingNode)
// ForwardingNode的Hash等于MOVED: super(MOVED, null, null, null);
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 当前数组下标有元素,并且可以正常插入,则根据节点元素类型是红黑树节点还是链表节点然后执行插入
else {
V oldVal = null;
// 加锁,Monitor对象是当前数组下标元素持有的Monitor
synchronized (f) {
// tabAt(tab,i)会通过Unsafe类求出数组下标元素,再次判断确保是否是上面所确定的f
if (tabAt(tab, i) == f) {
// 代表是链表元素,执行插入链表操作
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果已经存在此key,则更新即可(此处为了性能,尽量不用equals(),所以显示比较是否指向同一个对象)
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 不存在,则插入到链表尾部
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 当前数组下标元素类型是红黑树,执行红黑树插入操作
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 根据binCount判断链表元素个数
if (binCount != 0) {
// 超过阈值,则将链表变成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 将当前维护的baseCount + 1,然后判断是否需要扩容
addCount(1L, binCount);
return null;
}

/* 下面两个函数来源: https://yfzhou.coding.me/2018/12/24/%E5%89%91%E6%8C%87ConcurrentHashMap%E3%80%90%E5%9F%BA%E4%BA%8EJDK1-8%E3%80%91/ */

// 经过一系列的判断,如果可以帮助扩容的话,最终会调用transfer(tab, nextTab);协助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 如果 table 不是空 且 node 节点是转移类型,数据检验
// 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验
// 尝试帮助扩容
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据 length 得到一个标识符号
int rs = resizeStamp(tab.length);
// 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改
// 且 sizeCtl < 0 (说明还在扩容)
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 如果 sizeCtl 无符号右移 16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)
// 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535)
// 或者转移下标正在调整 (扩容结束)
// 结束循环,返回 table
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 进行转移,将table指向的tab中的键值对重新放到nextTab中(nextTab容量是tab的二倍),扩容完毕后,将table指向nextTab
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

// 从 putVal 传入的参数是 1, binCount,binCount 默认是0,只有 hash 冲突了才会大于 1.且他的大小是链表的长度(如果不是红黑数结构的话)。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果CounterCell不为null,就将这次变化的值放到CounterCell,否则尝试用CAS更新baseCount,如果更新失败,就将这次变化的值放到CountCell中
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 如果CAS更新CounterCell失败,就会进入这个方法,保证最终更新成功
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}

//如果需要检查,检查是否需要扩容,在 putVal 方法调用时,默认就是要检查的。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
// table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据 length 得到一个标识
int rs = resizeStamp(n);
// 如果正在扩容
if (sc < 0) {
// 如果 sc 的低 16 位不等于标识符(校验异常 sizeCtl 变化了)
// 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
// 如果 nextTable == null(结束扩容了)
// 如果 transferIndex <= 0 (转移状态变化了)
// 结束循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 扩容
transfer(tab, nt);
}
// 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2.
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 更新 sizeCtl 为负数后,开始扩容。
transfer(tab, null);
s = sumCount();
}
}
}

JDK7

不允许valuenull

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public V put(K key, V value) {
Segment<K, V> s;
// 不允许value为null
if (value == null) throw new NullPointerException();
int hash = hash(key);
// UNSAFE通过hash找到Segment
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K, V>) UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 加锁,如果加锁成功则执行后续操作,没有竞争到锁就需要自旋直到获取到锁并执行插入操作(自旋一定次数,就会调用阻塞锁)。
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 找到引用的数组
HashEntry<K,V>[] tab = table;
// 求模计算数组下标
int index = (tab.length - 1) & hash;
// 找到数组第一个元素
HashEntry<K,V> first = entryAt(tab, index);
// 死循环,直到插入成功
for (HashEntry<K,V> e = first;;) {
// 数组对应下标已经存在数值。
if (e != null) {
K k;
// 已经存在此key,进行更新
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
// 记录每一个Segment中的HashEntry数组中元素的数量
++modCount;
}
break;
}
e = e.next;
// 如果当前数组下标已经存在元素
} else {
// 当前线程可能自旋过(自旋过程可能会返回一个已经初始化的node),所以需要判断node是否为null
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 判断是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 释放锁
unlock();
}
return oldValue;
}

public V get(Object key)

JDK8

计算key对应的hash,然后通过hash定位到具体的桶,然后找到具体的value,因为value由volatile修饰,所以可以保证拿到的是最新的value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public V get(Object key) {
Node<K,V>[] tab;
Node<K,V> e, p;
int n, eh;
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 在红黑树中查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 在链表中查找
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

JDK7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V get(Object key) {
Segment<K, V> s; // manually integrate access methods to reduce overhead
HashEntry<K, V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 通过UNSAFE获取Segment,通过Segment获取HashEntry数组引用,进而找到指定的key
if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

public int size()

JDK8

通过由volatile修饰的baseCount变量以及CounterCell对象记录变化的次数求出size()

1
2
3
4
5
6
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// 获取baseCount
long sum = baseCount;
if (as != null) {
// 循环遍历每一个CountCell中的value
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

JDK7

通过计算Segment数组中每一个Segment中的modcount(在put时会++modCount)求出sum。但是不止求一次sum,会至少求两次(由RETRIES_BEFORE_LOCK决定的),如果两次求出的sum不一致,下次求就会将Segment数组全部加锁,重新求一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K, V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (; ; ) {
// 超出两次,将Segment全部加锁
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K, V> seg = segmentAt(segments, j);
if (seg != null) {
// 获取Segment对象的modCount
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 和上一次求的结果比较
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

Q&A

JDK8和JDK7的区别?

存储结构

  • JDK8存储结构采用Node类型数组加链表并通过拉链法解决hash冲突而JDK7基于AQS实现了一个Segment类继承自ReentrantLock,每个Segment对象持有一个HashEntry数组。Node和HashEntry都是实现了Map.Entry接口,只不过名字不同。

并发更新操作的实现

  • JDK8通过synchronized+CAS机制进行并发更新,锁对象是数组下标对应的元素持有的Monitor。JDK7继承了AQS里的ReentrantLock进行加锁实现的并发更新。

计算size

  • JDK8通过维护一个由volatile修饰的baseCount变量进行计数,以及一个CounterCell类进行记录变化的次数来确定size。JDK7采用延迟计算,在计算过程中会对每个Segment计算至少两次,如果出现数据不一致现象就进行全部加锁最后求得size。

参考


BlockingQueue

  • BlockingQueue接口下有ArrayBolckingQueueLinkedBlockingQueueDelayQueueSynchronousQueuePriorityQueue直接实现
  • BlockingDeque接口继承自BlockingQueue下面有LinkedBlockingDeque实现

ArrayBlockingQueue

ArrayBlockingQueue内部维护了一个ReentrantLock以及两个和ReentrantLock有关的Condition对象来实现阻塞操作。整体是一个循环队列(基于数组实现),并且队列的长度一旦设定就不可在变。

1
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

部分源码分析

下面说的非阻塞/阻塞操作都是指已经获取锁后的操作

变量

  • final Object[] items; // 存放元素的数组
  • int takeIndex; // 队头元素位置
  • int putIndex; // 队尾元素位置
  • int count; // 队列长度

public ArrayBlockingQueue(int capacity)

构造函数必须指定容量,并且容量后期不可扩充,如果队列满了以后会直接阻塞当前线程

1
2
3
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public boolean add(E e)

非阻塞的入队方法,如果返回false则代表添加失败,父类会调用子类的offer(e)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public boolean add(E e) {
return super.add(e);
}
public boolean offer(E e) {
// 队列不允许添加null,如果添加null直接抛出异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 如果队列已满,则返回false
if (count == items.length)
return false;
// 否则入队,返回true代表入队成功
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 将入队元素放在入队位置
items[putIndex] = x;
// 如果入完队以后队列已经满了,则将putIndex放到初始位置0
// 因为用了一个冗余变量count来记录队列中元素的数量,所以不必再空一个元素来判断队列满或者队列空
if (++putIndex == items.length)
putIndex = 0;
// 增加队列数量
count++;
// 唤醒因为出队时队列没有元素而被阻塞的线程
notEmpty.signal();
}

public boolean offer(E e, long timeout, TimeUnit unit)

如果当前队列已经满了,则等待timeout,单位是unit,超时以后如果当前队列还是满的,则返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 提前判空
checkNotNull(e);
// 将超时时间换成纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 判断队列长度
while (count == items.length) {
// 超时以后,如果当前队列还是满的,则返回false
if (nanos <= 0)
return false;
// 阻塞当前线程nanos时间
nanos = notFull.awaitNanos(nanos);
}
// 正常入队
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

public boolean offer(E e)

以阻塞的形式入队,如果当前队列已满则阻塞当前线程,并且采用可中断锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void put(E e) throws InterruptedException {
// 提前判空
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满,则阻塞当前线程
while (count == items.length)
notFull.await();
// 否则入队
enqueue(e);
} finally {
lock.unlock();
}
}

public E poll()

非阻塞的出队操作,如果当前队列长度为0则返回false,否则返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取要出队的元素
E x = (E) items[takeIndex];
// 将出对元素的值设为null
items[takeIndex] = null;
// 实现循环队列
if (++takeIndex == items.length)
takeIndex = 0;
// 修改队列长度
count--;
// 迭代器相关操作
if (itrs != null)
itrs.elementDequeued();
// 唤醒当队列满时,因为入队操作而被阻塞的线程
notFull.signal();
// 返回出队元素
return x;
}

public E take() throws InterruptedException

以阻塞形式出队,如果当前队列长度为0,则阻塞当前线程

1
2
3
4
5
6
7
8
9
10
11
12
13
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列长度为0,阻塞当前线程
while (count == 0)
notEmpty.await();
// 队列中存在元素,正常出队
return dequeue();
} finally {
lock.unlock();
}
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException

以超时的形式出队,如果当前队列长度为0,则阻塞当前线程一段时间,超时以后如果当前队列仍然没有元素,则返回false,否则正常出队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

public E peek()

获取队头元素

1
2
3
4
5
6
7
8
9
10
11
12
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}

public int size()

获取队列中的元素数量,获取数量前加锁,保证获取到的数量是最新的

个人认为:去掉lock,用volatile修饰count也可以获取最新的正确的元素数量,因为每次对count的修改最多只是一个线程。去掉lock以后,保证每次读取count是最新的即可。

1
2
3
4
5
6
7
8
9
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}

public int remainingCapacity()

获取队列的剩余空间(还可以存放多少元素),不能通过判断剩余空间数量来决定是否插入,因为判断 -> 插入是非原子操作。

1
2
3
4
5
6
7
8
9
10
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列中数组的长度 - 队列中的元素数量
return items.length - count;
} finally {
lock.unlock();
}
}

public boolean remove(Object o)

移除队列中的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 预判断队列中的元素数量
if (count > 0) {
// 获取下一个要插入元素的位置
final int putIndex = this.putIndex;
// 获取队头元素位置
int i = takeIndex;
do {
// 找到目标元素
if (o.equals(items[i])) {
// 移除队列中下标为i的元素
removeAt(i);
return true;
}
// 保证循环
if (++i == items.length)
i = 0;
} while (i != putIndex); // 到达队尾
}
return false;
} finally {
lock.unlock();
}
}

CopyOnWriteArrayList

1
2
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable

CopyOnWriteArrayList是一个线程安全的List,顾名思义,写时复制。当对数组做修改操作时,会加锁。比如添加元素时,会先将原数组元素拷贝到一个新的更大的数组中,将元素添加到新数组中,然后将原数组引用指向新数组。又因为数组由volatile修饰,所以对原数组引用的修改对其它线程是可见的。相对于Vector效率更高。

部分源码分析

public boolean add(E e)

当添加元素时,需要加锁保证线程安全,修改完以后将原数组引用指向新数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 修改之前先获取锁
lock.lock();
try {
// 获取原数组引用
Object[] elements = getArray();
int len = elements.length;
// 将原数组元素复制到更大的新数组中
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 向新数组中添加元素
newElements[len] = e;
// 将原数组引用指向新数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

public E set(int index, E element)

修改指定下标的元素。当修改元素时,需要加锁保证线程安全。修改完以后将原数组引用指向新数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
Object[] elements = getArray();
// 获取要修改的元素
E oldValue = get(elements, index);
// 如果要修改的元素和预期元素值不同 => 正常修改
if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
// 如果要修改的元素的值和预期元素相同 => 保持不动即可:将原数组引用指向原数组。
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}

public E get(int index)

获取指定下标的元素。因为数组由volatile修饰,所以保证其它线程访问数组时都是最新的。

private transient volatile Object[] array;

1
2
3
4
5
6
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}

public E remove(int index)

删除指定下标的元素。先获取锁。分删除最后一个元素还是中间元素。当删除最后一个元素,将原数组的将n-1个元素拷贝到新数组中,然后将原数组引用指向新数组即可。当删除的是中间元素。分别将原数组中间元素的左侧元素和右侧元素拷贝到新数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
// 删除的是最后一个元素
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
// 删除的元素是中间元素
else {
Object[] newElements = new Object[len - 1];
// 将要删除元素左侧元素放到新数组中的左侧
// 第二个参数是从原数组的起始位置开始拷贝,第三个是从新数组的起始位置开始放,第四个是要拷贝的元素长度
System.arraycopy(elements, 0, newElements, 0, index);
// 将要删除元素右侧元素放到新数组的右侧
System.arraycopy(elements, index + 1, newElements, index, numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}

Q&A

CopyOnWriteArrayList 和 Vector区别?

Vector是对每个方法加锁,导致并发效率低,而COWL只对修改操作进行了加锁操作,只读操作并没有加锁。很适合读多写少的应用。