使用java jedis封装Redis Stream操作案例

我会带着你远行 2023-01-06 11:56 574阅读 0赞

前言

Redis在5.0后增加了Stream功能,在日常的项目中Redis用到的比较多,但是Stream这个功能用的却是比较少,今天学习了一下Stream的基本使用功能,可以方便在接下来项目中遇到合适的再场景使用。

接口代码

  1. public interface IStreamServer {
  2. /**
  3. * 添加消息
  4. * @param key
  5. * @param streamEntryID
  6. * @param content
  7. * @return
  8. */
  9. StreamEntryID xadd(String key, StreamEntryID streamEntryID, Map<String, String> content);
  10. /**
  11. * 创建分组
  12. * @param stream
  13. * @param group
  14. * @param makeStream
  15. * @return
  16. */
  17. String xgroupCreate(String stream, String group, Boolean makeStream);
  18. /**
  19. * 倒序获取历史消息
  20. * @param key
  21. * @param end
  22. * @param start
  23. * @param count
  24. * @return
  25. */
  26. List<StreamEntry> xrevrange(String key, StreamEntryID end, StreamEntryID start, int count);
  27. /**
  28. * 正序获取历史消息
  29. * @param key
  30. * @param start
  31. * @param end
  32. * @param count
  33. * @return
  34. */
  35. List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end, int count);
  36. /**
  37. * 按分组获取消息
  38. * @param group
  39. * @param consumer
  40. * @param count
  41. * @param streams
  42. * @return
  43. */
  44. List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String group, String consumer, int count, Map.Entry<String, StreamEntryID>... streams);
  45. /**
  46. * 获取消息
  47. * @param count 获取数据
  48. * @param streams 起始消息ID
  49. * @return
  50. */
  51. List<Map.Entry<String, List<StreamEntry>>> xread(int count, Map.Entry<String, StreamEntryID>... streams);
  52. }

实现类代码

  1. public class StreamServiceImpl implements IStreamServer {
  2. @Resource
  3. private Pool<Jedis> jedisPool;
  4. @Override
  5. public StreamEntryID xadd(String key, StreamEntryID streamEntryID, Map<String, String> content) {
  6. try (Jedis jedis = jedisPool.getResource()) {
  7. return jedis.xadd(key, streamEntryID, content);
  8. }
  9. }
  10. @Override
  11. public String xgroupCreate(String stream, String group, Boolean makeStream) {
  12. try (Jedis jedis = jedisPool.getResource()) {
  13. return jedis.xgroupCreate(stream, group, null, makeStream);
  14. }
  15. }
  16. @Override
  17. public List<StreamEntry> xrevrange(String key, StreamEntryID end, StreamEntryID start, int count) {
  18. try (Jedis jedis = jedisPool.getResource()) {
  19. return jedis.xrevrange(key, end, start, count);
  20. }
  21. }
  22. @Override
  23. public List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end, int count) {
  24. try (Jedis jedis = jedisPool.getResource()) {
  25. return jedis.xrange(key, start, end, count);
  26. }
  27. }
  28. @Override
  29. public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String group, String consumer, int count, Map.Entry<String, StreamEntryID>... streams) {
  30. try (Jedis jedis = jedisPool.getResource()) {
  31. return jedis.xreadGroup(group, consumer, count, 0, false, streams);
  32. }
  33. }
  34. @Override
  35. public List<Map.Entry<String, List<StreamEntry>>> xread(int count, Map.Entry<String, StreamEntryID>... streams) {
  36. try (Jedis jedis = jedisPool.getResource()) {
  37. return jedis.xread(count, 0, streams);
  38. }
  39. }
  40. }

测试代码

  1. public class RedisStreamTest extends BaseTest {
  2. @Resource
  3. private IStreamServer iStreamServer;
  4. @Test
  5. public void setGroup() throws Exception {
  6. //创建 流名为 video 分组为 group1
  7. String res = iStreamServer.xgroupCreate("video", "group1", true);
  8. //创建 流名为 audio分组为 group1
  9. String res2 = iStreamServer.xgroupCreate("audio", "group1", true);
  10. }
  11. @Test
  12. public void xadd() throws Exception {
  13. Long time = System.currentTimeMillis();
  14. //向audio中插入100条消息
  15. for (int i = 0; i < 100; i++) {
  16. StreamEntryID streamEntryID = new StreamEntryID(String.format("%s-%s", time, i));
  17. StreamEntryID id = iStreamServer.xadd("audio", streamEntryID, ImmutableMap.of("id", String.valueOf(i)));
  18. System.out.println(id);
  19. }
  20. //向video中插入100条消息
  21. for (int i = 0; i < 100; i++) {
  22. StreamEntryID streamEntryID = new StreamEntryID(String.format("%s-%s", time, i));
  23. StreamEntryID id = iStreamServer.xadd("video", streamEntryID, ImmutableMap.of("id", String.valueOf(i)));
  24. System.out.println(id);
  25. }
  26. }
  27. @Test
  28. public void xreadGroup() throws Exception {
  29. Map<String, StreamEntryID> t = MapUtil.of("video", null);
  30. Map<String, StreamEntryID> t2 = MapUtil.of("audio", null);
  31. Map.Entry<String, StreamEntryID> video = t.entrySet().stream().findFirst().get();
  32. Map.Entry<String, StreamEntryID> audio = t2.entrySet().stream().findFirst().get();
  33. //client1 用 group1 分组 从 video/audio 流中获取两个消息 ,同一分组中不同客户端可共享消费消息(client1消费一条,client2消息偏移量增加1 ),不同分组中的客户端消息不影响(client1消费一条,不影响client2消息偏移量)
  34. List<Map.Entry<String, List<StreamEntry>>> list = iStreamServer.xreadGroup("group1", "client1", 2, video, audio);
  35. list.forEach(x -> {
  36. System.out.println(x.getKey() + "-->" + x.getValue());
  37. });
  38. //输出
  39. /**
  40. video-->[1610607445337-2 {id=2}, 1610607445337-3 {id=3}]
  41. audio-->[1610608368683-2 {id=2}, 1610608368683-3 {id=3}]
  42. **/
  43. }
  44. @Test
  45. public void xrange() throws Exception {
  46. //video中按正序 从1610607445337-10 到1610607445337-20 中获取3条消息
  47. List<StreamEntry> streamEntries = iStreamServer.xrange("video", new StreamEntryID("1610607445337-10"), new StreamEntryID("1610607445337-20"), 3);
  48. streamEntries.forEach(x -> System.out.println(x));
  49. //输出
  50. /**
  51. 1610607445337-10 {id=10}
  52. 1610607445337-11 {id=11}
  53. 1610607445337-12 {id=12}
  54. **/
  55. }
  56. @Test
  57. public void xrevrange() throws Exception {
  58. //video中按倒序 从1610607445337-80 到1610607445337-0 中获取3条消息
  59. List<StreamEntry> streamEntries = iStreamServer.xrevrange("video", new StreamEntryID("1610607445337-80"), new StreamEntryID("1610607445337-0"), 3);
  60. streamEntries.forEach(x -> System.out.println(x));
  61. //输出
  62. /**
  63. 1610607445337-80 {id=80}
  64. 1610607445337-79 {id=79}
  65. 1610607445337-78 {id=78}
  66. **/
  67. }
  68. @Test
  69. public void xread() throws Exception {
  70. //从1610607445337-2 开始获取
  71. Map<String, StreamEntryID> t = MapUtil.of("video", new StreamEntryID("1610607445337-2"));
  72. Map<String, StreamEntryID> t2 = MapUtil.of("audio", null);
  73. Map.Entry<String, StreamEntryID> video = t.entrySet().stream().findFirst().get();
  74. Map.Entry<String, StreamEntryID> audio = t2.entrySet().stream().findFirst().get();
  75. // 从video 1610607445337-2 开始,从audio 0开始获取 2条消息
  76. List<Map.Entry<String, List<StreamEntry>>> list = iStreamServer.xread(2, video, audio);
  77. list.forEach(x -> {
  78. System.out.println(x.getKey() + "-->" + x.getValue());
  79. });
  80. //输出
  81. /**
  82. audio-->[1610608368683-0 {id=0}, 1610608368683-1 {id=1}]
  83. video-->[1610607445337-3 {id=3}, 1610607445337-4 {id=4}]
  84. **/
  85. }
  86. }

总结

1.Redis Stream 消息支持持久化,可随意获取历史消息记录
2.Redis Stream 支持客户端分组功能,同一分组中的客户端可共同消费流中的消息

发表评论

表情:
评论列表 (有 0 条评论,574人围观)

还没有评论,来说两句吧...

相关阅读