分布式锁
锁在JAVA中是一个非常重要的概念, 尤其是在当今的互联网时代, 高并发的场景下, 更是离不开锁. 在计算机科学中, 锁(lock)或互斥(mutual)同步机制, 用于在有许多执行线程的环境中强制对资源的访问限制. 锁旨在强制实施互斥排他, 并发控制策略.
- 数据库
- memcached(add命令)
- redis(setnx命令)
- zookeeper(临时节点)
乐观锁与悲观锁
悲观锁假定会发⽣冲突, 访问的时候都要先获得锁, 保证同⼀个时刻只有一个线程获得锁, 读也会阻塞;乐观锁假设不会发⽣冲突, 只有在提交操作的时候检查是否有冲突.
Java 乐观锁通过 CAS 实现, 悲观锁通过 synchronize 实现. MySQL 通过 MVCC, 也就是版本实现, 悲观锁可以通过 select… for update 加上排它锁.
CAS 是乐观锁, 不需要阻塞, 硬件级别实现的原⼦性; synchronize 会阻塞, JVM 级别实现的原⼦性. 使⽤场景不同, 线程冲突严重时 CAS 会造成CPU压力过大, 导致吞吐量下降, synchronize 的原理是先自旋然后阻塞, 线程冲突严重时仍然有较⾼的吞吐量, 因为线程都被阻塞了, 不会占⽤CPU.
通过锁解决电商中的超卖问题
现象:某商品10件, 结果卖出了15件.
商品卖出的数量超出了商品库存的数量.
产生原因:多个线程同时对数据库进行操作, 导致一个库存产生多个订单.
解决方案:扣减库存不再程序中进行, 而在数据库中操作, 向数据库发送库存增量, 扣减一个库存, 增量为-1. 在数据库中使用update语句计算库存, 通过update行锁解决并发.
行锁:在update时会对数据有加锁, 只有等加锁的语句执行完才释放.
这样还是会发生库存变成负数的现象, 由于并发判断库存数, 造成有剩余库存的假象.
使用锁解决. 校验库存, 扣减库存统一加锁, 使之成为原子性的操作. 这样只有获得锁的线程才能进行库存校验和库存扣减.
- 方法锁. 基于 synchronized 锁解决问题, 最原始的方案. 还是有可能出现库存是-1的现象, 因为事务没有提交. 所以要手动控制事务, 把事务也锁起来.
- 块锁.
- ReentrantLock. 并发包中的锁.
块锁
// 当前的 OrderService 是一个单例
// 只有获取到 this(OrderService) 的线程才能执行里面的代码
synchronized (this){
}
// 只有获取到 object(OrderService) 的线程才能执行里面的代码
synchronized (object) {
}
// 类锁. 对象是通过new出来的, 上面的方式, 是锁对象, 而类只有一个.
synchronized (OrderService.class) {
}
单体应用的局限性, 不能跨 JVM. 单点故障, 集群.
基于数据库悲观锁的分布式锁
- 多个进程, 多个线程可以共同访问的数据库组件
- 通过 select … for update 访问同一条数据(行锁)
- for update 锁住数据, 其他线程只能等待
- 关闭事务自动提交
set autocommit = 0
。
优缺点:
- 简单方便, 易于理解, 易于操作
- 并发量大时, 对数据库压力大
建议:
作为锁的数据库与业务的数据库分开。
Redis 分布式锁
实现原理
基于 Redis 的命令. 利用 NX 的原子性, 多个线程并发了, 只有一个可以成功. 设置成功获取锁, 可以继续后面的业务代码.
Redis是单线程的, 并行转变为串行.
set resource_name my_random_value NX PX 30000
- resource_name 资源名称, 可根据不同的业务区分不同的锁
- my_random_value 随机值, 每个线程的的随机值都不一样, 用于释放锁时的校验
- NX key不存在时设置成功, key存在则设置不成功
- PX 自动失效时间, 出现异常情况, 锁可以过期失效
释放锁使用 Redis
的 delete 命令。释放锁时要校验之前设置的随机数,相同才释放。同时要使用 LUA
脚本(单线程)以保证操作是原子性的,如果不是的话,释放锁时有可能误删其他线程的锁。
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
实现代码
package org.lgq.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
/**
* @author DevLGQ
* @version 1.0
*/
public class RedisLock implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(RedisLock.class);
private RedisTemplate redisTemplate;
private String key;
private String value;
/**
* 过期时间,单位:秒
*/
private int expireTime;
public RedisLock(RedisTemplate redisTemplate, String key, int expireTime) {
this.redisTemplate = redisTemplate;
this.key = key;
this.value = UUID.randomUUID().toString();
this.expireTime = expireTime;
}
/**
* 获取分布式锁
*
* @return
*/
public boolean getLock() {
RedisCallback<Boolean> redisCallback = connection -> {
// 设置NX
RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
// 设置过期时间
Expiration expiration = Expiration.seconds(expireTime);
// byte[] redisKey = key.getBytes(); 不建议这么做
// 序列化key
byte[] redisKey = this.redisTemplate.getKeySerializer().serialize(key);
// 序列化value
byte[] redisVal = this.redisTemplate.getValueSerializer().serialize(value);
// 执行setnx操作
return connection.set(redisKey, redisVal, expiration, setOption);
};
// 获取分布式锁
return (Boolean) this.redisTemplate.execute(redisCallback);
}
/**
* 解锁
*
* @return
*/
private boolean unLock() {
// 不能使用以下方式释放锁,要使用lua脚本来保证原子性
// final String val = (String) this.redisTemplate.opsForValue().get(key);
// if (val != null) {
// 如果在判断完之后,线程失去了cpu时间,而其他线程获取到锁了,再次获取到cpu时间的时候,就会把其他线程的锁给释放掉!
// if (val.equals(value)) {
// return this.redisTemplate.delete(key);
// } else {
// return false;
// }
// }
// 释放锁 使用lua脚本
String luaStr = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\", KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
RedisScript<Boolean> redisScript = RedisScript.of(luaStr, Boolean.class);
List<String> keys = Collections.singletonList(key);
return (Boolean) this.redisTemplate.execute(redisScript, keys, value);
}
@Override
public void close() throws Exception {
log.info("释放锁的结果:{}", this.unLock());
}
}
定时任务部署集群
定时任务重复执行问题, 使用分布式锁解决.
Zookeeper 分布式锁
Zookeeper 数据结构
- 红色:持久节点
- 黄色:瞬时节点, 有序
瞬时节点不可再有子节点, 会话结束后瞬时节点自动消失.
Zookeeper 创建节点
./zkCli.sh -server 192.168.123.26:2181
# 创建节点
create /lock distribute-lock
Zookeeper 分布式锁原理
zookeeper 观察器, 检测某个节点的状态.
主要的三个方法:getData(); getChild(); exists();
节点数据发生变化时, 发送给客户端.
监控器只能监控一次, 再次监控需要重新设置.
- 利用 zookeeper 瞬时节点的有序性.
- 多线程并发创建瞬时节点时, 得到有序的序列.
- 序号最小的获得到锁.
- 其他线程监听自己序号的前一个序号.
代码实现
package org.lgq.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* 基于 ZooKeeper 的分布式锁
*
* @author DevLGQ
* @version 1.0
*/
public class ZkLock implements AutoCloseable, Watcher {
private static final Logger log = LoggerFactory.getLogger(ZkLock.class);
private final ZooKeeper zooKeeper;
private String zNode;
public ZkLock() throws IOException {
this.zooKeeper = new ZooKeeper("192.168.123.26:2181", 10000, this);
}
public boolean getLock(String businessCode) {
try {
Stat rootStat = this.zooKeeper.exists("/" + businessCode, false);
// 创建业务根节点
if (rootStat == null) {
this.zooKeeper.create("/" + businessCode, businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建瞬时有序节点 e.g. /order/order_auto_serial_number
// 多个访问的时候, 不会报错, 而是按顺序来创建, 会在后面添加序号
zNode = this.zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取业务下所有子节点
List<String> childrenNodes = this.zooKeeper.getChildren("/" + businessCode, false);
// 升序
Collections.sort(childrenNodes);
String firstNode = childrenNodes.get(0);
// 如果创建的子节点是第一个节点,则获得锁
if (zNode.endsWith(firstNode)) {
return true;
}
// 不是第一个节点,则监听前一个节点
String preNode = firstNode;
for (String node : childrenNodes) {
if (zNode.endsWith(node)) {
// 监听前一个节点 最后一个参数是true
this.zooKeeper.exists("/" + businessCode + "/" + preNode, true);
break;
} else {
preNode = node;
}
}
synchronized (this) {
wait();
}
return true;
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
return false;
}
@Override
public void close() throws Exception {
this.zooKeeper.delete(zNode, -1);
this.zooKeeper.close();
log.info("释放锁");
}
@Override
public void process(WatchedEvent event) {
// 监听节点删除事件
if (event.getType().equals(Event.EventType.NodeDeleted)) {
synchronized (this){
notify();
}
}
}
}
基于 Curator 客户端实现分布式锁
- 引入Curator客户端
- curator已经实现分布式锁的方法
引入依赖
<!-- curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
使用
@RequestMapping("/curatorLock")
public String curatorLock() throws Exception {
log.info("进入方法");
InterProcessMutex lock = new InterProcessMutex(client, "/order");
if (lock.acquire(30, TimeUnit.SECONDS)) {
try {
// do some work inside of the critical section here
log.info("我获得了锁!");
Thread.sleep(10000);
} finally {
lock.release();
log.info("释放锁");
}
}
log.info("方法执行完成");
return "方法执行完成";
}
基于 Redisson 分布式锁
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).
- Redisson提供了使用Redis的最简单和最便捷的方法
- 开发人员不需过分关注Redis, 集中精力关注业务即可
- 基于Redis, 提供了在Java中具有分布式特性的工具类
- 使Java中的并发工具包获得了协调多机多线程并发的能力
引入依赖
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.14.0</version>
</dependency>
使用
package org.lgq.controller;
//...
/**
* @author DevLGQ
* @version 1.0
*/
@RestController
public class RedissonController {
private static final Logger log = LoggerFactory.getLogger(RedissonController.class);
@Autowired
RedissonClient redissonClient;
@RequestMapping("/redissonLock")
public String redissonLock(){
log.info("进入方法");
RLock rLock = redissonClient.getLock("order");
rLock.lock(30, TimeUnit.SECONDS);
try {
log.info("获取锁");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
log.info("释放锁");
}
log.info("方法执行完毕");
return "方法执行完毕";
}
}
分布式锁对比
方式 | 优点 | 缺点 |
---|---|---|
数据库 | 实现简单, 易于理解 | 对数据库压力大 |
Redis | 易于理解 | 自己实现, 不支持阻塞 |
Zookeeper | 支持阻塞 | 需要理解Zookeeper, 程序复杂 |
Curator | 提供锁的方法 | 依赖Zookeeper, 强一致 |
Redisson | 提供锁的方法, 可阻塞 |