提交任务到任务队列 水深无声 2022-09-04 11:40 136阅读 0赞 * 在服务端处理器中,对于比较耗时的任务,可以提交任务到任务队列进行异步执行; * 任务队列有两种任务队列,和定时任务队列,定时任务队列在放入任务开始计时,到时在队列取出执行; * 提交任务队列需要先获取通道,根据通道获取EventLoop,根据事件循环的方法提交任务到任务队列; 服务端: package com.tech.netty.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author lw * @since 2021/8/11 **/ public class NettyServer { public static void main(String[] args) throws InterruptedException { //1 创建两个线程组bossGroup workerGroup //2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求 //3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端启动对象,进行参数配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //设置服务器端通道为NioServerSocketChannel .option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接 .childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("...服务器 is ready..."); //启动服务器并绑定一个端口 ChannelFuture channelFuture = serverBootstrap.bind(6668).sync(); //对通道的关闭事件进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } package com.tech.netty.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * @author lw * @since 2021/8/11 **/ public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪会触发该方法 给服务器发送数据 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client ctx:"+ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务器", CharsetUtil.UTF_8)); } //当通道有读取事件时触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:"+ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } 客户端: package com.tech.netty.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author lw * @since 2021/8/11 **/ public class NettyServer { public static void main(String[] args) throws InterruptedException { //1 创建两个线程组bossGroup workerGroup //2 bossGroup处理客户端连接请求 workerGroup处理客户端数据读写请求 //3 他们都是事件循环组 默认含有CPU核数*2个事件循环NioEventLoop NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端启动对象,进行参数配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //设置服务器端通道为NioServerSocketChannel .option(ChannelOption.SO_BACKLOG, 128) //设置连接队列允许的连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置连接连接状态为活动连接 .childHandler(new ChannelInitializer<SocketChannel>() { //给workerGroup NioEventLoop通过管道添加处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("...服务器 is ready..."); //启动服务器并绑定一个端口 ChannelFuture channelFuture = serverBootstrap.bind(6668).sync(); //对通道的关闭事件进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } package com.tech.netty.netty; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; /** * @author lw * @since 2021/8/11 **/ //自定义一个Handler @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取客户端发送的消息 // ChannelHandlerContext 是一个上下文对象,可以获取通道和管道信息 // msg 是客户端发送的消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // System.out.println("服务器读取线程 "+Thread.currentThread().getName()); // System.out.println("server ctx:"+ctx); // Channel channel = ctx.channel(); // ChannelPipeline pipeline = ctx.pipeline(); // ByteBuf byteBuf = (ByteBuf) msg; // System.out.println("客户端发送消息是:"+byteBuf.toString(CharsetUtil.UTF_8)); // System.out.println("客户端地址:"+channel.remoteAddress()); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5*1000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 1",CharsetUtil.UTF_8)); log.info("channel:"+ctx.channel().hashCode()); } }); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5*1000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 2",CharsetUtil.UTF_8)); log.info("channel:"+ctx.channel().hashCode()); } }); ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { Thread.sleep(5*1000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端 3",CharsetUtil.UTF_8)); log.info("channel:"+ctx.channel().hashCode()); } },15, TimeUnit.SECONDS); log.info("go on..."); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //发送数据 ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8)); } //处理异常 一般是需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } 可以发服务端现执行结果: 处理器的方法和任务队列的方法都采用了同一个线程执行; 处理器的方法执行完成才会消费任务队列里的任务; 定时任务队列在放入任务开始就已经计时了;
还没有评论,来说两句吧...