netty如何实现优化百万并发http请求
问题引入:如果项目存在数量大的并发,如何优化项目性能
- 首先判断项目性能瓶颈是什么, CPU,还是I/O请求
大部分是网络I/o瓶颈,主要是因为网络请求是阻塞,导致太慢
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
JDK 原生 NIO 程序的问题
JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
NIO 的类库和 API 繁杂,使用麻烦。你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
需要具备其他的额外技能做铺垫。例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。
NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
JDK NIO 的 Bug。例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。
官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。
Netty 的特点
Netty 对 JDK 自带的 NIO 的 API 进行封装,解决上述问题,主要特点有:
设计优雅,适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
使用方便,详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
高性能,吞吐量更高,延迟更低;减少资源消耗;最小化不必要的内存复制。
安全,完整的 SSL/TLS 和 StartTLS 支持。
社区活跃,不断更新,社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。
Netty 常见使用场景
Netty 常见的使用场景如下:
互联网行业。在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。
典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
游戏行业。无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
大数据领域。经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
有兴趣的读者可以了解一下目前有哪些开源项目使用了 Netty:Related Projects。
Netty 高性能设计
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。
netty实现http客户端解决网络I/O瓶颈
@Component
@Order(2)
public class HttpClient {
public static void start() {
EventLoopGroup group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel)
throws Exception {
/*//包含编码器和解码器 channel.pipeline().addLast(new HttpClientCodec()); //聚合 channel.pipeline().addLast(new HttpObjectAggregator(1024 * 10 * 1024)); //解压 channel.pipeline().addLast(new HttpContentDecompressor());*/
//请求编码
channel.pipeline().addLast(new HttpRequestEncoder());
//响应解码
channel.pipeline().addLast(new HttpResponseDecoder());
channel.pipeline().addLast(new HttpClientHandler());
}
});
SimpleClientHttpRequestFactory httpRequestFactory = new SimpleClientHttpRequestFactory();
SocketAddress address = new InetSocketAddress("net-proxy-pub-test.sftcwl.com", 1080);
SocketAddress address1 = new InetSocketAddress("gis.sf-express.com", 8080);
Proxy proxy = new Proxy(Proxy.Type.HTTP, address);
httpRequestFactory.setProxy(proxy);
ChannelFuture future = bootstrap.connect("localhost",8162).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
start();
}
public class HttpClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String CLOSE_FREE_CHANNEL = "closeFreeChannel";
/** * 关闭channel的原因,为closeFreeChannel时表示原因为关闭空闲socket连接 */
public static final String CLOSE_CHANNEL_CAUSE = "closeChannelCause";
private static Set<String> channelIDSet = new CopyOnWriteArraySet<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
URI uri = new URI("/abc/project/list");
Map<String, Object> param = new HashMap<>(2);
param.put("address", "雁塔区");
param.put("ak", "58008e825a944a45909e056be12f511f");
String prefix = "?";
StringBuilder urlBuilder = new StringBuilder();
for (String key : param.keySet()) {
urlBuilder.append(prefix).append(key).append("=").append(param.get(key));
prefix = "&";
}
LOGGER.info(urlBuilder.toString());
//配置HttpRequest的请求数据和一些配置信息
FullHttpRequest request = new DefaultFullHttpRequest(
HTTP_1_1,
HttpMethod.GET,
uri.toASCIIString()
);
LOGGER.info(request.toString());
request.headers()
.set(HttpHeaderNames.HOST, "127.0.0.1")
.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
//开启长连接
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
//设置传递请求内容的长度
.set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
ctx.writeAndFlush(request);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("msg -> " + msg);
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
String result = buf.toString(CharsetUtil.UTF_8);
System.out.println("response -> " + result);
LOGGER.info(result);
}
}
/** * 通道失活事件,用于断线重连 * * @param ctx */
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
LOGGER.info("Channel为 {} 断开,Disconnected Server:{}", ctx.channel().id(), ctx.channel().remoteAddress());
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
SocketChannel channel = (SocketChannel) ctx.channel();
int port = channel.remoteAddress().getPort();
//获取channel中存储的关闭连接的原因,如果是关闭空闲连接,则不再重连
String closeChannelCause = channel.attr(AttributeKey.<String>valueOf(CLOSE_CHANNEL_CAUSE)).get();
if (CLOSE_FREE_CHANNEL.equals(closeChannelCause)) {
LOGGER.info("检测到Channel为 {} 的空闲连接,已经断开.", channel.id());
return;
}
LOGGER.info("检测到与服务端断开连接,NettyClient开始重连{}:{}", channel.remoteAddress().getHostName(), port);
}
}, 3, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
/** * 监听idle事件,用于监听心跳 * * @param ctx * @param evt * @throws Exception */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
Channel channel = ctx.channel();
if (evt instanceof IdleStateEvent) {
if (channel.isActive()) {
LOGGER.info("[客户端心跳监测] 通道编号: {} ,客户端信息: {}", channel.id(), channel.localAddress());
channelIDSet.add(channel.id().toString());
LOGGER.info("[客户端心跳监测] 关闭当前空闲,通道编号: {} ,客户端信息: {}", channel.id(), channel.localAddress());
return;
}
}
}
/** * 异常处理 * * @param ctx * @param cause * @throws Exception */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("Socket通信过程中出现异常,异常原因为:", cause);
super.exceptionCaught(ctx, cause);
}
}
还没有评论,来说两句吧...