Re-consuming messages in Kafka

刺骨的言语ヽ痛彻心扉 2022-03-28 04:11
Two ways to re-consume data in Kafka:
  • the low-level (SimpleConsumer) API
  • AUTO_OFFSET_RESET_CONFIG
Method 1: the low-level API
```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;

public class MyLowerConsumer {
    public static void main(String[] args) {
        // 1. Broker nodes
        List<String> brokers = new ArrayList<>();
        brokers.add("hadoop102");
        brokers.add("hadoop103");
        brokers.add("hadoop105");
        // 2. Topic
        String topic = "first";
        // 3. Partition
        int partition = 0;
        // 4. Offset: with the low-level API you re-read data by setting the
        //    offset yourself; each fetch starts reading from this position
        long offset = 0;
        // First find the partition leader, then fetch data from it
        String leader = getLeader(brokers, topic, partition);
        if (leader != null) {
            getData(leader, topic, partition, offset);
        }
    }

    private static void getData(String leader, String topic, int partition, long offset) {
        // 1. Create a SimpleConsumer pointing at the partition leader
        SimpleConsumer consumer = new SimpleConsumer(leader, 9092, 1000, 1024 * 1024, "getData");
        try {
            // 2. Build the fetch request, starting at the given offset
            FetchRequest request = new FetchRequestBuilder()
                    .addFetch(topic, partition, offset, 1024 * 1024)
                    .build();
            // 3. Send the request and get the response
            FetchResponse response = consumer.fetch(request);
            // 4. Extract the message set for our topic/partition
            ByteBufferMessageSet messageAndOffsets = response.messageSet(topic, partition);
            // 5. Walk the messages, printing each value and its offset
            for (MessageAndOffset messageAndOffset : messageAndOffsets) {
                long newOffset = messageAndOffset.offset();
                Message message = messageAndOffset.message();
                ByteBuffer byteBuffer = message.payload();
                byte[] bytes = new byte[byteBuffer.limit()];
                byteBuffer.get(bytes);
                System.out.println("value:" + new String(bytes) + " offset:" + newOffset);
            }
        } finally {
            consumer.close();
        }
    }

    private static String getLeader(List<String> brokers, String topic, int partition) {
        // Ask each broker for topic metadata until we find the partition leader
        for (String host : brokers) {
            SimpleConsumer consumer = new SimpleConsumer(host, 9092, 1000, 1024 * 1024, "getLeader");
            try {
                // Build and send the metadata request
                TopicMetadataRequest request = new TopicMetadataRequest(Arrays.asList(topic));
                TopicMetadataResponse metadataResponse = consumer.send(request);
                // Walk topics, then partitions, looking for our partition's leader
                for (TopicMetadata topicMetadata : metadataResponse.topicsMetadata()) {
                    for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                        if (partitionMetadata.partitionId() == partition) {
                            return partitionMetadata.leader().host();
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
        return null;
    }
}
```
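The SimpleConsumer API above was removed in modern Kafka releases, but the same "start reading from a chosen offset" idea survives in today's consumer as a `seek()` call. A minimal sketch, assuming the `kafka-clients` library is on the classpath and a broker is reachable at hadoop102:9092 (hostnames, topic, and the `seek-demo` group id follow the example above; the group id is otherwise a made-up name):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekConsumer {
    // Basic consumer configuration (group id "seek-demo" is hypothetical).
    static Properties baseProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "seek-demo");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(baseProps("hadoop102:9092"))) {
            TopicPartition tp = new TopicPartition("first", 0);
            // Manual assignment plays the role of the low-level API's
            // explicit topic/partition choice...
            consumer.assign(Collections.singletonList(tp));
            // ...and seek() plays the role of the explicit offset: re-read from 0
            consumer.seek(tp, 0L);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("value:" + record.value() + " offset:" + record.offset());
            }
        }
    }
}
```

Unlike the SimpleConsumer version, there is no hand-rolled leader lookup: the client resolves partition leaders itself from the bootstrap servers.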
Method 2: AUTO_OFFSET_RESET_CONFIG

From the Kafka client source (`ConsumerConfig`):

```java
/**
 * auto.offset.reset
 */
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
  - earliest: automatically reset the offset to the earliest offset
  - latest: automatically reset the offset to the latest offset
  - none: throw exception to the consumer if no previous offset is found for the consumer's group
  - anything else: throw exception to the consumer.";
```

In the high-level API, AUTO_OFFSET_RESET_CONFIG only takes effect when the consumer group is new (it has no committed offset) or when the current offset no longer exists on the server (e.g. the data has expired or been deleted).
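Concretely, re-consuming a topic this way means starting a consumer under a group.id Kafka has never seen, with auto.offset.reset set to earliest. A minimal sketch of the relevant configuration (the group name is hypothetical; the timestamp suffix just makes it fresh on every run):

```java
import java.util.Properties;

public class ResetConfigDemo {
    // Builds consumer config that re-reads a topic from the beginning:
    // a brand-new group id has no committed offset, so auto.offset.reset
    // decides where to start, and "earliest" means the oldest retained record.
    public static Properties resetProps(String bootstrapServers, String freshGroupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", freshGroupId);         // must be NEW, or the committed offset wins
        props.put("auto.offset.reset", "earliest");  // no initial offset -> start from earliest
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = resetProps("hadoop102:9092",
                "re-read-" + System.currentTimeMillis());
        System.out.println(props.getProperty("auto.offset.reset"));
        // Passing props to new KafkaConsumer<>(props) and subscribing to "first"
        // would then deliver the topic from the earliest retained offset.
    }
}
```

Note that reusing an existing group id with auto.offset.reset=earliest does NOT re-read the data: as long as the group still has a valid committed offset, that offset is used and the reset policy never fires.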
