netty实战笔记 第二章 第一个Netty程序
2.1 编写Echo服务器
所有的Netty服务器都需要一下两个部分:
- 至少一个ChannelHandler
该组件实现了服务器对客户端接收的数据的处理,即他的业务逻辑。 - 引导
这是配置服务器的启动代码. 至少,他会将服务器绑定到它要监听连接请求的端口上.
2.1.1 ChannelHandler 业务处理逻辑
接口ChannelInboundHandler
用来响应入站事件的方法。使用的时候只需要继承ChannelInboundHandlerAdapter类就行了。它提供了ChannelInboundHandler的默认实现。
主要的方法:
- channelRead() 对于每个传入的消息都会调用
- channelReadComplete() 通知ChannelInboundHandler最后一次对channelRead()的调用时当前批量读取中的最后一个条消息。
exceptionCaught() 在读取操作期间,有异常抛出时会调用。
package com.moyang.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/**
- nettydemo1 — 这个组件实现了服务器的业务逻辑,决定了连接创建后和接收到信息后该如何处理
- 这种使用 ChannelHandler 的方式体现了关注点分离的设计原则,并简化业务逻辑的迭代开发的要求
* - @author 墨阳
- @date 2018-11-09
*/
// 标识ChannelHandler的实例之间可以在Channel里面共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
ByteBuf buf = (ByteBuf) msg;
System.out.println("server has received: " + buf.toString(CharsetUtil.UTF_8));
// 将接受到的消息发送给发送者,注意,这里并没有flush。
ctx.write(buf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 冲刷所有待审消息到远程节点。并关闭通道
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 打印异常栈跟踪
cause.printStackTrace();
// 关闭Channel
ctx.close();
}
}
2.2.2 引导服务器
引导服务器主要涉及一下内容:
- 绑定到服务器上,进行监听,并接受传入连接请求的端口。
配置channel以将有关的入站消息通知给EchoServerHandler实例。
package com.moyang.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress;
/**
- nettydemo1
- 1.监听和接收进来的连接请求
- 2.配置 Channel 来通知一个关于入站消息的 EchoServerHandler 实例
* - @author 墨阳
@date 2018-11-09
*/
public class EchoServer {private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
int port = 8000;
new EchoServer(port).start();
}
public void start() throws InterruptedException {
// 创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建ServerBootStrap
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
// 为NIO指定使用 NioServerSocketChannel 这种信道类型
.channel(NioServerSocketChannel.class)
// 使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
// 添加EchoServerHandler到子Channel的ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() { //添加 EchoServerHandler 到 Channel 的 ChannelPipeline
//当一个新的连接被接受,一个新的子 Channel 将被创建, ChannelInitializer 会添加我们EchoServerHandler 的实例到 Channel 的 ChannelPipeline。
protected void initChannel(SocketChannel ch) throws Exception {
// EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例。
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 异步的绑定服务;调用sync()方法阻塞等待直到绑定完成
ChannelFuture f = bootstrap.bind().sync(); // 8
System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
//获取channel的CloseFuture,调用sync()会导致阻塞当前线程直到它完成。
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭 EventLoopGroup,释放所有资源。包括所有创建的线程
group.shutdownGracefully().sync();
}
}
}
ChannelInitializer
,当一个新的连接被接受的时候,一个新的子Channel
将会被建立.ChannelInitializer
将会把一个你的EchoServerHandler
的实例添加到该Channel
的ChannelPipeline
中。这个ChannelHandler
就会收到有关入站消息的通知。
2.2 编写Echo客户端
Echo客户端会完成的工作:
- 连接到服务器
- 发送一个或者多个消息
- 对于每个消息,等待并接受从服务器发回来的消息
- 关闭连接
2.2.1 通过ChannelHandler实现业务逻辑
客户端将拥有一个用来处理数据库的ChannelInboundHandler
,客户端使用SimpleChannelInboundHandler类来完成任务。主要的方法有:
- channelActive() 在到服务器的连接已经建立之后被调用
- channelRead0() 当从服务器接到一条消息的时候被调用
exceptionCaught() 在处理的过程中已发异常的时候被调用
package com.moyang.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;/**
- nettydemo1
* - @author 墨阳
@date 2018-11-09
*/
// 标记该实例可以被多个Channel共享
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler{ /**
- 建立连接后该 channelActive() 方法被调用一次.
- 逻辑很简单:一旦建立了连接,字节序列被发送到服务器。
- 该消息的内容并不重要;在这里,我们使用了 Netty 编码字符串 “Netty rocks!”
- 通过覆盖这种方法,我们确保东西被尽快写入到服务器。
* - @param ctx
@throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(“netty rocks!!! “, CharsetUtil.UTF_8));
}/**
- 这种方法会在接收到数据时被调用。
- 注意,由服务器所发送的消息可以以块的形式被接收。
- 即,当服务器发送 5 个字节是不是保证所有的 5 个字节会立刻收到 - 即使是只有 5 个字节,
- channelRead0() 方法可被调用两次,第一次用一个ByteBuf(Netty的字节容器)装载3个字节和第二次一个 ByteBuf 装载 2 个字节。
- 唯一要保证的是,该字节将按照它们发送的顺序分别被接收。 (注意,这是真实的,只有面向流的协议如TCP)。
* - @param ctx
- @param msg
@throws Exception
*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(“Netty rocks!”, CharsetUtil.UTF_8));
System.out.println(“Client Received: “ + msg.toString(CharsetUtil.UTF_8));
}@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- nettydemo1
思考
为什么客户端使用的SimpleChannelInboundHandler而服务端使用的ChannelInboundHandler?
这个关系到:业务逻辑如何处理消息以及Netty如何管理资源.
在客户端,当channelRead0()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler 负责释放指向保存该消息的ByteBuf 的内存引用。在EchoServerHandler 中,你仍然需要将传入消息回送给发送者,而write()操作是异步的,直到channelRead()方法返回后可能仍然没有完成(如代码清单2-1 所示)。为此,EchoServerHandler扩展了ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。消息在EchoServerHandler 的channelReadComplete()方法中,当writeAndFlush()方法被调用时被释放
2.4.2 引导客户端
package com.moyang.echo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* nettydemo1
*
* @author 墨阳
* @date 2018-11-09
*/
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建BootStrap
Bootstrap bootstrap = new Bootstrap();
// 指定EventLoopGroup以处理客户端事件;
bootstrap.group(group)
.channel(NioSocketChannel.class)
// 设置服务器的InetSocketAddress
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 连接到远程节点,调用sync()阻塞直到连接完成
ChannelFuture f = bootstrap.connect().sync();
// 阻塞,直到Channel关闭
f.channel().closeFuture().sync();
} finally {
// 关闭线程池,释放所有资源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
String host = "127.0.0.1";
int port = 8000;
new EchoClient(host, port).start();
}
}
注意:
可以在客户端和服务端使用不同的传输。
最后
如果你觉得写的还不错,就关注下公众号呗,关注后,有点小礼物回赠给你。
你可以获得5000+电子书,java,springCloud,adroid,python等各种视频教程,IT类经典书籍,各种软件的安装及破解教程。
希望一块学习,一块进步!
还没有评论,来说两句吧...