如何保证消息不丢失

一般来讲,消息队列都会有一定的机制去保证消息的不丢失,丢失消息大多数是使用问题。

检测消息是否丢失

这里有一个逻辑上的处理办法,就是发送消息时,给消息一个编号,然后利用消息的有序性来判断消息是否丢失。这里需要配合消息队列客户端的拦截机制去做,在拦截器中去检测消息的连续性,这样检测消息连续性的代码就不会侵入业务层。

但是需要注意的一点是,像Kafka和RocketMQ这种消息队列,他们并不能保证消息在Topic上是连续的,只能保证在分区内是连续的,那么我们在编号时,就需要按照分区来做消息的递增,确保编号连续性的检测是在每一个分区内部进行的。

如果producer是多实例的,由于并不好协调不同producer之间发送的顺序,所以该编号最好也是按照每个Producer单独递增的。

Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

这里还需要考虑一种情况,就是消息重复发送的问题。因为难免会出现消息确认延迟,但实际上消息发送成功了,这时候,在消息的消费端,就需要保证消息的幂等性。要么给每条消息做标记,确保该消息只会消费一次,多次发送并不会消费该消息,要么保证一条消息多次消费的结果是一样的。

确保消息的可靠传递

消息的生产和消费一般分为一下几个阶段。

image-20231207110009586

  • 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

首先生产阶段,在编写代码时要注意,等收到Broker的响应确认后,再认为消息发送成功,否则进行重试。这样就可以保证在发送阶段消息的不丢失,换种说法,可以保证该条消息一定可以发送到Broker当中。

如果消息是异步发送的,我们需要在回调中,检查消息的发送结果。

而在于存储阶段,Broker不宕机是不会出现消息丢失的情况,这里涉及到刷盘策略,如果对于可靠性要求很高,可以将刷盘的频率调至最高频率。如果Broker是集群部署,则要确保发送到2个以上的节点,再给生产者返回存储成功。

消费阶段,这个阶段也是最容易代码编写出问题的阶段。该阶段不能在接收到消息后就给Broker返回消息确认,应该在业务代码全部执行完毕后,再返回消息确认,这时候Broker的ACK指针才会前移。