NSQ分布式消息队列

悠悠 2023-02-20 03:14 85阅读 0赞

NSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操作NSQ。

NSQ

NSQ介绍

NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。 NSQ的优势有以下优势:

  1. NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
  2. NSQ支持横向扩展,没有任何集中式代理。
  3. NSQ易于配置和部署,并且内置了管理界面。

NSQ的应用场景

通常来说,消息队列都适用以下场景。

异步处理

参照下图利用消息队列把业务流程中的非关键流程异步化,从而显著降低业务请求的响应时间
在这里插入图片描述

应用解耦

通过使用消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他业务要使用订单数据可直接订阅消息队列,提高系统的灵活性。
在这里插入图片描述

流量削峰

类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。
在这里插入图片描述

安装

官方下载页面根据自己的平台下载并解压即可。

NSQ组件

nsqd

nsqd是一个守护进程,它接收、排队并向客户端发送消息。

启动nsqd,指定-broadcast-address=127.0.0.1来配置广播地址

  1. ./nsqd -broadcast-address=127.0.0.1

如果是在搭配nsqlookupd使用的模式下需要还指定nsqlookupd地址:

  1. ./nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=127.0.0.1:4160

如果是部署了多个nsqlookupd节点的集群,那还可以指定多个-lookupd-tcp-address

nsqdq相关配置项如下:

  1. -auth-http-address value
  2. <addr>:<port> to query auth server (may be given multiple times)
  3. -broadcast-address string
  4. address that will be registered with lookupd (defaults to the OS hostname) (default "PROSNAKES.local")
  5. -config string
  6. path to config file
  7. -data-path string
  8. path to store disk-backed messages
  9. -deflate
  10. enable deflate feature negotiation (client compression) (default true)
  11. -e2e-processing-latency-percentile value
  12. message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)
  13. -e2e-processing-latency-window-time duration
  14. calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds) (default 10m0s)
  15. -http-address string
  16. <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4151")
  17. -http-client-connect-timeout duration
  18. timeout for HTTP connect (default 2s)
  19. -http-client-request-timeout duration
  20. timeout for HTTP request (default 5s)
  21. -https-address string
  22. <addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152")
  23. -log-prefix string
  24. log message prefix (default "[nsqd] ")
  25. -lookupd-tcp-address value
  26. lookupd TCP address (may be given multiple times)
  27. -max-body-size int
  28. maximum size of a single command body (default 5242880)
  29. -max-bytes-per-file int
  30. number of bytes per diskqueue file before rolling (default 104857600)
  31. -max-deflate-level int
  32. max deflate compression level a client can negotiate (> values == > nsqd CPU usage) (default 6)
  33. -max-heartbeat-interval duration
  34. maximum client configurable duration of time between client heartbeats (default 1m0s)
  35. -max-msg-size int
  36. maximum size of a single message in bytes (default 1048576)
  37. -max-msg-timeout duration
  38. maximum duration before a message will timeout (default 15m0s)
  39. -max-output-buffer-size int
  40. maximum client configurable size (in bytes) for a client output buffer (default 65536)
  41. -max-output-buffer-timeout duration
  42. maximum client configurable duration of time between flushing to a client (default 1s)
  43. -max-rdy-count int
  44. maximum RDY count for a client (default 2500)
  45. -max-req-timeout duration
  46. maximum requeuing timeout for a message (default 1h0m0s)
  47. -mem-queue-size int
  48. number of messages to keep in memory (per topic/channel) (default 10000)
  49. -msg-timeout string
  50. duration to wait before auto-requeing a message (default "1m0s")
  51. -node-id int
  52. unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 616)
  53. -snappy
  54. enable snappy feature negotiation (client compression) (default true)
  55. -statsd-address string
  56. UDP <addr>:<port> of a statsd daemon for pushing stats
  57. -statsd-interval string
  58. duration between pushing to statsd (default "1m0s")
  59. -statsd-mem-stats
  60. toggle sending memory and GC stats to statsd (default true)
  61. -statsd-prefix string
  62. prefix used for keys sent to statsd (%s for host replacement) (default "nsq.%s")
  63. -sync-every int
  64. number of messages per diskqueue fsync (default 2500)
  65. -sync-timeout duration
  66. duration of time per diskqueue fsync (default 2s)
  67. -tcp-address string
  68. <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4150")
  69. -tls-cert string
  70. path to certificate file
  71. -tls-client-auth-policy string
  72. client certificate auth policy ('require' or 'require-verify')
  73. -tls-key string
  74. path to key file
  75. -tls-min-version value
  76. minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2') (default 769)
  77. -tls-required
  78. require TLS for client connections (true, false, tcp-https)
  79. -tls-root-ca-file string
  80. path to certificate authority file
  81. -verbose
  82. enable verbose logging
  83. -version
  84. print version string
  85. -worker-id
  86. do NOT use this, use --node-id

nsqlookupd

nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic下的nsqd提供了运行时的自动发现服务。 它不维持持久状态,也不需要与任何其他nsqlookupd实例协调以满足查询。因此根据你系统的冗余要求尽可能多地部署nsqlookupd节点。它们小豪的资源很少,可以与其他服务共存。我们的建议是为每个数据中心运行至少3个集群。

nsqlookupd相关配置项如下:

  1. -broadcast-address string
  2. address of this lookupd node, (default to the OS hostname) (default "PROSNAKES.local")
  3. -config string
  4. path to config file
  5. -http-address string
  6. <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4161")
  7. -inactive-producer-timeout duration
  8. duration of time a producer will remain in the active list since its last ping (default 5m0s)
  9. -log-prefix string
  10. log message prefix (default "[nsqlookupd] ")
  11. -tcp-address string
  12. <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4160")
  13. -tombstone-lifetime duration
  14. duration of time a producer will remain tombstoned if registration remains (default 45s)
  15. -verbose
  16. enable verbose logging
  17. -version
  18. print version string

nsqadmin

一个实时监控集群状态、执行各种管理任务的Web管理平台。 启动nsqadmin,指定nsqlookupd地址:

  1. ./nsqadmin -lookupd-http-address=127.0.0.1:4161

我们可以使用浏览器打开http://127.0.0.1:4171/访问如下管理界面。
在这里插入图片描述

nsqadmin相关的配置项如下:

  1. -allow-config-from-cidr string
  2. A CIDR from which to allow HTTP requests to the /config endpoint (default "127.0.0.1/8")
  3. -config string
  4. path to config file
  5. -graphite-url string
  6. graphite HTTP address
  7. -http-address string
  8. <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4171")
  9. -http-client-connect-timeout duration
  10. timeout for HTTP connect (default 2s)
  11. -http-client-request-timeout duration
  12. timeout for HTTP request (default 5s)
  13. -http-client-tls-cert string
  14. path to certificate file for the HTTP client
  15. -http-client-tls-insecure-skip-verify
  16. configure the HTTP client to skip verification of TLS certificates
  17. -http-client-tls-key string
  18. path to key file for the HTTP client
  19. -http-client-tls-root-ca-file string
  20. path to CA file for the HTTP client
  21. -log-prefix string
  22. log message prefix (default "[nsqadmin] ")
  23. -lookupd-http-address value
  24. lookupd HTTP address (may be given multiple times)
  25. -notification-http-endpoint string
  26. HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
  27. -nsqd-http-address value
  28. nsqd HTTP address (may be given multiple times)
  29. -proxy-graphite
  30. proxy HTTP requests to graphite
  31. -statsd-counter-format string
  32. The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.counters.%s.count")
  33. -statsd-gauge-format string
  34. The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.gauges.%s")
  35. -statsd-interval duration
  36. time interval nsqd is configured to push to statsd (must match nsqd) (default 1m0s)
  37. -statsd-prefix string
  38. prefix used for keys sent to statsd (%s for host replacement, must match nsqd) (default "nsq.%s")
  39. -version
  40. print version string

NSQ架构

NSQ工作模式

nsq架构设计

Topic和Channel

每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。

topicchannel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channelchannel是通过订阅指定的channel在第一次使用时创建的。

topicchannel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。

channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:

nsq架构设计总而言之,消息是从topic -> channel(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers均匀分布(每个消费者接收该channel的一部分消息)。

NSQ接收和发送消息流程

nsq架构设计

NSQ特性

  • 消息默认不持久化,可以配置成持久化模式。nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘。

    • 如果将--mem-queue-size设置为0,所有的消息将会存储到磁盘。
    • 服务器重启时也会将当时在内存中的消息持久化。
  • 每条消息至少传递一次。
  • 消息不保证有序。

Go操作NSQ

官方提供了Go语言版的客户端:go-nsq,更多客户端支持请查看CLIENT LIBRARIES。

安装

  1. go get -u github.com/nsqio/go-nsq

生产者

一个简单的生产者示例代码如下:

  1. // nsq_producer/main.go
  2. package main
  3. import (
  4. "bufio"
  5. "fmt"
  6. "os"
  7. "strings"
  8. "github.com/nsqio/go-nsq"
  9. )
  10. // NSQ Producer Demo
  11. var producer *nsq.Producer
  12. // 初始化生产者
  13. func initProducer(str string) (err error) {
  14. config := nsq.NewConfig()
  15. producer, err = nsq.NewProducer(str, config)
  16. if err != nil {
  17. fmt.Printf("create producer failed, err:%v\n", err)
  18. return err
  19. }
  20. return nil
  21. }
  22. func main() {
  23. nsqAddress := "127.0.0.1:4150"
  24. err := initProducer(nsqAddress)
  25. if err != nil {
  26. fmt.Printf("init producer failed, err:%v\n", err)
  27. return
  28. }
  29. reader := bufio.NewReader(os.Stdin) // 从标准输入读取
  30. for {
  31. data, err := reader.ReadString('\n')
  32. if err != nil {
  33. fmt.Printf("read string from stdin failed, err:%v\n", err)
  34. continue
  35. }
  36. data = strings.TrimSpace(data)
  37. if strings.ToUpper(data) == "Q" { // 输入Q退出
  38. break
  39. }
  40. // 向 'topic_demo' publish 数据
  41. err = producer.Publish("topic_demo", []byte(data))
  42. if err != nil {
  43. fmt.Printf("publish msg to nsq failed, err:%v\n", err)
  44. continue
  45. }
  46. }
  47. }

将上面的代码编译执行,然后在终端输入两条数据123456

  1. $ ./nsq_producer
  2. 123
  3. 2018/10/22 18:41:20 INF 1 (127.0.0.1:4150) connecting to nsqd
  4. 456

使用浏览器打开http://127.0.0.1:4171/可以查看到类似下面的页面: 在下面这个页面能看到当前的topic信息:nsqadmin界面1

点击页面上的topic_demo就能进入一个展示更多详细信息的页面,在这个页面上我们可以查看和管理topic,同时能够看到目前在LWZMBP:4151 (127.0.01:4151)这个nsqd上有2条message。又因为没有消费者接入所以暂时没有创建channel
在这里插入图片描述

/nodes这个页面我们能够很方便的查看当前接入lookupdnsqd节点。
在这里插入图片描述

这个/counter页面显示了处理的消息数量,因为我们没有接入消费者,所以处理的消息数量为0。
在这里插入图片描述
/lookup界面支持创建topicchannel
在这里插入图片描述

消费者

一个简单的消费者示例代码如下:

  1. // nsq_consumer/main.go
  2. package main
  3. import (
  4. "fmt"
  5. "os"
  6. "os/signal"
  7. "syscall"
  8. "time"
  9. "github.com/nsqio/go-nsq"
  10. )
  11. // NSQ Consumer Demo
  12. // MyHandler 是一个消费者类型
  13. type MyHandler struct {
  14. Title string
  15. }
  16. // HandleMessage 是需要实现的处理消息的方法
  17. func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
  18. fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
  19. return
  20. }
  21. // 初始化消费者
  22. func initConsumer(topic string, channel string, address string) (err error) {
  23. config := nsq.NewConfig()
  24. config.LookupdPollInterval = 15 * time.Second
  25. c, err := nsq.NewConsumer(topic, channel, config)
  26. if err != nil {
  27. fmt.Printf("create consumer failed, err:%v\n", err)
  28. return
  29. }
  30. consumer := &MyHandler{
  31. Title: "沙河1号",
  32. }
  33. c.AddHandler(consumer)
  34. // if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
  35. if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
  36. return err
  37. }
  38. return nil
  39. }
  40. func main() {
  41. err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
  42. if err != nil {
  43. fmt.Printf("init consumer failed, err:%v\n", err)
  44. return
  45. }
  46. c := make(chan os.Signal) // 定义一个信号的通道
  47. signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
  48. <-c // 阻塞
  49. }

将上面的代码保存之后编译执行,就能够获取之前我们publish的两条消息了:

  1. $ ./nsq_consumer
  2. 2018/10/22 18:49:06 INF 1 [topic_demo/first] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=topic_demo
  3. 2018/10/22 18:49:06 INF 1 [topic_demo/first] (127.0.0.1:4150) connecting to nsqd
  4. 沙河1 recv from 127.0.0.1:4150, msg:123
  5. 沙河1 recv from 127.0.0.1:4150, msg:456

同时在nsqadmin的/counter页面查看到处理的数据数量为2。
在这里插入图片描述

关于go-nsq的更多内容请阅读go-nsq的官方文档。

发表评论

表情:
评论列表 (有 0 条评论,85人围观)

还没有评论,来说两句吧...

相关阅读

    相关 NSQ分布式消息队列

    NSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操作NSQ。 NSQ NSQ介绍 [NSQ][]是Go语言编写的一个开源的实时分布式

    相关 分布式消息队列

    一、消息队列概述  消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量销峰等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的

    相关 分布式消息队列XXL-MQ

    [一、简介][Link 1] [1.1 概述][1.1] XXL-MQ是一款轻量级分布式消息队列,支持串行、并行和广播等多种消息模型。现已开放源代码,开箱即用。

    相关 分布式消息队列RabbitMQ

    这篇文章简单讲述下分布式消息队列的基础知识,不会太深入,因为类似MQ这样的分布式组件有很多不同的种类,都有各自的特征和其对应的应用场景,需要在实际应用中才能更加深入的理解。