Implementing an HTTP Server and HTTP Client with Netty
Table of contents
- Implementing an HTTP Server and HTTP Client with Netty
- A simple HTTP server/client
- Server
- Client
- An advanced HTTP server/client
- Server
- Client
- A general-purpose HTTP client
- A general-purpose HTTP server
- Summary
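Intended audience: readers who are interested in Netty and want hands-on practice, and developers who need a high-performance HTTP client for calling server-side interfaces in real projects.
GitChat subscription: "Implementing an HTTP Server and HTTP Client with Netty" (《基于 Netty 实现 HTTP Server 和 HTTP Client》)
An HTTP server is what we usually call a web server; Tomcat is one of the most widely used. Traditional web servers are sometimes also called web containers or Servlet containers, because most of them are built on the Servlet specification. From earlier study of Netty we know that it supports many protocols, HTTP among them. Netty therefore makes it straightforward to build an HTTP server similar to traditional web containers such as Tomcat and Jetty. Likewise, a general-purpose HTTP client with functionality similar to Apache HttpClient or Spring RestTemplate can be built on Netty. This article walks step by step through implementing an HTTP server and an HTTP client on Netty. Readers should be familiar with Netty basics beforehand; see "Learning Netty Step by Step" (《循序渐进学Netty》).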
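A simple HTTP server/client
We start with the simplest possible HTTP server and use it to show how a Netty program is configured and started, and how Netty's bootstrap classes (ServerBootstrap and Bootstrap) tie the core components together.
Development generally takes three steps:
1) Build the HTTP server program, configure its parameters, and start it.
2) Send an HTTP request from a browser or another client.
3) The client receives the server's response.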
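For step 2, any HTTP client will do. As a minimal sketch, assuming Java 11 or later (for `java.net.http`) and the demo server from this article listening on 127.0.0.1:8800, the JDK's own client can send a test request. The class name `QuickHttpCheck` and the helper `buildRequest` are hypothetical names chosen for this illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

final class QuickHttpCheck {

    // Builds a POST request for the demo server. Host and port are assumptions
    // taken from the examples in this article.
    static HttpRequest buildRequest(String host, int port, String body) {
        return HttpRequest.newBuilder(URI.create("http://" + host + ":" + port + "/"))
                .timeout(Duration.ofSeconds(3))
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Force HTTP/1.1 so that no HTTP/2 upgrade headers are sent to the plain HTTP/1.1 server.
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpResponse<String> res = client.send(
                buildRequest("127.0.0.1", 8800, "hello netty http server!"),
                HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
        System.out.println(res.statusCode() + " " + res.body());
    }
}
```

Running `main` requires the server to be up; `buildRequest` itself performs no network I/O.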
The Netty version used in this article is:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
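Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import java.net.InetSocketAddress;
public class HttpServer1 {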
public void start(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new HttpRequestDecoder()) // decodes inbound HTTP requests
.addLast("aggregator", new HttpObjectAggregator(2 * 1024 * 1024)) // aggregates message parts into a FullHttpRequest (max 2 MB)
.addLast("encoder", new HttpResponseEncoder()) // encodes outbound HTTP responses
.addLast("compressor", new HttpContentCompressor()) // compresses the response content
.addLast("handler", new HttpServerHandler1());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
b.bind().sync(); // block until the bind completes; the non-daemon event loop threads then keep the JVM running
System.out.println("netty http server started on port(" + port + ")");
}
public static void main(String[] args) throws InterruptedException {
new HttpServer1().start(8800);
}
}
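Anyone familiar with the Netty programming model knows that business development usually means writing ChannelHandlers and adding them to the pipeline to process data. The code above uses Netty's built-in HTTP handlers: HttpRequestDecoder, HttpObjectAggregator, HttpResponseEncoder, and HttpContentCompressor. The business handler is shown below; it simply echoes the request data back.
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
public class HttpServerHandler1 extends SimpleChannelInboundHandler<FullHttpRequest> {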
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
String content = String.format("Received http request, uri: %s, method: %s, content: %s%n",
msg.uri(), msg.method(), msg.content().toString(CharsetUtil.UTF_8));
// Echo the content generated by the server back to the client (encoded explicitly as UTF-8)
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.wrappedBuffer(content.getBytes(CharsetUtil.UTF_8)));
HttpHeaders heads = response.headers();
heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.writeAndFlush(response);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive, client address: " + ctx.channel().remoteAddress());
super.channelActive(ctx);
}
}
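The handler above sets Content-Length from the number of readable bytes in the response buffer. That matters because the header counts encoded bytes, not Java characters, and the two differ as soon as the body contains non-ASCII text. The JDK-only sketch below illustrates the difference; `ContentLengthDemo` is a hypothetical name used only here.

```java
import java.nio.charset.StandardCharsets;

final class ContentLengthDemo {

    // Content-Length must be the size of the encoded body in bytes.
    static int contentLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // "\u4f60\u597d" is a two-character Chinese greeting; each character takes 3 bytes in UTF-8.
        String body = "content: \u4f60\u597d";
        System.out.println("chars=" + body.length() + ", bytes=" + contentLength(body));
    }
}
```

Using `String.length()` for the header would make the client stop reading too early whenever the body contains multi-byte characters.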
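Client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
public class HttpClient1 {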
public void connect(String host, int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpResponseDecoder()); // decodes inbound HTTP responses
ch.pipeline().addLast(new HttpRequestEncoder()); // encodes outbound HTTP requests
ch.pipeline().addLast(new HttpClientHandler1()); // business handler
}
});
// Open the connection (kept open across requests via keep-alive)
ChannelFuture f = b.connect(host, port).sync();
System.out.println("netty http client connected on host(" + host + ") port(" + port + ")");
// Send the HTTP request
URI uri = new URI("http://127.0.0.1:8800");
String content = "hello netty http Server!";
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
uri.toASCIIString(), Unpooled.wrappedBuffer(content.getBytes(StandardCharsets.UTF_8)));
request.headers().set(HttpHeaderNames.HOST, host);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
f.channel().writeAndFlush(request);
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new HttpClient1().connect("127.0.0.1", 8800);
}
}
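Client business handler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
public class HttpClientHandler1 extends ChannelInboundHandlerAdapter {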
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
System.out.println("Message received from server: " + buf.toString(io.netty.util.CharsetUtil.UTF_8));
buf.release();
}
}
}
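In the client above, HttpRequestEncoder turns the DefaultFullHttpRequest into HTTP/1.1 text on the wire: a request line, header lines, an empty line, then the body. The sketch below builds that layout by hand with the JDK only, to make the format visible. It is an illustration of the message format and not Netty's encoder; `WireFormatSketch`, `serialize`, and `demoRequest` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class WireFormatSketch {

    // Lays out an HTTP/1.1 request: request line, headers, blank line, body.
    static String serialize(String method, String target, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder();
        sb.append(method).append(' ').append(target).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(": ").append(h.getValue()).append("\r\n");
        }
        sb.append("\r\n").append(body);
        return sb.toString();
    }

    // Mirrors the request built in HttpClient1: same target, headers, and body.
    static String demoRequest() {
        String body = "hello netty http Server!";
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("host", "127.0.0.1");
        headers.put("connection", "keep-alive");
        headers.put("content-length", String.valueOf(body.getBytes(StandardCharsets.UTF_8).length));
        return serialize("GET", "http://127.0.0.1:8800", headers, body);
    }

    public static void main(String[] args) {
        System.out.println(demoRequest());
    }
}
```

On the server side, HttpRequestDecoder parses this text back into request objects, and HttpObjectAggregator combines them into the FullHttpRequest that the business handler receives.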
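An advanced HTTP server/client
The example above shows the communication flow between a Netty-based HTTP client and server with the least possible code. A more advanced version follows.
Server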
public class NettyHttpServer {
String host = "*";
int port = 8800;
int backlog = 128;
int maxContentLength = 1024 * 1024;
EventLoopGroup bossGroup;
EventLoopGroup workerGroup;
ServerBootstrap serverBootstrap;
AtomicBoolean stopFlag = new AtomicBoolean();
public NettyHttpServer() {
}
public NettyHttpServer(int port) {
this.port = port;
}
public void init() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec", new HttpServerCodec()); // HTTP request decoding and response encoding
pipeline.addLast("decompressor", new HttpContentDecompressor()); // decompresses the request content
pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength)); // aggregates message parts into a FullHttpRequest
pipeline.addLast("compressor", new HttpContentCompressor()); // compresses the response content
pipeline.addLast("handler", new NettyHttpServerHandler()); // custom business handler
}
});
serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.option(ChannelOption.SO_BACKLOG, backlog);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
}
public void start() {
InetSocketAddress addr = null;
if (host == null || "*".equals(host)) {
addr = new InetSocketAddress(port);
} else {
addr = new InetSocketAddress(host, port);
}
try {
serverBootstrap.bind(addr).sync();
System.out.println("netty http server started on host(" + addr.getHostName() + ") port(" + port + ")");
} catch (Exception e) {
System.out.println("netty http server bind exception, port=" + port);
System.exit(-1);
}
}
public void close() {
System.out.println("stopping netty server");
if (bossGroup != null) {
bossGroup.shutdownGracefully();
bossGroup = null;
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
workerGroup = null;
}
System.out.println("netty server stopped");
}
public void stop() {
stopFlag.set(true);
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getBacklog() {
return backlog;
}
public void setBacklog(int backlog) {
this.backlog = backlog;
}
public int getMaxContentLength() {
return maxContentLength;
}
public void setMaxContentLength(int maxContentLength) {
this.maxContentLength = maxContentLength;
}
}
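The start() method above binds to the wildcard address when host is null or "*", and to a specific interface otherwise. The JDK-only sketch below isolates that decision. It also prints the address with getHostString(), which, unlike getHostName(), does not attempt a reverse DNS lookup. `BindAddressDemo` and `resolve` are hypothetical names for this illustration.

```java
import java.net.InetSocketAddress;

final class BindAddressDemo {

    // null or "*" means "listen on all local interfaces"; anything else is a concrete host.
    static InetSocketAddress resolve(String host, int port) {
        if (host == null || "*".equals(host)) {
            return new InetSocketAddress(port);
        }
        return new InetSocketAddress(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress any = resolve("*", 8800);
        InetSocketAddress loopback = resolve("127.0.0.1", 8800);
        System.out.println(any.getHostString() + " wildcard=" + any.getAddress().isAnyLocalAddress());
        System.out.println(loopback.getHostString() + " wildcard=" + loopback.getAddress().isAnyLocalAddress());
    }
}
```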
Business handler
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
System.out.println("NettyHttpServerHandler.channelRead0");
String content = String.format("NettyHttpServerHandler Received http request, uri: %s, method: %s, content: %s%n", msg.uri(), msg.method(), msg.content().toString(CharsetUtil.UTF_8));
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.wrappedBuffer(content.getBytes()));
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyHttpServerHandler.channelRegistered");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyHttpServerHandler.channelUnregistered");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyHttpServerHandler.channelActive");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyHttpServerHandler.channelInactive");
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyHttpServerHandler.channelReadComplete");
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("NettyHttpServerHandler.userEventTriggered");
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("NettyHttpServerHandler.exceptionCaught:" + cause);
super.exceptionCaught(ctx, cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyHttpServerHandler.handlerAdded");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyHttpServerHandler.handlerRemoved");
super.handlerRemoved(ctx);
}
}
Launcher class
public class MainAppServer {
public static void main(String[] args) throws InterruptedException {
NettyHttpServer nettyHttpServer = new NettyHttpServer();
nettyHttpServer.init();
nettyHttpServer.start();
Thread.sleep(1200000);
nettyHttpServer.close();
}
}
Client
public class NettyHttpClient {
String host = "*";
int port = 8800;
int maxResponseContentLength = 1024 * 1024;
EventLoopGroup workerGroup;
Bootstrap bootstrap;
public void init() {
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec", new HttpClientCodec());
pipeline.addLast("decompressor", new HttpContentDecompressor());
pipeline.addLast("aggregator", new HttpObjectAggregator(maxResponseContentLength));
pipeline.addLast("handler", new NettyHttpClientHandler());
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture connect() {
ChannelFuture channelFuture = null;
try {
channelFuture = bootstrap.connect(host, port).sync();
System.out.println("netty http client connected on host(" + host + ") port(" + port + ")");
} catch (Exception e) {
System.out.println("netty http client connect exception, host=" + host + ",port=" + port);
System.exit(-1);
}
return channelFuture;
}
public void close() {
System.out.println("stopping netty client");
if (workerGroup != null) {
workerGroup.shutdownGracefully();
workerGroup = null;
}
System.out.println("netty client stopped");
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
Business handler
public class NettyHttpClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
System.out.println(buf.toString(CharsetUtil.UTF_8));
buf.release();
}
}
}
Launcher class
public class MainAppClient {
public static void main(String[] args) throws InterruptedException, URISyntaxException {
NettyHttpClient nettyHttpClient = new NettyHttpClient();
nettyHttpClient.setHost("127.0.0.1");
nettyHttpClient.setPort(8800);
nettyHttpClient.init();
ChannelFuture f = nettyHttpClient.connect();
if (f != null) {
URI uri = new URI("http://127.0.0.1:8800");
String content = "hello netty http Server!";
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(content.getBytes(StandardCharsets.UTF_8)));
request.headers().set(HttpHeaderNames.HOST, "127.0.0.1");
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
f.channel().write(request); // queue the message; the flush below actually sends it
f.channel().flush();
}
Thread.sleep(1200000);
nettyHttpClient.close();
}
}
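The advanced version is better structured and accepts configuration parameters. Readers can compare it with the initial version to see what each revision improves.
A general-purpose HTTP client
This is a general-purpose HTTP client built on Netty, with functionality similar to Apache HttpClient or Spring RestTemplate. It can be used in real projects as a tool for sending HTTP requests.
Complete code on GitHub: https://github.com/bigbirditedu/nettylearn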
/**
 * A general-purpose HTTP client built on Netty; it can be used in real projects.
 */
@Sharable
public class CommonHttpClient extends ChannelDuplexHandler implements HttpClient, InitClose {
    static Logger log = LoggerFactory.getLogger(CommonHttpClient.class);
    int maxResponseContentLength = 2 * 1024 * 1024;
    int workerThreads = 1;
    int minSizeToGzip = 2048;
    int keepAliveSeconds = 60;
    // maximum number of requests served by one cached connection
    int keepAliveRequests = 100;
    // number of connections cached per target address
    int keepAliveConnections = 1;
    // custom thread factory for the worker threads
    NamedThreadFactory workThreadFactory = new NamedThreadFactory("netty_httpclient");
    EventLoopGroup workerGroup;
    GZip gzip;
    SslContext sslCtx;
    Bootstrap bootstrap;
    Bootstrap sslBootstrap;
    // maps a connection id to the request/response info of the call in flight
    ConcurrentHashMap<String, ReqResInfo> dataMap = new ConcurrentHashMap<>();
    // maps each target address to its queue of cached connections
    ConcurrentHashMap<String, ArrayBlockingQueue<ChannelInfo>> channelMap = new ConcurrentHashMap<>();

    public void init() {
        gzip = new GZip();
        try {
            // InsecureTrustManagerFactory trusts every certificate; suitable for testing only
            sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } catch (Exception e) {
            log.error("ssl context init failed", e);
        }
        workerGroup = new NioEventLoopGroup(workerThreads, workThreadFactory);
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("codec", new HttpClientCodec());
                        pipeline.addLast("decompressor", new HttpContentDecompressor());
                        pipeline.addLast("aggregator", new HttpObjectAggregator(maxResponseContentLength));
                        // CommonHttpClient is itself a ChannelHandler that handles both outbound and inbound events
                        pipeline.addLast("handler", CommonHttpClient.this);
                    }
                });
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        sslBootstrap = new Bootstrap();
        sslBootstrap.group(workerGroup).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("ssl", sslCtx.newHandler(ch.alloc()));
                        pipeline.addLast("codec", new HttpClientCodec());
                        pipeline.addLast("decompressor", new HttpContentDecompressor());
                        pipeline.addLast("aggregator", new HttpObjectAggregator(maxResponseContentLength));
                        pipeline.addLast("handler", CommonHttpClient.this);
                    }
                });
        sslBootstrap.option(ChannelOption.TCP_NODELAY, true);
        sslBootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        log.info("netty http client started");
    }

    public void close() {
        if (workerGroup == null) return;
        workerGroup.shutdownGracefully();
        workerGroup = null;
        log.info("netty http client stopped");
    }

    /**
     * Performs an HTTP call and blocks until the response arrives or the request times out.
     *
     * @param req the request to send
     * @return the response, or a response carrying only an error code
     */
    public HttpClientRes call(HttpClientReq req) {
        HttpClientRes res = null;
        ConnectResult cr = null;
        String connId = null;
        try {
            cr = getConnection(req);
            if (cr.retCode != 0) return new HttpClientRes(cr.retCode);
            Channel channel = cr.channelInfo.channel;
            connId = getConnId(channel);
            ReqResInfo info = new ReqResInfo(req);
            dataMap.put(connId, info);
            channel.writeAndFlush(req);
            res = info.future.get(req.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            res = new HttpClientRes(RetCodes.HTTPCLIENT_INTERRUPTED);
        } catch (TimeoutException e) {
            res = new HttpClientRes(RetCodes.HTTPCLIENT_TIMEOUT);
        } catch (Exception e) {
            log.error("http client call exception", e);
            res = new HttpClientRes(RetCodes.HTTPCLIENT_CONNECTION_BROKEN);
        }
        if (connId != null) dataMap.remove(connId);
        if (cr != null) closeConnection(req, res, cr.channelInfo);
        return res;
    }

    // Cache key of a target address, for example "http://127.0.0.1:8080"
    String getKey(HttpClientReq req) {
        URL url = req.getUrlObj();
        String schema = url.getProtocol();
        String host = url.getHost();
        int port = url.getPort();
        if (port == -1) port = url.getDefaultPort();
        return schema + "://" + host + ":" + port;
    }

    // Reuses a cached connection when possible, otherwise opens a new one
    ConnectResult getConnection(HttpClientReq req) throws InterruptedException {
        if (!req.isKeepAlive()) {
            return getConnectionNoCache(req);
        }
        String key = getKey(req);
        ArrayBlockingQueue<ChannelInfo> queue = channelMap.get(key);
        if (queue == null) {
            return getConnectionNoCache(req);
        }
        ChannelInfo channelInfo = queue.poll();
        if (channelInfo == null || !channelInfo.channel.isActive()) {
            return getConnectionNoCache(req);
        }
        if (keepAliveSeconds > 0) {
            if (System.currentTimeMillis() - channelInfo.createTime >= keepAliveSeconds * 1000L) {
                channelInfo.channel.close();
                return getConnectionNoCache(req);
            }
        }
        if (keepAliveRequests > 0) {
            if (channelInfo.count >= keepAliveRequests) {
                channelInfo.channel.close();
                return getConnectionNoCache(req);
            }
            channelInfo.count++;
        }
        return new ConnectResult(channelInfo);
    }

    ConnectResult getConnectionNoCache(HttpClientReq req) throws InterruptedException {
        URL url = req.getUrlObj();
        if (url == null) return new ConnectResult(RetCodes.HTTPCLIENT_URL_PARSE_ERROR);
        String schema = url.getProtocol();
        String host = url.getHost();
        int port = url.getPort();
        if (port == -1) port = url.getDefaultPort();
        ChannelFuture future = schema.equals("http") ? bootstrap.connect(host, port) : sslBootstrap.connect(host, port);
        Channel channel = future.channel();
        boolean done = future.await(req.getTimeout(), TimeUnit.MILLISECONDS);
        if (!done) {
            channel.close();
            return new ConnectResult(RetCodes.HTTPCLIENT_TIMEOUT);
        }
        if (channel.isActive())
            return new ConnectResult(new ChannelInfo(channel));
        else
            return new ConnectResult(RetCodes.HTTPCLIENT_NO_CONNECT);
    }

    // Returns a healthy keep-alive connection to the cache; closes it otherwise
    void closeConnection(HttpClientReq req, HttpClientRes res, ChannelInfo channelInfo) {
        if (!req.isKeepAlive() || res.getRetCode() != 0) {
            channelInfo.channel.close();
            return;
        }
        String key = getKey(req);
        ArrayBlockingQueue<ChannelInfo> queue = channelMap.get(key);
        if (queue == null) {
            queue = new ArrayBlockingQueue<ChannelInfo>(keepAliveConnections);
            ArrayBlockingQueue<ChannelInfo> old = channelMap.putIfAbsent(key, queue);
            if (old != null) queue = old;
        }
        boolean ok = queue.offer(channelInfo);
        if (!ok) {
            channelInfo.channel.close();
            return;
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof HttpClientReq) {
            HttpClientReq data = (HttpClientReq) msg;
            DefaultFullHttpRequest req = convertReq(data, ctx);
            ctx.writeAndFlush(req, promise);
        } else {
            super.write(ctx, msg, promise);
        }
    }

    DefaultFullHttpRequest convertReq(HttpClientReq data, ChannelHandlerContext ctx) {
        URL url = data.getUrlObj();
        DefaultFullHttpRequest req = null;
        if (data.getContent() != null && !data.getContent().isEmpty()) {
            ByteBuf bb = null;
            boolean allowGzip = data.isGzip() && data.getContent().length() >= minSizeToGzip;
            if (!allowGzip) {
                bb = ByteBufUtil.writeUtf8(ctx.alloc(), data.getContent());
            } else {
                bb = ctx.alloc().buffer();
                try {
                    String charset = parseCharSet(data.getContentType());
                    byte[] bytes = data.getContent().getBytes(charset);
                    gzip.zip(bytes, bb);
                } catch (Exception e) {
                    ReferenceCountUtil.release(bb);
                    throw new RuntimeException("encode request exception", e);
                }
            }
            req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(data.getMethod()), data.getPathQuery(), bb);
            req.headers().set(HttpHeaderNames.CONTENT_TYPE, data.getContentType());
            req.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, bb.readableBytes());
            if (allowGzip) {
                req.headers().set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
            }
        } else {
            req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(data.getMethod()), data.getPathQuery());
        }
        req.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
        if (data.getHeaders() != null) {
            for (Map.Entry<String, String> entry : data.getHeaders().entrySet()) {
                req.headers().set(entry.getKey(), entry.getValue());
            }
        }
        String host = url.getHost();
        int port = url.getPort();
        if (port != -1) host += ":" + port;
        req.headers().set(HttpHeaderNames.HOST, host);
        HttpUtil.setKeepAlive(req, data.isKeepAlive());
        req.headers().set(HttpHeaderNames.USER_AGENT, "common httpclient 1.0");
        return req;
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        String connId = getConnId(ctx.channel());
        ReqResInfo info = dataMap.get(connId);
        try {
            FullHttpResponse httpRes = (FullHttpResponse) msg;
            if (!httpRes.decoderResult().isSuccess()) {
                if (info != null) info.setRes(new HttpClientRes(RetCodes.HTTPCLIENT_RES_PARSE_ERROR));
                return;
            }
            HttpClientRes res = convertRes(httpRes);
            if (info != null) info.setRes(res);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    HttpClientRes convertRes(FullHttpResponse data) {
        HttpClientRes res = new HttpClientRes(0);
        res.setHttpCode(data.status().code());
        res.setHeaders(data.headers());
        String contentType = parseContentType(data.headers().get(HttpHeaderNames.CONTENT_TYPE));
        if (contentType != null) res.setContentType(contentType);
        ByteBuf bb = data.content();
        if (bb != null) {
            if (contentType != null && contentType.equals("application/octet-stream")) {
                byte[] buff = new byte[bb.readableBytes()];
                bb.readBytes(buff);
                res.setRawContent(buff);
            } else {
                String content = bb.toString(Charset.forName("utf-8"));
                res.setContent(content);
            }
        }
        // System.out.println(HttpUtil.isKeepAlive(data));
        return res;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String connId = getConnId(ctx.channel());
        log.debug("http connection started, connId={}", connId);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String connId = getConnId(ctx.channel());
        log.debug("http connection ended, connId={}", connId);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String connId = getConnId(ctx.channel());
        log.error("http connection exception, connId=" + connId + ",msg=" + cause.toString(), cause);
        ctx.close();
    }

    String getConnId(Channel ch) {
        SocketAddress addr = ch.remoteAddress();
        return parseIpPort(addr.toString()) + ":" + ch.id().asShortText();
    }

    String parseIpPort(String s) {
        int p = s.indexOf("/");
        if (p >= 0)
            return s.substring(p + 1);
        else
            return s;
    }

    String parseContentType(String contentType) {
        if (contentType == null) return null;
        int p = contentType.indexOf(";");
        if (p >= 0) return contentType.substring(0, p).trim();
        return contentType;
    }

    // Extracts the charset name from a Content-Type value such as "application/json; charset=utf-8";
    // falls back to utf-8 when none is given
    String parseCharSet(String contentType) {
        if (contentType == null) return "utf-8";
        int p = contentType.toLowerCase().indexOf("charset=");
        if (p >= 0) return contentType.substring(p + "charset=".length()).trim();
        return "utf-8";
    }

    public int getWorkerThreads() { return workerThreads; }
    public void setWorkerThreads(int workerThreads) { this.workerThreads = workerThreads; }
    public int getKeepAliveSeconds() { return keepAliveSeconds; }
    public void setKeepAliveSeconds(int keepAliveSeconds) { this.keepAliveSeconds = keepAliveSeconds; }
    public int getKeepAliveConnections() { return keepAliveConnections; }
    public void setKeepAliveConnections(int keepAliveConnections) { this.keepAliveConnections = keepAliveConnections; }
    public int getKeepAliveRequests() { return keepAliveRequests; }
    public void setKeepAliveRequests(int keepAliveRequests) { this.keepAliveRequests = keepAliveRequests; }
    public int getMaxResponseContentLength() { return maxResponseContentLength; }
    public void setMaxResponseContentLength(int maxResponseContentLength) { this.maxResponseContentLength = maxResponseContentLength; }
    public int getMinSizeToGzip() { return minSizeToGzip; }
    public void setMinSizeToGzip(int minSizeToGzip) { this.minSizeToGzip = minSizeToGzip; }

    static class ReqResInfo {
        HttpClientReq req;
        HttpClientRes res;
        CompletableFuture<HttpClientRes> future;

        ReqResInfo(HttpClientReq req) {
            this.req = req;
            this.future = new CompletableFuture<HttpClientRes>();
        }

        void setRes(HttpClientRes res) {
            this.res = res;
            future.complete(res);
        }
    }

    static class ChannelInfo {
        Channel channel = null;
        long createTime = System.currentTimeMillis();
        int count = 1;

        ChannelInfo(Channel channel) {
            this.channel = channel;
        }
    }

    static class ConnectResult {
        int retCode = 0;
        ChannelInfo channelInfo = null;

        ConnectResult(int retCode) {
            this.retCode = retCode;
        }

        ConnectResult(ChannelInfo channelInfo) {
            this.channelInfo = channelInfo;
        }
    }
}
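The core of call() is a blocking wait on an asynchronous result: the calling thread registers a CompletableFuture under the connection id, writes the request, and waits with a timeout, while the I/O thread completes the future in channelRead(). The JDK-only sketch below isolates that pattern. `PendingCalls` and its return codes are hypothetical stand-ins for this illustration; they are not the author's ReqResInfo or RetCodes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingCalls {
    // Hypothetical return codes, used only in this sketch.
    static final int OK = 0, TIMEOUT = -1, INTERRUPTED = -2, BROKEN = -3;

    private final ConcurrentHashMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called from the I/O side when the response for connId arrives.
    void complete(String connId, String body) {
        CompletableFuture<String> f = pending.get(connId);
        if (f != null) f.complete(body);
    }

    // Registers a future, triggers the send, then waits up to timeoutMillis for the response.
    int call(String connId, Runnable send, long timeoutMillis, StringBuilder out) {
        CompletableFuture<String> f = new CompletableFuture<>();
        pending.put(connId, f); // register before sending so a fast response is not lost
        try {
            send.run();
            out.append(f.get(timeoutMillis, TimeUnit.MILLISECONDS));
            return OK;
        } catch (TimeoutException e) {
            return TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return INTERRUPTED;
        } catch (ExecutionException e) {
            return BROKEN;
        } finally {
            pending.remove(connId); // always clean up the correlation entry
        }
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        StringBuilder out = new StringBuilder();
        // The "server" answers from another thread, as Netty's event loop would.
        int rc = calls.call("conn-1", () -> new Thread(() -> calls.complete("conn-1", "pong")).start(), 2000, out);
        System.out.println(rc + " " + out);
    }
}
```

Because the map is keyed by connection id, this design supports one request in flight per connection, which matches how the client takes a connection out of the cache for the duration of a call.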
Test class
public class TestHttpClient {
    public static void main(String[] args) {
        CommonHttpClient commonHttpClient = new CommonHttpClient();
        commonHttpClient.init();
        HttpClientReq httpClientReq = new HttpClientReq("POST", "http://127.0.0.1:8080/userService/queryUserInfo");
        httpClientReq.setContent("{\"uid\":\"121\"}");
        httpClientReq.setContentType("application/json");
        HttpClientRes res = commonHttpClient.call(httpClientReq);
        System.out.println(res.getContent());
        commonHttpClient.close();
    }
}
The remaining code is omitted for reasons of space; see https://github.com/bigbirditedu/nettylearn
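One of the omitted helpers is the GZip class that convertReq() uses for large request bodies. Its source is not shown in this article, so the following is only a hypothetical stand-in built on java.util.zip, assuming Java 11 or later. It shows the size-threshold decision and a compress/decompress round trip; `GzipSketch` and its methods are illustrative names and do not reproduce the repository's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipSketch {

    // Same rule as convertReq(): compress only when requested and the content is long enough.
    static boolean shouldGzip(boolean requested, String content, int minSizeToGzip) {
        return requested && content.length() >= minSizeToGzip;
    }

    static byte[] zip(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray(); // complete only after the gzip stream is closed
    }

    static byte[] unzip(byte[] zipped) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(zipped))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String content = "{\"uid\":\"121\"}".repeat(500);
        byte[] raw = content.getBytes(StandardCharsets.UTF_8);
        if (shouldGzip(true, content, 2048)) {
            System.out.println("raw=" + raw.length + " bytes, gzip=" + zip(raw).length + " bytes");
        }
    }
}
```

A threshold such as minSizeToGzip exists because gzip adds a fixed header and trailer, so very small bodies can grow rather than shrink.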
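We create a separate Spring Boot project to act as the service provider, and call its interface with the HTTP client above.
Start the Spring Boot project below first, then run the HttpClient test class above.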
UserController
@RestController
@RequestMapping("userService")
public class UserController {
    @RequestMapping("/queryUserInfo")
    public QueryUserInfoRes queryUserInfo(@RequestBody QueryUserInfoReq req) {
        String uid = req.getUid();
        // Simulate a database query and return the data
        User user = new User();
        user.setUid(uid);
        user.setAge(22);
        user.setAddress("南京市栖霞区");
        user.setName("吊哥");
        user.setSex(1);
        QueryUserInfoRes res = new QueryUserInfoRes();
        res.setUser(user);
        res.setRetCode(0);
        res.setRetMsg("成功");
        return res;
    }
}
QueryUserInfoReq
public class QueryUserInfoReq {
    private String uid;

    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
}
QueryUserInfoRes
public class QueryUserInfoRes {
    private User user;
    private String retMsg;
    private int retCode;

    public User getUser() { return user; }
    public void setUser(User user) { this.user = user; }
    public String getRetMsg() { return retMsg; }
    public void setRetMsg(String retMsg) { this.retMsg = retMsg; }
    public int getRetCode() { return retCode; }
    public void setRetCode(int retCode) { this.retCode = retCode; }
}

public class User {
    private String name;
    private String uid;
    private int age;
    private int sex;
    private String address;
    // getters and setters omitted
}
Output of the HTTP client call:
{"user":{"name":"吊哥","uid":"121","age":22,"sex":1,"address":"南京市栖霞区"},"retMsg":"成功","retCode":0}
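A general-purpose HTTP server
Netty ships with built-in handlers (codecs) for the HTTP protocol, so it is straightforward to build an HTTP server similar to traditional web servers such as Tomcat and Jetty. This article gives one sample implementation; if you have a better one, feel free to leave a comment. For the complete code see: https://github.com/bigbirditedu/nettylearn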
@ChannelHandler.Sharablepublic class MyNettyHttpServer extends ChannelDuplexHandler implements InitClose, StartStop { static Logger log = LoggerFactory.getLogger(MyNettyHttpServer.class); int port = 8888; String host = "*"; int idleSeconds = 30; int maxConnections = 100000; int workerThreads = 0; int backlog = 128; boolean nativeNetty = false; int maxInitialLineLength = 4096; int maxHeaderSize = 8192; int maxChunkSize = 8192; int maxContentLength = 2 * 1024 * 1024; NamedThreadFactory bossThreadFactory = new NamedThreadFactory("netty_webserver_boss"); NamedThreadFactory workThreadFactory = new NamedThreadFactory("netty_webserver_worker"); EventLoopGroup bossGroup; EventLoopGroup workerGroup; Channel serverChannel; ConcurrentHashMap<String, Channel> conns = new ConcurrentHashMap<String, Channel>(); AtomicBoolean stopFlag = new AtomicBoolean(); ServerBootstrap serverBootstrap; public MyNettyHttpServer() { } public MyNettyHttpServer(int port) { this.port = port; } @Override public void init() { int processors = Runtime.getRuntime().availableProcessors(); if (workerThreads == 0) { workerThreads = processors; } String osName = System.getProperty("os.name"); if (nativeNetty && osName != null && osName.toLowerCase().indexOf("linux") >= 0) { nativeNetty = true; } else { nativeNetty = false; } if (nativeNetty) { bossGroup = new EpollEventLoopGroup(1, bossThreadFactory); workerGroup = new EpollEventLoopGroup(workerThreads, workThreadFactory); } else { bossGroup = new NioEventLoopGroup(1, bossThreadFactory); workerGroup = new NioEventLoopGroup(workerThreads, workThreadFactory); } serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(nativeNetty ? 
EpollServerSocketChannel.class : NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("timeout", new IdleStateHandler(0, 0, idleSeconds)); pipeline.addLast("codec", new HttpServerCodec(maxInitialLineLength, maxHeaderSize, maxChunkSize)); pipeline.addLast("decompressor", new HttpContentDecompressor()); pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength)); pipeline.addLast("compressor", new HttpContentCompressor()); pipeline.addLast("handler", MyNettyHttpServer.this); } }); serverBootstrap.option(ChannelOption.SO_REUSEADDR, true); serverBootstrap.option(ChannelOption.SO_BACKLOG, backlog); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); } @Override public void start() { InetSocketAddress addr; if (host == null || "*".equals(host)) { addr = new InetSocketAddress(port); // "0.0.0.0" } else { addr = new InetSocketAddress(host, port); } try { serverChannel = serverBootstrap.bind(addr).syncUninterruptibly().channel(); log.info("netty http server started on host(" + addr.getHostString() + ") port(" + port + ")"); } catch (Exception e) { log.error("netty http server bind exception, port=" + port); System.exit(-1); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (stopFlag.get()) { ctx.close(); return; } String connId = getConnId(ctx); if (conns.size() >= maxConnections) { ctx.close(); log.error("connection started, connId={}, but not allowed,server max connections exceeded.", connId); return; } log.info("connection started, connId={}", connId); conns.put(connId, ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String connId = getConnId(ctx); conns.remove(connId); log.info("connection ended, connId={}", connId); } @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.ALL_IDLE) {
                String connId = getConnId(ctx);
                ctx.close();
                log.error("connection timeout, ctx closed,connId={}", connId);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String connId = getConnId(ctx);
        log.error("connection exception, connId=" + connId + ",msg=" + cause.toString(), cause);
        ctx.close();
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        try {
            // Because HttpObjectAggregator is in the pipeline, msg here is always a FullHttpRequest
            FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
            if (!fullHttpRequest.decoderResult().isSuccess()) {
                sendError(ctx, HttpResponseStatus.BAD_REQUEST, RetCodes.DECODE_REQ_ERROR);
                return;
            }
            if (fullHttpRequest.method() != HttpMethod.GET
                    && fullHttpRequest.method() != HttpMethod.POST
                    && fullHttpRequest.method() != HttpMethod.PUT
                    && fullHttpRequest.method() != HttpMethod.DELETE
                    && fullHttpRequest.method() != HttpMethod.HEAD
                    && fullHttpRequest.method() != HttpMethod.OPTIONS) {
                sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, RetCodes.HTTP_METHOD_NOT_ALLOWED);
                return;
            }
            if (fullHttpRequest.method() == HttpMethod.OPTIONS) {
                String origin = fullHttpRequest.headers().get(HttpHeaderNames.ORIGIN);
                String requestMethod = fullHttpRequest.headers().get(HttpHeaderNames.ACCESS_CONTROL_REQUEST_METHOD);
                if (origin == null || requestMethod == null) {
                    sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, RetCodes.HTTP_METHOD_NOT_ALLOWED);
                    return;
                }
            }
            // WebReq and WebRes objects wrap the HTTP request and response to make them easier to handle
            DefaultWebReq req = convertReq(fullHttpRequest);
            DefaultWebRes res = process(req);
            ctx.channel().writeAndFlush(res);
        } finally {
            // The message is not passed further along the pipeline, so release it manually here
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Handles an HTTP request, which usually means calling a server-side business interface and returning its response.
     * This is only a simple demonstration: it calls the interface method via reflection and returns JSON,
     * without any elaborate interface routing.
     *
     * @param req
     * @return
     */
    private DefaultWebRes process(DefaultWebReq req) {
        // The last path segment is used as the name of the method to call
        String path = req.getPath();
        int i = path.lastIndexOf("/");
        String methodName = path.substring(i + 1);
        String reqContent = req.getContent();
        DefaultWebRes webRes = null;
        try {
            webRes = new DefaultWebRes(req, 200);
            webRes.setContentType("application/json");
            Method method = OrderService.class.getDeclaredMethod(methodName, QueryOrderInfoReq.class);
            method.setAccessible(true);
            Class<?>[] parameterTypes = method.getParameterTypes();
            Class<?> parameterType = parameterTypes[0];
            Object param = JSON.parseObject(reqContent, parameterType);
            Object resContent = method.invoke(new OrderService(), param);
            webRes.setContent(JSON.toJSONString(resContent));
        } catch (NoSuchMethodException noSuchMethodException) {
            String s = String.format(WebConstants.ContentFormat, -627, RetCodes.retCodeText(-627));
            webRes.setContent(s);
        } catch (Exception e) {
            log.error("http call exception", e);
            webRes = new DefaultWebRes(req, 500);
        }
        return webRes;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Custom outbound processing can go here. A message that is already a FullHttpMessage
        // (for example, one written by sendError()) is passed straight on to the outbound handlers in the pipeline
        if (msg instanceof DefaultWebRes) {
            DefaultWebRes data = (DefaultWebRes) msg;
            writeHttpResponse(ctx, promise, data);
        } else {
            super.write(ctx, msg, promise);
        }
    }

    private void writeHttpResponse(ChannelHandlerContext ctx, ChannelPromise promise, DefaultWebRes data) {
        DefaultFullHttpResponse res = null;
        if (data.getContent() != null && !data.getContent().isEmpty()) {
            int size = ByteBufUtil.utf8MaxBytes(data.getContent());
            ByteBuf bb = ctx.alloc().buffer(size);
            bb.writeCharSequence(data.getContent(), Charset.forName(data.getCharSet()));
            int len = bb.readableBytes();
            if (data.isHeadMethod()) {
                res = new DefaultFullHttpResponse(data.getVersion(), HttpResponseStatus.valueOf(data.getHttpCode()));
                ReferenceCountUtil.release(bb);
            } else {
                res = new DefaultFullHttpResponse(data.getVersion(),
                        HttpResponseStatus.valueOf(data.getHttpCode()), bb);
            }
            res.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, len);
        } else {
            res = new DefaultFullHttpResponse(data.getVersion(), HttpResponseStatus.valueOf(data.getHttpCode()));
            res.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0); // a 302 without a length leaves the client hanging
        }
        res.headers().set(HttpHeaderNames.SERVER, WebConstants.Server);
        if (stopFlag.get()) res.headers().set(WebConstants.ShutdownFlag, "1");
        if (data.isKeepAlive()) {
            res.headers().set(HttpHeaderNames.CONNECTION, "keep-alive");
        }
        if (data.getHeaders() != null) {
            for (String key : data.getHeaders().names()) {
                res.headers().set(key, data.getHeaders().get(key));
            }
        }
        if (data.getCookies() != null) {
            for (io.netty.handler.codec.http.cookie.Cookie c : data.getCookies()) {
                String s = ServerCookieEncoder.STRICT.encode(c);
                res.headers().add(HttpHeaderNames.SET_COOKIE, s);
            }
        }
        ChannelFuture future = ctx.writeAndFlush(res, promise);
        if (!data.isKeepAlive()) {
            // Without keep-alive, close the channel once the data has been sent
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    DefaultWebReq convertReq(FullHttpRequest fullHttpRequest) {
        DefaultWebReq req = new DefaultWebReq();
        req.setVersion(fullHttpRequest.protocolVersion());
        req.setMethod(fullHttpRequest.method());
        req.setKeepAlive(HttpUtil.isKeepAlive(fullHttpRequest));
        String uri = fullHttpRequest.uri();
        int p1 = findPathEndIndex(uri);
        String path = p1 >= 0 ? uri.substring(0, p1) : uri;
        path = WebUtils.decodeUrl(path);
        int p2 = uri.indexOf('?');
        String queryString = p2 >= 0 ?
                uri.substring(p2 + 1) : "";
        req.setPath(path);
        req.setQueryString(queryString);
        req.setHeaders(fullHttpRequest.headers());
        ByteBuf bb = fullHttpRequest.content();
        if (bb != null) {
            String cs = req.getCharSet();
            String content = bb.toString(Charset.forName(cs));
            req.setContent(content);
        }
        return req;
    }

    @Override
    public void stop() {
        stopFlag.set(true);
    }

    @Override
    public void close() {
        if (workerGroup != null) {
            log.info("stopping netty http server");
            bossGroup.shutdownGracefully();
            bossGroup = null;
            ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            allChannels.add(serverChannel);
            for (Channel ch : conns.values()) {
                allChannels.add(ch);
            }
            ChannelGroupFuture future = allChannels.close();
            future.awaitUninterruptibly();
            workerGroup.shutdownGracefully();
            workerGroup = null;
            log.info("netty http server stopped");
        }
    }

    Channel getChannel(String connId) {
        return conns.get(connId);
    }

    String getConnId(ChannelHandlerContext ctx) {
        Channel ch = ctx.channel();
        return parseIpPort(ch.remoteAddress().toString()) + ":" + ch.id().asShortText();
    }

    public String getAddr(String connId) {
        int p = connId.lastIndexOf(":");
        return connId.substring(0, p);
    }

    String parseIp(String connId) {
        int p = connId.lastIndexOf(":");
        if (p < 0) return connId;
        int p2 = connId.lastIndexOf(":", p - 1);
        if (p2 < 0) return connId;
        return connId.substring(0, p2);
    }

    String parseIpPort(String s) {
        int p = s.indexOf("/");
        if (p >= 0) return s.substring(p + 1);
        else return s;
    }

    int findPathEndIndex(String uri) {
        int len = uri.length();
        for (int i = 0; i < len; i++) {
            char c = uri.charAt(i);
            if (c == '?'
                    || c == '#') {
                return i;
            }
        }
        return -1;
    }

    void sendError(ChannelHandlerContext ctx, HttpResponseStatus status, int retCode) {
        ByteBuf bb = ctx.alloc().buffer(32);
        String s = String.format(WebConstants.ContentFormat, retCode, RetCodes.retCodeText(retCode));
        bb.writeCharSequence(s, CharsetUtil.UTF_8);
        int len = bb.readableBytes();
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, bb);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, WebConstants.MIMETYPE_JSON);
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, len);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
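The server code turns a request URI into a path, a query string, and finally a method name. The standalone sketch below replays that splitting logic outside of Netty so it can be tried in isolation. The class name UriSplitDemo and the orderId=1001 query parameter are made up for illustration, and the URL-decoding step (WebUtils.decodeUrl in the server) is left out.

```java
public class UriSplitDemo {

    /** Index of the first '?' or '#', or -1 when the URI is a bare path. */
    static int findPathEndIndex(String uri) {
        for (int i = 0; i < uri.length(); i++) {
            char c = uri.charAt(i);
            if (c == '?' || c == '#') {
                return i;
            }
        }
        return -1;
    }

    /** Everything before the query string or fragment. */
    static String pathOf(String uri) {
        int end = findPathEndIndex(uri);
        return end >= 0 ? uri.substring(0, end) : uri;
    }

    /** Everything after the first '?', or an empty string when there is none. */
    static String queryOf(String uri) {
        int q = uri.indexOf('?');
        return q >= 0 ? uri.substring(q + 1) : "";
    }

    /** The last path segment, which the server uses as the method name. */
    static String methodNameOf(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // Hypothetical request URI; the query parameter is only for illustration
        String uri = "/orderService/queryOrderInfo?orderId=1001";
        System.out.println(pathOf(uri));               // /orderService/queryOrderInfo
        System.out.println(queryOf(uri));              // orderId=1001
        System.out.println(methodNameOf(pathOf(uri))); // queryOrderInfo
    }
}
```

Note that only the last path segment decides which method runs; the orderService segment is not checked by this logic.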
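Business interface
Here the server exposes just one simple business interface, queryOrderInfo.
Once the server is running, a client can test it by requesting http://127.0.0.1:8899/orderService/queryOrderInfo.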
public class OrderService {
    public QueryOrderInfoRes queryOrderInfo(QueryOrderInfoReq req) {
        QueryOrderInfoRes res = new QueryOrderInfoRes();
        Order order = new Order();
        order.setOrderId(req.getOrderId());
        order.setOrderAmount(19999);
        order.setProductId(UUID.randomUUID().toString());
        order.setUserId(UUID.randomUUID().toString().replace("-", "").toUpperCase());
        res.setOrder(order);
        res.setRetMsg("success");
        return res;
    }
}
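The server reaches this interface by looking up a method whose name comes from the URL and calling setAccessible(true) on it, so a non-public method with a matching parameter type could in principle be invoked as well. One possible way to tighten this, shown as a rough sketch and not as the article's implementation, is to register the public methods once and dispatch only to those. DemoService, ReflectiveDispatchDemo, and the String parameter are hypothetical stand-ins that keep the example free of JSON and Netty dependencies.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

public class ReflectiveDispatchDemo {

    /** Hypothetical stand-in for OrderService; takes a plain String instead of a request object. */
    public static class DemoService {
        public String queryOrderInfo(String orderId) {
            return "order:" + orderId;
        }

        // Never registered as a route because it is not public
        private String internalOnly(String arg) {
            return "secret:" + arg;
        }
    }

    private final Object target;
    private final Map<String, Method> routes = new HashMap<>();

    ReflectiveDispatchDemo(Object target) {
        this.target = target;
        // Register only public methods that take exactly one String argument
        for (Method m : target.getClass().getDeclaredMethods()) {
            if (Modifier.isPublic(m.getModifiers())
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == String.class) {
                routes.put(m.getName(), m);
            }
        }
    }

    /** Returns the method's result, or null when no registered route matches. */
    Object dispatch(String methodName, String arg) {
        Method m = routes.get(methodName);
        if (m == null) {
            return null;
        }
        try {
            return m.invoke(target, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatchDemo dispatcher = new ReflectiveDispatchDemo(new DemoService());
        System.out.println(dispatcher.dispatch("queryOrderInfo", "1001")); // order:1001
        System.out.println(dispatcher.dispatch("internalOnly", "x"));      // null
    }
}
```

Building the route table once also avoids a reflective lookup on every request, although the sketch does not handle overloaded method names.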
Starting and testing
public class MyNettyHttpServerTest {
    public static void main(String[] args) throws InterruptedException {
        MyNettyHttpServer nettyHttpServer = new MyNettyHttpServer(8899);
        nettyHttpServer.init();
        nettyHttpServer.start();
        // Keep the server up for five minutes, then shut it down
        Thread.sleep(5 * 60 * 1000);
        nettyHttpServer.stop();
        nettyHttpServer.close();
    }
}
Whether you use Postman or the Netty-based HTTP client we wrote ourselves, you can call the interface at http://127.0.0.1:8899/orderService/queryOrderInfo. The server and client exchange data as JSON over POST.
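For a quick check without Postman or a Netty client, a POST with a JSON body can also be sent from the JDK alone. The following is a minimal sketch using java.net.HttpURLConnection. The class name JsonPostClient, the timeouts, and the request body are assumptions; in particular, the orderId field name is inferred from req.getOrderId() in OrderService, and its real type is not shown in the article.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JsonPostClient {

    /** POSTs a JSON string to the given URL and returns the response body as text. */
    static String post(String url, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setConnectTimeout(3000); // assumed values, tune as needed
        conn.setReadTimeout(5000);
        conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
        byte[] body = json.getBytes(StandardCharsets.UTF_8);
        conn.setFixedLengthStreamingMode(body.length);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }
        try {
            // Error responses (4xx/5xx) expose their body through getErrorStream()
            InputStream in = conn.getResponseCode() >= 400 ? conn.getErrorStream() : conn.getInputStream();
            return in == null ? "" : readAll(in);
        } finally {
            conn.disconnect();
        }
    }

    /** Reads the stream to the end as UTF-8 text and closes it. */
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        try (InputStream stream = in) {
            while ((n = stream.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
        }
        return new String(buf.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: QueryOrderInfoReq has an orderId field that accepts this value
        String json = "{\"orderId\":\"1001\"}";
        System.out.println(post("http://127.0.0.1:8899/orderService/queryOrderInfo", json));
    }
}
```

Run main only after MyNettyHttpServerTest has started the server; otherwise the connection attempt fails with an IOException.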
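Summary
Building a complete HTTP server on Netty that is high-performance, full-featured, robust, and easy to use is very complex; it amounts to implementing Tomcat plus part of what Spring MVC provides. This article only aims to make the basic process of developing a Netty network application easier to understand, so the examples implement just the most basic HTTP request-response flow. If you know of better examples or learning resources, feel free to leave a comment. Code for this article: https://github.com/bigbirditedu/nettylearn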
Follow 【程猿薇茑】 for more hands-on tutorials.