etcd网络层(三)——stream实现过程

桃扇骨 2022-02-04 12:11 305阅读 0赞

stream消息通道主要是通过长连接的方式和对端进行数据交互,peer结构体中的stream相关数据如下(忽略V2版本):

  1. type peer struct {
  2. writer *streamWriter //负责向Stream消息通道中写消息
  3. msgAppReader *streamReader //负责从Stream消息通道中读消息
  4. .......其他字段省略
  5. }

从中我们看到stream是读写分离的,主要有streamWriter和streamReader结构体实现相关的功能。

stream实现的相关源码在 etcdserver/api/rafthttp/stream.go文件中。

#

一、streamWriter

streamWriter结构体如下:

  1. type streamWriter struct {
  2. lg *zap.Logger
  3. localID types.ID //本端的ID
  4. peerID types.ID //对端节点的ID
  5. status *peerStatus
  6. fs *stats.FollowerStats
  7. r Raft //底层的Raft实例
  8. mu sync.Mutex // guard field working and closer
  9. closer io.Closer //负责关闭底层的长连接
  10. working bool //负责标识当前的streamWriter是否可用
  11. msgc chan raftpb.Message //Peer会将待发送的消息写入到该通道,streamWriter则从该通道中读取消息并发送出去
  12. connc chan *outgoingConn //通过该通道获取当前streamWriter实例关联的底层网络连接, outgoingConn其实是对网络连接的一层封装,其中记录了当前连接使用的协议版本,以及用于关闭连接的Flusher和Closer等信息。
  13. stopc chan struct{}
  14. done chan struct{}
  15. }

从中我们看到:

localID和peerID分别记录本端ID和对端ID

status记录当前peer的可用性状态

r 是底层的Raft实例,用于和底层的raft模块交互

closer负责关闭底层的长连接

msgc peer会将待发送到对端的消息(非快照数据)写入到该通道,streamWriter则从该通道中读取并发送到对端节点。

connc outgoingConn其实是对网络连接的一层封装,connc用于等待对端的主动连接,当对端主动和当前节点建立连接之后,本节点才会向对端发送数据。

1. streamWriter的初始化

streamWriter的初始化过程在startStreamWriter函数中

  1. func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
  2. w := &streamWriter{
  3. lg: lg,
  4. localID: local,
  5. peerID: id,
  6. status: status,
  7. fs: fs,
  8. r: r,
  9. msgc: make(chan raftpb.Message, streamBufSize),
  10. connc: make(chan *outgoingConn),
  11. stopc: make(chan struct{}),
  12. done: make(chan struct{}),
  13. }
  14. go w.run()
  15. return w
  16. }

该方法主要是用于初始化streamWriter中的相关字段,并开启一个协程执行run方法。

2.run方法

run方法是streamWriter的核心,源码如下:

  1. func (cw *streamWriter) run() {
  2. var (
  3. msgc chan raftpb.Message //指向当前streamWriter.msgc字段
  4. heartbeatc <-chan time.Time //定时器会定时向该通道发送信号,触发心跳消息的发送,该心跳消息与后台介绍的Raft的心跳消息有所不同,该心跳的主要目的是为了防止连接长时间不用断开的
  5. t streamType //用来记录消息的版本信息
  6. enc encoder //编码器,负责将消息序列化并写入连接的缓冲区
  7. flusher http.Flusher //负责刷新底层连接,将数据真正发送出去
  8. batched int //当前未Flush的消息个数
  9. )
  10. tickc := time.NewTicker(ConnReadTimeout / 3) //发送心跳的定时器
  11. defer tickc.Stop()
  12. unflushed := 0 //为Flush的字节数
  13. if cw.lg != nil {
  14. cw.lg.Info(
  15. "started stream writer with remote peer",
  16. zap.String("local-member-id", cw.localID.String()),
  17. zap.String("remote-peer-id", cw.peerID.String()),
  18. )
  19. } else {
  20. plog.Infof("started streaming with peer %s (writer)", cw.peerID)
  21. }
  22. for {
  23. select {
  24. case <-heartbeatc: //定时器到期,触发心跳消息
  25. err := enc.encode(&linkHeartbeatMessage) //通过encoder将心跳编码并写入到writer
  26. unflushed += linkHeartbeatMessage.Size() //增加未Flush出去的字节数
  27. if err == nil { //若没有异常,则使用flusher将缓存的消息全部发送出去,并重置batched和unflushed两个统计变量
  28. flusher.Flush()
  29. batched = 0
  30. sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
  31. unflushed = 0
  32. continue
  33. }
  34. //如果有异常则设置peer的status 为 不连通状态
  35. cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
  36. sentFailures.WithLabelValues(cw.peerID.String()).Inc()
  37. cw.close() //若发生异常,则关闭streamWriter,会导致底层连接的关闭
  38. if cw.lg != nil {
  39. cw.lg.Warn(
  40. "lost TCP streaming connection with remote peer",
  41. zap.String("stream-writer-type", t.String()),
  42. zap.String("local-member-id", cw.localID.String()),
  43. zap.String("remote-peer-id", cw.peerID.String()),
  44. )
  45. } else {
  46. plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
  47. }
  48. //将heartbeatc和msgc两个通道清空,后续就不会在发送心跳消息和其他类型的消息了
  49. heartbeatc, msgc = nil, nil
  50. case m := <-msgc: //peer向streamWriter.msgc写入待发送的消息
  51. err := enc.encode(&m) //将消息序列化并写入底层连接
  52. if err == nil { //若没有异常,则递增unflushed变量
  53. unflushed += m.Size()
  54. //msgc通道中的消息全部发送完成或是未Flush的消息较多(超过streamBufSize的一半),则触发Flush,否则只是递增batched变量
  55. if len(msgc) == 0 || batched > streamBufSize/2 {
  56. flusher.Flush()
  57. sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
  58. unflushed = 0
  59. batched = 0
  60. } else {
  61. batched++
  62. }
  63. continue
  64. }
  65. //若发生异常,则关闭streamWriter,清空heartbeatc和msgc两个通道
  66. cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
  67. cw.close()
  68. if cw.lg != nil {
  69. cw.lg.Warn(
  70. "lost TCP streaming connection with remote peer",
  71. zap.String("stream-writer-type", t.String()),
  72. zap.String("local-member-id", cw.localID.String()),
  73. zap.String("remote-peer-id", cw.peerID.String()),
  74. )
  75. } else {
  76. plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
  77. }
  78. heartbeatc, msgc = nil, nil
  79. cw.r.ReportUnreachable(m.To)
  80. sentFailures.WithLabelValues(cw.peerID.String()).Inc()
  81. /*
  82. 当其他节点(对端)主动与当前节点创建Stream消息通道时,会先通过StreamHandler的处理,
  83. StreamHandler会通过attach()方法将连接写入对应的peer.writer.connc通道,
  84. 而当前的goroutine会通过该通道获取连接,然后开始发送消息
  85. */
  86. case conn := <-cw.connc: //获取与当前streamWriter实例绑定的底层连接
  87. cw.mu.Lock()
  88. closed := cw.closeUnlocked()
  89. t = conn.t //获取该连接底层发送的消息版本,并创建相应的encoder实例
  90. switch conn.t { //不同的版本创建不同的编码器,将http.ResponseWriter封装成messageEncoder,上层调用通过messageEncoder实例完成消息发送
  91. case streamTypeMsgAppV2:
  92. enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
  93. case streamTypeMessage:
  94. enc = &messageEncoder{w: conn.Writer}
  95. default:
  96. plog.Panicf("unhandled stream type %s", conn.t)
  97. }
  98. if cw.lg != nil {
  99. cw.lg.Info(
  100. "set message encoder",
  101. zap.String("from", conn.localID.String()),
  102. zap.String("to", conn.peerID.String()),
  103. zap.String("stream-type", t.String()),
  104. )
  105. }
  106. flusher = conn.Flusher //记录底层连接对应的Flusher
  107. unflushed = 0 //重置未Flush的字节数
  108. cw.status.activate() //peerStatus.active设置为true
  109. cw.closer = conn.Closer //负责关闭底层的长连接
  110. cw.working = true //标识当前streamWriter正在运行
  111. cw.mu.Unlock()
  112. if closed {
  113. if cw.lg != nil {
  114. cw.lg.Warn(
  115. "closed TCP streaming connection with remote peer",
  116. zap.String("stream-writer-type", t.String()),
  117. zap.String("local-member-id", cw.localID.String()),
  118. zap.String("remote-peer-id", cw.peerID.String()),
  119. )
  120. } else {
  121. plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
  122. }
  123. }
  124. if cw.lg != nil {
  125. cw.lg.Warn(
  126. "established TCP streaming connection with remote peer",
  127. zap.String("stream-writer-type", t.String()),
  128. zap.String("local-member-id", cw.localID.String()),
  129. zap.String("remote-peer-id", cw.peerID.String()),
  130. )
  131. } else {
  132. plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
  133. }
  134. //更新heartbeatc和msgc两个通道,自此之后,才能发送消息
  135. heartbeatc, msgc = tickc.C, cw.msgc
  136. case <-cw.stopc:
  137. if cw.close() {
  138. if cw.lg != nil {
  139. cw.lg.Warn(
  140. "closed TCP streaming connection with remote peer",
  141. zap.String("stream-writer-type", t.String()),
  142. zap.String("remote-peer-id", cw.peerID.String()),
  143. )
  144. } else {
  145. plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
  146. }
  147. }
  148. if cw.lg != nil {
  149. cw.lg.Warn(
  150. "stopped TCP streaming connection with remote peer",
  151. zap.String("stream-writer-type", t.String()),
  152. zap.String("remote-peer-id", cw.peerID.String()),
  153. )
  154. } else {
  155. plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
  156. }
  157. close(cw.done)
  158. return
  159. }
  160. }
  161. }

run方法的主要作用如下:

1.当其他节点主动与当前节点创建连接(即Stream消息通道底层使用的网络连接)时,该连接实例会写入对应的peer.writer.connc通道,

在streamWriter.run()方法中会通过该通道获取该连接实例并进行绑定,之后才能开始后续的消息发送

2.定时发送心跳,该心跳消息并不是前面介绍的etcd-raft模块时提到的MsgHeartbeat消息,而是为了防止底层连接超时的消息,为了维护长连接。

3.发送消息给对端,即msgc中等待发送给对端节点的消息。

3.attach方法

  1. func (cw *streamWriter) attach(conn *outgoingConn) bool {
  2. select {
  3. case cw.connc <- conn: //将conn实例写入streamWriter.connc通道中
  4. return true
  5. case <-cw.done:
  6. return false
  7. }
  8. }

该方法主要用于接收对端的连接(该连接被封装成outgoingConn),并将outgoingConn实例并写入streamWriter.connc通道中。

#

二、streamReader实例

  1. type streamReader struct {
  2. lg *zap.Logger
  3. peerID types.ID //对端节点的ID
  4. typ streamType //关联的底层连接使用的协议版本信息
  5. tr *Transport //关联的rafthttp.Transport实例
  6. picker *urlPicker //用于获取对端节点的可用的URL
  7. status *peerStatus //
  8. recvc chan<- raftpb.Message //创建streamReader时会启动一个后台goroutine从peer.recvc通道中读取消息。从对端节点发送来的非MsgProp类型消息会写入到该通道中,然后由新建的goroutine读取出来,交给底层的etcd-raft模块进行处理
  9. propc chan<- raftpb.Message //功能基本同上,只不过用于接收MsgProp类型的消息。
  10. rl *rate.Limiter // alters the frequency of dial retrial attempts
  11. errorc chan<- error
  12. mu sync.Mutex
  13. paused bool //是否暂停
  14. closer io.Closer //负责关闭底层的长连接
  15. ctx context.Context
  16. cancel context.CancelFunc
  17. done chan struct{}
  18. }

主要功能是从Stream通道(底层连接)中读取消息

recvc 创建streamReader时会启动一个后台goroutine从peer.recvc通道中读取消息。从对端节点发送来的非MsgProp类型消息会写入到该通道中,然后由新建的goroutine读取出来,交给底层的etcd-raft模块进行处理。

propc 功能基本同上,只不过用于接收MsgProp类型的消息。

streamReader的初始化

  1. func (cr *streamReader) start() {
  2. cr.done = make(chan struct{})
  3. if cr.errorc == nil {
  4. cr.errorc = cr.tr.ErrorC
  5. }
  6. if cr.ctx == nil {
  7. cr.ctx, cr.cancel = context.WithCancel(context.Background())
  8. }
  9. go cr.run()
  10. }

初始化 done、errorc及ctx、cancel等相关字段,然后开启一个协程执行run方法

  1. func (cr *streamReader) run() {
  2. t := cr.typ //获取使用的消息版本
  3. //省略日志相关
  4. for {
  5. rc, err := cr.dial(t) //向对端节点发送一个GET请求,然后获取并返回响应的ReadCloser,主要用于和对端建立连接
  6. if err != nil {
  7. if err != errUnsupportedStreamType {
  8. cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
  9. }
  10. } else {//如果未出现异常,则开始读取对端返回的消息,并将读取到的消息写入streamReader.recvc通道中
  11. cr.status.activate()
  12. err = cr.decodeLoop(rc, t) //轮询读取消息
  13. //省略日志相关......
  14. switch {
  15. // all data is read out
  16. case err == io.EOF:
  17. // connection is closed by the remote
  18. case transport.IsClosedConnError(err):
  19. default:
  20. cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
  21. }
  22. }
  23. // Wait for a while before new dial attempt
  24. err = cr.rl.Wait(cr.ctx)
  25. if cr.ctx.Err() != nil {
  26. if cr.lg != nil {
  27. cr.lg.Info(
  28. "stopped stream reader with remote peer",
  29. zap.String("stream-reader-type", t.String()),
  30. zap.String("local-member-id", cr.tr.ID.String()),
  31. zap.String("remote-peer-id", cr.peerID.String()),
  32. )
  33. } else {
  34. plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
  35. }
  36. close(cr.done)
  37. return
  38. }
  39. if err != nil {
  40. if cr.lg != nil {
  41. cr.lg.Warn(
  42. "rate limit on stream reader with remote peer",
  43. zap.String("stream-reader-type", t.String()),
  44. zap.String("local-member-id", cr.tr.ID.String()),
  45. zap.String("remote-peer-id", cr.peerID.String()),
  46. zap.Error(err),
  47. )
  48. } else {
  49. plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
  50. }
  51. }
  52. }
  53. }

run方法时streamReader的核心,首先调用dial方法给对端发送一个GET请求,从而和对端建立长连接。对端的Header收到该连接后,会封装成outgoingConn结构,并发送给streamWriter,即上面介绍的,当streamWriter收到新建的连接后,会用心跳维持该连接。

连接建立成功之后会调用decodeLoop方法来轮询读取对端节点发送过来的消息,并将收到的字节流转成raftpb.Message消息类型的格式。如果消息是心跳类型则忽略,如果是raftpb.MsgProp类型的消息,则发送到propc通道中,会有专门的协程发送给本节点底层的raft模块。如果是其他消息(注意这些消息里没有快照类型的消息,因为快照类型的消息是专门的通道),则发送到recvc通道中,会有专门的协程将数据发送给底层的raft模块。

dial方法

  1. func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
  2. u := cr.picker.pick() //获取对端节点暴露的一个URL
  3. uu := u
  4. //根据使用的协议版本号和节点ID创建最终的URL地址
  5. uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
  6. //创建一个GET请求
  7. req, err := http.NewRequest("GET", uu.String(), nil)
  8. if err != nil {
  9. cr.picker.unreachable(u)
  10. return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
  11. }
  12. //设置HTTP请求头
  13. req.Header.Set("X-Server-From", cr.tr.ID.String())
  14. req.Header.Set("X-Server-Version", version.Version)
  15. req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
  16. req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
  17. req.Header.Set("X-Raft-To", cr.peerID.String())
  18. //通过设置Header,将当前节点暴露的URL也一起发送给对端节点
  19. setPeerURLsHeader(req, cr.tr.URLs)
  20. req = req.WithContext(cr.ctx)
  21. cr.mu.Lock()
  22. select {
  23. case <-cr.ctx.Done():
  24. cr.mu.Unlock()
  25. return nil, fmt.Errorf("stream reader is stopped")
  26. default:
  27. }
  28. cr.mu.Unlock()
  29. //发送请求 ping-pone
  30. resp, err := cr.tr.streamRt.RoundTrip(req)
  31. if err != nil {
  32. cr.picker.unreachable(u)
  33. return nil, err
  34. }
  35. rv := serverVersion(resp.Header)
  36. lv := semver.Must(semver.NewVersion(version.Version))
  37. if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
  38. httputil.GracefulClose(resp)
  39. cr.picker.unreachable(u)
  40. return nil, errUnsupportedStreamType
  41. }
  42. switch resp.StatusCode {
  43. case http.StatusOK:
  44. return resp.Body, nil
  45. //省略其他错误处理
  46. }
  47. }

主要负责与对端节点建立连接,其中包含了多种异常情况的处理。

看到创建GET请求,并设置HTTP请求头信息,调用RoundTrip发送请求到对端。

当返回成功时,将resp.Body返回。

decodeLoop方法

  1. func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
  2. var dec decoder
  3. cr.mu.Lock()
  4. //根据使用的协议版本创建对应的decoder实例
  5. switch t {
  6. case streamTypeMsgAppV2:
  7. dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
  8. case streamTypeMessage:
  9. dec = &messageDecoder{r: rc} //主要负责从连接中读取消息
  10. default:
  11. if cr.lg != nil {
  12. cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
  13. } else {
  14. plog.Panicf("unhandled stream type %s", t)
  15. }
  16. }
  17. //检测是否已经暂停从该连接上读取消息
  18. select {
  19. case <-cr.ctx.Done():
  20. cr.mu.Unlock()
  21. if err := rc.Close(); err != nil {
  22. return err
  23. }
  24. return io.EOF
  25. default:
  26. cr.closer = rc
  27. }
  28. cr.mu.Unlock()
  29. // gofail: labelRaftDropHeartbeat:
  30. for {
  31. //从底层连接中读取数据,并反序列化成raftpb.Message实例
  32. m, err := dec.decode()
  33. if err != nil {
  34. cr.mu.Lock()
  35. cr.close()
  36. cr.mu.Unlock()
  37. return err
  38. }
  39. // gofail-go: var raftDropHeartbeat struct{}
  40. // continue labelRaftDropHeartbeat
  41. receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
  42. cr.mu.Lock()
  43. paused := cr.paused
  44. cr.mu.Unlock()
  45. if paused {
  46. continue
  47. }
  48. // 忽略连接层的心跳消息,注意与后面介绍的Raft的心跳消息进行区分
  49. if isLinkHeartbeatMessage(&m) {
  50. // raft is not interested in link layer
  51. // heartbeat message, so we should ignore
  52. // it.
  53. continue
  54. }
  55. //根据消息类型获取要写入的通道
  56. recvc := cr.recvc
  57. if m.Type == raftpb.MsgProp {
  58. recvc = cr.propc
  59. }
  60. select {
  61. case recvc <- m: //将消息写入到对应的通道中,之后交给底层的Raft状态机进行处理
  62. default:
  63. //省略异常处理
  64. }
  65. }
  66. }

该方法的主要作用就是 从底层的网络连接读取数据并进行反序列化,之前将得到的消息写入到recvc通道或propc通道中,等待Peer进行处理。

发表评论

表情:
评论列表 (有 0 条评论,305人围观)

还没有评论,来说两句吧...

相关阅读