映射 Map
Map最核心的功能, 是根据Key查找值. 因此一个Map中. 不能拥有重复的Key.
- key : 是
Set
类型 - value : 是
Collection
类型
每个Map要求实现Set<Map.Entry<K, V>> entrySet()
接口, 可见Entry在Map中是以集合的形式存在(不能重复), 但是同样不能说Map是存储Entry的集合(Set), 这是因为, Map并没有要求一定要将Entry存储下来, 可以在entrySet
中动态计算Entry的集合.
把Map看作容器是片面的. 因为Map的核心能力, 不是提供容器而是提供快速的数据查找能力. 那本质是映射, 根据不同的Map实现, 可能会把Entry
存储到某个容器当中. 也可能将Key和Value存储到两个不同的容器当中.
Entry<K, V>
是一个 key-value
集合, 不是容器, 是一种映射关系. keySet(), values() 方法, 可以知道, key不能重复, 而value是可以重复的. Entry
像是描述映射数据.
HashTable
是线程安全的, 而 HashMap
不是. HashTable
不允许 null
的.
LinkedHashMap
也是基于 Hash
的, 不过有个显著的特定, 根据添加的时间, 形成链表. 所以可以用来实现 LRUCache
, 使用队列, 实现Iterable
接口
LRU Least Recently Use 最近最久未使用.
LinkedHashMap
同时具有哈希表和链表的特征. 哈希表可以帮助利用Key
存储缓存条目, 链表帮助实现LRU算法.
Map 的本质是 映射. HashMap 就是用 Hash 表实现的 Map. TreeSet 是使用 树结构 实现的集合, 不可重复的容器.
思考 : 为什么简单的知识却很难理解到本质.
树可以用来构建索引, 也可以用来构建存储.
总结:
但是归根结底, 其实只有两类. 一类就是存储数据的容器(CollectionIterable
. 但Map的本质不是容器, 而是映射.
HashMap
JDK8之前 HashMap: 数组 + 链表
数组默认长度是 16, 每个元素存储的就是链表的头节点, hash(key.hashCode())%len
.
有一种极端的情况, 每次hash的值都一样, 被分配到一个 bucket
中, 这样会使得某一链表长度很长, 由于链表的查询需要从头部开始逐个遍历, 因此在最恶劣的情况下, 性能恶化: 从 O(1)
变成 O(n)
.
JDK8之后 HashMap : 数组 + 链表 + 红黑树
有一个TREEIFY_THRESHOLD
常量(默认是8)来判断是否要转为红黑树(树化).
性能从 O(n)
-> O(logn)
.
HashMap 的 put 逻辑
- 如果
HashMap
未被初始化过, 则初始化(懒加载) - 对
Key
求Hash
值, 然后再计算下标 - 如果没有碰撞, 直接放入桶中
- 如果碰撞了, 以链表的方式链接到后面
- 如果链表长度超过阈值, 就把链表转成红黑树
- 如果链表长度低于6, 就把红黑树转回链表
- 如果节点已经存在就替换旧值
- 如果桶满了(容量16*加载因子0.75), 就需要
resize
(扩容2倍后重排)
// HashMap 部分源码注释
public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {
// 时间和空间成本的权衡 决定了扩容因子是 0.75 决定了泊松分布的参数λ是0.5
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// bucket 大于8的时候就会转换为 红黑树 由泊松分布计算出结果为8时,概率为亿分之六,决定了树化节点为8
static final int TREEIFY_THRESHOLD = 8;
// 当低于6的时候, 又会转回 链表
static final int UNTREEIFY_THRESHOLD = 6;
// 从构造函数可以看出, hashmap 使用的是懒加载
public HashMap() {
this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
}
// put 主要逻辑方法
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// 1. 如果没有初始化, 就初始化 table -- resize()这个方法同时具备扩容的能力
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 2. 对key(在传入的时候已经进行hash了), 然后再计算下标 (n - 1) & hash,本质是取模
// 3. 如果没有碰撞, 直接放入桶中
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
// 已经存在键值对, 且和传入的键是一致的, 直接替换数组的元素.
e = p;
else if (p instanceof TreeNode)
// 已经树化
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
// 4. 如果碰撞了, 以链表的方式链接到后面
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
// 5. 判断链表是否超过了阈值, 如果超过了阈值, 转换为 红黑树
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key 值已经存在
V oldValue = e.value;
// 7. 如果节点已经存在, 就替换旧值
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
// 8. 如果桶到了阈值(16*0.75), 就进行扩容, 扩容2倍后重排
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
}
有效减少碰撞
- 扰动函数 : 促使元素位置分布均匀, 减少碰撞机率. 如果两个不相等的对象, 返回不同的
hashCode
, 或者说元素位置尽量地分布均匀些, 那么碰撞的机率就会少一些. - 使用
final
对象, 并采用合适的equals()
和hashCode()
方法. 不可变性, 使得可以缓存不同key
的hashCode
, 这将提高获取对象的速度, 使用String
,Integer
这样的wrapper
类作为key
, 是非常好的选择. 因为String
是final的, 而且已经重写了equals
和hashCode
方法.
// hash值计算的方法
static final int hash(Object key) {
int h;
// 先获取对象hashCode; 再将高位移到低位, 无符号右移动16位(其实是去除低位); 再与自己进行 异或 运算.
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
从获取hash
到散列的过程, 最后计算下标, 用与运算.
int 的范围是 -2147483648 ~ 2147483648, 只要hash
映射均匀, 是很难出现碰撞的, 但是一个40亿长度的数组放入内存, 显然是不现实的, 况且, hashmap在扩容之前默认容量是16, 所以直接拿来用是不现实的.
计算Hash: 混合原始hashCode
的高位和低位, 以此来加大低位的随机性, 而混合后的低位混合了高位的部分特征, 这样高位信息也被变相保存了. 主要是从速度, 功效, 质量来考虑的, 这样可以在数组 table 长度比较短的时候, 也能保证考虑到 高低位 都参与了hash
的运算当中, 同时也不会有太大的开销. 简单来说, 就是让散列更加均匀的做法.
最后计算下标: (n-1) & hash
(n是数组的长度), 位与的操作 取代 取模, 这样可以达到更高的效率, 而之所以能够用这样的方法取替换取模操作, 是因为 HashMap 底层数组的长度总是 2 的 n 次方.
构造时, 不是传入的容量是多少就是多少的, 最后会被换算成 2^n^ 的. 主要是为了用 与操作 替代 取模(位运算比取余运算更快)。
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
/**
* Returns a power of two size for the given target capacity.
*/
static final int tableSizeFor(int cap) {
// 转成离 cap 最近的 2的n次方 的值
int n = -1 >>> Integer.numberOfLeadingZeros(cap - 1);
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
HashMap 扩容
用 新的数组 代替 旧的数组. 默认的 负载因子 是 0.75. 是非常符合通用场景需求的(泊松分布). 当 HashMap
填满了 75% 的 bucket
的时候, 将会创建原来 2倍
大小的数组, 并将原来的数据放入新的 bucket
中.
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
// 扩容 2倍
newThr = oldThr << 1; // double threshold 左移
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
// 重新计算下标后放入新的数组
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
扩容问题
- 多线程环境下, 调整大小存在条件竞争, 容易造成死锁.
ReHashing
是一个比较耗时的操作.
HashMap 总结
- 成员变量 : 数据结构, 树化阈值
- 构造函数 : 延迟创建
put
和get
的流程- 哈希算法, 扩容, 性能
ConcurrentHashMap
如何优化HashTable? 通过锁细粒度化, 将整锁拆解成多个锁进行优化
早期的 ConcurrentHashMap
: 通过分段锁 Segment 来实现.
分成16个 segment
, 理论上比 hashmap
提高了 16 倍. 就是将 HashMap 数组, 逻辑上拆分成多个子数组, 每个子数组配置一把锁, 线程在获取到某把分段锁的时候, 上图是获取到编号为7的segment
, 才能操作这个字数组, 其它线程就会被阻塞, 如果是其它未被占用的segment
, 是不会被阻塞的.
当前的 ConcurrentHashMap
: CAS + synchronized 使锁更细化.
synchronized
只锁定当前链表或红黑树的头节点, 这样只要 Hash
不冲突, 就不会产生并发.
和 HashMap
很多相似的地方, 有一个特别的参数sizeCtl
.
比起 Segment
, 锁拆分得更细.
- 首先使用无锁操作 CAS 插入头节点, 失败则循环重试
- 若头节点已经存在, 则尝试获取头节点的同步锁, 再进行操作
ConcurrentHashMap 的 put 方法逻辑
- 判断
Node[]
数组是否初始化, 没有则进行初始化操作 - 通过
hash
定位数组的索引坐标, 是否有Node
节点, 如果没有则使用CAS
进行添加(链表的头节点), 添加失败则进入下次循环 - 检测到如果内部正在扩容, 则帮助它一起扩容
- 如果
f!=null
, 则使用synchronized
锁住 f 元素(链表/红黑二叉树的头节点)- 如果是
Node(链表结构)
则执行链表的添加操作 - 如果是
TreeNode(树型结构)
则执行树添加操作 - 如果是
ReservationNode
就抛出异常, 这个和 本地缓存 相关
- 如果是
- 判断链表长度是否达到临界值8(默认值, 可以做调整), 当节点数超过这个值就需要把链表转换为树结构.
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
// 用来做大小控制的标识符. 是 hashmap 扩容或初始化时一个控制位表示量.
// -1 代表正在初始化
// -n 表示有 n-1 个线程正在进行扩容操作
// 如果表还没有创建, 就是默认值或者是0; 如果创建完成了, 保存的就是下一个要扩容的大小
private transient volatile int sizeCtl;
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许插入空键值
if (key == null || value == null) throw new NullPointerException();
// 计算 hash
int hash = spread(key.hashCode());
int binCount = 0;
// 对数组的更新操作, 是使用 CAS 方式的, 所以循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
// 为空, 进行初始化
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// f 表示链表或者红黑二叉树, 如果没有, 尝试使用 CAS 进行添加
// 如果添加失败, 就进入下一次循环
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
// 如果已经存在, 并且被其它线程操作, 移动扩容中
else if ((fh = f.hash) == MOVED)
// 协助其扩容
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
// 发生了 hash 碰撞, 就是说头节点已经存在了
V oldVal = null;
// 获取锁再进行操作
synchronized (f) {
if (tabAt(tab, i) == f) {
// 如果是链表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
// 如果是树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
// ReservationNode 是一个保留节点, 是一个占位符, 不会保存实际的数据
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
}
需要注意的点
size()
和mappingCount()
方法的异同, 两者计算是否准确- 多线程环境下如何进行扩容
HashMap, HashTable, ConcurrentHashMap
HashMap
线程不安全, 数组 + 链表 + 红黑树,允许为nullHashTable
线程安全, 锁住整个对象, 数组 + 链表,不允许为nullConcurrentHashMap
线程安全, 通过对锁细粒度化提升性能, CAS + 同步锁, 数组 + 链表 + 红黑树,不允许为nullHashMap
的key, value
均可为null
, 而其他两个类不支持
对于并发的结构,不能为null,因为在并发的操作下,如果放入null,就无法通过null来判断原来是否有这个key,而非并发的结构,就不会出现这个问题。