Netty进阶之路--客户端池化

妖狐艹你老母 2022-04-25 02:18 549阅读 0赞

Netty进阶之路—客户端池化

  • 前言
  • Netty线程池
  • Netty客户端处理类
  • 效果

前言

目前的RPC框架中消费者端是调用一次服务就会进行一些netty的初始化,绑定端口等操作。那发起100次的话 就是进行100次这样的连接,消耗的资源太多了。所以希望客户端池化。

Netty线程池

之前自己是初步接触Netty,用的还是5.x版本。再查阅了资料发现Netty自带了线程池很开心,但是自己试了半天好像没有这个工具类,才发现好像4.x版本是有的,一查发现5.x版本好像弃用了,我的天只好降版本了。(不知道为什么5.x版本弃用了)

废话不多说直接贴代码(把客户端改造了一下 可以看我之前写的 自己设计一个的轻量级的RPC框架–客户端netty 对比一些)
注意初始化完毕之后 我们去FixedChannelPool中获取的channelhandler的时候用完需要放回去。

  1. public class RPCRequestNet {
  2. //管理以ip:端口号为key的连接池 FixedChannelPool继承SimpleChannelPool,有大小限制的连接池实现
  3. public static ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap;
  4. //启动辅助类 用于配置各种参数
  5. private static Bootstrap b=new Bootstrap();
  6. //单利
  7. private static RPCRequestNet rpcClient = new RPCRequestNet();
  8. static {
  9. b.group(new NioEventLoopGroup())
  10. .channel(NioSocketChannel.class)
  11. .option(ChannelOption.TCP_NODELAY,true);//禁止使用Nagle算法 作用小数据即时传输
  12. }
  13. public static RPCRequestNet getRPCRequestNet(){
  14. return rpcClient;
  15. }
  16. private RPCRequestNet(){
  17. init();
  18. }
  19. public void init() {
  20. poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
  21. @Override
  22. protected FixedChannelPool newPool(InetSocketAddress inetSocketAddress) {
  23. ChannelPoolHandler handler = new ChannelPoolHandler() {
  24. //使用完channel需要释放才能放入连接池
  25. @Override
  26. public void channelReleased(Channel ch) throws Exception {
  27. // TODO Auto-generated method stub
  28. }
  29. //当链接创建的时候添加channelhandler,只有当channel不足时会创建,但不会超过限制的最大channel数
  30. @Override
  31. public void channelCreated(Channel ch) throws Exception {
  32. System.out.println("channelCreated. Channel ID: " + ch.id());
  33. //ch.pipeline().addLast(new LineBasedFrameDecoder(8192));//以换行符分包
  34. //ch.pipeline().addLast(new StringDecoder());//将接收到的对象转为字符串
  35. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
  36. ch.pipeline().addLast(new LengthFieldPrepender(2));
  37. ch.pipeline().addLast(new ClientMsgPackEncode());//编码
  38. ch.pipeline().addLast(new ClientMsgPackDecode());//解码
  39. ch.pipeline().addLast(new RPCRequestHandler());//添加相应回调处理
  40. }
  41. //获取连接池中的channel
  42. @Override
  43. public void channelAcquired(Channel ch) throws Exception {
  44. // TODO Auto-generated method stub
  45. }
  46. };
  47. return new FixedChannelPool(b.remoteAddress(inetSocketAddress), handler, 5); //单个服务端连接池大小
  48. }
  49. };
  50. }
  51. public void connect(String host,int port,final RPCRequest request){
  52. InetSocketAddress addr = new InetSocketAddress(host, port);
  53. final SimpleChannelPool pool = RPCRequestNet.poolMap.get(addr);
  54. final Future<Channel> f = pool.acquire();
  55. f.addListener(new FutureListener<Channel>() {
  56. @Override
  57. public void operationComplete(Future<Channel> arg0) throws Exception {
  58. if (f.isSuccess()) {
  59. Channel ch = f.getNow();
  60. ch.writeAndFlush(request);
  61. //放回去
  62. pool.release(ch);
  63. }
  64. }
  65. });
  66. synchronized (request) {
  67. //因为异步 所以不阻塞的话 该线程获取不到返回值
  68. //放弃对象锁 并阻塞等待notify
  69. try {
  70. request.wait();
  71. } catch (InterruptedException e) {
  72. // TODO Auto-generated catch block
  73. e.printStackTrace();
  74. }
  75. }
  76. }
  77. }

Netty客户端处理类

  1. public class RPCRequestHandler extends ChannelInboundHandlerAdapter {
  2. public static ChannelHandlerContext channelCtx;
  3. @Override
  4. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  5. super.channelActive(ctx);
  6. }
  7. @Override
  8. //异步调用读取管道数据
  9. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  10. RPCResponse response= (RPCResponse) msg;
  11. //System.out.println("获取到服务器返回值"+responseJson);
  12. synchronized (RPCProxyHandler.requestLockMap.get(response.getRequestID())) {
  13. //当客户段获取到返回值的时候唤醒在该对象锁上wait的线程
  14. RPCRequest request= (RPCRequest) RPCProxyHandler.requestLockMap.get(response.getRequestID());
  15. //System.out.println("2 : "+response.getResult());
  16. request.setResult(response.getResult());
  17. request.notifyAll();
  18. }
  19. }
  20. }

效果

普通调用只需要创建一个足够了,大大的利用了资源。
在这里插入图片描述
如果我们通过JMeter 20个用户 调用20次
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,549人围观)

还没有评论,来说两句吧...

相关阅读

    相关 golang

    目录 1、go 的调度 2、 golang中 map的底层实现 3、go 的垃圾回收机制 4、数组与切片的区别 5、go struct能不能比较 6、select