package Project;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
/**
 * Minimal demo that publishes every line of a text file to a Kafka topic,
 * one record per line, using String keys/values and a broker at
 * {@code localhost:9092}.
 */
public class KafkaProducerDemo {

    /** Input path used by {@link #writeToKafka(String)} when no path is given. */
    private static final String DEFAULT_FILE_PATH = "filePath";

    /**
     * Entry point: sends the contents of the default input file to the
     * {@code "test"} topic.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the file cannot be read or the producer fails
     */
    public static void main(String[] args) throws Exception {
        writeToKafka("test");
    }

    /**
     * Sends every line of the default input file to the given topic.
     *
     * @param topic name of the Kafka topic to write to
     * @throws Exception if the file cannot be read or the producer fails
     */
    public static void writeToKafka(String topic) throws Exception {
        writeToKafka(topic, DEFAULT_FILE_PATH);
    }

    /**
     * Sends every line of {@code filePath} to the given topic as a separate
     * record with no key.
     *
     * @param topic    name of the Kafka topic to write to
     * @param filePath path of the text file whose lines are sent
     * @throws Exception if the file cannot be opened or read, or the producer fails
     */
    public static void writeToKafka(String topic, String filePath) throws Exception {
        // Kafka client configuration.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the reader first and then the producer, even
        // when opening the file, reading a line, or sending a record throws.
        // Previously the reader was never closed and the producer leaked on error.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
             BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, line);
                // The returned Future is not inspected here, so per-record
                // delivery failures are not reported by this demo.
                kafkaProducer.send(producerRecord);
            }
        }
    }
}
// Scraped web-page footer, not part of the program: "No comments yet - be the first to say something..."