Netty传输
Netty传输的API的核心是interface Channel,它用于所有的I/O操作。
每个Channel都将会被分配一个ChannelPipeline和ChannelConfig。ChannelConfig包含了该Channel的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个ChannelConfig的子类型。
由于Channel是独一无二的,所以为了保证顺序将Channel声明为java.lang. Comparable的一个子接口。因此,如果两个不同的Channel实例都返回了相同的散列码,那么AbstractChannel中的compareTo()方法的实现将会抛出一个Error。
ChannelPipeline(ChannelPipeline实现了一种常见的设计模式—拦截过滤器(Intercepting Filter))持有所有将应用于入站和出站数据以及事件的ChannelHandler实例,这些ChannelHandler实现了应用程序用于处理状态变化以及数据处理的逻辑。
ChannelHandler的典型用途包括:
- 将数据从一种格式转换为另一种格式;
- 提供异常的通知;
- 提供Channel变为活动的或者非活动的通知;
- 提供当Channel注册到EventLoop或者从EventLoop注销时的通知;
提供有关用户自定义事件的通知。
channel方法(重要)
方法名 | 描述 |
eventLoop | 返回分配给Channel 的EventLoop |
pipeline | 返回分配给Channel 的 ChannelPipeline |
isActive | 如果Channel是活动的,返回true,活动的意义可能依赖于底层的传输。 (例:一个Socket 传输一旦连接到远程节点便是活动的,一个Datagram传输一旦被打开便是活动的) |
localAddress | 返回本地的SocketAddress |
remoteAddress | 返回本地的SocketAddress |
write | 将数据写到远程节点。数据将被传递给ChannelPipeline,并排队直到数据被冲刷 |
flush | 将之前已写的数据冲刷到底层传输,(如一个socket) |
writeAndFlush | 一个简便的方法,等同于调用write()并接着调用flush() |
内置的传输
名称 | 包 | 描述 |
NIO | io.netty.channel.socket.nio | 使用java.nio.channels 包作为基础 — 基于选择器的方式 |
Epoll | io.netty.channel.epoll | 由JNI驱动的epoll()和非租塞IO。这个传输支持只有在Linux上可用的多种特性, (如:so_reuseport, 比NIO传输更快,而且是完全非租塞的) |
OIO | io.netty.channel.socket.oio | 使用java.net 包作为基础 — 使用阻塞流 |
Local | io.netty.channel.local | 可以在VM内部通过管道进行通信的本地传输 |
Embedded | io.netty.channel.embedded | Embedded传输,允许使用ChannelHandler而又不需要一个真正的基于网络的传输。 这在测试你的ChannelHandler实现时非常有用 |
注:零拷贝
一种目前只有在使用NIO和Epoll传输时才可以使用的特性。
可以快速高效的将数据从文件系统移动到网络接口,且不需要将其从内核空间复制到用户空间。在FTP或HTTP协议中可以显著地提升性能
注:只能传输文件的原始内容。可以传输加密的文件,不可以传输加密的数据或压缩的文件。
NIO(非阻塞I/O)
NIO提供了一个所有I/O操作的全异步的实现。它利用了自NIO子系统被引入JDK 1.4时便可用的基于选择器的API。
选择器背后的基本概念是充当一个注册表,在那里你将可以请求在Channel的状态发生变化时得到通知。可能的状态变化有:
- 新的Channel已被接受并且就绪;
- Channel连接已经完成;
- Channel有已经就绪的可供读取的数据;
Channel可用于写数据。
选择器运行在一个检查状态变化并对其做出相应响应的线程上,在应用程序对状态的改变做出响应之后,选择器将会被重置,并将重复 这个过程。
选择操作的位模式
名称 | 描述 |
OP_ACCEPT | 请求在接受新连接并创建Channel时获得通知 |
OP_CONNECT | 请求在建立一个连接时获得通知 |
OP_READ | 请求当数据已经就绪,可以从Channel中读取时获得通知 |
OP_WRITE | 请求当可以向Channel中写更多的数据时获得通知。这处理了套接字缓冲区被完全填满时的情况,这种情况通常发生在数据的发送速度比远程节点可处理的速度更快的时候 |
Epoll(用于Linux的本地非阻塞传输)
Netty为Linux提供了一组NIO API,其以一种和它本身的设计更加一致的方式使用epoll,并且以一种更加轻量的方式使用中断。如果你的应用程序旨在运行于Linux系统, 那么请考虑利用这个版本的传输;你将发现在高负载下它的性能要优于JDK的NIO实现。
OIO(阻塞I/O)
它可以通过常规的传输API使用,但是由于它是建立在java.net包的阻塞实现之上的,所以它不是异步的。
用于 JVM内部通信的Local传输
Netty提供了一个Local传输, 用于在同一个JVM中运行 的客户端和服务程序之间的异步通信。同样,这个 传输也支持 对于所有Netty传输实现都共同的API。
在这个传输中,和服务器Channel相关联的SocketAddress并没有绑定物理网络地址;相反,只要服务器还在运行,它就会被存储在注册表里,并在Channel关闭时注销。因为这个传输并不接受真正的网络流量,所以它并不能够和其他传输实现进行互操作。因此,客户端希望连接到(在同一个JVM中)使用了这个传输的服务器端时也必须使用它。除了这个限制,它的使用方式和其他的传输一模一样
Embedded传输
Netty提供了一种额外的传输,使得你可以将一组ChannelHandler作为帮助器类嵌入到其他的ChannelHandler内部。通过这种方式,你将可以扩展个ChannelHandler的功能,而又不需要修改其内部代码。
传输用例
支持的传输和网络协议
传输 | TCP | UDP | SCTP | UDT |
NIO | X | X | X | X |
Epoll(仅Linux) | X | X | ||
OIO | X | X | X | X |
在Linux上启用SCTP需要安装用户库。
- 非阻塞代码库——如果你的代码库中没有阻塞调用(或者你能够限制它们的范围),那么在Linux上使用NIO或者epoll比较适合。虽然NIO/epoll旨在处理大量的并发连接,但 是在处理较小数目的并发连接时,它也能很好地工作,尤其是考虑到它在连接之间共享线程的方式。
- 阻塞代码库——如果你的代码库严重地依赖于阻塞I/O,而且你的应用程序也有一个相应的设计,那么在你尝试将其直接转换为Netty的NIO传输时,你将可能会遇到和阻塞操作相关的问题。不要为此而重写你的代码,可以考虑分阶段迁移:先从OIO开始,等你的代码修改好之后,再迁移到NIO(或者使用epoll,如果你在使用Linux)。
- 在同一个JVM内部的通信——在同一个JVM内部的通信,不需要通过网络暴露服务,是Local传输的完美用例。这将消除所有真实网络操作的销,同时仍然使用你的Netty代码库。如果随后需要通过网络暴露服务,那么你将只需要 把传输改为NIO或者OIO即可。
- 测试你的ChannelHandler实现——如果你想要为自己的ChannelHandler实现编写单元测试,那么请考虑使用Embedded传输。这既便于测试你的代码,而又不需要创建大量的模拟(mock)对象。你的类将仍然符合常规的API事件流, 保证该ChannelHandler在和真实的传输一起使用时能够正确地工作。
应用程序的需求 | 推荐的传输 |
非阻塞代码库或者一个常规的起点 | NIO(或者LInux上使用epoll) |
阻塞代码库 | OIO |
在同一个JVM内部的通信 | Local |
测试 ChannelHandler 的实现 | Embedded |
Demo:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.Charset;
public class EchoServer {
final ByteBuf bufs= Unpooled.copiedBuffer("Hello,刘德华", Charset.forName("UTF-8"));
public void bind(int port) throws Exception {
//配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f=b.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
//退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("服务端启动……");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
ChannelFuture future=ctx.writeAndFlush(bufs);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess())
System.out.println("成功");
else
System.out.println("失败");
}
});
}
});
}
}
public static void main(String[] args) throws Exception {
int port=8080;
new EchoServer().bind(port);
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.Charset;
public class EchoClient {
final ByteBuf buf= Unpooled.copiedBuffer("Hello,王宝强", Charset.forName("UTF-8"));
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler() );
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户端启动……");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf);
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
ctx.close();
}
});
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new EchoClient().connect(port, "127.0.0.1");
}
}
参考《Netty实战》
还没有评论,来说两句吧...