nsq Message定义

绝地灬酷狼 2023-02-28 15:15 54阅读 0赞

源码

  1. const (
  2. MsgIDLength = 16
  3. minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
  4. )
  5. type MessageID [MsgIDLength]byte
  6. type Message struct {
  7. ID MessageID//长度为16的byte数组
  8. Body []byte //消息体
  9. Timestamp int64 //msg生成的时间戳
  10. Attempts uint16 //尝试次数
  11. // for in-flight handling
  12. deliveryTS time.Time
  13. clientID int64
  14. pri int64
  15. index int
  16. deferred time.Duration
  17. }
  18. func NewMessage(id MessageID, body []byte) *Message {
  19. return &Message{
  20. ID: id,
  21. Body: body,
  22. Timestamp: time.Now().UnixNano(),
  23. }
  24. }
  25. func (m *Message) WriteTo(w io.Writer) (int64, error) {
  26. //写入的数据只有1.timestamp+attempts 2.ID 3.body,共4个数据
  27. var buf [10]byte
  28. var total int64
  29. //对于整型、长整型等数据类型。Big endian 认为第一个字节是最高位字节
  30. //按照从低地址到高地址的顺序存放数据的高位字节到低位字节
  31. binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) //以bigEndian方式将时间戳放入[10]byte前8位
  32. binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))//以bigEndian方式将尝试次数放入[10]byte后2位
  33. n, err := w.Write(buf[:])//1. 写入buf
  34. total += int64(n)
  35. if err != nil {
  36. return total, err
  37. }
  38. n, err = w.Write(m.ID[:])//2. 写入msg.ID
  39. total += int64(n)
  40. if err != nil {
  41. return total, err
  42. }
  43. n, err = w.Write(m.Body)//3. 写入msg.Body
  44. total += int64(n)
  45. if err != nil {
  46. return total, err
  47. }
  48. return total, nil
  49. }
  50. // decodeMessage deserializes data (as []byte) and creates a new Message
  51. // message format:
  52. // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
  53. // | (int64) || || (hex string encoded in ASCII) || (binary)
  54. // | 8-byte || || 16-byte || N-byte
  55. // ------------------------------------------------------------------------------------------...
  56. // nanosecond timestamp ^^ message ID message body
  57. // (uint16)
  58. // 2-byte
  59. // attempts
  60. func decodeMessage(b []byte) (*Message, error) {
  61. //解码过程与writeTo相反
  62. var msg Message
  63. if len(b) < minValidMsgLength {
  64. //一个有效的msg包含 timestamp+attempts(8+2) + ID(16) + Body
  65. return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
  66. }
  67. msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) //以big endian解码出 timestamp
  68. msg.Attempts = binary.BigEndian.Uint16(b[8:10]) //以big endian解码出 attempts
  69. copy(msg.ID[:], b[10:10+MsgIDLength]) //直接将b中id的部分取出
  70. msg.Body = b[10+MsgIDLength:] //直接将b中body的部分取出
  71. return &msg, nil
  72. }
  73. func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
  74. buf.Reset()
  75. _, err := msg.WriteTo(buf)//msg写入buf
  76. if err != nil {
  77. return err
  78. }
  79. return bq.Put(buf.Bytes()) //再将msg 写入 backendQueue,这个msg作为次级msg
  80. }

发表评论

表情:
评论列表 (有 0 条评论,54人围观)

还没有评论,来说两句吧...

相关阅读

    相关 NSQ简介(一)

    NSQ组件 nsqd是一个守护进程,负责接收,排队,投递消息给客户端。 它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这

    相关 NSQ理解

    一、介绍    NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具