Installing RabbitMQ on CentOS, and sending and receiving messages with RabbitMQ in Spring Cloud

一时失言乱红尘 2022-05-17 01:56

# Installing RabbitMQ on CentOS #

* Environment: CentOS 7
* Add the [EPEL][] repository to yum. Note that the URL below names the EPEL 6 release package, whereas the package matching CentOS 7 is `epel-release-latest-7.noarch.rpm`:

        rpm -Uvh https://download.fedoraproject.org/pub/epel/epel-release-latest-6.noarch.rpm

* Download the Erlang repo file:

        wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo

* Install Erlang and RabbitMQ:

        yum install erlang rabbitmq-server

* Open ports 15672 and 5672 in the firewall. For the firewall setup, see [quick iptables configuration][iptables].
* Enable the RabbitMQ management plugin:

        cd /usr/lib/rabbitmq/
        rabbitmq-plugins enable rabbitmq_management

* Start the server:

        rabbitmq-server

* Visit `http://<host>:15672` (with the server's address in place of `<host>`) to see the RabbitMQ management login page.
* Create a user and grant it permissions:

        rabbitmqctl add_user root root
        rabbitmqctl set_user_tags root administrator
        rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

* To run the server in the background instead:

        rabbitmq-server -detached

# A simple example of sending and receiving messages with RabbitMQ in Spring Cloud #

* Add the dependency (Gradle):

        compile('org.springframework.cloud:spring-cloud-starter-bus-amqp')

* Register the queue bean:

        import org.springframework.amqp.core.Queue;
        import org.springframework.context.annotation.Bean;

        // Declare this method inside a @Configuration class.
        @Bean
        public Queue helloQueue() {
            return new Queue("hello");
        }

* Sending example:

        import java.util.Date;

        import org.springframework.amqp.core.AmqpTemplate;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.stereotype.Component;

        @Component
        public class Sender {

            @Autowired
            private AmqpTemplate rabbitTemplate;

            public void send() {
                String context = "hello" + new Date();
                System.out.println("sender:" + context);
                // Publish to the queue named "hello" through the default exchange.
                this.rabbitTemplate.convertAndSend("hello", context);
            }
        }

* Receiving example:

        import org.springframework.amqp.rabbit.annotation.RabbitHandler;
        import org.springframework.amqp.rabbit.annotation.RabbitListener;
        import org.springframework.stereotype.Component;

        @Component
        @RabbitListener(queues = "hello", containerFactory = "rabbitListenerContainerFactory")
        public class Receiver {

            @RabbitHandler
            public void process(String hello) {
                System.out.println("receiver:" + hello);
            }
        }

    Note: `containerFactory="rabbitListenerContainerFactory"` is there to fix an exception that kept being thrown while consuming messages, `org.springframework.amqp.AmqpException: No method found for class [B`, and that sometimes repeated in an endless loop.

    Reference: [https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-amqp][https_docs.spring.io_spring-boot_docs_current-SNAPSHOT_reference_htmlsingle_boot-features-amqp]

    Cause, in the documentation's own words: "By default, if retries are disabled and the listener throws an exception, the delivery is retried indefinitely. You can modify this behavior in two ways: Set the defaultRequeueRejected property to false so that zero re-deliveries are attempted or throw an AmqpRejectAndDontRequeueException to signal the message should be rejected. The latter is the mechanism used when retries are enabled and the maximum number of delivery attempts is reached."

    In short, listener retries (`spring.rabbitmq.listener.simple.retry.enabled` in Spring Boot 2.x) are disabled by default. With retries off, a message whose listener throws an exception counts as not consumed, so it is requeued and redelivered indefinitely. The root cause of the exception here is that the message payload cannot be converted in the `@RabbitListener` pipeline: `[B` is the JVM name for `byte[]`, so no handler method accepts the raw bytes. The fix is to set a message converter explicitly by registering these beans:

        import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
        import org.springframework.amqp.rabbit.connection.ConnectionFactory;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
        import org.springframework.context.annotation.Bean;

        // Declare both methods inside a @Configuration class.
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            // Serialize outgoing payloads as JSON.
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }

        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // Deserialize incoming payloads with the same converter.
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }

* REST API:

        import java.util.HashMap;
        import java.util.Map;

        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RestController;

        import com.mysoft.rabbit.Sender;

        @RestController
        public class RabbitController {

            @Autowired
            private Sender sender;

            // GET /send publishes one message and reports success.
            @GetMapping(value = "/send")
            public Map<String, Object> sendMessage() {
                Map<String, Object> result = new HashMap<>();
                sender.send();
                result.put("result", true);
                return result;
            }
        }

* Full code example: [github][]

[EPEL]: https://fedoraproject.org/wiki/EPEL/FAQ#howtouse
[iptables]: https://blog.csdn.net/xcg132566/article/details/78797638
[https_docs.spring.io_spring-boot_docs_current-SNAPSHOT_reference_htmlsingle_boot-features-amqp]: https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-amqp
[github]: https://github.com/Xchunguang/spring-cloud-framework/tree/master/cloud-mysoft-config
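
The redelivery behavior described in the quoted documentation can be modeled without a broker. The following is a minimal sketch in plain Java, not Spring AMQP code: `RequeueDemo`, `deliver`, and the `maxDeliveries` safety cap are hypothetical names introduced only for illustration. It runs a listener that always fails, once with rejected messages put back on the queue and once with them dropped.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

public class RequeueDemo {

    /**
     * Delivers messages from an in-memory queue to a listener and returns the
     * number of delivery attempts. A listener exception counts as a rejection:
     * the message is put back when requeueRejected is true, otherwise dropped.
     * maxDeliveries is a safety cap for this demo only (an assumption of the
     * sketch); without it the requeue case would never terminate.
     */
    public static int deliver(Deque<String> queue, Consumer<String> listener,
                              boolean requeueRejected, int maxDeliveries) {
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String message = queue.pollFirst();
            deliveries++;
            try {
                listener.accept(message);
            } catch (RuntimeException e) {
                if (requeueRejected) {
                    // Rejected and requeued: the same message is delivered again.
                    queue.addFirst(message);
                }
                // Otherwise the message is rejected and discarded.
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // A listener that always fails, like a handler that cannot convert the payload.
        Consumer<String> failing = m -> {
            throw new IllegalStateException("cannot handle payload: " + m);
        };

        Deque<String> requeued = new ArrayDeque<>();
        requeued.add("hello");
        System.out.println("requeue=true:  " + deliver(requeued, failing, true, 5) + " deliveries");

        Deque<String> dropped = new ArrayDeque<>();
        dropped.add("hello");
        System.out.println("requeue=false: " + deliver(dropped, failing, false, 5) + " deliveries");
    }
}
```

With requeueing on, the failing message uses up every allowed delivery (5 here); with it off, the message is delivered once and discarded. In this toy model the `requeueRejected` flag plays the role of the `defaultRequeueRejected` property named in the quote.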