Redis集群批量操作

àì夳堔傛蜴生んèń 2022-05-19 08:56 411阅读 0赞
  1. 众所周知,Redis集群是没法执行批量操作命令的,如mgetpipeline等。这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中。但是,Jedis客户端提供了计算keyslot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据。具体代码如下:
  2. /**
  3. * 根据key计算slot,再根据slot计算node,获取pipeline
  4. * 进行批量操作
  5. */
  6. public class BatchUtil {
  7. public static Map<String,String> mget(JedisCluster jc,String... keys){
  8. Map<String,String> resMap = new HashMap<>();
  9. if(keys == null || keys.length == 0){
  10. return resMap;
  11. }
  12. //如果只有一条,直接使用get即可
  13. if(keys.length == 1){
  14. resMap.put(keys[0],jc.get(keys[0]));
  15. return resMap;
  16. }
  17. //JedisCluster继承了BinaryJedisCluster
  18. //BinaryJedisCluster的JedisClusterConnectionHandler属性
  19. //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache
  20. //从而获取slot和JedisPool直接的映射
  21. MetaObject metaObject = SystemMetaObject.forObject(jc);
  22. JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
  23. //保存地址+端口和命令的映射
  24. Map<JedisPool,List<String>> jedisPoolMap = new HashMap<>();
  25. List<String> keyList = null;
  26. JedisPool currentJedisPool = null;
  27. Pipeline currentPipeline = null;
  28. for(String key:keys){
  29. //计算哈希槽
  30. int crc = JedisClusterCRC16.getSlot(key);
  31. //通过哈希槽获取节点的连接
  32. currentJedisPool = cache.getSlotPool(crc);
  33. //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的
  34. //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样
  35. //的,可以作为map的key
  36. if(jedisPoolMap.containsKey(currentJedisPool)){
  37. jedisPoolMap.get(currentJedisPool).add(key);
  38. }else{
  39. keyList = new ArrayList<>();
  40. keyList.add(key);
  41. jedisPoolMap.put(currentJedisPool, keyList);
  42. }
  43. }
  44. //保存结果
  45. List<Object> res = new ArrayList<>();
  46. //执行
  47. for(Entry<JedisPool,List<String>> entry:jedisPoolMap.entrySet()){
  48. try {
  49. currentJedisPool = entry.getKey();
  50. keyList = entry.getValue();
  51. //获取pipeline
  52. currentPipeline = currentJedisPool.getResource().pipelined();
  53. for(String key:keyList){
  54. currentPipeline.get(key);
  55. }
  56. //从pipeline中获取结果
  57. res = currentPipeline.syncAndReturnAll();
  58. currentPipeline.close();
  59. for(int i=0;i<keyList.size();i++){
  60. resMap.put(keyList.get(i),res.get(i)==null?null:res.get(i).toString());
  61. }
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. return new HashMap<>();
  65. }
  66. }
  67. return resMap;
  68. }
  69. }

发表评论

表情:
评论列表 (有 0 条评论,411人围观)

还没有评论,来说两句吧...

相关阅读