golang:使用activeMQ[只有队列,没有主题]

我不是女神ヾ 2023-10-12 07:57 71阅读 0赞

文章目录

  • 入门连接
  • 使用一
  • 使用二

入门连接

  1. package main
  2. import (
  3. "crypto/md5"
  4. "encoding/hex"
  5. "fmt"
  6. "github.com/go-stomp/stomp"
  7. "net"
  8. "strconv"
  9. )
  10. func MapPoint(temp map[string]string){
  11. temp["cd"] = "gtfrdesxxcf"
  12. temp["ls"] = "11111111111111111111"
  13. }
  14. func ArrayPoint(temp *[]map[string]string){
  15. (*temp) = append((*temp), map[string]string{
  16. "vv":"aaa",
  17. "aa": "aaaaaaaaa",
  18. })
  19. }
  20. func EncodeMD5(value string)string{
  21. m := md5.New()
  22. m.Write([]byte(value))
  23. return hex.EncodeToString(m.Sum(nil))
  24. }
  25. func connAcitiveMq(host, port string)(stompConn *stomp.Conn){
  26. fmt.Println( net.JoinHostPort(host, port))
  27. stompConn, err := stomp.Dial("tcp", net.JoinHostPort(host, port))
  28. if err != nil{
  29. fmt.Println(" 连接失败"+err.Error())
  30. }else{
  31. fmt.Println("连接成功")
  32. }
  33. return stompConn
  34. }
  35. // 将消息发送到ActiveMQ中
  36. func activeMqProducer(c chan string, queue string, conn *stomp.Conn){
  37. for{
  38. err := conn.Send(queue, "text/plan", []byte(<-c))
  39. fmt.Println("send active mq..." + queue)
  40. if err != nil {
  41. fmt.Println("active mq message send erorr: " + err.Error())
  42. }
  43. }
  44. }
  45. func main() {
  46. activeMq :=connAcitiveMq("127.0.0.1", "61613")
  47. defer activeMq.Disconnect()
  48. c := make(chan string)
  49. go activeMqProducer(c, "", activeMq)
  50. for i := 0; i < 10; i ++{
  51. // 发送1万个消息
  52. c <- "hello world" + strconv.Itoa(i)
  53. }
  54. }

参考:https://www.cnblogs.com/vincenshen/p/10804675.html

使用一

  • 生产者

    package main

    import (

    1. "fmt"
    2. "github.com/go-stomp/stomp"
    3. "time"

    )

    /*

    • 生产者
      */
  1. func main(){
  2. // 调用Dial方法,第一个参数是"tcp",第二个参数则是ip:port
  3. // 返回conn(连接)和err(错误)
  4. conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
  5. if err!=nil{
  6. fmt.Println("err =", err)
  7. return
  8. }
  9. //发送十条数据
  10. for i:=0; i<10; i++{
  11. // 调用conn下的send方法,接收三个参数
  12. //参数一:队列的名字
  13. //参数二:数据类型,一般是文本类型,直接写text/plain即可
  14. //参数三:内容,记住要转化成byte数组的格式
  15. err := conn.Send("testQ", "text/plain",[]byte(fmt.Sprintf("message:%d", i)))
  16. if err!=nil{
  17. fmt.Println("err =", err)
  18. }
  19. }
  20. /*
  21. 这里为什么要sleep一下,那就是conn.Send这个过程是不阻塞的
  22. 相当于Send把数据放到了一个channel里面
  23. 另一个goroutine从channel里面去取数据再放到消息队列里面
  24. 但是还没等到另一个goroutine放入数据,此时循环已经结束了
  25. 因此最好要sleep一下,根据测试,如果不sleep,那么发送1000条数据,
  26. 最终进入队列的大概是980条数据,这说明了什么
  27. 说明了当程序把1000条数据放到channel里面的时候,另一个goroutine只往队列里面放了980条
  28. 剩余的20条还没有来得及放入,程序就结束了
  29. */
  30. time.Sleep(time.Second * 1)
  31. }
  • 消费者

    package main

    import (

    1. "fmt"
    2. "github.com/go-stomp/stomp"
    3. "time"

    )

    func recv_data(ch chan *stomp.Message){

    1. //不断地循环,从channel里面获取数据
    2. for {
    3. v := <-ch
    4. //这里是打印当然还可以做其他的操作
    5. fmt.Println(string(v.Body))
    6. }

    }

  1. func main(){
  2. //创建一个channel,存放的是*stomp.Message类型
  3. ch := make(chan *stomp.Message)
  4. //将管道传入函数中
  5. go recv_data(ch)
  6. //和生产者一样,调用Dial方法,返回conn和err
  7. conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
  8. if err != nil {
  9. fmt.Println("err =", err)
  10. }
  11. //消费者订阅这个队列
  12. //参数一:队列名
  13. //参数二:确认信息,直接填默认地即可
  14. sub, err := conn.Subscribe(
  15. "testQ", stomp.AckMode(stomp.AckAuto))
  16. for {
  17. //无限循环
  18. select {
  19. //sub.C是一个channel,如果订阅的队列有数据就读取
  20. case v := <-sub.C:
  21. //读取的数据是一个*stomp.Message类型
  22. ch <- v
  23. //如果2min还没有人发数据的话,就结束
  24. case <-time.After(time.Minute * 2):
  25. return
  26. }
  27. }
  28. }

浏览器访问:http://localhost:8161/admin/queues.jsp
可以看到有消息有订阅
在这里插入图片描述

使用二

  • 生产者

    package main

    import stomp “github.com/go-stomp/stomp”
    import “fmt”

    //Connect to ActiveMQ and produce messages
    func main() {

    1. conn, err := stomp.Dial("tcp", "localhost:61613")
    2. if err != nil {
    3. fmt.Println(err)
    4. }
    5. c := make(chan string)
    6. quit := make(chan string)
    7. go Producer(c, quit, conn)
    8. for {
    9. fmt.Println(<-c)
    10. }
    11. quit<-"read"

    }

    func Producer(c, quit chan string, conn *stomp.Conn) {

    1. for {
    2. select {
    3. case c <- "msg sent":
    4. err := conn.Send(
    5. "/queue/test-1", // destination
    6. "text/plain", // content-type
    7. []byte("Test message #1")) // body
    8. if err != nil {
    9. fmt.Println(err)
    10. return;
    11. }
    12. case <-quit:
    13. fmt.Println("finish")
    14. return;
    15. }
    16. }

    }

  • 消费者

    package main

    import stomp “github.com/go-stomp/stomp”
    import “fmt”

    //Connect to ActiveMQ and listen for messages
    func main() {

    1. conn, err := stomp.Dial("tcp", "localhost:61613")
    2. if err != nil {
    3. fmt.Println(err)
    4. }
    5. sub, err := conn.Subscribe("/queue/test-1", stomp.AckAuto)
    6. if err != nil {
    7. fmt.Println(err)
    8. }
    9. for {
    10. msg := <-sub.C
    11. fmt.Println(msg)
    12. }
    13. err = sub.Unsubscribe()
    14. if err != nil {
    15. fmt.Println(err)
    16. }
    17. defer conn.Disconnect()

    }

https://blog.csdn.net/qq\_30505673/article/details/84945554

发表评论

表情:
评论列表 (有 0 条评论,71人围观)

还没有评论,来说两句吧...

相关阅读