代码:
package com.weichai.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Simple Kafka producer example.
 *
 * <p>Publishes the same text message a fixed number of times to the
 * {@code SimpleNode} topic through the legacy {@code kafka.javaapi.producer.Producer}
 * API and then closes the producer.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class SimpleProducer {
    /** Topic the sample messages are published to. */
    private static final String TOPIC = "SimpleNode";
    /** Payload of every sample message. */
    private static final String MESSAGE = "Kafka Simple Test";
    /** Loop bound of the send loop; the counter starts at 1, so 99 messages are sent. */
    private static final int SEND_LIMIT = 100;

    /**
     * Configures a producer against a local broker, sends the sample messages
     * and releases the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker used to bootstrap metadata; 9092 is Kafka's default broker port.
        props.setProperty("metadata.broker.list", "localhost:9092");
        // Keys and values are Strings, so encode them with the String encoder.
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Create the producer.
        Producer<String, String> producer = new Producer<String, String>(config);
        // Build the message; the same instance is re-sent on every iteration.
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, MESSAGE);
        try {
            // The counter must advance: the original loop never incremented it
            // and therefore sent messages forever without reaching close().
            for (int i = 1; i < SEND_LIMIT; i++) {
                producer.send(data);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer's network resources.
            producer.close();
        }
    }
}
package com.weichai.kafka;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * Simple Kafka consumer test that reads one topic on a single thread.
 *
 * <p>The thread opens one message stream for the topic, prints every message
 * it receives and pauses briefly between messages. It stops when the stream
 * ends or the thread is interrupted, and shuts the connector down on exit.
 *
 * @author lhy
 * @date 2018.10.09
 */
public class SimpleConsumer extends Thread {
    /** Pause after each printed message, in milliseconds. */
    private static final long PRINT_INTERVAL_MS = 1000L;

    /** Connector through which the message streams are created. */
    private final ConsumerConnector consumer;
    /** Topic to consume. */
    private final String topic;

    /**
     * Creates the consumer connector and remembers the topic to read.
     *
     * @param topic name of the topic to consume
     */
    public SimpleConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    /**
     * Builds the consumer configuration.
     *
     * @return configuration pointing at a local ZooKeeper with group id {@code "0"}
     */
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper address and port to connect to. Several hosts can be given
        // as a comma-separated list, e.g. "localhost:2181,10.XX.XX.XX:2181".
        props.put("zookeeper.connect", "localhost:2181");
        // The 'group.id' string defines the Consumer Group this process is
        // consuming on behalf of.
        props.put("group.id", "0");
        // ZooKeeper session timeout.
        props.put("zookeeper.session.timeout.ms", "10000");
        // The 'zookeeper.sync.time.ms' is the number of milliseconds a
        // ZooKeeper 'follower' can be behind the master before an error occurs.
        props.put("zookeeper.sync.time.ms", "200");
        // The 'auto.commit.interval.ms' setting is how often updates to the
        // consumed offsets are written to ZooKeeper.
        // Note that since the commit frequency is time based instead of # of
        // messages consumed, if an error occurs between updates to ZooKeeper on
        // restart you will get replayed messages.
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Consumes the topic with a single stream and prints each message.
     *
     * <p>An interrupt during the pause restores the interrupt flag and ends
     * the loop instead of being swallowed; the connector is always shut down.
     */
    @Override
    public void run() {
        // Request exactly one stream (one consuming thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                    consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("*********Kafka消费者结果********");
            // No consumer timeout is configured here, so hasNext() is expected
            // to wait for the next message; a false result ends the loop
            // rather than polling a finished stream forever.
            while (it.hasNext()) {
                // Decode explicitly as UTF-8 (what the producer's String
                // encoder emits by default) instead of the platform charset.
                String message = new String(it.next().message(), StandardCharsets.UTF_8);
                // Print the received message.
                System.err.println(Thread.currentThread() + " get kafka data:" + message);
                Thread.sleep(PRINT_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread and stop.
            Thread.currentThread().interrupt();
        } finally {
            // Release the ZooKeeper/broker connections held by the connector.
            consumer.shutdown();
        }
    }

    /**
     * Starts one consumer thread for the {@code SimpleNode} topic.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SimpleConsumer consumerThread = new SimpleConsumer("SimpleNode");
        consumerThread.start();
    }
}
运行完生产者后,打开 Kafka-manager 即可看到刚才生产者创建的 Topic——SimpleNode。

运行消费者程序,单线程打印生产的topic信息,如下:

还没有评论,来说两句吧...