Kafka Heartbeat Mechanism and Duplicate Consumption

超、凢脫俗 2022-12-15 14:25

Kafka heartbeat mechanism

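Kafka uses a heartbeat mechanism to detect consumer timeouts. The heartbeat is transparent to consumer client code: it runs on an asynchronous background thread that starts working once we start a consumer instance. A heartbeat timeout can lead to duplicate message consumption, because the group rebalances and records whose offsets were not yet committed are delivered again.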

org.apache.kafka.clients.consumer.internals.AbstractCoordinator starts a HeartbeatThread that periodically sends heartbeats and checks the consumer's state. Every consumer has an org.apache.kafka.clients.consumer.internals.ConsumerCoordinator, and each ConsumerCoordinator starts a HeartbeatThread to maintain the heartbeat. The heartbeat state is kept in org.apache.kafka.clients.consumer.internals.Heartbeat, whose fields are declared as follows:

    private final int sessionTimeoutMs;
    private final int heartbeatIntervalMs;
    private final int maxPollIntervalMs;
    private final long retryBackoffMs;
    private volatile long lastHeartbeatSend;
    private long lastHeartbeatReceive;
    private long lastSessionReset;
    private long lastPoll;
    private boolean heartbeatFailed;
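The first four fields mirror consumer configuration keys (session.timeout.ms, heartbeat.interval.ms, max.poll.interval.ms, retry.backoff.ms). The following is a minimal sketch of setting them, using only java.util.Properties so that it runs without the Kafka client on the classpath. The broker address, group name, and concrete values are assumptions for illustration, and the "at most one third of the session timeout" check reflects a common recommendation, not a rule enforced by this code's caller.

```java
import java.util.Properties;

class HeartbeatConfigSketch {

    // Builds consumer properties related to heartbeating.
    // All values below are illustrative assumptions, not recommendations from the article.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "demo-group");              // hypothetical group name
        props.put("session.timeout.ms", "10000");         // maps to sessionTimeoutMs
        props.put("heartbeat.interval.ms", "3000");       // maps to heartbeatIntervalMs
        props.put("max.poll.interval.ms", "300000");      // maps to maxPollIntervalMs
        props.put("retry.backoff.ms", "100");             // maps to retryBackoffMs
        return props;
    }

    // Sanity check: the heartbeat interval should leave room for several
    // heartbeats within one session timeout (here: at most one third of it).
    static boolean heartbeatFitsInSession(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long interval = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return interval * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        System.out.println("heartbeat fits in session: " + heartbeatFitsInSession(props));
    }
}
```

In a real application these properties would be passed to the KafkaConsumer constructor together with the deserializer settings.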

Heartbeat thread implementation

    public void run() {
        try {
            log.debug("Heartbeat thread started");
            while (true) {
                synchronized (AbstractCoordinator.this) {
                    if (closed)
                        return;
                    if (!enabled) {
                        AbstractCoordinator.this.wait();
                        continue;
                    }
                    if (state != MemberState.STABLE) {
                        // the group is not stable (perhaps because we left the group or because the coordinator
                        // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
                        disable();
                        continue;
                    }
                    client.pollNoWakeup();
                    long now = time.milliseconds();
                    if (coordinatorUnknown()) {
                        if (findCoordinatorFuture != null || lookupCoordinator().failed())
                            // the immediate future check ensures that we backoff properly in the case that no
                            // brokers are available to connect to.
                            AbstractCoordinator.this.wait(retryBackoffMs);
                    } else if (heartbeat.sessionTimeoutExpired(now)) {
                        // the session timeout has expired without seeing a successful heartbeat, so we should
                        // probably make sure the coordinator is still healthy.
                        markCoordinatorUnknown();
                    } else if (heartbeat.pollTimeoutExpired(now)) {
                        // the poll timeout has expired, which means that the foreground thread has stalled
                        // in between calls to poll(), so we explicitly leave the group.
                        maybeLeaveGroup();
                    } else if (!heartbeat.shouldHeartbeat(now)) {
                        // poll again after waiting for the retry backoff in case the heartbeat failed or the
                        // coordinator disconnected
                        AbstractCoordinator.this.wait(retryBackoffMs);
                    } else {
                        heartbeat.sentHeartbeat(now);
                        sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                            @Override
                            public void onSuccess(Void value) {
                                synchronized (AbstractCoordinator.this) {
                                    heartbeat.receiveHeartbeat(time.milliseconds());
                                }
                            }
                            @Override
                            public void onFailure(RuntimeException e) {
                                synchronized (AbstractCoordinator.this) {
                                    if (e instanceof RebalanceInProgressException) {
                                        // it is valid to continue heartbeating while the group is rebalancing. This
                                        // ensures that the coordinator keeps the member in the group for as long
                                        // as the duration of the rebalance timeout. If we stop sending heartbeats,
                                        // however, then the session timeout may expire before we can rejoin.
                                        heartbeat.receiveHeartbeat(time.milliseconds());
                                    } else {
                                        heartbeat.failHeartbeat();
                                        // wake up the thread if it's sleeping to reschedule the heartbeat
                                        AbstractCoordinator.this.notify();
                                    }
                                }
                            }
                        });
                    }
                }
            }
        } catch (AuthenticationException e) {
            log.error("An authentication error occurred in the heartbeat thread", e);
            this.failed.set(e);
        } catch (GroupAuthorizationException e) {
            log.error("A group authorization error occurred in the heartbeat thread", e);
            this.failed.set(e);
        } catch (InterruptedException | InterruptException e) {
            Thread.interrupted();
            log.error("Unexpected interrupt received in heartbeat thread", e);
            this.failed.set(new RuntimeException(e));
        } catch (Throwable e) {
            log.error("Heartbeat thread failed due to unexpected error", e);
            if (e instanceof RuntimeException)
                this.failed.set((RuntimeException) e);
            else
                this.failed.set(new RuntimeException(e));
        } finally {
            log.debug("Heartbeat thread has closed");
        }
    }
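The order of the branches in run() matters: an unknown coordinator is handled first, then the session timeout, then the poll timeout, and only afterwards the decision whether to send a heartbeat. The sketch below restates that priority as a pure function. The class name, the Action enum, and the boolean parameters are hypothetical and stand in for the real method calls; it is a reading aid, not code from the Kafka client.

```java
class HeartbeatLoopDecisionSketch {

    // Hypothetical labels for the five branches of the loop body in run().
    enum Action {
        LOOKUP_COORDINATOR,       // coordinatorUnknown(): look up the coordinator, back off if needed
        MARK_COORDINATOR_UNKNOWN, // session timeout expired
        LEAVE_GROUP,              // poll timeout expired
        WAIT_BACKOFF,             // not yet time to heartbeat
        SEND_HEARTBEAT            // send a heartbeat request
    }

    // Mirrors the if / else-if chain: the first condition that holds wins.
    static Action nextAction(boolean coordinatorUnknown,
                             boolean sessionTimeoutExpired,
                             boolean pollTimeoutExpired,
                             boolean shouldHeartbeat) {
        if (coordinatorUnknown) return Action.LOOKUP_COORDINATOR;
        if (sessionTimeoutExpired) return Action.MARK_COORDINATOR_UNKNOWN;
        if (pollTimeoutExpired) return Action.LEAVE_GROUP;
        if (!shouldHeartbeat) return Action.WAIT_BACKOFF;
        return Action.SEND_HEARTBEAT;
    }

    public static void main(String[] args) {
        // Both timeouts expired at once: the session timeout branch is taken first.
        System.out.println(nextAction(false, true, true, true));
    }
}
```

One consequence of this ordering is that when both timeouts have expired in the same iteration, the thread marks the coordinator unknown before it gets a chance to leave the group.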

The heartbeat thread relies on two key timeout checks, sessionTimeoutExpired() and pollTimeoutExpired():

    public boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
    }

    public boolean pollTimeoutExpired(long now) {
        return now - lastPoll > maxPollIntervalMs;
    }
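To make the arithmetic of the two checks concrete, here is a small standalone sketch that copies the two comparisons into a hypothetical class and feeds it hand-picked timestamps. The class name, constructor, and the 10 s / 300 s values are assumptions for illustration; only the two comparison expressions come from the listing above.

```java
class HeartbeatTimeoutSketch {
    private final int sessionTimeoutMs;
    private final int maxPollIntervalMs;
    private long lastSessionReset;
    private long lastHeartbeatReceive;
    private long lastPoll;

    // Hypothetical constructor: all timestamps start at 'now'.
    HeartbeatTimeoutSketch(int sessionTimeoutMs, int maxPollIntervalMs, long now) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastSessionReset = now;
        this.lastHeartbeatReceive = now;
        this.lastPoll = now;
    }

    void receiveHeartbeat(long now) { lastHeartbeatReceive = now; }

    void poll(long now) { lastPoll = now; }

    // Same comparison as in the listing: time since the last successful
    // heartbeat (or session reset) exceeds the session timeout.
    boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
    }

    // Same comparison as in the listing: time since the last poll() call
    // exceeds the maximum poll interval.
    boolean pollTimeoutExpired(long now) {
        return now - lastPoll > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        HeartbeatTimeoutSketch hb = new HeartbeatTimeoutSketch(10_000, 300_000, 0L);
        hb.receiveHeartbeat(9_000L);
        System.out.println(hb.sessionTimeoutExpired(15_000L)); // false: 6000 ms since last heartbeat
        System.out.println(hb.sessionTimeoutExpired(19_001L)); // true: 10001 ms since last heartbeat
        System.out.println(hb.pollTimeoutExpired(300_000L));   // false: exactly at the limit
        System.out.println(hb.pollTimeoutExpired(300_001L));   // true: 1 ms past the limit
    }
}
```

Both checks use a strict greater-than, so a consumer that is exactly at the limit is still considered alive.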
  • sessionTimeoutExpired

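If the session timeout expires, the current coordinator is marked as unknown (disconnected). The consumer is then about to be removed from the group, and the mapping between partitions and consumers is reassigned. On the Kafka broker side, a consumer group has five states (six if Unknown is counted), defined in org.apache.kafka.common.ConsumerGroupState, as shown in the figure below: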

[Figure: Consumer Group states defined in org.apache.kafka.common.ConsumerGroupState]

  • pollTimeoutExpired

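If the poll timeout fires, the consumer client leaves the consumer group. On the next call to poll() it rejoins the group, which triggers a consumer group rebalance. The KafkaConsumer client does not call poll() again on our behalf; our own consumption logic has to keep calling poll() in a loop.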
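The link between a poll timeout and duplicate consumption can be illustrated with a simulation that does not use the Kafka client at all. In the sketch below, a batch is processed, but if processing takes longer than the maximum poll interval the member is assumed to have left the group, so its offset commit is rejected (the Java client typically reports this as CommitFailedException) and the next poll starts again from the last committed offset. The class, its methods, and the timing values are hypothetical; the set of already-processed offsets shows one possible way to keep side effects idempotent.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DuplicateConsumptionSketch {
    private long committedOffset = 0L;                       // last offset committed for the simulated group
    private final List<Long> delivered = new ArrayList<>();  // every offset handed to the handler
    private final Set<Long> processed = new HashSet<>();     // offsets whose side effect was applied
    private int sideEffects = 0;

    // Simulates one poll() plus processing of batchSize records.
    // The commit only succeeds if processing finished within maxPollIntervalMs.
    void pollAndProcess(int batchSize, long processingMs, long maxPollIntervalMs) {
        long start = committedOffset; // fetching resumes from the last committed offset
        for (long offset = start; offset < start + batchSize; offset++) {
            delivered.add(offset);
            // Idempotent handler: apply the side effect only once per offset.
            if (processed.add(offset)) {
                sideEffects++;
            }
        }
        boolean stillInGroup = processingMs <= maxPollIntervalMs;
        if (stillInGroup) {
            committedOffset = start + batchSize; // commit accepted
        }
        // Otherwise the commit is rejected and committedOffset stays unchanged.
    }

    long committedOffset() { return committedOffset; }
    int deliveredCount() { return delivered.size(); }
    int sideEffectCount() { return sideEffects; }

    public static void main(String[] args) {
        DuplicateConsumptionSketch sim = new DuplicateConsumptionSketch();
        sim.pollAndProcess(5, 400_000L, 300_000L); // too slow: commit rejected
        sim.pollAndProcess(5, 1_000L, 300_000L);   // same 5 records are delivered again
        System.out.println("delivered=" + sim.deliveredCount()
                + " sideEffects=" + sim.sideEffectCount()
                + " committed=" + sim.committedOffset());
    }
}
```

In this run offsets 0 to 4 are delivered twice, while the deduplicating handler applies each side effect only once. Keeping per-record processing time well below max.poll.interval.ms, or reducing the batch size, avoids the rejected commit in the first place.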

