四时宝库

程序员的知识宝库

消息队列专栏(六)如何保证消息不丢失

本文为极客时间《消息队列高手课》学习笔记。

在工作中,对于刚刚入行的小伙伴在使用消息队列的时候往往最担心的事情就是消息队列是否会丢消息,那今天的学习笔记我们就成如何检测消息是否丢失开始。

1.检测消息丢失的方法

在我们使用消息队列的时候,实际上我们会在项目上线灰度阶段或者测试阶段对消息队列的消息可靠性进行验证。虽然一般大型公司已经提供了分布式链路跟踪系统来跟踪每条消息,但是项目上线初期,我们一般用比较简单,或者说比较笨的方式来校验消息的可靠性。

在实际应用中,我们会按照一定的频次不断的往生产端发送消息,发送一定时间的数据,那么消费端应该也会消费到多少数据,这时候我们简单的做一下生产端和消费端的数据量就大体知道是否有消息丢失。然后逐步提升QPS,直至有消息丢失,或者有消息消费延时,这时候我们就大体知道我们的消息队列的消息处理上限。但是一般情况下,正常的项目很难达到几十万甚至是几百万的QPS,所以一般不会出现消息队列无法支撑业务的场景。

在极客时间课程中,作者提出了利用消息队列的有序性来验证是否有消息丢失,实际上这种方式不仅可以校验消息的可靠性,还可以快速定位到丢失了哪些数据。具体实现逻辑如下。

在Producer端我们给发出的每条消息都附加一个连续递增的序号,然后在Consumer端来检查这个序号的连续性。如果消息没有丢失,Consumer收到消息的序号必然也是递增的,如果出现不连续的序号,那就是出现了消息丢失。

实际上文章也提到,类似于这种添加序号的操作,大多数消息队列的客户端都是支持拦截器机制的,可以利用拦截器机制在producer发送消息之前的拦截器中将序号注入,减少对于业务代码的侵入性,说起来像极了Spring的AOP机制。

但是我们按照上面的文档,也不得不先考虑到,如果是多个队列,这种情况就会出现问题,因此,我们只能保证分区上的消息有序,我们在发送消息的时候指定分区,并且在每个分区单独检测消息序号的连续性。

如果是多个Producer,则需要每个producer分别生成各自消息序号,增加producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性。

Consumer实例的数量最好和分区数量一致,做到Consumer和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

2.如何确保消息的可靠传递

一条消息从生产到消费可以划分为三个阶段,如图所示:

2.1 生产阶段

生产阶段就是消息在Producer创建经过网络传输到达Broker端的过程,消息队列通过最常用的请求确认机制确保消息的可靠传递,即:Broker收到消息后,会给客户端返回一个确认相应,表示消息已经收到。

在消息确认机制中需要明确:

  • 同步发送,既要处理好返回值,也要处理好对异常的捕获;
  • 异步发送:处理好回调方法;
  • 超时重试机制:如果长时间收不到确认返回结果,则需要进行重试,直到消息发送成功或者在重试一定的次数之后发送报警,人工参与。

但是需要注意的是,如果使用同步发送,一旦出现堵塞,会增加调用时间,因此,这种场景一般只针对消息丢失或者有消息延时特别敏感的场景。

2.2 存储阶段

存储阶段主要指的是Broker端,只要Broker正常运行,一般就不会出现消息丢失的问题。

但是如果是宕机,也会出现消息丢失的情况,比如说Kafka为了性能,只要消息已经写入pageCache就会返回确定消息,消息提交后如果还没有刷盘就发生宕机,就会存在丢失消息的风险。

对于单个节点的Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘后再给Produer返回确认响应。例如RabbitMQ将消息的delivermode设置为2,exchange持久化后才返回确认消息,保证消息不会丢失;RocketMQ也可以将flushDiskType设置为SYNC_FLUSH同步刷盘机制。

如果是多节点集群,需要将Broker集群配置成至少将消息发送到一定阈值以上的节点(比如说N/2+1),再给客户端回复发送确认响应。

2.3 消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

2.3.1 多个消费者消费相同分区的注意点

这里需要注意下:如果在同一个消费组内,A消费者拉走了Index=10的消息,还没有返回确认,这时候这个分区的消费位置还是10,如果此时B消费者来拉消息,会有两种情况:

  • 超时前,Broker认为这个分区还被A占用这,拒绝B的请求;
  • 超时后,Broker认为A已经超时没有返回,这次消费就失败了,当前消费位置还是10,B再来拉消息,会返回Index=10的消息。

即:如果多个消费者消费同一个分区,是在分区内顺序消费消息的,Index=10的消息没有被消费,那么index=11的消息也就不会消费。详情查阅:https://rocketmq.apache.org/docs/best-practice-consumer/

Orderly
The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.

这也是为什么我们在补偿消息的时候,只要知道最近一次消费的消息index,然后将之后的消息重新消费不会出现重复消息消费的原因。

2.3.2 重试消费失败场景

RocketMQ对于消费时一直无法消费的消息,超过一定重试次数后,会将消息写入私信队列,避免分区被卡住。

3.消息队列如何处理重复消息

如果消息在网络传输过程中发送错误,由于发送方收不到确认,会通过重发来保证消息不丢失。但是,如果确认响应在网络传输时丢失,也会导致重发消息。也就是说,无论是Broker还是Consumer都是有可能收到重复消息的,那我们在编写消费代码时,就需要考虑这种情况,你可以想一下,在消费消息的代码中,该如何处理这种重复消息,才不会影响业务逻辑的正确性?

一般在业务中常用的解决方法有:

  1. 建立一个消息表,consumer消费之前,拿到消息做insert操作,用消息id做唯一主键,重复消费会导致主键冲突,携程的QMQ实现方式;
  2. 利用redis给消息分配一个全局id,只要消费过该消息,将消息以K-V形式写入redis,消费消息之前,根据key去redis查询是否有对应记录;
  3. 还是redis方案,不再是K-V形式,可以采用布隆过滤器或者bitmap方式实现;但是布隆过滤器随着数据量过大,误判率也会逐渐升高,因此,需要定时清除过滤器,同时使用布隆过滤器进行反向判定,如果不存在那就一定不存在;
  4. 如果数据量级过大,比如说亿级数据,实际上我们还可以使用分块的形式,比如说用hash将id分散到上万个队列中去,这样一个key值也就存储几万个数据,内存不会很大,不会出现bigkey;如果是数据库,那就可以分库分表;

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接