切换语言为:繁体

基于 Netty 实现一个自定义的简单的 RPC 框架

  • 爱糖宝
  • 2024-07-22
  • 2057
  • 0
  • 0

1、RPC(远程过程调用概述)

远程过程调用(RPC, Remote Procedure Call)是一种通过网络从远程计算机程序上请求服务,而无需了解网络细节的通信技术。在分布式系统中,RPC是一种常用的技术,能够简化客户端与服务器之间的交互。本文将介绍如何基于Netty(网络编程框架)实现一个自定义的简单的RPC框架。

首先简单介绍一下RPC 主要特点:

1.1、RPC远程过程调用的主要特点

•透明性: 调用方(客户端)调用远程服务就像调用本地API函数一样,而无需关心执行过程中的底层的网络通信细节。

•客户端-服务器模型:RPC通常基于客户端-服务器模型,客户端发送请求到服务器,服务器处理请求并返回结果。

•序列化及反序列化:RPC需要将请求参数序列化成字节流(即数据转换成网络可传输的格式)并通过网络传输到服务器端,服务器端接收到字节流后,需按照约定的协议将数据进行反序列化(即恢复成三原始格式)

•同步及异步调用:RPC支持同步、异步调用。同步调用会阻塞直到服务器返回结果,或超时、异常等。而异步调用则可以立即返回,通过注册一个回调函数,在有结果返回的时候再进行处理。从而让客户端可以继续执行其它操作。

•错误处理:PRC由于涉及网络通信,因此需要处理各种可能的网络异常,如网络故障,服务宕机,请求超时,服务重启、或上下线、扩缩容等,这些对调用方来说需要保持透明。

•协议及传输层:RPC可以基于多种协议和传输层实现,如HTTP、TCP等,本文采用的是基于TCP的自定义协议。

1.2、RPC的应用场景

•分布式系统:多个服务之间进行通信,如微服务框架。

•客户端-服务器架构:如移动应用与后台服务器的交互。

•跨平台调用:不同技术栈之间的服务调用。

•API服务:通过公开API对外提供功能,使用客户端能方便使用服务提供的功能,如支付网关,身份验证服务等。

•大数据处理:在大数据处理框架中,不同节点之间需要频繁通信来协调任务和交接数据,RPC可以提供高效的节点通信机制,如Hadoop 和Spark等大数据框架中节点间的通信。

•云计算:在云计算环境中,服务通常分布在多个虚拟机或容器中,通过RPC实现实现服务间的通信和管理。

•跨网络服务调用:当应用需要调用部署在不同网络中的服务时,RPC提供了一种简单而建议目前的调用方式,如。跨数据中心或嘴唇地域的服务调用。

1.3、常见的RPC框架

•JSF:京东开源的分布式服务框架,提供高性能、可扩展、稳定的服务治理能力,支持服务注册及发现,负载均衡、容错机制、服务监控、多种协议支持等。

•gRPC:基于HTTP/2和Protocol Buffers的高性能RPC框架,由Google开发。

•Dubbo:一个高性能、轻量级的Java RPC框架,用于提供基于接口的远程服务调用,支持负载均衡、服务自动注册及服务、容错等。

•JSON-RPC:使用JSON格式编码调用和结果的RPC协议。

•Apache Thrift:由Facebook开发,支持多种编程语言和协议

2、实现自定义的RPC

要实现一个自定义的RPC框架需解决以下几个主要问题:

1.客户端调用:客户端调用本地的代理函数(stub代码,这个函数负责将调用转换为RPC请求)。这其实就是一个接口描述文件,它可以有多种形式如JSON、XML、甚至是一份word文档或是口头约定均可,只要客户端及服务端都是遵守这份接口描述文件契约即可。在我们的实际开发中一种常见的方式是服务提供者发布一个包含服务接口类的 jar 包到 maven 中央仓库,调用方通过pom文件将之依赖到本地。

2.参数序列化:代理函数将调用参数进行序列化,并将请求发送到服务器。

3.服务端数据接收:服务器端接收到请求,并将其反序列化,恢复成原始参数。

4.执行远程过程:服务端调用实际的服务过程(函数)并获取结果。

5.返回结果:服务端将调用结果进行序列化,并通过网络传给客户端。

6.客户端接收调用结果:客户到接收到服务端传输的字节流,进行反序列化,转换为实际的结果数据格式,并返回到原始调用方。

下面需我们通过代码一一展示上述各功能是如何实现的。

2.1、自定义通信协议

本文的目的是要实现一个自定义通信协议的远程调用框架,所以首先要定义一个通信协议数据格式。

整个自定义协议总体上分为Header 及 Body Content两部分;Header 占16个字节,又分为4个部分。

前2位为魔法值用于Netty编解码组件,解决网络通信中的粘包、半包等问题,此处不展开细讲。

msgtype用于表示消息的类型,如request(请求)、respone(响应)、heartbeat(心跳)等。

code 占1位,表示请求的响应状态,成功还是失败。

request id占8位,表示请求的序列号,用于后续调用结果的匹配,保证线程内唯一。

body size 占4位,表示实现请求内容的长度,在反序化时读取此长度的内容字节,解析出正确的数据。

客户端、服务端在通信过程中都要按照上述约定的通信协议进行数据的编、解码工作。

2.2、客户端调用

2.2.1 客户端的使用

客户端一般通过接口代理工厂通过动态代理技术来生成一个代理实例,所有的远程调用中的细节,如参数序列化,网络传输,异常处理等都隐藏在代理实例中实现,对调用方来说调用过程是透明的,就像调用本地方法一样。

首先看一下客户端的使用方式,本文假设一个IShoppingCartService (购物车)的接口类,基中有一个方法根据传入的用户pin,返回购物车详情。

//接口方法
ShoppingCart shopping(String pin);

//客户端通过代理工厂实现接口的一个代理实例
IShoppingCartService serviceProxy = ProxyFactory.factory(IShoppingCartService.class)                
                .setSerializerType(SerializerType.JDK) //客户端设置所使用的序列化工具,此处为JDK原生
                .newProxyInstance(); //返回代理 实现

//像调用本地方法一样,调用此代理实例的shopping 方法
ShoppingCart result = serviceProxy.shopping("userPin");
log.info("result={}", JSONObject.toJSONString(result));

2.2.2、客户端代理工厂的核心功能
public class ProxyFactory<I> {
    //……省略
    /**
     * 代理对象
     *
     * @return
     */
    public I newProxyInstance() {     
        //服务的元数据信息
        ServiceData serviceData = new ServiceData(
                group, //分组
                providerName, //服务名称,一般为接口的class的全限定名称
                StringUtils.isNotBlank(version) ? version : "1.0.0" //版本号
        );

        //调用器
        Calller caller = newCaller().timeoutMillis(timeoutMillis);
        //集群策略,用于实现快速失败或失败转等功能
        Strategy strategy = StrategyConfigContext.of(strategy, retries);
        Object handler = null;
        switch (invokeType) {
            case "syncCall":
                //同步调用handler
                handler = new SyncCaller(serviceData, caller);
                break;
            case "asyncCall":
                //异步调用handler
                handler = new AsyncCaller(client.appName(), serviceData, caller, strategy);
                break;
            default:
                throw new RuntimeException("未知类型: " + invokeType);
        }

        //返回代理实例
        return ProxyEnum.getDefault().newProxy(interfaceClass, handler);
    }
    //……省略
}

代码 ProxyEnum.getDefault().newProxy(interfaceClass, handler) 返回一个具体的代理实例,此方法要求传入两个参数,interfaceClass 被代理的接口类class,即服务方所发布的服务接口类。

handler 为动态代理所需要代码增强逻辑,即所有的调用细节都由此增强类完成。按照动态代理的实现方式的不同,本文支持两种动态代理方式:

1.JDK动态代码,如采用此方式,handler 需要实现接口 InvocationHandler

2.ByteBuddy,它是一个用于在运行时生成、修改和操作Java类的库,允许开发者通过简单的API生成新的类或修改已有的类,而无需手动编写字节码,它广泛应用于框架开发、动态代理、字节码操作和类加载等领域。

本文默认采用第二种方式,通代码简单展示一下代理实例的的生成方式。

//方法newProxy 的具体实现
public <T> T newProxy(Class<T> interfaceType, Object handler) {
            Class<? extends T> cls = new ByteBuddy()
                     //生成接口的子类
                    .subclass(interfaceType) 
                     //默认代理接口中所有声明的方法
                    .method(ElementMatchers.isDeclaredBy(interfaceType))
                     //代码增强,即接口中所有被代理的方法都
                     //委托给用户自定义的handler处理,这也是动态代理的意义所在
                   .intercept(MethodDelegation.to(handler, "handlerInstance"))
                    .make()
                     //通过类加载器加载
                   .load(interfaceType.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
                    .getLoaded();

            try {
                //通过newInstance构建一个代理实例并返回    
               return cls.newInstance();
            } catch (Throwable t) {
                ……
            }
        }

本文以同步调用为例,现在展示一下 SyncInvoker 的具体实现逻辑

public class SyncCaller extends AbstractCaller {
    //……省略   
    /**
     * @RuntimeType 的作用提示ByteBuddy根据被拦截方法的实际类型,对此拦截器的返回值进行类型转换
     */
    @RuntimeType
    public Object syncCall(@Origin Method method, @AllArguments @RuntimeType Object[] args) throws Throwable {
        //封装请求的接口中的方法名及方法参数,组成一个request请求对象
        StarGateRequest request = createRequest(methodName, args);
        //集群容错策略调度器接口
        //提供快速失败,失败转移等策略供调用方选择,此处默认采用了快速失败的策略
        Invoker invoker = new FastFailInvoker();
        //returnType 的类型决定了泛型方法的实际结果类型,用于后续调用结果的类型转换
        Future<?> future = invoker.invoke(request, method.getReturnType());
        if (sync) {
            //同步调用,线程会阻塞在get方法,直到超时或结果可用
            Object result = future.getResult();
            return result;
        } else {
            return future;
        }
    }
}

//同步,异步调用的关键点就在于InvokeFuture,它继承了Java的CompletionStage类,用于异步编程

通过以上核心代码,客户端就完成了服务调用环节,下一步RPC框架需要将客户端请求的接口方法及方法参数进行序列化并通过网络进行传输。下面通过代码片段展示一下序列化的实现方式。

2.2.3、请求参数序列化

我们将请求参数序列化的目的就是将具体的请求参数转换成字节组,填充进入上述自定义协议的 body content 部分。下面通过代码演示一下如何进行反序列化。

本文默认采用JDK原生的对象序列化及反序列化框架,也可通过SPI技术扩展支持Protocol Buffers等。

//上述代码行Future<?> future = invoker.invoke(request, method.getReturnType());
//具体实现

public <T> Future<T> invoke(StarGateRequest request, Class<T> returnType) throws Exception {
        //对象序列化器,默认为JDK
        final Serializer _serializer = serializer();
        //message对象包含此次请求的接口名,方法名及实际参数列表
        final Message message = request.message();
        //通过软负载均衡选择一个 Netty channel
        Channel channel = selectChannel(message.getMetadata());
        byte code = _serializer.code();
        //将message对象序列成字节数组
        byte[] bytes = _serializer.writeObject(message);
        request.bytes(code, bytes);

        //数据写入 channel 并返回 future 约定,用于同步或异步获得调用结果
        return write(channel, request, returnType);
    }

//对象的序列化,JDK 原生方式
public <T> byte[] writeObject(T obj) {
        ByteArrayOutputStream buf = OutputStreams.getByteArrayOutputStream();
        try (ObjectOutputStream output = new ObjectOutputStream(buf)) {
            output.writeObject(obj);
            output.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            ThrowUtil.throwException(e);
        } finally {
            OutputStreams.resetBuf(buf);
        }
        return null; 
    }

2.2.4、请求参数通过网络发送
//上述代码  write(channel, request, returnType);
//具体实现
protected <T> DefaultFuture<T> write(final Channel channel,
                                               final StarGateRequest request,
                                               final Class<T> returnType) {
        //……省略
        
        //调用结果占位 future对象,这也是promise编程模式
        final Future<T> future = DefaultFuture.newFuture(request.invokeId(), channel, timeoutMillis, returnType);
        
        //将请求负载对象写入Netty channel通道,并绑定监听器处理写入结果
        channel.writeAndFlush(request).addListener((ChannelFutureListener) listener -> {
            if (listener.isSuccess()) {
                //网络写入成功
                ……
            } else {
                //异常时,构造造调用结果,供调用方进行处理
                DefaultFuture.errorFuture(channel, response, dispatchType);
            }
        });
        
        //因为Netty 是非阻塞的,所以写入后可立刻返回
        return future;
    }

2.2.4.1、Netty 消息编码器

消息写入Netty channel 后,会依次经过 channel pipline 上所安装的各种handler处理,然后再通过物理网络将数据发送出去,这里展示了客户端及服务端所使用的自定义编、解解器

//自定义的编码器 继承自Netty 的 MessageToByteEncoder
public class StarGateEncoder extends MessageToByteEncoder<Payload> {

    //……省略
    
    private void doEncodeRequest(RequestPayload request, ByteBuf out) {
        byte sign = StarGateProtocolHeader.toSign(request.serializerCode(), StarGateProtocolHeader.REQUEST);
        long invokeId = request.invokeId();
        byte[] bytes = request.bytes();
        int length = bytes.length;

        out.writeShort(StarGateProtocolHeader.Head)  //写入两个字节
                .writeByte(sign)              //写入1个字节
                .writeByte(0x00)            //写入1个字节
                .writeLong(invokeId)          //写入8个节节
                .writeInt(length)             //写入4个字节
                .writeBytes(bytes);
    }

}

至此,通过上述核心代码,客户的请求已经按照自定义的协议格式进行了序列化,并把数据写入到Netty channel中,最后通过物理网络传输到服务器端。

2.3、服务端接收数据

2.3.1、消息解码器

服务器端接收到客户端的发送的数据后,需要进行正确的消息解码,下面是解码器的实现。

//消息解码器,继承自Netty 的ReplayingDecoder,将客户端请求解码为 RequestPayload 对象,供业务处理handler使用
public class StarGateDecoder extends ReplayingDecoder<StarGateDecoder.State> {

    //……省略
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        switch (state()) {
            case HEAD:
                checkMagic(in.readShort());         // HEAD
                checkpoint(State.HEAD);
            case SIGN:
                header.sign(in.readByte());         // 消息标志位
                checkpoint(State.STATUS);
            case STATUS:
                header.status(in.readByte());       // 状态位
                checkpoint(State.ID);
            case ID:
                header.id(in.readLong());           // 消息id
                checkpoint(State.BODY_SIZE);
            case BODY_SIZE:
                header.bodySize(in.readInt());      // 消息体长度
                checkpoint(State.BODY);
            case BODY:
                switch (header.messageCode()) {
                    //……省略
                    case StarGateProtocolHeader.REQUEST: {
                        //消息体长度信息
                        int length = checkBodySize(header.bodySize());
                        byte[] bytes = new byte[length];
                        //读取指定长度字节
                        in.readBytes(bytes);
                        //调用请求
                        RequestPayload request = new RequestPayload(header.id());
                        //设置序列化器编码,有效载荷
                        request.bytes(header.serializerCode(), bytes);
                        out.add(request);
                        break;
                    }
                    
                    default:
                        throw new Exception("错误标志位");
                }
                checkpoint(State.HEAD);
        }
    }

   //……省略
}

2.3.2、请求参数反序列化
//服务端 Netty channel pipline 上所安装的业务处理 handler
//业务处理handler 对RequestPayload 所携带的字节数组进行反序列化,解析出客户端所传递的实际参数
public class ServiceHandler extends ChannelInboundHandlerAdapter {

    //……省略
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof RequestPayload) {       
            StarGateRequest request = new StarGateRequest((RequestPayload) msg);
            //约定的反序列化器, 由客户端设置
            byte code = request.serializerCode();
            Serializer serializer = SerializerFactory.getSerializer(code);
            //实际请求参数字组数组
            byte[] bytes = payload.bytes();
            //对象反序列化
            Message message = serializer.readObject(bytes, Message.class);
            log.info("message={}", JSONObject.toJSONString(message));
       
            request.message(message);
            
            //业务处理
            process(message);
        } else {
            //引用释放
            ReferenceCountUtil.release(msg);
        }
    }

    //……省略
}

2.3.3、处理客户端请求

经过反序列化后,服务端可以知道用户所请求的是哪个接口、方法、以及实际的参数值,下一步就可进行真实的方法调用。

//处理调用
public void process(Message message) {         
   try {
       ServiceMetadata metadata = msg.getMetadata(); //客户端请求的元数据
       String providerName = metadata.getProviderName(); //服务名,即接口类名
       
       //根据接口类名,查找服务端实现此接口的类的全限定类名
       providerName = findServiceImpl(providerName);
       String methodName = msg.getMethodName();  //方法名
       Object[] args = msg.getArgs();    //客户设置的实际参数

       //线程上下文类加载器
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
       //加载具体实现类
       Class<?> clazz = classLoader.loadClass(providerName);
       //创建接口类实例
       Object instance = clazz.getDeclaredConstructor().newInstance();

       Method method = null;
       Class<?>[] parameterTypes = new Class[args.length];
       for (int i = 0; i < args.length; i++) {
           parameterTypes[i] = args[i].getClass();
       }
       method = clazz.getMethod(methodName, parameterTypes);
       
       //反射调用 
       Object invokeResult = method.invoke(instance, args);
       } catch (Exception e) {
            log.error("调用异常:", e);
           throw new RuntimeException(e);
       }

       //处理同步调用结果
      doProcess(invokeResult);
        
}

2.3.4、 返回调用结果

通过反射调用接口实现类,获取调用结果,然后对结果进行序列化并包装成response响应消息,将消息写入到channel, 经过channel pipline 上所安装的编码器对消息对象进行编码,最后发送给调用客户端。

//处理同步调用结果,并将结果写回到 Netty channel
private void doProcess(Object realResult) {
        ResultWrapper result = new ResultWrapper();
        result.setResult(realResult);
        byte code = request.serializerCode();
        Serializer serializer = SerializerFactory.getSerializer(code);
        //new response 响应消息对象
        Response response = new Response(request.invokeId());
        //调用结果序列成字节数组
        byte[] bytes = serializer.writeObject(result);
        response.bytes(code, bytes);
        response.status(Status.OK.value());
        
        //响应消息对象 response 写入 Netty channel
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    log.info("响应成功");
                } else {
                    //记录调用失败日志
                    log.error("响应失败, channel: {}, cause: {}.", channel, channelFuture.cause());
                }
            }
        });
    }

同样的,消息写入channel 后,先依次经过 pipline 上所安装的消息编码器,再发送给客户端。具体编码方式同客户端编码器类似,此处不再赘述。

2.4、客户端接收调用结果

客户端收到服务端写入响应消息后,同样经过Netty channel pipline 上所安装的解码器,进行正确的解码。然后再对解码后的对象进行正确的反序列化,最终获得调用结果 。具体的解码,反序列化过程不再赘述,流程基本同上面服务端的解码及反序列化类似。

public class consumerHandler extends ChannelInboundHandlerAdapter {

    //……省略

    //客户端处理所接收到的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof ResponseMessage) {
            try {
                //类型转换
                ResponseMessage responseMessage= (ResponseMessage)msg
                StarGateResponse response = new StarGateResponse(ResponseMessage.getMsg());
                byte code = response.serializerCode();
                Serializer serializer = SerializerFactory.getSerializer(code);
                byte[] bytes = responseMessage.bytes();
                //反序列化成调用结果的包装类
                Result result = serializer.readObject(bytes, Result.class);
                response.result(result);

                //处理调用结果
                long invokeId = response.id();
                //通过 rnvokeid,从地缓存中拿到客户端调用的结果点位对象 futrue
                DefaultFuture<?> future = FUTURES_MAP.remove(invokeId);
                
                //判断调用是否成功
                byte status = response.status();
                if (status == Status.OK.value()) {
                    //对调用结果进行强制类型转换,并设置future结果,对阻塞在future.get()的客户端同步调用来说,调用返回。
                    complete((V) response.getResult());
                } else {
                    //todo 处理异常
                }

            } catch (Throwable t) {
                log.error("调用记录: {}, on {} #channelRead().", t, ch);
            }
        } else {
            log.warn("消息类型不匹配: {}, channel: {}.", msg.getClass(), ch);
            //计数器减1
            ReferenceCountUtil.release(msg);
        }
    }
    
}

下面再通过一个简单的调用时序图展示一下一次典型的Rpc调用所经历的步骤。



基于 Netty 实现一个自定义的简单的 RPC 框架

3、结尾

本文首先简单介绍了一下RPC的概念、应用场景及常用的RPC框架,然后讲述了一下如何自己手动实现一个RPC框架的基本功能。目的是想让大家对RPC框架的实现有一个大概思路,并对 Netty 这一高效网络编程框架有一个了解,通过对Netty 的编、解码器的学习,了解如何自定义一个私有的通信协议。限于篇幅本文只简单讲解了RPC的核心的调用逻辑的实现。真正生产可用的RPC框架还需要有更多复杂的功能,如限流、负载均衡、融断、降级、泛型调用、自动重连、自定义可扩展的拦截器等等。

另外RPC框架中一般有三种角色,服务提供者、服务消费者、注册中心,本文并没有介绍注册中心如何实现。并假定服务提供者已经将服务发布到了注册中心,服务消费者跟服务提供者之间建立起了TCP 长连接。

0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.