如何使用Redis作为消息队列
消息队列的存取需求
在分布式系统中,当两个组件要基于消息队列进行通信,一个组件会把消息传递给消息队列,然后就去做其他的事情,另一个组件会从消息队列中读取数据,在进行处理。我们把发送消息的称为生产者,消费消息的称为消费者。
这样处理的好处是,如果生产者发送消息的速度很快,消费者来不及处理也没问题,可以将这些消息暂存在消息队列当中,然后消费者可以按照一定的处理速度去异步的处理这些消息,从而达到一个流量消峰的效果。
不过,消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
消息队列对可靠性的要求
消息保序
虽然消费者是异步处理这些消息的,但是需要按照消息的顺序进行消费。不然可能会出现错误。
假如现在有一个x=3,第一个消息要把x * 2,然后第二个消息要把x + 3,如果消息正常执行,x的值最终为9,如果先执行了x + 3,那么x的值最终就会变为12。
重复消息处理
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
消息可靠性保证
另外,消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
Redis消息队列的解决方案
Redis 的 List 和 Streams 两种数据类型,就可以满足消息队列的这三个需求。
基于 List 的消息队列解决方案
具体来说,生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。
但是这里有一个性能问题,在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。
所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。
Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据
我们在将消息插入list时,要生成一个全局唯一的id,用来作为消息是否处理的标志。
为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
基于 Streams 的消息队列解决方案
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
XREAD:用于读取消息,可以按 ID 读取数据;
XREADGROUP:按消费组形式读取消息;
XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。
list支持的操作,stream都支持,下面是stream特有的。
1、Streams 本身可以使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。
参考
《Redis核心技术实战》