消息队列 : spring-boot 集成 rabbitmq
介绍:
RabbitMQ是基于Erlang语言编写的开源消息队列,通过Erlang的Actor模型实现了数据的稳定可靠传输。本身是实现AMQP的消息队列,因此官方推荐,如果仅仅是使用RabbitMQ的话,建议使用AMQP 0-9-1的协议。不过,因为其可扩展性,可以通过插件的形式使用STOMP、XMPP、AMQP 1.0,还可以通过插件使用HTTP这种非消息的传输协议。所以,RabbitMQ可以说是适应性非常强的一个消息队列中间件了。
当然,不仅是协议支持的多,还因为它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等,用消息队列只需几行代码即可搞定。但是,这使得它的可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大,如需配置RabbitMQ则需要在目标机器上安装Erlang环境。
总的来说,RabbitMQ在数据一致性、稳定性和可靠性方面比较优秀,而且直接或间接的支持多种协议,对多种语言支持良好。但是其性能和吞吐量差强人意,由于Erlang语言本身的限制,二次开发成本较高
代码部分:
1、增加rabbitmq的依赖包
<!-- ampq 依赖包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.xml文件中配置
#RabbitMQ配置信息
#spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=39.108.***.****
spring.rabbitmq.port=*****
spring.rabbitmq.username=****
spring.rabbitmq.password=******
spring.rabbitmq.virtual-host=/hotcomm_manager
spring.rabbitmq.queue=hotcomm.manager.qingdao
3、RabbitMq的工厂连接和模板创建
@Configuration
public class RabbitMQConfig {
/**
* 注入配置文件属性
*/
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualhost;
@Value("${spring.rabbitmq.queue}")
private String queue;
@Autowired
SocketService socketService;
@Autowired
EventService eventService;
@Autowired
TMessageLogService MessageLogService;
@Autowired
AppPushService appPushService;
@Autowired
AppPushMsgMapper appPushMsgMapper;
/**
* 创建 ConnectionFactory
*
* @return
* @throws Exception
*/
@Bean
public ConnectionFactory creatConnectionFactory() throws Exception {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(host);
factory.setVirtualHost(virtualhost);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
return factory;
}
//rabbitmq的模板配置
@Bean
public RabbitTemplate receiveRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
return firstRabbitTemplate;
}
//消费者
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(new Queue[] { new Queue(queue) }); //设置监听的队列
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(5);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(new Receiver());
return container;
}
class Receiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
byte[] body = message.getBody();
String ss = new String(body);
System.out.println(ss);
publishMessage(ss);
}
}
/**
* 解析message信息,确定设备类型,查询指定表,确定责任人,发送信息(测试)
*
* @param message
*/
public void publishMessage(String message) throws MyException {
TMessageLog tMessageLog = new TMessageLog();
Gson gson = new Gson();
T_dev_alarm dev_alarm = gson.fromJson(message, T_dev_alarm.class);
// 查询当前设备的责任人(安装位置等)
AlarmDev dev = eventService.selectDevForUser(dev_alarm.getDeviceid(), dev_alarm.getModuleid());
dev.setAlarmtime(ConverUtil.dateForString(new Date()));
String json = JSONUtil.toJson(dev);
tMessageLog.setMessage(json);
tMessageLog.setReceiverid(dev.getOwnid());
tMessageLog.setSendtime(ConverUtil.dateForString(new Date()));
tMessageLog.setUserid(0);
// String s="{\"systate\":,\"message\":"+tMessageLog+"}";
// 推送指定用户
// socketService.sendMessageToOne(tMessageLog);
// 广播
socketService.sendMessageToAll(tMessageLog);
if (dev.getOwnid() != null && !dev.getOwnid().equals(null) && !dev.getOwnid().equals("")) {
String[] uid = dev.getOwnid().split(",");
List<String> list = new ArrayList<>();
for (String u : uid) {
if (!list.contains(u)) {
list.add(u);
}
}
for (int i = 0; i < list.size(); i++) {
// 查出用户regid
T_hk_apppush t_hk_apppush = appPushService.selectRegid(Integer.valueOf(list.get(i)));
int code = 0;
if (t_hk_apppush != null) {
// 向单个用户推送报警消息
dev.setId(dev_alarm.getId());
code = PushUtil.sendAllsetNotification("报警消息", "设备:" + dev.getDevnum() + "发生报警,请及时查看处理",
JSONUtil.toJson(ApiResult.resultInfo("0", "报警", dev)), t_hk_apppush.getRegid(), 86400);
}
if (code == 0 || code == 201) {// 201推送失败
// 推送失败的时候,把推送信息存进数据库,等下次登录的时候从数据库取出推送
T_hk_apppush_msg t_hk_apppush_msg = new T_hk_apppush_msg();// 推送消息储存表
t_hk_apppush_msg.setTitle("报警消息");
t_hk_apppush_msg.setContent("设备:" + dev.getDevnum() + "发生报警,请及时查看处理");
t_hk_apppush_msg.setMessage(JSONUtil.toJson(ApiResult.resultInfo("0", "报警", dev)));
t_hk_apppush_msg.setRegids("0");
t_hk_apppush_msg.setTimeToLive(String.valueOf(86400));
t_hk_apppush_msg.setUserid(Integer.valueOf(list.get(i)));
// 插入推送消息数据库
appPushMsgMapper.insertSelective(t_hk_apppush_msg);
}
}
}
}
}
还没有评论,来说两句吧...