消息积压该如何处理
消息积压该如何处理
一般来说,消息积压的直接原因,是因为系统中某个部分出了问题,来不及处理上游发送的消息,才会导致消息积压。
优化性能避免消息积压
一般来说,我们并不需要考虑消息队列本身的性能,因为它的处理消息的能力要远强于我们业务处理能力。也就是说,性能瓶颈一般出现在生产者和消费者这两端。
1. 发送端性能优化
发送端一般都是执行完业务逻辑后,才开始发送消息。如果发送端速度过慢,则考虑是否是因为发送端业务逻辑执行太慢。
Producer 发送消息的过程:Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互,它的耗时是以下几个步骤耗时的和:
- 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
- 发送消息和返回响应在网络传输中的耗时;
- Broker 处理消息的时延。
这里,如果想提高发送端的性能,最好的办法就是多线程发送消息。如果是一般的web项目,那么它本身就是多线程的,并不需要我们手动开多线程去发送消息(消息的产生与业务相关,手动开多线程没有太大的意义)。如果是一些特殊场景,则可以尝试手动开多线程发送数据,比如日志的推送。
如果是一个离线系统,不关心时延,则可以采用批量发送,kafka还是什么就采用了这种优化,它并不会一有消息就发出去,而是稍微等待一下,达到一定量再发送。
2. 消费端性能优化
如果消息积压,那么就需要考虑为什么消费者的消费速度跟不上生产速度。一般的优化手段是,对消费端进行水平进行扩容,来提升总体的消费性能。但是需要注意的是:在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(Partition)(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的,如果Consumer的数量超过了分区(或者队列)的数量,扩容是没有效果的,因为每个分区只支持单线程消费。
还有一种处理办法,就是只从消息队列中取消息,然后放入本地内存队列当中,这样可以避免消息堆积。但是这并非一个很好的解决方案,甚至可以说是一个错误的解决方案。因为它很容易导致消息的丢失。本地节点如果挂了,消息就没了。
积压了如何处理
积压无外乎两种情况导致的,一是发送速度过快了,二是消费速度变慢了。这个时候就要配合监控日志去排查到底是什么原因,然后采取相对应的解决办法。
还有一种情况是,频繁的出现消费错误,然后一直重试,导致消费变慢,这种情况就要去查看日志,看是否有大量的报错。