Streaming+Kafka消费偏移量的维护

妖狐艹你老母 2022-04-18 02:59 377阅读 0赞

如何管理kafka消费偏移量:
一、 kafka消息的位置至关重要,维护其消息偏移量对于避免消息的重复消费与遗漏消费,确保消息的Exactly-once。
kafka的消息所在的位置Topic、Partitions、Offsets三个因素决定。
Kafka消费者消费的消息位置还与consumer的group.id有关。
二、consumerOffsets与earlieastLeaderOffsets的关系
earlieastLeaderOffsets :存储在broker上的leader节点的最早的消息偏移量
consumerOffsets :消费者消费的消息偏移量位置
为了表述方便,我们记earlieastLeaderOffsets为A,记consumerOffsets为B 。
情况一:正常情况下,消费的消息偏移量应该大于broker上存储的最早的消息偏移量,即 A < B:
在这里插入图片描述
我们知道,存储在broker上的kafka的消息常设置消息过期配置,当到达过期时间时过期的消息将会被清除。

情况二:如果A 依然小于 B,则仍可以正常消费:
在这里插入图片描述

情况三:然而,当 A > B 时,则说明还没有被消费的消息已经被清除:
在这里插入图片描述
此种情况会抛出 kafka.common.OffsetOutOfRangeException 异常。
consumerOffsets 小于 earlieastLeaderOffsets的影响与解决办法

当情况三发生时,在(B,A)区间内的消息还没有被消费就已经被清除了,将导致两个后果。

  1. 消息丢失。
  2. 抛出 kafka.common.OffsetOutOfRangeException 异常。

在对消息完整性有严格要求的系统中,消息的丢失造成的影响会比较严重,所以在这种情况下,要保证消息不会遭到丢失。
避免消息丢失包含两个方面:

1、还没有被消费过的消息不会被清除。
在没有外部系统清除kafka消息的情况下,协调设置broker的最大保留大小 log.retention.bytes 和 最大保留时间log.retention.hours 等,来配合消费者端的读取消息。可以通过读取和监控消费者消费的offsets,来保证消息不会被意外清除。
2、 消费者端消费消息没有遗漏。
当消费者意外中断时,重新启动消费时能够从上一次中断的消息偏移量开始消费。
三、如何维护
在从kafka接受流式数据的时候,spark提供了两种方式,Dstream和DirectStream,在spark2.2中已经不在提供第一种方式,具体区别这儿就不再描述了,第二种方式spark是用的kafka低阶api,每个RDD对应一个topic的分区,这种情况,需要借助于外部存储来管理offset,或者简单点,自己手动利用kafka来管理offset,否则在程序重启时找不到offset从最新的开始消费,会有丢失数据的情况。一般步骤如下:

(1)在 Direct DStream初始化的时候,需要指定一个包含每个topic的每个分区的offset用于让Direct DStream从指定位置读取数据。
(2)读取并处理消息
(3)处理完之后存储结果数据
(4)最后,将offsets保存在外部持久化数据库如 HBase, Kafka, HDFS, and ZooKeeper中
四、具体实现

  1. 方法一:kafka管理offset
    Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消费者API即异步提交API。你可以在你确保你处理后的数据已经妥善保存之后使用commitAsync API(异步提交 API)来向Kafka提交offsets。新的消费者API会以消费者组id作为唯一标识来提交offsets,将offsets提交到Kafka中。目前这还是实验性特性。
    stream.foreachRDD { rdd =>

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

// some time later, after outputs have completed

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}

  1. stream.foreachRDD { rdd =>
  2. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  3. // some time later, after outputs have completed
  4. stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  5. }
  1. 方法二:zookeeper管理offset

在初始化 kafka stream 的时候,查看 zookeeper 中是否保存有 offset,有就从该 offset 进行读取,没有就从最新/旧进行读取。在消费 kafka 数据的同时,将每个 partition 的 offset 保存到 zookeeper 中进行备份

  1. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-streaming")
  2. val ssc = new StreamingContext(sparkConf, Seconds(10))
  3. val topic: String = "test"
  4. val kafkaParams = Map[String, Object](
  5. "bootstrap.servers" -> "master:9092",
  6. "key.deserializer" -> classOf[StringDeserializer],
  7. "value.deserializer" -> classOf[StringDeserializer],
  8. "group.id" -> "spark-streaming-group01",
  9. "auto.offset.reset" -> "earliest",
  10. "enable.auto.commit" -> (false: java.lang.Boolean)
  11. )
  12. var kafkaStream: InputDStream[ConsumerRecord[String, String]] = null
  13. val zkClient = new ZkClient("master")
  14. var fromOffsets: Map[TopicPartition, Long] = Map()
  15. val children = zkClient.countChildren("offsetDir")
  16. if (children > 0) {
  17. for (i <- 0 until children) {
  18. val partitionOffset = zkClient.readData[String]("offsetDir" + "/" + i)
  19. val tp = new TopicPartition(topic, i)
  20. fromOffsets += (tp -> partitionOffset.toLong)
  21. kafkaStream = KafkaUtils.createDirectStream[String, String](
  22. ssc, PreferConsistent, Subscribe[String, String](Set(topic), kafkaParams, fromOffsets)
  23. )
  24. }
  25. } else {
  26. kafkaStream = KafkaUtils.createDirectStream[String, String](
  27. ssc, PreferConsistent, Subscribe[String, String](Set(topic), kafkaParams)
  28. )
  29. }

上述方法二在streaming程序停止太长时间重启,kafka消息过期(设置),会造成消息丢失,部分消息没消费就被清除了,避免这种情况,所以每次重启后要拿kafka最小的offset和zookeeper里的offset比较一下。

  1. import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
  2. import kafka.common.TopicAndPartition
  3. import kafka.consumer.SimpleConsumer
  4. import kafka.message.MessageAndMetadata
  5. import kafka.serializer.StringDecoder
  6. import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
  7. import org.I0Itec.zkclient.ZkClient
  8. import org.I0Itec.zkclient.exception.ZkMarshallingError
  9. import org.I0Itec.zkclient.serialize.ZkSerializer
  10. import org.apache.spark.SparkConf
  11. import org.apache.spark.rdd.RDD
  12. import org.apache.spark.streaming.dstream.InputDStream
  13. import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
  14. import org.apache.spark.streaming.{Seconds, StreamingContext}
  15. /**
  16. * Created by grant on 2018/8/6.
  17. * * 并行度:
  18. * 1、linesDStram里面封装到的是RDD, RDD里面有partition与读取topic的parititon数是一致的。
  19. * 2、从kafka中读来的数据封装一个DStream里面,可以对这个DStream重分区 reaprtitions(numpartition
  20. */
  21. object WithOffset2 {
  22. def main(args: Array[String]): Unit = {
  23. val conf = new SparkConf().setAppName("StreamingDemoWithOffset2").setMaster("local[2]")
  24. //val DIR = "E:\\BigData\\IDEA_pro\\Learn\\resource\\"
  25. /**
  26. * 可以不设置checkpoint,因为内存中也有一份偏移量offset
  27. * 设置后如果停止程序,可以从checkpoint中读出来
  28. */
  29. val ssc = new StreamingContext(conf,batchDuration = Seconds(5))
  30. /**
  31. * topic and brokers
  32. */
  33. val topic = "user_events"
  34. val topics = Set(topic)//创建 stream 时使用的 topic 名字集合
  35. val brokers = "master:9092,worker1:9092,worker2:9092"
  36. /**
  37. * kafka查询参数
  38. */
  39. var kafkaParams = Map[String,String]()
  40. /**
  41. * Map默认是immutable包下的 定义时定义成var 使用+= ->添加元素
  42. */
  43. kafkaParams +=("auto.offset.reset" -> "smallest")
  44. kafkaParams +=("metadata.broker.list" -> brokers)
  45. kafkaParams +=("serializer.class" -> "kafka.serializer.StringEncoder")
  46. /**
  47. * 创建direct stream
  48. * String,String,StringDecoder,StringDecoder
  49. * key和value的编码格式和解码格式
  50. */
  51. val stream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
  52. /**
  53. * kafkaStream这个tuple的第二部分为接受kafka topic里的文本流
  54. */
  55. //创建一个 ZKGroupTopicDirs 对象,对保存
  56. val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topic)
  57. //获取 zookeeper 中的路径,这里会变成 /consumers/test_spark_streaming_group/offsets/topic_name
  58. val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
  59. //zookeeper 的host 和 ip,创建一个 client
  60. //创建ZKClient,API有好几个,最后用带序列化参数的,不然保存offset的时候容易出现乱码。
  61. val zkClient = new ZkClient("master:2181",60000,60000,new ZkSerializer {
  62. override def serialize(data: Object): Array[Byte] = {
  63. try {
  64. return data.toString.getBytes("UTF-8")
  65. }catch {
  66. case e: ZkMarshallingError => return null
  67. }
  68. }
  69. override def deserialize(bytes: Array[Byte]): AnyRef = {
  70. try {
  71. return new String(bytes,"UTF-8")
  72. }catch {
  73. case e: ZkMarshallingError => return null
  74. }
  75. }
  76. })
  77. /**
  78. * 保存偏移量至zookeeper
  79. * @param zkTopicPath
  80. * @param rdd
  81. */
  82. def saveOffset(zkTopicPath: String, rdd: RDD[(String, String)]) = {
  83. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  84. for(o <- offsetRanges){
  85. ZkUtils.updatePersistentPath(zkClient,s"${zkTopicPath}/${o.partition}",String.valueOf(o.untilOffset))
  86. }
  87. }
  88. //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
  89. //查看该groupId在该topic下是否有消费记录,如果有,肯定在对应目录下会有分区数,children大于0则有记录。
  90. val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")
  91. var kafkaStream : InputDStream[(String, String)] = null
  92. var fromOffsets: Map[TopicAndPartition, Long] = Map() //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
  93. //在有记录的情况下,去拿具体的offset
  94. if(children > 0) {
  95. var fromOffsets: Map[TopicAndPartition, Long] = Map()
  96. //---get partition leader begin---
  97. val topicList = List(topic)
  98. val req = new TopicMetadataRequest(topicList, 0)
  99. //得到topic的一些信息,比如broker,partition分布情况
  100. val getLeaderConsumer = new SimpleConsumer("master", 9092, 10000, 10000, "OffsetLookup")
  101. //brokerList的host、brokerList的port、过期时间、过期时间
  102. val res = getLeaderConsumer.send(req)
  103. //TopicMetadataRequest topic broker partition 的一些信息
  104. val topicMetaOption = res.topicsMetadata.headOption
  105. val partitions = topicMetaOption match {
  106. case Some(tm) => {
  107. tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
  108. }
  109. case None => Map[Int, String]()
  110. }
  111. for (i <- 0 until children) {
  112. val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
  113. val tp = TopicAndPartition(topic, i)
  114. //---additional begin---
  115. val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
  116. // -2,1
  117. val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset")
  118. val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
  119. var nextOffset = partitionOffset.toLong
  120. //在zookeeper里存储的offset有可能在kafka里过期了,所以要拿kafka最小的offset和zookeeper里的offset比较一下。
  121. if (curOffsets.length > 0 && nextOffset < curOffsets.head) {
  122. //如果下一个offset小于当前的offset,就把当前kafka里的偏移量更新至zookeeper
  123. nextOffset = curOffsets.head
  124. }
  125. //---additional end
  126. //将不同partition对应的offset增加到fromOffset中
  127. fromOffsets += (tp -> nextOffset)
  128. //当前topic的若干分区的偏移量
  129. println("------ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] ------")
  130. }
  131. //这个会将 kafka 的消息进行 transform,最终 kafka 的数据都会变成 (topic_name, message) 这样的 tuple
  132. val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
  133. //接下来就可以创建Kafka Direct DStream了,前者是从zookeeper拿的offset,后者是直接从最新的开始(第一次消费)。
  134. kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
  135. }else{
  136. kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)
  137. }
  138. kafkaStream.foreachRDD{rdd =>
  139. if(!rdd.isEmpty()){
  140. //doSomething
  141. rdd.foreachPartition(message =>{
  142. while (message.hasNext){
  143. println(s"@^_^@ [" + message.next() + "] @^_^@")
  144. }
  145. })
  146. saveOffset(zkTopicPath,rdd)
  147. }
  148. }
  149. ssc.start()
  150. ssc.awaitTermination()
  151. ssc.stop()
  152. }
  153. }

上述方法三即可应对消息过期等问题。

发表评论

表情:
评论列表 (有 0 条评论,377人围观)

还没有评论,来说两句吧...

相关阅读