• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    redis分布式锁之可重入锁的实现代码

    上篇redis实现的分布式锁,有一个问题,它不可重入。

    所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。 同一个人拿一个锁 ,只能拿一次不能同时拿2次。

    1、什么是可重入锁?它有什么作用?

    可重入锁,也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。 说白了就是同一个线程再次进入同样代码时,可以再次拿到该锁。 它的作用是:防止在同一线程中多次获取锁而导致死锁发生。

    2、那么java中谁实现了可重入锁了?

    在java的编程中synchronized 和 ReentrantLock都是可重入锁。我们可以参考ReentrantLock的代码

    3、基于ReentrantLock的可重入锁

    ReentrantLock,是一个可重入且独占式的锁,是一种递归无阻塞的同步锁。

    3.1、看个ReentrantLock的例子

    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class ReentrantLockDemo {
        //锁
        private static ReentrantLock lock =  new ReentrantLock();
        public void doSomething(int n){
            try{
                //进入递归第一件事:加锁
                lock.lock();
                log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
                log.info("--------递归{}次--------",n);
                if(n=2){
                    this.doSomething(++n);
                }else{
                    return;
                }
            }finally {
                lock.unlock();
                log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
            }
        }
    
        public static void main(String[] args) {
            ReentrantLockDemo reentrantLockDemo=new ReentrantLockDemo();
            reentrantLockDemo.doSomething(1);
            log.info("执行完doSomething方法 是否还持有锁:{}",lock.isLocked());
        }
    
    }

    3.2、执行结果

    16:35:58.051 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:1 lock.isLocked():true
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归1次--------
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:2 lock.isLocked():true
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归2次--------
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:3 lock.isLocked():true
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归3次--------
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:2 lock.isLocked():true
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:1 lock.isLocked():true
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:0 lock.isLocked():false
    16:35:58.055 [main] INFO com.test.ReentrantLockDemo - 执行完doSomething方法 是否还持有锁:false

    3.3、 从上面栗子可以看出ReentrantLock是可重入锁,那么他是如何实现的了,我们看下源码就知道了

     final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                //先判断,c(state)是否等于0,如果等于0,说明没有线程持有锁
                if (c == 0) {
                    //通过cas方法把state的值0替换成1,替换成功说明加锁成功
                    if (compareAndSetState(0, acquires)) {
                        //如果加锁成功,设置持有锁的线程是当前线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {//判断当前持有锁的线程是否是当前线程
                    //如果是当前线程,则state值加acquires,代表了当前线程加锁了多少次
                    int nextc = c + acquires;
                    if (nextc  0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    ReentrantLock的加锁流程是:
    1,先判断是否有线程持有锁,没有加锁进行加锁
    2、如果加锁成功,则设置持有锁的线程是当前线程
    3、如果有线程持有了锁,则再去判断,是否是当前线程持有了锁
    4、如果是当前线程持有锁,则加锁数量(state)+1

    /**
             * 释放锁
             * @param releases
             * @return
             */
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;//state-1 减加锁次数
                //如果持有锁的线程,不是当前线程,抛出异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
    
                boolean free = false;
                if (c == 0) {//如果c==0了说明当前线程,已经要释放锁了
                    free = true;
                    setExclusiveOwnerThread(null);//设置当前持有锁的线程为null
                }
                setState(c);//设置c的值
                return free;
            }

    看ReentrantLock的解锁代码我们知道,每次释放锁的时候都对state减1,
    当c值等于0的时候,说明锁重入次数也为0了,
    最终设置当前持有锁的线程为null,state也设置为0,锁就释放了。

    4、那么redis要怎么实现可重入的操作了?

    看ReentrantLock的源码我们知道,它是加锁成功了,记录了当前持有锁的线程,并通过一个int类型的数字,来记录了加锁次数。

    我们知道ReentrantLock的实现原理了,那么redis只要下面两个问题解决,就能实现重入锁了:
    1、怎么保存当前持有的线程
    2、加锁次数(重入了多少次),怎么记录维护

    4.1、第一个问题:怎么保存当前持有的线程

    1.上一篇文章我们用的是redis 的set命令存的是string类型,他能保存当前持有的线程吗?
    valus值我们可以保存当前线程的id来解决。
    2. 但是集群环境下我们线程id可能是重复了那怎么解决?
    项目在启动的生成一个全局进程id,使用进程id+线程id 那就是唯一的了

    4.2、第二个问题:加锁次数(重入了多少次),怎么记录维护

    他能记录下来加锁次数吗?
    如果valus值存的格式是:系进程id+线程id+加锁次数,那可以实现

    存没问题了,但是重入次数要怎么维护了, 它肯定要保证原子性的,能解决吗?
    好像用java代码或者lua脚本都没法解决,因为都是实现都需要两步来维护这个重入次数的

    5、我们已经知道SET是不支持重入锁的,但我们需要重入锁,怎么办呢?

    目前对于redis的重入锁业界还是有很多解决方案的,最流行的就是采用Redisson。

    6、什么是 Redisson?

    Redisson是Redis官方推荐的Java版的Redis客户端。 它基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。 它在网络通信上是基于NIO的Netty框架,保证网络通信的高性能。 在分布式锁的功能上,它提供了一系列的分布式锁;如:可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等等。

    Redisson github地址

    7、Redisson的分布锁如何使用

    引入依赖包

    dependency>
       groupId>org.redisson/groupId>
       artifactId>redisson/artifactId>
       version>3.15.5/version>
    /dependency>  

    代码

    import lombok.extern.slf4j.Slf4j;
    import org.redisson.Redisson;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.redisson.config.SingleServerConfig;
    
    
    @Slf4j
    public class ReentrantLockDemo1 {
        //锁
        public static RLock lock;
    
        static {
            //Redisson需要的配置
            Config config = new Config();
            String node = "127.0.0.1:6379";//redis地址
            node = node.startsWith("redis://") ? node : "redis://" + node;
            SingleServerConfig serverConfig = config.useSingleServer()
                    .setAddress(node)
                    .setTimeout(3000)//超时时间
                    .setConnectionPoolSize(10)
                    .setConnectionMinimumIdleSize(10);
            //serverConfig.setPassword("123456");//设置redis密码
            // 创建RedissonClient客户端实例
            RedissonClient redissonClient = Redisson.create(config);
            //创建redisson的分布式锁
            RLock rLock = redissonClient.getLock("666");
            lock = rLock;
        }
        public void doSomething(int n){
            try{
                //进入递归第一件事:加锁
                lock.lock();
                log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
                log.info("--------递归{}次--------",n);
                if(n=2){
                    this.doSomething(++n);
                }else{
                    return;
                }
            }finally {
                lock.unlock();
                log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
            }
        }
        public static void test(){
            log.info("--------------start---------------");
            ReentrantLockDemo1 reentrantLockDemo=new ReentrantLockDemo1();
            reentrantLockDemo.doSomething(1);
            log.info("执行完doSomething方法 是否还持有锁:{}",ReentrantLockDemo1.lock.isLocked());
            log.info("--------------end---------------");
        }
        public static void main(String[] args) {
            test();
        }
    }

    执行结果

    2021-05-23 22:49:01.322 INFO 69041 --- [nio-9090-exec-1] org.redisson.Version : Redisson 3.15.5
    2021-05-23 22:49:01.363 INFO 69041 --- [sson-netty-5-22] o.r.c.pool.MasterConnectionPool : 10 connections initialized for /127.0.0.1:6379
    2021-05-23 22:49:01.363 INFO 69041 --- [sson-netty-5-23] o.r.c.pool.MasterPubSubConnectionPool : 1 connections initialized for /127.0.0.1:6379
    2021-05-23 22:49:01.367 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------------start---------------
    2021-05-23 22:49:01.435 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------lock()执行后,getState()的值:1 lock.isLocked():true
    2021-05-23 22:49:01.436 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------递归1次--------
    2021-05-23 22:49:01.442 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------lock()执行后,getState()的值:2 lock.isLocked():true
    2021-05-23 22:49:01.442 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------递归2次--------
    2021-05-23 22:49:01.448 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------lock()执行后,getState()的值:3 lock.isLocked():true
    2021-05-23 22:49:01.448 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------递归3次--------
    2021-05-23 22:49:01.456 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------unlock()执行后,getState()的值:2 lock.isLocked():true
    2021-05-23 22:49:01.461 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------unlock()执行后,getState()的值:1 lock.isLocked():true
    2021-05-23 22:49:01.465 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------unlock()执行后,getState()的值:0 lock.isLocked():false
    2021-05-23 22:49:01.467 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : 执行完doSomething方法 是否还持有锁:false
    2021-05-23 22:49:01.467 INFO 69041 --- [nio-9090-exec-1] com.test.ReentrantLockDemo1 : --------------end---------------

    看控制台打印能清楚知道Redisson是支持可重入锁了。

    8、那么Redisson是如何实现的了?

    我们跟一下lock.lock()的代码,发现它最终调用的是org.redisson.RedissonLock#tryLockInnerAsync的方法,具体如下:

     T> RFutureT> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT> command) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
        }

    8.1、上面的代码,用到的redis命令先梳理一下

    exists 查询一个key是否存在

    EXISTS key [key ...]
    返回值
    如下的整数结果
    1 如果key存在
    0 如果key不存在

    hincrby :将hash中指定域的值增加给定的数字

    pexpire:设置key的有效时间以毫秒为单位

    hexists:判断field是否存在于hash中

    pttl:获取key的有效毫秒数

    8.2、看lua脚本传入的参数我们知道:

    protected String getLockName(long threadId) {
            return id + ":" + threadId;
        }

    8.3、代码截图

    从截图上可以看到,它是使用lua脚本来保证多个命令执行的原子性,使用了hash来实现了分布式锁
    现在我们来看下lua脚本的加锁流程

    8.4、第一个if判断

    8.5、下面来看第二个if判断

    8.6、下图是redis可视化工具看到是如何在hash存储的结构

    Redisson的整个加锁流程跟ReentrantLock的加锁逻辑基本相同

    8.7、解锁代码位于 org.redisson.RedissonLock#unlockInnerAsync,如下:

     return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                            "return nil;" +
                            "end; " +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                            "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                            "end; " +
                            "return nil;",
                    Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
        }

    看这个解锁的Lua脚本,流程跟Reentrantlock的解锁逻辑也基本相同没啥好说的了。

    以上就是redis分布式锁-可重入锁的详细内容,更多关于redis分布式锁的资料请关注脚本之家其它相关文章!

    您可能感兴趣的文章:
    • 详解redis分布式锁的这些坑
    • Java基于redis实现分布式锁
    • 详解Redis 分布式锁遇到的序列化问题
    • php基于redis的分布式锁实例详解
    • Redis分布式锁升级版RedLock及SpringBoot实现方法
    • redis分布式锁的go-redis实现方法详解
    • Redis分布式锁的使用和实现原理详解
    • redission分布式锁防止重复初始化问题
    • Redis如何实现分布式锁详解
    上一篇:分布式锁为什么要选择Zookeeper而不是Redis?看完这篇你就明白了
    下一篇:redis实现排行榜功能
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    redis分布式锁之可重入锁的实现代码 redis,分布式,锁之,可,重入,