Netty之文件传输

╰+哭是因爲堅強的太久メ 2022-07-12 23:46 280阅读 0赞

1、工程的目录结构

(此处原为工程目录结构截图,图片编号:20161215223056082)

2、GZIP进行压缩和解压的工具类

  1. import java.io.ByteArrayInputStream;
  2. import java.io.ByteArrayOutputStream;
  3. import java.io.File;
  4. import java.io.FileInputStream;
  5. import java.io.FileOutputStream;
  6. import java.util.zip.GZIPInputStream;
  7. import java.util.zip.GZIPOutputStream;
  /**
   * GZIP helpers that compress and decompress whole byte arrays in memory.
   */
  public class GzipUtils {

      /** Size of the scratch buffer used when draining a stream. */
      private static final int BUFFER_SIZE = 1024;

      /**
       * Compresses {@code data} into GZIP format.
       *
       * @param data the bytes to compress
       * @return the GZIP-encoded bytes
       * @throws Exception if the compressed stream cannot be written
       */
      public static byte[] gzip(byte[] data) throws Exception {
          ByteArrayOutputStream bos = new ByteArrayOutputStream();
          // The GZIP stream must be closed before the bytes are taken: closing is what
          // finishes the compressed data. try-with-resources also closes it on failure.
          try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
              gzip.write(data);
          }
          return bos.toByteArray();
      }

      /**
       * Decompresses GZIP-encoded {@code data}.
       *
       * @param data bytes previously produced by GZIP compression
       * @return the decompressed bytes
       * @throws Exception if {@code data} cannot be read as a GZIP stream
       */
      public static byte[] ungzip(byte[] data) throws Exception {
          try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
              return readAll(gzip);
          }
      }

      /**
       * Reads {@code in} until end of stream and returns everything that was read.
       * The caller remains responsible for closing {@code in}.
       */
      private static byte[] readAll(java.io.InputStream in) throws java.io.IOException {
          ByteArrayOutputStream collected = new ByteArrayOutputStream();
          byte[] buf = new byte[BUFFER_SIZE];
          int num;
          while ((num = in.read(buf, 0, buf.length)) != -1) {
              collected.write(buf, 0, num);
          }
          return collected.toByteArray();
      }

      /**
       * Manual round-trip check: reads {@code sources/005.jpg} under the working
       * directory, compresses and decompresses it, prints the three sizes and writes
       * the restored bytes to {@code receive/005.jpg}.
       */
      public static void main(String[] args) throws Exception {
          // Read the source file.
          String readPath = System.getProperty("user.dir") + File.separatorChar
                  + "sources" + File.separatorChar + "005.jpg";
          File file = new File(readPath);
          byte[] data;
          // Read until EOF instead of trusting available() plus a single read(), which
          // may deliver fewer bytes than the file contains.
          try (FileInputStream in = new FileInputStream(file)) {
              data = readAll(in);
          }
          System.out.println("文件原始大小:" + data.length);
          // Compress.
          byte[] ret1 = GzipUtils.gzip(data);
          System.out.println("压缩之后大小:" + ret1.length);
          // Restore.
          byte[] ret2 = GzipUtils.ungzip(ret1);
          System.out.println("还原之后大小:" + ret2.length);
          // Write the restored bytes back out.
          String writePath = System.getProperty("user.dir") + File.separatorChar
                  + "receive" + File.separatorChar + "005.jpg";
          try (FileOutputStream fos = new FileOutputStream(writePath)) {
              fos.write(ret2);
          }
      }
  }

3、在客户端的请求类 Request 中,增加用于携带待传输文件内容的字段(private byte[] attachment)

  1. import java.io.Serializable;
  /**
   * Serializable message sent from the client to the server. Besides the textual
   * fields it carries the file content as a raw byte array.
   */
  public class Request implements Serializable {

      private static final long serialVersionUID = -2813211330451521507L;

      /** Request identifier. */
      private String id;

      /** Name attached to the request. */
      private String name;

      /** Free-form text that accompanies the request. */
      private String requestMessage;

      /** Bytes of the file being transferred; stored and returned without copying. */
      private byte[] attachment;

      /** @return the request identifier */
      public String getId() {
          return this.id;
      }

      /** @param id the request identifier */
      public void setId(String id) {
          this.id = id;
      }

      /** @return the name attached to the request */
      public String getName() {
          return this.name;
      }

      /** @param name the name attached to the request */
      public void setName(String name) {
          this.name = name;
      }

      /** @return the text that accompanies the request */
      public String getRequestMessage() {
          return this.requestMessage;
      }

      /** @param requestMessage the text that accompanies the request */
      public void setRequestMessage(String requestMessage) {
          this.requestMessage = requestMessage;
      }

      /** @return the same array instance that was last passed to the setter */
      public byte[] getAttachment() {
          return this.attachment;
      }

      /** @param attachment the file bytes; the array is kept by reference */
      public void setAttachment(byte[] attachment) {
          this.attachment = attachment;
      }
  }

4、服务端的Response

  1. import java.io.Serializable;
  /**
   * Serializable message sent from the server back to the client.
   */
  public class Response implements Serializable {

      private static final long serialVersionUID = -5640678664176009458L;

      /** Response identifier. */
      private String id;

      /** Name attached to the response. */
      private String name;

      /** Free-form text that accompanies the response. */
      private String responseMessage;

      /** @return the response identifier */
      public String getId() {
          return this.id;
      }

      /** @param id the response identifier */
      public void setId(String id) {
          this.id = id;
      }

      /** @return the name attached to the response */
      public String getName() {
          return this.name;
      }

      /** @param name the name attached to the response */
      public void setName(String name) {
          this.name = name;
      }

      /** @return the text that accompanies the response */
      public String getResponseMessage() {
          return this.responseMessage;
      }

      /** @param responseMessage the text that accompanies the response */
      public void setResponseMessage(String responseMessage) {
          this.responseMessage = responseMessage;
      }

      /** Renders all three fields in the form {@code Response [id=.., name=.., responseMessage=..]}. */
      @Override
      public String toString() {
          StringBuilder text = new StringBuilder("Response [id=");
          text.append(id);
          text.append(", name=").append(name);
          text.append(", responseMessage=").append(responseMessage);
          text.append("]");
          return text.toString();
      }
  }

5、Jboss序列化工具

  1. import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
  2. import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
  3. import io.netty.handler.codec.marshalling.MarshallerProvider;
  4. import io.netty.handler.codec.marshalling.MarshallingDecoder;
  5. import io.netty.handler.codec.marshalling.MarshallingEncoder;
  6. import io.netty.handler.codec.marshalling.UnmarshallerProvider;
  7. import org.jboss.marshalling.MarshallerFactory;
  8. import org.jboss.marshalling.Marshalling;
  9. import org.jboss.marshalling.MarshallingConfiguration;
  10. /**
  11. * Marshalling工厂
  12. *
  13. */
  14. public final class MarshallingCodeCFactory {
  15. /**
  16. * 创建Jboss Marshalling解码器MarshallingDecoder
  17. *
  18. * @return MarshallingDecoder
  19. */
  20. public static MarshallingDecoder buildMarshallingDecoder() {
  21. // 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
  22. final MarshallerFactory marshallerFactory = Marshalling
  23. .getProvidedMarshallerFactory("serial");
  24. // 创建了MarshallingConfiguration对象,配置了版本号为5
  25. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  26. configuration.setVersion(5);
  27. // 根据marshallerFactory和configuration创建provider
  28. UnmarshallerProvider provider = new DefaultUnmarshallerProvider(
  29. marshallerFactory, configuration);
  30. // 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
  31. MarshallingDecoder decoder = new MarshallingDecoder(provider,
  32. 1024 * 1024 * 1);
  33. return decoder;
  34. }
  35. /**
  36. * 创建Jboss Marshalling编码器MarshallingEncoder
  37. *
  38. * @return MarshallingEncoder
  39. */
  40. public static MarshallingEncoder buildMarshallingEncoder() {
  41. final MarshallerFactory marshallerFactory = Marshalling
  42. .getProvidedMarshallerFactory("serial");
  43. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  44. configuration.setVersion(5);
  45. MarshallerProvider provider = new DefaultMarshallerProvider(
  46. marshallerFactory, configuration);
  47. // 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
  48. MarshallingEncoder encoder = new MarshallingEncoder(provider);
  49. return encoder;
  50. }
  51. }

6、服务端的实现

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.logging.LogLevel;
  10. import io.netty.handler.logging.LoggingHandler;
  11. public class Server {
  12. public Server() {
  13. }
  14. public void bind(int port) throws Exception {
  15. // 配置NIO线程组
  16. EventLoopGroup bossGroup = new NioEventLoopGroup();
  17. EventLoopGroup workerGroup = new NioEventLoopGroup();
  18. try {
  19. // 服务器辅助启动类配置
  20. ServerBootstrap b = new ServerBootstrap();
  21. b.group(bossGroup, workerGroup)
  22. .channel(NioServerSocketChannel.class)
  23. .handler(new LoggingHandler(LogLevel.INFO))
  24. .childHandler(new ChildChannelHandler())//
  25. .option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区 // (5)
  26. .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
  27. // 绑定端口 同步等待绑定成功
  28. ChannelFuture f = b.bind(port).sync(); // (7)
  29. // 等到服务端监听端口关闭
  30. f.channel().closeFuture().sync();
  31. } finally {
  32. // 优雅释放线程资源
  33. workerGroup.shutdownGracefully();
  34. bossGroup.shutdownGracefully();
  35. }
  36. }
  37. /**
  38. * 网络事件处理器
  39. */
  40. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  41. @Override
  42. protected void initChannel(SocketChannel ch) throws Exception {
  43. // 添加Jboss的序列化,编解码工具
  44. ch.pipeline().addLast(
  45. MarshallingCodeCFactory.buildMarshallingEncoder());
  46. ch.pipeline().addLast(
  47. MarshallingCodeCFactory.buildMarshallingDecoder());
  48. // 处理网络IO
  49. ch.pipeline().addLast(new ServerHandler());
  50. }
  51. }
  52. public static void main(String[] args) throws Exception {
  53. new Server().bind(9999);
  54. }
  55. }

7、服务端的Handler的实现

  1. import java.io.File;
  2. import java.io.FileOutputStream;
  3. import com.netty.utils.GzipUtils;
  4. import io.netty.channel.ChannelHandlerAdapter;
  5. import io.netty.channel.ChannelHandlerContext;
  6. public class ServerHandler extends ChannelHandlerAdapter {
  7. // 用于获取客户端发送的信息
  8. @Override
  9. public void channelRead(ChannelHandlerContext ctx, Object msg)
  10. throws Exception {
  11. Request req = (Request) msg;
  12. System.out.println("Server : " + req.getId() + ", " + req.getName()
  13. + ", " + req.getRequestMessage());
  14. // 进行图片资源的还原
  15. byte[] attachment = GzipUtils.ungzip(req.getAttachment());
  16. // 获取图片的保存目录
  17. String path = System.getProperty("user.dir") + File.separatorChar
  18. + "receive" + File.separatorChar + req.getName();
  19. // 进行图片的保存
  20. FileOutputStream fos = new FileOutputStream(path);
  21. fos.write(attachment);
  22. fos.close();
  23. // 给客户端,响应数据
  24. Response resp = new Response();
  25. resp.setId(req.getId());
  26. resp.setName("resp" + req.getId());
  27. resp.setResponseMessage("响应内容" + req.getId());
  28. ctx.writeAndFlush(resp);// .addListener(ChannelFutureListener.CLOSE);
  29. }
  30. @Override
  31. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
  32. throws Exception {
  33. // cause.printStackTrace();
  34. ctx.close();
  35. }
  36. }

8、客户端的实现

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. public class Client {
  10. /**
  11. * 连接服务器
  12. *
  13. * @param port
  14. * @param host
  15. * @throws Exception
  16. */
  17. public void connect(int port, String host) throws Exception {
  18. // 配置客户端NIO线程组
  19. EventLoopGroup group = new NioEventLoopGroup();
  20. try {
  21. // 客户端辅助启动类 对客户端配置
  22. Bootstrap b = new Bootstrap();
  23. b.group(group)//
  24. .channel(NioSocketChannel.class)//
  25. .option(ChannelOption.TCP_NODELAY, true)//
  26. .handler(new MyChannelHandler());//
  27. // 异步链接服务器 同步等待链接成功
  28. ChannelFuture f = b.connect(host, port).sync();
  29. // 等待链接关闭
  30. f.channel().closeFuture().sync();
  31. } finally {
  32. group.shutdownGracefully();
  33. System.out.println("客户端优雅的释放了线程资源...");
  34. }
  35. }
  36. /**
  37. * 网络事件处理器
  38. */
  39. private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
  40. @Override
  41. protected void initChannel(SocketChannel ch) throws Exception {
  42. // 添加Jboss的序列化,编解码工具
  43. ch.pipeline().addLast(
  44. MarshallingCodeCFactory.buildMarshallingEncoder());
  45. ch.pipeline().addLast(
  46. MarshallingCodeCFactory.buildMarshallingDecoder());
  47. // 处理网络IO
  48. ch.pipeline().addLast(new ClientHandler());// 处理网络IO
  49. }
  50. }
  51. public static void main(String[] args) throws Exception {
  52. new Client().connect(9999, "127.0.0.1");
  53. }
  54. }

9、客户端的Handler的实现

  1. import java.io.File;
  2. import java.io.FileInputStream;
  3. import com.netty.utils.GzipUtils;
  4. import io.netty.channel.ChannelHandlerAdapter;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.util.ReferenceCountUtil;
  7. //用于读取客户端发来的信息
  8. public class ClientHandler extends ChannelHandlerAdapter {
  9. // 客户端与服务端,连接成功的售后
  10. @Override
  11. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  12. // 进行图片的传输
  13. for (int i = 0; i < 5; i++) {
  14. String fileName = "00" + (i + 1) + ".jpg";
  15. // 进行文件的读取
  16. String path = System.getProperty("user.dir") + File.separatorChar
  17. + "sources" + File.separatorChar + fileName;
  18. File file = new File(path);
  19. Request req = new Request();
  20. req.setId("" + i);
  21. req.setName(file.getName());
  22. req.setRequestMessage("数据信息" + i);
  23. // 进行图片的读取
  24. FileInputStream in = new FileInputStream(file);
  25. byte[] data = new byte[in.available()];
  26. in.read(data);
  27. in.close();
  28. // 进行数据的压缩
  29. req.setAttachment(GzipUtils.gzip(data));
  30. // 向服务端,传送图片信息
  31. ctx.channel().writeAndFlush(req);
  32. }
  33. }
  34. // 只是读数据,没有写数据的话
  35. // 需要自己手动的释放的消息
  36. @Override
  37. public void channelRead(ChannelHandlerContext ctx, Object msg)
  38. throws Exception {
  39. try {
  40. Response response = (Response) msg;
  41. System.out.println(response);
  42. } finally {
  43. ReferenceCountUtil.release(msg);
  44. }
  45. }
  46. @Override
  47. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
  48. throws Exception {
  49. ctx.close();
  50. }
  51. }

发表评论

表情:
评论列表 (有 0 条评论,280人围观)

还没有评论,来说两句吧...

相关阅读

    相关 netty传输java bean对象

    在上一篇博客(netty入门实现简单的echo程序)中,我们知道了如何使用netty发送一个简单的消息,但是这远远是不够的。在这篇博客中,我们来使用netty发送一个java

    相关 Netty权威指南文件传输

    本章相关知识点: 文件是最常见的数据源之一,在程序经常需要将数据存储到文件中,比如:图片文件、声音文件等数据文件。在实际使用中,文件都包含一个特定的格式,这个格式需要程序员根

    相关 netty去实现文件传输

    在写出了Netty Hello World 和 netty对象传输之后,又觉得不够,看了官网的例子,所以有了现在的这个文件传输。 顺便说下,netty官网的例子真的好,如果