RabbitMQ从入门到实战篇一:Hello RabbitMQ
一、pom文件的依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
二、创建一个生产者类:MessageSender
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MessageSender {
private Logger logger = LoggerFactory.getLogger(MessageSender.class);
//第一个参数是对列名、第二个是发送的消息内容
public boolean sendMessage(String queueName,String message){
//new一个RabbitMQ的连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置需要连接的RabbitMQ地址,这里指向本机
factory.setHost("127.0.0.1");
Connection connection = null;
Channel channel = null;
try {
//尝试获取一个连接
connection = factory.newConnection();
//尝试创建一个channel
channel = connection.createChannel();
//参数详解
//第一个参数:队列的名字
//第二个参数:如果是持久队列则为true(队列将在服务器重启时存活)
//第三个参数:是否是排他性队列(别人看不到),只对当前连接有效,当前连接断开后,队列删除(设置了持久化也删除)
//第四个参数:自动删除,在最后一个连接断开后删除队列
//第五个参数:其他参数(后期用到再研究)
channel.queueDeclare(queueName, false, false, false, null);
//注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String
channel.basicPublish("", queueName, null, message.getBytes());
logger.info("生产者发送消息 '" + message + "'");
//关闭channel和连接
channel.close();
connection.close();
} catch (Exception e) {
//失败后记录日志,返回false,代表发送失败
logger.error("生产者发送消息失败",e);
return false;
}
return true;
}
}
三、创建消费者类:MessageConsumer
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MessageConsumer {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
//第一个参数:要消费的队列名字,第二个参数:消费者名字(自己定义用于区分)
public boolean consume(String queueName,String consumerName){
//连接RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
//这里声明queue是为了取消息的时候,queue肯定会存在
//注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue
channel.queueDeclare(queueName, false, false, false, null);
//这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
logger.info("消费者接受到的消息为: '" + message + "'");
}
};
//上面是声明消费者,这里用声明的消费者消费掉队列中的消息
channel.basicConsume(queueName, true, consumer);
//这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
} catch (Exception e) {
//失败后记录日志,返回false,代表消费失败
logger.error("消费消息失败",e);
return false;
}
return true;
}
}
四、创建测试类
public class TestRabbitMQ {
public static void main( String[] args ){
//创建生产者生产信息
MessageSender sender = new MessageSender();
sender.sendMessage("hello","hello RabbitMQ!");
//创建消费者消费信息
MessageConsumer consumer = new MessageConsumer();
consumer.consume("hello","消费者一");
}
}
五、对测试类进行测试
1.生产消息
生产消息后,在管理平台进行查看,可看到一个名字为“hello”的队列,说明我们生产者创建队列成功
然后点击队列名字,查看消息内容
上述表明生产者发送消息成功,队列初始化建立成功
2.消费者消费信息
上图中的队列消息变为0,说明已被消耗掉
3.上述两个步骤就初步成功使用了RabbitMQ
4.RabbitMQ的管理界面还不了解,所以先不说
转载地址:点击送达
还没有评论,来说两句吧...