首先,如果要确保每个消息只被消费一次,那么就要确保每一个消息都正常到达了消费端,即不能出现消息丢失。

以下三个地方会造成消息丢失:

1、消息从生产者写入到消息队列的过程;

2、消息在消息队列中的存储场景;

3、消息被消费者消费的过程。

1、在消息生产的过程中丢失消息

生产者一般是独立部署的应用程序,而消息队列一般也是独立部署。那么生产者的消息发往消息队列需要通过网络,这就有可能丢失。这里的一个比较好的解决办法就是重试,即重新发送消息。

但是重试则会导致消息重复,比如第一条消息因为在网络中拥堵,导致超过时延,生产设判断消息丢失,重新发送消息。但是过段时间重新发送的消息和之前的消息都到达了消息队列,那么这条消息就重复了。

2、在消息队列中丢失消息

拿Kafka来说,消息一般是存储在本地磁盘,而为了减少刷盘次数,消息会先写入Page Cache(操作系统提供的缓存)中,然后找合适时间刷盘。

这样设计,好处在于减少I/O,但是如果在未刷盘时,服务器掉电,就会导致在Page Cache中的数据丢失。

这里的解决办法有,调整刷盘时机,即过一段时间,或者一定量消息强行刷盘,但是会影响性能。另一种方法,部署Kafka集群,通过多个数据备份,防止丢数据。

3、在消费的过程中存在消息丢失的可能

在这里,消费端接收消息有可能失败,而正确接受完消息后,在处理的过程中也可能失败。所以这里需要注意,消费端一定要等到处理消息的逻辑执行完后,再给消息队列返回。

如何保证消息只被消费一次

1、保证幂等性

幂等性指的是不论执行多少次,最终的结果都是一致的。

比如说下面语句:

1
update table set a = 1 where id = 2;

这条语句不论执行多少次,结果都不会变(不考虑其他语句穿插),也就是说不论执行多少次,都不会影响最终结果的正确性。

这样就保证了消息重复时,虽然被执行了多次,但是不影响最终结果的正确性。

2、在生产、消费过程中增加消息幂等性的保证

生产者这边,可以通过给每一个消息增加一个全局唯一的id,这样消息队列接收到重复的id时,就知道消息是重复的,丢弃即可。

在消费端,可以通过给每一个消息一个全局id,如果消费完该消息,就将id存储起来,每次消费前查看是否已经消费该id,以此来确保每条消息只被消费一次。

另外一种做法就是通过增加一个类似于乐观锁的版本号,每次消费时,就把版本号增加。这样的话,第二次消费该消息时,就会发现版本号对不上,就会放弃消费该消息。

类似于:

1
update table set a = 1 , controler_version = controler_version + 1 where control_version = 1

参考

《消息队列高手课》