源码
const (
MsgIDLength = 16
minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
)
type MessageID [MsgIDLength]byte
type Message struct {
ID MessageID//长度为16的byte数组
Body []byte //消息体
Timestamp int64 //msg生成的时间戳
Attempts uint16 //尝试次数
// for in-flight handling
deliveryTS time.Time
clientID int64
pri int64
index int
deferred time.Duration
}
func NewMessage(id MessageID, body []byte) *Message {
return &Message{
ID: id,
Body: body,
Timestamp: time.Now().UnixNano(),
}
}
func (m *Message) WriteTo(w io.Writer) (int64, error) {
//写入的数据只有1.timestamp+attempts 2.ID 3.body,共4个数据
var buf [10]byte
var total int64
//对于整型、长整型等数据类型。Big endian 认为第一个字节是最高位字节
//按照从低地址到高地址的顺序存放数据的高位字节到低位字节
binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) //以bigEndian方式将时间戳放入[10]byte前8位
binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))//以bigEndian方式将尝试次数放入[10]byte后2位
n, err := w.Write(buf[:])//1. 写入buf
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(m.ID[:])//2. 写入msg.ID
total += int64(n)
if err != nil {
return total, err
}
n, err = w.Write(m.Body)//3. 写入msg.Body
total += int64(n)
if err != nil {
return total, err
}
return total, nil
}
// decodeMessage deserializes data (as []byte) and creates a new Message
// message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
// | (int64) || || (hex string encoded in ASCII) || (binary)
// | 8-byte || || 16-byte || N-byte
// ------------------------------------------------------------------------------------------...
// nanosecond timestamp ^^ message ID message body
// (uint16)
// 2-byte
// attempts
func decodeMessage(b []byte) (*Message, error) {
//解码过程与writeTo相反
var msg Message
if len(b) < minValidMsgLength {
//一个有效的msg包含 timestamp+attempts(8+2) + ID(16) + Body
return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
}
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) //以big endian解码出 timestamp
msg.Attempts = binary.BigEndian.Uint16(b[8:10]) //以big endian解码出 attempts
copy(msg.ID[:], b[10:10+MsgIDLength]) //直接将b中id的部分取出
msg.Body = b[10+MsgIDLength:] //直接将b中body的部分取出
return &msg, nil
}
func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
buf.Reset()
_, err := msg.WriteTo(buf)//msg写入buf
if err != nil {
return err
}
return bq.Put(buf.Bytes()) //再将msg 写入 backendQueue,这个msg作为次级msg
}
还没有评论,来说两句吧...