netty学习之路一
netty学习之路一
- server端
- ServerHandler
- 客户端
- 客户端handler
客户端给服务端发送数据,服务端再将该数据返回给客户端
server端
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 负责处理来自客户端的链接
EventLoopGroup workerGroup = new NioEventLoopGroup();//负责处理任务
ServerBootstrap bootstrap = new ServerBootstrap();//配置类
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler());//添加处理器
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(8888).sync();
future.channel().closeFuture().sync();//阻塞线程
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
ServerHandler
/**
* 收到数据时调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
InetSocketAddress address = (InetSocketAddress)ctx.channel().remoteAddress();
System.out.println(address.getPort());
System.out.println(address.getAddress());
//doing else something
ByteBuf buf = (ByteBuf) msg;
String result = convertByteBufToString(buf);
System.out.println("收到客户端的信息:"+result);
System.out.println("开始响应");
//给客户端响应完成之后,关闭该客户端连接
ctx.writeAndFlush(Unpooled.copiedBuffer(result.getBytes())).addListener(ChannelFutureListener.CLOSE);
System.out.println("响应结束");
}finally {
ReferenceCountUtil.release(msg);
}
}
/**
* 抛出异常时调用
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 将buf转为string
* @param buf
* @return
*/
public String convertByteBufToString(ByteBuf buf) {
String str;
if (buf.hasArray()) { // 处理堆缓冲区
str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
} else { // 处理直接缓冲区以及复合缓冲区
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
str = new String(bytes, 0, buf.readableBytes());
}
return str;
}
客户端
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup workerGroup = new NioEventLoopGroup();//负责处理任务
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = bootstrap.connect("127.0.0.1",8888);
f.channel().writeAndFlush(Unpooled.copiedBuffer("中国".getBytes()));
f.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
}
}
客户端handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf)msg;
String response = convertByteBufToString(buf);
System.out.println("收到的响应为:"+response);
}finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
public String convertByteBufToString(ByteBuf buf) {
String str;
if (buf.hasArray()) { // 处理堆缓冲区
str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
} else { // 处理直接缓冲区以及复合缓冲区
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
str = new String(bytes, 0, buf.readableBytes());
}
return str;
}
}
还没有评论,来说两句吧...