rocketmq-spring的consumer设置消费失败最大重试次数

分手后的思念是犯贱 2022-10-07 13:50 991阅读 0赞

说明

rocketmq-spring的consumer的相关属性配置有两种方式:

  1. 在配置文件中进行配置
  2. 类上使用@RocketMQMessageListener注解配置相关属性

关于注解中的属性可以查看:org.apache.rocketmq.spring.annotation.RocketMQMessageListener,而在文件中可以配置的属性只有如下几个(并不遵守spring boot自动配置规范,所以在idea中不会有相关提示)

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3g3NjM3OTUxNTE_size_16_color_FFFFFF_t_70

说明如下:






























配置项

说明

rocketmq.name-server rocketmq的name server地址,格式:主机:端口;主机:端口,多个地址以英文分号分隔
rocketmq.consumer.secret-key  ACL的secret-key属性
rocketmq.consumer.access-key ACL的access-key属性
  1. rocketmq.consumer.customized-trace-topic
自定义消费轨迹topic,不使用忽略
  1. rocketmq.access-channe
枚举类型,值为:【LOCAL, CLOUD】,值为CLOUD表示设置接入阿里云。忽略。

如果想要设置最大重试次数等一些相关初始化参数配置,很明显是不支持的。

同时,看一下构造consumer的源码,可以看到只配置了固定的几个属性:

  1. private void initRocketMQPushConsumer() throws MQClientException {
  2. RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
  3. this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
  4. boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
  5. if (Objects.nonNull(rpcHook)) {
  6. consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
  7. enableMsgTrace, this.applicationContext.getEnvironment().
  8. resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
  9. consumer.setVipChannelEnabled(false);
  10. consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
  11. } else {
  12. log.debug("Access-key or secret-key not configure in " + this + ".");
  13. consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
  14. this.applicationContext.getEnvironment().
  15. resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
  16. }
  17. String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
  18. if (customizedNameServer != null) {
  19. consumer.setNamesrvAddr(customizedNameServer);
  20. } else {
  21. consumer.setNamesrvAddr(nameServer);
  22. }
  23. if (accessChannel != null) {
  24. consumer.setAccessChannel(accessChannel);
  25. }
  26. consumer.setConsumeThreadMax(consumeThreadMax);
  27. if (consumeThreadMax < consumer.getConsumeThreadMin()) {
  28. consumer.setConsumeThreadMin(consumeThreadMax);
  29. }
  30. consumer.setConsumeTimeout(consumeTimeout);
  31. switch (messageModel) {
  32. case BROADCASTING:
  33. consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
  34. break;
  35. case CLUSTERING:
  36. consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
  37. break;
  38. default:
  39. throw new IllegalArgumentException("Property 'messageModel' was wrong.");
  40. }
  41. switch (selectorType) {
  42. case TAG:
  43. consumer.subscribe(topic, selectorExpression);
  44. break;
  45. case SQL92:
  46. consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
  47. break;
  48. default:
  49. throw new IllegalArgumentException("Property 'selectorType' was wrong.");
  50. }
  51. switch (consumeMode) {
  52. case ORDERLY:
  53. consumer.setMessageListener(new DefaultMessageListenerOrderly());
  54. break;
  55. case CONCURRENTLY:
  56. consumer.setMessageListener(new DefaultMessageListenerConcurrently());
  57. break;
  58. default:
  59. throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
  60. }
  61. if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
  62. ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
  63. } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
  64. ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
  65. }
  66. }

但是看代码的最后几行,rocketMQListener如果实现了RocketMQPushConsumerLifecycleListener接口,则会调用RocketMQPushConsumerLifecycleListener的prepareStart(consumer)方法,很明显,可以在这里设置consuemr的参数。

说明:rocketMQListener就是类上带有RocketMQMessageListener的bean。

解决方案

  1. @RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_topic_consumer", selectorExpression = "*")
  2. class StringConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
  3. @Override
  4. public void onMessage(String message) {
  5. LOGGER.info("receive message: {}", message);
  6. }
  7. @Override public void prepareStart(DefaultMQPushConsumer consumer) {
  8. // 设置最大重试次数
  9. consumer.setMaxReconsumeTimes(5);
  10. // 如下,设置其它consumer相关属性
  11. consumer.setPullBatchSize(16);
  12. }
  13. }

末语

我是在翻源码的才想到这个解决方案,我想既然提供有这个接口进行自定义配置,官方文档应该会有示例说明,然后翻了下github,是有类似的使用方式的,源码上还有其它示例,如果有其它问题,建议还是先看官方示例是否提供了相关解决方案。github地址:https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer

发表评论

表情:
评论列表 (有 0 条评论,991人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Kafka消费失败处理

    默认情况下,Spring-Kafka 达到配置的重试次数时,【每条消息的失败重试时间,由配置的时间隔决定】Consumer 如果依然消费失败 ,那么该消息就会进入到死信队列。