EpollEventLoopGroup 与 NioEventLoopGroup你知道吗 ゞ 浴缸里的玫瑰 2022-02-12 11:07 341阅读 0赞 Java NIO根据操作系统不同,比如 macosx 是KQueueSelectorProvider、windows有WindowsSelectorProvider、Linux有EPollSelectorProvider (**Linux kernels >= 2.6**,是epoll模式)或PollSelectorProvider(selector模式), 足以可见不同的系统对nio中的Selector有不同的实现,自4.0.16起,Netty为Linux通过JNI的方式提供了native socket transport。 Oracle jdk会自动选择合适的Selector,Oracle JDK在Linux已经默认使用epoll方式, 为什么netty还要提供一个基于epoll的实现呢? > [stackoverflow][]也解释过,具体可参阅官方[native-transports][],[Tomcat Native][] > > If you are running on linux you can use EpollEventLoopGroup and so get better performance, less GC and have more advanced features that are only available on linux. * Netty的 epoll transport使用 epoll edge-triggered 而 java的 nio 使用 level-triggered * Netty的 epoll transport 暴露了更多的nio没有的配置参数, 如 TCP\_CORK, SO\_REUSEADDR等。 * C代码,更少GC,更少synchronized 总之,linux上使用EpollEventLoopGroup会有较少的gc有更高级的特性,性能更好~! 那该如何使用native socket transport(epoll)呢? 其实只需将相应的类替换即可 <table> <tbody> <tr> <td style="width:327px;">NioEventLoopGroup</td> <td style="width:522px;">EpollEventLoopGroup</td> </tr> <tr> <td style="width:327px;">NioEventLoop </td> <td style="width:522px;">EpollEventLoop</td> </tr> <tr> <td style="width:327px;">NioServerSocketChannel </td> <td style="width:522px;">EpollServerSocketChannel</td> </tr> <tr> <td style="width:327px;">NioSocketChannel </td> <td style="width:522px;">EpollSocketChannel</td> </tr> </tbody> </table> 很多优秀的源码中对此都进行了兼容性处理,比如rocketmq //源于org.apache.rocketmq.remoting.netty public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); if (useEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } loadSslContext(); } private boolean useEpoll() { return RemotingUtil.isLinuxPlatform() && nettyServerConfig.isUseEpollNativeSelector() && Epoll.isAvailable(); } } 怎么读懂这块的源码? 首先从EventExecutorGroup开始,EventExecutorGroup是NioEventLoopGroup最上层的接口,它继承了**java.util.concurrent.ScheduledExecutorService**接口,因此它可以调度执行task。 * EventExecutorGroup内部管理了n个EventExecutor,**next()**方法返回其中的一个 * EventExecutor也是EventExecutorGroup(的子类) ![20190429205226741.png][] EventExecutorGroup就像一个BOSS,每当有活儿的时候,就派一个小弟(EventExecutor)去干。 MultithreadEventExecutorGroup的每一个小弟都是一个SingleThreadEventExecutor,而且小弟的数量在构造的时候就确定了,这个BOSS的小弟分配逻辑相当简单,无非就是轮流使唤(next方法),这个BOSS的每一个小弟都是一个NioEventLoop。 public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> { } //实现了ScheduledExecutorService接口 public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { public Future<?> submit(Runnable task) { return this.next().submit(task); } public <T> Future<T> submit(Callable<T> task) { return this.next().submit(task); } } public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutor[] children; private final Set<EventExecutor> readonlyChildren; private final AtomicInteger terminatedChildren; private final Promise<?> terminationFuture; private final EventExecutorChooser chooser; public EventExecutor next() { return this.chooser.next(); } } public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private static final class GenericEventExecutorChooser implements EventExecutorChooser { //调度逻辑,这里调用了Math.abs()方法以防止executors溢出 public EventExecutor next() { return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)]; } } } public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { public EventLoop next() { return (EventLoop)super.next(); } protected abstract EventLoop newChild(Executor var1, Object... var2) throws Exception; public ChannelFuture register(Channel channel) { return this.next().register(channel); } public ChannelFuture register(ChannelPromise promise) { return this.next().register(promise); } } public class NioEventLoopGroup extends MultithreadEventLoopGroup { public void rebuildSelectors() { Iterator var1 = this.iterator(); while(var1.hasNext()) { EventExecutor e = (EventExecutor)var1.next(); ((NioEventLoop)e).rebuildSelector(); } } } ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2l0c29mdGNoZW5mZWk_size_16_color_FFFFFF_t_70][] * NioEventLoopGroup实际上就是个**线程池** * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件 * 每一个NioEventLoop负责处理m个Channel,可以**理解为nio中Selector**,它有两个职责:一是作为IO线程处理IO事件,二是作为普通的线程处理通过execute等方法提交上来的任务 * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel ![20190429214502529.png][] NioEventLoop在单线程里同时处理IO事件和其他任务(单线程模式下的Reactor),NioEventLoop尽量(但不能保证)按照给定的比率(默认为50%)来分配花在这两种事情上的时间。换句话说,我们不应该在NioEventLoop里执行耗时的操作(比如数据库操作),这样会卡死NioEventLoop,降低程序的响应性。![50589924][] [stackoverflow]: http://stackoverflow.com/questions/23465401/why-native-epoll-support-is-intoduced-in-netty [native-transports]: https://netty.io/wiki/native-transports.html [Tomcat Native]: https://netty.io/wiki/forked-tomcat-native.html [20190429205226741.png]: /images/20220212/01e1c77cad51450ab3a34269c2f63823.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2l0c29mdGNoZW5mZWk_size_16_color_FFFFFF_t_70]: /images/20220212/dc22390d54104500b2daff5c4c89e429.png [20190429214502529.png]: /images/20220212/f254d84a0e624d46b295a5354ef385fc.png [50589924]: /images/20220212/0432122ff4134913b04e1a3ea1c8b406.png
还没有评论,来说两句吧...