Redis Stream最全用法详解一 系统管理员 2022-12-26 10:11 22阅读 0赞 ### redis Stream: ### redis Stream是redis 5.0版本新增加的数据结构。 redis stream主要用于消息队列(MQ,Message Queue),Redis本身是有一个Redis发布订阅(pub/sub)来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃。 简单来说发布订阅(pub/sub)可以分发消息,但无法记录历使消息。 而Redis Stream提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。 Redis Stream的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容: ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpX3dfY2g_size_16_color_FFFFFF_t_70][] 每个Stream都有一个唯一的名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建。 * Consumer Group:消费组,使用XGROUP CREATE命令创建,一个消费组有多个消费者(Consumer) * last\_delivered\_id:游标,每个消费者都会有个游标laset\_delivered\_id,任意一个消费者读取了消息都会使游标last\_delivered\_id往前移动 * pending\_ids:消费者(Consumer)的状态变量,作用是维护消费者的未确认的id。pending\_ids记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符) ### 消息队列的相关命令: ### * XADD:添加消息到末尾(生产消息) * XTRIM:对流进行修剪,限制长度 * XDEL:删除消息 * XLEN:获取流包含的元素数量,及消息长度 * XRANGE:获取消息列表,会自动过滤已经删除的消息 * XREVERANGE:反向获取消息列表,ID从大到小 * XREAD:以阻塞或非阻塞方式获取消息列表(消费消息) ### 消费者组相关命令: ### * XGROUP CREATE:创建消费者组 * XREADGROUP GROUP:读取消费者组中的消息 * XACK:将消息标记为“已处理” * XGROUP SETID:为消费者组设置新的最后递送消息ID * XGROUP DELCONSUMER:删除消费者 * XGROUP DESTROY:删除消费者组 * XPENDING:显示待处理消息的相关信息 * XCLAIM:转移消费者组的相关信息 * XINFO GROUPS:打印消费者组的信息 * XINFO STREAM:打印流信息 ### XADD: ### 使用XADD向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD语法格式: XADD key ID field value [field value ...] * key:队列名称,如果不存在就创建 * ID:消息id,我们使用\*表示由redis生成,可以自定义,但是要自己保证递增性 * field value:记录,当前消息内容,由一个或多个key-value构成 ### XADD实例: ### //需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。 //ID,最常用*,表示由redis生成消息ID,这也是强烈建议的方案 redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" //获取流包含的元素数量,即消息长度 redis> XLEN mystream (integer) 2 //- 表示最小值 , + 表示最大值 redis> XRANGE mystream - + 1) 1) "1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1601372323627-1" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3" redis> ### XTRIM: ### 使用XTRIM对流进行修剪,限制长度,语法格式为: XTRIM key MAXLEN [~] count * key:队列名称 * MAXLEN:长度 * count:数量 ### XTRIM实例: ### 127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (integer) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) "1601372434568-0" 2) 1) "field1" 2) "A" 3) "field2" 4) "B" 5) "field3" 6) "C" 7) "field4" 8) "D" 127.0.0.1:6379> redis> ### XDEL: ### 使用XDEL删除消息,语法格式: XDEL key ID [ID ...] * key:队列名称 * ID:消息ID ### XLEN: ### 使用XLEN获取流包含的元素数量,即消息长度,语法格式: XLEN key ### XLEN实例: ### redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis> ### XRANGE: ### 使用XRANGE获取消息列表,会自动过滤已经删除的消息,语法格式: XRANGE key start end [COUNT count] * key:队列名 * start:开始值,-表示最小值 * end:结束值,+表示最大值 * count:数量 ### XRANGE实例: ### redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis> ### XREVRANGE: ### 使用XREVRANGE获取消息列表(反向,ID从大到小),会自动过滤已经删除的消息,语法格式: XREVRANGE key end start [COUNT count] * key:队列名 * end:结束值,+表示最大值 * start:开始值,-表示最小值 * count:数量 ### XREVRANGE实例: ### redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) "1601372731459-3" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie" redis> ### XREAD: ### 使用XREAD以阻塞或非阻塞方式获取消息列表,语法格式: XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] * count:用于限定获取的消息数量 * \[BLOCK milliseconds\]:用于设置XREAD为阻塞模式,默认为非阻塞模式 * ID:用于设置由哪个消息ID开始读取,使用0表示从第一条消息开始。此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用$,表示最新的消息ID。(非阻塞模式下$无意义)。 XREAD读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。 **一个典型的阻塞模式用法为:** 127.0.0.1:6379> XREAD block 1000 streams memberMessage $ (nil) (1.07s) 我们使用Block模式,配合使用$作为ID,表示读取最新的消息,若没有消息,命令阻塞,等待过程中,其他客户端向队列追加消息,则胡立即读取到。 因此,典型的队列就是XADD配合XREAD Block完成。XADD负责生成消息,XREAD负责消费消息。 ### XREAD实例: ### # 从 Stream 头部读取两条消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) "1532" 3) "event-id" 4) "5" 5) "user-id" 6) "7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) "812" 3) "event-id" 4) "9" 5) "user-id" 6) "388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" ### 消息ID说明: ### XADD生成的1526984818136-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳时毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型。序号是在这个毫秒时间点内的消息序号。它也是个64位整型。 **通过multi批处理,来验证序号的递增:** 127.0.0.1:6379> MULTI OK 127.0.0.1:6379> XADD memberMessage * msg one QUEUED 127.0.0.1:6379> XADD memberMessage * msg two QUEUED 127.0.0.1:6379> XADD memberMessage * msg three QUEUED 127.0.0.1:6379> XADD memberMessage * msg four QUEUED 127.0.0.1:6379> XADD memberMessage * msg five QUEUED 127.0.0.1:6379> EXEC 1) "1553441006884-0" 2) "1553441006884-1" 3) "1553441006884-2" 4) "1553441006884-3" 5) "1553441006884-4" 由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。 为了保证消息是有序的,因此Redis生成的ID是*单调递增*有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest\_generated\_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest\_generated\_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。 强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的,别忘了! ### XGROUP CREATE: ### **使用XGROUP CREATE创建消费者组,语法格式如下:** XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] * key:队列名称,如果不存在就创建 * groupname:组名 * $:表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略 **从头开始消费:** XGROUP CREATE mystream consumer-group-name 0-0 **从尾部开始消费:** XGROUP CREATE mystream consumer-group-name $ ### XREADGROUP GROUP: ### **使用XREADGROUP GROUP读取消费组中的消息,语法格式:** XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] * goup:消费组名 * consumer:消费者名 * count:读取数量 * milliseconds:阻塞毫秒数 * key:队列名 * ID:消息ID XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream > 相关博文:[最全Redis Stream用法详解二][Redis Stream] [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpX3dfY2g_size_16_color_FFFFFF_t_70]: /images/20221120/7465898833cf43beb0e8b711ae4a0ec5.png [Redis Stream]: https://blog.csdn.net/li_w_ch/article/details/110660189
还没有评论,来说两句吧...