Java 读取文件内容写入 Kafka

た 入场券 2022-11-18 04:54 397阅读 0赞
  1. package Project;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.io.BufferedReader;
  5. import java.io.FileReader;
  6. import java.util.Properties;
  7. public class KafkaProducerDemo {
  8. public static void main(String[] args) throws Exception{
  9. writeToKafka("test");
  10. }
  11. //包装一个写入kafka的方法
  12. public static void writeToKafka(String topic) throws Exception{
  13. //kafka配置
  14. Properties properties = new Properties();
  15. properties.setProperty("bootstrap.servers", "localhost:9092");
  16. properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  17. properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  18. //定义Kafka Producer
  19. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  20. //用缓冲方式读取文本
  21. BufferedReader bufferedReader = new BufferedReader(new FileReader("filePath"));
  22. String line;
  23. while((line = bufferedReader.readLine())!=null){
  24. ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, line);
  25. //发送数据
  26. kafkaProducer.send(producerRecord);
  27. }
  28. kafkaProducer.close();
  29. }
  30. }

发表评论

表情:
评论列表 (有 0 条评论,397人围观)

还没有评论,来说两句吧...

相关阅读