Netty4之业务线程池的使用

不念不忘少年蓝@ 2024-04-18 23:03 73阅读 0赞

此文章是基于Netty4.1,一般在使用Netty做服务端开发时,通常会定义I/O线程池及业务线程池。I/O线程池顾名思义用于处理网络连接及维护Channel的相关事件(一般像心跳及编解码都可以使用I/O线程池)。当需要处理比较耗时的业务逻辑也共用I/O线程池话会对整个服务的吞吐量有比较大的影响(曾经遇到过)。所以在生产环境中建议定义业务线程池。下面说说如何使用业务线程池及业务线程池处理逻辑的原理。

下面是一个Netty服务端初始化的简单例子:

  1. public class NettyServer {
  2. public static void main(String[] args) throws Exception {
  3. new NettyServer().start("127.0.0.1", 8081);
  4. }
  5. public void start(String host, int port) throws Exception {
  6. ExecutorService executorService = Executors.newCachedThreadPool();
  7. EventLoopGroup bossGroup = new NioEventLoopGroup(0, executorService);//Boss I/O线程池,用于处理客户端连接,连接建立之后交给work I/O处理
  8. EventLoopGroup workerGroup = new NioEventLoopGroup(0, executorService);//Work I/O线程池
  9. EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(2);//业务线程池
  10. ServerBootstrap server = new ServerBootstrap();//启动类
  11. server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  12. .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ChannelPipeline pipeline = ch.pipeline();
  16. pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 3));
  17. pipeline.addLast(businessGroup, new ServerHandler());
  18. }
  19. });
  20. server.childOption(ChannelOption.TCP_NODELAY, true);
  21. server.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
  22. server.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
  23. InetSocketAddress addr = new InetSocketAddress(host, port);
  24. server.bind(addr).sync().channel();//重启服务
  25. }
  26. }

此文章主要是介绍对业务线程池的使用,其他Netty相关知识就不再说明。例子中initChannel()表示初始化一个Channel,并向Channel的Pipeline中添加处理的逻辑的Handler形成一个处理链,其中我们对ServerHandler这个处理器使用了一个业务线程池。下面看addList()的逻辑:

  1. @Override
  2. public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
  3. if (handlers == null) {
  4. throw new NullPointerException("handlers");
  5. }
  6. for (ChannelHandler h: handlers) {
  7. if (h == null) {
  8. break;
  9. }
  10. addLast(executor, null, h);
  11. }
  12. return this;
  13. }
  14. @Override
  15. public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
  16. final AbstractChannelHandlerContext newCtx;
  17. synchronized (this) {
  18. checkMultiplicity(handler);
  19. newCtx = newContext(group, filterName(name, handler), handler);
  20. addLast0(newCtx);
  21. // If the registered is false it means that the channel was not registered on an eventLoop yet.
  22. // In this case we add the context to the pipeline and add a task that will call
  23. // ChannelHandler.handlerAdded(...) once the channel is registered.
  24. if (!registered) {
  25. newCtx.setAddPending();
  26. callHandlerCallbackLater(newCtx, true);
  27. return this;
  28. }
  29. EventExecutor executor = newCtx.executor();
  30. if (!executor.inEventLoop()) {
  31. callHandlerAddedInEventLoop(newCtx, executor);
  32. return this;
  33. }
  34. }
  35. callHandlerAdded0(newCtx);
  36. return this;
  37. }

addList方法是将Handler包装成一个 AbstractChannelHandlerContext(链表结构)然后添加到处理链之中,其中线程分配是在newContext()方法中实现的。下面重点来了,

  1. private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
  2. return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
  3. }
  4. private EventExecutor childExecutor(EventExecutorGroup group) {
  5. if (group == null) {
  6. return null;
  7. }
  8. Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
  9. //是否每个事件分组一个单线程的事件执行器
  10. if (pinEventExecutor != null && !pinEventExecutor) {
  11. return group.next();
  12. }
  13. Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
  14. if (childExecutors == null) {
  15. // Use size of 4 as most people only use one extra EventExecutor.
  16. childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
  17. }
  18. // Pin one of the child executors once and remember it so that the same child executor
  19. // is used to fire events for the same channel.
  20. EventExecutor childExecutor = childExecutors.get(group);
  21. if (childExecutor == null) {
  22. childExecutor = group.next();
  23. childExecutors.put(group, childExecutor);
  24. }
  25. return childExecutor;
  26. }

上面的childExecutor(group)表示从group分配一个EventExecutor给这个Handler来处理业务,group就是在初始化传进来的businessGroup,childExecutor()先会判断是否需要为每个事件处理器handler分配一个执行器,一般默认为true,false表示如果两个处理器(Handler)使用同一个group那么可能会被分配同一个EventExecuto。然后会为这个group分配一个子的执行器集合。然后从group中拿一个执行器放到这个集合中。其中group.next表示从EventExecutorGroup随机拿一个执行器childExecutor。接下来看EventExecutor如何处理任务的。

上面说的EventExecutor一般是DefaultEventLoop extends SingleThreadEventLoop,在DefaultEventLoop有如下:

  1. @Override
  2. protected void run() {
  3. for (;;) {
  4. Runnable task = takeTask();
  5. if (task != null) {
  6. task.run();
  7. updateLastExecutionTime();
  8. }
  9. if (confirmShutdown()) {
  10. break;
  11. }
  12. }
  13. }

上面可以看出DefaultEventLoop起了一个循环任务,一直都获取任务执行,这个taskTask()方法在其父类中定义:

  1. protected Runnable takeTask() {
  2. assert inEventLoop();
  3. if (!(taskQueue instanceof BlockingQueue)) {
  4. throw new UnsupportedOperationException();
  5. }
  6. BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
  7. for (;;) {
  8. ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
  9. if (scheduledTask == null) {
  10. Runnable task = null;
  11. try {
  12. task = taskQueue.take();
  13. if (task == WAKEUP_TASK) {
  14. task = null;
  15. }
  16. } catch (InterruptedException e) {
  17. // Ignore
  18. }
  19. return task;
  20. } else {
  21. long delayNanos = scheduledTask.delayNanos();
  22. Runnable task = null;
  23. if (delayNanos > 0) {
  24. try {
  25. task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
  26. } catch (InterruptedException e) {
  27. // Waken up.
  28. return null;
  29. }
  30. }
  31. if (task == null) {
  32. // We need to fetch the scheduled tasks now as otherwise there may be a chance that
  33. // scheduled tasks are never executed if there is always one task in the taskQueue.
  34. // This is for example true for the read task of OIO Transport
  35. // See https://github.com/netty/netty/issues/1614
  36. fetchFromScheduledTaskQueue();
  37. task = taskQueue.poll();
  38. }
  39. if (task != null) {
  40. return task;
  41. }
  42. }
  43. }
  44. }

上面代码表示从一个队列中的获取任务。当channel中触发一个ServerHandler事件时,会将这个事件封装成一个task放到BlockingQueue这个阻塞队列中,等待这个执行器去执行。

总结:

1、在使用业务线程池的时候同一个Channel的同一个处理器Handler使用的是同一个EventExecutor,也可理解是单线程在执行同一个处理器的任务。

2、Handler任务是通过BlockingQueue来解藕且只有一个线程在处理同一个Handler的任务,所以同一个Channel的同一个处理器的任务执行是有序的,从而可以兼容Netty3中的OrderedMemoryAwareThreadPoolExecutor的有序性

3、在处理业务逻辑的尽量不要使用I/O线程,这样会影响服务有吞吐量。(之前用Netty实现http接口,没有定义线程池然后表现是应用内部处理很快,但是调用方就是超时。就是I/O线程线时间被占用,导致请求一直在等待连接,从而调用方超时)

4、业务线程池要用EventExecutorGroup,EventLoopGroup这个是给I/O线程使用,里面有一些处理网络的方法。

发表评论

表情:
评论列表 (有 0 条评论,73人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty4线模型

    Reactor模型 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户