2.1.3 客户端网络连接对象 川长思鸟来 2023-01-12 15:53 41阅读 0赞 #### 2.1.3 客户端网络连接对象 #### 客户端网络连接对象(NetworkClient )管理了客户端和服务端之间的 网络通信,包括连接的建立、发送客户端请求 、 读取客户端响应。回顾下2 . 1.2节中第 1小节“从记录收集器获取数据”部分,发送线程的run()方法中会调用 NetworkClient的3个方法,如下 。 * ready()方法。从记录收集器获取准备完毕的节点,并连接所有准备好的节点 ; * send()方法。为每个节点创建一个客户端请求 , 将请求暂存到节点对应的通道 中; * poll()方法 。轮询动作会真正执行网 络请求, 比如发送请求给节点,并读取响应 。 我们把前面两个方法都叫作准备阶段,因为调用这两个方法并没有真正地将客户端请求发送到服务端上,只有第三个方法才会发送客户端请求 。 1. 准备发送客户端请求 客户端向服务端发送请求需要先建立网络连接。 如果服务端还没有准备好,即还不能连接,这个节点在客户端就会被移除掉,确保消息不会发送给还没有准备好的节点;如果服务端已经准备好了,则调用selector. connect ()方法建立到目标节点的网络连接 。 相关代码如下: public boolean ready(Node node, long now) { if (node.isEmpty()) throw new IllegalArgumentException("Cannot connect to empty node " + node); if (isReady(node, now)) return true; if (connectionStates.canConnect(node.idString(), now)) // if we are interested in sending to a node and we don't have a connection to it, initiate one initiateConnect(node, now); return false; } private void initiateConnect(Node node, long now) { String nodeConnectionId = node.idString(); try { log.debug("Initiating connection to node {}", node); this.connectionStates.connecting(nodeConnectionId, now); selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(nodeConnectionId, now); /* maybe the problem is our metadata, update it */ metadataUpdater.requestUpdate(); log.debug("Error connecting to node {}", node, e); } } 连接建立后,发送线程调用NetworkClientsend() ,先将客户端请求加入initFlightRequests列表,然后调用 selector. send()方法。 注意:这一步只是将请求暂存到节点对应的网络通道中,还没有真正地将客户端请求发送出去。 相关代码如下: public void send(ClientRequest request, long now) { doSend(request, false, now); } private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) { String nodeId = clientRequest.destination(); if (!isInternalRequest) { // If this request came from outside the NetworkClient, validate // that we can send data. If the request is internal, we trust // that that internal code has done this validation. Validation // will be slightly different for some internal requests (for // example, ApiVersionsRequests can be sent prior to being in // READY state.) if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); } AbstractRequest.Builder<?> builder = clientRequest.requestBuilder(); try { NodeApiVersions versionInfo = apiVersions.get(nodeId); short version; // Note: if versionInfo is null, we have no server version information. This would be // the case when sending the initial ApiVersionRequest which fetches the version // information itself. It is also the case when discoverBrokerVersions is set to false. if (versionInfo == null) { version = builder.desiredOrLatestVersion(); if (discoverBrokerVersions && log.isTraceEnabled()) log.trace("No version information found when sending {} with correlation id {} to node {}. " + "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version); } else { version = versionInfo.usableVersion(clientRequest.apiKey(), builder.desiredVersion()); } // The call to build may also throw UnsupportedVersionException, if there are essential // fields that cannot be represented in the chosen version. doSend(clientRequest, isInternalRequest, now, builder.build(version)); } catch (UnsupportedVersionException e) { // If the version is not supported, skip sending the request over the wire. // Instead, simply add it to the local queue of aborted requests. log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder, clientRequest.correlationId(), clientRequest.destination(), e); ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, e, null); abortedSends.add(clientResponse); } } 为了保证服务端的处理性能,客户端网络连接对象有一个限制条件:针对同一个服务端,如果上一个客户端请求还没有发送完成,则不允许发送新的客户端请求 。 客户端网络连接对象用i.nFli.ghtReq四sts变量在客户端缓存了还没有收到响应的客户端请求, InFli.ghtRequests类包含一个节点到双端队列的映射结构 。 在准备发送客户端请求时,请求将添加到指定节点对应的队列中;在收到响应后 ,才会将请求从队列中移除 。 1. 客户端轮询并调用回调函数 发送线程 run ()方法的最后一步是调用 NetworkClient的 poll ()方法 。 轮询的最关键步骤是调用selectorpoll ()方法,而在轮询之后,定义了多个处理方法 。 轮询不仅仅会发送客户端请求,也会接收客户端响应 。 客户端发送请求后会调用 handleCol’lpletedSends ()处理已经完成的发送,客户端接收到响应后会调用 handleCol’lpletedReceives ()处理已经完成的接收。 如果客户端发送完请求不需要响应,在处理已经完成的发送时,就会将对应的请求从inFlightRequests 队列中移踪 。 而因为没有响应结果,也就不会有机会调用 handleCollpletedReceives()方法。 如果客户端请求需要响应, 则只有在handleCol’lpletedReceives () 中才会删除对应的请求 : 因为inFli.ghtRequests队列保存的是未收到响应的客户端请求,请求已经有响应,就不需要存在于队列 中 。 相关代码如下 : public List<ClientResponse> poll(long timeout, long now) { if (!abortedSends.isEmpty()) { // If there are aborted sends because of unsupported version exceptions or disconnects, // handle them immediately without waiting for Selector#poll. List<ClientResponse> responses = new ArrayList<>(); handleAbortedSends(responses); completeResponses(responses); return responses; } long metadataTimeout = metadataUpdater.maybeUpdate(now); try { this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleInitiateApiVersionRequests(updatedNow); handleTimedOutRequests(responses, updatedNow); completeResponses(responses); return responses; } private void handleCompletedSends(List<ClientResponse> responses, long now) { // if no response is expected then when the send is completed, return it for (Send send : this.selector.completedSends()) { InFlightRequest request = this.inFlightRequests.lastSent(send.destination()); if (!request.expectResponse) { this.inFlightRequests.completeLastSent(send.destination()); responses.add(request.completed(null, now)); } } } private void handleCompletedSends(List<ClientResponse> responses, long now) { // if no response is expected then when the send is completed, return it for (Send send : this.selector.completedSends()) { InFlightRequest request = this.inFlightRequests.lastSent(send.destination()); if (!request.expectResponse) { this.inFlightRequests.completeLastSent(send.destination()); responses.add(request.completed(null, now)); } } } 下面总结了客户端是否需要响应结果的两种场景下 , 从队列 中删除或添加请求的顺序 。 * 不需要响应的流程。开始发送请求→添加客户端请求到队列→发送请求→请求发送成功→从队列中删除发送请求→构造客户端响应。 * 需要晌应的流程。开始发送请求→添加客户端请求到队列→发送请求→请求发送成功→等待接收响应→接收响应→接收到完整的响应→从队列中删除客户端请求→构造客户端响应 。 上面几个处理方法创建的客户端响应对象(ClientResponse )都需要从队列中获取对应的客户端请求(ClientRequest ),这是因为最后要调用回调函数 , 只有客户端请求中才有回调对象。 把客户端请求作为客户端响应的一个成员变量,在接收到客户端响应时, 通过获取其中的客户端请求 , 就可以得到客户端请求中的回调对象,也就可以调用到回调函数。 客户端响应包含客户端请求的目的是 : 根据响应获取请求中的回调对象 , 在收到响应后调用回调函数。 3 . 客户端请求和客户端响应的关系 客户端请求(ClientRequest )包含客户端发送的请求和回调处理器,客户端响应(ClientResponse )包含客户端请求对象和响应结果的内容。 相关代码如下 : public final class ClientRequest { private final String destination; private final AbstractRequest.Builder<?> requestBuilder; private final int correlationId; private final String clientId; private final long createdTimeMs; private final boolean expectResponse; private final RequestCompletionHandler callback; ………… } public class ClientResponse { private final RequestHeader requestHeader; private final RequestCompletionHandler callback; private final String destination; private final long receivedTimeMs; private final long latencyMs; private final boolean disconnected; private final UnsupportedVersionException versionMismatch; private final AbstractResponse responseBody; ………… } 客户端请求和客户端响应的生命周期都在客户端的连接管理类(NetworkClient)里 。NetworkClient不仅负责将发送钱程构造好的客户端请求发送出去,而且还要将服务端的响应结果构造成客户端响应 并返回给客户端。 图2-9以“客户端发送请求,服务端接收请求,服务端返回结果,客户端接收请求”这个完整的流程,来梳理这些对象之间的关联 。 1. 发送线程创建的客户端请求对象包括请求本身和回调对象 。 2. 发送线程将客户端请求交给 NetworkClient,并记录目标节点到客户端请求的映射关系 。 3. NetworkClient的轮询得到发送请求,将客户端请求发送到对应的服务端目标节点 。 4. 服务端处理客户端请求 , 将客户端响应通过服务端的请求通道返回给客户端 。 5. NetworkClient的轮询得到响应结果, 说明客户端收到服务端发送过来的请求处理结果 。 6. 由于客户端发送请求 时发送到了不 同节点,收到的结果也可能来向不同节点 。 服务端发送过来的响应结果都表示了它是从哪里来的,客户端根据NetworkRecelve的 source查找步骤(2)记录的信息,得到对应的客户端请求, 把客户端请求作为客户端响应的成员变量。 7. 调用口 lentResponse.ClientRequest.Callback.onComplete() ,触发回调函数的调用 。 8. 客户端请求中的回调对象会使用客户端的响应结果 , 来调用生产者应用程序向定义的回调函数 。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L1dBTlRBV0FZMzE0_size_16_color_FFFFFF_t_70] 客户端请求对应的底层数据来源于Send ,客户端响应对应的底层数据来源于NetworkReceive。客户端网络连接对象(NetworkClient )的底层网络操作都交给了选择器(Selector)。 -------------------- [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L1dBTlRBV0FZMzE0_size_16_color_FFFFFF_t_70]: /images/20221022/22874fb421b24cebbcc7a126f249a35c.png
相关 Java网络编程:客户端socket连接问题解析 在Java的网络编程中,客户端通过Socket进行连接。这里可能会遇到一些问题,以下是这些问题及解析: 1. **连接失败**:可能是服务器端未开启监听,或者客户端IP、端口 痛定思痛。/ 2024年09月17日 00:21/ 0 赞/ 12 阅读
相关 Java网络编程:客户端连接服务器的示例 在Java中,我们可以使用Socket和ServerSocket类来实现客户端连接服务器的功能。下面是一个简单的示例: **客户端代码(Client.java):** `` 墨蓝/ 2024年09月16日 11:45/ 0 赞/ 29 阅读
相关 网络编程问题:TCP客户端连接失败案例 在TCP客户端的连接过程中,可能会遇到连接失败的情况。以下是一个典型的案例: **案例描述:** 1. **客户端程序准备:** 一个简单的Python TCP客户端代码被 本是古典 何须时尚/ 2024年09月10日 05:36/ 0 赞/ 21 阅读
相关 2.1.3 客户端网络连接对象 2.1.3 客户端网络连接对象 客户端网络连接对象(NetworkClient )管理了客户端和服务端之间的 网络通信,包括连接的建立、发送客户端请求 、 读取客户端响应 川长思鸟来/ 2023年01月12日 15:53/ 0 赞/ 42 阅读
相关 redis客户端连接 备注:测试版本 redis 4.0.9 一.redis连接数 查看redis的连接数,redis的默认连接数是1w,当然也可以通过配置文件来修改 10.31. 雨点打透心脏的1/2处/ 2022年12月21日 10:51/ 0 赞/ 221 阅读
相关 Redis 客户端连接 Redis 通过监听一个 TCP 端口或者 Unix socket 的方式来接收来自客户端的连接,当一个连接建立后,Redis 内部会进行以下一些操作: 首先,客户端 ╰+攻爆jí腚メ/ 2022年09月24日 06:27/ 0 赞/ 236 阅读
相关 Python mongoDB 客户端连接 coding:utf-8 import pymongo if __name__ == '__main__': client = 心已赠人/ 2022年08月18日 11:56/ 0 赞/ 154 阅读
相关 java 网络编程 多个客户端连接服务器 import java.io.BufferedReader; import java.io.IOException; import java.io.InputStrea 女爷i/ 2022年08月17日 14:22/ 0 赞/ 247 阅读
相关 Redis 客户端连接 Redis 命令用于在 redis 服务上执行操作。 要在 redis 服务上执行命令需要一个 redis 客户端。Redis客户端在Redis包中有提供,这个包在我们 约定不等于承诺〃/ 2021年09月22日 06:26/ 0 赞/ 447 阅读
还没有评论,来说两句吧...