nsq TCPServer(一)
TCPServer
// TCPServer accepts connections on listener in a loop and hands each accepted
// connection to handler.Handle in its own goroutine.
//
// The listener is supplied by the caller (per the original note, nsqd creates
// it in New).
//
// Accept errors are handled as follows:
//   - temporary errors are logged, the goroutine yields, and Accept is retried;
//   - a "use of closed network connection" error ends the loop; TCPServer then
//     waits for all running handler goroutines to finish and returns nil;
//   - any other error is returned immediately, without waiting for handlers.
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	logf(lg.INFO, "TCP: listening on %s", listener.Addr())

	var wg sync.WaitGroup

	for {
		clientConn, err := listener.Accept()
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				// Transient failure: log it, yield the CPU, and try again.
				logf(lg.WARN, "temporary Accept() failure - %s", err)
				runtime.Gosched()
				continue
			}
			// theres no direct way to detect this error because it is not exposed
			if !strings.Contains(err.Error(), "use of closed network connection") {
				// Everything except the closed-listener error is reported
				// to the caller as a real failure.
				return fmt.Errorf("listener.Accept() error - %s", err)
			}
			// The listener was closed: stop accepting.
			break
		}

		wg.Add(1)
		// One goroutine per connection.
		go func() {
			// Deferred so the WaitGroup is released however Handle exits.
			defer wg.Done()
			handler.Handle(clientConn)
		}()
	}

	// wait to return until all handler goroutines complete
	wg.Wait()

	logf(lg.INFO, "TCP: closing %s", listener.Addr())

	return nil
}
Handle
// Handle serves a single client connection.
//
// It reads the 4-byte protocol magic the client must send first, selects the
// matching protocol implementation, and runs that protocol's IOLoop on the
// connection. While IOLoop runs, the connection is registered in p.conns under
// its remote address; the entry is removed when IOLoop returns.
//
// If the magic cannot be read, or names an unknown protocol, the connection is
// closed and Handle returns without entering IOLoop.
func (p *tcpServer) Handle(clientConn net.Conn) {
	p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

	// The client should initialize itself by sending a 4 byte sequence indicating
	// the version of the protocol that it intends to communicate, this will allow us
	// to gracefully upgrade the protocol away from text/line oriented to whatever...
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
		clientConn.Close()
		return
	}
	protocolMagic := string(buf)

	p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	// The magic is exactly 4 bytes: two spaces followed by "V2". A shorter
	// literal could never equal the 4-byte string read above.
	case "  V2":
		// The protocol handler shares the server's context.
		prot = &protocolV2{ctx: p.ctx}
	default:
		// Best-effort error frame; the connection is closed right after,
		// so a failed write is deliberately not acted upon.
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	// Track the live connection, keyed by remote address, while it is served.
	p.conns.Store(clientConn.RemoteAddr(), clientConn)

	// Main per-connection processing loop.
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	}

	// The connection is done; drop it from the tracking map.
	p.conns.Delete(clientConn.RemoteAddr())
}
主要的 IOLoop 处理逻辑将在本系列第(二)篇中详细说明。
还没有评论,来说两句吧...