etcd网络层(三)——stream实现过程
stream消息通道主要是通过长连接的方式和对端进行数据交互,peer结构体中的stream相关数据如下(忽略V2版本):
type peer struct {
writer *streamWriter //负责向Stream消息通道中写消息
msgAppReader *streamReader //负责从Stream消息通道中读消息
.......其他字段省略
}
从中我们看到stream是读写分离的,主要有streamWriter和streamReader结构体实现相关的功能。
stream实现的相关源码在 etcdserver/api/rafthttp/stream.go文件中。
#
一、streamWriter
streamWriter结构体如下:
type streamWriter struct {
lg *zap.Logger
localID types.ID //本端的ID
peerID types.ID //对端节点的ID
status *peerStatus
fs *stats.FollowerStats
r Raft //底层的Raft实例
mu sync.Mutex // guard field working and closer
closer io.Closer //负责关闭底层的长连接
working bool //负责标识当前的streamWriter是否可用
msgc chan raftpb.Message //Peer会将待发送的消息写入到该通道,streamWriter则从该通道中读取消息并发送出去
connc chan *outgoingConn //通过该通道获取当前streamWriter实例关联的底层网络连接, outgoingConn其实是对网络连接的一层封装,其中记录了当前连接使用的协议版本,以及用于关闭连接的Flusher和Closer等信息。
stopc chan struct{}
done chan struct{}
}
从中我们看到:
localID和peerID分别记录本端ID和对端ID
status记录当前peer的可用性状态
r 是底层的Raft实例,用于和底层的raft模块交互
closer负责关闭底层的长连接
msgc peer会将待发送到对端的消息(非快照数据)写入到该通道,streamWriter则从该通道中读取并发送到对端节点。
connc outgoingConn其实是对网络连接的一层封装,connc用于等待对端的主动连接,当对端主动和当前节点建立连接之后,本节点才会向对端发送数据。
1. streamWriter的初始化
streamWriter的初始化过程在startStreamWriter函数中
func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
w := &streamWriter{
lg: lg,
localID: local,
peerID: id,
status: status,
fs: fs,
r: r,
msgc: make(chan raftpb.Message, streamBufSize),
connc: make(chan *outgoingConn),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
go w.run()
return w
}
该方法主要是用于初始化streamWriter中的相关字段,并开启一个协程执行run方法。
2.run方法
run方法是streamWriter的核心,源码如下:
func (cw *streamWriter) run() {
var (
msgc chan raftpb.Message //指向当前streamWriter.msgc字段
heartbeatc <-chan time.Time //定时器会定时向该通道发送信号,触发心跳消息的发送,该心跳消息与后台介绍的Raft的心跳消息有所不同,该心跳的主要目的是为了防止连接长时间不用断开的
t streamType //用来记录消息的版本信息
enc encoder //编码器,负责将消息序列化并写入连接的缓冲区
flusher http.Flusher //负责刷新底层连接,将数据真正发送出去
batched int //当前未Flush的消息个数
)
tickc := time.NewTicker(ConnReadTimeout / 3) //发送心跳的定时器
defer tickc.Stop()
unflushed := 0 //为Flush的字节数
if cw.lg != nil {
cw.lg.Info(
"started stream writer with remote peer",
zap.String("local-member-id", cw.localID.String()),
zap.String("remote-peer-id", cw.peerID.String()),
)
} else {
plog.Infof("started streaming with peer %s (writer)", cw.peerID)
}
for {
select {
case <-heartbeatc: //定时器到期,触发心跳消息
err := enc.encode(&linkHeartbeatMessage) //通过encoder将心跳编码并写入到writer
unflushed += linkHeartbeatMessage.Size() //增加未Flush出去的字节数
if err == nil { //若没有异常,则使用flusher将缓存的消息全部发送出去,并重置batched和unflushed两个统计变量
flusher.Flush()
batched = 0
sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0
continue
}
//如果有异常则设置peer的status 为 不连通状态
cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
sentFailures.WithLabelValues(cw.peerID.String()).Inc()
cw.close() //若发生异常,则关闭streamWriter,会导致底层连接的关闭
if cw.lg != nil {
cw.lg.Warn(
"lost TCP streaming connection with remote peer",
zap.String("stream-writer-type", t.String()),
zap.String("local-member-id", cw.localID.String()),
zap.String("remote-peer-id", cw.peerID.String()),
)
} else {
plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
}
//将heartbeatc和msgc两个通道清空,后续就不会在发送心跳消息和其他类型的消息了
heartbeatc, msgc = nil, nil
case m := <-msgc: //peer向streamWriter.msgc写入待发送的消息
err := enc.encode(&m) //将消息序列化并写入底层连接
if err == nil { //若没有异常,则递增unflushed变量
unflushed += m.Size()
//msgc通道中的消息全部发送完成或是未Flush的消息较多(超过streamBufSize的一半),则触发Flush,否则只是递增batched变量
if len(msgc) == 0 || batched > streamBufSize/2 {
flusher.Flush()
sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0
batched = 0
} else {
batched++
}
continue
}
//若发生异常,则关闭streamWriter,清空heartbeatc和msgc两个通道
cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
cw.close()
if cw.lg != nil {
cw.lg.Warn(
"lost TCP streaming connection with remote peer",
zap.String("stream-writer-type", t.String()),
zap.String("local-member-id", cw.localID.String()),
zap.String("remote-peer-id", cw.peerID.String()),
)
} else {
plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
}
heartbeatc, msgc = nil, nil
cw.r.ReportUnreachable(m.To)
sentFailures.WithLabelValues(cw.peerID.String()).Inc()
/*
当其他节点(对端)主动与当前节点创建Stream消息通道时,会先通过StreamHandler的处理,
StreamHandler会通过attach()方法将连接写入对应的peer.writer.connc通道,
而当前的goroutine会通过该通道获取连接,然后开始发送消息
*/
case conn := <-cw.connc: //获取与当前streamWriter实例绑定的底层连接
cw.mu.Lock()
closed := cw.closeUnlocked()
t = conn.t //获取该连接底层发送的消息版本,并创建相应的encoder实例
switch conn.t { //不同的版本创建不同的编码器,将http.ResponseWriter封装成messageEncoder,上层调用通过messageEncoder实例完成消息发送
case streamTypeMsgAppV2:
enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
case streamTypeMessage:
enc = &messageEncoder{w: conn.Writer}
default:
plog.Panicf("unhandled stream type %s", conn.t)
}
if cw.lg != nil {
cw.lg.Info(
"set message encoder",
zap.String("from", conn.localID.String()),
zap.String("to", conn.peerID.String()),
zap.String("stream-type", t.String()),
)
}
flusher = conn.Flusher //记录底层连接对应的Flusher
unflushed = 0 //重置未Flush的字节数
cw.status.activate() //peerStatus.active设置为true
cw.closer = conn.Closer //负责关闭底层的长连接
cw.working = true //标识当前streamWriter正在运行
cw.mu.Unlock()
if closed {
if cw.lg != nil {
cw.lg.Warn(
"closed TCP streaming connection with remote peer",
zap.String("stream-writer-type", t.String()),
zap.String("local-member-id", cw.localID.String()),
zap.String("remote-peer-id", cw.peerID.String()),
)
} else {
plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
}
}
if cw.lg != nil {
cw.lg.Warn(
"established TCP streaming connection with remote peer",
zap.String("stream-writer-type", t.String()),
zap.String("local-member-id", cw.localID.String()),
zap.String("remote-peer-id", cw.peerID.String()),
)
} else {
plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
}
//更新heartbeatc和msgc两个通道,自此之后,才能发送消息
heartbeatc, msgc = tickc.C, cw.msgc
case <-cw.stopc:
if cw.close() {
if cw.lg != nil {
cw.lg.Warn(
"closed TCP streaming connection with remote peer",
zap.String("stream-writer-type", t.String()),
zap.String("remote-peer-id", cw.peerID.String()),
)
} else {
plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
}
}
if cw.lg != nil {
cw.lg.Warn(
"stopped TCP streaming connection with remote peer",
zap.String("stream-writer-type", t.String()),
zap.String("remote-peer-id", cw.peerID.String()),
)
} else {
plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
}
close(cw.done)
return
}
}
}
run方法的主要作用如下:
1.当其他节点主动与当前节点创建连接(即Stream消息通道底层使用的网络连接)时,该连接实例会写入对应的peer.writer.connc通道,
在streamWriter.run()方法中会通过该通道获取该连接实例并进行绑定,之后才能开始后续的消息发送
2.定时发送心跳,该心跳消息并不是前面介绍的etcd-raft模块时提到的MsgHeartbeat消息,而是为了防止底层连接超时的消息,为了维护长连接。
3.发送消息给对端,即msgc中等待发送给对端节点的消息。
3.attach方法
func (cw *streamWriter) attach(conn *outgoingConn) bool {
select {
case cw.connc <- conn: //将conn实例写入streamWriter.connc通道中
return true
case <-cw.done:
return false
}
}
该方法主要用于接收对端的连接(该连接被封装成outgoingConn),并将outgoingConn实例并写入streamWriter.connc通道中。
#
二、streamReader实例
type streamReader struct {
lg *zap.Logger
peerID types.ID //对端节点的ID
typ streamType //关联的底层连接使用的协议版本信息
tr *Transport //关联的rafthttp.Transport实例
picker *urlPicker //用于获取对端节点的可用的URL
status *peerStatus //
recvc chan<- raftpb.Message //创建streamReader时会启动一个后台goroutine从peer.recvc通道中读取消息。从对端节点发送来的非MsgProp类型消息会写入到该通道中,然后由新建的goroutine读取出来,交给底层的etcd-raft模块进行处理
propc chan<- raftpb.Message //功能基本同上,只不过用于接收MsgProp类型的消息。
rl *rate.Limiter // alters the frequency of dial retrial attempts
errorc chan<- error
mu sync.Mutex
paused bool //是否暂停
closer io.Closer //负责关闭底层的长连接
ctx context.Context
cancel context.CancelFunc
done chan struct{}
}
主要功能是从Stream通道(底层连接)中读取消息
recvc 创建streamReader时会启动一个后台goroutine从peer.recvc通道中读取消息。从对端节点发送来的非MsgProp类型消息会写入到该通道中,然后由新建的goroutine读取出来,交给底层的etcd-raft模块进行处理。
propc 功能基本同上,只不过用于接收MsgProp类型的消息。
streamReader的初始化
func (cr *streamReader) start() {
cr.done = make(chan struct{})
if cr.errorc == nil {
cr.errorc = cr.tr.ErrorC
}
if cr.ctx == nil {
cr.ctx, cr.cancel = context.WithCancel(context.Background())
}
go cr.run()
}
初始化 done、errorc及ctx、cancel等相关字段,然后开启一个协程执行run方法
func (cr *streamReader) run() {
t := cr.typ //获取使用的消息版本
//省略日志相关
for {
rc, err := cr.dial(t) //向对端节点发送一个GET请求,然后获取并返回响应的ReadCloser,主要用于和对端建立连接
if err != nil {
if err != errUnsupportedStreamType {
cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
}
} else {//如果未出现异常,则开始读取对端返回的消息,并将读取到的消息写入streamReader.recvc通道中
cr.status.activate()
err = cr.decodeLoop(rc, t) //轮询读取消息
//省略日志相关......
switch {
// all data is read out
case err == io.EOF:
// connection is closed by the remote
case transport.IsClosedConnError(err):
default:
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
}
}
// Wait for a while before new dial attempt
err = cr.rl.Wait(cr.ctx)
if cr.ctx.Err() != nil {
if cr.lg != nil {
cr.lg.Info(
"stopped stream reader with remote peer",
zap.String("stream-reader-type", t.String()),
zap.String("local-member-id", cr.tr.ID.String()),
zap.String("remote-peer-id", cr.peerID.String()),
)
} else {
plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
}
close(cr.done)
return
}
if err != nil {
if cr.lg != nil {
cr.lg.Warn(
"rate limit on stream reader with remote peer",
zap.String("stream-reader-type", t.String()),
zap.String("local-member-id", cr.tr.ID.String()),
zap.String("remote-peer-id", cr.peerID.String()),
zap.Error(err),
)
} else {
plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
}
}
}
}
run方法时streamReader的核心,首先调用dial方法给对端发送一个GET请求,从而和对端建立长连接。对端的Header收到该连接后,会封装成outgoingConn结构,并发送给streamWriter,即上面介绍的,当streamWriter收到新建的连接后,会用心跳维持该连接。
连接建立成功之后会调用decodeLoop方法来轮询读取对端节点发送过来的消息,并将收到的字节流转成raftpb.Message消息类型的格式。如果消息是心跳类型则忽略,如果是raftpb.MsgProp类型的消息,则发送到propc通道中,会有专门的协程发送给本节点底层的raft模块。如果是其他消息(注意这些消息里没有快照类型的消息,因为快照类型的消息是专门的通道),则发送到recvc通道中,会有专门的协程将数据发送给底层的raft模块。
dial方法
func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
u := cr.picker.pick() //获取对端节点暴露的一个URL
uu := u
//根据使用的协议版本号和节点ID创建最终的URL地址
uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
//创建一个GET请求
req, err := http.NewRequest("GET", uu.String(), nil)
if err != nil {
cr.picker.unreachable(u)
return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
}
//设置HTTP请求头
req.Header.Set("X-Server-From", cr.tr.ID.String())
req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
req.Header.Set("X-Raft-To", cr.peerID.String())
//通过设置Header,将当前节点暴露的URL也一起发送给对端节点
setPeerURLsHeader(req, cr.tr.URLs)
req = req.WithContext(cr.ctx)
cr.mu.Lock()
select {
case <-cr.ctx.Done():
cr.mu.Unlock()
return nil, fmt.Errorf("stream reader is stopped")
default:
}
cr.mu.Unlock()
//发送请求 ping-pone
resp, err := cr.tr.streamRt.RoundTrip(req)
if err != nil {
cr.picker.unreachable(u)
return nil, err
}
rv := serverVersion(resp.Header)
lv := semver.Must(semver.NewVersion(version.Version))
if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
httputil.GracefulClose(resp)
cr.picker.unreachable(u)
return nil, errUnsupportedStreamType
}
switch resp.StatusCode {
case http.StatusOK:
return resp.Body, nil
//省略其他错误处理
}
}
主要负责与对端节点建立连接,其中包含了多种异常情况的处理。
看到创建GET请求,并设置HTTP请求头信息,调用RoundTrip发送请求到对端。
当返回成功时,将resp.Body返回。
decodeLoop方法
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
var dec decoder
cr.mu.Lock()
//根据使用的协议版本创建对应的decoder实例
switch t {
case streamTypeMsgAppV2:
dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
case streamTypeMessage:
dec = &messageDecoder{r: rc} //主要负责从连接中读取消息
default:
if cr.lg != nil {
cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
} else {
plog.Panicf("unhandled stream type %s", t)
}
}
//检测是否已经暂停从该连接上读取消息
select {
case <-cr.ctx.Done():
cr.mu.Unlock()
if err := rc.Close(); err != nil {
return err
}
return io.EOF
default:
cr.closer = rc
}
cr.mu.Unlock()
// gofail: labelRaftDropHeartbeat:
for {
//从底层连接中读取数据,并反序列化成raftpb.Message实例
m, err := dec.decode()
if err != nil {
cr.mu.Lock()
cr.close()
cr.mu.Unlock()
return err
}
// gofail-go: var raftDropHeartbeat struct{}
// continue labelRaftDropHeartbeat
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
cr.mu.Lock()
paused := cr.paused
cr.mu.Unlock()
if paused {
continue
}
// 忽略连接层的心跳消息,注意与后面介绍的Raft的心跳消息进行区分
if isLinkHeartbeatMessage(&m) {
// raft is not interested in link layer
// heartbeat message, so we should ignore
// it.
continue
}
//根据消息类型获取要写入的通道
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m: //将消息写入到对应的通道中,之后交给底层的Raft状态机进行处理
default:
//省略异常处理
}
}
}
该方法的主要作用就是 从底层的网络连接读取数据并进行反序列化,之前将得到的消息写入到recvc通道或propc通道中,等待Peer进行处理。
还没有评论,来说两句吧...