Kafka的单线程生产消费测试

悠悠 2022-05-10 15:06 281阅读 0赞

代码:

  1. package com.weichai.kafka;
  2. import java.util.Properties;
  3. import kafka.javaapi.producer.Producer;
  4. import kafka.producer.KeyedMessage;
  5. import kafka.producer.ProducerConfig;
  6. /**
  7. * Kafka生产者的简单示例
  8. * @author lhy
  9. * @date 2018.10.09
  10. */
  11. public class SimpleProducer {
  12. public static void main(String[] args) {
  13. // TODO Auto-generated method stub
  14. Properties props = new Properties();
  15. props.setProperty("metadata.broker.list", "localhost:9092"); // 设置kafka的端口为默认端口9020
  16. props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
  17. props.put("request.required.acks", "1");
  18. ProducerConfig config = new ProducerConfig(props);
  19. //创建生产者对象
  20. Producer<String, String> producer = new Producer<String, String>(config);
  21. //生成消息
  22. KeyedMessage<String, String> data = new KeyedMessage<String, String>("SimpleNode", "Kafka Simple Test");
  23. int i = 1;
  24. try {
  25. while (i<100){
  26. // 发送消息
  27. producer.send(data);
  28. }
  29. } catch (Exception e) {
  30. // TODO Auto-generated catch block
  31. e.printStackTrace();
  32. }
  33. producer.close();
  34. }
  35. }
  36. package com.weichai.kafka;
  37. import java.util.HashMap;
  38. import java.util.List;
  39. import java.util.Map;
  40. import java.util.Properties;
  41. import kafka.consumer.ConsumerConfig;
  42. import kafka.consumer.ConsumerIterator;
  43. import kafka.consumer.KafkaStream;
  44. import kafka.javaapi.consumer.ConsumerConnector;
  45. /**
  46. * 消费者简单测试(单线程获取消费数据)
  47. * @author lhy
  48. * @date 2018.10.09
  49. */
  50. public class SimpleConsumer extends Thread{
  51. //消费者连接
  52. private final ConsumerConnector consumer;
  53. // 要消费的话题
  54. private final String topic;
  55. public SimpleConsumer(String topic) {
  56. consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
  57. this.topic = topic;
  58. }
  59. //配置相关信息
  60. private static ConsumerConfig createConsumerConfig() {
  61. // TODO Auto-generated method stub
  62. Properties props = new Properties();
  63. // props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
  64. // 配置要连接的zookeeper地址与端口
  65. props.put("zookeeper.connect", "localhost:2181");
  66. // 配置zookeeper的组id (The ‘group.id’ string defines the Consumer Group
  67. // this process is consuming on behalf of.)
  68. props.put("group.id", "0");
  69. // 配置zookeeper连接超时间隔
  70. props.put("zookeeper.session.timeout.ms", "10000");
  71. // The ‘zookeeper.sync.time.ms’ is the number of milliseconds a
  72. // ZooKeeper ‘follower’ can be behind the master before an error occurs.
  73. props.put("zookeeper.sync.time.ms", "200");
  74. // The ‘auto.commit.interval.ms’ setting is how often updates to the
  75. // consumed offsets are written to ZooKeeper.
  76. // Note that since the commit frequency is time based instead of # of
  77. // messages consumed, if an error occurs between updates to ZooKeeper on
  78. // restart you will get replayed messages.
  79. props.put("auto.commit.interval.ms", "1000");
  80. return new ConsumerConfig(props);
  81. }
  82. public void run(){
  83. Map<String, Integer> topickMap = new HashMap<String, Integer>();
  84. topickMap.put(topic, 1);
  85. Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
  86. KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
  87. ConsumerIterator<byte[], byte[]> it = stream.iterator();
  88. System.out.println("*********Kafka消费者结果********");
  89. while(true){
  90. if(it.hasNext()){
  91. //打印得到的消息
  92. System.err.println(Thread.currentThread() + " get kafka data:" + new String(it.next().message()));
  93. }
  94. try {
  95. Thread.sleep(1000);
  96. } catch (InterruptedException e) {
  97. // TODO Auto-generated catch block
  98. e.printStackTrace();
  99. }
  100. }
  101. }
  102. public static void main(String[] args) {
  103. // TODO Auto-generated method stub
  104. SimpleConsumer consumerThread = new SimpleConsumer("SimpleNode");
  105. consumerThread.start();
  106. }
  107. }

运行完生产者,打开Kafka-manager即可看到刚才生产者产生的Topic—-SimpleNode

70

运行消费者程序,单线程打印生产的topic信息,如下:

70 1

发表评论

表情:
评论列表 (有 0 条评论,281人围观)

还没有评论,来说两句吧...

相关阅读