etcd网络层(六)——模拟etcd网络层的stream通道维护长连接

- 日理万妓 2021-12-20 13:05 228阅读 0赞

代码如下:

  1. package main
  2. import (
  3. "io"
  4. "log"
  5. "net"
  6. "fmt"
  7. "sync"
  8. "time"
  9. "strings"
  10. "strconv"
  11. "net/http"
  12. "encoding/json"
  13. "encoding/binary"
  14. )
  15. func main() {
  16. peerURL1 := "http://127.0.0.1:8081"
  17. peerURL2 := "http://127.0.0.1:8082"
  18. id1 := getPid(peerURL1)
  19. id2 := getPid(peerURL2)
  20. //开启节点1
  21. tr1 := &Transport{}
  22. tr1.Start(int64(id1))
  23. go func() {
  24. err := http.ListenAndServe(":8081",tr1.Handler())
  25. log.Fatal(err)
  26. }()
  27. //开启节点2
  28. tr2 := &Transport{}
  29. tr2.Start(int64(id2))
  30. go func() {
  31. err := http.ListenAndServe(":8082",tr2.Handler())
  32. log.Fatal(err)
  33. }()
  34. time.Sleep(time.Second*3)
  35. //节点1添加节点2
  36. tr1.AddPeer(int64(id2),peerURL2)
  37. //节点2添加节点1
  38. tr2.AddPeer(int64(id1),peerURL1)
  39. go func() {
  40. ticker := time.NewTicker(time.Second*10)
  41. defer ticker.Stop()
  42. for {
  43. select {
  44. case <- ticker.C:
  45. peers := tr1.GetPeer()
  46. for i := range peers {
  47. peers[i].send(&Message{MsgType:msgTypeProp, MsgBody:"Hello,I am tr1"})
  48. }
  49. }
  50. }
  51. }()
  52. go func() {
  53. ticker := time.NewTicker(time.Second*10)
  54. defer ticker.Stop()
  55. for {
  56. select {
  57. case <- ticker.C:
  58. peers := tr2.GetPeer()
  59. for i := range peers {
  60. peers[i].send(&Message{MsgType:msgTypeProp, MsgBody:"Hello,I am tr2"})
  61. }
  62. }
  63. }
  64. }()
  65. time.Sleep(time.Minute*10)
  66. tr1.Stop()
  67. tr2.Stop()
  68. }
  69. type Transport struct {
  70. ClusterID int64
  71. ID int64 // local member ID 当前节点自己的ID
  72. streamRt http.RoundTripper
  73. mu sync.RWMutex
  74. peers map[int64]*peer // peers map
  75. }
  76. func (tr *Transport) Start(id int64) error {
  77. tr.ID = id
  78. tr.streamRt = &http.Transport{
  79. Dial: (&net.Dialer{
  80. Timeout: 10*time.Second,
  81. // value taken from http.DefaultTransport
  82. KeepAlive: 30 * time.Second,
  83. }).Dial,
  84. }
  85. tr.peers = make(map[int64]*peer)
  86. return nil
  87. }
  88. func (tr *Transport) GetPeer() (result []*peer) {
  89. tr.mu.RLock()
  90. defer tr.mu.RUnlock()
  91. for k := range tr.peers{
  92. result = append(result,tr.peers[k])
  93. }
  94. return
  95. }
  96. func (tr *Transport) AddPeer(id int64, peerURL string) {
  97. tr.mu.RLock()
  98. if _,ok := tr.peers[id]; ok {
  99. tr.mu.RUnlock()
  100. return
  101. }
  102. tr.mu.RUnlock()
  103. peer := startPeer(tr,peerURL,tr.ID,id)
  104. tr.mu.Lock()
  105. tr.peers[id] = peer
  106. tr.mu.Unlock()
  107. }
  108. func (tr *Transport) Handler() http.Handler {
  109. streamHandler := newStreamHandler(tr,tr.ID,tr.ClusterID)
  110. mux := http.NewServeMux()
  111. mux.Handle("/raft/stream"+"/", streamHandler)
  112. return mux
  113. }
  114. type streamHandler struct {
  115. tr *Transport //关联的rafthttp.Transport实例
  116. id int64 //当前节点ID
  117. cid int64 //当前集群ID
  118. }
  119. func newStreamHandler(tr *Transport, id, cid int64) http.Handler {
  120. return &streamHandler{
  121. tr:tr,
  122. id:id,
  123. cid:cid,
  124. }
  125. }
  126. func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  127. //请求参数校验,如Method是否是GET,检验集群ID
  128. if r.Method != "GET" {
  129. w.Header().Set("Allow", "GET")
  130. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  131. return
  132. }
  133. id := r.Header.Get("PeerID")
  134. if id == ""{
  135. w.Header().Set("PeerID", "Must")
  136. http.Error(w, "PeerID is not allow empty", http.StatusMethodNotAllowed)
  137. return
  138. }
  139. pid,_ := strconv.ParseUint(id,10,64)
  140. p,ok := h.tr.peers[int64(pid)]
  141. if !ok {
  142. w.WriteHeader(http.StatusNotFound)
  143. time.Sleep(time.Second*5)
  144. return
  145. }
  146. w.WriteHeader(http.StatusOK) //返回状态码 200
  147. w.(http.Flusher).Flush() //调用Flush()方法将响应数据发送到对端节点
  148. c := newCloseNotifier()
  149. conn := &outgoingConn{ //创建outgoingConn实例
  150. Writer: w,
  151. Flusher: w.(http.Flusher),
  152. localID: h.tr.ID,
  153. Closer:c,
  154. peerID: h.id,
  155. }
  156. p.attachOutgoingConn(conn) //建立连接,将outgoingConn实例与对应的streamWriter实例绑定
  157. <- c.closeNotify()
  158. }
  // closeNotifier is a closer whose Close call can be observed through a
  // channel.
  type closeNotifier struct {
  	done chan struct{} // closed by Close
  }

  // newCloseNotifier returns a closeNotifier that has not been closed yet.
  func newCloseNotifier() *closeNotifier {
  	n := new(closeNotifier)
  	n.done = make(chan struct{})
  	return n
  }

  // Close signals everyone waiting on closeNotify. It always returns nil and
  // must be called at most once: a second call panics on the closed channel.
  func (n *closeNotifier) Close() error {
  	close(n.done)
  	return nil
  }

  // closeNotify returns a channel that is closed once Close has been called.
  func (n *closeNotifier) closeNotify() <-chan struct{} {
  	return n.done
  }
  172. func (tr *Transport) Stop() {
  173. tr.mu.RLock()
  174. defer tr.mu.RUnlock()
  175. for _,v := range tr.peers{
  176. v.stop()
  177. }
  178. tr.peers = nil
  179. }
  // Values carried in Message.MsgType.
  const (
  	// msgTypeHeartbeat marks a heartbeat message.
  	msgTypeHeartbeat = "01"
  	// msgTypeProp marks a prop message.
  	msgTypeProp = "02"
  )
  // Message is the unit exchanged between peers over a stream; it is serialized
  // as JSON by messageEncoder.
  type Message struct {
  	// MsgType is one of the msgType* constants.
  	MsgType string
  	// MsgBody is the payload text.
  	MsgBody string
  }
  188. type peer struct {
  189. localID int64 //当前节点ID
  190. // id of the remote raft peer node
  191. id int64 //该peer实例对应的节点ID,对端ID
  192. writer *streamWriter //负责向Stream消息通道中写消息
  193. msgAppReader *streamReader //负责从Stream消息通道中读消息
  194. msgc chan *Message
  195. stopc chan struct{}
  196. }
  197. func startPeer(t *Transport, peerURL string, localID,peerID int64) *peer {
  198. pr := & peer{
  199. localID:localID,
  200. id:peerID,
  201. writer:newstreamWriter(localID,peerID),
  202. msgAppReader:newstreamReader(localID,peerID,peerURL,t),
  203. msgc : make(chan *Message,20),
  204. stopc : make(chan struct{}),
  205. }
  206. go func() {
  207. for msg := range pr.msgc {
  208. select {
  209. case pr.writer.msgc <- msg:
  210. default:
  211. log.Printf("write to writer error msg is %v",msg)
  212. }
  213. }
  214. }()
  215. return pr
  216. }
  217. func (pr *peer) stop() {
  218. close(pr.stopc)
  219. }
  220. func (pr *peer) send(msg *Message) bool{
  221. select {
  222. case pr.msgc <- msg:
  223. return true
  224. default:
  225. return false
  226. }
  227. }
  228. func (pr *peer) attachOutgoingConn(conn *outgoingConn) {
  229. select {
  230. case pr.writer.connc <- conn:
  231. default:
  232. log.Printf("attachOutgoingConn error")
  233. }
  234. }
  235. type streamWriter struct {
  236. localID int64 //本端的ID
  237. peerID int64 //对端节点的ID
  238. closer io.Closer //负责关闭底层的长连接
  239. mu sync.Mutex
  240. enc *messageEncoder
  241. msgc chan *Message //Peer会将待发送的消息写入到该通道,streamWriter则从该通道中读取消息并发送出去
  242. connc chan *outgoingConn //通过该通道获取当前streamWriter实例关联的底层网络连接, outgoingConn其实是对网络连接的一层封装,其中记录了当前连接使用的协议版本,以及用于关闭连接的Flusher和Closer等信息。
  243. stopc chan struct{}
  244. }
  245. func newstreamWriter(localID,peerID int64) *streamWriter{
  246. sw := &streamWriter{
  247. localID:localID,
  248. peerID:peerID,
  249. msgc:make(chan *Message,20),
  250. connc:make(chan *outgoingConn),
  251. stopc:make(chan struct{}),
  252. }
  253. go sw.run()
  254. return sw
  255. }
  256. func (sw *streamWriter) writec() (chan<- *Message) {
  257. sw.mu.Lock()
  258. defer sw.mu.Unlock()
  259. return sw.msgc
  260. }
  261. func (sw *streamWriter) run() {
  262. var (
  263. heartbeatC <-chan time.Time
  264. flusher http.Flusher //负责刷新底层连接,将数据真正发送出去
  265. msgc chan *Message
  266. )
  267. tickc := time.NewTicker(time.Second*7) //发送心跳的定时器
  268. defer tickc.Stop()
  269. for {
  270. select {
  271. case msg := <- msgc:
  272. err := sw.enc.encode(msg)
  273. if err != nil {
  274. log.Printf("send to peer peerID is %d fail,error is %v",sw.peerID,err)
  275. }else {
  276. flusher.Flush()
  277. log.Printf("send to peer peerID is %d success, MsgType is :%s,MsgBody is:%s",sw.peerID,msg.MsgType,msg.MsgBody)
  278. }
  279. case <-heartbeatC: //向对端发送心跳消息
  280. err := sw.enc.encode(&Message{
  281. MsgType: msgTypeHeartbeat,
  282. MsgBody: time.Now().Format("2006-01-02 15:04:05"),
  283. })
  284. if err != nil {
  285. log.Printf("send to peer heartbeat data fail peerID is %d,error is %v",sw.peerID,err)
  286. }else {
  287. flusher.Flush()
  288. log.Printf("send to peer heartbeat data success peerID is %d ",sw.peerID)
  289. }
  290. case conn := <- sw.connc:
  291. sw.enc = &messageEncoder{w:conn.Writer}
  292. flusher = conn.Flusher
  293. sw.closer = conn.Closer
  294. heartbeatC,msgc = tickc.C,sw.msgc
  295. case <- sw.stopc:
  296. log.Println("msgWriter stop!")
  297. sw.closer.Close()
  298. return
  299. }
  300. }
  301. }
  302. func (sw *streamWriter) stop() {
  303. close(sw.stopc)
  304. }
  305. type streamReader struct {
  306. localID int64
  307. peerID int64 //对端节点的ID
  308. tr *Transport //关联的rafthttp.Transport实例
  309. peerURL string //对端URL
  310. mu sync.Mutex
  311. closer io.Closer //负责关闭底层的长连接
  312. done chan struct{}
  313. }
  314. func newstreamReader(localID,peerID int64,peerURL string,tr *Transport) *streamReader {
  315. sr := &streamReader{
  316. localID:localID,
  317. peerID:peerID,
  318. peerURL:peerURL,
  319. tr :tr,
  320. done : make(chan struct{}),
  321. }
  322. go sr.run()
  323. return sr
  324. }
  325. func (sr *streamReader) run() {
  326. time.Sleep(time.Second*5)
  327. for {
  328. readColser,err := sr.dial()
  329. if err != nil {
  330. log.Printf("dial peer error,peerID is %d,err is %v",sr.peerID,err)
  331. time.Sleep(time.Second*10)
  332. continue
  333. }
  334. sr.closer = readColser
  335. err = sr.decodeLoop(readColser)
  336. if err != nil {
  337. log.Printf("decodeLoop error,peerID is %d, error is %v",sr.peerID,err)
  338. }
  339. sr.closer.Close()
  340. }
  341. }
  342. func (sr *streamReader) dial() (io.ReadCloser, error) {
  343. req, err := http.NewRequest("GET", sr.peerURL+"/raft/stream/dial", nil)
  344. if err != nil {
  345. return nil,err
  346. }
  347. req.Header.Set("PeerID",fmt.Sprintf("%d",sr.localID))
  348. resp,err := sr.tr.streamRt.RoundTrip(req)
  349. if err != nil {
  350. return nil,err
  351. }
  352. return resp.Body,nil
  353. }
  354. func (sr *streamReader) decodeLoop(rc io.ReadCloser) error {
  355. dec := &messageDecoder{rc}
  356. for {
  357. msg,err := dec.decode()
  358. if err != nil {
  359. log.Printf("read decodeLoop error,peerID is %d,err is %v",sr.peerID,err)
  360. continue
  361. }
  362. log.Printf("read from peer MsgType is %s,MsgBody is %s",msg.MsgType,msg.MsgBody)
  363. }
  364. }
  // outgoingConn bundles what a stream writer needs to use a connection: where
  // to write, how to flush, and how to close it.
  type outgoingConn struct {
  	io.Writer
  	http.Flusher
  	io.Closer

  	localID int64 // ID of the local node
  	peerID  int64 // ID of the remote node
  }
  372. type messageEncoder struct {
  373. w io.Writer
  374. }
  375. func (m *messageEncoder) encode(msg *Message) error {
  376. byts,err := json.Marshal(msg)
  377. if err != nil {
  378. return err
  379. }
  380. dataLen := make([]byte,8)
  381. binary.BigEndian.PutUint64(dataLen,uint64(len(byts)))
  382. sendData := append(dataLen,byts...)
  383. _,err = m.w.Write(sendData)
  384. if err != nil {
  385. return err
  386. }
  387. return nil
  388. }
  389. type messageDecoder struct {
  390. r io.Reader
  391. }
  392. func (dec *messageDecoder) decode() (*Message,error) {
  393. var m Message
  394. var l uint64
  395. if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
  396. return nil, err
  397. }
  398. buf := make([]byte, int(l))
  399. if _, err := io.ReadFull(dec.r, buf); err != nil {
  400. return &m, err
  401. }
  402. err := json.Unmarshal(buf,&m)
  403. if err != nil {
  404. return nil,err
  405. }
  406. return &m, nil
  407. }
  // getPid derives a member ID from a peer URL by parsing whatever follows the
  // last ':' as a decimal integer, e.g. "http://127.0.0.1:8081" yields 8081.
  //
  // It returns 0 when the URL contains no ':' after its first character or when
  // the suffix is not a valid int64. A parse failure is logged with the
  // offending URL; the old println(err) printed only the raw interface words.
  func getPid(purl string) int64 {
  	index := strings.LastIndex(purl, ":")
  	if index <= 0 {
  		return 0
  	}
  	id, err := strconv.ParseInt(purl[index+1:], 10, 64)
  	if err != nil {
  		log.Printf("getPid: cannot derive an ID from %q: %v", purl, err)
  		// ParseInt returns a saturated value on range errors; do not leak it.
  		return 0
  	}
  	return id
  }

发表评论

表情:
评论列表 (有 0 条评论,228人围观)

还没有评论,来说两句吧...

相关阅读

    相关 网络网络

    网络层:IP 位于数据链路层之上的是网络层,它关注的是如何将包从源主机发送到目标主机。这一层执行了很多任务,比如: 将数据分解成足够小的片段以便数据链路层进行传