nio与netty
IO分为三种
- BIO(传统IO) 同步阻塞IO
- NIO同步非阻塞IO
- AIO异步非阻塞IO(涉及linux网络底层,太难,这里不讨论)
阻塞和非阻塞区别:
传统IO是阻塞的,因为read()、write()方法本身就是阻塞的,必须等读完或者写完才能进行后面的操作。NIO是非阻塞的,采用多路复用技术:用一个Selector(底层依赖操作系统)同时监听多个通道,哪个通道可读或可写了,就通知我们的代码,代码再去读写;此时read()和write()会立即返回,中间不用等待,线程可以去做自己的事。
原生NIO代码
package cn.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Minimal single-threaded NIO server.
 *
 * <p>One {@link Selector} multiplexes the listening socket and every accepted
 * client connection. Call {@link #initService()} once, then
 * {@link #listenService()} to run the event loop.
 */
public class NIOService {
    /** Selector shared by the listening channel and all client channels. */
    private Selector selector;

    /**
     * Opens a non-blocking server socket on port 12345 and registers it with a
     * freshly opened selector for accept events.
     *
     * @throws IOException if the channel or selector cannot be opened, or the port cannot be bound
     */
    public void initService() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(12345));
        this.selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("【星际联邦】:开启服务成功");
    }

    /**
     * Runs the event loop forever: accepts new clients and answers every
     * message received from a connected client.
     *
     * <p>A failure on a single client connection closes only that connection;
     * the loop keeps serving the others.
     *
     * @throws IOException if selecting or accepting on the listening socket fails
     */
    public void listenService() throws IOException {
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // Selected keys are never removed automatically; drop it so it is not handled twice.
                iterator.remove();
                if (!selectionKey.isValid()) {
                    continue;
                }
                if (selectionKey.isAcceptable()) {
                    acceptClient(selectionKey);
                } else if (selectionKey.isReadable()) {
                    readFromClient(selectionKey);
                }
            }
        }
    }

    /** Accepts a pending connection, greets the client and starts watching it for reads. */
    private void acceptClient(SelectionKey selectionKey) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        SocketChannel channel = server.accept();
        if (channel == null) {
            // Non-blocking accept returns null when no connection is actually pending.
            return;
        }
        try {
            channel.configureBlocking(false);
            channel.write(ByteBuffer.wrap("【星际联邦】:您已成功连接星际联邦。".getBytes("UTF-8")));
            channel.register(this.selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // The new client already went away; do not take the server down with it.
            e.printStackTrace();
            closeClient(channel);
        }
    }

    /** Reads one chunk from a client, prints it and sends the fixed reply. */
    private void readFromClient(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
            int bytesRead = channel.read(byteBuffer);
            if (bytesRead < 0) {
                // End of stream: the peer closed the connection. Without closing here the
                // key would stay readable and the loop would spin forever.
                closeClient(channel);
                return;
            }
            if (bytesRead == 0) {
                return;
            }
            // Decode only the bytes actually received, using the same charset the replies use.
            String message = new String(byteBuffer.array(), 0, bytesRead, "UTF-8").trim();
            System.out.println("【星际联邦】:接受到消息"+message);
            ByteBuffer responseBuffer = ByteBuffer.wrap("【星际联邦】:星际联邦已派企业号前往".getBytes("UTF-8"));
            channel.write(responseBuffer);
        } catch (IOException e) {
            // e.g. connection reset by peer: drop this client only.
            e.printStackTrace();
            closeClient(channel);
        }
    }

    /** Closes a client channel; closing the channel also cancels its selection key. */
    private static void closeClient(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already broken connection fails.
        }
    }
}
/** Command-line launcher for {@code NIOService}. */
public class StartService {
    /**
     * Initialises the NIO server and then enters its listen loop.
     * Any {@link IOException} raised by either step is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            NIOService nioServer = new NIOService();
            nioServer.initService();
            nioServer.listenService();
        } catch (IOException ioFailure) {
            ioFailure.printStackTrace();
        }
    }
}
package cn.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Minimal NIO client that connects to 127.0.0.1:12345, sends one greeting and
 * prints every message the server sends back.
 */
public class NIOClient {
    /** Selector watching the single client channel. */
    private Selector selector;

    /**
     * Opens a non-blocking channel and starts connecting to the server.
     *
     * @throws IOException if the channel or selector cannot be opened, or connecting fails immediately
     */
    public void initClient() throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        selector = Selector.open();
        if (channel.connect(new InetSocketAddress("127.0.0.1", 12345))) {
            // A local connection may complete immediately; in that case no connect
            // event is delivered later, so greet the server right away.
            greetAndAwaitReplies(channel);
        } else {
            channel.register(selector, SelectionKey.OP_CONNECT);
        }
        System.out.println("客户端启动成功");
    }

    /**
     * Runs the client event loop: completes the connection, sends the greeting
     * and prints each received message.
     *
     * <p>Returns once the server closes the connection; the channel and the
     * selector are closed before returning.
     *
     * @throws IOException if connecting, reading or writing fails
     */
    public void listenClient() throws IOException {
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Selected keys are never removed automatically; drop it so it is not handled twice.
                iterator.remove();
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if (key.isConnectable()) {
                    if (socketChannel.isConnectionPending()) {
                        socketChannel.finishConnect();
                    }
                    greetAndAwaitReplies(socketChannel);
                } else if (key.isReadable()) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                    int bytesRead = socketChannel.read(byteBuffer);
                    if (bytesRead < 0) {
                        // End of stream: the server closed the connection. Without this the
                        // key would stay readable and the loop would spin printing empty messages.
                        socketChannel.close();
                        selector.close();
                        return;
                    }
                    if (bytesRead == 0) {
                        continue;
                    }
                    // Decode only the bytes actually received, as UTF-8 like the server sends them.
                    String message = new String(byteBuffer.array(), 0, bytesRead, "UTF-8").trim();
                    System.out.println("【客户端收到的消息为】:"+message);
                }
            }
        }
    }

    /** Sends the greeting on a connected channel and switches it to read interest. */
    private void greetAndAwaitReplies(SocketChannel socketChannel) throws IOException {
        socketChannel.configureBlocking(false);
        socketChannel.write(ByteBuffer.wrap("hello word".getBytes("UTF-8")));
        socketChannel.register(selector, SelectionKey.OP_READ);
    }
}
package cn.nio;
import java.io.IOException;
/** Command-line launcher for {@code NIOClient}. */
public class StartClinet {
    /**
     * Initialises the NIO client and then enters its listen loop.
     * Any {@link IOException} raised by either step is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            NIOClient nioClient = new NIOClient();
            nioClient.initClient();
            nioClient.listenClient();
        } catch (IOException ioFailure) {
            ioFailure.printStackTrace();
        }
    }
}
由于每次需要写这么多代码,于是一般使用第三方框架netty
使用netty时,关注点不在连接的建立和管理,而在我们自己的业务处理逻辑,这些逻辑一般写在handler里
package day01.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/** Netty echo server listening on port 12345. */
public class EchoService {
    /**
     * Starts the server and blocks until the server channel is closed, then
     * shuts the event loop group down.
     *
     * <p>Failures during start-up or while waiting are printed to standard
     * error. If the calling thread is interrupted, its interrupt status is
     * restored before this method returns.
     */
    public void initService() {
        // Created before the try block so the finally clause never sees a null group.
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            // Server-side bootstrap helper.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(nioEventLoopGroup)
                    // Channel type used for the listening socket.
                    .channel(NioServerSocketChannel.class)
                    // Port to listen on.
                    .localAddress(12345)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            // The rest is boilerplate; this is where the business handler is attached.
                            channel.pipeline().addLast(new EchoServiceHandler());
                        }
                    });
            // Block until the bind has completed ...
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println("netty服务端启动成功");
            // ... then block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                nioEventLoopGroup.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                // Shutdown has been requested already; only the wait was interrupted.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts the echo server.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EchoService echoService = new EchoService();
        echoService.initService();
    }
}
package day01.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/** Server-side handler that prints each request and answers with an acknowledgement. */
public class EchoServiceHandler extends ChannelInboundHandlerAdapter {
    /**
     * Called when the server receives data from a client.
     *
     * <p>Decodes the request as UTF-8, prints it and queues a UTF-8 encoded
     * response. The response is only queued here; it is flushed in
     * {@link #channelReadComplete(ChannelHandlerContext)}.
     *
     * @param ctx context of this handler in the pipeline
     * @param msg the received message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String content = new String(req, "UTF-8");
            System.out.println("netty服务端接受到的消息是:"+content);
            String rsp = "我是netty服务端,我已收到"+content; // response text
            // Encode explicitly as UTF-8 so it matches the charset used for decoding.
            ByteBuf rspBuf = Unpooled.copiedBuffer(rsp.getBytes("UTF-8"));
            ctx.write(rspBuf); // queued for the client, not yet sent
        } finally {
            // The message is not passed further down the pipeline, so this handler
            // must release the reference-counted buffer itself.
            buf.release();
        }
    }

    /**
     * Called after the current batch of reads has been processed; flushes the
     * queued response so it is actually sent.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush(); // data is only really sent after flush
        System.out.println("netty服务端响应成功");
    }

    /**
     * Called when an exception is raised; prints it and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
package day01.netty;
import java.net.InetSocketAddress;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/** Netty echo client that connects to a configurable host and port. */
public class EchoClient {
    /** Host name or IP address of the server. */
    private String ip;
    /** TCP port of the server. */
    private Integer port;

    /** Creates a client without a target; set ip and port before calling {@link #initClient()}. */
    public EchoClient() {
    }

    /**
     * Creates a client for the given target.
     *
     * @param ip host name or IP address of the server
     * @param port TCP port of the server
     */
    public EchoClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    /**
     * Connects to the configured server and blocks until the connection is
     * closed. The event loop group is always shut down before this method
     * returns, including when connecting fails.
     *
     * @throws Exception if connecting fails or the calling thread is interrupted while waiting
     */
    public void initClient() throws Exception {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            bootstrap.group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(ip, port))
                    .handler(new ChannelInitializer<SocketChannel>() { // business handler setup
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler()); // register the handler
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect().sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Runs on failure too, so the group's threads are not left behind.
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }

    /**
     * Connects to the echo server on 127.0.0.1:12345.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        EchoClient client = new EchoClient("127.0.0.1", 12345);
        client.initClient();
    }
}
package day01.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/** Client-side handler: sends one greeting on connect and prints every response. */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * Called once the client is connected to the server; sends the request.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] payload = "hello word".getBytes(); // message bytes
        // Copy the message into a buffer sized to fit it.
        ByteBuf outbound = Unpooled.buffer(payload.length).writeBytes(payload);
        // Write and flush in one step; data is only sent once flushed.
        ctx.writeAndFlush(outbound);
        System.out.println("netty客户端发送请求成功hello word");
    }

    /**
     * Called when the server sends data to the client; prints the response
     * decoded as UTF-8.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext chx, ByteBuf msg) throws Exception {
        int available = msg.readableBytes();
        byte[] received = new byte[available];
        msg.readBytes(received);
        String responseText = new String(received, "UTF-8");
        System.out.println("netty客户端获得响应,内容是:" + responseText);
    }

    /**
     * Called when an exception is raised; prints it and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
还没有评论,来说两句吧...