etcd网络层(四)——Peer接口的实现

青旅半醒 2022-01-27 01:23 453阅读 0赞

peer结构体是Peer接口的实现,peer是远程raft节点的代表,本地raft节点通过peer给远程raft节点发送消息

每个peer有两个发送消息的基本机制:stream和pipeline

stream是长连接,它始终可以传输消息。除了通常的stream,还有优化的stream用于leader发送msgApp消息,因为msgApp占了大部分的消息

pipeline是一系列客户端发送请求到远程节点,在传输完成后会立即关闭连接,主要负责数据量较大、发送频率较低的消息。如MsgSnap消息等。

peer结构体及字段的含义如下:

  1. type peer struct {
  2. lg *zap.Logger
  3. localID types.ID //当前节点ID
  4. // id of the remote raft peer node
  5. id types.ID //该peer实例对应的节点ID,对端ID
  6. r Raft //Raft接口,在Raft接口实现的底层封装了etcd-raft模块
  7. status *peerStatus //peer的状态
  8. /*
  9. 每个节点可能提供了多个URL供其他节点正常访问,当其中一个访问失败时,我们应该可以尝试访问另一个。
  10. urlPicker提供的主要功能就是在这些URL之间进行切换
  11. */
  12. picker *urlPicker
  13. msgAppV2Writer *streamWriter
  14. writer *streamWriter //负责向Stream消息通道中写消息
  15. pipeline *pipeline //pipeline消息通道
  16. snapSender *snapshotSender // snapshot sender to send v3 snapshot messages 负责发送快照数据
  17. msgAppV2Reader *streamReader
  18. msgAppReader *streamReader //负责从Stream消息通道中读消息
  19. recvc chan raftpb.Message //从Stream消息通道中读取到消息之后,会通过该通道将消息交给Raft接口,然后由它返回给底层etcd-raft模块进行处理
  20. propc chan raftpb.Message //从Stream消息通道中读取到MsgProp类型的消息之后,会通过该通道将MsgApp消息交给Raft接口,然后由它返回给底层的etcd-raft模块进行处理
  21. mu sync.Mutex
  22. paused bool //是否暂停向其他节点发送消息
  23. cancel context.CancelFunc // cancel pending works in go routine created by peer. context的取消函数
  24. stopc chan struct{}
  25. }

以上就是peer结构体,peer实现读写分离的方式和对端进行交互。

一、peer的创建

peer的创建过程主要在startPeer函数中

  1. func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
  2. //省略日志处理
  3. status := newPeerStatus(t.Logger, t.ID, peerID) //创建节点的状态信息 status
  4. picker := newURLPicker(urls) //根据节点提供的URL创建urlPicker
  5. errorc := t.ErrorC
  6. r := t.Raft //底层的Raft状态及
  7. pipeline := &pipeline{ //创建pipeline实例
  8. peerID: peerID,
  9. tr: t,
  10. picker: picker,
  11. status: status,
  12. followerStats: fs,
  13. raft: r,
  14. errorc: errorc,
  15. }
  16. pipeline.start() //启动pipeline
  17. p := &peer{ //创建peer实例
  18. lg: t.Logger,
  19. localID: t.ID,
  20. id: peerID,
  21. r: r,
  22. status: status,
  23. picker: picker,
  24. msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r), //创建并启动streamWriter
  25. writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
  26. pipeline: pipeline,
  27. snapSender: newSnapshotSender(t, picker, peerID, status),
  28. recvc: make(chan raftpb.Message, recvBufSize), //创建recvc通道
  29. propc: make(chan raftpb.Message, maxPendingProposals), //创建propc通道
  30. stopc: make(chan struct{}),
  31. }
  32. //启动单独的goroutine,它负责将recvc通道中读取消息,该通道中的消息就是从对端节点发送过来的消息,
  33. // 然后将读取到的消息交给底层的Raft状态机进行处理
  34. ctx, cancel := context.WithCancel(context.Background())
  35. p.cancel = cancel
  36. go func() {
  37. for {
  38. select {
  39. case mm := <-p.recvc: //从recvc通道中获取连接上读取到的消息
  40. if err := r.Process(ctx, mm); err != nil { //将Message交给底层Raft状态机处理
  41. if t.Logger != nil {
  42. t.Logger.Warn("failed to process Raft message", zap.Error(err))
  43. } else {
  44. plog.Warningf("failed to process raft message (%v)", err)
  45. }
  46. }
  47. case <-p.stopc:
  48. return
  49. }
  50. }
  51. }()
  52. // r.Process might block for processing proposal when there is no leader.
  53. // Thus propc must be put into a separate routine with recvc to avoid blocking
  54. // processing other raft messages.
  55. //在底层的Raft状态机处理MsgProp类型的消息时,可能会阻塞,所以启动单独的goroutine来处理
  56. go func() {
  57. for {
  58. select {
  59. case mm := <-p.propc: //从propc通道中获取MsgProp类型的Message
  60. if err := r.Process(ctx, mm); err != nil { //将Message交给底层的Raft状态机来处理
  61. plog.Warningf("failed to process raft message (%v)", err)
  62. }
  63. case <-p.stopc:
  64. return
  65. }
  66. }
  67. }()
  68. //创建并启动streamReader实例,主要负责从Stream消息通道上读取消息
  69. p.msgAppV2Reader = &streamReader{
  70. lg: t.Logger,
  71. peerID: peerID,
  72. typ: streamTypeMsgAppV2,
  73. tr: t,
  74. picker: picker,
  75. status: status,
  76. recvc: p.recvc,
  77. propc: p.propc,
  78. rl: rate.NewLimiter(t.DialRetryFrequency, 1),
  79. }
  80. p.msgAppReader = &streamReader{
  81. lg: t.Logger,
  82. peerID: peerID,
  83. typ: streamTypeMessage,
  84. tr: t,
  85. picker: picker,
  86. status: status,
  87. recvc: p.recvc,
  88. propc: p.propc,
  89. rl: rate.NewLimiter(t.DialRetryFrequency, 1),
  90. }
  91. p.msgAppV2Reader.start() //V2版本
  92. p.msgAppReader.start()
  93. return p
  94. }

status用于记录当且peer的可用性状态

picker从URL列表中获取可用URL的机制

pipeline的创建过程我们前面已经详细介绍,这里就不在累述。

streamWriter和streamReader 上一节也详细介绍过了。

这里看到开启两个协程分别获取p.recvc通道和p.propc通道中的值,并将获取的值通过Process方法 交给底层的raft模块。recvc和propc通道中的值是上节我们讲streamReader的时候从对端节点读取到的消息。

二、向对端发送消息

peer的send方法主要用于向对端发送消息。

  1. func (p *peer) send(m raftpb.Message) {
  2. p.mu.Lock()
  3. paused := p.paused
  4. p.mu.Unlock()
  5. if paused { //如果暂停则返回
  6. return
  7. }
  8. //根据消息类型选择一个通道去发送消息
  9. writec, name := p.pick(m)
  10. select {
  11. case writec <- m: //将消息发送到writec通道中,等待发送
  12. default:
  13. //如果出现阻塞,则将消息报告给底层Raft状态机,这里会根据消息类型选择合适的报告方法
  14. p.r.ReportUnreachable(m.To)
  15. if isMsgSnap(m) { //如果消息类型的MsgSnap则通知底层的raft-node,当前快照数据发送失败
  16. p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
  17. }
  18. //其他处理省略
  19. }
  20. }

send主要作用是向对端节点发送消息,pick方法根据消息类型和协议版本选择一个通道发送消息。该通道主要是pipeline和stream两类消息,pipeline通道通过HTTP POST发送给对端,stream通道和对端维护的长连接。

如果出现阻塞,则将消息报告给底层的raft状态机。

pick方法

根据消息类型和版本获取对应的通道发送对端节点

  1. func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
  2. var ok bool
  3. // Considering MsgSnap may have a big size, e.g., 1G, and will block
  4. // stream for a long time, only use one of the N pipelines to send MsgSnap.
  5. if isMsgSnap(m) {
  6. return p.pipeline.msgc, pipelineMsg
  7. } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
  8. return writec, streamAppV2
  9. } else if writec, ok = p.writer.writec(); ok {
  10. return writec, streamMsg
  11. }
  12. return p.pipeline.msgc, pipelineMsg
  13. }

1.如果该消息是MsgSnap快照消息,则选择pipeline.msgc通道,返回的channel名为pipeline

2.如果该消息是MsgApp消息,则选择msgAppV2Writer的msgc通道,channel名为streamMsgAppV2

3.最后选择writer的msgc通道,channel名为streamMsg

如果Stream通道不可用,则使用Pipeline消息通道发送所有类型的消息

peer中的方法都比较简单这里就不在叙述。

发表评论

表情:
评论列表 (有 0 条评论,453人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Peer To Peer——对等网络

           今年的考试,大问题没怎么出现。就是考英语第二天的下午,发生网络堵塞的现象,不影响大局,但是事出有因,我们还是需要看看是什么影响到了考生抽题。最后查了一圈,发现其他