netty源码分析之客户端

快来打我* 2022-06-06 01:28 311阅读 0赞

一、前言

上篇博客介绍了服务端的启动源码,这篇就开始介绍客户端

二、源码分析

首先贴上客户端的简单代码~

  1. /**
  2.  * Minimal Netty client used as the entry point for this walkthrough: it connects to
  3.  * localhost:7878 and then blocks until the channel is closed.
  4.  */
  5. public class SocketClient {
  6. public static void main(String[] args) throws InterruptedException {
  7. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  8. try {
  9. Bootstrap bootstrap = new Bootstrap();
  10. // handler(...) stores exactly one handler (a later call overwrites an earlier one), so register
  11. // a single handler here. If logging is wanted, add a LoggingHandler to the pipeline inside
  12. // SocketClientInitializer instead of chaining a second handler(...) call.
  13. bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
  14. .handler(new SocketClientInitializer());
  15. // Wait for the connect attempt to finish before going on.
  16. ChannelFuture channelFuture = bootstrap.connect("localhost", 7878).sync();
  17. // Block the main thread until the channel is closed.
  18. channelFuture.channel().closeFuture().sync();
  19. } finally {
  20. // Always release the event loop group, even when connecting fails.
  21. eventLoopGroup.shutdownGracefully();
  22. }
  23. }
  24. }

设置group:和server端不同的是,这里只设置一个group

  // Sets the EventLoopGroup of this bootstrap and returns this bootstrap (cast to B) for chaining.
  // Throws NullPointerException when group is null and IllegalStateException when a group was already set.
  1. public B group(EventLoopGroup group) {
  2. if (group == null) {
  3. throw new NullPointerException("group");
  4. }
  // The group may only be assigned once.
  5. if (this.group != null) {
  6. throw new IllegalStateException("group set already");
  7. }
  8. this.group = group;
  9. return (B) this;
  10. }

设置channel

  // Selects the channel implementation: wraps channelClass in a BootstrapChannelFactory and passes it to channelFactory(...).
  // Throws NullPointerException when channelClass is null.
  1. public B channel(Class<? extends C> channelClass) {
  2. if (channelClass == null) {
  3. throw new NullPointerException("channelClass");
  4. }
  5. // Author's note: the server-side walkthrough used ReflectiveChannelFactory (not shown in this article).
  6. // Author's note: newChannel() is ultimately implemented via reflection here as well - factory source not shown, confirm.
  7. return channelFactory(new BootstrapChannelFactory<C>(channelClass));
  8. }

设置handler

  // Stores the handler of this bootstrap and returns this bootstrap (cast to B) for chaining.
  // The field is simply overwritten, so only the handler from the most recent call is kept.
  // Throws NullPointerException when handler is null.
  1. @SuppressWarnings("unchecked")
  2. public B handler(ChannelHandler handler) {
  3. if (handler == null) {
  4. throw new NullPointerException("handler");
  5. }
  6. this.handler = handler;
  7. return (B) this;
  8. }

接着就是核心的连接(connect)代码了

  // Convenience overload: builds an InetSocketAddress from host and port and delegates to connect(SocketAddress).
  1. public ChannelFuture connect(String inetHost, int inetPort) {
  2. return connect(new InetSocketAddress(inetHost, inetPort));
  3. }
  // Connects to remoteAddress: rejects null, calls validate(), then delegates to doConnect with localAddress().
  // Throws NullPointerException when remoteAddress is null.
  4. public ChannelFuture connect(SocketAddress remoteAddress) {
  5. if (remoteAddress == null) {
  6. throw new NullPointerException("remoteAddress");
  7. }
  // validate() is defined outside this listing; presumably it checks the bootstrap configuration - confirm.
  8. validate();
  9. return doConnect(remoteAddress, localAddress());
  10. }
  11. // Author's note: this part is similar to the server-side bind.
  // Initializes and registers a channel, then starts the connect via doConnect0 - immediately when the
  // registration future is already done, otherwise from a listener on that future.
  // Returns the registration future when it already carries a failure cause, else a new promise for the connect.
  12. private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
  13. // Initialize and register the channel.
  14. final ChannelFuture regFuture = initAndRegister();
  15. // Fetch the channel associated with the registration future.
  16. final Channel channel = regFuture.channel();
  // Registration already failed: return the failed future as-is.
  17. if (regFuture.cause() != null) {
  18. return regFuture;
  19. }
  20. final ChannelPromise promise = channel.newPromise();
  21. if (regFuture.isDone()) {
  22. doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
  23. } else {
  24. // Registration is still pending: add a listener that runs doConnect0 once regFuture completes.
  25. regFuture.addListener(new ChannelFutureListener() {
  26. @Override
  27. public void operationComplete(ChannelFuture future) throws Exception {
  28. doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
  29. }
  30. });
  31. }
  32. return promise;
  33. }
  // Creates a channel through the channel factory, initializes it with init(channel) and registers it with group().
  // Returns a failed future when init throws; otherwise returns the registration future.
  34. final ChannelFuture initAndRegister() {
  35. // Create the channel.
  36. final Channel channel = channelFactory().newChannel();
  37. try {
  38. // Initialize the channel.
  39. init(channel);
  40. } catch (Throwable t) {
  // Initialization failed: force-close the channel and report the failure through a failed future.
  41. channel.unsafe().closeForcibly();
  42. return channel.newFailedFuture(t);
  43. }
  44. // Register the channel with the event loop group.
  45. ChannelFuture regFuture = group().register(channel);
  // The registration future already carries a failure: close the channel, forcibly if it never got registered.
  46. if (regFuture.cause() != null) {
  47. if (channel.isRegistered()) {
  48. channel.close();
  49. } else {
  50. channel.unsafe().closeForcibly();
  51. }
  52. }
  53. return regFuture;
  54. }
  55. // Bootstrap: channel initialization.
  // Adds the configured handler to the channel's pipeline, then applies the configured options and attributes.
  56. void init(Channel channel) throws Exception {
  57. // Obtain the channel's pipeline.
  58. ChannelPipeline p = channel.pipeline();
  59. // Append the configured handler to the pipeline; per the author this is the ChannelInitializer passed to handler(...).
  60. p.addLast(handler());
  61. // Author's note: the remaining configuration is less important for this walkthrough.
  62. final Map<ChannelOption<?>, Object> options = options();
  // Apply every option while holding the options map's monitor; unknown or failing options are only logged.
  63. synchronized (options) {
  64. for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
  65. try {
  66. if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
  67. logger.warn("Unknown channel option: " + e);
  68. }
  69. } catch (Throwable t) {
  70. logger.warn("Failed to set a channel option: " + channel, t);
  71. }
  72. }
  73. }
  74. final Map<AttributeKey<?>, Object> attrs = attrs();
  // Copy every configured attribute onto the channel while holding the attrs map's monitor.
  75. synchronized (attrs) {
  76. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
  77. channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
  78. }
  79. }
  80. }
  81. // Class MultithreadEventLoopGroup.
  82. // This is the registration reached from group().register(channel): it delegates to the loop returned by next().
  83. @Override
  84. public ChannelFuture register(Channel channel) {
  85. return next().register(channel);
  86. }
  87. // Class SingleThreadEventLoop: creates a DefaultChannelPromise for the channel and this loop, then delegates to register(channel, promise).
  88. @Override
  89. public ChannelFuture register(Channel channel) {
  90. return register(channel, new DefaultChannelPromise(channel, this));
  91. }
  92. // Class AbstractChannel (per the author; the AbstractChannel.this references suggest an inner class of it - confirm).
  // Binds the channel to eventLoop and runs register0(promise) on that loop's thread.
  // Fails the promise when the channel is already registered or the loop type is incompatible.
  // Throws NullPointerException when eventLoop is null.
  93. @Override
  94. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  95. if (eventLoop == null) {
  96. throw new NullPointerException("eventLoop");
  97. }
  98. if (isRegistered()) {
  99. promise.setFailure(new IllegalStateException("registered to an event loop already"));
  100. return;
  101. }
  102. if (!isCompatible(eventLoop)) {
  103. promise.setFailure(
  104. new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
  105. return;
  106. }
  107. AbstractChannel.this.eventLoop = eventLoop;
  // Already on the event loop thread: register directly; otherwise submit the registration as a task to the loop.
  108. if (eventLoop.inEventLoop()) {
  109. register0(promise);
  110. } else {
  111. try {
  112. eventLoop.execute(new Runnable() {
  113. @Override
  114. public void run() {
  115. register0(promise);
  116. }
  117. });
  118. } catch (Throwable t) {
  // The loop rejected the task: log, force-close the channel, mark it closed and fail the promise.
  119. logger.warn(
  120. "Force-closing a channel whose registration task was not accepted by an event loop: {}",
  121. AbstractChannel.this, t);
  122. closeForcibly();
  123. closeFuture.setClosed();
  124. promise.setFailure(t);
  125. }
  126. }
  127. }
  128. // Next, look at register0().
  // Performs the registration via doRegister(), marks the channel registered, completes the promise and
  // fires channelRegistered (plus channelActive when the channel is already active).
  // On any failure the channel is force-closed and the promise is failed if it is not complete yet.
  129. private void register0(ChannelPromise promise) {
  130. try {
  131. // check if the channel is still open as it could be closed in the mean time when the register
  132. // call was outside of the eventLoop
  133. if (!ensureOpen(promise)) {
  134. return;
  135. }
  136. // Perform the actual registration.
  137. doRegister();
  138. registered = true;
  139. promise.setSuccess();
  140. pipeline.fireChannelRegistered();
  141. if (isActive()) {
  142. pipeline.fireChannelActive();
  143. }
  144. } catch (Throwable t) {
  145. // Close the channel directly to avoid FD leak.
  146. closeForcibly();
  147. closeFuture.setClosed();
  148. if (!promise.tryFailure(t)) {
  149. logger.warn(
  150. "Tried to fail the registration promise, but it is complete already. " +
  151. "Swallowing the cause of the registration failure:", t);
  152. }
  153. }
  154. }
  // Registers javaChannel() with the event loop's selector, passing ops 0 and this channel as the attachment,
  // and stores the resulting key in selectionKey. On CancelledKeyException it calls selectNow() once and
  // retries; a second CancelledKeyException is rethrown.
  155. @Override
  156. protected void doRegister() throws Exception {
  157. boolean selected = false;
  158. for (;;) {
  159. try {
  160. // Author's note: javaChannel() here is java.nio.channels.SocketChannel, i.e. the JDK's own channel (its declaration is not shown - confirm).
  161. selectionKey = javaChannel().register(eventLoop().selector, 0, this);
  162. return;
  163. } catch (CancelledKeyException e) {
  164. if (!selected) {
  165. // Force the Selector to select now as the "canceled" SelectionKey may still be
  166. // cached and not removed because no Select.select(..) operation was called yet.
  167. eventLoop().selectNow();
  168. selected = true;
  169. } else {
  170. // We forced a select operation on the selector before but the SelectionKey is still cached
  171. // for whatever reason. JDK bug ?
  172. throw e;
  173. }
  174. }
  175. }
  176. }

OK,到这里channel的初始化和注册就完成了。接着看注册完成之后的连接逻辑

  // Repeated listing of doConnect: after initAndRegister() it either calls doConnect0 right away (registration
  // future already done) or defers the call to a listener on the registration future.
  1. private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
  2. final ChannelFuture regFuture = initAndRegister();
  3. final Channel channel = regFuture.channel();
  4. if (regFuture.cause() != null) {
  5. return regFuture;
  6. }
  7. final ChannelPromise promise = channel.newPromise();
  8. if (regFuture.isDone()) {
  9. doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
  10. } else {
  11. // Add a listener to the registration future.
  12. regFuture.addListener(new ChannelFutureListener() {
  13. // Callback of the listener attached to regFuture; the code shows no dependency on the caller's sync().
  14. @Override
  15. public void operationComplete(ChannelFuture future) throws Exception {
  16. doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
  17. }
  18. });
  19. }
  20. return promise;
  21. }
  22. // Called from doConnect, either directly or from the listener on the registration future.
  // Submits a task to the channel's event loop: when registration succeeded it calls channel.connect(...)
  // and adds the CLOSE_ON_FAILURE listener to the promise; otherwise it fails the promise with the registration cause.
  23. private static void doConnect0(
  24. final ChannelFuture regFuture, final Channel channel,
  25. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  26. channel.eventLoop().execute(new Runnable() {
  27. @Override
  28. public void run() {
  29. if (regFuture.isSuccess()) {
  30. if (localAddress == null) {
  31. // channel is the Channel passed in as a parameter (it exposes eventLoop()), not java.nio.channels.SocketChannel.
  32. // This is where the connect is actually issued.
  33. channel.connect(remoteAddress, promise);
  34. } else {
  35. channel.connect(remoteAddress, localAddress, promise);
  36. }
  37. promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  38. } else {
  39. promise.setFailure(regFuture.cause());
  40. }
  41. }
  42. });
  43. }

三、总结

总的来说,client的连接流程和server端的启动流程很类似;如果已经大致了解服务端的源码,那么client端就比较简单了~

发表评论

表情:
评论列表 (有 0 条评论,311人围观)

还没有评论,来说两句吧...

相关阅读