京东6.18大促主会场领京享红包更优惠

 找回密码
 立即注册

QQ登录

只需一步,快速开始

rocketmq 消息重试机制

2023-6-19 14:15| 发布者: zhaojun917| 查看: 596| 评论: 0

摘要: 一. MQ的重试机制由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试 ...

一. MQ的重试机制
由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。

RocketMQ为使用者封装了消息重试的处理流程,无需开发人员手动处理。RocketMQ支持了生产端和消费端两类重试机制。

二. 生产端重试
如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

相关API

DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:

  1. //设置消息发送失败时的最大重试次数
  2. public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
  3.    this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
  4. }
  5. //同步发送消息,并指定超时时间
  6. public SendResult send(Message msg,
  7.                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  8.    return this.defaultMQProducerImpl.send(msg, timeout);
  9. }


因此,实现生产端的重试十分简单,例如下面的代码可以设置Producer如果在5s内没有发送成功,则重试5次:

  1. //同步发送消息,如果5秒内没有发送成功,则重试5次
  2. DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
  3. producer.setRetryTimesWhenSendFailed(5);
  4. producer.send(msg,5000L);


三. 消费端的重试

消费状态:

消费者消费消息后,需要给Broker返回消费状态。以MessageListenerConcurrently监听器为例,Consumer消费完成后需要返回ConsumeConcurrentlyStatus并发消费状态。查看源码,ConsumeConcurrentlyStatus是一个枚举,共有两种状态:

  1. public enum ConsumeConcurrentlyStatus {
  2.    //消费成功
  3.    ConsumeConcurrentlyStatus,
  4.    //消费失败,一段时间后重试
  5.    RECONSUME_LATER;
  6. }



Consumer端的重试包括两种情况

异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,那么Broker就会在一段时间后尝试重试。
超时重试:如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。
因此,如果Consumer端正常消费成功,一定要返回ConsumeConcurrentlyStatus.ConsumeConcurrentlyStatus状态。

下面分别演示两种重试。

异常重试

RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

  1. messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  2. 1


但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。

 

但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。如下:

  1. package william.rmq.consumer.quickstart;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import org.apache.rocketmq.client.exception.MQClientException;
  8. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  9. import org.apache.rocketmq.common.message.MessageExt;
  10. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  11. import org.apache.rocketmq.remoting.common.RemotingHelper;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.stereotype.Service;
  14. import org.springframework.util.CollectionUtils;
  15. import william.rmq.common.constant.RocketMQConstant;
  16. import javax.annotation.PostConstruct;
  17. 关闭

    站长推荐上一条 /6 下一条

    QQ|手机版|小黑屋|梦想之都-俊月星空 ( 粤ICP备18056059号 )|网站地图

    GMT+8, 2025-7-1 18:25 , Processed in 0.041629 second(s), 19 queries .

    Powered by Mxzdjyxk! X3.5

    © 2001-2025 Discuz! Team.