由浅入深Netty代码调优

快来打我* 2024-03-17 09:13 99阅读 0赞

目录

    1. 优化
    • 1.1 扩展序列化算法
  • 2 参数调优
    • 2.1 CONNECT_TIMEOUT_MILLIS
    • 2.2 SO_BACKLOG
    • 2.3 ulimit -n
    • 2.4 TCP_NODELAY
    • 2.5 SO_SNDBUF & SO_RCVBUF
    • 2.6 ALLOCATOR
    • 2.7 RCVBUF_ALLOCATOR

1. 优化

在这里插入图片描述

1.1 扩展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下

  1. // 反序列化
  2. byte[] body = new byte[bodyLength];
  3. byteByf.readBytes(body);
  4. ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
  5. Message message = (Message) in.readObject();
  6. message.setSequenceId(sequenceId);
  7. // 序列化
  8. ByteArrayOutputStream out = new ByteArrayOutputStream();
  9. new ObjectOutputStream(out).writeObject(message);
  10. byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口

  1. public interface Serializer {
  2. // 反序列化方法
  3. <T> T deserialize(Class<T> clazz, byte[] bytes);
  4. // 序列化方法
  5. <T> byte[] serialize(T object);
  6. }

提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中

  1. enum SerializerAlgorithm implements Serializer {
  2. // Java 实现
  3. Java {
  4. @Override
  5. public <T> T deserialize(Class<T> clazz, byte[] bytes) {
  6. try {
  7. ObjectInputStream in =
  8. new ObjectInputStream(new ByteArrayInputStream(bytes));
  9. Object object = in.readObject();
  10. return (T) object;
  11. } catch (IOException | ClassNotFoundException e) {
  12. throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);
  13. }
  14. }
  15. @Override
  16. public <T> byte[] serialize(T object) {
  17. try {
  18. ByteArrayOutputStream out = new ByteArrayOutputStream();
  19. new ObjectOutputStream(out).writeObject(object);
  20. return out.toByteArray();
  21. } catch (IOException e) {
  22. throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);
  23. }
  24. }
  25. },
  26. // Json 实现(引入了 Gson 依赖)
  27. Json {
  28. @Override
  29. public <T> T deserialize(Class<T> clazz, byte[] bytes) {
  30. return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
  31. }
  32. @Override
  33. public <T> byte[] serialize(T object) {
  34. return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
  35. }
  36. };
  37. // 需要从协议的字节中得到是哪种序列化算法
  38. public static SerializerAlgorithm getByInt(int type) {
  39. SerializerAlgorithm[] array = SerializerAlgorithm.values();
  40. if (type < 0 || type > array.length - 1) {
  41. throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");
  42. }
  43. return array[type];
  44. }
  45. }

增加配置类和配置文件

  1. public abstract class Config {
  2. static Properties properties;
  3. static {
  4. try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
  5. properties = new Properties();
  6. properties.load(in);
  7. } catch (IOException e) {
  8. throw new ExceptionInInitializerError(e);
  9. }
  10. }
  11. public static int getServerPort() {
  12. String value = properties.getProperty("server.port");
  13. if(value == null) {
  14. return 8080;
  15. } else {
  16. return Integer.parseInt(value);
  17. }
  18. }
  19. public static Serializer.Algorithm getSerializerAlgorithm() {
  20. String value = properties.getProperty("serializer.algorithm");
  21. if(value == null) {
  22. return Serializer.Algorithm.Java;
  23. } else {
  24. return Serializer.Algorithm.valueOf(value);
  25. }
  26. }
  27. }

配置文件

  1. serializer.algorithm=Json

修改编解码器

  1. /**
  2. * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
  3. */
  4. public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
  5. @Override
  6. public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
  7. ByteBuf out = ctx.alloc().buffer();
  8. // 1. 4 字节的魔数
  9. out.writeBytes(new byte[]{
  10. 1, 2, 3, 4});
  11. // 2. 1 字节的版本,
  12. out.writeByte(1);
  13. // 3. 1 字节的序列化方式 jdk 0 , json 1
  14. out.writeByte(Config.getSerializerAlgorithm().ordinal());
  15. // 4. 1 字节的指令类型
  16. out.writeByte(msg.getMessageType());
  17. // 5. 4 个字节
  18. out.writeInt(msg.getSequenceId());
  19. // 无意义,对齐填充
  20. out.writeByte(0xff);
  21. // 6. 获取内容的字节数组
  22. byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
  23. // 7. 长度
  24. out.writeInt(bytes.length);
  25. // 8. 写入内容
  26. out.writeBytes(bytes);
  27. outList.add(out);
  28. }
  29. @Override
  30. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  31. int magicNum = in.readInt();
  32. byte version = in.readByte();
  33. byte serializerAlgorithm = in.readByte(); // 0 或 1
  34. byte messageType = in.readByte(); // 0,1,2...
  35. int sequenceId = in.readInt();
  36. in.readByte();
  37. int length = in.readInt();
  38. byte[] bytes = new byte[length];
  39. in.readBytes(bytes, 0, length);
  40. // 找到反序列化算法
  41. Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
  42. // 确定具体消息类型
  43. Class<? extends Message> messageClass = Message.getMessageClass(messageType);
  44. Message message = algorithm.deserialize(messageClass, bytes);
  45. // log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
  46. // log.debug("{}", message);
  47. out.add(message);
  48. }
  49. }

其中确定具体消息类型,可以根据 消息类型字节 获取到对应的 消息 class

  1. @Data
  2. public abstract class Message implements Serializable {
  3. /**
  4. * 根据消息类型字节,获得对应的消息 class
  5. * @param messageType 消息类型字节
  6. * @return 消息 class
  7. */
  8. public static Class<? extends Message> getMessageClass(int messageType) {
  9. return messageClasses.get(messageType);
  10. }
  11. private int sequenceId;
  12. private int messageType;
  13. public abstract int getMessageType();
  14. public static final int LoginRequestMessage = 0;
  15. public static final int LoginResponseMessage = 1;
  16. public static final int ChatRequestMessage = 2;
  17. public static final int ChatResponseMessage = 3;
  18. public static final int GroupCreateRequestMessage = 4;
  19. public static final int GroupCreateResponseMessage = 5;
  20. public static final int GroupJoinRequestMessage = 6;
  21. public static final int GroupJoinResponseMessage = 7;
  22. public static final int GroupQuitRequestMessage = 8;
  23. public static final int GroupQuitResponseMessage = 9;
  24. public static final int GroupChatRequestMessage = 10;
  25. public static final int GroupChatResponseMessage = 11;
  26. public static final int GroupMembersRequestMessage = 12;
  27. public static final int GroupMembersResponseMessage = 13;
  28. public static final int PingMessage = 14;
  29. public static final int PongMessage = 15;
  30. private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
  31. static {
  32. messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
  33. messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
  34. messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
  35. messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
  36. messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
  37. messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
  38. messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
  39. messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
  40. messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
  41. messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
  42. messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
  43. messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
  44. messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
  45. messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
  46. }
  47. }

2 参数调优

2.1 CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal 参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
  • SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

    @Slf4j
    public class TestConnectionTimeout {

    1. public static void main(String[] args) {
    2. NioEventLoopGroup group = new NioEventLoopGroup();
    3. try {
    4. Bootstrap bootstrap = new Bootstrap()
    5. .group(group)
    6. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
    7. .channel(NioSocketChannel.class)
    8. .handler(new LoggingHandler());
    9. ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
    10. future.sync().channel().closeFuture().sync(); // 断点1
    11. } catch (Exception e) {
    12. e.printStackTrace();
    13. log.debug("timeout");
    14. } finally {
    15. group.shutdownGracefully();
    16. }
    17. }

    }

另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

  1. @Override
  2. public final void connect(
  3. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  4. // ...
  5. // Schedule connect timeout.
  6. int connectTimeoutMillis = config().getConnectTimeoutMillis();
  7. if (connectTimeoutMillis > 0) {
  8. connectTimeoutFuture = eventLoop().schedule(new Runnable() {
  9. @Override
  10. public void run() {
  11. ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
  12. ConnectTimeoutException cause =
  13. new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
  14. if (connectPromise != null && connectPromise.tryFailure(cause)) {
  15. close(voidPromise());
  16. }
  17. }
  18. }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
  19. }
  20. // ...
  21. }

2.2 SO_BACKLOG

  • 属于 ServerSocketChannal 参数

clientserversyns queueaccept queuebind()listen()connect()1. SYNSYN_SENDputSYN_RCVD2. SYN + ACKESTABLISHED3. ACKputESTABLISHEDaccept()clientserversyns queueaccept queue

  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

其中

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty 中

可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

可以通过下面源码查看默认大小

  1. public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
  2. implements ServerSocketChannelConfig {
  3. private volatile int backlog = NetUtil.SOMAXCONN;
  4. // ...
  5. }

课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey

oio 中更容易说明,不用 debug 模式

  1. public class Server {
  2. public static void main(String[] args) throws IOException {
  3. ServerSocket ss = new ServerSocket(8888, 2);
  4. Socket accept = ss.accept();
  5. System.out.println(accept);
  6. System.in.read();
  7. }
  8. }

客户端启动 4 个

  1. public class Client {
  2. public static void main(String[] args) throws IOException {
  3. try {
  4. Socket s = new Socket();
  5. System.out.println(new Date()+" connecting...");
  6. s.connect(new InetSocketAddress("localhost", 8888),1000);
  7. System.out.println(new Date()+" connected...");
  8. s.getOutputStream().write(1);
  9. System.in.read();
  10. } catch (IOException e) {
  11. System.out.println(new Date()+" connecting timeout...");
  12. e.printStackTrace();
  13. }
  14. }
  15. }

第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中

  1. Tue Apr 21 20:30:28 CST 2020 connecting...
  2. Tue Apr 21 20:30:28 CST 2020 connected...

第 4 个客户端连接时

  1. Tue Apr 21 20:53:58 CST 2020 connecting...
  2. Tue Apr 21 20:53:59 CST 2020 connecting timeout...
  3. java.net.SocketTimeoutException: connect timed out

2.3 ulimit -n

  • 属于操作系统参数

2.4 TCP_NODELAY

  • 属于 SocketChannal 参数

2.5 SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)

2.6 ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来分配 ByteBuf, ctx.alloc()

2.7 RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

发表评论

表情:
评论列表 (有 0 条评论,99人围观)

还没有评论,来说两句吧...

相关阅读

    相关 代码

    “”代码调优“”首先确定程序中开销较大的部分,然后对齐修改,但其也是双刃剑,当可能存在的危害影响较大时,请考虑适当将效率放一放。接下来举几个常见的手段: (1)使用开销小的

    相关 spark JVM

    我们的堆内存分为:新生代,和年老代, 年轻代又分为:Eden区,幸存一区,幸存二区, 每一次访对象的时候,都是放入eden区域,和其中的一个幸存一区中,幸存二区是不放对象的