最简单的 spring 整合 kafka 例子

深碍√TFBOYSˉ_ 2022-02-24 05:19 418阅读 0赞

这周到联调阶段,回顾项目的开发,之前在spring跟kafka这块吃了不少亏,网上的资料太繁琐、配置好了还各种报错,我今天整理一个最最简单的demo,以供参考。

前提条件:

  • 安装zookeeper,下载解压,更改配置文件名称、配置环境变量,启动即可,资料比较全,搜一下就有
  • 安装kafka, 更简单,下载解压,启动即可
  • jdk8 + 的环境,现在的spring基本指spring boot
  • 在kafka创建两个topic,wen.topic、wen.fooTopic

现在开始集成,只有5个类+一个配置文件+pom.xml

在这里插入图片描述
1.pom.xml 引进spring和kafka的包

  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wen</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>spring-kafka</name>
    <url>http://maven.apache.org</url>
    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.0.0.RELEASE</version>
      <relativePath/>
    </parent>
    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <java.version>1.8</java.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.4.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
    </dependencies>
    <build>
      <plugins>
        <plugin>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
      </plugins>
    </build>
    <repositories>
      <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone</url>
        <snapshots>
          <enabled>false</enabled>
        </snapshots>
      </repository>
    </repositories>
  </project>

2.application.yml 配置文件

  kafka:
    brokerAddress: localhost:9092
    topic: wen.topic
    fooTopic: wen.fooTopic
  spring:
    jmx:
      enabled: false
3.ConfigProperties,这是配置类,将配置文件中的连接参数、主题名称转换为对象

    package com.wen.kafka;

    import org.springframework.boot.context.properties.ConfigurationProperties;

    @ConfigurationProperties(prefix = “kafka”)
    public class ConfigProperties {

    1. private String brokerAddress;
    2. private String topic;
    3. private String fooTopic;
    4. public String getBrokerAddress() {
    5. return this.brokerAddress;
    6. }
    7. public void setBrokerAddress(String brokerAddress) {
    8. this.brokerAddress = brokerAddress;
    9. }
    10. public String getTopic() {
    11. return this.topic;
    12. }
    13. public void setTopic(String topic) {
    14. this.topic = topic;
    15. }
    16. public String getFooTopic() {
    17. return this.fooTopic;
    18. }
    19. public void setFooTopic(String fooTopic) {
    20. this.fooTopic = fooTopic;
    21. }

    }

4.CommonConfiguration 配置消费者、生产者

  package com.wen.kafka;

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  import org.springframework.kafka.core.ConsumerFactory;
  import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.kafka.core.ProducerFactory;
  import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
  import org.springframework.kafka.support.converter.StringJsonMessageConverter;
  import org.springframework.retry.support.RetryTemplate;

  20. public class CommonConfiguration {
  21. @Autowired
  22. private ConfigProperties configProperties;
  23. @Bean
  24. public ProducerFactory<String, String> producerFactory() {
  25. Map<String, Object> props = new HashMap<>();
  26. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
  27. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  28. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  29. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  30. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  31. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  32. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  33. return new DefaultKafkaProducerFactory<>(props);
  34. }
  35. @Bean
  36. public KafkaTemplate<String, String> kafkaTemplate() {
  37. return new KafkaTemplate<>(producerFactory());
  38. }
  39. @Bean
  40. public ConsumerFactory<String, String> consumerFactory() {
  41. return new DefaultKafkaConsumerFactory<>(consumerProperties());
  42. }
  43. @Bean
  44. public Map<String, Object> consumerProperties() {
  45. Map<String, Object> props = new HashMap<>();
  46. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
  47. props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
  48. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  49. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
  50. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  51. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  52. return props;
  53. }
  54. @Bean
  55. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  56. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  57. new ConcurrentKafkaListenerContainerFactory<>();
  58. factory.setConsumerFactory(consumerFactory());
  59. return factory;
  60. }
  61. @Bean
  62. public ConcurrentKafkaListenerContainerFactory<String, String> jsonKafkaListenerContainerFactory() {
  63. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  64. new ConcurrentKafkaListenerContainerFactory<>();
  65. factory.setConsumerFactory(consumerFactory());
  66. factory.setMessageConverter(new StringJsonMessageConverter());
  67. return factory;
  68. }
  69. @Bean
  70. public ConcurrentKafkaListenerContainerFactory<String, String> retryKafkaListenerContainerFactory() {
  71. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  72. new ConcurrentKafkaListenerContainerFactory<>();
  73. factory.setConsumerFactory(consumerFactory());
  74. factory.setRetryTemplate(new RetryTemplate());
  75. return factory;
  76. }
  77. }

5.Producer 生产者

  package com.wen.kafka;

  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.stereotype.Component;

  5. @Component
  6. public class Producer {
  7. @Autowired
  8. private ConfigProperties configProperties;
  9. @Autowired
  10. private KafkaTemplate<String, String> template;
  11. public void send(String foo) {
  12. this.template.send(this.configProperties.getTopic(), foo);
  13. }
  14. }

6.Consumer 消费者

  package com.wen.kafka;

  import java.util.concurrent.CountDownLatch;
  import org.springframework.kafka.annotation.KafkaListener;
  import org.springframework.stereotype.Component;

  5. @Component
  6. public class Consumer {
  7. public final CountDownLatch latch = new CountDownLatch(1);
  8. @KafkaListener(topics = "${kafka.topic}")
  9. public void listen(String foo) {
  10. System.out.println("Received: " + foo);
  11. this.latch.countDown();
  12. }
  13. }

7.KafkaApplication 使用,我这边是传入一个字符串

  package com.wen.kafka;

  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.TimeUnit;
  import com.wen.kafka.CommonConfiguration;
  import com.wen.kafka.ConfigProperties;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.boot.WebApplicationType;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.springframework.boot.builder.SpringApplicationBuilder;
  import org.springframework.context.ConfigurableApplicationContext;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Import;
  import org.springframework.kafka.annotation.EnableKafka;
  import org.springframework.kafka.annotation.KafkaListener;
  import org.springframework.kafka.core.KafkaTemplate;

  16. @SpringBootApplication
  17. @Import({ CommonConfiguration.class, ConfigProperties.class })
  18. @EnableKafka
  19. public class KafkaApplication {
  20. public static void main(String[] args) throws Exception {
  21. ConfigurableApplicationContext context = new SpringApplicationBuilder(KafkaApplication.class)
  22. .web(WebApplicationType.NONE)
  23. .run(args);
  24. Producer producer = context.getBean(Producer.class);
  25. producer.send("stupid");
  26. context.getBean(Consumer.class).latch.await(10, TimeUnit.SECONDS);
  27. context.close();
  28. }
  29. }

8.结果
在这里插入图片描述
以上是最基本的使用例子,至于在项目中怎么运用,灵活调用生产消费者即可

发表评论

表情:
评论列表 (有 0 条评论,418人围观)

还没有评论,来说两句吧...

相关阅读