golang:使用activeMQ[只有队列,没有主题]
文章目录
- 入门连接
- 使用一
- 使用二
入门连接
package main
import (
"crypto/md5"
"encoding/hex"
"fmt"
"github.com/go-stomp/stomp"
"net"
"strconv"
)
func MapPoint(temp map[string]string){
temp["cd"] = "gtfrdesxxcf"
temp["ls"] = "11111111111111111111"
}
func ArrayPoint(temp *[]map[string]string){
(*temp) = append((*temp), map[string]string{
"vv":"aaa",
"aa": "aaaaaaaaa",
})
}
func EncodeMD5(value string)string{
m := md5.New()
m.Write([]byte(value))
return hex.EncodeToString(m.Sum(nil))
}
func connAcitiveMq(host, port string)(stompConn *stomp.Conn){
fmt.Println( net.JoinHostPort(host, port))
stompConn, err := stomp.Dial("tcp", net.JoinHostPort(host, port))
if err != nil{
fmt.Println(" 连接失败"+err.Error())
}else{
fmt.Println("连接成功")
}
return stompConn
}
// 将消息发送到ActiveMQ中
func activeMqProducer(c chan string, queue string, conn *stomp.Conn){
for{
err := conn.Send(queue, "text/plan", []byte(<-c))
fmt.Println("send active mq..." + queue)
if err != nil {
fmt.Println("active mq message send erorr: " + err.Error())
}
}
}
func main() {
activeMq :=connAcitiveMq("127.0.0.1", "61613")
defer activeMq.Disconnect()
c := make(chan string)
go activeMqProducer(c, "", activeMq)
for i := 0; i < 10; i ++{
// 发送1万个消息
c <- "hello world" + strconv.Itoa(i)
}
}
参考:https://www.cnblogs.com/vincenshen/p/10804675.html
使用一
生产者
package main
import (
"fmt"
"github.com/go-stomp/stomp"
"time"
)
/*
- 生产者
*/
func main(){
// 调用Dial方法,第一个参数是"tcp",第二个参数则是ip:port
// 返回conn(连接)和err(错误)
conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
if err!=nil{
fmt.Println("err =", err)
return
}
//发送十条数据
for i:=0; i<10; i++{
// 调用conn下的send方法,接收三个参数
//参数一:队列的名字
//参数二:数据类型,一般是文本类型,直接写text/plain即可
//参数三:内容,记住要转化成byte数组的格式
err := conn.Send("testQ", "text/plain",[]byte(fmt.Sprintf("message:%d", i)))
if err!=nil{
fmt.Println("err =", err)
}
}
/*
这里为什么要sleep一下,那就是conn.Send这个过程是不阻塞的
相当于Send把数据放到了一个channel里面
另一个goroutine从channel里面去取数据再放到消息队列里面
但是还没等到另一个goroutine放入数据,此时循环已经结束了
因此最好要sleep一下,根据测试,如果不sleep,那么发送1000条数据,
最终进入队列的大概是980条数据,这说明了什么
说明了当程序把1000条数据放到channel里面的时候,另一个goroutine只往队列里面放了980条
剩余的20条还没有来得及放入,程序就结束了
*/
time.Sleep(time.Second * 1)
}
消费者
package main
import (
"fmt"
"github.com/go-stomp/stomp"
"time"
)
func recv_data(ch chan *stomp.Message){
//不断地循环,从channel里面获取数据
for {
v := <-ch
//这里是打印当然还可以做其他的操作
fmt.Println(string(v.Body))
}
}
func main(){
//创建一个channel,存放的是*stomp.Message类型
ch := make(chan *stomp.Message)
//将管道传入函数中
go recv_data(ch)
//和生产者一样,调用Dial方法,返回conn和err
conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
if err != nil {
fmt.Println("err =", err)
}
//消费者订阅这个队列
//参数一:队列名
//参数二:确认信息,直接填默认地即可
sub, err := conn.Subscribe(
"testQ", stomp.AckMode(stomp.AckAuto))
for {
//无限循环
select {
//sub.C是一个channel,如果订阅的队列有数据就读取
case v := <-sub.C:
//读取的数据是一个*stomp.Message类型
ch <- v
//如果2min还没有人发数据的话,就结束
case <-time.After(time.Minute * 2):
return
}
}
}
浏览器访问:http://localhost:8161/admin/queues.jsp
可以看到有消息有订阅
使用二
生产者
package main
import stomp “github.com/go-stomp/stomp”
import “fmt”//Connect to ActiveMQ and produce messages
func main() {conn, err := stomp.Dial("tcp", "localhost:61613")
if err != nil {
fmt.Println(err)
}
c := make(chan string)
quit := make(chan string)
go Producer(c, quit, conn)
for {
fmt.Println(<-c)
}
quit<-"read"
}
func Producer(c, quit chan string, conn *stomp.Conn) {
for {
select {
case c <- "msg sent":
err := conn.Send(
"/queue/test-1", // destination
"text/plain", // content-type
[]byte("Test message #1")) // body
if err != nil {
fmt.Println(err)
return;
}
case <-quit:
fmt.Println("finish")
return;
}
}
}
消费者
package main
import stomp “github.com/go-stomp/stomp”
import “fmt”//Connect to ActiveMQ and listen for messages
func main() {conn, err := stomp.Dial("tcp", "localhost:61613")
if err != nil {
fmt.Println(err)
}
sub, err := conn.Subscribe("/queue/test-1", stomp.AckAuto)
if err != nil {
fmt.Println(err)
}
for {
msg := <-sub.C
fmt.Println(msg)
}
err = sub.Unsubscribe()
if err != nil {
fmt.Println(err)
}
defer conn.Disconnect()
}
https://blog.csdn.net/qq\_30505673/article/details/84945554
还没有评论,来说两句吧...