golang的http源码与twirp源码笔记
golang的http源码跟踪笔记
- http
- 启动httpHandle
- Server循环accept请求
- 处理connection
- twirp
- TwirpServer
- proto文件
- Twirp的ServeHttp实现
- serve请求
http
启动httpHandle
使用http包的ListenAndServe方法,需要提供一个Handler对象
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(ln)
}
Handler对象中提供处理请求的方法,ServerHttp
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
Server循环accept请求
在ListenAndServe方法中,使用Handler构建一个Server对象,最终调用其Server方法
func (srv *Server) Serve(l net.Listener) error {
.....
//包装listener
origListener := l
l = &onceCloseListener{Listener: l}
defer l.Close()
//启动失败直接结束
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
if !srv.trackListener(&l, true) {
return ErrServerClosed
}
defer srv.trackListener(&l, false)
//绑定上下文等信息
baseCtx := context.Background()
if srv.BaseContext != nil {
baseCtx = srv.BaseContext(origListener)
if baseCtx == nil {
panic("BaseContext returned a nil context")
}
}
.....
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
//循环accept请求
rw, err := l.Accept()
if err != nil {
select {
//如果服务器已经关闭,直接结束
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
//如果accept失败,等待一个时间周期后continue
.....
return err
}
connCtx := ctx
if cc := srv.ConnContext; cc != nil {
connCtx = cc(connCtx, rw)
if connCtx == nil {
panic("ConnContext returned nil")
}
}
tempDelay = 0
//初始化一个连接对象
c := srv.newConn(rw)
//初始化连接状态
c.setState(c.rwc, StateNew) // before Serve can return
//开启一个新的goroutine去执行请求
go c.serve(connCtx)
}
}
处理connection
func (c *conn) serve(ctx context.Context) {
//包装缓存区,上下文信息等
.....
for {
//w是解析request数据得到的一个包装对象,包含请求的详细信息
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
//如果这个连接已经被读取过(长连接),更新状态
c.setState(c.rwc, StateActive)
}
if err != nil {
//如果报文解析出错
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"
switch {
//根据错误类型,返回不同http status
case .....
default:
publicErr := "400 Bad Request"
if v, ok := err.(badRequestError); ok {
publicErr = publicErr + ": " + string(v)
}
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
return
}
}
//进一步解析内容
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}
c.curReq.Store(w)
//如果body部分的内容还没读取完毕,等待本次io读取完毕后
//修改管道的io读取为后台读取方式(开启另一个gorountine区完成数据读取)
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}
//调用ServeHTTP方法处理请求
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
if c.hijacked() {
return
}
//完成请求
w.finishRequest()
if !w.shouldReuseConnection() {
//如果连接不允许重复使用,结束
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle)
c.curReq.Store((*response)(nil))
//如果不是长连接,就结束了
if !w.conn.server.doKeepAlives() {
return
}
if d := c.server.idleTimeout(); d != 0 {
//如果设置了超时时间
c.rwc.SetReadDeadline(time.Now().Add(d))
if _, err := c.bufr.Peek(4); err != nil {
//如果连接超时了,结束
return
}
}
c.rwc.SetReadDeadline(time.Time{})
}
}
twirp
twirp是一个rpc框架,具体可以看看我的其他博客——go使用twirp开发rpc
TwirpServer
我们利用http启动twirp服务时传递的是TwirpServer
type TwirpServer interface {
http.Handler
ServiceDescriptor() ([]byte, int)
ProtocGenTwirpVersion() string
PathPrefix() string
}
proto文件
这是之前的proto文件,利用他我们编译生成了twirp的go代码
syntax = "proto3";
package main;
message A{}
message B {
string result = 1;
}
service Test{
rpc hello(A) returns(B);
}
Twirp的ServeHttp实现
可以查看twirp编译后生成代码提供的server实现方法ServeHttp
const TestPathPrefix = "/twirp/main.Test/"
func (s *testServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ctx = ctxsetters.WithPackageName(ctx, "main")
ctx = ctxsetters.WithServiceName(ctx, "Test")
ctx = ctxsetters.WithResponseWriter(ctx, resp)
var err error
ctx, err = callRequestReceived(ctx, s.hooks)
if err != nil {
s.writeError(ctx, resp, err)
return
}
if req.Method != "POST" {
msg := fmt.Sprintf("unsupported method %q (only POST is allowed)", req.Method)
err = badRouteError(msg, req.Method, req.URL.Path)
s.writeError(ctx, resp, err)
return
}
switch req.URL.Path {
case "/twirp/main.Test/Hello":
s.serveHello(ctx, resp, req)
return
default:
msg := fmt.Sprintf("no handler for path %q", req.URL.Path)
err = badRouteError(msg, req.Method, req.URL.Path)
s.writeError(ctx, resp, err)
return
}
}
可以看到只有post请求被允许,并且请求的url格式为
/twirp/包名.服务名/方法名
serve请求
最终再进去serveXxx的逻辑中去处理请求,根据是json格式数据还是protobuf格式数据,执行不同方法
func (s *testServer) serveHello(ctx context.Context, resp http.ResponseWriter, req *http.Request) {
header := req.Header.Get("Content-Type")
i := strings.Index(header, ";")
if i == -1 {
i = len(header)
}
switch strings.TrimSpace(strings.ToLower(header[:i])) {
case "application/json":
s.serveHelloJSON(ctx, resp, req)
case "application/protobuf":
s.serveHelloProtobuf(ctx, resp, req)
default:
msg := fmt.Sprintf("unexpected Content-Type: %q", req.Header.Get("Content-Type"))
twerr := badRouteError(msg, req.Method, req.URL.Path)
s.writeError(ctx, resp, twerr)
}
}
比如进入protobuf
func (s *testServer) serveHelloProtobuf(ctx context.Context, resp http.ResponseWriter, req *http.Request) {
var err error
ctx = ctxsetters.WithMethodName(ctx, "Hello")
//如果在hook里面编写了路由,此处执行
ctx, err = callRequestRouted(ctx, s.hooks)
if err != nil {
s.writeError(ctx, resp, err)
return
}
//读取请求内容
buf, err := ioutil.ReadAll(req.Body)
if err != nil {
s.writeError(ctx, resp, wrapInternal(err, "failed to read request body"))
return
}
reqContent := new(A)
//Unmarshal请求内容
if err = proto.Unmarshal(buf, reqContent); err != nil {
s.writeError(ctx, resp, malformedRequestError("the protobuf request could not be decoded"))
return
}
// Call service method
//执行对应的服务方法
var respContent *B
func() {
defer ensurePanicResponses(ctx, resp, s.hooks)
//Test对象就是嵌套在server对象中的真正的服务对象
respContent, err = s.Test.Hello(ctx, reqContent)
}()
if err != nil {
s.writeError(ctx, resp, err)
return
}
if respContent == nil {
s.writeError(ctx, resp, twirp.InternalError("received a nil *B and nil error while calling Hello. nil responses are not supported"))
return
}
//执行完服务,执行返回前类型的hooks
ctx = callResponsePrepared(ctx, s.hooks)
//将要返回的数据Marshal为字节
respBytes, err := proto.Marshal(respContent)
if err != nil {
s.writeError(ctx, resp, wrapInternal(err, "failed to marshal proto response"))
return
}
//状态码处理等
ctx = ctxsetters.WithStatusCode(ctx, http.StatusOK)
resp.Header().Set("Content-Type", "application/protobuf")
resp.Header().Set("Content-Length", strconv.Itoa(len(respBytes)))
resp.WriteHeader(http.StatusOK)
//将数据写回请求客户端
if n, err := resp.Write(respBytes); err != nil {
msg := fmt.Sprintf("failed to write response, %d of %d bytes written: %s", n, len(respBytes), err.Error())
twerr := twirp.NewError(twirp.Unknown, msg)
callError(ctx, s.hooks, twerr)
}
//执行返回后类型的hooks
callResponseSent(ctx, s.hooks)
}
更多文章,请搜索公众号歪歪梯Club
还没有评论,来说两句吧...