自己动手实现RPC框架 ゝ一纸荒年。 2022-12-19 15:23 237阅读 0赞 ### 文章目录 ### * * * 1.跨进程数据交换 * * 1.1 依赖中间件做数据交互 * 2.1 直接交互 * 2.RPC架构 * 3.现有RPC框架对比 * 4.自己动手实现RPC框架 * * 4.0 共同模块(common) * 4.1 协议模块(proto) * 4.2 序列化模块(codec) * 4.3 网络传输模块(transport) * 4.4 服务端模块(server) * 4.5 客户端模块(server) * 4.6 使用案例模块(example) > RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务 **源码:** [https://github.com/wantao666/jrpc][https_github.com_wantao666_jrpc] **参考视频:** [自己动手实现RPC框架][RPC] ### 1.跨进程数据交换 ### #### 1.1 依赖中间件做数据交互 #### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center] **优点:** 解耦合,异步 **缺点:** 系统复杂性提高,可用性降低 #### 2.1 直接交互 #### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 1] **优点:** 系统复杂性低,速度快 **缺点:** 耦合度高,同步 **http和rpc的区别:** 相同点: 都是基于socket通信,都可以实现远程调用 不同点: rpc: 自定义tcp协议,报文更小,调用快,处理快。 http: 通用性更强,实现较为复杂 总结: 如果局域网内服务的调用,最好使用rpc,因为这样更快。如果需要提供对外的环境,如浏览器调用、APP调用、第三方服务调用,则最好使用http ### 2.RPC架构 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 2] 服务提供者(RPC Server): 运行在服务器端,提供服务接口定义与服务实现类。 服务中心(Registry): 运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。 服务消费者(RPC Client): 运行在客户端,通过远程代理对象调用远程服务。 ### 3.现有RPC框架对比 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 3] ### 4.自己动手实现RPC框架 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 4] 核心主要有5个主要的模块组成. 1. 协议模块 2. 序列化模块 3. 网络传输模块 4. 服务端模块 5. 客户端模块 #### 4.0 共同模块(common) #### 提供反射工具类 `ReflectionUtils.java` /** * 反射工具类 */ public class ReflectionUtils { //根据类clazz创建对象 public static <T> T newInstance(Class<T> clazz) { T instance = null; try { instance = clazz.newInstance(); } catch (Exception e) { e.printStackTrace(); } return instance; } //获得该类自身声明的所有public方法 public static Method[] getPublicMethods(Class clazz) { List<Method> pMethods = new ArrayList<>(); //返回该类声明的所有方法 Method[] methods = clazz.getDeclaredMethods(); //过滤到public方法 pMethods = Arrays .stream(methods) .filter(method -> Modifier.isPublic(method.getModifiers()) ).collect(Collectors.toList()); return pMethods.toArray(new Method[0]); } //调用指定对象的指定方法 public static Object invoke(Object obj, Method method, Object... args) { Object object = null; try { object = method.invoke(obj, args); } catch (Exception e) { e.printStackTrace(); } return object; } } #### 4.1 协议模块(proto) #### 主要是封装了RPC的请求,响应和服务描述类。 `ServiceDescriptor.java` public class ServiceDescriptor { private String clazz; private String method; private String returnType; private String[] paramTypes; } `Request.java` public class Request { private ServiceDescriptor service; private Object[] params; } `Response.java` public class Response { private Integer code = 0; // 0 success private String msg = "success"; private Object data; } #### 4.2 序列化模块(codec) #### 使用fastjson,基于json进行序列化和反序列化,将java对象转为json形式进行网络传输,接受到json后再转为java对象。 `JsonEncoder.java` /** * 基于Json的序列化 */ public class JsonEncoder implements Encoder { @Override public byte[] encode(Object obj) { return JSON.toJSONBytes(obj); } } `JsonDecoder.java` /** * 基于Json的反序列化 */ public class JsonDecoder implements Decoder { @Override public <T> T decode(byte[] bytes, Class<T> clazz) { return JSON.parseObject(bytes, clazz); } } #### 4.3 网络传输模块(transport) #### 客户端: 基于java原生的HttpURLConnection,建立短连接。 `HttpTransportClient.java` /** * 基于Http(HttpURLConnection)的客户端:短连接 */ @Slf4j public class HttpTransportClient implements TransportClient { private String url; @Override public void connect(Peer peer) { this.url = "http://" + peer.getHost() + ":" + peer.getPort(); } @Override public InputStream write(InputStream data) { try { log.info("URL:" + this.url); HttpURLConnection connection = (HttpURLConnection) new URL(this.url).openConnection(); //是否能够向connection中输入,如发送post请求,默认是false connection.setDoOutput(true); connection.setDoInput(true); connection.setUseCaches(false); connection.setRequestMethod("POST"); connection.connect(); //将data写入connection IOUtils.copy(data, connection.getOutputStream()); int code = connection.getResponseCode(); if (code == HttpURLConnection.HTTP_OK) { return connection.getInputStream(); } else { return connection.getErrorStream(); } } catch (IOException e) { e.printStackTrace(); } return null; } @Override public void close() { } } 服务器端: 基于jetty,构建嵌入式web服务器。 `HttpTransportServer.java` /** * 基于http(jetty)的服务器端 */ @Slf4j public class HttpTransportServer implements TransportServer { private Server server; private RequestHandler handler; @Override public void init(int port, RequestHandler handler) { this.server = new Server(port); this.handler = handler; //servlet接收请求 ServletContextHandler ctx = new ServletContextHandler(); server.setHandler(ctx); //ServletHolder:网络请求抽象 ServletHolder holder = new ServletHolder(new RequestServlet()); ctx.addServlet(holder, "/*"); } @Override public void start() { try { server.start(); //让server一直挂起 server.join(); } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage(), e); } } @Override public void stop() { try { server.stop(); } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage(), e); } } class RequestServlet extends HttpServlet { @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { log.info("get request success"); ServletInputStream in = req.getInputStream(); ServletOutputStream out = resp.getOutputStream(); if (handler != null) { handler.onRequest(in, out); } out.flush(); } } } #### 4.4 服务端模块(server) #### ServiceManager:以ConcurentHashMap作为容器,提供服务注册和服务查找功能。 `ServiceManager.java` @Slf4j public class ServiceManager { private Map<ServiceDescriptor, ServiceInstance> services; public ServiceManager() { this.services = new ConcurrentHashMap<>(); } //注册服务 public <T> void register(Class<T> interfaceClass, T bean) { Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass); for (Method method : methods) { ServiceInstance serviceInstance = new ServiceInstance(bean, method); ServiceDescriptor sdp = ServiceDescriptor.from(interfaceClass, method); services.put(sdp, serviceInstance); log.info("register service: {}:{}", sdp.getClazz(), sdp.getMethod()); } } //查找服务 public ServiceInstance lookup(Request request) { ServiceDescriptor sdp = request.getService(); return services.get(sdp); } } ServiceInvoker:通过动态代理,调用具体服务 `ServiceInvoker.java` /** * 调用具体服务 */ public class ServiceInvoker { public Object invoke(ServiceInstance service, Request request) { return ReflectionUtils.invoke(service.getTarget(), service.getMethod(), request.getParams()); } } #### 4.5 客户端模块(server) #### RemoteInvoker: 使用jdk默认的动态代理,通过代理对象,进程远程方法调用。 `RemoteInvoker.java` /** * 调用远程服务的代理类 */ @Slf4j public class RemoteInvoker implements InvocationHandler { private Class clazz; private Encoder encoder; private Decoder decoder; private TransportSelector selector; RemoteInvoker(Class clazz, Encoder encoder, Decoder decoder, TransportSelector selector) { this.clazz = clazz; this.encoder = encoder; this.decoder = decoder; this.selector = selector; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Request request = new Request(); request.setService(ServiceDescriptor.from(clazz, method)); request.setParams(args); Response response = invokeRemote(request); if (response == null || response.getCode() != 0) { throw new IllegalStateException("fail to invoke remote: " + response); } return response.getData(); } private Response invokeRemote(Request request) { TransportClient client = null; Response response = null; try { client = selector.select(); byte[] bytes = encoder.encode(request); InputStream in = client.write(new ByteArrayInputStream(bytes)); byte[] inBytes = IOUtils.readFully(in, in.available(), true); response = decoder.decode(inBytes, Response.class); } catch (Exception e) { log.error(e.getMessage(), e); response.setCode(-1); response.setMsg("RpcClient got error:" + e.getClass() + ":" + e.getMessage()); } finally { if (client != null) { selector.release(client); } } return response; } } #### 4.6 使用案例模块(example) #### > eg:使用rpc完成add方法的调用 调用接口和具体实现: public interface CalcService { int add(int a,int b); } public class CalcServiceImpl implements CalcService { @Override public int add(int a, int b) { return a+b; } } 客户端: public class Client { public static void main(String[] args) { RpcClient client = new RpcClient(); CalcService service = client.getProxy(CalcService.class); int result = service.add(1, 2); System.out.println(result); } } 服务器端: public class Server { public static void main(String[] args) { RpcServer server = new RpcServer(); server.register(CalcService.class, new CalcServiceImpl()); server.start(); } } 执行结果: ![在这里插入图片描述][20201108211855411.png_pic_center] **源码:** [https://github.com/wantao666/jrpc][https_github.com_wantao666_jrpc] **参考视频:** [自己动手实现RPC框架][RPC] [https_github.com_wantao666_jrpc]: https://github.com/wantao666/jrpc [RPC]: https://www.imooc.com/learn/1158 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center]: /images/20221120/73102299c0574d1388a61e7ec359424e.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 1]: /images/20221120/deb89e2174974731bb6f1689cf895271.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 2]: /images/20221120/195eb52bc71c415788f43da18d68b323.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 3]: /images/20221120/83cd7ae214c3421194d92776ba34ddaf.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ1NDUzMjY2_size_16_color_FFFFFF_t_70_pic_center 4]: /images/20221120/f4583c4de3ba44abb939237469845eb8.png [20201108211855411.png_pic_center]: /images/20221120/90bb4032f0bc42eda0736c58fca1df1b.png
还没有评论,来说两句吧...