RabbitMQ -springboot整合rabbitmq
课程:https://www.cnblogs.com/guchunchao/p/13173406.html
阅读目录
- 配置生产者
- 配置消费者
导入pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.everjiankang</groupId>
<artifactId>rabbitmqDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmqDemo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 导入配置文件处理器,配置文件进行绑定就会有提示 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
回到顶部
配置生产者
第一步:配置application.properties
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=xiaochao
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#生产端配置
#开启发送确认,此配置在Springboot2.3.0版本中已经@Deprecated了,默认就是
# spring.rabbitmq.publisher-confirms=true
#
spring.rabbitmq.publisher-confirm-type=simple
#开启发送失败退回
spring.rabbitmq.publisher-returns=true
#开启执行return回调
spring.rabbitmq.template.mandatory=true
第二步:开发启动类Application.java
package com.dwz.springboot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
第三步:加入实体类order
package com.everjiankang.dependency.model;
import java.io.Serializable;
public class Order implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String name;
public Order() {
}
public Order(String id, String name) {
super();
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
第四步:开发消息发送方法
(需要先手动创建交换器、队列、绑定关系。或者通过代码Build)
package com.everjiankang.dependency.springboot.producer;
import java.util.Map;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.everjiankang.dependency.model.Order;
@Component
public class RabbitmqSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* publisher-confirms,实现一个监听器用于监听broker端给我们返回的确认请求:
* RabbitTemplate.ConfirmCallback
*
* publisher-returns,保证消息对broker端是可达的,如果出现路由键不可达的情况,
* 则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:
* RabbitTemplate.ReturnCallback
*
* 注意:在发送消息的时候对template配置mandatory=true保证ReturnCallback监听有效
* 生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等。
*/
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("confirm correlationData:" + correlationData);
System.err.println("confirm ack:" + ack);
if(!ack) {
System.err.println("异常处理。。。");
}
}
};
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange:" + exchange);
System.err.println("return routingKey:" + routingKey);
System.err.println("return replyCode:" + replyCode);
System.err.println("return replyText:" + replyText);
}
};
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData();
//实际消息的唯一id
correlationData.setId("dwz123456");//id + 时间戳 (必须是全局唯一的)
rabbitTemplate.convertAndSend("exchange-1", "spring.abc", msg, correlationData);
}
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData();
correlationData.setId("zheaven123456");//id + 时间戳 (必须是全局唯一的)
rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}
}
第五步:测试发送消息
package com.dwz.springboot;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.dwz.springboot.entity.Order;
import com.dwz.springboot.producer.RabbitSender;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootTest {
@Autowired
private RabbitSender send;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
@Test
public void testSender1() throws Exception {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put("number", "12345");
properties.put("send_time", simpleDateFormat.format(new Date()));
send.send("Hello Rabbirmq For springboot!", properties);
}
@Test
public void testSender2() throws Exception {
Order order = new Order("001", "第一个订单");
send.sendOrder(order);
}
}
回到顶部
配置消费者
第一步:配置application.properties
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=xiaochao
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/vhost_dwz
spring.rabbitmq.connection-timeout=15000
#消费端配置
#配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
#消费端binding配置 ,以订单为例
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
第二步:开发启动类Application.java
package com.dwz.springboot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
第三步:添加主配置类
package com.dwz.springboot;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.dwz.springboot.*"})
public class MainConfig {
}
第四步:添加消息处理方法
package com.dwz.springboot.consumer;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.dwz.springboot.entity.Order;
import com.rabbitmq.client.Channel;
@Component
public class RabbitReceiver {
//此配置可以直接创建Exchange、Queue、Binding
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
name = "queue-1",
durable = "true"
),
exchange = @Exchange(
name = "exchange-1",
durable = "true",
type = "topic",
ignoreDeclarationExceptions = "true"
),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("------------------------------------");
System.err.println("消费端Payload:" + message.getPayload());
Long deliverytag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliverytag, false);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
name = "${spring.rabbitmq.listener.order.queue.name}",
durable = "${spring.rabbitmq.listener.order.queue.durable}"
),
exchange = @Exchange(
name = "${spring.rabbitmq.listener.order.exchange.name}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"
),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
//将Message用@Payload和@Headers拆分
//@Payload表示body里面的信息,@Headers表示properties的信息
public void onOrderMessage(@Payload Order order, Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("------------------------------------");
System.err.println("消费端Order:" + order.getId());
Long deliverytag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliverytag, false);
}
}
参考blog:
https://www.cnblogs.com/zheaven/p/11915480.html
参考视频:
https://www.bilibili.com/video/BV1Fz411B77A?p=34
还没有评论,来说两句吧...