云南高端网站制作价格,电子商务的网站案例,微信营销的案例,海东地网站建设详细介绍了 Redis 5.0 版本新增加的数据结构Stream的使用方式以及原理#xff0c;如何实现更加可靠的消息队列。 文章目录 Stream 概述2 Stream基本结构3 存储数据3.1 Entry ID3.2 数量限制 4 获取数据4.1 范围查询4.2 独立消费消息4.2.1 非阻塞使用4.2.2 阻塞的使用 4.3 消费… 详细介绍了 Redis 5.0 版本新增加的数据结构Stream的使用方式以及原理如何实现更加可靠的消息队列。 文章目录 Stream 概述2 Stream基本结构3 存储数据3.1 Entry ID3.2 数量限制 4 获取数据4.1 范围查询4.2 独立消费消息4.2.1 非阻塞使用4.2.2 阻塞的使用 4.3 消费者组4.3.1 基本概念4.3.2 创建消费者组4.3.3 从消费者组消费 5 永久故障恢复5.1 XPENDING查看未处理消息5.2 XCLAIM转移消息5.3 XAUTOCLAIM自动转移 6 死信队列7 Stream监控8 删除消息9 零长度Stream10 ACK确认11 总结 Stream 概述
基于Reids的消息队列实现有很多种比如基于PUB/SUB订阅/发布模式、基于List的 PUSH和POP一系列命令的实现、基于Sorted-Set的实现。虽然它们都有各自的特点比如List支持阻塞式的获取消息Pub/Sub支持消息多播Sorted Set支持延时消息但它们有太多的缺点
Redis List没有消息多播功能没有ACK机制无法重复消费等等。Redis Pub/Sub消息无法持久化只管发送如果出现网络断开、Redis 宕机等消息就直接没了自然也没有ACK机制。Redis Sorted Set不支持阻塞式获取消息、不允许重复消费、不支持分组。
Redis Stream 则是 Redis 5.0 版本新增加的数据结构。Redis Stream 主要用于实现消息队列MQMessage Queue可以说是目前最新Redis版本6.2中最完美的消息队列实现。
Redis Stream 有如下功能
提供了对于消费者和消费者组的阻塞、非阻塞的获取消息的功能。提供了消息多播的功能同一个消息可被分发给多个单消费者和消费者组提供了消息持久化的功能可以让任何消费者访问任何时刻的历史消息提供了强大的消费者组的功能 消费者组实现同组多个消费者并行但不重复消费消息的能力提升消费能力。消费者组能够记住最新消费的信息保证消息连续消费消费者组能够记住消息转移次数实现消费失败重试以及永久性故障的消息转移。消费者组能够记住消息转移次数借此可以实现死信消息的功能需自己实现。消费者组提供了PEL未确认列表和ACK确认机制保证消息被成功消费不丢失
Redis Stream基本上可以满足你对消息队列的所有需求。
2 Stream基本结构
Redis Stream像是一个仅追加内容的消息链表把所有加入的消息都一个一个串起来每个消息都有一个唯一的 ID 和内容它还从 Kafka 借鉴了另一种概念消费者组(Consumer Group)这让Redis Stream变得更加复杂。
Redis Stream的结构如下 每个 Stream 都有唯一的名称它就是 Redis 的 key在我们首次使用 XADD 指令追加消息时自动创建。
Consumer Group消费者组消费者组记录了Starem的状态**使用 XGROUP CREATE 命令手动创建在同一个Stream内消费者组名称唯一。一个消费组可以有多个消费者(Consumer)同时进行组内消费所有消费者共享Stream内的所有信息但同一条消息只会有一个消费者消费到不同的消费者会消费Stream中不同的消息这样就可以应用在分布式的场景中来保证消息消费的唯一性。last_delivered_id游标用来记录某个消费者组在Stream上的消费位置信息**每个消费组会有个游标任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。创建消费者组时需要指定从Stream的哪一个消息ID哪个位置开始消费该位置之前的数据会被忽略同时还用来初始化 last_delivered_id 这个变量。这个last_delivered_id一般来说就是最新消费的消息ID。pending_ids消费者内部的状态变量作用是维护消费者的未确认的消息ID。pending_ids记录了当前已经被客户端读取但是还没有 ack (Acknowledge character确认字符的消息。 目的是为了保证客户端至少消费了消息一次而不会在网络传输的中途丢失而没有对消息进行处理。如果客户端没有 ack那么这个变量里面的消息ID 就会越来越多一旦某个消息被ack它就会对应开始减少。这个变量也被 Redis 官方称为 PEL (Pending Entries List)。
3 存储数据
使用XADD命令添加消息到Stream末尾Stream的每个消息不仅仅是一个字符串而是由一个或多个字段值对组成。XADD也是唯一可以向Stream中添加数据的 Redis 命令但还有其他命令例如 XDEL 和 XTRIM可以从Stream中删除数据。
完整的XADD语法为 XADD key [NOMKSTREAM] [MAXLEN|MINID [|~] threshold [LIMIT count]] *|ID field value [field value ...]
第一必须的参数是key的名字如果key对应的Stream不存在则自动创建key后面添加NOMKSTREAM命令可以禁止自动创建Stream。
一条消息由一组字段值对组成它基本上是一个小dict字典。键值对将会按照用户给定的顺序存储读取Stream的命令如 XRANGE 或 XREAD保证返回字段和值的顺序与 XADD 添加的顺序完全相同。
第二个必须的参数是表示Stream中当前消息的唯一IDStream中每一个消息都有一个唯一的IDXADD命令返回的也是添加的消息的ID。如果命令中指定的ID参数是*字符那么XADD 命令将自动生成一个唯一的ID。
在key和 ID 之后后面的必须的参数就是组成我们的消息的键值对。
如下案例向名为xx的Stream中插入一条消息
127.0.0.1:6379 XADD xx * name xiaoming age 22
1624458068086-0使用XLEN即可查看Stream中的消息数目
127.0.0.1:6379 XLEN xx
(integer) 13.1 Entry ID
自动生成的ID格式为
millisecondsTime-sequenceNumber 即当前“毫秒时间戳-序列号”它表示当前的消息是在毫秒时间戳millisecondsTime产生的并且是该毫秒内产生的第sequenceNumber1条消息。这种格式的ID满足自增的特性支持范围查找。
ID也可以由客户端自己定义但是形式必须是 “整数-整数”最小 ID 为 0-1而且后面加入的消息的 ID 必须要大于前面的消息 ID如果不大于那么会返回异常
127.0.0.1:6379 XADD xx 123-123 name xiaoming age 22
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item由此之前使用了自动生成的ID那么后面手动指定的ID也一定要比此前自动生成的ID更大才行比如
127.0.0.1:6379 XADD xx 1624458068096-0 name xiaoming age 22
1624458068096-03.2 数量限制
如果消息积累太多那么Stream 链表会很长对内存来说是一个大问题。而XDEL指令又不会真正的删除消息它只是给消息做了个标志位。
我们可以通过一些指定对Stream进行真正的修剪限制其最大长度。单独使用XTRIM指令也能对Stream进行限制它能指定MAXLEN参数用于指定Stream的最大长度消息之后长度超过MAXLEN时会自动将最老的消息清除确保最多不超过指定长度。
添加3个Stream元素
127.0.0.1:6379 XADD yy * a1 b1
1624460262356-0
127.0.0.1:6379 XADD yy * a2 b2
1624460267913-0
127.0.0.1:6379 XADD yy * a3 b3
1624460273296-0
127.0.0.1:6379 XRANGE yy -
1) 1) 1624460262356-02) 1) a12) b1
2) 1) 1624460267913-02) 1) a22) b2
3) 1) 1624460273296-02) 1) a32) b3使用XTRIM限制最多两个
127.0.0.1:6379 XTRIM yy MAXLEN 2
(integer) 1
127.0.0.1:6379 XRANGE yy -
1) 1) 1624460267913-02) 1) a22) b2
2) 1) 1624460273296-02) 1) a32) b3可以看到最老的元素被淘汰掉了。XADD指令也有XTRIM的功能它能在添加元素的同时对元素数量进行控制它的可选参数MAXLEN当添加消息之后长度超过MAXLEN时会自动将最老的消息清除确保最多不超过指定长度。
127.0.0.1:6379 XADD yy MAXLEN 2 * a4 b4
1624460537583-0
127.0.0.1:6379 XRANGE yy -
1) 1) 1624460273296-02) 1) a32) b3
2) 1) 1624460537583-02) 1) a42) b4使用 MAXLEN 选项精确修剪的花销是很大的Stream 为了节省内存空间采用了一种特殊的结构表示而这种结构的调整是需要额外的花销的。所以我们可以使用“~”来表示非精确修剪它会基保证至少会有指定的N条数据也可能会多一些。
例如以下列形式调用 XADD
ADD mystream MAXLEN ~ 1000 * ... entry fields here ...上面的命令将添加一个新元素但也会驱逐旧元素以便Stream将仅包含 1000 个元素或最多多几十个元素。
4 获取数据
从Stream中获取数据的方式很多
最基本的就是单个客户端阻塞或者非阻塞的获取消息。Redis Stream还支持消费者组的方式获取消费者组中的每个消费者将消费到不同的消息这借鉴了kafka的消费者组的特性。基于自增ID的特性Redis Stream还支持按时间范围获取消息还支持使用游标迭代消息以增量检查所有未确定的历史消费记录。
Redis Stream通过不同的命令支持上述所有查询模式。
4.1 范围查询
使用XRANGE 和XREVRANGE命令实现消息的正向和逆向的范围查询。
要按范围查询Stream我们只需要指定两个 ID一个开始和一个结束。还有两个特殊的ID- 和 分别表示可能的最小和最大 ID。
如下案例查询Stream中的全部消息
127.0.0.1:6379 XRANGE xx -
1) 1) 1624458068086-02) 1) name2) xiaoming3) age4) 22
2) 1) 1624458068096-02) 1) name2) xiaoming3) age4) 22返回是一个包含两项的元素数组ID 和字段键值对列表。
由于ID的第一部分是unix时间戳因此特别适合范围查找并且ID支持不传递序列值部分这是允许的。如果没有传递序列值那么范围的开头将假定ID的序列值为 0而在结束部分将假定ID序列之为最大值可用序列号。
127.0.0.1:6379 XRANGE xx 1624458068086 1624458068087
1) 1) 1624458068086-02) 1) name2) xiaoming3) age4) 22XRANGE 最后支持一个可选的 COUNT 选项通过该选项指定返回前 N 个消息。
127.0.0.1:6379 XRANGE xx - COUNT 1
1) 1) 1624458068086-02) 1) name2) xiaoming3) age4) 22默认情况下范围包含两个端点可以在第一个ID前使用“(”来排除第一个端点值匹配。
127.0.0.1:6379 XRANGE xx 1624458068086 1624458068096
1) 1) 1624458068086-02) 1) name2) xiaoming3) age4) 22
2) 1) 1624458068096-02) 1) name2) xiaoming3) age4) 22使用“(”之后
127.0.0.1:6379 XRANGE xx (1624458068086 1624458068096
1) 1) 1624458068096-02) 1) name2) xiaoming3) age4) 22由于 XRANGE 的复杂度是 O(log(N)) 来寻找然后 O(M) 来返回 M 个元素因此在很少计数的情况下该命令具有对数时间复杂度这意味着迭代的每一步都很快。所以 XRANGE 也可以作为的流迭代器不需要 XSCAN 命令Redis没有提供XSCAN。
命令 XREVRANGE 与 XRANGE 等效但以相反的顺序返回元素两个ID参数顺序也是相反的因此 XREVRANGE 的实际用途是检查Stream中的最后一项是什么
127.0.0.1:6379 XREVRANGE xx - COUNT 1
1) 1) 1624458068096-02) 1) name2) xiaoming3) age4) 224.2 独立消费消息
通常我们想要的是订阅到达Stream的新消息而不是不想按Stream中的范围访问消息。也就是生产-消费模式这个概念可能与 Redis Pub/Sub或 Redis 阻塞列表有关但在使用Stream的方式上存在根本差异。
一个Stream可以有多个客户端消费者等待数据。默认情况下每个新消息都将交付给在给定Stream中等待数据的每个消费者。这种行为不同于每个消费者将获得不同的消息的阻塞列表但是交付给多个消费者的能力类似于 Pub/Sub。
在Pub/Sub 中消息一经推出就被丢弃无论如何都不会存储而在使用阻塞列表时当客户端收到消息时消息会从列表中弹出有效地删除但Stream以完全不同的方式工作。所有的消息都无限期地存储在Stream中除非用户明确要求删除消息不同的消费者会通过记住收到的最后一条消息的 ID 来指导什么是最新消息。
XREAD命令提供侦听到达一个或者多个Stream的新消息的能力仅返回 ID 大于调用者传递的最后接收到的ID的消息。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]4.2.1 非阻塞使用
如果不使用 BLOCK 选项则该命令是同步的可以认为与 XRANGE 有点相关它将返回流内的一系列项目但是即使我们只考虑同步用法它与 XRANGE 相比也有两个根本区别
如果我们想同时从多个key中读取可以在调用此命令传递多个Stream的key。这是 XREAD 的一个关键功能因为尤其是在使用 BLOCK 进行阻塞时能够通过单个连接侦听多个key是一项重要功能。XRANGE 常用于返回两个ID 范围中的一系列消息但 XREAD 更适合获取从某个ID开始的一系列消息该消息可能我们目前获取的任何其他消息的ID都大即从前向后消费最新的消息。XREAD常用于用于迭代Stream的消息所以我们传递给 XREAD 的通常是我们上一次从该Stream接收到的最后一个消息的ID。
XREAD的简单使用如下有两个流 xx和 yy并且我想从它们包含的第一个元素开始从这两个流中读取数据可以像下面的示例一样调用 XREAD
127.0.0.1:6379 XREAD STREAMS xx yy 0 0
1) 1) xx2) 1) 1) 1624458068086-02) 1) name2) xiaoming3) age4) 222) 1) 1624458068096-02) 1) name2) xiaoming3) age4) 22
2) 1) yy2) 1) 1) 1624460273296-02) 1) a32) b32) 1) 1624460537583-02) 1) a42) b4命令中的STREAMS 选项是必须的并且必须是最终选项因为该选项后面就是要获取对应Stream的起始ID
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N上面的案例中我们编写的起始ID都是0不完整的ID是有效的规则和XRANGE一样因此我们希望从Stream xx和Stream yy中获取的所有消息的ID都大于0-0不会包含传递的ID。
我们也可以在之前添加COUNt N选项表示最多从每一个Stream中返回N个消息
127.0.0.1:6379 XREAD COUNT 1 STREAMS xx yy 0 0
1) 1) xx2) 1) 1) 1624458068086-02) 1) name2) xiaoming3) age4) 22
2) 1) yy2) 1) 1) 1624460273296-02) 1) a在上面的示例中我们收到的流 xx和yy收到的最后一个消息的ID为 1624458068086-0和1624460273296-0因此下一次迭代时我们传递的ID就是最后一条消息的ID
127.0.0.1:6379 XREAD COUNT 1 STREAMS xx yy 1624458068086-0 1624460273296-0
1) 1) xx2) 1) 1) 1624458068096-02) 1) name2) xiaoming3) age4) 22
2) 1) yy2) 1) 1) 1624460537583-02) 1) a4最终当Stream被迭代顺序消费完毕时调用不会返回任何消息而只是一个空数组如果想要随时获取最新推动的消息那么我们必须不断地重试该操作因此该命令也支持阻塞模式。
4.2.2 阻塞的使用
上面的非阻塞使用方式和XRANGE似乎没有太大的区别有趣的部分是我们可以通过指定 BLOCK 可选参数轻松地将 XREAD 转换为阻塞命令该命令能够根据指定的Stream和 ID 进行阻塞并在请求的key之一接受数据后自动解除阻塞。
重要的是如果存在多个使用此命令等待相同Stream的相同ID范围的客户端那么每个消费者都将获得一份数据副本这与使用阻塞列表的弹出操作时发生的情况不同阻塞列表中每一个消费者将获得不同的消息。
可以指定阻塞的超时时间单位是毫秒如果传递0则表示永久阻塞直到任何一个的Stream有数据返回。超时时间过了之后如果没有如何条件的消息到达该命令自动返回null。
即使传递了 BLOCK 命令但至少在传递的Stream之一中有符合数据可以立即返回时该命令将同步执行就像缺少 BLOCK 选项一样。
另外当阻塞时有时我们只想接收从阻塞那一刻开始通过 XADD 添加到流中的消息我们对已添加消息的历史记录不感兴趣。对于此需求我们可以使用特殊的“$”ID 向流发出信号表明我们只需要最新的消息这也通常是最常用的。Redis会默认检查Stream中目前最大的消息的ID并在 XREAD 命令行中使用该ID。
如下案例我们阻塞的监听xx和yy两个Stream都是采用“$”监听最新的数据且超时时间为10000毫秒即10秒。
127.0.0.1:6379 XREAD BLOCK 10000 STREAMS xx yy $ $
1) 1) yy2) 1) 1) 1624506208518-02) 1) a52) b5
(2.88s)当使用“$”监听最新的数据之后下一个指令的ID就应该传递这次返回的最大的ID。
与阻塞列表操作类似从等待数据的客户端的角度来看阻塞Stream读取是公平的因为语义是 FIFO 风格。当新项目可用时第一个阻塞给定Stream的客户端将是第一个解除阻塞的客户端。
XREAD 除了 COUNT 和 BLOCK 之外没有其他选项因此它是一个非常基本的命令具有特定目的可以直接使用消费者监听一个或多个流。使用消费者组 API 可以获得更强大的使用流的功能但是通过消费者组读取是由名为 XREADGROUP 的不同命令实现的。
4.3 消费者组
4.3.1 基本概念
XREAD可以实现一个消费者监听多个Stream当有数据到达的时候满足条件的数据将会返回给买一个消费者同样的副本实现消息的多播。但有时候我们可能需要的不是为多个不同的消费者或者客户端提供相同的消息流而是从同一Stream中向许多客户端提供不同的消息子集。
这样的一个明显有用的案例是处理缓慢的消息通过将Stream中不同的消息路由到准备好接收Stream的不同线程来扩展消息处理的能力。
我们想要的是不同的消费者消费Stream中的不同的数据这看起来和阻塞列表有些相似的。为了实现这一点Redis Stream使用了一个称为消费者组consumer groups的概念。这个名字明显借鉴了kafka但是从实现的角度来看Redis 消费者组与 Kafka消费者组没有任何关系。kafka的消费者组中的消费者还需要和分区对应而Redis的消费者组中的消费者相当于直接从Stream中获取消息。
一个消费者组就像一个从一个Stream中获取数据的伪消费者实际上服务于多个消费者将获取的消息分发到多个不同的消费者并且提供了一定的保证
每条消息都提供给不同的消费者因此不可能将相同的消息传递给同一个组内的多个消费者。在消费者组中消费者通过名称进行标识该名称是实现消费者的客户端必须提供的区分大小写的字符串。因此即使在断开连接后流消费者组仍保留所有消费者的状态客户端可以再次声明为同一个消费者。每个消费者组中都有第一个没有消费的 ID 的概念last_delivered_id这样当消费者请求新消息时它可以只提供以前未传递的消息。消费消息需要使用特定命令进行显式确认即ack。这表示此消息已正确处理因此可以从消费者组中移除。消费者组跟踪所有当前未ack的消息即已传递给消费者组的某个消费者但尚未确认为已处理的消息。由于此功能在访问Stream的消息历史记录时每个消费者只会看到传递给它的消息。
一个消费者组也可以被理解Stream的一种状态记录或者说Stream的一种辅助数据结构很明显单个Stream可以有多个消费者组这些消费者组可以具有不同的消费者集。
实际上甚至可以在同一个流中让客户端在没有消费者组的情况下通过 XREAD 读取而客户端在不同的消费者组中则通过 XREADGROUP 命令读取。
消费者组的相关命令
XGROUP 用于创建、销毁和管理消费者组。XREADGROUP 用于通过消费者组从Stream中读取消息。XACK 允许消费者将待处理消息标记为已正确处理。
4.3.2 创建消费者组
XGROUP命令的功能非常强大命令模版为
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]XGROUP命令用于管理与Stream数据结构关联的消费者组。能够做的事
创建与流关联的新消费者组。销毁一个消费者组。从消费者组中删除特定消费者。将消费者组的last_delivered_id设置为其他值。
通过XGROUP CREATE可以创建某个Stream的消费者组必须传递起始消息 ID 参数用来初始化 last_delivered_id 变量这样消费者组才能知道当第一个消费者连接时接下来要提供什么消息即刚创建组时的最后一条消息 ID 是什么。
一个简单的示例如下
127.0.0.1:6379 XGROUP CREATE yy mygroup 0
OK如果我们提供“ ”作为 I D 那么表示只有从现在开始到达 S t r e a m 中的新消息才会提供给组中的消费者。如果我们指定 0 那么表示消费者组将消费 S t r e a m 历史记录中的所有消息从第一条开始。我们也可以可以指定任何其他有效 I D 只需要直到消费者组将开始传递大于指定的 I D 的消息。因为“ ”作为ID 那么表示只有从现在开始到达Stream中的新消息才会提供给组中的消费者。如果我们指定 0 那么表示消费者组将消费Stream历史记录中的所有消息从第一条开始。我们也可以可以指定任何其他有效 ID只需要直到消费者组将开始传递大于指定的 ID 的消息。因为“ ”作为ID那么表示只有从现在开始到达Stream中的新消息才会提供给组中的消费者。如果我们指定0那么表示消费者组将消费Stream历史记录中的所有消息从第一条开始。我们也可以可以指定任何其他有效ID只需要直到消费者组将开始传递大于指定的ID的消息。因为“”表示流中当前最大的消息ID所以指定“$”将产生仅消费新消息的效果。
XGROUP CREATE 还支持自动创建Stream如果它不存在使用可选的 MKSTREAM 子命令作为最后一个参数
127.0.0.1:6379 XGROUP CREATE yyy mygroup $ MKSTREAM
OK4.3.3 从消费者组消费
我们可以使用 XREADGROUP 命令通过消费者组读取消息。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] XREADGROUP 与 XREAD 非常相似提供相同的 BLOCK 阻塞选项否则为同步命令。但是必须始终指定一个强制性选项即 GROUP 并且有两个参数消费者组的名称和尝试读取的消费者的名称。选项 COUNT 也受支持并且与 XREAD 中的选项相同。
消费者名称是客户端用来在组内标识自己的字符串。Redis发现新名字的消费者时它会在对应的消费者组内自动创建不同的客户端应该选择不同的消费者名称。
Stream yy中有4条数据
127.0.0.1:6379 XRANGE yy -
1) 1) 1624516905844-02) 1) a32) b3
2) 1) 1624516910389-02) 1) a42) b4
3) 1) 1624516914709-02) 1) a52) b5
4) 1) 1624516919774-02) 1) a62) b6我们创建一个消费者并且消费消费者组mygroup中的一条消息
127.0.0.1:6379 XREADGROUP GROUP mygroup c1 COUNT 1 STREAMS yy
1) 1) yy2) 1) 1) 1624516905844-02) 1) a32) b3上面的命令中。在 STREAMS 选项之后请求的 ID 是特殊 ID “”。这个特殊的 ID 只在消费者组的上下文中有效它的意思是获取到目前为止从未传递给其他消费者的消息并且会更新last_delivered_id通常都是传递这个参数。
ID也可以指定0、其他ID或者不完整的ID时间戳部分但这样的话Stream只会返回已传递给当前消费者并且没有被XACK确定的历史消息即该消费者内部的pending_ids集合在这种情况下BLOCK 和 NOACK 都被忽略。
我们对消费者组mygroup再创建一个消费者c2并且再消费一条数据
127.0.0.1:6379 XREADGROUP GROUP mygroup c2 COUNT 1 STREAMS yy
1) 1) yy2) 1) 1) 1624516910389-02) 1) a42) b4这时会返回a4-b4因为a3-b3已被同组内的a3-b3消费了。
XREADGROUP有几个特点
消费者在第一次被提及时自动创建无需显式创建。可以同时读取多个key但是要使其正常工作需要在每个Stream中创建一个具有相同名称的消费者组。XREADGROUP 是写命令因为即使从Stream中读取消费者组也会修改last_delivered_id因此只能在主实例上调用。
5 永久故障恢复
在上面的消费介绍中消费者因故障重启时可以通过XREADGROUP并在最后指定ID为0来获取分配给该消费者的所有未ACK的消息实现消息的不丢失。
然而在现实世界中消费者可能会永久失败并且永远无法恢复。由于任何原因停止后永远不会恢复的消费者的待处理消息该怎么办呢Redis的消费者组支持为已分配但未处理的消息进行重新分配。
5.1 XPENDING查看未处理消息
首先我们需要使用XPENDING命令获取某个消费者组中的未处理消息的相关信息该命令时只读命令可以安全使用
XPENDING key group [[IDLE min-idle-time] start end count [consumer]] 最简单的使用如下
127.0.0.1:6379 XPENDING yy mygroup
1) (integer) 3
2) 1624516905844-0
3) 1624516914709-0
4) 1) 1) c12) 12) 1) c22) 2一共输出四种数据第一行是当前消费者组中待确认的消息总数第二、第三行是待处理消息中最低和最高的消息ID最后是消费者列表和他们的待处理消息数。可以看到c1和c2分别有一条和两条没有ACK的消息。
上面是最简略的输出通过提供开始和结束 ID可以传递 - 和 以及控制命令返回的信息量的计数可以对未处理的消息进行迭代我们能够了解有关待处理消息的更多信息。
127.0.0.1:6379 XPENDING yy mygroup - 10
1) 1) 1624516905844-02) c13) (integer) 14220274) (integer) 1
2) 1) 1624516910389-02) c23) (integer) 13543784) (integer) 1
3) 1) 1624516914709-02) c23) (integer) 9297374) (integer) 1返回的格式为一个数组每一个元素包含四个属性。第1个是未ACK的消息ID第2个是消息的当前所有消费者名称第3个是自上次将此消息传递给此使用者以来经过的毫秒数IDLE第4个是此消息的传递次数当其他消费者使用 XCLAIM 转移次消息时或者当消费者通过 XREADGROUP 查看未ACK的历史消息时它会增加。
如果我们想将输出限制为给定消费者的待处理消息则使用可选的最终参数消费者名称
127.0.0.1:6379 XPENDING yy mygroup - 10 c1
1) 1) 1624516905844-02) c13) (integer) 30609664) (integer) 1然后可以使用XRANGE来通过ID获取每一条消息的详细内容
127.0.0.1:6379 XRANGE yy 1624516905844-0 1624516905844-0
1) 1) 1624516905844-02) 1) a32) b35.2 XCLAIM转移消息
现在我们知道了哪些消费者有哪些没有ACK的消息假设消费者c1永远不再恢复我们需要将c1未处理的消息转移给c2现在我们可以开始第二步需要使用到XCLAIM命令。在Stream消费者组的上下文中XCLAIM命令更改未处理消息的所有权因此新所有者是指定为命令参数的消费者。
XCLAIM通常用于永久故障恢复
有一个带有关联消费者组的Stream。某些消费者 A 在该消费者组的上下文中通过 XREADGROUP 从Stream中读取消息。每获取一个消息就会在该消费者的待处理消息列表(PEL)中创建一个待处理消息元素这意味着消息已传递给给定消费者但尚未通过 XACK 确认。由于某些情况突然那个消费者永远下线了。此时其他消费者可以先使用 XPENDING 命令检查的待处理消息列表为了继续处理此类消息接着使用 XCLAIM 获取消息的所有权并继续。从 Redis 6.2 开始消费者可以使用 XAUTOCLAIM 命令自动扫描和声明陈旧的待处理消息。
XCLAIM命令格式如下
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID] consumer为目的消费者此外需要指定IDLE即该消息未处理的时间最小空闲时间以便只有在上述消息的空闲时间大于指定的空闲时间时该操作才会起作用。
指定IDLE的目的是为了防止两个客户端同时重试声明一条消息
Client 1: XCLAIM yy mygroup c2 3060966 1624516905844-0
Client 2: XCLAIM yy mygroup c3 3060966 1624516905844-0因为执行了XCLAIM之后该条消息的IDLE将会被重置并将增加转次数计数因此第二个客户端将无法声明再次转移它。
下面将c1的消息转移给c2
127.0.0.1:6379 XCLAIM yy mygroup c2 3060966 1624516905844-0
1) 1) 1624516905844-02) 1) a32) b3再次使用XPENDING查看c2
127.0.0.1:6379 XPENDING yy mygroup - 10
1) 1) 1624516905844-02) c23) (integer) 864724) (integer) 2
2) 1) 1624516910389-02) c23) (integer) 32928934) (integer) 1
3) 1) 1624516914709-02) c23) (integer) 28682524) (integer) 1可以发现1624516905844-0这条消息已被转移给c2了现在c2可以通过XREAD重新遍历PEL来处理该条消息。
XCLAIM命令将会返回对应的消息的详细信息如果不需要我们可以添加JUSTID参数来只返回成功声明的消息的 ID这有助于减少网络带宽的占用。
5.3 XAUTOCLAIM自动转移
XPENDING 和 XCLAIM 为不同类型的恢复机制提供了基本的步骤。Redis 6.2 中添加的 XAUTOCLAIM 命令则通过让 Redis 管理它来优化通用过程并为大多数恢复需求提供简单的解决方案。
XAUTOCLAIM 识别空闲的待处理消息并将它们的所有权转移给指定的消费者XAUTOCLAIM 相当于先调用 XPENDING然后调用 XCLAIM。
XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]所以我们可以使用XAUTOCLAIM来声明转移一条消息申明一个c3从mygroup中拉取一条消息
127.0.0.1:6379 XREADGROUP GROUP mygroup c3 STREAMS yy
1) 1) yy2) 1) 1) 1624516919774-02) 1) a62) b6使用XAUTOCLAIM将c3中的消息转移给c2如下所示
127.0.0.1:6379 XAUTOCLAIM yy mygroup c2 100 1624516919773 COUNT 1
1) 0-0
2) 1) 1) 1624516919774-02) 1) a62) b6指定的毫秒值同样是空闲的时间ID表示的则是最小的消息ID而不是精确的IDCOUNT N表示最多转移N条消息。该命令同样返回消息的详细内容使用JUSTID可以只返回ID。当 XAUTOCLAIM 返回“0-0”Stream ID 作为时这意味着它到达了消费者组待处理条目列表的末尾。
6 死信队列
在 XPENDING 输出中观察到的第四个参数是每条消息的传递次数。
127.0.0.1:6379 XPENDING yy mygroup - 10
1) 1) 1624516905844-02) c23) (integer) 1290414) (integer) 5
2) 1) 1624516910389-02) c23) (integer) 12241774) (integer) 2
3) 1) 1624516914709-02) c23) (integer) 12241774) (integer) 2
4) 1) 1624516919774-02) c23) (integer) 599134) (integer) 2计数器以两种方式递增通过 XCLAIM 成功转移消息时使用 XREADGROUP 访问待处理消息的历史记录时。
当出现故障时消息会被多次传递这是正常的但最终它们通常会得到处理和确认。但是处理某些特定消息可能会出现问题因为它本身的数据有问题导致处理代码异常。在这种情况下消费者将持续无法处理此特定消息。
因为我们可以使用该计数器来检测由于某种原因而始终无法处理的消息这种消息被称为坏消息也叫死信DeadLetter无法投递的消息。因此一旦转移计数器达到要给给定阈值将此类消息放入另一个Stream并且发送一封邮件可能更明智。
这基本上是 Redis Streams 实现死信概念的方式。
7 Stream监控
Redis支持各种命令来监控Stream的信息此前我们已经介绍了 XPENDING命令它允许我们检查在给定时刻正在处理的消息列表以及它们的空闲时间和转移次数。
当我们想要获取其他信息的时候我们可以使用XINFO命令XINFO 命令是一个可观察性接口可以与子命令一起使用以获取有关Stream或消费者组的信息。
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]XINFO的相关调用
XINFO STREAM key显示有关Stream的信息。XINFO STREAM key FULL [COUNT count]该命令返回Stream的整个状态的详细信息包括消息、组、消费者和待处理消息列表 (PEL)信息类似于几个命令的组合。XINFO GROUPS key获得与流关联的所有消费者组的信息。XINFO CONSUMERS key group获取特定消费者组中每个消费者的信息。
例如XINFO STREAM 显示有关Stream本身的信息
127.0.0.1:6379 XINFO STREAM yy1) length2) (integer) 43) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) last-generated-id8) 1624516919774-09) groups
10) (integer) 1
11) first-entry
12) 1) 1624516905844-02) 1) a32) b3
13) last-entry
14) 1) 1624516919774-02) 1) a62) b6上面展示了有关Stream在内部如何编码的信息Stream 是基于 RadixTree 数据结构实现的。还显示Stream中的第一条和最后一条消息。还展示了此Stream关联的消费者组的数量我们可以进一步挖掘获取有关消费者群体的更多信息
127.0.0.1:6379 XINFO GROUPS yy
1) 1) name2) mygroup3) consumers4) (integer) 35) pending6) (integer) 47) last-delivered-id8) 1624516919774-0上面展示了Stream yy的全部group的信息包括名字、消费者数量、未确认消息数量、last-delivered-id游标。
然后我们可以通过检查组中注册的消费者来更详细地检查特定消费者组的状态。
127.0.0.1:6379 XINFO CONSUMERS yy mygroup
1) 1) name2) c13) pending4) (integer) 05) idle6) (integer) 6414621
2) 1) name2) c23) pending4) (integer) 45) idle6) (integer) 974923
3) 1) name2) c33) pending4) (integer) 05) idle6) (integer) 12554528 删除消息
Stream还有一个特殊的命令用于从Stream中通过 ID删除消息。通常对于仅附加数据结构比如其他消息队列来说这可能看起来像一个奇怪的功能但它实际上对涉及隐私法规的应用程序很有用。
127.0.0.1:6379 XRANGE yy -
1) 1) 1624516905844-02) 1) a32) b3
2) 1) 1624516910389-02) 1) a42) b4
3) 1) 1624516914709-02) 1) a52) b5
4) 1) 1624516919774-02) 1) a62) b6使用 XDEL 并传递Stream的名称后跟要删除的 ID
127.0.0.1:6379 XDEL yy 1624516919774-0
(integer) 1再次使用XRANGE
127.0.0.1:6379 XRANGE yy -
1) 1) 1624516905844-02) 1) a32) b3
2) 1) 1624516910389-02) 1) a42) b4
3) 1) 1624516914709-02) 1) a52) b5但是在当前Redis6.2的实现中实际上当从Stream中删除消息时该消息并未真正被移除只是被标记为已删除使用XPENDING也能够看到其还在存在于PEL中。最终如果所有消息都被标记为已删除则所有消息才将被销毁并回收内存。
在 Redis 的未来版本中如果达到给定数量的已删除消息可能会触发节点垃圾收集。但由于现在Stream的使用并不多并且实现复杂这种功能没有提供。
9 零长度Stream
Stream和其他Redis数据结构的区别在于当其他数据结构不再有任何元素时key本身将被删除。例如当调用 ZREM 将删除ZSET中的最后一个元素时将完全删除该ZSET。
由于使用计数为零的 MAXLEN 选项XADD 和 XTRIM 命令或者因为调用了 XDELStream被允许保持在零元素处。
之所以Stream这么特殊是因为Stream可能有关联的consumer group我们不想因为Stream中不再有任何消息而失去consumer group定义的状态。
目前Redis6.2的版本中即使没有关联的消费者组Stream也不会被删除但这在未来可能会改变。
10 ACK确认
当消息通过调用XREADGROUP被传递给某个消费者时或者当消费者获得调用 XCLAIM 的消息的所有权时消息处于待处理状态将被存储在该消费者的待处理消息列表 (Pending Entries ListPEL)中。PEL就是开头所说的pending_ids属性。
虽然待处理的消息已传递给某个消费者但服务器还不确定它是否至少被正确的处理过一次。对 XREADGROUP 调用并传递具体的ID值例如使用 ID 0可以获取消费者的消息历史记录也就是将返回PEL中的消息。也可以通过XPENDING 命令列出待处理消息列表PEL。
XACK 命令从Stream消费者的PEL中删除一条或多条消息。
XACK key group ID [ID ...]一旦消费者成功处理了一条消息它就应该调用 XACK这样这条消息就不会被再次处理而关于这条消息的 PEL 记录也被清除从 Redis 服务器释放内存。
127.0.0.1:6379 XPENDING yy mygroup - 10
1) 1) 1624516905844-02) c23) (integer) 28584164) (integer) 5
2) 1) 1624516910389-02) c23) (integer) 39535524) (integer) 2
3) 1) 1624516914709-02) c23) (integer) 39535524) (integer) 2
4) 1) 1624516919774-02) c23) (integer) 27892884) (integer) 2
127.0.0.1:6379 XACK yy mygroup 1624516905844-0
(integer) 1
127.0.0.1:6379 XPENDING yy mygroup - 10
1) 1) 1624516910389-02) c23) (integer) 47510154) (integer) 2
2) 1) 1624516914709-02) c23) (integer) 47510154) (integer) 2
3) 1) 1624516919774-02) c23) (integer) 35867514) (integer) 2该命令返回成功确认的消息数。某些消息 ID 可能不再是 PEL 列表的一部分例如因为它们已经被确认那么 XACK 不会将它们算作成功确认。
11 总结
Redis Stream 基于内存存储其速度相比于真正的消息队列比如kafka、rocketmq等更快但也是因为内存的原因我们无法使用Redis Stream长时间的存储大量的数据因为内存相比于磁盘来说要昂贵得多。另外Redis Stream也没有提供延时消息的能力。
虽然Redis Stream作为消息队列的功能已经很强大了但是因为“基于内存”这个Redis最重要的优点导致Redis Stream无法存储大量的数据这需要很多的内存因此到目前为止Redis Stream在生产环境的应用也并不多它更适用于小型、廉价的应用程序以及可以丢弃数据的场景限制Stream长度比如记录某些不重要的操作日志。
目前Redis 6.2版本的Redis Stream似乎还在完善过程中期待后续的更加强大的新功能。
相关文章
https://redis.io 如有需要交流或者文章有误请直接留言。另外希望点赞、收藏、关注我将不间断更新各种Java学习博客