golang的http源码与twirp源码笔记

矫情吗;* 2023-02-11 11:26 81阅读 0赞

golang的http源码跟踪笔记

  • http
    • 启动httpHandle
    • Server循环accept请求
    • 处理connection
  • twirp
    • TwirpServer
    • proto文件
    • Twirp的ServeHttp实现
    • serve请求

http

启动httpHandle

使用http包的ListenAndServe方法,需要提供一个Handler对象

  1. func ListenAndServe(addr string, handler Handler) error {
  2. server := &Server{Addr: addr, Handler: handler}
  3. return server.ListenAndServe()
  4. }
  5. func (srv *Server) ListenAndServe() error {
  6. if srv.shuttingDown() {
  7. return ErrServerClosed
  8. }
  9. addr := srv.Addr
  10. if addr == "" {
  11. addr = ":http"
  12. }
  13. ln, err := net.Listen("tcp", addr)
  14. if err != nil {
  15. return err
  16. }
  17. return srv.Serve(ln)
  18. }

Handler对象中提供处理请求的方法,ServerHttp

  1. type Handler interface {
  2. ServeHTTP(ResponseWriter, *Request)
  3. }

Server循环accept请求

在ListenAndServe方法中,使用Handler构建一个Server对象,最终调用其Server方法

  1. func (srv *Server) Serve(l net.Listener) error {
  2. .....
  3. //包装listener
  4. origListener := l
  5. l = &onceCloseListener{Listener: l}
  6. defer l.Close()
  7. //启动失败直接结束
  8. if err := srv.setupHTTP2_Serve(); err != nil {
  9. return err
  10. }
  11. if !srv.trackListener(&l, true) {
  12. return ErrServerClosed
  13. }
  14. defer srv.trackListener(&l, false)
  15. //绑定上下文等信息
  16. baseCtx := context.Background()
  17. if srv.BaseContext != nil {
  18. baseCtx = srv.BaseContext(origListener)
  19. if baseCtx == nil {
  20. panic("BaseContext returned a nil context")
  21. }
  22. }
  23. .....
  24. ctx := context.WithValue(baseCtx, ServerContextKey, srv)
  25. for {
  26. //循环accept请求
  27. rw, err := l.Accept()
  28. if err != nil {
  29. select {
  30. //如果服务器已经关闭,直接结束
  31. case <-srv.getDoneChan():
  32. return ErrServerClosed
  33. default:
  34. }
  35. if ne, ok := err.(net.Error); ok && ne.Temporary() {
  36. //如果accept失败,等待一个时间周期后continue
  37. .....
  38. return err
  39. }
  40. connCtx := ctx
  41. if cc := srv.ConnContext; cc != nil {
  42. connCtx = cc(connCtx, rw)
  43. if connCtx == nil {
  44. panic("ConnContext returned nil")
  45. }
  46. }
  47. tempDelay = 0
  48. //初始化一个连接对象
  49. c := srv.newConn(rw)
  50. //初始化连接状态
  51. c.setState(c.rwc, StateNew) // before Serve can return
  52. //开启一个新的goroutine去执行请求
  53. go c.serve(connCtx)
  54. }
  55. }

处理connection

  1. func (c *conn) serve(ctx context.Context) {
  2. //包装缓存区,上下文信息等
  3. .....
  4. for {
  5. //w是解析request数据得到的一个包装对象,包含请求的详细信息
  6. w, err := c.readRequest(ctx)
  7. if c.r.remain != c.server.initialReadLimitSize() {
  8. //如果这个连接已经被读取过(长连接),更新状态
  9. c.setState(c.rwc, StateActive)
  10. }
  11. if err != nil {
  12. //如果报文解析出错
  13. const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"
  14. switch {
  15. //根据错误类型,返回不同http status
  16. case .....
  17. default:
  18. publicErr := "400 Bad Request"
  19. if v, ok := err.(badRequestError); ok {
  20. publicErr = publicErr + ": " + string(v)
  21. }
  22. fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
  23. return
  24. }
  25. }
  26. //进一步解析内容
  27. req := w.req
  28. if req.expectsContinue() {
  29. if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
  30. req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
  31. }
  32. } else if req.Header.get("Expect") != "" {
  33. w.sendExpectationFailed()
  34. return
  35. }
  36. c.curReq.Store(w)
  37. //如果body部分的内容还没读取完毕,等待本次io读取完毕后
  38. //修改管道的io读取为后台读取方式(开启另一个gorountine区完成数据读取)
  39. if requestBodyRemains(req.Body) {
  40. registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
  41. } else {
  42. w.conn.r.startBackgroundRead()
  43. }
  44. //调用ServeHTTP方法处理请求
  45. serverHandler{c.server}.ServeHTTP(w, w.req)
  46. w.cancelCtx()
  47. if c.hijacked() {
  48. return
  49. }
  50. //完成请求
  51. w.finishRequest()
  52. if !w.shouldReuseConnection() {
  53. //如果连接不允许重复使用,结束
  54. if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
  55. c.closeWriteAndWait()
  56. }
  57. return
  58. }
  59. c.setState(c.rwc, StateIdle)
  60. c.curReq.Store((*response)(nil))
  61. //如果不是长连接,就结束了
  62. if !w.conn.server.doKeepAlives() {
  63. return
  64. }
  65. if d := c.server.idleTimeout(); d != 0 {
  66. //如果设置了超时时间
  67. c.rwc.SetReadDeadline(time.Now().Add(d))
  68. if _, err := c.bufr.Peek(4); err != nil {
  69. //如果连接超时了,结束
  70. return
  71. }
  72. }
  73. c.rwc.SetReadDeadline(time.Time{})
  74. }
  75. }

twirp

twirp是一个rpc框架,具体可以看看我的其他博客——go使用twirp开发rpc

TwirpServer

我们利用http启动twirp服务时传递的是TwirpServer

  1. type TwirpServer interface {
  2. http.Handler
  3. ServiceDescriptor() ([]byte, int)
  4. ProtocGenTwirpVersion() string
  5. PathPrefix() string
  6. }

proto文件

这是之前的proto文件,利用他我们编译生成了twirp的go代码

  1. syntax = "proto3";
  2. package main;
  3. message A{}
  4. message B {
  5. string result = 1;
  6. }
  7. service Test{
  8. rpc hello(A) returns(B);
  9. }

Twirp的ServeHttp实现

可以查看twirp编译后生成代码提供的server实现方法ServeHttp

  1. const TestPathPrefix = "/twirp/main.Test/"
  2. func (s *testServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
  3. ctx := req.Context()
  4. ctx = ctxsetters.WithPackageName(ctx, "main")
  5. ctx = ctxsetters.WithServiceName(ctx, "Test")
  6. ctx = ctxsetters.WithResponseWriter(ctx, resp)
  7. var err error
  8. ctx, err = callRequestReceived(ctx, s.hooks)
  9. if err != nil {
  10. s.writeError(ctx, resp, err)
  11. return
  12. }
  13. if req.Method != "POST" {
  14. msg := fmt.Sprintf("unsupported method %q (only POST is allowed)", req.Method)
  15. err = badRouteError(msg, req.Method, req.URL.Path)
  16. s.writeError(ctx, resp, err)
  17. return
  18. }
  19. switch req.URL.Path {
  20. case "/twirp/main.Test/Hello":
  21. s.serveHello(ctx, resp, req)
  22. return
  23. default:
  24. msg := fmt.Sprintf("no handler for path %q", req.URL.Path)
  25. err = badRouteError(msg, req.Method, req.URL.Path)
  26. s.writeError(ctx, resp, err)
  27. return
  28. }
  29. }

可以看到只有post请求被允许,并且请求的url格式为

  1. /twirp/包名.服务名/方法名

serve请求

最终再进去serveXxx的逻辑中去处理请求,根据是json格式数据还是protobuf格式数据,执行不同方法

  1. func (s *testServer) serveHello(ctx context.Context, resp http.ResponseWriter, req *http.Request) {
  2. header := req.Header.Get("Content-Type")
  3. i := strings.Index(header, ";")
  4. if i == -1 {
  5. i = len(header)
  6. }
  7. switch strings.TrimSpace(strings.ToLower(header[:i])) {
  8. case "application/json":
  9. s.serveHelloJSON(ctx, resp, req)
  10. case "application/protobuf":
  11. s.serveHelloProtobuf(ctx, resp, req)
  12. default:
  13. msg := fmt.Sprintf("unexpected Content-Type: %q", req.Header.Get("Content-Type"))
  14. twerr := badRouteError(msg, req.Method, req.URL.Path)
  15. s.writeError(ctx, resp, twerr)
  16. }
  17. }

比如进入protobuf

  1. func (s *testServer) serveHelloProtobuf(ctx context.Context, resp http.ResponseWriter, req *http.Request) {
  2. var err error
  3. ctx = ctxsetters.WithMethodName(ctx, "Hello")
  4. //如果在hook里面编写了路由,此处执行
  5. ctx, err = callRequestRouted(ctx, s.hooks)
  6. if err != nil {
  7. s.writeError(ctx, resp, err)
  8. return
  9. }
  10. //读取请求内容
  11. buf, err := ioutil.ReadAll(req.Body)
  12. if err != nil {
  13. s.writeError(ctx, resp, wrapInternal(err, "failed to read request body"))
  14. return
  15. }
  16. reqContent := new(A)
  17. //Unmarshal请求内容
  18. if err = proto.Unmarshal(buf, reqContent); err != nil {
  19. s.writeError(ctx, resp, malformedRequestError("the protobuf request could not be decoded"))
  20. return
  21. }
  22. // Call service method
  23. //执行对应的服务方法
  24. var respContent *B
  25. func() {
  26. defer ensurePanicResponses(ctx, resp, s.hooks)
  27. //Test对象就是嵌套在server对象中的真正的服务对象
  28. respContent, err = s.Test.Hello(ctx, reqContent)
  29. }()
  30. if err != nil {
  31. s.writeError(ctx, resp, err)
  32. return
  33. }
  34. if respContent == nil {
  35. s.writeError(ctx, resp, twirp.InternalError("received a nil *B and nil error while calling Hello. nil responses are not supported"))
  36. return
  37. }
  38. //执行完服务,执行返回前类型的hooks
  39. ctx = callResponsePrepared(ctx, s.hooks)
  40. //将要返回的数据Marshal为字节
  41. respBytes, err := proto.Marshal(respContent)
  42. if err != nil {
  43. s.writeError(ctx, resp, wrapInternal(err, "failed to marshal proto response"))
  44. return
  45. }
  46. //状态码处理等
  47. ctx = ctxsetters.WithStatusCode(ctx, http.StatusOK)
  48. resp.Header().Set("Content-Type", "application/protobuf")
  49. resp.Header().Set("Content-Length", strconv.Itoa(len(respBytes)))
  50. resp.WriteHeader(http.StatusOK)
  51. //将数据写回请求客户端
  52. if n, err := resp.Write(respBytes); err != nil {
  53. msg := fmt.Sprintf("failed to write response, %d of %d bytes written: %s", n, len(respBytes), err.Error())
  54. twerr := twirp.NewError(twirp.Unknown, msg)
  55. callError(ctx, s.hooks, twerr)
  56. }
  57. //执行返回后类型的hooks
  58. callResponseSent(ctx, s.hooks)
  59. }

更多文章,请搜索公众号歪歪梯Club
更多资料,请搜索公众号编程宝可梦

发表评论

表情:
评论列表 (有 0 条评论,81人围观)

还没有评论,来说两句吧...

相关阅读