重学Netty(三)——Netty的概念及其简单的Echo服务器编写
Netty简介
刚开始学习一门技术的时候,不要先去看那些博客和视频,应该先去官网上通读一遍,然后有什么看不懂的再去找其他途径。下图为官网截图
据官网描述,Netty是一个异步的事件驱动型的网络应用框架,可以用来快速的开发高性能且可维护协议的服务器和客户端。
也可以从官网中看到它的特点:
- 从设计方面来看:不仅可以实现非阻塞,当然也可以实现阻塞socket通信,还有灵活可扩展的事件模型,还可以自定义线程模型等等
- 从使用方面来看:良好的Javadoc有使用案例,不需要一些额外的依赖
- 从性能方面来看:更高的吞吐量,更低的延迟,尽量减少内存拷贝,更小的资源消耗等
- 从安全方面来看:完整的SSL / TLS和StartTLS支持
- 从社区方面来看:社区比较活跃经常发布新版本,当然使用的地方也很多,因此常见问题会有相对成熟的解决方案
接下来可以再看一下它可以用来干什么,它的核心就是基于缓冲区零拷贝,通用的一些APO还有一些可扩展的事件模型。它不仅支持Socket,Datagram还支持Http Tunnel和Pipe数据传输服务,还有支持HTTP,SSL等常见的协议
可以看出来Netty是一个十分强大的网络框架!
从定义分析核心
Netty的定义就是说的是一个异步的事件驱动型网络框架,基于非阻塞的基础上开发的,因此我们就可以根据定义来推组件
- 非阻塞的:那就需要在Channel和Buffer的基础上
- 异步的:说明需要Future和方法的回调
- 事件驱动:需要事件和ChannelHandler(在Channel中对事件的处理)
- 事件分发机制:EventLoop
Channel和Buffer
Channel和Buffer已经很熟悉了,在nio中是必要的组件,Channel可以看做是一个数据传输的载体,但是需要注意的是数据并不是放在Channel里,而是Buffer里。
方法回调和Future
方法回调
用通俗的话来说,它还是一个方法调用,只不过这个方法调用不同于普通的方法调用,比如A方法调用了B方法,在B执行时某条件触发,A方法又会调用C方法。
在前面的网络程序设计中,对Server端的连接处理中,可以设置一个回调方法,如果isConnec()就打印一句话,当请求连接的时候这个回调方法就会打印出一句话,可以看做是自定义一个监听器,然后并在特定情况下进行处理。这个在ChannelHandler中会经常使用。
Future
Future学Java的都很熟悉了,它位于java.util.concurrent包下,它提供了一种可以在一种操作完成后对应用进行通知,可以从接口中的方法看出,很明显是对异步的一个支持,可以对任务进行取消,可以判断异步任务是否完成,可以去获取异步任务的执行结果。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在Netty中并没有直接使用Future方法,而是对Future进行了扩展,因为Java中的Future只能手动进行检查是否完成,因此Netty提供了自己实现的ChannelFuture。因为如果有了监听器就不需要手动进行检查,可以由监听器判断和执行相应的回调。
public interface ChannelFuture extends Future<Void> {
Channel channel();
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture sync() throws InterruptedException;
ChannelFuture syncUninterruptibly();
ChannelFuture await() throws InterruptedException;
ChannelFuture awaitUninterruptibly();
boolean isVoid();
}
事件和ChannelHandler
事件
对于事件,经过前面的NIO学习应该也不陌生了,因此可以将前面的一起串联起来,事件是什么?事件就是触发监听器的条件,触发后从而执行相应的回调逻辑,怎么控制回调逻辑?那就需要ChannelHandler来进行处理了。
因为Channel是一个双向通信的一个通道,因此它的事件也大致是基于请求/数据的进站/出站顺序而来的,比如以下这些事件
- 连接请求状态
- 连接的打开和断开
- 数据从Socket读取和写入Socket
- 用户自定义事件
- 错误处理事件
这些可以算得上是事件,当然事件还都是人定义的。
ChannelHandler
上面说到了事件,那么对事件的处理就得交给ChannelHandler了,每一个事件都要有相对于的一个ChannelHandler进行事件处理,一个数据的发送或者接收往往是触发了很多的事件,因此ChannelHandler会连成一条责任链,每个ChannelHandler只负责自己的事件,如下图所示
在Netty中也有很多封装好的ChannelHandler供开发人员使用,避免重复造轮子,提高开发效率。
EventLoop和EventLoopGroup
在NIO编程中需要将事件注册到Selector上去,然后手动的根据情况分派任务,而Netty将Selector这写操作抽象了出来,不需要手动的进行分配。它内部会为每一个Channel都绑定一个EventLoop,用来事件处理,并在检测到感兴趣的事件后将事件自动分派给ChannelHandler进行处理。
EventLoop它是EventLoopGroup的子接口,因此它继承了很多父接口的方法,如父类的注册方法等(由它的注册方法也可以看出一个EventLoop对应一个Channel,因为入参只能有一个Channel)
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
Echo服务器编写
服务端
package Netty.SimpleExample;
/*
* @Author Wrial
* @Date Created in 14:54 2020/4/13
* @Description
*/
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
public static void main(String[] args) {
new EchoServer(8090).service();
}
private final ServerHandler serverHandler;
private final EventLoopGroup eventLoopGroup;
private final ServerBootstrap bootstrap;
private final int port;
public EchoServer(int port) {
this.port = port;
serverHandler = new ServerHandler();
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
}
public void service() {
try {
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture channelFuture = bootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
eventLoopGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package Netty.SimpleExample;
/*
* @Author Wrial
* @Date Created in 14:56 2020/4/13
* @Description 简单的处理器 因为Echo只需要对入站的消息进行处理
* 因此就可以使用一个简单的ChannelInboundHandlerAdapter进行处理
*/
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable // 可以对此实例进行复用
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public boolean isSharable() {
return super.isSharable();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("接收到" +byteBuf.toString(CharsetUtil.UTF_8));
ctx.write(byteBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("读完数据了,write and flush 并关闭Channel");
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel();
}
}
客户端
package Netty.SimpleExample;
/*
* @Author Wrial
* @Date Created in 14:54 2020/4/13
* @Description
*/
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class EchoClient {
private final String host;
private final int port;
private final SocketChannel socketChannel;
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final ClientHandler clientHandler;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
socketChannel = new NioSocketChannel();
group = new NioEventLoopGroup();
clientHandler = new ClientHandler();
bootstrap = new Bootstrap();
}
private void start() {
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host,port).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(clientHandler);
}
});
ChannelFuture future = bootstrap.connect().sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
new EchoClient("127.0.0.1", 8090).start();
}
}
package Netty.SimpleExample;
/*
* @Author Wrial
* @Date Created in 15:50 2020/4/13
* @Description 此处使用SimpleChannelInboundHandler是根据资源分配角度来设定的
*/
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 活跃的时候发一条消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("发给服务端 Netty Client");
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty Client ", CharsetUtil.UTF_8));
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
System.out.println("接收到服务端的响应:"+byteBuf.toString(CharsetUtil.UTF_8));
}
}
运行结果如下:
可以看得出Netty编写一个非阻塞的Echo服务器和客户端是十分简单的,这几行代码如果用NIO实现的话是需要上百行代码的!
还没有评论,来说两句吧...