gRPC 中泛化调用服务接口

野性酷女 2022-10-22 02:58 403阅读 0赞

gRPC 中泛化调用服务接口

gRPC 没有直接支持泛化调用,protobuf 可以不依赖于生成的代码实现调用,所以可以通过反射接口间接实现泛化调用

要求 Server 端提供 grpc.reflection.v1alpha.ServerReflection 服务,用于获取服务的描述文件

大致的流程是:

  1. 根据方法名称,调用服务端反射服务的方法,获取方法所在 proto 文件的描述
  2. 根据 proto 描述文件,获取文件描述、服务描述,用于重新构建要被调用方法的方法描述 MethodDescriptor
  3. 根据方法描述,将请求内容序列化为对应的类型
  4. 使用重新构建的MethodDescriptor和其他参数对 Server 端相应的方法发起调用
  5. 解析响应并返回

实现

使用 JSON 格式请求被调用的服务方法,并返回 JSON 格式的响应

proto 定义

  1. syntax = "proto3";
  2. package io.github.helloworlde.grpc;
  3. option go_package = "api;grpc_gateway";
  4. option java_package = "io.github.helloworlde.grpc";
  5. option java_multiple_files = true;
  6. option java_outer_classname = "HelloWorldGrpc";
  7. service HelloService{
  8. rpc SayHello(HelloMessage) returns (HelloResponse){
  9. }
  10. }
  11. message HelloMessage {
  12. string message = 2;
  13. }
  14. message HelloResponse {
  15. string message = 1;
  16. }

调用

1. 构建反射服务 Stub

需要调用反射服务的方法,该方法是双向流

  1. // 构建 Channel
  2. ManagedChannel channel=ManagedChannelBuilder.forAddress("127.0.0.1",9090)
  3. .usePlaintext()
  4. .build();
  5. // 使用 Channel 构建 BlockingStub
  6. ServerReflectionGrpc.ServerReflectionStub reflectionStub=ServerReflectionGrpc.newStub(channel);
  7. // 响应观察器
  8. StreamObserver<ServerReflectionResponse> streamObserver=new StreamObserver<ServerReflectionResponse>(){
  9. @Override
  10. public void onNext(ServerReflectionResponse response){
  11. // 处理响应
  12. }
  13. @Override
  14. public void onError(Throwable t){
  15. }
  16. @Override
  17. public void onCompleted(){
  18. log.info("Complete");
  19. }
  20. };
  21. // 请求观察器
  22. StreamObserver<ServerReflectionRequest> requestStreamObserver=reflectionStub.serverReflectionInfo(streamObserver);

2. 根据方法名称获取文件描述

这里的 methodSymbol 即服务或方法的限定名,可以是 package.service 或者 package.service.method
,如 io.github.helloworlde.grpc.HelloService.SayHello,需要注意方法前是 .不是/

  1. // 构建并发送获取方法文件描述请求
  2. ServerReflectionRequest getFileContainingSymbolRequest=ServerReflectionRequest.newBuilder()
  3. .setFileContainingSymbol(methodSymbol)
  4. .build();
  5. requestStreamObserver.onNext(getFileContainingSymbolRequest);

3. 处理响应,解析 FileDescriptor

返回的响应后会触发 onNext 方法,如果响应类型是文件描述类型,即 FILE_DESCRIPTOR_RESPONSE,则进行处理

  1. public void onNext(ServerReflectionResponse response) {
  2. try {
  3. // 只需要关注文件描述类型的响应
  4. if (response.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
  5. List<ByteString> fileDescriptorProtoList = response.getFileDescriptorResponse().getFileDescriptorProtoList();
  6. handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContent);
  7. } else {
  8. log.warn("未知响应类型: " + response.getMessageResponseCase());
  9. }
  10. } catch (Exception e) {
  11. log.error("处理响应失败: {}", e.getMessage(), e);
  12. }
  13. }
  • handleResponse

在处理请求时,先解析了包名、服务名和方法名,然后根据包名和服务名,从返回的文件描述中获取到了响应方法所在文件的描述;然后从文件描述中获取服务描述,最终获取到方法描述,根据方法描述执行调用

  1. private static void handleResponse(List<ByteString> fileDescriptorProtoList,
  2. ManagedChannel channel,
  3. String methodFullName,
  4. String requestContent) {
  5. try {
  6. // 解析方法和服务名称
  7. String fullServiceName = extraPrefix(methodFullName);
  8. String methodName = extraSuffix(methodFullName);
  9. String packageName = extraPrefix(fullServiceName);
  10. String serviceName = extraSuffix(fullServiceName);
  11. // 根据响应解析 FileDescriptor
  12. Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);
  13. // 查找服务描述
  14. Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
  15. // 查找方法描述
  16. Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);
  17. // 发起请求
  18. executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
  19. } catch (Exception e) {
  20. log.error(e.getMessage(), e);
  21. }
  22. }
  • getFileDescriptor

根据响应找到方法对应的文件的 FileDescriptorProto,然后构建出对应的 FileDescriptor

  1. private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,
  2. String packageName,
  3. String serviceName) throws Exception {
  4. Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =
  5. fileDescriptorProtoList.stream()
  6. .map(bs -> {
  7. try {
  8. return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
  9. } catch (InvalidProtocolBufferException e) {
  10. e.printStackTrace();
  11. }
  12. return null;
  13. })
  14. .filter(Objects::nonNull)
  15. .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));
  16. if (fileDescriptorProtoMap.isEmpty()) {
  17. log.error("服务不存在");
  18. throw new IllegalArgumentException("方法的文件描述不存在");
  19. }
  20. // 查找服务对应的 Proto 描述
  21. DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);
  22. // 获取这个 Proto 的依赖
  23. Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);
  24. // 生成 Proto 的 FileDescriptor
  25. return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
  26. }

4. 执行调用

  • 生成方法描述

在执行调用时,需要重新生成 MethodDescriptor;因为获取到的 MethodDescriptor 中的方法全名是package.service.method
格式,而需要的是package.service/method格式,同时请求和响应类型也需要重新设置为 DynamicMessage,所以需要重新生成 MethodDescriptor

  1. private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {
  2. // 生成方法全名
  3. String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());
  4. // 请求和响应类型
  5. MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())
  6. .buildPartial());
  7. MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())
  8. .buildPartial());
  9. // 生成方法描述, originMethodDescriptor 的 fullMethodName 不正确
  10. return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
  11. .setFullMethodName(fullMethodName)
  12. .setRequestMarshaller(inputTypeMarshaller)
  13. .setResponseMarshaller(outputTypeMarshaller)
  14. // 使用 UNKNOWN,自动修改
  15. .setType(MethodDescriptor.MethodType.UNKNOWN)
  16. .build();
  17. }
  • 执行调用

同时需要根据文件描述,将请求的类型转为对应的请求类型,生成 DynamicMessage 对象;然后根据方法类型,使用MethodDescriptorCallOptions
发起请求;当接收到响应后将 DynamicMessage 解析为对应的格式的字符串;完成调用


参考文档

  • 相关实现代码参考 ReflectionCall.java
  • protobuf-dynamic
  • grpcurl
  • grpc-swagger
  • gRPC + JSON
  • gRPC Server Reflection Tutorial
  • Reflection
  • gRPC Server Reflection Tutorial
  • Protocol buffer objects generated at runtime
  • How can I send a gRPC message whose format is determined at runtime
  • How to create GRPC client directly from protobuf without compiling it into java code

发表评论

表情:
评论列表 (有 0 条评论,403人围观)

还没有评论,来说两句吧...

相关阅读

    相关 dubbo调用

    dubbo泛化调用 一、前言 > 泛接口调用方式主要用于客户端没有API接口及模型类元的情况,参数及返回值中的所有POJO均用Map表示,通常用于框架集成,比如:实