Redis只用来做缓存?来认识一下它其他强大的能力吧。

喜欢ヅ旅行 2023-09-25 22:57 42阅读 0赞

当今互联网应用中,随着业务的发展,数据量越来越大,查询效率越来越高,对于时序数据的存储、查询和分析需求也越来越强烈,这时候 Redis 就成为了首选的方案之一。

Redis 提供了多种数据结构,如字符串、哈希表、列表、集合、有序集合等,每种数据结构都具备不同的特性,可以满足不同的业务需求。其中,有序集合的 score 可以存储时间戳,非常适合用于存储时序数据,例如监控指标、日志、统计数据、报表等。下面举几个时序数据场景例子:

  1. 监控指标:

假设我们有一个服务,名为 my_service,需要监控它的请求响应时间。我们可以使用 Redis 有序集合来存储数据,每个请求的响应时间作为 value,请求的时间戳作为 score。示例如下:

  1. > ZADD requests:my_service 1613115560 350
  2. (integer) 1
  3. > ZADD requests:my_service 1613115570 450
  4. (integer) 1
  5. > ZADD requests:my_service 1613115580 550
  6. (integer) 1
  7. 复制代码

这些命令向名为 requests:my_service 的有序集合中添加了 3 条数据,分别是 2021 年 2 月 12 日 10:19:20 的请求响应时间为 350ms,10:19:30 的请求响应时间为 450ms,10:19:40 的请求响应时间为 550ms。

接下来,我们来看一下如何使用 Redis 命令查询这些监控指标的数据。下面的命令会返回 requests:my_service 有序集合内所有数据:

  1. > ZRANGE requests:my_service 0 -1 WITHSCORES
  2. 1) "350"
  3. 2) "1613115560"
  4. 3) "450"
  5. 4) "1613115570"
  6. 5) "550"
  7. 6) "1613115580"
  8. 复制代码

命令执行结果表示,数据按照 score 排序,其中 score 是时间戳(单位为秒),value 是请求响应时间(单位为毫秒)。同时,使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的监控数据,例如:

  1. > ZRANGEBYSCORE requests:my_service 1613115570 1613115580 WITHSCORES
  2. 1) "450"
  3. 2) "1613115570"
  4. 3) "550"
  5. 4) "1613115580"
  6. 复制代码

这条命令返回了 requests:my_service 有序集合中在时间戳 1613115570 到 1613115580 之间的所有数据。

  1. 日志:

假设我们要存储的日志是一条指定格式的字符串,包含时间戳和日志内容。使用 Redis 列表存储日志数据,每次写入新日志时可以使用 Redis 列表的 rpush 命令将数据写入列表的尾部。示例如下:

  1. > RPUSH logs:my_logs 2021-02-12 10:30:00 INFO message 1
  2. (integer) 1
  3. > RPUSH logs:my_logs 2021-02-12 10:30:01 ERROR message 2
  4. (integer) 2
  5. > RPUSH logs:my_logs 2021-02-12 10:30:02 WARN message 3
  6. (integer) 3
  7. 复制代码

这些命令向名为 logs:my_logs 的列表尾部添加 3 条数据,分别是 2021 年 2 月 12 日 10:30:00 的 INFO 级别消息,10:30:01 的 ERROR 级别消息和 10:30:02 的 WARN 级别消息。

接下来,我们来看一下如何使用 Redis 命令查询这些日志数据。下面的命令会返回 logs:my_logs 列表内所有数据:

  1. > LRANGE logs:my_logs 0 -1
  2. 1) "2021-02-12 10:30:00 INFO message 1"
  3. 2) "2021-02-12 10:30:01 ERROR message 2"
  4. 3) "2021-02-12 10:30:02 WARN message 3"
  5. 复制代码

命令执行结果表示,数据按照插入顺序排序,从列表头部开始遍历。使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的日志数据,例如:

  1. > ZRANGEBYSCORE logs:my_logs 1613115570 1613115580
  2. 1) "2021-02-12 10:30:01 ERROR message 2"
  3. 复制代码

这条命令返回了 logs:my_logs 列表中在时间戳 1613115570 到 1613115580 之间的日志数据,但因为日志数据并没有具体的 time stamp 做 score,所以这个例子只是演示这个命令的用法,实际上应该使用有序集合去查询时间区间内的日志数据。

  1. 统计数据:

假设我们要存储的统计数据是一些具体业务相关的计数器,例如每分钟用户访问量。我们可以使用 Redis 有序集合来存储统计数据,key 是计数器名称,score 是时间戳,value 是具体的计数值(例如访问次数)。示例如下:

  1. > ZADD visits 1613167800 100
  2. (integer) 1
  3. > ZADD visits 1613167860 120
  4. (integer) 1
  5. > ZADD visits 1613167920 150
  6. (integer) 1
  7. 复制代码

这些命令向名为 visits 的有序集合中添加了 3 条数据,分别是 2021 年 2 月 12 日 23:30:00 的访问次数为 100,23:31:00 的访问次数为 120,23:32:00 的访问次数为 150。

接下来,我们来看一下如何使用 Redis 命令查询这些统计数据。下面的命令会返回 visits 有序集合内所有数据:

  1. > ZRANGE visits 0 -1 WITHSCORES
  2. 1) "100"
  3. 2) "1613167800"
  4. 3) "120"
  5. 4) "1613167860"
  6. 5) "150"
  7. 6) "1613167920"
  8. 复制代码

命令执行结果表示,数据按照 score 排序,其中 score 是时间戳(单位为秒),value 是访问次数。使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的统计数据,例如:

  1. > ZRANGEBYSCORE visits 1613167860 1613167920 WITHSCORES
  2. 1) "120"
  3. 2) "1613167860"
  4. 3) "150"
  5. 4) "1613167920"
  6. 复制代码

这条命令返回了 visits 有序集合中在时间戳 1613167860 到 1613167920 之间的所有数据。

使用 Redis 有序集合中的另一个常见场景是计算 TopN,例如找出访问次数最多的前 10 个计数器,可以使用命令 ZREVRANGE visits 0 9 WITHSCORES,它返回 visits 有序集合中前 10 个元素,按照 value 从大到小排列,并且返回每个元素的 score。

需求实践:

这是一个实时监控系统,主要用于记录和统计服务发生的错误情况,以便在错误数量超过预设阈值时发出警告信息。

系统每秒钟生成随机错误数据,并将它们存储到 Redis 数据库中。每隔 10 秒钟,系统会从 Redis 数据库中聚合最近一分钟内的错误数据,并按照服务名和错误类型进行统计计算。如果某个服务的错误数量超过预设阈值,系统会输出一条警告信息提示用户。

整个系统的目标是帮助用户及时了解每个服务的错误情况,以便及时采取相应的措施,保障服务的稳定性和可靠性。

代码示例:

模拟接口服务异常数据

  1. package com.example.demo.redis;
  2. import redis.clients.jedis.Jedis;
  3. import java.util.*;
  4. public class DataGenerator {
  5. // 定义服务列表
  6. private static final List<String> SERVICES = Arrays.asList("service1", "service2", "service3");
  7. // 定义错误列表
  8. public static final List<String> ERRORS = Arrays.asList("invalid_param", "timeout", "unknown_error");
  9. /**
  10. * 生成数据
  11. *
  12. * @param total 数据总数
  13. * @param jedis Redis 客户端连接
  14. */
  15. public static void generateData(int total, Jedis jedis) {
  16. Random rand = new Random(); // 初始化随机数生成器
  17. long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当前时间戳,精确到秒
  18. long startTimestamp = currentTimestamp - 60; // 计算起始时间戳,为当前时间戳减去 60 秒
  19. for (int i = 0; i < total; i++) { // 循环 total 次,生成 total 条数据
  20. String service = SERVICES.get(rand.nextInt(SERVICES.size())); // 随机选择一个服务
  21. String error = ERRORS.get(rand.nextInt(ERRORS.size())); // 随机选择一个错误
  22. long timestamp = startTimestamp + rand.nextInt(60); // 生成一个随机时间戳,精确到秒,范围为起始时间戳到当前时间戳
  23. int count = 1;
  24. String item = String.format("%s:%s:%d:%d", service, error, timestamp, count);
  25. jedis.zadd("error_data", timestamp, item); // 将错误数据存储到 Redis 数据库中
  26. }
  27. }
  28. }
  29. 复制代码

聚合异常数据,达到阈值告警

  1. package com.example.demo.redis;
  2. import redis.clients.jedis.Jedis;
  3. import java.util.*;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.ScheduledExecutorService;
  6. import java.util.concurrent.TimeUnit;
  7. public class DataAggregator {
  8. private static final String REDIS_HOST = "localhost"; // Redis 主机名
  9. private static final int REDIS_PORT = 6379; // Redis 端口号
  10. private static final int THRESHOLD = 100; // 预设阈值,当错误数量超过该阈值时触发警告
  11. public static void main(String[] args) {
  12. ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 创建一个只有一个线程的定时任务执行程序
  13. Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT); // 创建 Redis 客户端连接
  14. scheduler.scheduleAtFixedRate(() -> {
  15. // 并发情况下,线程会阻塞
  16. synchronized (jedis) {
  17. DataGenerator.generateData(20, jedis); // 生成随机错误数据,并将其存储到 Redis 数据库中
  18. }
  19. }, 0, 1, TimeUnit.SECONDS); // 定时任务间隔为 1 秒钟
  20. scheduler.scheduleAtFixedRate(() -> { // 定时任务逻辑
  21. synchronized (jedis) {
  22. long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当前时间戳,精确到秒
  23. long startTimestamp = currentTimestamp - 60; // 计算起始时间戳,为当前时间戳减去 60 秒
  24. Set<String> data = jedis.zrangeByScore("error_data", startTimestamp, currentTimestamp); // 使用 zrange 命令获取指定时间范围内的数据
  25. Map<String, Map<String, Integer>> countMap = new HashMap<>(); // 用于记录聚合后的服务和错误数量信息
  26. for (String item : data) { // 遍历所有错误数据
  27. String[] parts = item.split(":"); // 以冒号为分隔符,将错误数据分割为部分
  28. String service = parts[0]; // 获取服务名
  29. String error = parts[1]; // 获取错误类型
  30. long timestamp = Long.parseLong(parts[2]); // 获取时间戳
  31. int count = Integer.parseInt(parts[3]); // 获取错误数量
  32. if (timestamp < startTimestamp) { // 如果时间戳早于起始时间戳,则跳过该数据
  33. continue;
  34. }
  35. Map<String, Integer> serviceCountMap = countMap.computeIfAbsent(service, k -> new HashMap<>()); // 获取指定服务的错误数量信息
  36. serviceCountMap.put(error, serviceCountMap.getOrDefault(error, 0) + count); // 更新指定服务和错误类型的错误数量信息
  37. }
  38. List<String> alerts = new ArrayList<>(); // 用于存储警告信息
  39. for (String service : countMap.keySet()) { // 遍历服务名列表
  40. Map<String, Integer> serviceCountMap = countMap.get(service); // 获取服务和错误数量信息
  41. int totalErrors = 0;
  42. for (String error : serviceCountMap.keySet()) { // 遍历错误列表
  43. int count = serviceCountMap.get(error); // 获取错误数量
  44. totalErrors += count;
  45. }
  46. if (totalErrors > THRESHOLD) { // 如果错误数量超过预设阈值
  47. alerts.add(service + " has too many errors: " + serviceCountMap.keySet() + ", count: " + totalErrors); // 将该服务名添加到警告信息列表中
  48. }
  49. }
  50. if (!alerts.isEmpty()) { // 如果警告信息列表不为空
  51. System.out.println(String.join("\n", alerts)); // 打印警告信息
  52. }
  53. }
  54. }, 0, 10, TimeUnit.SECONDS); // 定时任务间隔为 10 秒
  55. // 关闭 Redis 连接
  56. jedis.close();
  57. }
  58. }
  59. 复制代码

以上代码可正常运行,有疑问可以交流~~

发表评论

表情:
评论列表 (有 0 条评论,42人围观)

还没有评论,来说两句吧...

相关阅读