netty实战笔记 第四章 传输
探索Netty所提供的不同类型的传输,以及如何选择一个最适合你的应用程序的传输。
本章主要内容
OIO 阻塞传输
NIO 异步传输
Local jvm内部的通信机制
Embedded 测试你的Channelhandler
写一个案例:
java 写一个应用程序简单地接收连接,向客户端写Hi,然后关闭连接。
我们分为java的阻塞IO(OIO),NIO,netty的OIO和NIO分别来实现。
java NIO版本
package com.moyang.netty.transport;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOSocketDemo {
public void server(int port) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
serverSocket.bind(address);
// 打开Selector来处理Channel
Selector selector = Selector.open();
// 将ServerSocket注册到Selector以接受连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer msg = ByteBuffer.wrap("hi!\r\n".getBytes());
for (; ; ) {
try {
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
// 获取所有接受时间的selectionKey实例
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_READ, msg.duplicate());
System.out.println("accept connection from " + client);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
if (buffer.hasRemaining()) {
if (client.write(buffer) == 0) {
break;
}
}
client.close();
}
}catch (Exception e){
key.cancel();
key.channel().close();
}
}
}
}
public static void main(String[] args) {
try {
new NIOSocketDemo().server(8000);
} catch (IOException e) {
e.printStackTrace();
}
}
}
netty OIO版本
package com.moyang.netty.transport;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import java.nio.charset.Charset;
/**
* 使用netty实现的阻塞版网络处理
*/
public class NettyOIODemo {
public void server(int port) {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.defaultCharset().forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
// 创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(OioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 添加一个ChannelInboundHandler来拦截和处理事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate())
//将消息重写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接。
.addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
} finally {
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new NettyOIODemo().server(8080);
}
}
netty NIO版本
package com.moyang.netty.transport;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.Charset;
public class NettyNIODemo {
public void server(int port) {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n",
Charset.forName("UTF-8")));
// 注意:使用了不同EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
// 使用NioServerSocketChannel,非阻塞模式
// 注意和OIO模式相比
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 将消息写完关闭连接。
ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new NettyNIODemo().server(8081);
}
}
以上就是简单的对Netty的使用。接下来详细的学习一下Netty的API
4.2 传输API
传输API的核心是Channel接口。public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable
每个Channel都会被分配一个ChannelPipeline和ChannelConfig。ChannelConfig包含了该Channel的所有配置,并且支持热更新。可以接收一个ChannelConfig的子类。
Channel实现Comparable接口,如果两个不同的Channel实例返回了相同的散列码就会报错。
ChannelPipeline持有将所有的入站和出站的数据及事件的Channel实例。这些Channel实例实现了应用处理状态边和数据处理的逻辑。
ChannelHandler的典型用途:
- 将数据从一种格式转换为另一中格式。
- 提供异常的通知。
- 提供Channel变为活动的或者非活动的通知。
- 提供当Channel注册到EventLoop或者EventLoop注销时的通知
- 提供有关用户自定义事件的通知。
Netty的Channel实现是线程安全的.
4.3 内置的传输
netty内置了一些开箱急用的传输。但是并不是他们所有的传输都支持每一种协议。所以你必须选择一个和你的应用程序所有的协议相容的传输。
表格: netty所提供的传输
名称 | 包 | 描述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用java.nio.channels包作为基础,基于选择器的方式 |
Epoll | io.netty.channel.epoll | 由JNI驱动的epoll()和非阻塞IO。这个传输支持只有在Liunx上可用的多种特性,如SO_REUSEOIRT比NIO传输更快,而且完全是非阻塞的。 |
OIO | io.netty.channel.socket.oio | 使用java.net包作为基础–使用阻塞流 |
Local | io.netty.channel.local | 可以在VM内部通过管道进行通信的本地传输 |
Embedded | io.netty.channel.embedded | Embedded传输,允许使用Channelhandler而用不需要一个真正的基于网网络的传输,在测试你的ChannelHandler实现时非常有用 |
4.3.1 NIO – 非阻塞IO
NIO提供了一个所有IO操作的全异步的实现,是基于选择器的API。
选择器的基本概念是充当一个注册表,在请求在Channel的状态发生变化的时候得到通知。可能变化有:
- 新的Channel已被接受并且就绪。
- Channel连接已经完成
- Channel有已经就绪的可供读取的数据。
- Channel可用于写数据。
选择器运行一个检查状态变化并对其做出响应的线程上,在应用程序对状态的改变作出响应之后,选择器将会被重置,并重复这个过程。
4.3.2 Epoll–用于Liunx的本地非阻塞传输
性能要比NIO高。
使用的方式,替换为EpollServerSocketChannel.class即可。
4.3.3 OIO – 旧的阻塞IO
Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式。
4.3.4 用于JVM内容通信的Local传输
在这个传输中,和服务器Channel 相关联的SocketAddress 并没有绑定物理网络地址;相反,只要服务器还在运行,它就会被存储在注册表里,并在Channel 关闭时注销。因为这个传输并不接受真正的网络流量,所以它并不能够和其他传输实现进行互操作。因此,客户端希望连接到(在同一个JVM 中)使用了这个传输的服务器端时也必须使用它。除了这个限制,它的使用方式和其他的传输一模一样。
4.3.5 Embedded传输
Netty提供了一种额外的传输,使得你可以将一组CHannelHandler作为帮助器类嵌入到其他的ChannelHandler内部、通过这种方式,你将可以扩展一个ChannelHandler的功能,而又不需要修改内部的代码。
Embedded传输的关键是一个被称为EmbeddedChannel的具体Channel实现。
下一章,介绍ByteBuf和ButeBufHolder-Netty的数据容器。
最后
如果你觉得写的还不错,就关注下公众号呗,关注后,有点小礼物回赠给你。
你可以获得5000+电子书,java,springCloud,adroid,python等各种视频教程,IT类经典书籍,各种软件的安装及破解教程。
希望一块学习,一块进步!
还没有评论,来说两句吧...