国外最大的素材网站,陕西省建设网站查询证件相片,南宫28在线注册网站,什么事网络营销文章目录 一、简单的list消息队列1.命令示例2.伪代码示例3.方案优劣 二、Pub/Sub发布订阅1.消息丢失2.消息堆积 三、相对成熟的Stream1.redis命令介绍2.多消费者组测试3.Stream会持久化吗#xff1f;4.消息堆积如何解决#xff1f; 总结 用redis也是比较久了#xff0c;并且… 文章目录 一、简单的list消息队列1.命令示例2.伪代码示例3.方案优劣 二、Pub/Sub发布订阅1.消息丢失2.消息堆积 三、相对成熟的Stream1.redis命令介绍2.多消费者组测试3.Stream会持久化吗4.消息堆积如何解决 总结 用redis也是比较久了并且也对其他消息中间件也用了相当多的时间现在就redis是否适合做消息队列来梳理下获取梳理完之后可以有一个更加清晰的认知。笔者会从以下几个方面进行梳理。 一、简单的list消息队列 众所周知redis常见的数据结构有String、Hash、List、Set、zset。其中List可以是一个列表结构。可以通过LPUSH、RPOP两个命令来实现一个简单的队列。
LPUSH 将元素依次插入到列表头部RPOP 获取最后一个元素并且删除。 如下图所示生产者通过LPUSH命令依次插入a、b、c、d四个元素。消费者通过RPOP命令进行消费。 1.命令示例
生产者
# 通过LPUSH命令往test_queue填充a、b、c、d
127.0.0.1:6379 LPUSH test_queue a
(integer) 1
127.0.0.1:6379 LPUSH test_queue b
(integer) 2
127.0.0.1:6379 LPUSH test_queue c
(integer) 3
127.0.0.1:6379 LPUSH test_queue d
(integer) 4
127.0.0.1:6379 消费者
消费者
127.0.0.1:6379 RPOP test_queue
a
127.0.0.1:6379 RPOP test_queue
b
127.0.0.1:6379 RPOP test_queue
c
127.0.0.1:6379 RPOP test_queue
d
127.0.0.1:6379 RPOP test_queue
(nil)
127.0.0.1:63792.伪代码示例 生产者相对简单以简单的订单支付为例子
//在订单支付成功之后发送给积分系统
public void afterPayHandler(Order order){//发送消息到积分系统redisTemp.LPUSH(order,order);
}消费者
public void orderMessageListener(){while(true){//获取订单信息Order order redisTemp.RPOP(order);if(order ! null){//做积分系统的业务如添加积分等逻辑。}}
}如上所示消费者在消费的时候必须通过循环一直拉取队列数据达到数据的实时性但是也出现了CPU空转的问题。如果我们判断空的时候sleep休眠一段时间那就会存在消息实时性问题。休眠多久合适就成为了难以处理的问题。
好在redis有阻塞拉取的命令。
BRPOP test_queue 10(秒)。拉取命令阻塞10秒。如果是0就是一直阻塞。
3.方案优劣
优点足够简单也很好理解。但是我确实是想不到哪个场景适合这个方案笑哭。感觉也只能算普及知识了。缺点 不支持多消费者。任何一个消费者将redis的元素拉取删除之后其他消费者都无法再次拉取到。那就只能仅限于一对一消费了。消息丢失。没有ACK机制如果拉取消息后宕机后无法正常消费就会导致消息的丢失。
二、Pub/Sub发布订阅 List数据结构可以认为是开发者为了简单方便从而引进的一种消息队列的方式但是绝不”正宗“。Pub/Sub这种从名字上可以看出来就是专门为了消息队列而生的。 从上图可以看出发布订阅模式解决了多消费者的问题。但是还是存在两个问题。
1.消息丢失 发布订阅模型没有进行消息存储只是一个单纯的通道实时的把消息传送给消费者。那么这样就会有一个问题如果消费者中间下线再次上线的时候只能从最新的位置进行消费这样就会有消息丢失啦。
2.消息堆积 上文说发布订阅模型没有基于任何数据类似因此这个操作不会写入RDB和AOF中redis持久化机制。另外在消息堆积的时候数据是通过Buffer缓冲区实现的。这个缓冲区的大小可以在redis中进行配置。如果超过了缓冲区配置的上限此时Redis 就会「强制」把这个消费者踢下线。 总的说这个发布订阅模式相对比较脆弱虽然解决了多消费者的问题但是消息一致性较低消息丢失概率较高发布版本时重启了就可能丢消息试用的场景较少。
三、相对成熟的Stream Redis5.0 中增加了Stream消息队列相对成熟解决了较多的问题。
支持消息ACK反馈在消息消费成功的时候返回消费成功才不会再次推送消息。支持多消费者组。消息堆积问题优化。 1.redis命令介绍
发布命令
解释
#topic为 myStream1
# * 代表使用自动生成的ID作为消息的ID
# 接下来是多个 field value 组成的信息。
127.0.0.1:6379 XADD myStream1 * name zhangsan sex 20
1717500637523-0
127.0.0.1:6379 XADD myStream1 * name lisi sex 20
1717500644429-0
127.0.0.1:6379消费命令
# 消费myStream队列的10个数据最后的0意思是从头开始消费。
127.0.0.1:6379 XREAD COUNT 10 STREAMS myStream1 0
1) 1) myStream12) 1) 1) 1717500637523-02) 1) name2) zhangsan3) sex4) 202) 1) 1717500644429-02) 1) name2) lisi3) sex4) 20
127.0.0.1:63792.多消费者组测试
#创建一个消费者组为myGroup1并且指定消费位置。最后这个长串是信息的id
127.0.0.1:6379 XGROUP CREATE myStream1 myGroup1 1717500644429-0
OK#消费者组消费消费者组为myGroup1 当前消费者id为consumer1 拉取10个信息 注意最后这个 ‘‘
127.0.0.1:6379 XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1
1) 1) myStream12) 1) 1) 1717501299234-02) 1) name2) lisi3) sex4) 20
127.0.0.1:6379验证ACK机制
myGroup1消费者组拉取一次之后将所有的消息拉取回来因为没有进行消息反馈ACK。所以再次拉取的时候还是将全量的消息拉取回来。执行一次ACK命令之后再次拉取消息发现少了一条消息。再次执行ACK命令后拉取不到消息了。
127.0.0.1:6379 XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) myStream12) 1) 1) 1717501299234-02) 1) name2) lisi3) sex4) 202) 1) 1717502213457-02) 1) name2) lisi3) sex4) 20
127.0.0.1:6379 XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) myStream12) 1) 1) 1717501299234-02) 1) name2) lisi3) sex4) 202) 1) 1717502213457-02) 1) name2) lisi3) sex4) 20
127.0.0.1:6379
127.0.0.1:6379
127.0.0.1:6379
127.0.0.1:6379 XACK myStream1 myGroup1 1717502213457-0
(integer) 1
127.0.0.1:6379 XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) myStream12) 1) 1) 1717501299234-02) 1) name2) lisi3) sex4) 20
127.0.0.1:6379 XACK myStream1 myGroup1 1717501299234-0
(integer) 1
127.0.0.1:6379 XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) myStream12) (empty list or set)
127.0.0.1:63793.Stream会持久化吗 会不管是RDB 还是AOF都会写入。所以不用担心宕机的问题。
4.消息堆积如何解决 既然会将Stream会进行持久化那么必然消息也会保存在内存中但是为了内存爆炸Stream可以在XADD命令的时候可以通过MAXLEN命令指定消息的最大长度在超过最大长度的时候旧消息会被删除只保留固定长度的新消息。这样看来消息堆积的问题只是进行了优化并没有完美的解决。
总结 到此获取对于redis消息队列的历史有了一定的了解redis作为运行在内存的数据库而言这个功能已经是很不错了或许你的场景足够简单消息的数量不多并且对于消息的丢失不是特别的敏感的话redis的Stream消息队列也是一个不错的选择。