Netty 使用异步线程池执行耗时任务

约定不等于承诺〃 2022-09-10 10:15 437阅读 0赞

当遇到在Handler需要执行耗时较高的操作时候,可以采用异步的方式来解决,多线程异步实现方式有两种:在Handler中添加线程池和在Context中添加线程池。任务队列并没有使用多线程,它是使用同一个线程执行IO操作和运行任务队列中的任务。

一 任务队列

  1. 这种方式运行任务队列线程和事件循环线程是同一个线程,并没有使用新的线程。
  2. Netty的事件循环EventLoop是一个不断循环着执行读取就绪事件、处理事件、运行任务队列这三个操作的一个线程。事件循环关联了一个任务队列,用于存放耗时较长的业务处理操作。事件循环线程先读取就绪事件,如果任务队列为空则用阻塞一定时间的方式读取,如果任务队列非空,则使用非阻塞的方式读取,读取就绪事件。读取后进行处理事件,记录处理事件花费的时间,在任务队列中不断取出任务执行,默认花费在任务队列上取执行任务的时间和处理事件的时间相同,可以使用ioradio参数调整IO操作和非IO操作花费的时间比例,到时间后又去读取就绪事件。
  3. 在处理器中使用ChannelHandlerContext获取channel获取事件循环提交任务
  4. package com.tech.netty.netty.source.async;
  5. import io.netty.bootstrap.ServerBootstrap;
  6. import io.netty.channel.ChannelFuture;
  7. import io.netty.channel.ChannelFutureListener;
  8. import io.netty.channel.ChannelInitializer;
  9. import io.netty.channel.ChannelOption;
  10. import io.netty.channel.nio.NioEventLoopGroup;
  11. import io.netty.channel.socket.SocketChannel;
  12. import io.netty.channel.socket.nio.NioServerSocketChannel;
  13. import java.util.HashSet;
  14. import java.util.Set;
  15. /**
  16. * @author lw
  17. * @since 2021/8/11
  18. **/
  19. public class NettyServer {
  20. public static void main(String[] args) throws InterruptedException {
  21. //1 创建两个线程组bossGroup workerGroup
  22. //2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求
  23. //3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop
  24. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  25. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  26. try {
  27. //创建服务器端启动对象,进行参数配置
  28. ServerBootstrap serverBootstrap = new ServerBootstrap();
  29. serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
  30. .channel(NioServerSocketChannel.class) //设置服务器端通道为NioServerSocketChannel
  31. .option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数
  32. .childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接
  33. .childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器
  34. @Override
  35. protected void initChannel(SocketChannel socketChannel) throws Exception {
  36. socketChannel.pipeline().addLast(new NettyServerHandler());
  37. }
  38. });
  39. //启动服务器并绑定一个端口
  40. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  41. System.out.println("ok");
  42. //对通道的关闭事件进行监听
  43. channelFuture.channel().closeFuture().sync();
  44. } finally {
  45. bossGroup.shutdownGracefully();
  46. workerGroup.shutdownGracefully();
  47. }
  48. }
  49. }
  50. package com.tech.netty.netty.source.async;
  51. import io.netty.buffer.ByteBuf;
  52. import io.netty.buffer.Unpooled;
  53. import io.netty.channel.ChannelHandlerContext;
  54. import io.netty.channel.ChannelInboundHandlerAdapter;
  55. import io.netty.util.CharsetUtil;
  56. import lombok.extern.slf4j.Slf4j;
  57. /**
  58. * @author lw
  59. * @since 2021/8/11
  60. **/
  61. //自定义一个Handler
  62. @Slf4j
  63. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  64. @Override
  65. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  66. log.info("channel={} | handlerAdded",ctx.channel().id());
  67. }
  68. //读取客户端发送的消息
  69. // ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
  70. // msg 是客户端发送的消息
  71. @Override
  72. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  73. log.info("服务器读取线程 "+Thread.currentThread().getName());
  74. ByteBuf byteBuf = (ByteBuf) msg;
  75. System.out.println("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
  76. ctx.channel().eventLoop().execute(() -> {
  77. try {
  78. log.info("第一个任务开始执行");
  79. Thread.sleep(5*1000);
  80. } catch (InterruptedException e) {
  81. e.printStackTrace();
  82. }
  83. ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
  84. log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
  85. });
  86. ctx.channel().eventLoop().execute(() -> {
  87. try {
  88. log.info("第二个任务开始执行");
  89. Thread.sleep(5*1000);
  90. } catch (InterruptedException e) {
  91. e.printStackTrace();
  92. }
  93. ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
  94. log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
  95. });
  96. ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
  97. log.info("go on...");
  98. }
  99. //数据读取完毕
  100. @Override
  101. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  102. }
  103. //处理异常 一般是需要关闭通道
  104. @Override
  105. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  106. ctx.close();
  107. }
  108. }
  109. package com.tech.netty.netty.source.async;
  110. import io.netty.bootstrap.Bootstrap;
  111. import io.netty.channel.ChannelFuture;
  112. import io.netty.channel.ChannelInitializer;
  113. import io.netty.channel.nio.NioEventLoopGroup;
  114. import io.netty.channel.socket.SocketChannel;
  115. import io.netty.channel.socket.nio.NioSocketChannel;
  116. /**
  117. * @author lw
  118. * @since 2021/8/11
  119. **/
  120. public class NettyClient {
  121. public static void main(String[] args) throws InterruptedException {
  122. //客户端需要一个事件循环组
  123. NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
  124. try {
  125. // 创建客户端启动对象 并配置参数
  126. Bootstrap bootstrap = new Bootstrap();
  127. bootstrap.group(eventExecutors) //设置线程组
  128. .channel(NioSocketChannel.class) //设置客户端通道
  129. .handler(new ChannelInitializer<SocketChannel>() { //设置处理器
  130. @Override
  131. protected void initChannel(SocketChannel socketChannel) throws Exception {
  132. socketChannel.pipeline().addLast(new NettyClientHandler());
  133. }
  134. });
  135. System.out.println("...客户端 is ready...");
  136. //连接服务器
  137. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
  138. //对通道关闭事件进行监听
  139. channelFuture.channel().closeFuture().sync();
  140. } finally {
  141. eventExecutors.shutdownGracefully();
  142. }
  143. }
  144. }
  145. package com.tech.netty.netty.source.async;
  146. import io.netty.buffer.ByteBuf;
  147. import io.netty.buffer.Unpooled;
  148. import io.netty.channel.ChannelHandlerContext;
  149. import io.netty.channel.ChannelInboundHandlerAdapter;
  150. import io.netty.util.CharsetUtil;
  151. /**
  152. * @author lw
  153. * @since 2021/8/11
  154. **/
  155. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  156. //当通道就绪会触发该方法 给服务器发送数据
  157. @Override
  158. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  159. ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务器", CharsetUtil.UTF_8));
  160. }
  161. //当通道有读取事件时触发
  162. @Override
  163. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  164. ByteBuf byteBuf = (ByteBuf) msg;
  165. System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
  166. }
  167. @Override
  168. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  169. cause.printStackTrace();
  170. ctx.close();
  171. }
  172. }
  173. 通过结果可以看到
  174. handler线程和运行任务队列里任务的线程是同一个线程
  175. 任务队列里任务按序逐一取出执行

#

二 在Handler中定义线程池

通过在Handler中定义一个线程池,执行耗时较高的操作,IO线程执行IO操作,业务操作由业务线程来处理,提高IO效率,如果业务操作中需要进行IO操作,则在调用的时候Netty会判断当前线程是否为IO线程,如果不是则会则会将IO操作添加到对应事件循环的任务队列中,有事件循环的IO线程来运行。

  1. package com.tech.netty.netty.source.async;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. import io.netty.util.concurrent.DefaultEventExecutor;
  8. import io.netty.util.concurrent.DefaultEventExecutorGroup;
  9. import io.netty.util.concurrent.EventExecutorGroup;
  10. import lombok.extern.slf4j.Slf4j;
  11. import java.util.concurrent.Callable;
  12. /**
  13. * @author lw
  14. * @since 2021/8/11
  15. **/
  16. //自定义一个Handler
  17. @Slf4j
  18. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  19. static final EventExecutorGroup group=new DefaultEventExecutorGroup(16);
  20. @Override
  21. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  22. log.info("channel={} | handlerAdded",ctx.channel().id());
  23. }
  24. //读取客户端发送的消息
  25. // ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
  26. // msg 是客户端发送的消息
  27. @Override
  28. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  29. // log.info("服务器读取线程 "+Thread.currentThread().getName());
  30. // ByteBuf byteBuf = (ByteBuf) msg;
  31. // log.info("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
  32. //
  33. // ctx.channel().eventLoop().execute(() -> {
  34. // try {
  35. // log.info("第一个任务开始执行");
  36. // Thread.sleep(5*1000);
  37. // } catch (InterruptedException e) {
  38. // e.printStackTrace();
  39. // }
  40. // ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
  41. // log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
  42. // });
  43. //
  44. // ctx.channel().eventLoop().execute(() -> {
  45. // try {
  46. // log.info("第二个任务开始执行");
  47. // Thread.sleep(5*1000);
  48. // } catch (InterruptedException e) {
  49. // e.printStackTrace();
  50. // }
  51. // ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
  52. // log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
  53. // });
  54. //
  55. // ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
  56. final Object msgCop=msg;
  57. final ChannelHandlerContext ctxCop=ctx;
  58. //在handler中添加业务线程池 执行耗时任务
  59. group.submit(new Callable<Object>() {
  60. @Override
  61. public Object call() throws Exception {
  62. ByteBuf buff = (ByteBuf) msgCop;
  63. byte[] bytes = new byte[buff.readableBytes()];
  64. buff.readBytes(bytes);
  65. String s = new String(bytes, CharsetUtil.UTF_8);
  66. log.info("读取到消息:{}",s);
  67. Thread.sleep(10*1000);
  68. ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client",CharsetUtil.UTF_8));
  69. log.info("发送完成");
  70. return null;
  71. }
  72. });
  73. group.submit(new Callable<Object>() {
  74. @Override
  75. public Object call() throws Exception {
  76. ByteBuf buff = (ByteBuf) msgCop;
  77. byte[] bytes = new byte[buff.readableBytes()];
  78. buff.readBytes(bytes);
  79. String s = new String(bytes, CharsetUtil.UTF_8);
  80. log.info("1读取到消息:{}",s);
  81. Thread.sleep(10*1000);
  82. ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client1",CharsetUtil.UTF_8));
  83. log.info("1发送完成");
  84. return null;
  85. }
  86. });
  87. log.info("go on...");
  88. }
  89. //数据读取完毕
  90. @Override
  91. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  92. ctx.flush();
  93. }
  94. //处理异常 一般是需要关闭通道
  95. @Override
  96. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  97. cause.printStackTrace();
  98. ctx.close();
  99. }
  100. }

执行结果:

  1. IO线程打印了go on
  2. 两个业务线程同时开始读取消息,休眠10s 发送消息

实现异步执行。

#

三 在Context中指定线程池

在pipeline添加handler时,指定业务线程池,则handler中的非IO操作都由业务线程来运行,遇到IO操作将操作封装为Task提交到任务队列,由事件循环的线程来运行任务队列中的任务时执行。

  1. package com.tech.netty.netty.source.async;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelFutureListener;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.ChannelOption;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. import io.netty.util.concurrent.DefaultEventExecutorGroup;
  11. import io.netty.util.concurrent.EventExecutorGroup;
  12. import java.util.HashSet;
  13. import java.util.Set;
  14. /**
  15. * @author lw
  16. * @since 2021/8/11
  17. **/
  18. public class NettyServer {
  19. static final EventExecutorGroup group=new DefaultEventExecutorGroup(16);
  20. public static void main(String[] args) throws InterruptedException {
  21. //1 创建两个线程组bossGroup workerGroup
  22. //2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求
  23. //3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop
  24. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  25. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  26. try {
  27. //创建服务器端启动对象,进行参数配置
  28. ServerBootstrap serverBootstrap = new ServerBootstrap();
  29. serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
  30. .channel(NioServerSocketChannel.class) //设置服务器端通道为NioServerSocketChannel
  31. .option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数
  32. .childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接
  33. .childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器
  34. @Override
  35. protected void initChannel(SocketChannel socketChannel) throws Exception {
  36. socketChannel.pipeline()
  37. .addLast(group,new NettyServerHandler());
  38. }
  39. });
  40. //启动服务器并绑定一个端口
  41. ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
  42. System.out.println("ok");
  43. //对通道的关闭事件进行监听
  44. channelFuture.channel().closeFuture().sync();
  45. } finally {
  46. bossGroup.shutdownGracefully();
  47. workerGroup.shutdownGracefully();
  48. }
  49. }
  50. }
  51. package com.tech.netty.netty.source.async;
  52. import io.netty.buffer.ByteBuf;
  53. import io.netty.buffer.Unpooled;
  54. import io.netty.channel.ChannelHandlerContext;
  55. import io.netty.channel.ChannelInboundHandlerAdapter;
  56. import io.netty.util.CharsetUtil;
  57. import io.netty.util.concurrent.DefaultEventExecutorGroup;
  58. import io.netty.util.concurrent.EventExecutorGroup;
  59. import lombok.extern.slf4j.Slf4j;
  60. import java.util.concurrent.Callable;
  61. /**
  62. * @author lw
  63. * @since 2021/8/11
  64. **/
  65. //自定义一个Handler
  66. @Slf4j
  67. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  68. static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
  69. @Override
  70. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  71. log.info("channel={} | handlerAdded", ctx.channel().id());
  72. }
  73. //读取客户端发送的消息
  74. // ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息
  75. // msg 是客户端发送的消息
  76. @Override
  77. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  78. // log.info("服务器读取线程 "+Thread.currentThread().getName());
  79. // ByteBuf byteBuf = (ByteBuf) msg;
  80. // log.info("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
  81. //
  82. // ctx.channel().eventLoop().execute(() -> {
  83. // try {
  84. // log.info("第一个任务开始执行");
  85. // Thread.sleep(5*1000);
  86. // } catch (InterruptedException e) {
  87. // e.printStackTrace();
  88. // }
  89. // ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
  90. // log.info("第一个提交任务的线程 "+Thread.currentThread().getName());
  91. // });
  92. //
  93. // ctx.channel().eventLoop().execute(() -> {
  94. // try {
  95. // log.info("第二个任务开始执行");
  96. // Thread.sleep(5*1000);
  97. // } catch (InterruptedException e) {
  98. // e.printStackTrace();
  99. // }
  100. // ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8));
  101. // log.info("第二个提交任务的线程 "+Thread.currentThread().getName());
  102. // });
  103. //
  104. // ctx.writeAndFlush(Unpooled.copiedBuffer("go on....",CharsetUtil.UTF_8));
  105. final Object msgCop = msg;
  106. final ChannelHandlerContext ctxCop = ctx;
  107. //在handler中添加业务线程池 执行耗时任务
  108. // group.submit(new Callable<Object>() {
  109. // @Override
  110. // public Object call() throws Exception {
  111. // ByteBuf buff = (ByteBuf) msgCop;
  112. // byte[] bytes = new byte[buff.readableBytes()];
  113. // buff.readBytes(bytes);
  114. // String s = new String(bytes, CharsetUtil.UTF_8);
  115. // log.info("读取到消息:{}",s);
  116. // Thread.sleep(10*1000);
  117. // ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client",CharsetUtil.UTF_8));
  118. // log.info("发送完成");
  119. // return null;
  120. // }
  121. // });
  122. // group.submit(new Callable<Object>() {
  123. // @Override
  124. // public Object call() throws Exception {
  125. // ByteBuf buff = (ByteBuf) msgCop;
  126. // byte[] bytes = new byte[buff.readableBytes()];
  127. // buff.readBytes(bytes);
  128. // String s = new String(bytes, CharsetUtil.UTF_8);
  129. // log.info("1读取到消息:{}",s);
  130. // Thread.sleep(10*1000);
  131. // ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client1",CharsetUtil.UTF_8));
  132. // log.info("1发送完成");
  133. // return null;
  134. // }
  135. // });
  136. ByteBuf buff = (ByteBuf) msgCop;
  137. byte[] bytes = new byte[buff.readableBytes()];
  138. buff.readBytes(bytes);
  139. String s = new String(bytes, CharsetUtil.UTF_8);
  140. log.info("读取到消息:{}", s);
  141. Thread.sleep(10 * 1000);
  142. ctxCop.writeAndFlush(Unpooled.copiedBuffer("hello client", CharsetUtil.UTF_8));
  143. log.info("发送完成");
  144. log.info("go on...");
  145. }
  146. //数据读取完毕
  147. @Override
  148. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  149. ctx.flush();
  150. }
  151. //处理异常 一般是需要关闭通道
  152. @Override
  153. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  154. cause.printStackTrace();
  155. ctx.close();
  156. }
  157. }

这样NettyServerHandler中的操作将会使用业务线程运行,在调用IO操作时(读写数据)会提交任务队列由时间循环线程执行。

当给handler指定线程池后,会发现将任务提交到了队列

watermark_type_ZHJvaWRzYW5zZmFsbGJhY2s_shadow_50_text_Q1NETiBA6KaB5LqJ5rCU_size_20_color_FFFFFF_t_70_g_se_x_16

于此同时日志前缀输出不再是nioEventLoopGroup,而是 DefaultEventExecutorGroup,说明配置的线程池生效。

发表评论

表情:
评论列表 (有 0 条评论,437人围观)

还没有评论,来说两句吧...

相关阅读