netty如何实现优化百万并发http请求

缺乏、安全感 2022-09-15 05:42 577阅读 0赞

问题引入:如果项目存在数量大的并发,如何优化项目性能

  • 首先判断项目性能瓶颈是什么, CPU,还是I/O请求
    大部分是网络I/o瓶颈,主要是因为网络请求是阻塞,导致太慢

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
JDK 原生 NIO 程序的问题
JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
NIO 的类库和 API 繁杂,使用麻烦。你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
需要具备其他的额外技能做铺垫。例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。
NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
JDK NIO 的 Bug。例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。
官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。
Netty 的特点
Netty 对 JDK 自带的 NIO 的 API 进行封装,解决上述问题,主要特点有:
设计优雅,适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
使用方便,详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
高性能,吞吐量更高,延迟更低;减少资源消耗;最小化不必要的内存复制。
安全,完整的 SSL/TLS 和 StartTLS 支持。
社区活跃,不断更新,社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

Netty 常见使用场景

Netty 常见的使用场景如下:
互联网行业。在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。
典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
游戏行业。无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
大数据领域。经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
有兴趣的读者可以了解一下目前有哪些开源项目使用了 Netty:Related Projects。
Netty 高性能设计
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。

netty实现http客户端解决网络I/O瓶颈

  1. @Component
  2. @Order(2)
  3. public class HttpClient {
  4. public static void start() {
  5. EventLoopGroup group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
  6. Bootstrap bootstrap = new Bootstrap();
  7. try {
  8. bootstrap.group(group)
  9. .channel(NioSocketChannel.class)
  10. .option(ChannelOption.SO_KEEPALIVE, true)
  11. .handler(new ChannelInitializer<Channel>() {
  12. @Override
  13. protected void initChannel(Channel channel)
  14. throws Exception {
  15. /*//包含编码器和解码器 channel.pipeline().addLast(new HttpClientCodec()); //聚合 channel.pipeline().addLast(new HttpObjectAggregator(1024 * 10 * 1024)); //解压 channel.pipeline().addLast(new HttpContentDecompressor());*/
  16. //请求编码
  17. channel.pipeline().addLast(new HttpRequestEncoder());
  18. //响应解码
  19. channel.pipeline().addLast(new HttpResponseDecoder());
  20. channel.pipeline().addLast(new HttpClientHandler());
  21. }
  22. });
  23. SimpleClientHttpRequestFactory httpRequestFactory = new SimpleClientHttpRequestFactory();
  24. SocketAddress address = new InetSocketAddress("net-proxy-pub-test.sftcwl.com", 1080);
  25. SocketAddress address1 = new InetSocketAddress("gis.sf-express.com", 8080);
  26. Proxy proxy = new Proxy(Proxy.Type.HTTP, address);
  27. httpRequestFactory.setProxy(proxy);
  28. ChannelFuture future = bootstrap.connect("localhost",8162).sync();
  29. future.channel().closeFuture().sync();
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally {
  33. group.shutdownGracefully();
  34. }
  35. }
  36. public static void main(String[] args) {
  37. start();
  38. }
  39. public class HttpClientHandler extends ChannelInboundHandlerAdapter {
  40. private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  41. private static final String CLOSE_FREE_CHANNEL = "closeFreeChannel";
  42. /** * 关闭channel的原因,为closeFreeChannel时表示原因为关闭空闲socket连接 */
  43. public static final String CLOSE_CHANNEL_CAUSE = "closeChannelCause";
  44. private static Set<String> channelIDSet = new CopyOnWriteArraySet<>();
  45. @Override
  46. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  47. URI uri = new URI("/abc/project/list");
  48. Map<String, Object> param = new HashMap<>(2);
  49. param.put("address", "雁塔区");
  50. param.put("ak", "58008e825a944a45909e056be12f511f");
  51. String prefix = "?";
  52. StringBuilder urlBuilder = new StringBuilder();
  53. for (String key : param.keySet()) {
  54. urlBuilder.append(prefix).append(key).append("=").append(param.get(key));
  55. prefix = "&";
  56. }
  57. LOGGER.info(urlBuilder.toString());
  58. //配置HttpRequest的请求数据和一些配置信息
  59. FullHttpRequest request = new DefaultFullHttpRequest(
  60. HTTP_1_1,
  61. HttpMethod.GET,
  62. uri.toASCIIString()
  63. );
  64. LOGGER.info(request.toString());
  65. request.headers()
  66. .set(HttpHeaderNames.HOST, "127.0.0.1")
  67. .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
  68. //开启长连接
  69. .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
  70. //设置传递请求内容的长度
  71. .set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
  72. ctx.writeAndFlush(request);
  73. }
  74. @Override
  75. public void channelRead(ChannelHandlerContext ctx, Object msg)
  76. throws Exception {
  77. System.out.println("msg -> " + msg);
  78. if (msg instanceof HttpContent) {
  79. HttpContent content = (HttpContent) msg;
  80. ByteBuf buf = content.content();
  81. String result = buf.toString(CharsetUtil.UTF_8);
  82. System.out.println("response -> " + result);
  83. LOGGER.info(result);
  84. }
  85. }
  86. /** * 通道失活事件,用于断线重连 * * @param ctx */
  87. @Override
  88. public void channelInactive(final ChannelHandlerContext ctx) {
  89. LOGGER.info("Channel为 {} 断开,Disconnected Server:{}", ctx.channel().id(), ctx.channel().remoteAddress());
  90. EventLoop eventLoop = ctx.channel().eventLoop();
  91. eventLoop.schedule(new Runnable() {
  92. @Override
  93. public void run() {
  94. SocketChannel channel = (SocketChannel) ctx.channel();
  95. int port = channel.remoteAddress().getPort();
  96. //获取channel中存储的关闭连接的原因,如果是关闭空闲连接,则不再重连
  97. String closeChannelCause = channel.attr(AttributeKey.<String>valueOf(CLOSE_CHANNEL_CAUSE)).get();
  98. if (CLOSE_FREE_CHANNEL.equals(closeChannelCause)) {
  99. LOGGER.info("检测到Channel为 {} 的空闲连接,已经断开.", channel.id());
  100. return;
  101. }
  102. LOGGER.info("检测到与服务端断开连接,NettyClient开始重连{}:{}", channel.remoteAddress().getHostName(), port);
  103. }
  104. }, 3, TimeUnit.SECONDS);
  105. ctx.fireChannelInactive();
  106. }
  107. /** * 监听idle事件,用于监听心跳 * * @param ctx * @param evt * @throws Exception */
  108. @Override
  109. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  110. Channel channel = ctx.channel();
  111. if (evt instanceof IdleStateEvent) {
  112. if (channel.isActive()) {
  113. LOGGER.info("[客户端心跳监测] 通道编号: {} ,客户端信息: {}", channel.id(), channel.localAddress());
  114. channelIDSet.add(channel.id().toString());
  115. LOGGER.info("[客户端心跳监测] 关闭当前空闲,通道编号: {} ,客户端信息: {}", channel.id(), channel.localAddress());
  116. return;
  117. }
  118. }
  119. }
  120. /** * 异常处理 * * @param ctx * @param cause * @throws Exception */
  121. @Override
  122. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  123. LOGGER.error("Socket通信过程中出现异常,异常原因为:", cause);
  124. super.exceptionCaught(ctx, cause);
  125. }
  126. }

发表评论

表情:
评论列表 (有 0 条评论,577人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty如何做到单机并发

    相信很多人知道石中剑这个典故,在此典故中,天命注定的亚瑟很容易的就拔出了这把石中剑,但是由于资历不被其他人认可,所以他颇费了一番周折才成为了真正意义上的英格兰全境之王,亚瑟王。