• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    Redis实现分布式锁和等待序列的方法示例

    在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。

    背景

    最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)

    分析

    redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式

    代码

    直接上代码 其实直接redis的工具类就可以解决了

    package com.test
    import redis.clients.jedis.Jedis;
    
    import java.util.Collections;
    import java.util.List;
    
    /**
     * @desc redis队列实现方式
     * @anthor 
     * @date 
     **/
    public class RedisUcUitl {
    
      private static final String LOCK_SUCCESS = "OK";
      private static final String SET_IF_NOT_EXIST = "NX";
      private static final String SET_WITH_EXPIRE_TIME = "PX";
    
      private static final Long RELEASE_SUCCESS = 1L;
    
      private RedisUcUitl() {
    
      }
      /**
       * logger
       **/
    
      /**
       * 存储redis队列顺序存储 在队列首部存入
       *
       * @param key  字节类型
       * @param value 字节类型
       */
      public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {
    
        return jedis.lpush(key, value);
      
      }
    
      /**
       * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时
       *
       * @param srckey
       * @param dstkey
       * @param timeout 0 表示永不超时
       * @return
       */
      public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {
    
        return jedis.brpoplpush(srckey, dstkey, timeout);
    
      }
    
      /**
       * 返回制定的key,起始位置的redis数据
       * @param redisKey
       * @param start
       * @param end -1 表示到最后
       * @return
       */
      public static Listbyte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
        
        return jedis.lrange(redisKey, start, end);
      }
    
      /**
       * 删除key
       * @param redisKey
       */
      public static void delete(Jedis jedis, final byte[] redisKey) {
        
         return jedis.del(redisKey);
      }
    
      /**
       * 尝试加锁
       * @param lockKey key名称
       * @param requestId 身份标识
       * @param expireTime 过期时间
       * @return
       */
      public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        return LOCK_SUCCESS.equals(result);
    
      }
    
      /**
       * 释放锁
       * @param lockKey key名称
       * @param requestId 身份标识
       * @return
       */
      public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
        final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
    
        return RELEASE_SUCCESS.equals(result);
    
      }
    }
    
    

    业务逻辑主要代码如下

    1.先消耗队列中的

    while(true){
      // 消费队列
      try{
        // 被放入redis队列的数据 序列化后的
        byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
        if(bytes == null || bytes.isEmpty()){
          // 队列中没数据时退出
          break;
        }
        // 反序列化对象
        MapString, Object> singleMap = (MapString, Object>) ObjectSerialUtil.bytesToObject(bytes);
        // 塞入唯一的值 防止被其他线程误解锁
        String requestId = UUID.randomUUID().toString();
        boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
        if(lockGetFlag){
          // 成功获取锁 进行业务处理
          //TODO
          // 处理完毕释放锁 
          boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);
    
        }else{
          // 未能获得锁放入等待队列
         RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
      
        }
        
      }catch(Exception e){
        break;
      }
      
    }
    
    

    2.处理最新接到的数据

    同样是走尝试获取锁,获取不到放入队列的流程

    一般序列化用 fastJson 之列的就可以了,这里用的是 JDK 自带的,工具类如下

    public class ObjectSerialUtil {
    
      private ObjectSerialUtil() {
    //    工具类
      }
    
      /**
       * 将Object对象序列化为byte[]
       *
       * @param obj 对象
       * @return byte数组
       * @throws Exception
       */
      public static byte[] objectToBytes(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        byte[] bytes = bos.toByteArray();
        bos.close();
        oos.close();
        return bytes;
      }
    
    
      /**
       * 将bytes数组还原为对象
       *
       * @param bytes
       * @return
       * @throws Exception
       */
      public static Object bytesToObject(byte[] bytes) {
        try {
          ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
          ObjectInputStream ois = new ObjectInputStream(bin);
          return ois.readObject();
        } catch (Exception e) {
          throw new BaseException("反序列化出错!", e);
        }
      }
    }
    
    

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    您可能感兴趣的文章:
    • Redis分布式锁的正确实现方法总结
    • Redis分布式锁的实现方式(redis面试题)
    • SpringBoot使用Redisson实现分布式锁(秒杀系统)
    • SpringBoot集成Redisson实现分布式锁的方法示例
    • Java Redis分布式锁的正确实现方式详解
    • redis分布式锁的问题与解决方法
    • 浅谈Redis分布式锁的正确实现方式
    • 单机redis分布式锁实现原理解析
    上一篇:Redis如何优雅的删除特定前缀key
    下一篇:redis单线程快的原因和原理
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    Redis实现分布式锁和等待序列的方法示例 Redis,实现,分布式,锁,和,