• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    redis 集群批量操作实现

     Redis集群是没法执行批量操作命令的,如mget,pipeline等。这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中。但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据。具体代码如下:

    初始化    JedisCluster类

    @Configuration
    public class JedisClusterConfig {
    
        @Value("${spring.redis.cluster.nodes}")
        private String clusterNodes;
    
        @Value("${spring.redis.cache.commandTimeout}")
        private Integer commandTimeout;
    
        @Bean
        public JedisCluster getJedisCluster() {
    
            String[] serverArray = clusterNodes.split(",");
            SetHostAndPort> nodes = new HashSet>();
            for (String ipPort : serverArray) {
                String[] ipPortPair = ipPort.split(":");
                nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
            }
            return new JedisCluster(nodes, commandTimeout);
        }
    }
    
    

    工具类 JedisClusterUtil

    @Component
    public class JedisClusterUtil {
    
        @Autowired
        private JedisCluster jedisCluster;
    
        @Resource(name = "redisTemplate4Json")
        protected RedisTemplateString, Object> redisTemplate;
    
        /**
         * ZSet批量查询
         * @param keys
         * @return
         */
        public ListObject> batchZRange(ListString> keys) {
    
            ListObject> resList = new ArrayList>();
            if (keys == null || keys.size() == 0) {
                return resList;
            }
    
            if (keys.size() == 1) {
                BoundZSetOperationsString, Object> operations = redisTemplate.boundZSetOps(keys.get(0));
                SetObject> set = operations.reverseRange(0, 0);
                resList.add(set.iterator().next());
                return resList;
            }
    
            MapJedisPool, ListString>> jedisPoolMap = getJedisPool(keys);
    
            ListString> keyList;
            JedisPool currentJedisPool = null;
            Pipeline currentPipeline;
            ListObject> res = new ArrayList>();
            MapString, Object> resultMap = new HashMap>();
    
            //执行
            for (Map.EntryJedisPool, ListString>> entry : jedisPoolMap.entrySet()) {
                Jedis jedis = null;
                try {
                    currentJedisPool = entry.getKey();
                    keyList = entry.getValue();
                    //获取pipeline
                    jedis = currentJedisPool.getResource();
                    currentPipeline = jedis.pipelined();
                    for (String key : keyList) {
                        currentPipeline.zrevrange(key, 0, 0);
                    }
                    //从pipeline中获取结果
                    res = currentPipeline.syncAndReturnAll();
                    currentPipeline.close();
                    for (int i = 0; i  keyList.size(); i++) {
                        if (null == res.get(i)) {
                            resultMap.put(keyList.get(i), null);
                        } else {
                            SetObject> set = (SetObject>) res.get(i);
                            if (null == set || set.isEmpty()) {
                                resultMap.put(keyList.get(i), null);
                            } else {
                                byte[] byteStr = set.iterator().next().toString().getBytes();
                                Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr);
                                resultMap.put(keyList.get(i), obj);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    returnResource(jedis, currentJedisPool);
                }
            }
            resList = sortList(keys, resultMap);
            return resList;
        }
    
        /**
         * Value批量查询
         * @param keys
         * @return
         */
        public ListObject> batchGet(ListString> keys){
            ListObject> resList = new ArrayList>();
            if (keys == null || keys.size() == 0) {
                return resList;
            }
    
            if (keys.size() == 1) {
                BoundValueOperationsString, Object> operations = redisTemplate.boundValueOps(keys.get(0));
                resList.add(operations.get());
                return resList;
            }
    
            MapJedisPool, ListString>> jedisPoolMap = getJedisPool(keys);
    
            ListString> keyList;
            JedisPool currentJedisPool = null;
            Pipeline currentPipeline;
            ListObject> res = new ArrayList>();
            MapString, Object> resultMap = new HashMap>();
    
            for (Map.EntryJedisPool, ListString>> entry : jedisPoolMap.entrySet()) {
                Jedis jedis = null;
                try {
                    currentJedisPool = entry.getKey();
                    keyList = entry.getValue();
                    //获取pipeline
                    jedis = currentJedisPool.getResource();
                    currentPipeline = jedis.pipelined();
                    for (String key : keyList) {
                        currentPipeline.get(key);
                    }
                    //从pipeline中获取结果
                    res = currentPipeline.syncAndReturnAll();
                    currentPipeline.close();
                    for (int i = 0; i  keyList.size(); i++) {
                        if (null == res.get(i)) {
                            resultMap.put(keyList.get(i), null);
                        } else {
                            byte[] byteStr = keyList.get(i).toString().getBytes();
                            Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr);
                            resultMap.put(keyList.get(i), obj);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    returnResource(jedis, currentJedisPool);
                }
            }
            resList = sortList(keys, resultMap);
            return resList;
        }
    
        private MapJedisPool, ListString>> getJedisPool(ListString> keys){
            //JedisCluster继承了BinaryJedisCluster
            //BinaryJedisCluster的JedisClusterConnectionHandler属性
            //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache
            //从而获取slot和JedisPool直接的映射
            MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
            JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
            //保存地址+端口和命令的映射
            MapJedisPool, ListString>> jedisPoolMap = new HashMap>();
    
            JedisPool currentJedisPool = null;
            ListString> keyList;
            for (String key : keys) {
                //计算哈希槽
                int crc = JedisClusterCRC16.getSlot(key);
                //通过哈希槽获取节点的连接
                currentJedisPool = cache.getSlotPool(crc);
    
                //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的
                //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样
                //的,可以作为map的key
                if (jedisPoolMap.containsKey(currentJedisPool)) {
                    jedisPoolMap.get(currentJedisPool).add(key);
                } else {
                    keyList = new ArrayList>();
                    keyList.add(key);
                    jedisPoolMap.put(currentJedisPool, keyList);
                }
            }
            return jedisPoolMap;
        }
    
        private ListObject> sortList(ListString> keys, MapString, Object> params) {
            ListObject> resultList = new ArrayList>();
            IteratorString> it = keys.iterator();
            while (it.hasNext()) {
                String key = it.next();
                resultList.add(params.get(key));
            }
            return resultList;
        }
    
        /**
         * 释放jedis资源
         *
         * @param jedis
         */
        public void returnResource(Jedis jedis, JedisPool jedisPool) {
            if (jedis != null  jedisPool != null) {
                jedisPool.returnResource(jedis);
            }
        }

     注意:一定要完成后释放 jedis 资源  不然会造成卡死现象

    到此这篇关于redis 集群批量操作实现的文章就介绍到这了,更多相关redis 集群批量操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • 用python 批量操作redis数据库
    • 详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作
    上一篇:Redis5之后版本的高可用集群搭建的实现
    下一篇:redis 查看所有的key方式
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    redis 集群批量操作实现 redis,集群,批量,操作,实现,