Installing Kafka, and Integrating Kafka with Spring Boot for Log Collection (with Source Code)

偏执的太偏执、 2023-07-24 14:38

I. Installing Kafka

  1. Download Kafka from the download link.
  2. Unzip it and go into the `windows` directory; all the startup scripts live there.

II. Starting the Services

If you don't have a Java environment yet, install Java first.

1. Start ZooKeeper

Go to `D:\my_software\kafka_2.13-2.4.1\bin\windows`, right-click to open a PowerShell window there, and run:

```powershell
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
```

2. Start Kafka

Go to `D:\my_software\kafka_2.13-2.4.1\bin\windows`, right-click to open a PowerShell window there, and run:

```powershell
.\kafka-server-start.bat ..\..\config\server.properties
```
Kafka's default listener port is 9092.
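Before starting the two Spring Boot projects below, it can save some head-scratching to confirm the broker really is listening on 9092. A minimal, hypothetical plain-Java check (standard library only, not part of this tutorial's source):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused or timed out: nothing listening there
        }
    }

    public static void main(String[] args) {
        System.out.println("Kafka reachable: " + isPortOpen("localhost", 9092, 500));
    }
}
```

This only checks that the port accepts TCP connections; it does not verify the broker is healthy.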

III. Integrating Kafka with Spring Boot for Log Collection

The goal of this project: any application that needs log collection simply calls the Logger's info, debug, warn, and error methods, and the messages show up in the consumer project.

1. Create the parent project; its pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>per.cyl</groupId>
    <artifactId>springboot-kafka-log</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>log-producer</module>
        <module>log-consumer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <name>springboot-kafka-log</name>
    <description>Spring Boot + Kafka log collection</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- exclude the default Logback binding so Log4j2 can take over -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```

2. Create the message producer project (log-producer)

2.1 Configuration files

  1. application.yml

```yaml
logging:
  config: classpath:log4j.xml
server:
  port: 8080
```
  2. log4j.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- The status attribute on Configuration controls Log4j2's own internal logging.
     It can be omitted; set it to "trace" to see detailed internal output,
     or to OFF (silent) / Error (errors only). -->
<Configuration status="OFF">
    <Properties>
        <!-- log file output directory -->
        <Property name="log_path" value="log/" />
        <Property name="file_name">log</Property>
        <Property name="kafka_log_topic">kafka_log_topic</Property>
        <Property name="bootstrap_servers">localhost:9092</Property>
    </Properties>
    <Appenders>
        <!-- console appender -->
        <Console name="console" target="SYSTEM_OUT">
            <!-- pass events at this level or above (onMatch); reject the rest (onMismatch) -->
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
            <!-- log line format -->
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss SSS} [%t] %-5level %logger{36} - %msg%n" />
        </Console>
        <File name="log_file" fileName="${log_path}/${file_name}.log" append="true" immediateFlush="true">
            <PatternLayout pattern="%d{yy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
        </File>
        <Kafka name="kafka" topic="${kafka_log_topic}">
            <!--<PatternLayout pattern="%date %message"/>-->
            <Property name="bootstrap.servers">${bootstrap_servers}</Property>
            <!-- emit each event as JSON -->
            <JsonLayout compact="true" locationInfo="true" complete="false" eventEol="true"/>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="kafka"/>
            <AppenderRef ref="console"/>
            <AppenderRef ref="log_file"/>
        </Root>
        <!-- <Logger name="org.apache.kafka" level="INFO" />--> <!-- avoid recursive logging -->
        <Logger name="org.springframework" level="INFO"/>
    </Loggers>
</Configuration>
```
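With JsonLayout and `eventEol="true"`, each log event reaches the topic as one JSON object per line. The consumer later relies on three of its fields: `level`, `loggerName`, and `message`. A rough sketch of the event shape (field set abbreviated and values hypothetical; the actual layout emits more fields, such as timestamps and thread details):

```json
{"thread":"http-nio-8080-exec-1","level":"INFO","loggerName":"per.cyl.log.producer.ProducerApp","message":"info-0","endOfBatch":false}
```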

2.2 Main class and business code (ProducerApp.java)

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 陈玉林
 * @date 2020/6/4 9:00
 */
@SpringBootApplication
@RestController
public class ProducerApp {

    private static final Logger logger = LoggerFactory.getLogger(ProducerApp.class);

    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class, args);
    }

    @GetMapping("/log_test")
    public String test() {
        for (int i = 0; i < 10; i++) {
            logger.info("info-" + i);
            logger.debug("debug-" + i); // filtered out: the Root logger level is info
            logger.warn("warn-" + i);
            logger.error("error-" + i);
        }
        return "success";
    }
}
```

3. Create the message consumer project (log-consumer)

3.1 Add one dependency to the pom

```xml
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
```

3.2 application.yml

```yaml
server:
  port: 8081
spring:
  kafka:
    # Kafka broker address; separate multiple brokers with commas
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```

3.3 Main class and business code

  1. Main class ConsumerApp.java

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author 陈玉林
 * @date 2020/6/4 9:00
 */
@SpringBootApplication
public class ConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class, args);
    }
}
```
  2. Message.java

```java
public class Message {
    private String level;
    private String message;
    private String loggerName;

    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    public String getLoggerName() { return loggerName; }
    public void setLoggerName(String loggerName) { this.loggerName = loggerName; }

    @Override
    public String toString() {
        return "Message{" +
                "level='" + level + '\'' +
                ", message='" + message + '\'' +
                ", loggerName='" + loggerName + '\'' +
                '}';
    }
}
```
  3. Message listener MessageConsumer.java

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author 陈玉林
 * @date 2020/6/4 9:04
 */
@Component
public class MessageConsumer {

    @KafkaListener(topics = "kafka_log_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        JSONObject jsonObject = (JSONObject) JSON.parse(record.value().toString());
        Message message = new Message();
        message.setLevel(jsonObject.get("level").toString());
        message.setLoggerName(jsonObject.get("loggerName").toString());
        message.setMessage(jsonObject.get("message").toString());
        System.out.println("Received message: " + message);
    }
}
```
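The listener above uses fastjson only to pull three fields out of each JsonLayout event. As a rough, JDK-only illustration of that mapping (a naive regex sketch, not the project's code; it assumes values contain no escaped quotes):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogEventFields {
    // Naive extraction of a "key":"value" pair; assumes the value has no escaped quotes.
    static String field(String json, String key) {
        Matcher m = Pattern.compile("\"" + key + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String event = "{\"level\":\"INFO\",\"loggerName\":\"per.cyl.log.producer.ProducerApp\",\"message\":\"info-0\"}";
        System.out.println(field(event, "level"));   // INFO
        System.out.println(field(event, "message")); // info-0
    }
}
```

In the real consumer, a proper JSON parser (fastjson as above, or the Jackson library Spring Boot already ships) is the safer choice.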

4. Testing

  1. Start ZooKeeper and Kafka.
  2. Start the consumer project.
  3. Start the producer project.
  4. Visit http://localhost:8080/log_test and watch the consumer console. The output below follows; note that no debug lines appear, because the Root logger level is info.

```
Received message: Message{level='INFO', message='info-0', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-0', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-0', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-1', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-1', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-1', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-2', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-2', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-2', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-3', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-3', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-3', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-4', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-4', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-4', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-5', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-5', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-5', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-6', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-6', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-6', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-7', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-7', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-7', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-8', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-8', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-8', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='INFO', message='info-9', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='WARN', message='warn-9', loggerName='per.cyl.log.producer.ProducerApp'}
Received message: Message{level='ERROR', message='error-9', loggerName='per.cyl.log.producer.ProducerApp'}
```

Source code download

