Kafka安装,SpringBoot整合Kafka实现日志收集附源码
一、Kafka安装
- 点击下载地址
- 解压后进入 bin\windows 目录,Windows 下的启动脚本(.bat)都在这个目录下。
二、启动服务
没有java环境先安装java。
1、启动ZooKeeper
进入D:\my_software\kafka_2.13-2.4.1\bin\windows目录,右键打开PowerShell,输入命令
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
2、启动Kafka
进入D:\my_software\kafka_2.13-2.4.1\bin\windows目录,右键打开PowerShell,输入命令
.\kafka-server-start.bat ..\..\config\server.properties
Kafka 默认监听端口:9092(客户端通过该端口连接)
三、SpringBoot整合Kafka实现日志收集
该工程达到以下效果:只需要在需要收集日志的工程里调用 Logger 的 info、debug、warn、error 方法,即可在消费者工程里收集到对应的日志信息(注意:下文 log4j 配置中 Root 级别为 info,因此 debug 级别的日志不会被发送)。
1. 创建父工程,贴上pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Aggregator (parent) project: holds the shared dependencies of both modules -->
    <groupId>per.cyl</groupId>
    <artifactId>springboot-kafka-log</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>log-producer</module>
        <module>log-consumer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <name>springboot-kafka-log</name>
    <description>springboot kafka log日志收集</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- Exclude the default logging starter so that the Log4j2 starter below is used instead -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. 创建消息产生者工程(log-producer)
2.1 配置文件
yml
# log-producer application.yml
logging:
  # Logging configuration file loaded from the classpath
  config: classpath:log4j.xml
server:
  port: 8080
log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- The "status" attribute controls the output of log4j2's own internal messages. It is optional;
     when set to "trace" you will see detailed log4j2 internals. It can be set to OFF (disabled)
     or ERROR (errors only). -->
<Configuration status="OFF">
    <Properties>
        <!-- Output directory of the log file (no trailing slash: the File appender adds the separator) -->
        <Property name="log_path">log</Property>
        <Property name="file_name">log</Property>
        <Property name="kafka_log_topic">kafka_log_topic</Property>
        <Property name="bootstrap_servers">localhost:9092</Property>
    </Properties>
    <Appenders>
        <!-- Console output -->
        <Console name="console" target="SYSTEM_OUT">
            <!-- The console accepts events at this level and above (onMatch) and rejects everything else (onMismatch) -->
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
            <!-- Output format of a log line -->
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss SSS} [%t] %-5level %logger{36} - %msg%n" />
        </Console>
        <!-- Local log file -->
        <File name="log_file" fileName="${log_path}/${file_name}.log" append="true" immediateFlush="true">
            <PatternLayout pattern="%d{yy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
        </File>
        <!-- Publishes every log event to the Kafka topic consumed by the log-consumer project -->
        <Kafka name="kafka" topic="${kafka_log_topic}">
            <!--<PatternLayout pattern="%date %message"/>-->
            <Property name="bootstrap.servers">${bootstrap_servers}</Property>
            <!-- JSON output: one compact JSON object per event, terminated by a newline -->
            <JsonLayout compact="true" locationInfo="true" complete="false" eventEol="true"/>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="kafka"/>
            <AppenderRef ref="console"/>
            <AppenderRef ref="log_file"/>
        </Root>
        <!-- Avoid recursive logging: the Kafka client must never log to the Kafka appender at DEBUG level,
             even if the Root level is lowered later. -->
        <Logger name="org.apache.kafka" level="INFO"/>
        <Logger name="org.springframework" level="INFO"/>
    </Loggers>
</Configuration>
2.2 启动类和业务代码(ProducerApp.java)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Entry point of the log-producer application and host of the demo endpoint.
 *
 * <p>Calling {@code GET /log_test} writes several rounds of INFO, DEBUG, WARN and ERROR
 * messages through SLF4J. Which of them are actually emitted, and where they go, is decided
 * by the logging configuration, not by this class.
 *
 * @author 陈玉林
 * @since 2020/6/4
 */
@SpringBootApplication
@RestController
public class ProducerApp {

    /** Number of rounds of log messages written per request. */
    private static final int LOG_ROUNDS = 10;

    // Loggers are shared and never reassigned, so the field is final.
    private static final Logger logger = LoggerFactory.getLogger(ProducerApp.class);

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class, args);
    }

    /**
     * Writes {@value #LOG_ROUNDS} rounds of test messages, one per log level.
     *
     * @return the constant string {@code "success"}
     */
    @GetMapping("/log_test")
    public String test() {
        for (int i = 0; i < LOG_ROUNDS; i++) {
            // Placeholders produce the same text as concatenation, but the message
            // is only built when the level is enabled.
            logger.info("info-{}", i);
            logger.debug("debuggger-{}", i);
            logger.warn("warn-{}", i);
            logger.error("error-{}", i);
        }
        return "success";
    }
}
3. 创建消息消费者工程(log-consumer)
3.1 pom文件增加一个依赖
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- Use 1.2.83 or later: earlier 1.2.x releases are affected by known autoType deserialization vulnerabilities -->
    <version>1.2.83</version>
</dependency>
3.2 yml配置文件
# log-consumer application.yml
server:
  port: 8081
spring:
  kafka:
    # Kafka server address; separate multiple addresses with commas
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.3 启动类和业务代码
启动类ConsumerApp.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Bootstrap class of the log-consumer Spring Boot application.
 *
 * @author 陈玉林
 * @since 2020/6/4
 */
@SpringBootApplication
public class ConsumerApp {

    /**
     * Launches the application with this class as the primary configuration source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(final String[] args) {
        final Class<ConsumerApp> primarySource = ConsumerApp.class;
        SpringApplication.run(primarySource, args);
    }
}
Message.java
/**
 * Mutable holder for the three fields of a log event that the consumer prints:
 * the level, the message text and the name of the logger.
 */
public class Message {

    private String level;
    private String message;
    private String loggerName;

    /** @return the log level, or {@code null} if it has not been set */
    public String getLevel() {
        return this.level;
    }

    /** @param level the log level to store */
    public void setLevel(String level) {
        this.level = level;
    }

    /** @return the message text, or {@code null} if it has not been set */
    public String getMessage() {
        return this.message;
    }

    /** @param message the message text to store */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the logger name, or {@code null} if it has not been set */
    public String getLoggerName() {
        return this.loggerName;
    }

    /** @param loggerName the logger name to store */
    public void setLoggerName(String loggerName) {
        this.loggerName = loggerName;
    }

    /**
     * Renders all three fields in the form
     * {@code Message{level='..', message='..', loggerName='..'}}; unset fields appear as {@code null}.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("Message{");
        text.append("level='").append(level).append('\'');
        text.append(", message='").append(message).append('\'');
        text.append(", loggerName='").append(loggerName).append('\'');
        return text.append('}').toString();
    }
}
消息监听MessageConsumer.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Kafka listener that receives the JSON log events published to {@code kafka_log_topic}
 * and prints their level, logger name and message to standard output.
 *
 * @author 陈玉林
 * @since 2020/6/4
 */
@Component
public class MessageConsumer {

    /**
     * Handles one record from {@code kafka_log_topic}.
     *
     * <p>The record value is expected to be a JSON object with the keys {@code level},
     * {@code loggerName} and {@code message}. Records with no value or with a value that is
     * not a JSON object are reported on standard error and skipped instead of making the
     * listener throw; a missing key simply leaves the corresponding field {@code null}.
     *
     * @param record the record received from Kafka
     */
    @KafkaListener(topics = "kafka_log_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        Object value = record.value();
        if (value == null) {
            // A record without a value carries no log event to parse.
            System.err.println("Skipping record without value at offset " + record.offset());
            return;
        }

        JSONObject jsonObject;
        try {
            jsonObject = JSON.parseObject(value.toString());
        } catch (JSONException e) {
            System.err.println("Skipping record that is not a JSON object at offset "
                    + record.offset() + ": " + e.getMessage());
            return;
        }
        if (jsonObject == null) {
            // parseObject yields null for blank input.
            System.err.println("Skipping empty record at offset " + record.offset());
            return;
        }

        Message message = new Message();
        // getString returns null for an absent key instead of throwing.
        message.setLevel(jsonObject.getString("level"));
        message.setLoggerName(jsonObject.getString("loggerName"));
        message.setMessage(jsonObject.getString("message"));
        System.out.println("收到的消息:" + message);
    }
}
4. 测试
- 启动zookeeper、kafka
- 启动消费工程
- 启动生产工程
访问:http://localhost:8080/log_test ,在消费者控制台可以看到如下结果
收到的消息:Message{level='INFO', message='info-0', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-0', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-0', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-1', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-1', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-1', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-2', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-2', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-2', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-3', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-3', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-3', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-4', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-4', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-4', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-5', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-5', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-5', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-6', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-6', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-6', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-7', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-7', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-7', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-8', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-8', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-8', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='INFO', message='info-9', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='WARN', message='warn-9', loggerName='per.cyl.log.producer.ProducerApp'}
收到的消息:Message{level='ERROR', message='error-9', loggerName='per.cyl.log.producer.ProducerApp'}
源码下载
还没有评论,来说两句吧...