抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

分布式锁

锁在JAVA中是一个非常重要的概念, 尤其是在当今的互联网时代, 高并发的场景下, 更是离不开锁. 在计算机科学中, 锁(lock)或互斥(mutual)同步机制, 用于在有许多执行线程的环境中强制对资源的访问限制. 锁旨在强制实施互斥排他, 并发控制策略.

  • 数据库
  • memcached(add命令)
  • redis(setnx命令)
  • zookeeper(临时节点)

乐观锁与悲观锁

悲观锁假定会发⽣冲突, 访问的时候都要先获得锁, 保证同⼀个时刻只有一个线程获得锁, 读也会阻塞;乐观锁假设不会发⽣冲突, 只有在提交操作的时候检查是否有冲突.

Java 乐观锁通过 CAS 实现, 悲观锁通过 synchronize 实现. MySQL 通过 MVCC, 也就是版本实现, 悲观锁可以通过 select… for update 加上排它锁.

CAS 是乐观锁, 不需要阻塞, 硬件级别实现的原⼦性; synchronize 会阻塞, JVM 级别实现的原⼦性. 使⽤场景不同, 线程冲突严重时 CAS 会造成CPU压力过大, 导致吞吐量下降, synchronize 的原理是先自旋然后阻塞, 线程冲突严重时仍然有较⾼的吞吐量, 因为线程都被阻塞了, 不会占⽤CPU.

通过锁解决电商中的超卖问题

现象:某商品10件, 结果卖出了15件.

商品卖出的数量超出了商品库存的数量.

产生原因:多个线程同时对数据库进行操作, 导致一个库存产生多个订单.

解决方案:扣减库存不再程序中进行, 而在数据库中操作, 向数据库发送库存增量, 扣减一个库存, 增量为-1. 在数据库中使用update语句计算库存, 通过update行锁解决并发.

行锁:在update时会对数据有加锁, 只有等加锁的语句执行完才释放.

这样还是会发生库存变成负数的现象, 由于并发判断库存数, 造成有剩余库存的假象.

使用锁解决. 校验库存, 扣减库存统一加锁, 使之成为原子性的操作. 这样只有获得锁的线程才能进行库存校验和库存扣减.

  • 方法锁. 基于 synchronized 锁解决问题, 最原始的方案. 还是有可能出现库存是-1的现象, 因为事务没有提交. 所以要手动控制事务, 把事务也锁起来.
  • 块锁.
  • ReentrantLock. 并发包中的锁.

块锁

// 当前的 OrderService 是一个单例
// 只有获取到 this(OrderService) 的线程才能执行里面的代码
synchronized (this){
}
//  只有获取到 object(OrderService) 的线程才能执行里面的代码
synchronized (object) {
}
// 类锁. 对象是通过new出来的, 上面的方式, 是锁对象, 而类只有一个. 
synchronized (OrderService.class) {
}

单体应用的局限性, 不能跨 JVM. 单点故障, 集群.

基于数据库悲观锁的分布式锁

  • 多个进程, 多个线程可以共同访问的数据库组件
  • 通过 select … for update 访问同一条数据(行锁)
  • for update 锁住数据, 其他线程只能等待
  • 关闭事务自动提交set autocommit = 0

优缺点:

  • 简单方便, 易于理解, 易于操作
  • 并发量大时, 对数据库压力大

建议:

作为锁的数据库与业务的数据库分开。

Redis 分布式锁

实现原理

基于 Redis 的命令. 利用 NX 的原子性, 多个线程并发了, 只有一个可以成功. 设置成功获取锁, 可以继续后面的业务代码.

Redis是单线程的, 并行转变为串行.

set resource_name my_random_value NX PX 30000

  • resource_name 资源名称, 可根据不同的业务区分不同的锁
  • my_random_value 随机值, 每个线程的的随机值都不一样, 用于释放锁时的校验
  • NX key不存在时设置成功, key存在则设置不成功
  • PX 自动失效时间, 出现异常情况, 锁可以过期失效

释放锁使用 Redis 的 delete 命令。释放锁时要校验之前设置的随机数,相同才释放。同时要使用 LUA 脚本(单线程)以保证操作是原子性的,如果不是的话,释放锁时有可能误删其他线程的锁。

if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end

实现代码

package org.lgq.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

/**
 * @author DevLGQ
 * @version 1.0
 */
public class RedisLock implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(RedisLock.class);
    private RedisTemplate redisTemplate;
    private String key;
    private String value;
    /**
     * 过期时间,单位:秒
     */
    private int expireTime;

    public RedisLock(RedisTemplate redisTemplate, String key, int expireTime) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.value = UUID.randomUUID().toString();
        this.expireTime = expireTime;
    }

    /**
     * 获取分布式锁
     *
     * @return
     */
    public boolean getLock() {
        RedisCallback<Boolean> redisCallback = connection -> {
            // 设置NX
            RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
            // 设置过期时间
            Expiration expiration = Expiration.seconds(expireTime);
            // byte[] redisKey = key.getBytes(); 不建议这么做
            // 序列化key
            byte[] redisKey = this.redisTemplate.getKeySerializer().serialize(key);
            // 序列化value
            byte[] redisVal = this.redisTemplate.getValueSerializer().serialize(value);
            // 执行setnx操作
            return connection.set(redisKey, redisVal, expiration, setOption);
        };
        // 获取分布式锁
        return (Boolean) this.redisTemplate.execute(redisCallback);
    }

    /**
     * 解锁
     *
     * @return
     */
    private boolean unLock() {
    // 不能使用以下方式释放锁,要使用lua脚本来保证原子性
//        final String val = (String) this.redisTemplate.opsForValue().get(key);
//        if (val != null) {
// 如果在判断完之后,线程失去了cpu时间,而其他线程获取到锁了,再次获取到cpu时间的时候,就会把其他线程的锁给释放掉!
//            if (val.equals(value)) {
//                return this.redisTemplate.delete(key);
//            } else {
//                return false;
//            }
//        }
        // 释放锁 使用lua脚本
        String luaStr = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +
                "    return redis.call(\"del\", KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        RedisScript<Boolean> redisScript = RedisScript.of(luaStr, Boolean.class);
        List<String> keys = Collections.singletonList(key);
        return (Boolean) this.redisTemplate.execute(redisScript, keys, value);
    }

    @Override
    public void close() throws Exception {
        log.info("释放锁的结果:{}", this.unLock());
    }

}

定时任务部署集群

定时任务重复执行问题, 使用分布式锁解决.

Zookeeper 分布式锁

Zookeeper 数据结构

  • 红色:持久节点
  • 黄色:瞬时节点, 有序

瞬时节点不可再有子节点, 会话结束后瞬时节点自动消失.

Zookeeper 创建节点

./zkCli.sh -server 192.168.123.26:2181
# 创建节点
create /lock distribute-lock

Zookeeper 分布式锁原理

zookeeper 观察器, 检测某个节点的状态.

主要的三个方法:getData(); getChild(); exists();

节点数据发生变化时, 发送给客户端.

监控器只能监控一次, 再次监控需要重新设置.

  • 利用 zookeeper 瞬时节点的有序性.
  • 多线程并发创建瞬时节点时, 得到有序的序列.
  • 序号最小的获得到锁.
  • 其他线程监听自己序号的前一个序号.

代码实现

package org.lgq.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
 * 基于 ZooKeeper 的分布式锁
 *
 * @author DevLGQ
 * @version 1.0
 */
public class ZkLock implements AutoCloseable, Watcher {

    private static final Logger log = LoggerFactory.getLogger(ZkLock.class);
    private final ZooKeeper zooKeeper;
    private String zNode;

    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("192.168.123.26:2181", 10000, this);
    }

    public boolean getLock(String businessCode) {
        try {
            Stat rootStat = this.zooKeeper.exists("/" + businessCode, false);
            // 创建业务根节点
            if (rootStat == null) {
                this.zooKeeper.create("/" + businessCode, businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            // 创建瞬时有序节点 e.g. /order/order_auto_serial_number
            // 多个访问的时候, 不会报错, 而是按顺序来创建, 会在后面添加序号
            zNode = this.zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 获取业务下所有子节点
            List<String> childrenNodes = this.zooKeeper.getChildren("/" + businessCode, false);
            // 升序
            Collections.sort(childrenNodes);
            String firstNode = childrenNodes.get(0);
            // 如果创建的子节点是第一个节点,则获得锁
            if (zNode.endsWith(firstNode)) {
                return true;
            }
            // 不是第一个节点,则监听前一个节点
            String preNode = firstNode;
            for (String node : childrenNodes) {
                if (zNode.endsWith(node)) {
                    // 监听前一个节点 最后一个参数是true
                    this.zooKeeper.exists("/" + businessCode + "/" + preNode, true);
                    break;
                } else {
                    preNode = node;
                }
            }
            synchronized (this) {
                wait();
            }
            return true;
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        return false;
    }

    @Override
    public void close() throws Exception {
        this.zooKeeper.delete(zNode, -1);
        this.zooKeeper.close();
        log.info("释放锁");
    }

    @Override
    public void process(WatchedEvent event) {
        // 监听节点删除事件
        if (event.getType().equals(Event.EventType.NodeDeleted)) {
            synchronized (this){
                notify();
            }
        }
    }
}

基于 Curator 客户端实现分布式锁

  • 引入Curator客户端
  • curator已经实现分布式锁的方法

引入依赖

<!-- curator -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>

使用

@RequestMapping("/curatorLock")
public String curatorLock() throws Exception {
    log.info("进入方法");
    InterProcessMutex lock = new InterProcessMutex(client, "/order");
    if (lock.acquire(30, TimeUnit.SECONDS)) {
        try {
            // do some work inside of the critical section here
            log.info("我获得了锁!");
            Thread.sleep(10000);
        } finally {
            lock.release();
            log.info("释放锁");
        }
    }
    log.info("方法执行完成");
    return "方法执行完成";
}

基于 Redisson 分布式锁

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).

  • Redisson提供了使用Redis的最简单和最便捷的方法
  • 开发人员不需过分关注Redis, 集中精力关注业务即可
  • 基于Redis, 提供了在Java中具有分布式特性的工具类
  • 使Java中的并发工具包获得了协调多机多线程并发的能力

引入依赖

<!-- Redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.14.0</version>
</dependency>

使用

package org.lgq.controller;
//...

/**
 * @author DevLGQ
 * @version 1.0
 */
@RestController
public class RedissonController {

    private static final Logger log = LoggerFactory.getLogger(RedissonController.class);

    @Autowired
    RedissonClient redissonClient;

    @RequestMapping("/redissonLock")
    public String redissonLock(){
        log.info("进入方法");
        RLock rLock = redissonClient.getLock("order");
        rLock.lock(30, TimeUnit.SECONDS);
        try {
            log.info("获取锁");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rLock.unlock();
            log.info("释放锁");
        }
        log.info("方法执行完毕");
        return "方法执行完毕";
    }
}

分布式锁对比

方式 优点 缺点
数据库 实现简单, 易于理解 对数据库压力大
Redis 易于理解 自己实现, 不支持阻塞
Zookeeper 支持阻塞 需要理解Zookeeper, 程序复杂
Curator 提供锁的方法 依赖Zookeeper, 强一致
Redisson 提供锁的方法, 可阻塞

评论