Kafka: Transactions in the Consume-Transform-Produce Pattern

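In the consume-transform-produce pattern, a single Kafka transaction covers both the records written to the output topic and the consumed offsets of the input topic. The producer opens the transaction with initTransactions() and beginTransaction(), the consumer's offsets are committed through producer.sendOffsetsToTransaction() rather than by the consumer itself, and commitTransaction() makes the output records and the offset commits visible atomically; if anything fails, abortTransaction() rolls both back. The example below reads from tp_tx_01 and forwards each record to tp_tx_out_01 inside one transaction.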
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class MyTransactional {

        public static KafkaProducer<String, String> getProducer() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.129:9092");
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Set client.id
            configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
            // Set the transactional id
            configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_01");
            // Require acknowledgment from all ISR replicas
            configs.put(ProducerConfig.ACKS_CONFIG, "all");
            // Enable idempotence
            configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            return new KafkaProducer<String, String>(configs);
        }

        public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.129:9092");
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Set the consumer group id; use the parameter so it matches the
            // group id passed to sendOffsetsToTransaction() in main()
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            // Disable auto-commit of consumer offsets, and do not commit them manually
            // either: offsets are committed through the producer as part of the transaction
            configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_01");
            configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Read only committed messages. In the consume-transform-produce pattern,
            // consumers downstream of a transactional producer should enable
            // read_committed; the default is read_uncommitted.
            // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            return new KafkaConsumer<String, String>(configs);
        }

        public static void main(String[] args) {
            String consumerGroupId = "consumer_grp_id_01";
            KafkaProducer<String, String> producer = getProducer();
            KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);
            // Initialize transactions
            producer.initTransactions();
            // Subscribe to the input topic
            consumer.subscribe(Collections.singleton("tp_tx_01"));
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // Begin the transaction
            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                    producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));
                    offsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            // The committed offset is the next record to consume
                            new OffsetAndMetadata(record.offset() + 1));
                }
                // Commit the consumed offsets as part of the transaction, so they are
                // committed or rolled back together with the produced records
                // (the consumer itself never commits them)
                producer.sendOffsetsToTransaction(offsets, consumerGroupId);
                // int i = 1 / 0; // uncomment to simulate a failure and exercise the abort path
                // Commit the transaction
                producer.commitTransaction();
            } catch (Exception e) {
                e.printStackTrace();
                // Abort (roll back) the transaction
                producer.abortTransaction();
            } finally {
                // Close resources
                producer.close();
                consumer.close();
            }
        }
    }
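As the comments above note, a consumer reading the output topic of a transactional producer should set isolation.level to read_committed so that it never sees records from open or aborted transactions (the default is read_uncommitted). Below is a minimal sketch of such a downstream consumer, assuming the same broker address and output topic tp_tx_out_01 as the example above; the group id and class name are illustrative.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class DownstreamConsumer {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.129:9092");
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream_grp_01"); // illustrative group id
            // read_committed: only records from committed transactions are returned
            configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
                consumer.subscribe(Collections.singleton("tp_tx_out_01"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }

With read_committed, the consumer's position also stops at the first record of any still-open transaction, so records from aborted transactions are simply skipped once the abort marker is written.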
