docker安装和使用kafka

「爱情、让人受尽委屈。」 2024-04-22 12:26 169阅读 0赞

1. 启动zookeeper

Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181

  1. docker run --name zookeeper \
  2. --network app-tier \
  3. -e ALLOW_ANONYMOUS_LOGIN=yes \
  4. --restart=always \
  5. -d bitnami/zookeeper:latest

2. 启动kafka

  1. docker run --name kafka \
  2. --network app-tier \
  3. -p 9092:9092 \
  4. -e ALLOW_PLAINTEXT_LISTENER=yes \
  5. -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
  6. -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  7. --restart=always \
  8. -d bitnami/kafka:latest





















命令 解释
ALLOW_PLAINTEXT_LISTENER=yes 任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECT zookeeper地址
KAFKA_CFG_ADVERTISED_LISTENERS 当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

2. 启动kafka-map管理工具

  1. docker run --name kafka-map \
  2. --network app-tier \
  3. -p 9001:8080 \
  4. -v /usr/local/kafka-map/data:/usr/local/kafka-map/data \
  5. -e DEFAULT_USERNAME=admin \
  6. -e DEFAULT_PASSWORD=admin \
  7. --restart=always \
  8. -d dushixiang/kafka-map:latest

启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin

在这里插入图片描述

3. springboot集成kafka

pom.xml配置

  1. <dependencies>
  2. <!--kafka依赖-->
  3. <dependency>
  4. <groupId>org.springframework.kafka</groupId>
  5. <artifactId>spring-kafka</artifactId>
  6. </dependency>
  7. </dependencies>

配置application.yml

  1. #------------------------------------spring----------------------------------
  2. spring:
  3. #------------------------------------消息队列kafka配置----------------------------------
  4. kafka:
  5. # kafka server的地址,如果有多个,使用逗号分割
  6. bootstrap-servers: localhost:9092
  7. producer:
  8. # 发生错误后,消息重发的次数。
  9. retries: 1
  10. #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
  11. batch-size: 16384
  12. # 设置生产者内存缓冲区的大小。32MB的批处理缓冲区
  13. buffer-memory: 33554432
  14. # 键的序列化方式
  15. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  16. # 值的序列化方式
  17. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  18. # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
  19. # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
  20. # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  21. acks: 1
  22. properties:
  23. # 自定义拦截器
  24. interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor
  25. #自定义分区器
  26. partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitioner
  27. consumer:
  28. # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
  29. auto-commit-interval: 1S
  30. # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  31. # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
  32. # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
  33. auto-offset-reset: earliest
  34. # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
  35. enable-auto-commit: false
  36. # 键的反序列化方式
  37. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  38. # 值的反序列化方式
  39. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  40. properties:
  41. # 自定义消费者拦截器
  42. interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor
  43. # 默认消费者组
  44. group-id: code-safe-group
  45. # 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)
  46. # 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟
  47. max-poll-interval-ms: 600000
  48. # 批量一次最大拉取数据量
  49. max-poll-records: 1000
  50. batch:
  51. # 批消费并发量,小于或等于Topic的分区数
  52. concurrency: 3
  53. listener:
  54. # 在侦听器容器中运行的线程数。
  55. concurrency: 5
  56. #listner负责ack,每调用一次,就立即commit
  57. ack-mode: manual_immediate
  58. missing-topics-fatal: false
  59. topics:
  60. # 自定义主题名称
  61. twsm: webSocket_send_message_dev
  62. group-id: group-id
  63. topic-name:
  64. - topic1

测试发送消息到kafka

  1. /**
  2. * Kafka测试
  3. *
  4. * @version 1.0
  5. * @author: web
  6. * @date: 2024/1/18 15:07
  7. */
  8. @Slf4j
  9. @RestController
  10. @RequestMapping("/message/kafkaTest")
  11. public class KafkaTestController extends BaseController
  12. {
  13. @Autowired
  14. private KafkaUtils kafkaUtils;
  15. /**
  16. * 生产者_推送消息到kafka
  17. *
  18. * @param msg
  19. * @author: web
  20. * @return: AjaxResult
  21. * @date: 2024/1/18 15:16
  22. */
  23. @PostMapping("/send")
  24. public AjaxResult send(@RequestBody Map<String, Object> msg)
  25. {
  26. try
  27. {
  28. String userId = msg.get("userId").toString();
  29. Object content = msg.get("content");
  30. Message message = kafkaUtils.setMessage(userId, content);
  31. kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);
  32. }
  33. catch (Exception e)
  34. {
  35. log.error("生产者_推送消息到kafka发生异常");
  36. }
  37. return success();
  38. }
  39. /**
  40. * 消费者1
  41. *
  42. * @param record
  43. * @param ack
  44. * @param topic
  45. * @author: web
  46. * @return: void
  47. * @date: 2024/1/18 15:07
  48. */
  49. @KafkaListener(topics = KafkaUtils.TOPIC_TEST)
  50. public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,
  51. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
  52. {
  53. Optional message = Optional.ofNullable(record.value());
  54. if (message.isPresent())
  55. {
  56. Object msg = message.get();
  57. log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
  58. ack.acknowledge();
  59. }
  60. }
  61. /**
  62. * 消费者2
  63. *
  64. * @param record
  65. * @param ack
  66. * @param topic
  67. * @author: web
  68. * @return: void
  69. * @date: 2024/1/18 15:07
  70. */
  71. // @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)
  72. // public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,
  73. // @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
  74. // {
  75. //
  76. // Optional message = Optional.ofNullable(record.value());
  77. // if (message.isPresent())
  78. // {
  79. // Object msg = message.get();
  80. // log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
  81. // ack.acknowledge();
  82. // }
  83. // }
  84. }

KafkaUtils类

  1. /**
  2. * 生产者
  3. *
  4. * @version: 1.0
  5. * @author: web
  6. * @date: 2024/1/18 10:37
  7. */
  8. @Component
  9. @Slf4j
  10. public class KafkaUtils
  11. {
  12. @Resource
  13. private KafkaTemplate<String, Object> kafkaTemplate;
  14. /**
  15. * 自定义topic
  16. */
  17. public static final String TOPIC_TEST = "topic.code-safe";
  18. /**
  19. * 自定义消费组
  20. */
  21. public static final String TOPIC_GROUP1 = "topic.group1";
  22. public static final String TOPIC_GROUP2 = "topic.group2";
  23. // 业务相关topic
  24. /**
  25. * 主题: webSocket发送消息到客户端
  26. */
  27. public static String TOPIC_WEBSOCKET_SEND_MESSAGE;
  28. @Autowired
  29. private String[] kafkaTopicName;
  30. /**
  31. * 获取配置文件中的盐值,并设置到静态变量中
  32. *
  33. * @param topic 主题
  34. */
  35. @Value("${spring.kafka.topics.twsm}")
  36. private void setTwsmTopic(String topic)
  37. {
  38. TOPIC_WEBSOCKET_SEND_MESSAGE = topic;
  39. }
  40. /**
  41. * 发送消息
  42. *
  43. * @param topic 主题
  44. * @param message 消息内容
  45. * @author: web
  46. * @return: void
  47. * @date: 2024/1/18 10:42
  48. */
  49. public void send(String topic, Object message)
  50. {
  51. if (StringUtils.isEmpty(topic) || StringUtils.isNull(message))
  52. {
  53. throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");
  54. }
  55. String obj2String = JsonUtils.toJsonString(message);
  56. // log.info("准备发送消息为:{}", obj2String);
  57. //发送消息
  58. ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);
  59. // 监听回调
  60. future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>()
  61. {
  62. @Override
  63. public void onFailure(Throwable throwable)
  64. {
  65. //发送失败的处理
  66. log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
  67. }
  68. @Override
  69. public void onSuccess(SendResult<String, Object> stringObjectSendResult)
  70. {
  71. //成功的处理
  72. // log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
  73. }
  74. });
  75. }
  76. /**
  77. * 设置websocket发送的消息体
  78. *
  79. * @param userId 用户ID
  80. * @param msg 消息内容
  81. * @author: web
  82. * @return: Message 消息对象
  83. * @date: 2024/1/19 11:36
  84. */
  85. public Message setMessage(String userId, Object msg)
  86. {
  87. Message message = new Message();
  88. message.setSendUserId(userId);
  89. message.setSendTime(DateUtils.getTime());
  90. message.setSendContent(String.valueOf(msg));
  91. return message;
  92. }
  93. }

Message类

  1. @Data
  2. public class Message implements Serializable
  3. {
  4. private static final long serialVersionUID = -118L;
  5. /**
  6. * 发送人ID
  7. */
  8. private String sendUserId;
  9. /**
  10. * 发送人
  11. */
  12. // private String sendUserName;
  13. /**
  14. * 发送时间
  15. */
  16. private String sendTime;
  17. /**
  18. * 发送内容
  19. */
  20. private String sendContent;
  21. }

监听消息

  1. /**
  2. * 消息接收监听器【分布式系统】
  3. *
  4. * @version: 1.0
  5. * @author: web
  6. * @date: 2024/1/19 13:44
  7. */
  8. @Component
  9. @Slf4j
  10. public class MessageListener
  11. {
  12. /**
  13. * 根据用户id发送消息到客户端
  14. *
  15. * @param record
  16. * @param ack
  17. * @param topic
  18. * @author: web
  19. * @return: void
  20. * @date: 2024/1/20 22:05
  21. */
  22. @KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")
  23. public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,
  24. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
  25. {
  26. Optional<String> optional = Optional.ofNullable(record.value());
  27. if (optional.isPresent())
  28. {
  29. Message message = JsonUtils.parseObject(optional.get(), Message.class);
  30. if (StringUtils.isNull(message))
  31. {
  32. log.error("消费者收到kafka消息的内容为空!");
  33. return;
  34. }
  35. // log.info("消费者收到kafka消息");
  36. String sendUserId = message.getSendUserId();
  37. String sendContent = message.getSendContent();
  38. // 确认收到消息
  39. ack.acknowledge();
  40. }
  41. }
  42. }

发表评论

表情:
评论列表 (有 0 条评论,169人围观)

还没有评论,来说两句吧...

相关阅读