nsq TCPServer(一)

拼搏现实的明天。 2023-03-01 05:41 74阅读 0赞

TCPServer

  1. func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
  2. //listener是在nsqd New的时候创建的
  3. logf(lg.INFO, "TCP: listening on %s", listener.Addr())
  4. var wg sync.WaitGroup
  5. for {
  6. clientConn, err := listener.Accept()
  7. if err != nil {
  8. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  9. //判断是瞬时错误
  10. logf(lg.WARN, "temporary Accept() failure - %s", err)
  11. runtime.Gosched()//让当前goroutine让出CPU
  12. continue
  13. }
  14. // theres no direct way to detect this error because it is not exposed
  15. if !strings.Contains(err.Error(), "use of closed network connection") {
  16. //只抛出使用已关闭的连接错误
  17. return fmt.Errorf("listener.Accept() error - %s", err)
  18. }
  19. break
  20. }
  21. wg.Add(1)
  22. go func() {
  23. handler.Handle(clientConn)//每个连接都开一个goroutine去处理Handle
  24. wg.Done()
  25. }()
  26. }
  27. // wait to return until all handler goroutines complete
  28. wg.Wait()
  29. logf(lg.INFO, "TCP: closing %s", listener.Addr())
  30. return nil
  31. }

Handle

  1. func (p *tcpServer) Handle(clientConn net.Conn) {
  2. p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())
  3. // The client should initialize itself by sending a 4 byte sequence indicating
  4. // the version of the protocol that it intends to communicate, this will allow us
  5. // to gracefully upgrade the protocol away from text/line oriented to whatever...
  6. buf := make([]byte, 4)//处理连接时,先读出前4个字节数据,client 需要先写入4个字节的数据作为protocolMagic 一般是" V2"
  7. _, err := io.ReadFull(clientConn, buf)
  8. if err != nil {
  9. p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
  10. clientConn.Close()
  11. return
  12. }
  13. protocolMagic := string(buf)
  14. p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
  15. clientConn.RemoteAddr(), protocolMagic)
  16. var prot protocol.Protocol
  17. switch protocolMagic {
  18. case " V2":
  19. prot = &protocolV2{
  20. ctx: p.ctx}//prot 关联nsqd
  21. default:
  22. protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
  23. clientConn.Close()
  24. p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
  25. clientConn.RemoteAddr(), protocolMagic)
  26. return
  27. }
  28. p.conns.Store(clientConn.RemoteAddr(), clientConn)//conns是sync.Map, key是addr,value是conn 添加kv
  29. err = prot.IOLoop(clientConn)//主要的处理逻辑循环
  30. if err != nil {
  31. p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
  32. }
  33. p.conns.Delete(clientConn.RemoteAddr())//删除kv
  34. }

主要的IOLoop在二里面细说

发表评论

表情:
评论列表 (有 0 条评论,74人围观)

还没有评论,来说两句吧...

相关阅读

    相关 NSQ简介(

    NSQ组件 nsqd是一个守护进程,负责接收,排队,投递消息给客户端。 它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这

    相关 NSQ理解

    一、介绍    NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具