第二章 kafka进阶笔记

淩亂°似流年 2023-06-09 04:57 84阅读 0赞

一、第一个Kafka程序

1、创建我们的主题

创建主题命令如下:kafka-topics.bat —zookeeper localhost:2181/kafka —create —topic hello-kafka —replication-factor 1 —partitions 4

2、生产者发送消息

我们这里使用Kafka内置的客户端API开发kafka应用程序。因为我们是Java程序员,所以这里我们使用Maven,使用最新版本。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>

生产者代码示例如下:

  1. public class HelloKafkaProducer {
  2. public static void main(String[] args) {
  3. //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
  4. Properties properties = new Properties();
  5. properties.put("bootstrap.servers","127.0.0.1:9092");
  6. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  9. try {
  10. ProducerRecord<String,String> record;
  11. try {
  12. //TODO 发送4条消息
  13. for(int i=0;i<4;i++){
  14. record = new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC, String.valueOf(i),"lison");
  15. producer.send(record);
  16. System.out.println(i+",message is sent");
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. } finally {
  22. producer.close();
  23. }
  24. }
  25. }

必选属性:

创建生产者对象时有三个属性必须指定。

bootstrap.servers**:**

该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。

不过最少提供2个broker的信息(用逗号分隔,比如: 127.0.0.1:9092,192.168.0.13:9092),一旦其中一个宕机,生产者仍能连接到集群上。

key.serializer**:**

生产者接口允许使用参数化类型,可以把Java对象作为键和值传broker,但是broker希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器。key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类,Kafka的客户端默认提供了ByteArraySerializer、IntegerSerializer、StringSerializer,也可以实现自定义的序列化器。

value.serializer**:**

同key.serializer,参见代码模块kafka-no-spring下包hellokafka中

properties.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
properties.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

3、消费者接受消息

消费者代码示例如下(Kafka 只提供拉取的方式)

  1. public class HelloKafkaConsumer {
  2. public static void main(String[] args) {
  3. //TODO 消费者三个属性必须指定(broker地址清单、key和value的反序列化器)
  4. Properties properties = new Properties();
  5. properties.put("bootstrap.servers","127.0.0.1:9092");
  6. properties.put("key.deserializer", StringDeserializer.class);
  7. properties.put("value.deserializer", StringDeserializer.class);
  8. //TODO 群组并非完全必须
  9. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
  10. KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
  11. try {
  12. //TODO 消费者订阅主题(可以多个)
  13. consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
  14. while(true){
  15. //TODO 拉取(新版本)
  16. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
  17. for(ConsumerRecord<String, String> record:records){
  18. System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
  19. record.offset(),record.key(),record.value()));
  20. //do my work
  21. //打包任务投入线程池
  22. }
  23. }
  24. } finally {
  25. consumer.close();
  26. }
  27. }
  28. }

必选参数:

bootstrap.servers、key.serializer、value.serializer含义同生产者

group.id**:**

并非完全必需,它指定了消费者属于哪一个群组,但是创建不属于任何一个群组的消费者并没有问题。

参见代码,模块kafka-no-spring下包hellokafka中,新版本特点:poll(Duration)这个版本修改了这样的设计,会把元数据获取也计入整个超时时间(更加的合理)。

  1. public static void main(String[] args) {
  2. /*消费配置的实例*/
  3. Properties properties = KafkaConst.consumerConfig("groupC",StringDeserializer.class, StringDeserializer.class);
  4. /*消息消费者*/
  5. consumer = new KafkaConsumer<String, String>(properties);
  6. try {
  7. consumer.subscribe(Collections.singletonList(BusiConst.CONSUMER_GROUP_TOPIC));
  8. consumer.poll(0);
  9. while(true){
  10. ConsumerRecords<String, String> records = consumer.poll(500);//5秒 50000
  11. for(ConsumerRecord<String, String> record:records){
  12. System.out.println(String.format( "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
  13. record.topic(),record.partition(),record.offset(), record.key(),record.value()));
  14. //do our work
  15. }
  16. }
  17. } finally {
  18. consumer.close();
  19. }
  20. }

4、演示示例

1)默认创建主题,只有一个分区时,演示生产者和消费者情况。

2)修改主题分区为2(使用管理命令),再重新演示生产者和消费者情况。

二、Kafka的生产者

1、生产者发送消息的基本流程

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L20wXzM3NjYxNDU4_size_16_color_FFFFFF_t_70

从创建一个ProducerRecord对象开始,ProducerRecord对象需要包含目标主题和要发送的内容。我们还可以指定键或分区,在发送ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。

接下来,数据被传给分区器,如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里(双端队列,尾部写入),这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker上。

服务器在收到这些消息时会返回一个响应。如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

生产者发送消息一般会发生两类错误:

一类是可重试错误,比如连接错误(可通过再次建立连接解决)、无主no leader(可通过分区重新选举首领解决)。

另一类是无法通过重试解决,比如“消息太大”异常,具体见message.max.bytes,这类消息不会进行任何重试,直接抛出异常。

2、使用Kafka生产者

三种发送方式,我们通过生成者的send方法进行发送。send方法会返回一个包含RecordMetadata的Future对象。RecordMetadata里包含了目标主题,分区信息和消息的偏移量。

2.1、发送并忘记

忽略send方法的返回值,不做任何处理。大多数情况下,消息会正常到达,而且生产者会自动重试,但有时会丢失消息。

2.2、同步发送

获得send方法返回的Future对象,在合适的时候调用Future的get方法。参见代码,模块kafka-no-spring下包sendtype中。

  1. public class KafkaFutureProducer {
  2. private static KafkaProducer<String,String> producer = null;
  3. public static void main(String[] args) {
  4. // 消息生产者
  5. producer = new KafkaProducer<String, String>(KafkaConst.producerConfig(StringSerializer.class, StringSerializer.class));
  6. try {/*待发送的消息实例*/
  7. ProducerRecord<String,String> record;
  8. try {
  9. record = new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC,"test msg","chj");
  10. Future<RecordMetadata> future = producer.send(record);
  11. System.out.println("do other sth");
  12. RecordMetadata recordMetadata = future.get();
  13. if(null!=recordMetadata){
  14. System.out.println("offset:"+recordMetadata.offset()+"-" +"partition:"+recordMetadata.partition());
  15. }
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. } finally {
  20. producer.close();
  21. }
  22. }
  23. }

2.3、异步发送

实现接口org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给send方法。参见代码,模块kafka-no-spring下包sendtype中。

  1. public class KafkaAsynProducer {
  2. private static KafkaProducer<String,String> producer = null;
  3. public static void main(String[] args) {
  4. // 消息生产者
  5. producer = new KafkaProducer<String, String>(KafkaConst.producerConfig(StringSerializer.class,StringSerializer.class));
  6. /*待发送的消息实例*/
  7. ProducerRecord<String,String> record;
  8. try {
  9. record = new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC,"mesage02","hankin");
  10. producer.send(record, new Callback() {
  11. public void onCompletion(RecordMetadata metadata,Exception exception) {
  12. if(null!=exception){
  13. exception.printStackTrace();
  14. }
  15. if(null!=metadata){
  16. System.out.println("offset:"+metadata.offset()+"-" +"partition:"+metadata.partition());
  17. }
  18. }
  19. });
  20. } finally {
  21. producer.close();
  22. }
  23. }
  24. }

2.4、多线程下的生产者

KafkaProducer的实现是线程安全的,所以我们可以在多线程的环境下,安全的使用KafkaProducer的实例,如何节约资源的使用呢?参见代码,模块kafka-no-spring下包concurrent中:

多线程下使用生产者:

  1. public class KafkaConProducer {
  2. //发送消息的个数
  3. private static final int MSG_SIZE = 1000;
  4. //负责发送消息的线程池
  5. private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  6. private static CountDownLatch countDownLatch = new CountDownLatch(MSG_SIZE);
  7. private static DemoUser makeUser(int id){
  8. DemoUser demoUser = new DemoUser(id);
  9. String userName = "xiangxue_"+id;
  10. demoUser.setName(userName);
  11. return demoUser;
  12. }
  13. /**
  14. * 发送消息的任务
  15. */
  16. private static class ProduceWorker implements Runnable{
  17. private ProducerRecord<String,String> record;
  18. private KafkaProducer<String,String> producer;
  19. public ProduceWorker(ProducerRecord<String, String> record,KafkaProducer<String, String> producer) {
  20. this.record = record;
  21. this.producer = producer;
  22. }
  23. public void run() {
  24. final String id = Thread.currentThread().getId() +"-"+System.identityHashCode(producer);
  25. try {
  26. producer.send(record, new Callback() {
  27. public void onCompletion(RecordMetadata metadata,Exception exception) {
  28. if(null!=exception){
  29. exception.printStackTrace();
  30. }
  31. if(null!=metadata){
  32. System.out.println(id+"|" +String.format("偏移量:%s,分区:%s",
  33. metadata.offset(),metadata.partition()));
  34. }
  35. }
  36. });
  37. System.out.println(id+":数据["+record+"]已发送。");
  38. countDownLatch.countDown();
  39. } catch (Exception e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. public static void main(String[] args) {
  45. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(
  46. KafkaConst.producerConfig(StringSerializer.class, StringSerializer.class));
  47. try { //循环发送,通过线程池的方式
  48. for(int i=0;i<MSG_SIZE;i++){
  49. DemoUser demoUser = makeUser(i);
  50. ProducerRecord<String,String> record = new ProducerRecord<String,String>(
  51. BusiConst.CONCURRENT_USER_INFO_TOPIC,null,
  52. System.currentTimeMillis(),demoUser.getId()+"", demoUser.toString());
  53. executorService.submit(new ProduceWorker(record,producer));
  54. }
  55. countDownLatch.await();
  56. } catch (Exception e) {
  57. e.printStackTrace();
  58. } finally {
  59. producer.close();
  60. executorService.shutdown();
  61. }
  62. }
  63. }

3、更多发送配置

生产者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考org.apache.kafka.clients.producer包下的ProducerConfig类。代码见模块kafka-no-spring下包ProducerConfig中ConfigKafkaProducer类。

  1. public class ConfigKafkaProducer {
  2. public static void main(String[] args) {
  3. //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
  4. Properties properties = new Properties();
  5. properties.put("bootstrap.servers","127.0.0.1:9092");
  6. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8. //TODO 更多发送配置(重要的)
  9. properties.put("acks","1"); //ack 0,1,all
  10. // 一个批次可以使用的内存大小 缺省16384(16k)
  11. properties.put("batch.size",16384);
  12. // 指定了生产者在发送批次前等待更多消息加入批次的时间, 缺省0 50ms
  13. properties.put("linger.ms",0L);
  14. // 控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系)
  15. properties.put("max.request.size",1 * 1024 * 1024);
  16. //TODO 更多发送配置(非重要的)
  17. properties.put("buffer.memory",32 * 1024 * 1024L);//生产者内存缓冲区大小
  18. properties.put("retries",0); //重发消息次数
  19. //客户端将等待请求的响应的最大时间 默认30秒
  20. properties.put("request.timeout.ms",30 * 1000);
  21. //最大阻塞时间,超过则抛出异常 缺省60000ms
  22. properties.put("max.block.ms",60*1000);
  23. // 于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
  24. properties.put("compression.type","none");
  25. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  26. try {
  27. ProducerRecord<String,String> record;
  28. try {
  29. //TODO 发送4条消息
  30. for(int i=0;i<4;i++){
  31. record = new ProducerRecord<String,String>(
  32. BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
  33. producer.send(record);
  34. System.out.println(i+",message is sent");
  35. }
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. } finally {
  40. producer.close();
  41. }
  42. }
  43. }

OrderKafkaProducer:

  1. public class OrderKafkaProducer {
  2. public static void main(String[] args) {
  3. //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
  4. Properties properties = new Properties();
  5. properties.put("bootstrap.servers","127.0.0.1:9092");
  6. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8. //TODO 顺序消息的保证(只有一个分区、)
  9. //properties.put("retries",0); //重发消息次数(设置为0)
  10. //在阻塞之前,客户端将在单个连接上发送的未确认请求的最大数目
  11. //max.in.flight.request.per.connection 设为1,这样在生产者尝试发送第一批消息时,
  12. 就不会有其他的消息发送给broker
  13. //这个值默认是5
  14. properties.put("max.in.flight.requests.per.connection",1);
  15. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  16. try {
  17. ProducerRecord<String,String> record;
  18. try {
  19. //TODO发送4条消息
  20. for(int i=0;i<4;i++){
  21. record = new ProducerRecord<String,String>(
  22. BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
  23. producer.send(record);
  24. System.out.println(i+",message is sent");
  25. }
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. }
  29. } finally {
  30. producer.close();
  31. }
  32. }
  33. }

1)acks

Kafk内部的复制机制是比较复杂的,这里不谈论内部机制(后续章节进行细讲),我们只讨论生产者发送消息时与副本的关系。 指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。

acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。

acks=1:只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失,默认使用这个配置。

acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。

金融业务,主备外加异地灾备。所以很多高可用场景一般不是设置2个副本,有可能达到5个副本,不同机架上部署不同的副本,异地上也部署一套副本。buffer.memory设置生产者内存缓冲区的大小(结合生产者发送消息的基本流程),生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向 broker 发送的速度,导致生产者空间不足,producer会阻塞或者抛出异常。缺省33554432 (32M)

2)max.block.ms

指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常,缺省60000ms。

3)retries

发送失败时,指定生产者可以重发消息的次数(缺省Integer.MAX_VALUE)。默认情况下,生产者在每次重试之间等待100ms,可以通过参数retry.backoff.ms参数来改变这个时间间隔。

4)r**eceive.buffer.bytes send.buffer.bytes**

指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽,缺省102400。

5)batch.size

当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送。缺省16384(16k) ,如果一条消息超过了批次的大小,会写不进去。

6)linger.ms

指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比batch.size设置要小的多,我们需要“linger”特定的时间以获取更多的消息。这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟,但也会提升消息的吞吐量。

7)compression.type

producer用于压缩数据的压缩类型,默认是无压缩,正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。snappy占用cpu少,提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽用这个。如果带宽紧张用 gzip会占用较多的cpu,但提供更高的压缩比。

8)client.id

当向server发出请求时,这个字符串会发送给server,目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。

9)max.in.flight.requests.per.connection

指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大,当然也可以提升吞吐量。发生错误时,可能会造成数据的发送顺序改变,默认是 5 (修改)。如果需要保证消息在一个分区上的严格顺序,这个值应该设为1。不过这样会严重影响生产者的吞吐量。

10)request.timeout.ms

客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常,默认30秒。

11)metadata.fetch.timeout.ms

是指我们所获取的一些元数据的第一个时间数据,元数据包含:topic、host、partitions。此项配置是指当等待元数据fetch成功完成所需要的时间,否则会跑出异常给客户端。

12)max.request.size

控制生产者发送请求最大大小。默认这个值为1M,如果一个请求里只有一个消息,那这个消息不能大于1M,如果一次请求是一个批次,该批次包含了1000条消息,那么每个消息不能大于1KB。注意:broker具有自己对消息记录尺寸的覆盖,如果这个尺寸小于生产者的这个设置,会导致消息被拒绝。这个参数和 Kafka主机的message.max.bytes参数有关系。如果生产者发送的消息超过message.max.bytes设置的大小,就会被Kafka服务器拒绝。

以上参数不用去,一般来说,就记住acks、batch.size、linger.ms、max.request.size就行了,因为这4个参数重要些,其他参数一般没有太大必要调整。

13)顺序保证

Kafka可以保证同一个分区里的消息是有序的。也就是说,发送消息时,主题只有且只有一个分区,同时生产者按照一定的顺序发送消息,broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下, 顺序是非常重要的。例如,往一个账户存入100元再取出来,这个与先取钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L20wXzM3NjYxNDU4_size_16_color_FFFFFF_t_70 1

如果把retires设为非零整数,同时把max.in.flight.requests.per.connection设为比1大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retires设为0(不重试的话消息可能会因为连接关闭等原因会丢)。所以还是需要重试,同时把max.in.flight.request.per.connection设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

4、序列化

创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景。我们完全可以自定义序列化器。只要实现org.apache.kafka.common.serialization.Serializer接口即可。

如何实现,看模块kafka-no-spring下包selfserial 中代码。

序列化代码如下:

  1. public class SelfSerializer implements Serializer<DemoUser> {
  2. public void configure(Map<String, ?> configs, boolean isKey) {
  3. //do nothing
  4. }
  5. public byte[] serialize(String topic, DemoUser data) {
  6. try {
  7. byte[] name;
  8. int nameSize;
  9. if(data==null){
  10. return null;
  11. }
  12. if(data.getName()!=null){
  13. name = data.getName().getBytes("UTF-8");
  14. //字符串的长度
  15. nameSize = data.getName().length();
  16. }else{
  17. name = new byte[0];
  18. nameSize = 0;
  19. }
  20. /*id的长度4个字节,字符串的长度描述4个字节,
  21. 字符串本身的长度nameSize个字节*/
  22. ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
  23. buffer.putInt(data.getId());//4
  24. buffer.putInt(nameSize);//4
  25. buffer.put(name);//nameSize
  26. return buffer.array();
  27. } catch (Exception e) {
  28. throw new SerializationException("Error serialize DemoUser:"+e);
  29. }
  30. }
  31. public void close() {
  32. //do nothing
  33. }
  34. }

反序列化代码:

  1. public class SelfDeserializer implements Deserializer<DemoUser> {
  2. public void configure(Map<String, ?> configs, boolean isKey) {
  3. //do nothing
  4. }
  5. public DemoUser deserialize(String topic, byte[] data) {
  6. try {
  7. if(data==null){
  8. return null;
  9. }
  10. if(data.length<8){
  11. throw new SerializationException("Error data size.");
  12. }
  13. ByteBuffer buffer = ByteBuffer.wrap(data);
  14. int id;
  15. String name;
  16. int nameSize;
  17. id = buffer.getInt();
  18. nameSize = buffer.getInt();
  19. byte[] nameByte = new byte[nameSize];
  20. buffer.get(nameByte);
  21. name = new String(nameByte,"UTF-8");
  22. return new DemoUser(id,name);
  23. } catch (Exception e) {
  24. throw new SerializationException("Error Deserializer DemoUser."+e);
  25. }
  26. }
  27. public void close() {
  28. //do nothing
  29. }
  30. }

发送消息—未来某个时候get发送结果

  1. public class SelfSerialProducer {
  2. private static KafkaProducer<String, DemoUser> producer = null;
  3. public static void main(String[] args){
  4. producer = new KafkaProducer<String, DemoUser>(KafkaConst.producerConfig(StringSerializer.class,SelfSerializer.class));
  5. try {
  6. // 待发送的消息实例
  7. ProducerRecord<String,DemoUser> record;
  8. try{
  9. record = new ProducerRecord<String,DemoUser>( BusiConst.SELF_SERIAL_TOPIC,
  10. "user001",new DemoUser(1,"hankinn01"));
  11. producer.send(record);
  12. System.out.println("sent: "+record);
  13. }catch (Exception e){
  14. e.printStackTrace();
  15. }
  16. } finally {
  17. producer.close();
  18. }
  19. }
  20. }

20191019235729186.png

自定义序列化需要考虑的问题**:**

自定义序列化容易导致程序的脆弱性。举例,在我们上面的实现里,我们有多种类型的消费者,每个消费者对实体字段都有各自的需求,比如,有的将字段变更为long型,有的会增加字段,这样会出现新旧消息的兼容性问题。特别是在系统升级的时候,经常会出现一部分系统升级,其余系统被迫跟着升级的情况。

解决这个问题,可以考虑使用自带格式描述以及语言无关的序列化框架。如Protobuf或者Kafka官方推荐的Apache Avro,Avro会使用一个JSON文件作为schema来描述数据,Avro在读写时会用到这个schema,可以把这个schema内嵌在数据文件中。这样不管数据格式如何变动,消费者都知道如何处理数据。

但是内嵌的消息,自带格式,会导致消息的大小不必要的增大,消耗了资源。我们可以使用schema注册表机制,将所有写入的数据用到的schema保存在注册表中,然后在消息中引用schema的标识符,而读取的数据的消费者程序使用这个标识符从注册表中拉取schema来反序列化记录。

注意:Kafka本身并不提供schema注册表,需要借助第三方,现在已经有很多的开源实现,比如 Confluent Schema Registry,可以从GitHub上获取。

如何使用参考如下网址:https://cloud.tencent.com/developer/article/1336568

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L20wXzM3NjYxNDU4_size_16_color_FFFFFF_t_70 2

5、kafka分区

我们在新增ProducerRecord对象中可以看到,ProducerRecord包含了目标主题,键和值,Kafka的消息都是一个个的键值对。键可以设置为默认的null。键的主要用途有两个:第一:用来决定消息被写往主题的哪个分区,拥有相同键的消息将被写往同一个分区,第二:还可以作为消息的附加消息。

如果键值为null,并且使用默认的分区器,分区器使用轮询算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用默认的分区器,Kafka对键进行散列(Kafka自定义的散列算法,具体算法原理不知),然后根据散列值把消息映射到特定的分区上。很明显,同一个键总是被映射到同一个分区。但是只有不改变主题分区数量的情况下,键和分区之间的映射才能保持不变,一旦增加了新的分区,就无法保证了,所以如果要使用键来映射分区,那就要在创建主题的时候把分区规划好,而且永远不要增加新分区。

自定义分区器**: **

某些情况下,数据特性决定了需要进行特殊分区,比如电商业务,北京的业务量明显比较大,占据了总业务量的20%,我们需要对北京的订单进行单独分区处理,默认的散列分区算法不合适了, 我们就可以自定义分区算法,对北京的订单单独处理,其他地区沿用散列分区算法。或者某些情况下,我们用value来进行分区。 具体实现,先创建一个4分区的主题,然后观察模块kafka-no-spring下包selfpartition中代码。

定义分区器,以value值进行分区**代码**:

public class SelfPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//拿到

  • List partitionInfos = cluster.partitionsForTopic(topic); //*TODO 分区数
    * *int num = partitionInfos.size();
    //*
    TODO 根据value与分区数求余的方式得到分区ID *int parId = ((String)value).hashCode()%num; return parId;
    }
    public void* close() { //do nothing*
  • }
    public void configure(Map configs) {
    //do nothing*
  • *}
    }

可以和KafkaFutureProducer比较分区结果:

  1. public class SelfPartitionProducer {
  2. private static KafkaProducer<String,String> producer = null;
  3. public static void main(String[] args) {
  4. /*消息生产者*/
  5. Properties properties = KafkaConst.producerConfig(StringSerializer.class,StringSerializer.class);
  6. /*使用自定义的分区器*/
  7. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.chj.selfpartition.SelfPartitioner");
  8. producer = new KafkaProducer<String, String>(properties);
  9. try {
  10. /*待发送的消息实例*/
  11. ProducerRecord<String,String> record;
  12. try {
  13. record = new ProducerRecord<String,String>( BusiConst.SELF_PARTITION_TOPIC,"teacher01","hankin");
  14. Future<RecordMetadata> future = producer.send(record);
  15. System.out.println("Do other something");
  16. RecordMetadata recordMetadata = future.get();
  17. if(null!=recordMetadata){
  18. System.out.println(String.format("偏移量:%s,分区:%s", recordMetadata.offset(),recordMetadata.partition()));
  19. }
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. } finally {
  24. producer.close();
  25. }
  26. }
  27. }
  28. /**
  29. * 类说明:可以和KafkaFutureProducer比较分区结果
  30. */
  31. public class SysPartitionProducer {
  32. private static KafkaProducer<String,String> producer = null;
  33. public static void main(String[] args) {
  34. // 消息生产者
  35. Properties properties = KafkaConst.producerConfig(StringSerializer.class,StringSerializer.class);
  36. producer = new KafkaProducer<String, String>(properties);
  37. try {
  38. // 待发送的消息实例
  39. ProducerRecord<String,String> record;
  40. try {
  41. record = new ProducerRecord<String,String>(
  42. BusiConst.SELF_PARTITION_TOPIC,"teacher01","hankin");
  43. Future<RecordMetadata> future = producer.send(record);
  44. System.out.println("Do other something");
  45. RecordMetadata recordMetadata = future.get();
  46. if(null!=recordMetadata){
  47. System.out.println(String.format("偏移量:%s,分区:%s",recordMetadata.offset(),recordMetadata.partition()));
  48. }
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. }
  52. } finally {
  53. producer.close();
  54. }
  55. }

#

发表评论

表情:
评论列表 (有 0 条评论,84人围观)

还没有评论,来说两句吧...

相关阅读