kafka重复消费问题

布满荆棘的人生 2022-05-03 23:54 372阅读 0赞

开篇提示:kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?

今天查看Elasticsearch索引的时候发现有一个索引莫名的多了20w+的数据,顿时心里一阵惊讶,然后赶紧打开订阅服务的日志(消费者),眼前的一幕让我惊呆了,我的消费服务的控制台一直在不断的刷着消费日志(刚开始我并没有意识到这是重复消费造成的),我还傻傻的以为是因为今天有人在刷单,所以导致日志狂刷,毕竟之前也遇到过有人用自动交易软件疯狂刷单的,所以当时也没在意;等过了几分钟,又去瞅了一眼控制台仍然在疯狂的刷着日志,妈呀!顿时隐隐感觉不对劲,赶紧看了一眼es索引,我滴天一下子多了几万的数据,突然在想是不是程序出问题了(因为头一天晚上发了一个版本),然后就开始死盯这日志看,发现了一个奇葩的问题:tmd怎么日志打印的数据都是重复的呀!这才恍然大悟,不用想了绝逼是kakfa重复消费了,好吧!能有什么办法了,开始疯狂的寻找解决的办法……

既然之前没有问题,那就是我昨天发版所导致的,那么我昨天究竟改了什么配置呢?对照了之前的版本比较了一下,发现这个参数enable-auto-commit被改成了true,即自动提交,理论上在数据并发不大,以及数据处理不耗时的情况下设置自动提交是没有什么问题的,但是我的情况恰恰相反,可能突然会并发很大(毕竟交易流水不好说的),所以可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致re-blance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费(这种很常见);或者关闭kafka时,如果在close之前,调用consumer.unsubscribe()则可能有部分offset没提交,下次重启会重复消费

try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); } catch (Exception e) { }

所以一般情况下我们设置offset自动提交为false!

解决方法:

1.设置

  1. spring.kafka.consumer.enable-auto-commit=false
  2. spring.kafka.consumer.auto-offset-reset=latest

2.就是修改offset为最新的偏移量呗!我们都知道offset是存在zookeeper中的,所以我就不赘述了!

我的解决方法:

我并没有去修改offset偏移量,毕竟生产环境还是不直接改这个了;

我重新指定了一个消费组(group.id=order_consumer_group),然后指定auto-offset-reset=latest这样我就只需要重启我的服务了,而不需要动kafka和zookeeper了!

  1. #consumer
  2. spring.kafka.consumer.group-id=order_consumer_group
  3. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  4. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  5. spring.kafka.consumer.enable-auto-commit=false
  6. spring.kafka.consumer.auto-offset-reset=latest

注:如果你想要消费者从头开始消费某个topic的全量数据,可以重新指定一个全新的group.id=new_group,然后指定auto-offset-reset=earliest即可

补充:

在kafka0.9.0版本的时候,开始启用了新的consumer config,这个新的consumer config采用bootstrap.servers替代之前版本的zookeeper.connect,主要是要渐渐弱化zk的依赖,把zk依赖隐藏到broker背后。
group coordinator
使用bootstrap.servers替代之前版本的zookeeper.connect,相关的有如下两个改动:

  1. 1. Server 端增加了 GroupCoordinator 这个角色
  2. 2. topic offset 信息由之前存储在 zookeeper(/consumers/<group.id>/offsets/<topic>/<partitionId>,zk写操作性能不高) 上改为存储到一个特殊的 topic 中(__consumer_offsets

从0.8.2版本开始Kafka开始支持将consumer的位移信息保存在Kafka内部的topic中(从0.9.0版本开始默认将offset存储到系统topic中)
Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
rebalance时机
在如下条件下,partition要在consumer中重新分配:

  1. 条件1:有新的consumer加入
  2. 条件2:旧的consumer挂了
  3. 条件3coordinator挂了,集群选举出新的coordinator
  4. 条件4topicpartition新加
  5. 条件5consumer调用unsubscrible(),取消topic的订阅

__consumer_offsets
Consumer通过发送OffsetCommitRequest请求到指定broker(偏移量管理者)提交偏移量。这个请求中包含一系列分区以及在这些分区中的消费位置(偏移量)。偏移量管理者会追加键值(key-value)形式的消息到一个指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition组成的,而value是偏移量。

参考:https://segmentfault.com/a/1190000011441747

发表评论

表情:
评论列表 (有 0 条评论,372人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Kafka 心跳机制 重复消费

    kafka 心跳机制 Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。心

    相关 Kafka重复消费数据问题

      kafka重复消费的问题,主要的原因还是在指定的时间内,没有进行kafka的位移提交,导致根据上一次的位移重新poll出新的数据,而这个数据就是上一次没有消费处理完全的(

    相关 kafka-重复消费问题分析

    问题 我们的系统在全量初始化数据时,需要大量发送消息到kafka,观察发现消费者出现重复消费,甚至出现持续消费但队列里的消息却未见减少的情况。虽然consumer的处理逻辑已