并发容器提供了线程安全的容器。比如线程安全的Map/Queue/List (ConcurrentHashMap
、BlockingQueue
、CopyOnWriteArrayList
),其它的还有阻塞队列,比如ArrayBolckingQueue
、LinkedBlockingQueue
、DelayQueue
、SynchronousQueue
ConcurrentHashMap
1 | public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> |
介绍
ConcurrentHashMap
是一个线程安全的Map
部分源码分析
public V put(K key, V value)
调用putVal(K key, V value, boolean onlyIfAbsent)
1 | public V put(K key, V value) { |
JDK8
在插入的时候会对数据进行预判断,不允许key
和value
为null
,否则抛空指针。
由于内部维护的数组tab
是懒加载,当tab
没初始化时,会将tab
初始化。通过轮询由volatile
修饰的变量来保证只能有一个线程初始化数组。
当哈希表正在进行扩容时,当前线程会尝试帮助扩容。因为在扩容时会将ForwardingNode
放到数组下标,而ForwardingNode
的hash为MOVED。
插入以后会将记录键值对数量的值尝试用CAS加1,如果没有成功,就将这个变化值存到一个数组中。最终键值对的数量是记录键值对数量的值加上记录变化的数组中的值。
扩容时会将原哈希表中的键值对放到一个大小是原哈希表二倍的新的哈希表中。
1 | final V putVal(K key, V value, boolean onlyIfAbsent) { |
JDK7
不允许value
为null
1 | public V put(K key, V value) { |
public V get(Object key)
JDK8
计算key对应的hash,然后通过hash定位到具体的桶,然后找到具体的value,因为value由volatile
修饰,所以可以保证拿到的是最新的value
1 | public V get(Object key) { |
JDK7
1 | public V get(Object key) { |
public int size()
JDK8
通过由volatile
修饰的baseCount
变量以及CounterCell
对象记录变化的次数求出size()
1 | public int size() { |
1 | final long sumCount() { |
JDK7
通过计算Segment数组中每一个Segment中的modcount(在put时会++modCount)求出sum。但是不止求一次sum,会至少求两次(由RETRIES_BEFORE_LOCK
决定的),如果两次求出的sum不一致,下次求就会将Segment数组全部加锁,重新求一次。
1 | public int size() { |
Q&A
JDK8和JDK7的区别?
存储结构
- JDK8存储结构采用Node类型数组加链表并通过拉链法解决hash冲突而JDK7基于AQS实现了一个Segment类继承自ReentrantLock,每个Segment对象持有一个HashEntry数组。Node和HashEntry都是实现了Map.Entry接口,只不过名字不同。
并发更新操作的实现
- JDK8通过synchronized+CAS机制进行并发更新,锁对象是数组下标对应的元素持有的Monitor。JDK7继承了AQS里的ReentrantLock进行加锁实现的并发更新。
计算size
- JDK8通过维护一个由volatile修饰的baseCount变量进行计数,以及一个CounterCell类进行记录变化的次数来确定size。JDK7采用延迟计算,在计算过程中会对每个Segment计算至少两次,如果出现数据不一致现象就进行全部加锁最后求得size。
参考
- ConcurrentHashMap 源码解读
- 剑指ConcurrentHashMap【基于JDK1.8】
- 谈谈ConcurrentHashMap1.7和1.8的不同实现
- ConcurrentHashMap源码分析(JDK1.8)
- ConcurrentHashMap实现原理及源码分析
BlockingQueue
BlockingQueue
接口下有ArrayBolckingQueue
、LinkedBlockingQueue
、DelayQueue
、SynchronousQueue
、PriorityQueue
直接实现BlockingDeque
接口继承自BlockingQueue
下面有LinkedBlockingDeque
实现
ArrayBlockingQueue
ArrayBlockingQueue
内部维护了一个ReentrantLock
以及两个和ReentrantLock
有关的Condition
对象来实现阻塞操作。整体是一个循环队列(基于数组实现),并且队列的长度一旦设定就不可在变。
1 | public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable |
部分源码分析
下面说的非阻塞/阻塞操作都是指已经获取锁后的操作
变量
final Object[] items; // 存放元素的数组
int takeIndex; // 队头元素位置
int putIndex; // 队尾元素位置
int count; // 队列长度
public ArrayBlockingQueue(int capacity)
构造函数必须指定容量,并且容量后期不可扩充,如果队列满了以后会直接阻塞当前线程
1 | public ArrayBlockingQueue(int capacity) { |
public boolean add(E e)
非阻塞的入队方法,如果返回false
则代表添加失败,父类会调用子类的offer(e)
1 | public boolean add(E e) { |
public boolean offer(E e, long timeout, TimeUnit unit)
如果当前队列已经满了,则等待timeout
,单位是unit
,超时以后如果当前队列还是满的,则返回false
1 | public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { |
public boolean offer(E e)
以阻塞的形式入队,如果当前队列已满则阻塞当前线程,并且采用可中断锁。
1 | public void put(E e) throws InterruptedException { |
public E poll()
非阻塞的出队操作,如果当前队列长度为0则返回false
,否则返回false
1 | public E poll() { |
public E take() throws InterruptedException
以阻塞形式出队,如果当前队列长度为0,则阻塞当前线程
1 | public E take() throws InterruptedException { |
public E poll(long timeout, TimeUnit unit) throws InterruptedException
以超时的形式出队,如果当前队列长度为0,则阻塞当前线程一段时间,超时以后如果当前队列仍然没有元素,则返回false,否则正常出队。
1 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
public E peek()
获取队头元素
1 | public E peek() { |
public int size()
获取队列中的元素数量,获取数量前加锁,保证获取到的数量是最新的
个人认为:去掉lock,用volatile修饰count也可以获取最新的正确的元素数量,因为每次对count的修改最多只是一个线程。去掉lock以后,保证每次读取count是最新的即可。
1 | public int size() { |
public int remainingCapacity()
获取队列的剩余空间(还可以存放多少元素),不能通过判断剩余空间数量来决定是否插入,因为判断 -> 插入是非原子操作。
1 | public int remainingCapacity() { |
public boolean remove(Object o)
移除队列中的元素
1 | public boolean remove(Object o) { |
CopyOnWriteArrayList
1 | public class CopyOnWriteArrayList<E> |
CopyOnWriteArrayList
是一个线程安全的List
,顾名思义,写时复制。当对数组做修改操作时,会加锁。比如添加元素时,会先将原数组元素拷贝到一个新的更大的数组中,将元素添加到新数组中,然后将原数组引用指向新数组。又因为数组由volatile
修饰,所以对原数组引用的修改对其它线程是可见的。相对于Vector
效率更高。
部分源码分析
public boolean add(E e)
当添加元素时,需要加锁保证线程安全,修改完以后将原数组引用指向新数组。
1 | public boolean add(E e) { |
public E set(int index, E element)
修改指定下标的元素。当修改元素时,需要加锁保证线程安全。修改完以后将原数组引用指向新数组。
1 | public E set(int index, E element) { |
public E get(int index)
获取指定下标的元素。因为数组由volatile
修饰,所以保证其它线程访问数组时都是最新的。
private transient volatile Object[] array;
1 | public E get(int index) { |
public E remove(int index)
删除指定下标的元素。先获取锁。分删除最后一个元素还是中间元素。当删除最后一个元素,将原数组的将n-1个元素拷贝到新数组中,然后将原数组引用指向新数组即可。当删除的是中间元素。分别将原数组中间元素的左侧元素和右侧元素拷贝到新数组。
1 | public E remove(int index) { |
Q&A
CopyOnWriteArrayList 和 Vector区别?
Vector是对每个方法加锁,导致并发效率低,而COWL只对修改操作进行了加锁操作,只读操作并没有加锁。很适合读多写少的应用。