切換語言為:簡體

基於 Netty 實現一個自定義的簡單的 RPC 框架

  • 爱糖宝
  • 2024-07-22
  • 2058
  • 0
  • 0

1、RPC(遠端過程呼叫概述)

遠端過程呼叫(RPC, Remote Procedure Call)是一種透過網路從遠端計算機程式上請求服務,而無需瞭解網路細節的通信技術。在分散式系統中,RPC是一種常用的技術,能夠簡化客戶端與伺服器之間的互動。本文將介紹如何基於Netty(網路程式設計框架)實現一個自定義的簡單的RPC框架。

首先簡單介紹一下RPC 主要特點:

1.1、RPC遠端過程呼叫的主要特點

•透明性: 呼叫方(客戶端)呼叫遠端服務就像呼叫本地API函式一樣,而無需關心執行過程中的底層的網路通訊細節。

•客戶端-伺服器模型:RPC通常基於客戶端-伺服器模型,客戶端傳送請求到伺服器,伺服器處理請求並返回結果。

•序列化及反序列化:RPC需要將請求引數序列化成位元組流(即資料轉換成網路可傳輸的格式)並透過網路傳輸到伺服器端,伺服器端接收到位元組流後,需按照約定的協議將資料進行反序列化(即恢復成三原始格式)

•同步及非同步呼叫:RPC支援同步、非同步呼叫。同步呼叫會阻塞直到伺服器返回結果,或超時、異常等。而非同步呼叫則可以立即返回,透過註冊一個回撥函式,在有結果返回的時候再進行處理。從而讓客戶端可以繼續執行其它操作。

•錯誤處理:PRC由於涉及網路通訊,因此需要處理各種可能的網路異常,如網路故障,服務宕機,請求超時,服務重啟、或上下線、擴縮容等,這些對呼叫方來說需要保持透明。

•協議及傳輸層:RPC可以基於多種協議和傳輸層實現,如HTTP、TCP等,本文采用的是基於TCP的自定義協議。

1.2、RPC的應用場景

•分散式系統:多個服務之間進行通訊,如微服務框架。

•客戶端-伺服器架構:如移動應用與後臺伺服器的互動。

•跨平臺呼叫:不同技術棧之間的服務呼叫。

•API服務:透過公開API對外提供功能,使用客戶端能方便使用服務提供的功能,如支付閘道器,身份驗證服務等。

•大數據處理:在大數據處理框架中,不同節點之間需要頻繁通訊來協調任務和交接資料,RPC可以提供高效的節點通訊機制,如Hadoop 和Spark等大資料框架中節點間的通訊。

•雲端計算:在雲端計算環境中,服務通常分佈在多個虛擬機器或容器中,透過RPC實現實現服務間的通訊和管理。

•跨網路服務呼叫:當應用需要呼叫部署在不同網路中的服務時,RPC提供了一種簡單而建議目前的呼叫方式,如。跨資料中心或嘴唇地域的服務呼叫。

1.3、常見的RPC框架

•JSF:京東開源的分散式服務框架,提供高效能、可擴充套件、穩定的服務治理能力,支援服務註冊及發現,負載均衡、容錯機制、服務監控、多種協議支援等。

•gRPC:基於HTTP/2和Protocol Buffers的高效能RPC框架,由Google開發。

•Dubbo:一個高效能、輕量級的Java RPC框架,用於提供基於介面的遠端服務呼叫,支援負載均衡、服務自動註冊及服務、容錯等。

•JSON-RPC:使用JSON格式編碼呼叫和結果的RPC協議。

•Apache Thrift:由Facebook開發,支援多種程式語言和協議

2、實現自定義的RPC

要實現一個自定義的RPC框架需解決以下幾個主要問題:

1.客戶端呼叫:客戶端呼叫本地的代理函式(stub程式碼,這個函式負責將呼叫轉換為RPC請求)。這其實就是一個介面描述檔案,它可以有多種形式如JSON、XML、甚至是一份word文件或是口頭約定均可,只要客戶端及服務端都是遵守這份介面描述檔案契約即可。在我們的實際開發中一種常見的方式是服務提供者釋出一個包含服務介面類的 jar 包到 maven 中央倉庫,呼叫方透過pom檔案將之依賴到本地。

2.引數序列化:代理函式將呼叫引數進行序列化,並將請求傳送到伺服器。

3.服務端資料接收:伺服器端接收到請求,並將其反序列化,恢復成原始引數。

4.執行遠端過程:服務端呼叫實際的服務過程(函式)並獲取結果。

5.返回結果:服務端將呼叫結果進行序列化,並透過網路傳給客戶端。

6.客戶端接收呼叫結果:客戶到接收到服務端傳輸的位元組流,進行反序列化,轉換為實際的結果資料格式,並返回到原始呼叫方。

下面需我們透過程式碼一一展示上述各功能是如何實現的。

2.1、自定義通訊協議

本文的目的是要實現一個自定義通訊協議的遠端呼叫框架,所以首先要定義一個通訊協議資料格式。

整個自定義協議總體上分為Header 及 Body Content兩部分;Header 佔16個位元組,又分為4個部分。

前2位為魔法值用於Netty編解碼元件,解決網路通訊中的粘包、半包等問題,此處不展開細講。

msgtype用於表示訊息的型別,如request(請求)、respone(響應)、heartbeat(心跳)等。

code 佔1位,表示請求的響應狀態,成功還是失敗。

request id佔8位,表示請求的序列號,用於後續呼叫結果的匹配,保證執行緒內唯一。

body size 佔4位,表示實現請求內容的長度,在反序化時讀取此長度的內容位元組,解析出正確的資料。

客戶端、服務端在通訊過程中都要按照上述約定的通訊協議進行資料的編、解碼工作。

2.2、客戶端呼叫

2.2.1 客戶端的使用

客戶端一般透過介面代理工廠透過動態代理技術來生成一個代理例項,所有的遠端呼叫中的細節,如引數序列化,網路傳輸,異常處理等都隱藏在代理例項中實現,對呼叫方來說呼叫過程是透明的,就像呼叫本地方法一樣。

首先看一下客戶端的使用方式,本文假設一個IShoppingCartService (購物車)的介面類,基中有一個方法根據傳入的使用者pin,返回購物車詳情。

//介面方法
ShoppingCart shopping(String pin);

//客戶端透過代理工廠實現介面的一個代理例項
IShoppingCartService serviceProxy = ProxyFactory.factory(IShoppingCartService.class)                
                .setSerializerType(SerializerType.JDK) //客戶端設定所使用的序列化工具,此處為JDK原生
                .newProxyInstance(); //返回代理 實現

//像呼叫本地方法一樣,呼叫此代理例項的shopping 方法
ShoppingCart result = serviceProxy.shopping("userPin");
log.info("result={}", JSONObject.toJSONString(result));

2.2.2、客戶端代理工廠的核心功能
public class ProxyFactory<I> {
    //……省略
    /**
     * 代理物件
     *
     * @return
     */
    public I newProxyInstance() {     
        //服務的後設資料資訊
        ServiceData serviceData = new ServiceData(
                group, //分組
                providerName, //服務名稱,一般為介面的class的全限定名稱
                StringUtils.isNotBlank(version) ? version : "1.0.0" //版本號
        );

        //呼叫器
        Calller caller = newCaller().timeoutMillis(timeoutMillis);
        //叢集策略,用於實現快速失敗或失敗轉等功能
        Strategy strategy = StrategyConfigContext.of(strategy, retries);
        Object handler = null;
        switch (invokeType) {
            case "syncCall":
                //同步呼叫handler
                handler = new SyncCaller(serviceData, caller);
                break;
            case "asyncCall":
                //非同步呼叫handler
                handler = new AsyncCaller(client.appName(), serviceData, caller, strategy);
                break;
            default:
                throw new RuntimeException("未知型別: " + invokeType);
        }

        //返回代理例項
        return ProxyEnum.getDefault().newProxy(interfaceClass, handler);
    }
    //……省略
}

程式碼 ProxyEnum.getDefault().newProxy(interfaceClass, handler) 返回一個具體的代理例項,此方法要求傳入兩個引數,interfaceClass 被代理的介面類class,即服務方所釋出的服務介面類。

handler 為動態代理所需要程式碼增強邏輯,即所有的呼叫細節都由此增強類完成。按照動態代理的實現方式的不同,本文支援兩種動態代理方式:

1.JDK動態程式碼,如採用此方式,handler 需要實現介面 InvocationHandler

2.ByteBuddy,它是一個用於在執行時生成、修改和操作Java類的庫,允許開發者透過簡單的API生成新的類或修改已有的類,而無需手動編寫位元組碼,它廣泛應用於框架開發、動態代理、位元組碼操作和類載入等領域。

本文預設採用第二種方式,通程式碼簡單展示一下代理例項的的生成方式。

//方法newProxy 的具體實現
public <T> T newProxy(Class<T> interfaceType, Object handler) {
            Class<? extends T> cls = new ByteBuddy()
                     //生成介面的子類
                    .subclass(interfaceType) 
                     //預設代理介面中所有宣告的方法
                    .method(ElementMatchers.isDeclaredBy(interfaceType))
                     //程式碼增強,即介面中所有被代理的方法都
                     //委託給使用者自定義的handler處理,這也是動態代理的意義所在
                   .intercept(MethodDelegation.to(handler, "handlerInstance"))
                    .make()
                     //透過類載入器載入
                   .load(interfaceType.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
                    .getLoaded();

            try {
                //透過newInstance構建一個代理例項並返回    
               return cls.newInstance();
            } catch (Throwable t) {
                ……
            }
        }

本文以同步呼叫為例,現在展示一下 SyncInvoker 的具體實現邏輯

public class SyncCaller extends AbstractCaller {
    //……省略   
    /**
     * @RuntimeType 的作用提示ByteBuddy根據被攔截方法的實際型別,對此攔截器的返回值進行型別轉換
     */
    @RuntimeType
    public Object syncCall(@Origin Method method, @AllArguments @RuntimeType Object[] args) throws Throwable {
        //封裝請求的介面中的方法名及方法引數,組成一個request請求物件
        StarGateRequest request = createRequest(methodName, args);
        //叢集容錯策略排程器介面
        //提供快速失敗,失敗轉移等策略供呼叫方選擇,此處預設採用了快速失敗的策略
        Invoker invoker = new FastFailInvoker();
        //returnType 的型別決定了泛型方法的實際結果型別,用於後續呼叫結果的型別轉換
        Future<?> future = invoker.invoke(request, method.getReturnType());
        if (sync) {
            //同步呼叫,執行緒會阻塞在get方法,直到超時或結果可用
            Object result = future.getResult();
            return result;
        } else {
            return future;
        }
    }
}

//同步,非同步呼叫的關鍵點就在於InvokeFuture,它繼承了Java的CompletionStage類,用於非同步程式設計

透過以上核心程式碼,客戶端就完成了服務呼叫環節,下一步RPC框架需要將客戶端請求的介面方法及方法引數進行序列化並透過網路進行傳輸。下面透過程式碼片段展示一下序列化的實現方式。

2.2.3、請求引數序列化

我們將請求引數序列化的目的就是將具體的請求引數轉換成位元組組,填充進入上述自定義協議的 body content 部分。下面透過程式碼演示一下如何進行反序列化。

本文預設採用JDK原生的物件序列化及反序列化框架,也可透過SPI技術擴充套件支援Protocol Buffers等。

//上述程式碼行Future<?> future = invoker.invoke(request, method.getReturnType());
//具體實現

public <T> Future<T> invoke(StarGateRequest request, Class<T> returnType) throws Exception {
        //物件序列化器,預設為JDK
        final Serializer _serializer = serializer();
        //message物件包含此次請求的介面名,方法名及實際引數列表
        final Message message = request.message();
        //透過軟負載均衡選擇一個 Netty channel
        Channel channel = selectChannel(message.getMetadata());
        byte code = _serializer.code();
        //將message物件序列成位元組陣列
        byte[] bytes = _serializer.writeObject(message);
        request.bytes(code, bytes);

        //資料寫入 channel 並返回 future 約定,用於同步或非同步獲得呼叫結果
        return write(channel, request, returnType);
    }

//物件的序列化,JDK 原生方式
public <T> byte[] writeObject(T obj) {
        ByteArrayOutputStream buf = OutputStreams.getByteArrayOutputStream();
        try (ObjectOutputStream output = new ObjectOutputStream(buf)) {
            output.writeObject(obj);
            output.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            ThrowUtil.throwException(e);
        } finally {
            OutputStreams.resetBuf(buf);
        }
        return null; 
    }

2.2.4、請求引數透過網路傳送
//上述程式碼  write(channel, request, returnType);
//具體實現
protected <T> DefaultFuture<T> write(final Channel channel,
                                               final StarGateRequest request,
                                               final Class<T> returnType) {
        //……省略
        
        //呼叫結果佔位 future物件,這也是promise程式設計模式
        final Future<T> future = DefaultFuture.newFuture(request.invokeId(), channel, timeoutMillis, returnType);
        
        //將請求負載物件寫入Netty channel通道,並繫結監聽器處理寫入結果
        channel.writeAndFlush(request).addListener((ChannelFutureListener) listener -> {
            if (listener.isSuccess()) {
                //網路寫入成功
                ……
            } else {
                //異常時,構造造呼叫結果,供呼叫方進行處理
                DefaultFuture.errorFuture(channel, response, dispatchType);
            }
        });
        
        //因為Netty 是非阻塞的,所以寫入後可立刻返回
        return future;
    }

2.2.4.1、Netty 訊息編碼器

訊息寫入Netty channel 後,會依次經過 channel pipline 上所安裝的各種handler處理,然後再透過物理網路將資料傳送出去,這裏展示了客戶端及服務端所使用的自定義編、解解器

//自定義的編碼器 繼承自Netty 的 MessageToByteEncoder
public class StarGateEncoder extends MessageToByteEncoder<Payload> {

    //……省略
    
    private void doEncodeRequest(RequestPayload request, ByteBuf out) {
        byte sign = StarGateProtocolHeader.toSign(request.serializerCode(), StarGateProtocolHeader.REQUEST);
        long invokeId = request.invokeId();
        byte[] bytes = request.bytes();
        int length = bytes.length;

        out.writeShort(StarGateProtocolHeader.Head)  //寫入兩個位元組
                .writeByte(sign)              //寫入1個位元組
                .writeByte(0x00)            //寫入1個位元組
                .writeLong(invokeId)          //寫入8個節節
                .writeInt(length)             //寫入4個位元組
                .writeBytes(bytes);
    }

}

至此,透過上述核心程式碼,客戶的請求已經按照自定義的協議格式進行了序列化,並把資料寫入到Netty channel中,最後透過物理網路傳輸到伺服器端。

2.3、服務端接收資料

2.3.1、訊息解碼器

伺服器端接收到客戶端的傳送的資料後,需要進行正確的訊息解碼,下面是解碼器的實現。

//訊息解碼器,繼承自Netty 的ReplayingDecoder,將客戶端請求解碼為 RequestPayload 物件,供業務處理handler使用
public class StarGateDecoder extends ReplayingDecoder<StarGateDecoder.State> {

    //……省略
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        switch (state()) {
            case HEAD:
                checkMagic(in.readShort());         // HEAD
                checkpoint(State.HEAD);
            case SIGN:
                header.sign(in.readByte());         // 訊息標誌位
                checkpoint(State.STATUS);
            case STATUS:
                header.status(in.readByte());       // 狀態位
                checkpoint(State.ID);
            case ID:
                header.id(in.readLong());           // 訊息id
                checkpoint(State.BODY_SIZE);
            case BODY_SIZE:
                header.bodySize(in.readInt());      // 訊息體長度
                checkpoint(State.BODY);
            case BODY:
                switch (header.messageCode()) {
                    //……省略
                    case StarGateProtocolHeader.REQUEST: {
                        //訊息體長度資訊
                        int length = checkBodySize(header.bodySize());
                        byte[] bytes = new byte[length];
                        //讀取指定長度位元組
                        in.readBytes(bytes);
                        //呼叫請求
                        RequestPayload request = new RequestPayload(header.id());
                        //設定序列化器編碼,有效載荷
                        request.bytes(header.serializerCode(), bytes);
                        out.add(request);
                        break;
                    }
                    
                    default:
                        throw new Exception("錯誤標誌位");
                }
                checkpoint(State.HEAD);
        }
    }

   //……省略
}

2.3.2、請求引數反序列化
//服務端 Netty channel pipline 上所安裝的業務處理 handler
//業務處理handler 對RequestPayload 所攜帶的位元組陣列進行反序列化,解析出客戶端所傳遞的實際引數
public class ServiceHandler extends ChannelInboundHandlerAdapter {

    //……省略
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof RequestPayload) {       
            StarGateRequest request = new StarGateRequest((RequestPayload) msg);
            //約定的反序列化器, 由客戶端設定
            byte code = request.serializerCode();
            Serializer serializer = SerializerFactory.getSerializer(code);
            //實際請求引數字組陣列
            byte[] bytes = payload.bytes();
            //物件反序列化
            Message message = serializer.readObject(bytes, Message.class);
            log.info("message={}", JSONObject.toJSONString(message));
       
            request.message(message);
            
            //業務處理
            process(message);
        } else {
            //引用釋放
            ReferenceCountUtil.release(msg);
        }
    }

    //……省略
}

2.3.3、處理客戶端請求

經過反序列化後,服務端可以知道使用者所請求的是哪個介面、方法、以及實際的引數值,下一步就可進行真實的方法呼叫。

//處理呼叫
public void process(Message message) {         
   try {
       ServiceMetadata metadata = msg.getMetadata(); //客戶端請求的後設資料
       String providerName = metadata.getProviderName(); //服務名,即介面類名
       
       //根據介面類名,查詢服務端實現此介面的類的全限定類名
       providerName = findServiceImpl(providerName);
       String methodName = msg.getMethodName();  //方法名
       Object[] args = msg.getArgs();    //客戶設定的實際引數

       //執行緒上下文類載入器
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
       //載入具體實現類
       Class<?> clazz = classLoader.loadClass(providerName);
       //建立介面類例項
       Object instance = clazz.getDeclaredConstructor().newInstance();

       Method method = null;
       Class<?>[] parameterTypes = new Class[args.length];
       for (int i = 0; i < args.length; i++) {
           parameterTypes[i] = args[i].getClass();
       }
       method = clazz.getMethod(methodName, parameterTypes);
       
       //反射呼叫 
       Object invokeResult = method.invoke(instance, args);
       } catch (Exception e) {
            log.error("呼叫異常:", e);
           throw new RuntimeException(e);
       }

       //處理同步呼叫結果
      doProcess(invokeResult);
        
}

2.3.4、 返回呼叫結果

透過反射呼叫介面實現類,獲取呼叫結果,然後對結果進行序列化幷包裝成response響應訊息,將訊息寫入到channel, 經過channel pipline 上所安裝的編碼器對訊息物件進行編碼,最後傳送給呼叫客戶端。

//處理同步呼叫結果,並將結果寫回到 Netty channel
private void doProcess(Object realResult) {
        ResultWrapper result = new ResultWrapper();
        result.setResult(realResult);
        byte code = request.serializerCode();
        Serializer serializer = SerializerFactory.getSerializer(code);
        //new response 響應訊息物件
        Response response = new Response(request.invokeId());
        //呼叫結果序列成位元組陣列
        byte[] bytes = serializer.writeObject(result);
        response.bytes(code, bytes);
        response.status(Status.OK.value());
        
        //響應訊息物件 response 寫入 Netty channel
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    log.info("響應成功");
                } else {
                    //記錄呼叫失敗日誌
                    log.error("響應失敗, channel: {}, cause: {}.", channel, channelFuture.cause());
                }
            }
        });
    }

同樣的,訊息寫入channel 後,先依次經過 pipline 上所安裝的訊息編碼器,再發送給客戶端。具體編碼方式同客戶端編碼器類似,此處不再贅述。

2.4、客戶端接收呼叫結果

客戶端收到服務端寫入響應訊息後,同樣經過Netty channel pipline 上所安裝的解碼器,進行正確的解碼。然後再對解碼後的物件進行正確的反序列化,最終獲得呼叫結果 。具體的解碼,反序列化過程不再贅述,流程基本同上麵服務端的解碼及反序列化類似。

public class consumerHandler extends ChannelInboundHandlerAdapter {

    //……省略

    //客戶端處理所接收到的訊息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (msg instanceof ResponseMessage) {
            try {
                //型別轉換
                ResponseMessage responseMessage= (ResponseMessage)msg
                StarGateResponse response = new StarGateResponse(ResponseMessage.getMsg());
                byte code = response.serializerCode();
                Serializer serializer = SerializerFactory.getSerializer(code);
                byte[] bytes = responseMessage.bytes();
                //反序列化成呼叫結果的包裝類
                Result result = serializer.readObject(bytes, Result.class);
                response.result(result);

                //處理呼叫結果
                long invokeId = response.id();
                //透過 rnvokeid,從地快取中拿到客戶端呼叫的結果點位物件 futrue
                DefaultFuture<?> future = FUTURES_MAP.remove(invokeId);
                
                //判斷呼叫是否成功
                byte status = response.status();
                if (status == Status.OK.value()) {
                    //對呼叫結果進行強制型別轉換,並設定future結果,對阻塞在future.get()的客戶端同步呼叫來說,呼叫返回。
                    complete((V) response.getResult());
                } else {
                    //todo 處理異常
                }

            } catch (Throwable t) {
                log.error("呼叫記錄: {}, on {} #channelRead().", t, ch);
            }
        } else {
            log.warn("訊息型別不匹配: {}, channel: {}.", msg.getClass(), ch);
            //計數器減1
            ReferenceCountUtil.release(msg);
        }
    }
    
}

下面再透過一個簡單的呼叫時序圖展示一下一次典型的Rpc呼叫所經歷的步驟。



基於 Netty 實現一個自定義的簡單的 RPC 框架

3、結尾

本文首先簡單介紹了一下RPC的概念、應用場景及常用的RPC框架,然後講述了一下如何自己手動實現一個RPC框架的基本功能。目的是想讓大家對RPC框架的實現有一個大概思路,並對 Netty 這一高效網路程式設計框架有一個瞭解,透過對Netty 的編、解碼器的學習,瞭解如何自定義一個私有的通訊協議。限於篇幅本文只簡單講解了RPC的核心的呼叫邏輯的實現。真正生產可用的RPC框架還需要有更多複雜的功能,如限流、負載均衡、融斷、降級、泛型呼叫、自動重連、自定義可擴充套件的攔截器等等。

另外RPC框架中一般有三種角色,服務提供者、服務消費者、註冊中心,本文並沒有介紹註冊中心如何實現。並假定服務提供者已經將服務釋出到了註冊中心,服務消費者跟服務提供者之間建立起了TCP 長連線。

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.