剖析nsq消息队列(四) 消息的负载处理

野性酷女 2023-06-17 09:56 5阅读 0赞

剖析nsq消息队列-目录
实际应用中,一部分服务集群可能会同时订阅同一个topic,并且处于同一个channel下。当nsqd有消息需要发送给订阅客户端去处理时,发给哪个客户端是需要考虑的,也就是我要说的消息的负载。
www.wityx.com

如果不考虑负载情况,把随机的把消息发送到某一个客服端去处理消息,如果机器的性能不同,可能发生的情况就是某一个或几个客户端处理速度慢,但还有大量新的消息需要处理,其他的客户端处于空闲状态。理想的状态是,找到当前相对空闲的客户端去处理消息。

nsq的处理方式是客户端主动向nsqd报告自已的可处理消息数量(也就是RDY命令)。nsqd根据每个连接的客户端的可处理消息的状态来随机把消息发送到可用的客户端,来进行消息处理

如下图所示:
www.wityx.com

客户端更新RDY

从第一篇帖子的例子中我们就有配置consumer的config

  1. config := nsq.NewConfig()
  2. config.MaxInFlight = 1000
  3. config.MaxBackoffDuration = 5 * time.Second
  4. config.DialTimeout = 10 * time.Second

MaxInFlight 来设置最大的处理中的消息数量,会根据这个变量计算在是否更新RDY
初始化的时候 客户端会向连接的nsqd服务端来发送updateRDY来设置最大处理数,

  1. func (r *Consumer) maybeUpdateRDY(conn *Conn) {
  2. inBackoff := r.inBackoff()
  3. inBackoffTimeout := r.inBackoffTimeout()
  4. if inBackoff || inBackoffTimeout {
  5. r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
  6. conn, inBackoff, inBackoffTimeout)
  7. return
  8. }
  9. remain := conn.RDY()
  10. lastRdyCount := conn.LastRDY()
  11. count := r.perConnMaxInFlight()
  12. // refill when at 1, or at 25%, or if connections have changed and we're imbalanced
  13. if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
  14. r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
  15. conn, count, remain, lastRdyCount)
  16. r.updateRDY(conn, count)
  17. } else {
  18. r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
  19. conn, count, remain, lastRdyCount)
  20. }
  21. }

当剩余的可用处理数量remain 小于等于1,或者小于最后一次设置的可用数量lastRdyCount的1/4时,或者可用连接平均的maxInFlight大于0并且小于remain时,则更新RDY状态

当有多个nsqd时,会把最大的消息进行平均计算:

  1. // perConnMaxInFlight calculates the per-connection max-in-flight count.
  2. //
  3. // This may change dynamically based on the number of connections to nsqd the Consumer
  4. // is responsible for.
  5. func (r *Consumer) perConnMaxInFlight() int64 {
  6. b := float64(r.getMaxInFlight())
  7. s := b / float64(len(r.conns()))
  8. return int64(math.Min(math.Max(1, s), b))
  9. }

当有消息从nsqd发送过来后也会调用maybeUpdateRDY方法,计算是否需要发送RDY命令

  1. func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
  2. atomic.AddInt64(&r.totalRdyCount, -1)
  3. atomic.AddUint64(&r.messagesReceived, 1)
  4. r.incomingMessages <- msg
  5. r.maybeUpdateRDY(c)
  6. }

上面就是主要的处理逻辑,但还有一些逻辑,就是当消息处理发生错误时,nsq有自己的退避算法backoff也会更新RDY 简单来说就是当发现有处理错误时,来进行重试和指数退避,在退避期间RDY会为0,重试时会先放尝试RDY为1看有没有错误,如果没有错误则全部放开,这个算法这篇帖子我就不详细说了。

服务端nsqd选择客户端进行发送消息

同时订阅同一topic的客户端(comsumer)有很多个,每个客户端根据自己的配置或状态发送RDY命令到nsqd表明自己能处理多少消息量
nsqd服务端会检查每个客户端的的状态是否可以发送消息。也就是IsReadyForMessages方法,判断inFlightCount是否大于readyCount,如果大于或者等于就不再给客户端发送数据,等待Ready后才会再给客户端发送数据

  1. func (c *clientV2) IsReadyForMessages() bool {
  2. if c.Channel.IsPaused() {
  3. return false
  4. }
  5. readyCount := atomic.LoadInt64(&c.ReadyCount)
  6. inFlightCount := atomic.LoadInt64(&c.InFlightCount)
  7. c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)
  8. if inFlightCount >= readyCount || readyCount <= 0 {
  9. return false
  10. }
  11. return true

每一次发送消息inFlightCount会+1并保存到发送中的队列中,当客户端发送FIN会-1在之前的帖子中有说过。

  1. func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
  2. // ...
  3. for {
  4. // 检查订阅状态和消息是否可处理状态
  5. if subChannel == nil || !client.IsReadyForMessages() {
  6. // the client is not ready to receive messages...
  7. memoryMsgChan = nil
  8. backendMsgChan = nil
  9. flusherChan = nil
  10. // ...
  11. flushed = true
  12. } else if flushed {
  13. memoryMsgChan = subChannel.memoryMsgChan
  14. backendMsgChan = subChannel.backend.ReadChan()
  15. flusherChan = nil
  16. } else {
  17. memoryMsgChan = subChannel.memoryMsgChan
  18. backendMsgChan = subChannel.backend.ReadChan()
  19. flusherChan = outputBufferTicker.C
  20. }
  21. select {
  22. case <-flusherChan:
  23. // ...
  24. // 消息处理
  25. case b := <-backendMsgChan:
  26. client.SendingMessage()
  27. // ...
  28. case msg := <-memoryMsgChan:
  29. client.SendingMessage()
  30. //...
  31. }
  32. }
  33. // ...
  34. }

发表评论

表情:
评论列表 (有 0 条评论,5人围观)

还没有评论,来说两句吧...

相关阅读

    相关 NSQ分布式消息队列

    NSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操作NSQ。 NSQ NSQ介绍 [NSQ][]是Go语言编写的一个开源的实时分布式

    相关 消息队列动态负载

    关于reids中的消息队列,以及其他的队列的处理方式。(底层一定要是队列,而不是伪队列。) 1.Reid中的消息队列的长度控制原理 对于一个队列,他的长度会随着放入数据而增