Golang http之transport源码详解

£神魔★判官ぃ 2022-02-19 15:07 836阅读 0赞

使用golang net/http库发送http请求,最后都是调用 transport的 RoundTrip方法中。

  1. type RoundTripper interface {
  2. RoundTrip(*Request) (*Response, error)
  3. }

RoundTrip代表一个http事务,给一个请求返回一个响应。RoundTripper必须是并发安全的。RoundTripper接口的实现Transport结构体在源码包net/http/transport.go 中。

  1. type Transport struct {
  2. idleMu sync.Mutex
  3. wantIdle bool // user has requested to close all idle conns 用户是否已关闭所有的空闲连接
  4. idleConn map[connectMethodKey][]*persistConn // most recently used at end,保存从connectMethodKey(代表着不同的协议,不同的host,也就是不同的请求)到persistConn的映射
  5. /*
  6. idleConnCh 用来在并发http请求的时候在多个 goroutine 里面相互发送持久连接,也就是说,
  7. 这些持久连接是可以重复利用的, 你的http请求用某个persistConn用完了,
  8. 通过这个channel发送给其他http请求使用这个persistConn
  9. */
  10. idleConnCh map[connectMethodKey]chan *persistConn
  11. idleLRU connLRU
  12. reqMu sync.Mutex
  13. reqCanceler map[*Request]func(error) //请求取消器
  14. altMu sync.Mutex // guards changing altProto only
  15. altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme 为空或者map[string]RoundTripper,key为URI 的scheme,用于自定义的协议及对应的处理请求的RoundTripper
  16. // Proxy specifies a function to return a proxy for a given
  17. // Request. If the function returns a non-nil error, the
  18. // request is aborted with the provided error.
  19. //
  20. // The proxy type is determined by the URL scheme. "http"
  21. // and "socks5" are supported. If the scheme is empty,
  22. // "http" is assumed.
  23. //
  24. // If Proxy is nil or returns a nil *URL, no proxy is used.
  25. Proxy func(*Request) (*url.URL, error) //根据给定的Request返回一个代理,如果返回一个不为空的error,请求会终止
  26. // DialContext specifies the dial function for creating unencrypted TCP connections.
  27. // If DialContext is nil (and the deprecated Dial below is also nil),
  28. // then the transport dials using package net.
  29. /*
  30. DialContext用于指定创建未加密的TCP连接的dial功能,如果该函数为空,则使用net包下的dial函数
  31. */
  32. DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
  33. // Dial specifies the dial function for creating unencrypted TCP connections.
  34. //
  35. // Deprecated: Use DialContext instead, which allows the transport
  36. // to cancel dials as soon as they are no longer needed.
  37. // If both are set, DialContext takes priority.
  38. /*
  39. Dial获取一个tcp连接,也就是net.Conn结构,然后就可以写入request,从而获取到response
  40. DialContext比Dial函数的优先级高
  41. */
  42. Dial func(network, addr string) (net.Conn, error)
  43. // DialTLS specifies an optional dial function for creating
  44. // TLS connections for non-proxied HTTPS requests.
  45. //
  46. // If DialTLS is nil, Dial and TLSClientConfig are used.
  47. //
  48. // If DialTLS is set, the Dial hook is not used for HTTPS
  49. // requests and the TLSClientConfig and TLSHandshakeTimeout
  50. // are ignored. The returned net.Conn is assumed to already be
  51. // past the TLS handshake.
  52. /*
  53. DialTLS 为创建非代理的HTTPS请求的TLS连接提供一个可选的dial功能
  54. 如果DialTLS为空,则使用Dial和TLSClientConfig
  55. 如果设置了DialTLS,则HTTPS的请求不使用Dial的钩子,并且TLSClientConfig 和 TLSHandshakeTimeout会被忽略
  56. 返回的net.Conn假设已经通过了TLS握手
  57. */
  58. DialTLS func(network, addr string) (net.Conn, error)
  59. // TLSClientConfig specifies the TLS configuration to use with
  60. // tls.Client.
  61. // If nil, the default configuration is used.
  62. // If non-nil, HTTP/2 support may not be enabled by default.
  63. /*
  64. TLSClientConfig指定tls.Client使用的TLS配置信息
  65. 如果为空,则使用默认配置
  66. 如果不为空,默认情况下未启动HTTP/2支持
  67. */
  68. TLSClientConfig *tls.Config
  69. // TLSHandshakeTimeout specifies the maximum amount of time waiting to
  70. // wait for a TLS handshake. Zero means no timeout.
  71. /*
  72. 指定TLS握手的超时时间
  73. */
  74. TLSHandshakeTimeout time.Duration
  75. // DisableKeepAlives, if true, prevents re-use of TCP connections
  76. // between different HTTP requests.
  77. DisableKeepAlives bool //如果为true,则阻止在不同http请求之间重用TCP连接
  78. // DisableCompression, if true, prevents the Transport from
  79. // requesting compression with an "Accept-Encoding: gzip"
  80. // request header when the Request contains no existing
  81. // Accept-Encoding value. If the Transport requests gzip on
  82. // its own and gets a gzipped response, it's transparently
  83. // decoded in the Response.Body. However, if the user
  84. // explicitly requested gzip it is not automatically
  85. // uncompressed.
  86. DisableCompression bool //如果为true,则进制传输使用 Accept-Encoding: gzip
  87. // MaxIdleConns controls the maximum number of idle (keep-alive)
  88. // connections across all hosts. Zero means no limit.
  89. MaxIdleConns int //指定最大的空闲连接数
  90. // MaxIdleConnsPerHost, if non-zero, controls the maximum idle
  91. // (keep-alive) connections to keep per-host. If zero,
  92. // DefaultMaxIdleConnsPerHost is used.
  93. MaxIdleConnsPerHost int //用于控制某一个主机的连接的最大空闲数
  94. // IdleConnTimeout is the maximum amount of time an idle
  95. // (keep-alive) connection will remain idle before closing
  96. // itself.
  97. // Zero means no limit.
  98. IdleConnTimeout time.Duration //指定空闲连接保持的最长时间,如果为0,则不受限制
  99. // ResponseHeaderTimeout, if non-zero, specifies the amount of
  100. // time to wait for a server's response headers after fully
  101. // writing the request (including its body, if any). This
  102. // time does not include the time to read the response body.
  103. /*
  104. ResponseHeaderTimeout,如果非零,则指定在完全写入请求(包括其正文,如果有)之后等待服务器响应头的最长时间。
  105. 此时间不包括读响应体的时间。
  106. */
  107. ResponseHeaderTimeout time.Duration
  108. // ExpectContinueTimeout, if non-zero, specifies the amount of
  109. // time to wait for a server's first response headers after fully
  110. // writing the request headers if the request has an
  111. // "Expect: 100-continue" header. Zero means no timeout and
  112. // causes the body to be sent immediately, without
  113. // waiting for the server to approve.
  114. // This time does not include the time to send the request header.
  115. /*
  116. 如果请求头是"Expect:100-continue",ExpectContinueTimeout 如果不为0,它表示等待服务器第一次响应头的最大时间
  117. 零表示没有超时并导致正文立即发送,无需等待服务器批准。
  118. 此时间不包括发送请求标头的时间。
  119. */
  120. ExpectContinueTimeout time.Duration
  121. // TLSNextProto specifies how the Transport switches to an
  122. // alternate protocol (such as HTTP/2) after a TLS NPN/ALPN
  123. // protocol negotiation. If Transport dials an TLS connection
  124. // with a non-empty protocol name and TLSNextProto contains a
  125. // map entry for that key (such as "h2"), then the func is
  126. // called with the request's authority (such as "example.com"
  127. // or "example.com:1234") and the TLS connection. The function
  128. // must return a RoundTripper that then handles the request.
  129. // If TLSNextProto is not nil, HTTP/2 support is not enabled
  130. // automatically.
  131. /*
  132. TLSNextProto指定在TLS NPN / ALPN协议协商之后传输如何切换到备用协议(例如HTTP / 2)。
  133. 如果传输使用非空协议名称拨打TLS连接并且TLSNextProto包含该密钥的映射条目(例如“h2”),则使用请求的权限调用func(例如“example.com”或“example” .com:1234“)和TLS连接。
  134. 该函数必须返回一个RoundTripper,然后处理该请求。 如果TLSNextProto不是nil,则不会自动启用HTTP / 2支持。
  135. */
  136. TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
  137. // ProxyConnectHeader optionally specifies headers to send to
  138. // proxies during CONNECT requests.
  139. /*
  140. ProxyConnectHeader可选地指定在CONNECT请求期间发送给代理的标头。
  141. */
  142. ProxyConnectHeader Header
  143. // MaxResponseHeaderBytes specifies a limit on how many
  144. // response bytes are allowed in the server's response
  145. // header.
  146. //
  147. // Zero means to use a default limit.
  148. /*
  149. 指定服务器返回的响应头的最大字节数
  150. 为0则使用默认的限制
  151. */
  152. MaxResponseHeaderBytes int64
  153. // nextProtoOnce guards initialization of TLSNextProto and
  154. // h2transport (via onceSetNextProtoDefaults)
  155. //nextProtoOnce保护 TLSNextProto和 h2transport 的初始化
  156. nextProtoOnce sync.Once
  157. h2transport *http2Transport // non-nil if http2 wired up,如果是http2连通,则不为nil
  158. // TODO: tunable on max per-host TCP dials in flight (Issue 13957)
  159. }

以上是Transport结构体及每个字段的主要功能,从中可以看到

idleConn 可以理解成 空闲的连接池,用于存放空闲的连接,从而使连接可以复用。

idleConnCh用来在并发http请求的时候在多个goroutine里相互发送persistConn,可以使persistConn持久化连接得到重复使用。

RoundTrip方法

Transport的核心方法时RoundTrip方法,该方法的工作流程基本如下:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3NraDIwMTVqYXZh_size_16_color_FFFFFF_t_70

RoundTrip方法源码如下:

  1. //RoundTrip实现了RoundTripper接口
  2. func (t *Transport) RoundTrip(req *Request) (*Response, error) {
  3. //初始化TLSNextProto http2使用
  4. t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
  5. //获取请求的上下文
  6. ctx := req.Context()
  7. trace := httptrace.ContextClientTrace(ctx)
  8. //错误处理
  9. if req.URL == nil {
  10. req.closeBody()
  11. return nil, errors.New("http: nil Request.URL")
  12. }
  13. if req.Header == nil {
  14. req.closeBody()
  15. return nil, errors.New("http: nil Request.Header")
  16. }
  17. scheme := req.URL.Scheme
  18. isHTTP := scheme == "http" || scheme == "https"
  19. //如果是http或https请求,对Header中的数据进行校验
  20. if isHTTP {
  21. for k, vv := range req.Header {
  22. if !httplex.ValidHeaderFieldName(k) {
  23. return nil, fmt.Errorf("net/http: invalid header field name %q", k)
  24. }
  25. for _, v := range vv {
  26. if !httplex.ValidHeaderFieldValue(v) {
  27. return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
  28. }
  29. }
  30. }
  31. }
  32. //如果该scheme有自定义的RoundTrip,则使用自定义的RoundTrip处理request,并返回response
  33. altProto, _ := t.altProto.Load().(map[string]RoundTripper)
  34. if altRT := altProto[scheme]; altRT != nil {
  35. if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
  36. return resp, err
  37. }
  38. }
  39. //如果不是http请求,则关闭并退出
  40. if !isHTTP {
  41. req.closeBody()
  42. return nil, &badStringError{"unsupported protocol scheme", scheme}
  43. }
  44. //对请求的Method进行校验
  45. if req.Method != "" && !validMethod(req.Method) {
  46. return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
  47. }
  48. //请求的host为空,则返回
  49. if req.URL.Host == "" {
  50. req.closeBody()
  51. return nil, errors.New("http: no Host in request URL")
  52. }
  53. for {
  54. // treq gets modified by roundTrip, so we need to recreate for each retry.
  55. //初始化transportRequest,transportRequest是request的包装器
  56. treq := &transportRequest{Request: req, trace: trace}
  57. //根据用户的请求信息获取connectMethod cm
  58. cm, err := t.connectMethodForRequest(treq)
  59. if err != nil {
  60. req.closeBody()
  61. return nil, err
  62. }
  63. // Get the cached or newly-created connection to either the
  64. // host (for http or https), the http proxy, or the http proxy
  65. // pre-CONNECTed to https server. In any case, we'll be ready
  66. // to send it requests.
  67. //从缓存中获取一个连接,或者新建一个连接
  68. pconn, err := t.getConn(treq, cm)
  69. if err != nil {
  70. t.setReqCanceler(req, nil)
  71. req.closeBody()
  72. return nil, err
  73. }
  74. var resp *Response
  75. if pconn.alt != nil {
  76. // HTTP/2 path.
  77. t.setReqCanceler(req, nil) // not cancelable with CancelRequest
  78. resp, err = pconn.alt.RoundTrip(req)
  79. } else {
  80. resp, err = pconn.roundTrip(treq)
  81. }
  82. if err == nil {
  83. return resp, nil
  84. }
  85. if !pconn.shouldRetryRequest(req, err) {
  86. // Issue 16465: return underlying net.Conn.Read error from peek,
  87. // as we've historically done.
  88. if e, ok := err.(transportReadFromServerError); ok {
  89. err = e.err
  90. }
  91. return nil, err
  92. }
  93. testHookRoundTripRetried()
  94. // Rewind the body if we're able to. (HTTP/2 does this itself so we only
  95. // need to do it for HTTP/1.1 connections.)
  96. if req.GetBody != nil && pconn.alt == nil {
  97. newReq := *req
  98. var err error
  99. newReq.Body, err = req.GetBody()
  100. if err != nil {
  101. return nil, err
  102. }
  103. req = &newReq
  104. }
  105. }
  106. }

该方法首先会进行一些校验,如果客户端请求的scheme有自定的RoundTrip,则使用自定义的RoundTrip处理request,并返回response。

该方法主要的请求处理逻辑在for循环里,首先会根据请求从空闲的连接池中获取一个连接或新建一个连接pconn。忽略HTTP/2请求的处理,其他常用的HTTP请求会调用roundTrip方法将客户端发送给服务器,并等待返回response。

getConn获取或新建连接

  1. func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
  2. req := treq.Request
  3. trace := treq.trace
  4. ctx := req.Context()
  5. //GetConn是钩子函数在获取连接前调用
  6. if trace != nil && trace.GetConn != nil {
  7. trace.GetConn(cm.addr())
  8. }
  9. //如果可以获取到空闲的连接
  10. if pc, idleSince := t.getIdleConn(cm); pc != nil {
  11. if trace != nil && trace.GotConn != nil { //GotConn是钩子函数,成功获取连接后调用
  12. trace.GotConn(pc.gotIdleConnTrace(idleSince))
  13. }
  14. // set request canceler to some non-nil function so we
  15. // can detect whether it was cleared between now and when
  16. // we enter roundTrip
  17. /*
  18. 将请求的canceler设置为某些非零函数,以便我们可以检测它是否在现在和我们进入roundTrip之间被清除
  19. */
  20. t.setReqCanceler(req, func(error) {})
  21. return pc, nil
  22. }
  23. type dialRes struct {
  24. pc *persistConn
  25. err error
  26. }
  27. dialc := make(chan dialRes)
  28. // Copy these hooks so we don't race on the postPendingDial in
  29. // the goroutine we launch. Issue 11136.
  30. testHookPrePendingDial := testHookPrePendingDial
  31. testHookPostPendingDial := testHookPostPendingDial
  32. //该内部函数handlePendingDial的主要作用是,新开启一个协程,当新建连接完成后但没有被使用,将其放到连接池(缓存)中或将其关闭
  33. handlePendingDial := func() {
  34. testHookPrePendingDial()
  35. go func() {
  36. if v := <-dialc; v.err == nil {
  37. t.putOrCloseIdleConn(v.pc)
  38. }
  39. testHookPostPendingDial()
  40. }()
  41. }
  42. cancelc := make(chan error, 1)
  43. t.setReqCanceler(req, func(err error) { cancelc <- err })
  44. go func() {//开启一个协程新建一个连接
  45. pc, err := t.dialConn(ctx, cm)
  46. dialc <- dialRes{pc, err}
  47. }()
  48. idleConnCh := t.getIdleConnCh(cm)
  49. select {
  50. case v := <-dialc: //获取新建的连接
  51. // Our dial finished.
  52. if v.pc != nil { //如果新建的连接不为nil,则返回新建的连接
  53. if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
  54. trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
  55. }
  56. return v.pc, nil
  57. }
  58. // Our dial failed. See why to return a nicer error
  59. // value.
  60. select {
  61. case <-req.Cancel:
  62. // It was an error due to cancelation, so prioritize that
  63. // error value. (Issue 16049)
  64. return nil, errRequestCanceledConn
  65. case <-req.Context().Done():
  66. return nil, req.Context().Err()
  67. case err := <-cancelc:
  68. if err == errRequestCanceled {
  69. err = errRequestCanceledConn
  70. }
  71. return nil, err
  72. default:
  73. // It wasn't an error due to cancelation, so
  74. // return the original error message:
  75. return nil, v.err
  76. }
  77. case pc := <-idleConnCh: //如果在新建连接的过程中,有空闲的连接,则返回该空闲的连接
  78. // Another request finished first and its net.Conn
  79. // became available before our dial. Or somebody
  80. // else's dial that they didn't use.
  81. // But our dial is still going, so give it away
  82. // when it finishes:
  83. //如果在dial连接的时候,有空闲的连接,但是这个时候我们仍然正在新建连接,所以当它新建完成后将其放到连接池或丢弃
  84. handlePendingDial()
  85. if trace != nil && trace.GotConn != nil {
  86. trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
  87. }
  88. return pc, nil
  89. case <-req.Cancel:
  90. handlePendingDial()
  91. return nil, errRequestCanceledConn
  92. case <-req.Context().Done():
  93. handlePendingDial()
  94. return nil, req.Context().Err()
  95. case err := <-cancelc:
  96. handlePendingDial()
  97. if err == errRequestCanceled {
  98. err = errRequestCanceledConn
  99. }
  100. return nil, err
  101. }
  102. }

第一步:尝试从空闲的连接池中获取空闲连接(通过getIdleConn方法)

如果缓存中有空闲的连接,则获取空闲的连接,并从idleConn和idleLRU中删除该连接,getIdleConn方法的源码如下:

  1. func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) {
  2. key := cm.key()
  3. t.idleMu.Lock()
  4. defer t.idleMu.Unlock()
  5. for {
  6. pconns, ok := t.idleConn[key]
  7. if !ok {
  8. return nil, time.Time{}
  9. }
  10. if len(pconns) == 1 {
  11. pconn = pconns[0]
  12. delete(t.idleConn, key)
  13. } else {
  14. // 2 or more cached connections; use the most
  15. // recently used one at the end.
  16. pconn = pconns[len(pconns)-1]
  17. t.idleConn[key] = pconns[:len(pconns)-1]
  18. }
  19. t.idleLRU.remove(pconn)
  20. if pconn.isBroken() { //如果该连接被关闭,则继续从缓存中查找
  21. // There is a tiny window where this is
  22. // possible, between the connecting dying and
  23. // the persistConn readLoop calling
  24. // Transport.removeIdleConn. Just skip it and
  25. // carry on.
  26. continue
  27. }
  28. if pconn.idleTimer != nil && !pconn.idleTimer.Stop() {
  29. // We picked this conn at the ~same time it
  30. // was expiring and it's trying to close
  31. // itself in another goroutine. Don't use it.
  32. continue
  33. }
  34. return pconn, pconn.idleAt
  35. }
  36. }
  37. 前面我们提到过空闲的连接存放在idleConn字段中,该字段是map结构,可以通过客户端的请求信息(proxy,scheme,addr)来获取持久连接persistConngetIdleConn方法尝试从空闲的连接池idleConn中获取空闲连接,如果获取到空闲的连接,则从该idleConn map中删除,并判断该连接是否被关闭,如果该连接已经被关闭则继续获取。如果可以获取到空闲连接则返回该连接,将使用该连接与服务器进行通讯,如果获取不到则新建连接。

第二步:如果空闲的连接池中没有可用的连接,则会调用dialConn方法新建连接

  1. 当我们无法从空闲的连接池中获取连接,就要新建连接。新建连接的大致过程如下:
  2. ***首先** *初始化dialc channel channel用于等待新建的连接,如果连接创建成功则将创建的连接放入到dialc
  3. go func() {//开启一个协程新建一个连接
  4. pc, err := t.dialConn(ctx, cm)
  5. dialc <- dialRes{pc, err}
  6. }()
  7. handlePendingDial函数,该内部函数的主要作用用于开启一个协程,当新建连接成功但没有被使用,则通过该函数将其放到连接池中或将其关闭。
  8. handlePendingDial := func() {
  9. testHookPrePendingDial()
  10. go func() {
  11. if v := <-dialc; v.err == nil {
  12. t.putOrCloseIdleConn(v.pc)
  13. }
  14. testHookPostPendingDial()
  15. }()
  16. }

其次 用select case 监听事件

1.监听连接是否重建成功,如果连接创建成功则返回该新建的连接

  1. 2.通过idleConnCh channel监听是否在创建连接的时候有空闲的连接,如果有空闲的连接则返回空闲连接,并调用handlePendingDial函数,处理新建的连接,将新建的连接放入到空闲的连接池中或将其关闭。

idleConnCh是为了多个http请求之间复用,当一个客户端的请求处理完成之后,首先会尝试将该连接写入到idleConnCh中,如果有其他http在监听等待该idleConnCh,则会写入成功。从而降低客户端请求的等待时间。如果该连接无法放入到idleConnCh中,则会尝试将该连接放入到idleConn中

  1. 3.在等待创建连接的过程中也在监听是否有取消客户端请求的消息,如果有也会调用handlePendingDial函数,并返回错误信息。

新建连接的过程

  1. 前面我们看到新建连接时,会开启一个协程来执行dialConn方法来创建连接,dialConn方法源码如下:
  2. func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
  3. //初始化persistConn结构体pconn
  4. pconn := &persistConn{
  5. t: t,
  6. cacheKey: cm.key(),
  7. reqch: make(chan requestAndChan, 1),
  8. writech: make(chan writeRequest, 1),
  9. closech: make(chan struct{}),
  10. writeErrCh: make(chan error, 1),
  11. writeLoopDone: make(chan struct{}),
  12. }
  13. trace := httptrace.ContextClientTrace(ctx)
  14. wrapErr := func(err error) error {
  15. if cm.proxyURL != nil {
  16. // Return a typed error, per Issue 16997
  17. return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
  18. }
  19. return err
  20. }
  21. //如果scheme为https,并且DialTLS函数不为nil
  22. if cm.scheme() == "https" && t.DialTLS != nil {
  23. var err error
  24. pconn.conn, err = t.DialTLS("tcp", cm.addr())
  25. if err != nil {
  26. return nil, wrapErr(err)
  27. }
  28. if pconn.conn == nil {
  29. return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
  30. }
  31. if tc, ok := pconn.conn.(*tls.Conn); ok {
  32. // Handshake here, in case DialTLS didn't. TLSNextProto below
  33. // depends on it for knowing the connection state.
  34. if trace != nil && trace.TLSHandshakeStart != nil {
  35. trace.TLSHandshakeStart()
  36. }
  37. if err := tc.Handshake(); err != nil {
  38. go pconn.conn.Close()
  39. if trace != nil && trace.TLSHandshakeDone != nil {
  40. trace.TLSHandshakeDone(tls.ConnectionState{}, err)
  41. }
  42. return nil, err
  43. }
  44. cs := tc.ConnectionState()
  45. if trace != nil && trace.TLSHandshakeDone != nil {
  46. trace.TLSHandshakeDone(cs, nil)
  47. }
  48. pconn.tlsState = &cs
  49. }
  50. } else {
  51. //创建tcp连接
  52. conn, err := t.dial(ctx, "tcp", cm.addr())
  53. if err != nil {
  54. return nil, wrapErr(err)
  55. }
  56. pconn.conn = conn
  57. if cm.scheme() == "https" {
  58. var firstTLSHost string
  59. if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
  60. return nil, wrapErr(err)
  61. }
  62. if err = pconn.addTLS(firstTLSHost, trace); err != nil {
  63. return nil, wrapErr(err)
  64. }
  65. }
  66. }
  67. ....... 处理代理等消息省略
  68. //初始化br和bw
  69. pconn.br = bufio.NewReader(pconn)
  70. pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
  71. go pconn.readLoop()
  72. go pconn.writeLoop()
  73. return pconn, nil
  74. }

新建连接的过程主要在dialConn方法中,新建连接的大致过程如下:

  1. 1. 首先初始化persistConn结构体
  2. 2. 创建连接,创建连接时区分httpshttp
  3. 3. 连接创建成功后,会开启两个协程,一个用于处理输入流writeLoop,一个用于处理输出流readLoop

从中我们看到当客户端和服务端每建立一个连接,都会开启两个协程,一个处理输入流writeLoop,一个处理输出流readLoop。

readLoop方法

  1. /*
  2. 从网络连接中读取消息并解析成Response
  3. */
  4. func (pc *persistConn) readLoop() {
  5. closeErr := errReadLoopExiting // default value, if not changed below
  6. defer func() {
  7. pc.close(closeErr)
  8. pc.t.removeIdleConn(pc)
  9. }()
  10. //函数作用:尝试将pc放入空闲连接池中
  11. tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
  12. if err := pc.t.tryPutIdleConn(pc); err != nil {
  13. closeErr = err
  14. if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
  15. trace.PutIdleConn(err)
  16. }
  17. return false
  18. }
  19. if trace != nil && trace.PutIdleConn != nil {
  20. trace.PutIdleConn(nil)
  21. }
  22. return true
  23. }
  24. // eofc is used to block caller goroutines reading from Response.Body
  25. // at EOF until this goroutines has (potentially) added the connection
  26. // back to the idle pool.
  27. eofc := make(chan struct{})
  28. defer close(eofc) // unblock reader on errors
  29. // Read this once, before loop starts. (to avoid races in tests)
  30. testHookMu.Lock()
  31. testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
  32. testHookMu.Unlock()
  33. alive := true
  34. for alive {
  35. pc.readLimit = pc.maxHeaderResponseSize()
  36. _, err := pc.br.Peek(1)
  37. pc.mu.Lock()
  38. if pc.numExpectedResponses == 0 {
  39. pc.readLoopPeekFailLocked(err)
  40. pc.mu.Unlock()
  41. return
  42. }
  43. pc.mu.Unlock()
  44. //从reqch通道中获取请求数据和等待返回的response的channel
  45. rc := <-pc.reqch
  46. trace := httptrace.ContextClientTrace(rc.req.Context()) //从请求的上下文中获取trace
  47. var resp *Response
  48. if err == nil {
  49. resp, err = pc.readResponse(rc, trace) //从网络连接中读取http的响应信息Response
  50. } else {
  51. err = transportReadFromServerError{err}
  52. closeErr = err
  53. }
  54. if err != nil { //错误处理
  55. if pc.readLimit <= 0 {
  56. err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
  57. }
  58. select {
  59. case rc.ch <- responseAndError{err: err}:
  60. case <-rc.callerGone:
  61. return
  62. }
  63. return
  64. }
  65. pc.readLimit = maxInt64 // effictively no limit for response bodies
  66. pc.mu.Lock()
  67. pc.numExpectedResponses--
  68. pc.mu.Unlock()
  69. hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 //判断是否响应的消息有body
  70. if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
  71. // Don't do keep-alive on error if either party requested a close
  72. // or we get an unexpected informational (1xx) response.
  73. // StatusCode 100 is already handled above.
  74. alive = false
  75. }
  76. if !hasBody { //如果响应体没有body
  77. pc.t.setReqCanceler(rc.req, nil) //将reqCanceler取消函数中对应的req删除
  78. // Put the idle conn back into the pool before we send the response
  79. // so if they process it quickly and make another request, they'll
  80. // get this same conn. But we use the unbuffered channel 'rc'
  81. // to guarantee that persistConn.roundTrip got out of its select
  82. // potentially waiting for this persistConn to close.
  83. // but after
  84. /*
  85. 在返回response之前,将空闲的conn放到连接池中
  86. */
  87. alive = alive &&
  88. !pc.sawEOF &&
  89. pc.wroteRequest() &&
  90. tryPutIdleConn(trace)
  91. select {
  92. case rc.ch <- responseAndError{res: resp}: //将响应信息返回到roundTrip中
  93. case <-rc.callerGone:
  94. return
  95. }
  96. // Now that they've read from the unbuffered channel, they're safely
  97. // out of the select that also waits on this goroutine to die, so
  98. // we're allowed to exit now if needed (if alive is false)
  99. testHookReadLoopBeforeNextRead()
  100. continue
  101. }
  102. waitForBodyRead := make(chan bool, 2)
  103. body := &bodyEOFSignal{
  104. body: resp.Body,
  105. earlyCloseFn: func() error {
  106. waitForBodyRead <- false
  107. <-eofc // will be closed by deferred call at the end of the function
  108. return nil
  109. },
  110. fn: func(err error) error {
  111. isEOF := err == io.EOF
  112. waitForBodyRead <- isEOF
  113. if isEOF {
  114. <-eofc // see comment above eofc declaration
  115. } else if err != nil {
  116. if cerr := pc.canceled(); cerr != nil {
  117. return cerr
  118. }
  119. }
  120. return err
  121. },
  122. }
  123. resp.Body = body
  124. if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
  125. resp.Body = &gzipReader{body: body}
  126. resp.Header.Del("Content-Encoding")
  127. resp.Header.Del("Content-Length")
  128. resp.ContentLength = -1
  129. resp.Uncompressed = true
  130. }
  131. //将response返回到到roundTrip中
  132. select {
  133. case rc.ch <- responseAndError{res: resp}:
  134. case <-rc.callerGone:
  135. return
  136. }
  137. // Before looping back to the top of this function and peeking on
  138. // the bufio.Reader, wait for the caller goroutine to finish
  139. // reading the response body. (or for cancelation or death)
  140. /*
  141. 等待返回的response中的body被读完后,才会将连接放入到连接池中,等待再次使用该连接
  142. */
  143. select {
  144. case bodyEOF := <-waitForBodyRead:
  145. pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
  146. alive = alive &&
  147. bodyEOF &&
  148. !pc.sawEOF &&
  149. pc.wroteRequest() &&
  150. tryPutIdleConn(trace)
  151. if bodyEOF {
  152. eofc <- struct{}{}
  153. }
  154. case <-rc.req.Cancel:
  155. alive = false
  156. pc.t.CancelRequest(rc.req)
  157. case <-rc.req.Context().Done():
  158. alive = false
  159. pc.t.cancelRequest(rc.req, rc.req.Context().Err())
  160. case <-pc.closech:
  161. alive = false
  162. }
  163. testHookReadLoopBeforeNextRead()
  164. }
  165. }
  166. readLoop方法的主要作用是从网络中读取消息并解析成Response返回
  167. 定义内部函数tryPutIdleConn,该函数的主要作用是将持久化连接persistConn放入到空闲连接池中
  168. persistConn持久化连接中的reqch chan requestAndChanchannel是用于获取客户端的请求信息并等待返回的responserequestAndChan结构体如下:
  169. type requestAndChan struct {
  170. req *Request
  171. ch chan responseAndError // unbuffered; always send in select on callerGone
  172. // whether the Transport (as opposed to the user client code)
  173. // added the Accept-Encoding gzip header. If the Transport
  174. // set it, only then do we transparently decode the gzip.
  175. addedGzip bool
  176. // Optional blocking chan for Expect: 100-continue (for send).
  177. // If the request has an "Expect: 100-continue" header and
  178. // the server responds 100 Continue, readLoop send a value
  179. // to writeLoop via this chan.
  180. continueCh chan<- struct{}
  181. callerGone <-chan struct{} // closed when roundTrip caller has returned
  182. }
  183. 其中req是用户请求信息。chresponseerror相关信息,用于等待从服务端读取响应信息。continueCh用于判断100-continue的情况。

其中req是用户请求信息。ch是response和error相关信息,用于等待从服务端读取响应信息。continueCh用于判断100-continue的情况。

readLoop的大致流程如下:

1.如果连接正常则轮询读取要发送到服务端的客户端请求信息rc

2.调用readResponse方法从网络连接中读取http的响应信息并封装成Response

3.如果读取时有错误信息,则将error信息发送通过rc中的ch字段将错误信息返回

  1. 4.如果读取到的响应信息没有响应体(即Body),如果连接正常则尝试将连接放入到连接池中。并将响应信息通过rc中的ch channel发送到roundTrip中,从而响应给http客户端请求。
  2. 5.如果读取到的响应信息有响应体(即Body),则会将响应体进行封装,封装成bodyEOFSignal结构体,目的是为了当客户端读取响应体之后,才会将该连接放入到连接池中,等待再次被使用。

tryPutIdleConn将连接放入到空闲连接池中

  1. func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
  2. ........
  3. waitingDialer := t.idleConnCh[key]
  4. select {
  5. case waitingDialer <- pconn:
  6. // We're done with this pconn and somebody else is
  7. // currently waiting for a conn of this type (they're
  8. // actively dialing, but this conn is ready
  9. // first). Chrome calls this socket late binding. See
  10. // https://insouciant.org/tech/connection-management-in-chromium/
  11. return nil
  12. default:
  13. if waitingDialer != nil {
  14. // They had populated this, but their dial won
  15. // first, so we can clean up this map entry.
  16. delete(t.idleConnCh, key)
  17. }
  18. }
  19. if t.wantIdle {
  20. return errWantIdle
  21. }
  22. if t.idleConn == nil {
  23. t.idleConn = make(map[connectMethodKey][]*persistConn)
  24. }
  25. idles := t.idleConn[key]
  26. if len(idles) >= t.maxIdleConnsPerHost() {
  27. return errTooManyIdleHost
  28. }
  29. for _, exist := range idles {
  30. if exist == pconn {
  31. log.Fatalf("dup idle pconn %p in freelist", pconn)
  32. }
  33. }
  34. t.idleConn[key] = append(idles, pconn)
  35. t.idleLRU.add(pconn)
  36. if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
  37. oldest := t.idleLRU.removeOldest()
  38. oldest.close(errTooManyIdle)
  39. t.removeIdleConnLocked(oldest)
  40. }
  41. if t.IdleConnTimeout > 0 {
  42. if pconn.idleTimer != nil {
  43. pconn.idleTimer.Reset(t.IdleConnTimeout)
  44. } else {
  45. pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
  46. }
  47. }
  48. pconn.idleAt = time.Now()
  49. return nil
  50. }

tryPutIdleConn将持久化连接放入到空闲连接池的过程大致如下:

1.如果有其他http请求正在等待该连接,则将该连接写入到waitingDialer,从而使其他http请求复用

2.尝试将该连接放入到空闲的连接池中idleConn

writeLoop方法


  1. func (pc *persistConn) writeLoop() {
  2. defer close(pc.writeLoopDone)
  3. for {
  4. select {
  5. case wr := <-pc.writech: //获取到要写入到输入流中的request相关数据
  6. startBytesWritten := pc.nwrite
  7. err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) //向网络连接中写入数据
  8. if bre, ok := err.(requestBodyReadError); ok { //错误处理
  9. err = bre.error
  10. // Errors reading from the user's
  11. // Request.Body are high priority.
  12. // Set it here before sending on the
  13. // channels below or calling
  14. // pc.close() which tears town
  15. // connections and causes other
  16. // errors.
  17. wr.req.setError(err)
  18. }
  19. if err == nil { //写入时没有错误,则刷新缓存
  20. err = pc.bw.Flush()
  21. }
  22. if err != nil { //错误处理
  23. wr.req.Request.closeBody()
  24. if pc.nwrite == startBytesWritten {
  25. err = nothingWrittenError{err}
  26. }
  27. }
  28. pc.writeErrCh <- err // to the body reader, which might recycle us
  29. wr.ch <- err // to the roundTrip function,将err发送到roundTrip,roundTrip根据是否为nil,来判断是否请求发送成功
  30. if err != nil {
  31. pc.close(err)
  32. return
  33. }
  34. case <-pc.closech:
  35. return
  36. }
  37. }
  38. }
  39. writeLoop相对比较简单,主要是向输入流中写入数据,监听pc.writech获取要写入到输入流的request相关数据,并写入到网络连接中。

到此新建连接的大致过程已经讲解完成。

RoundTrip方法源码中的通过getConn方法获取或新建连接我们已经了解,建立连接之后就可以数据的读写了,后面http事务主要在roundTrip方法中完成

roundTrip方法处理请求并等待响应

  1. 从上面新建连接我们知道每一个持久连接persistConn,都会有两个协程,一个处理输入流,一个处理输出流。输出流readLoop主要读取 reqch chan requestAndChan中的数据,读取到数据后会等待从网络连接中读取响应数据。输入流writeLoop主要处理writech chan writeRequest中的消息,并将该消息写入到网络连接中。
  2. func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
  3. testHookEnterRoundTrip()
  4. ......
  5. /*
  6. 将请求消息写入到输入流
  7. */
  8. startBytesWritten := pc.nwrite
  9. writeErrCh := make(chan error, 1)
  10. pc.writech <- writeRequest{req, writeErrCh, continueCh}
  11. //将请求消息发送到输出流,并等待返回
  12. resc := make(chan responseAndError)
  13. pc.reqch <- requestAndChan{
  14. req: req.Request,
  15. ch: resc,
  16. addedGzip: requestedGzip,
  17. continueCh: continueCh,
  18. callerGone: gone,
  19. }
  20. var respHeaderTimer <-chan time.Time
  21. cancelChan := req.Request.Cancel
  22. ctxDoneChan := req.Context().Done()
  23. for {
  24. testHookWaitResLoop()
  25. select {
  26. case err := <-writeErrCh:
  27. if debugRoundTrip {
  28. req.logf("writeErrCh resv: %T/%#v", err, err)
  29. }
  30. if err != nil {
  31. pc.close(fmt.Errorf("write error: %v", err))
  32. return nil, pc.mapRoundTripError(req, startBytesWritten, err)
  33. }
  34. if d := pc.t.ResponseHeaderTimeout; d > 0 {
  35. if debugRoundTrip {
  36. req.logf("starting timer for %v", d)
  37. }
  38. timer := time.NewTimer(d)
  39. defer timer.Stop() // prevent leaks
  40. respHeaderTimer = timer.C
  41. }
  42. case <-pc.closech:
  43. if debugRoundTrip {
  44. req.logf("closech recv: %T %#v", pc.closed, pc.closed)
  45. }
  46. return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
  47. case <-respHeaderTimer:
  48. if debugRoundTrip {
  49. req.logf("timeout waiting for response headers.")
  50. }
  51. pc.close(errTimeout)
  52. return nil, errTimeout
  53. case re := <-resc: //等待获取response信息
  54. if (re.res == nil) == (re.err == nil) {
  55. panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
  56. }
  57. if debugRoundTrip {
  58. req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
  59. }
  60. if re.err != nil {
  61. return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
  62. }
  63. return re.res, nil
  64. case <-cancelChan:
  65. pc.t.CancelRequest(req.Request)
  66. cancelChan = nil
  67. case <-ctxDoneChan:
  68. pc.t.cancelRequest(req.Request, req.Context().Err())
  69. cancelChan = nil
  70. ctxDoneChan = nil
  71. }
  72. }
  73. }

roundTrip方法的大致流程:

1.将请求消息写入到writech,将请求信息发送给输入流

2.将请求消息写入到reqch,等待服务端响应的消息

3.resc chan就是为了等待从服务端响应的消息。

4.返回从服务端响应的消息或错误信息

以上就是RoundTrip方法的主要流程,如果有理解不足或错误的地方还请指正。

发表评论

表情:
评论列表 (有 0 条评论,836人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Golanghttp server

    基于HTTP构建的网络应用包括两个端,即客户端(Client)和服务端(Server)。两个端的交互行为包括从客户端发出request、服务端接受request进行处理并返回r