Kafka:生产者发送消息数据去重

你的名字 2024-03-25 21:08 140阅读 0赞
幂等性

开启参数 enable.idempotence 默认为 true,false 关闭。

生产者事务

开启事务,必须开启幂等性。

Kafka 的事务一共有如下 5 个 API

  1. // 1 初始化事务
  2. void initTransactions();
  3. // 2 开启事务
  4. void beginTransaction() throws ProducerFencedException;
  5. // 3 在事务内提交已经消费的偏移量(主要用于消费者)
  6. void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
  7. // 4 提交事务
  8. void commitTransaction() throws ProducerFencedException;
  9. // 5 放弃事务(类似于回滚事务的操作)
  10. void abortTransaction() throws ProducerFencedException;
  11. import org.apache.kafka.clients.producer.KafkaProducer;
  12. import org.apache.kafka.clients.producer.ProducerConfig;
  13. import org.apache.kafka.clients.producer.ProducerRecord;
  14. import org.apache.kafka.common.serialization.StringSerializer;
  15. import java.util.Properties;
  16. public class CustomProducerTranactions {
  17. public static void main(String[] args) {
  18. // 0 配置
  19. Properties properties = new Properties();
  20. // 连接集群 bootstrap.servers
  21. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.129:9092,192.168.25.128:9092");
  22. // 指定对应的key和value的序列化类型 key.serializer
  23. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  24. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  25. // 设置事务 id(必须),事务 id 任意起名 "tranactional_id_01"随意确保全局唯一即可
  26. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
  27. // 1 创建kafka生产者对象
  28. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  29. kafkaProducer.initTransactions();
  30. kafkaProducer.beginTransaction();
  31. try {
  32. // 2 发送数据
  33. for (int i = 0; i < 5; i++) {
  34. kafkaProducer.send(new ProducerRecord<>("test", "hello" + i));
  35. }
  36. int i = 1 / 0;
  37. kafkaProducer.commitTransaction();
  38. } catch (Exception e) {
  39. kafkaProducer.abortTransaction();
  40. } finally {
  41. // 3 关闭资源
  42. kafkaProducer.close();
  43. }
  44. }
  45. }

发表评论

表情:
评论列表 (有 0 条评论,140人围观)

还没有评论,来说两句吧...

相关阅读

    相关 kafka生产者发送消息

    本文简单介绍kafka发送消息一些基础,先上代码,复制粘贴然后根据自己情况改一下ip地址,可直接发消息!!!贼强!!! package producer;