切換語言為:簡體
從多執行緒設計模式到對 CompletableFuture 的應用

從多執行緒設計模式到對 CompletableFuture 的應用

  • 爱糖宝
  • 2024-06-15
  • 2066
  • 0
  • 0

最近在開發 延保服務 頻道頁時,爲了提高查詢效率,使用到了多執行緒技術。爲了對多執行緒方案設計有更加充分的瞭解,在業餘時間讀完了《圖解 Java 多執行緒設計模式》這本書,覺得收穫良多。本篇文章將介紹其中提到的 Future 模式,以及在實際業務開發中對該模式的應用,而這些內容對於本書來說只是冰山一角,還是推薦大家有時間去閱讀原書。

1. Future 模式:“先給您提貨單”

我們先來看一個場景:假如我們去蛋糕店買蛋糕,下單後,店員會遞給我們提貨單並告知“請您傍晚來取蛋糕”。到了傍晚我們拿著提貨單去取蛋糕,店員會先和我們說“您的蛋糕已經做好了”,然後將蛋糕拿給我們。

如果將下單蛋糕到取蛋糕的過程抽象成一個方法的話,那麼意味著這個方法需要花很長的時間才能獲取執行結果,與其一直等待結果,不如先拿著一張“提貨單”,到我們需要取貨的時候,再透過它去取,而獲取“提貨單”的過程是幾乎不耗時的,而這個提貨單物件就被稱為 Future,後續便可以透過它來獲取方法的返回值。用 Java 來表示這個過程的話,需要使用到 FutureTask 和  Callable 兩個類,如下:

public class Example {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 預定蛋糕,並定義“提貨單”
        System.out.println("我:預定蛋糕");
        FutureTask<String> future = new FutureTask<>(() -> {
            System.out.println("店員:請您傍晚來取蛋糕");
            Thread.sleep(2000);
            System.out.println("店員:您的蛋糕已經做好了");

            return "Holiland";
        });
        // 開始做蛋糕
        new Thread(future).start();

        // 去做其他事情
        Thread.sleep(1000);
        System.out.println("我:忙碌中...");
        // 取蛋糕
        System.out.println("我:取蛋糕 " + future.get());
    }
}

// 執行結果:
// 我:預定蛋糕
// 店員:請您傍晚來取蛋糕
// 我:忙碌中...
// 店員:您的蛋糕已經做好了
// 我:取蛋糕 Holiland

方法的呼叫者可以將任務交給其他執行緒去處理,無需阻塞等待方法的執行,這樣呼叫者便可以繼續執行其他任務,並能透過 Future 物件獲取執行結果。

它的執行原理如下:建立 FutureTask 例項時,Callable 物件會被傳遞給建構函式,當執行緒呼叫 FutureTaskrun 方法時,Callable 物件的 call 方法也會被執行。呼叫 call 方法的執行緒會同步地獲取結果,並透過 FutureTaskset 方法來記錄結果物件,如果 call 方法執行期間發生了異常,則會呼叫 setException 方法記錄異常。最後,透過呼叫 get 方法獲取方法的結果,注意這裏可能會丟擲方法執行時產生的異常

    public void run() {
        // ...
        try {
            // “提貨任務”
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 呼叫 callable 的 call 方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 捕獲並設定異常
                    setException(ex);
                }
                if (ran)
                    // 為結果賦值
                    set(result);
            }
        } finally {
            // ...
        }
    }

    protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 將結果賦值給 outcome 全域性變數,供 get 時獲取
            outcome = v;
            // 修改狀態為 NORMAL
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

    protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            // 將異常賦值給 outcome 變數,供 get 時丟擲
            outcome = t;
            // 修改狀態為 EXCEPTIONAL
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 未完成時阻塞等一等
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 正常結束的話能正常獲取到結果
        if (s == NORMAL)
            return (V)x;
        // 否則會丟擲異常,注意如果執行中出現異常,呼叫 get 時會被丟擲
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

現在對 Future 模式 已經有了基本的瞭解:它透過 Future 介面來表示未來的結果,實現 呼叫者與執行者之間的解耦提高系統的吞吐量和響應速度,那在實踐中對該模式是如何使用的呢?

2. 對 Future 模式的實踐

因為 延保服務 頻道頁訪問量大且對介面效能要求較高,單執行緒處理並不能滿足效能要求,所以應用了 Future 模式 來提高查詢效率,但是並沒有藉助上文所述的 FutureTask 來實現,而是使用了 CompletableFuture 工具類,它們的實現原理基本一致,但是後者提供的方法和對 鏈式程式設計 的支援使程式碼更加簡潔,實現更加容易(相關 API 參考見文末)。

如下是使用 CompletableFuture 非同步多執行緒查詢訂單列表的邏輯,根據配置的 pageNo 分多條執行緒查詢各頁的訂單資料:

        List<OrderListInfo> result = new ArrayList<>();
        // 併發查詢訂單列表
        List<CompletableFuture<List<OrderListInfo>>> futureList = new ArrayList<>();
        try {
            // 配置需要查詢的頁數 pageNo,併發查詢不同頁碼的訂單
            for (int i = 1; i <= pageNo; i++) {
                int curPageNo = i;
                CompletableFuture<List<OrderListInfo>> future = CompletableFuture.supplyAsync(
                        () -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor);

                futureList.add(future);
            }
            // 等待所有執行緒處理完畢,並封裝結果值
            for (CompletableFuture<List<OrderListInfo>> future : futureList) {
                result.addAll(future.get());
            }
        } catch (Exception e) {
            log.error("併發查詢使用者訂單資訊異常", e);
        }


這段程式碼中對異常的處理能進行最佳化:第 15 行程式碼,如果某條執行緒查詢訂單列表時發生異常,那麼在呼叫 get 方法時會丟擲該異常,被 catch 後返回空結果,即使有其他執行緒查詢成功,這些訂單結果值也會被忽略掉,可以針對這一點進行最佳化,如下:

        List<OrderListInfo> result = new ArrayList<>();
        // 併發查詢訂單列表
        List<CompletableFuture<List<OrderListInfo>>> futureList = new ArrayList<>();
        try {
            // 配置需要查詢的頁數 pageNo,併發查詢不同頁碼的訂單
            for (int i = 1; i <= pageNo; i++) {
                int curPageNo = i;
                CompletableFuture<List<OrderListInfo>> future = CompletableFuture
                        .supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
                // 新增異常處理
                .exceptionally(e -> {
                    log.error("查詢使用者訂單資訊異常", e);
                    return Collections.emptyList();
                });

                futureList.add(future);
            }
            // 等待所有執行緒處理完畢,並封裝結果值
            for (CompletableFuture<List<OrderListInfo>> future : futureList) {
                result.addAll(future.get());
            }
        } catch (Exception e) {
            log.error("併發查詢使用者訂單資訊異常", e);
        }

最佳化後針對查詢發生異常的任務列印異常日誌,並返回空集合,這樣即使單執行緒查詢失敗,也不會影響到其他執行緒查詢成功的結果。

CompletableFuture 還提供了 allOf 方法,它返回的 CompletableFuture 物件在所有 CompletableFuture 執行完成時完成,相比於對每個任務都呼叫 get 阻塞等待任務完成的實現可讀性更好,改造後代碼如下:

        List<OrderListInfo> result = new ArrayList<>();
        // 併發查詢訂單列表
        CompletableFuture<List<OrderListInfo>>[] futures = new CompletableFuture[pageNo];
        // 配置需要查詢的頁數 pageNo,併發查詢不同頁碼的訂單
        for (int i = 1; i <= pageNo; i++) {
            int curPageNo = i;
            CompletableFuture<List<OrderListInfo>> future = CompletableFuture
                    .supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
                    // 新增異常處理
                    .exceptionally(e -> {
                        log.error("查詢使用者訂單資訊異常", e);
                        return Collections.emptyList();
                    });

            futures[i - 1] = future;
        }

        try {
            // 等待所有執行緒處理完畢
            CompletableFuture.allOf(futures).get();
            for (CompletableFuture<List<OrderListInfo>> future : futures) {
                List<OrderListInfo> orderInfoList = future.get();
                if (CollectionUtils.isEmpty(orderInfoList)) {
                    result.addAll(orderInfoList);
                }
            }
        } catch (Exception e) {
            log.error("處理使用者訂單結果資訊異常", e);
        }

Tips: CompletableFuture 的設計初衷是支援非同步程式設計,所以應儘量避免在CompletableFuture 鏈中使用 get()/join() 方法,因為這些方法會阻塞當前執行緒直到CompletableFuture 完成,應該在必須使用該結果值時才呼叫它們。

相關的模式:命令模式

命令模式能將操作的呼叫者和執行者解耦,它能很容易的與 Future 模式 結合,以查詢訂單的任務為例,我們可以將該任務封裝為“命令”物件的形式,執行時為每個執行緒提交一個命令,實現解耦並提高擴充套件性。在命令模式中,命令物件需要 支援撤銷和重做,那麼這便在查詢出現異常時,提供了補償處理的可能,命令模式類圖關係如下:

從多執行緒設計模式到對 CompletableFuture 的應用

3.《圖解Java多執行緒設計模式》書籍推薦

我覺得本書算得上是一本老書:05 年出版的基於 JDK1.5 的Java多執行緒書籍,相比於目前我們常用的 JDK1.8 和時髦的 JDK21,在讀之前總會讓人覺得有一種過時的感覺。但是當我讀完時,發現其中的模式能對應上程式碼中的處理邏輯:對 CompletableFuture 的使用正對應了其中的 Future 模式(非同步獲取其他執行緒的執行結果)等等,所以我覺得模式的應用不會侷限於技術的新老,它是在某種情況下,研發人員共識或通用的解決方案,在知曉某種模式,採用已有的技術實現它是容易的,而反過來在只掌握技術去探索模式是困難且沒有方向的。

同時,我也在考慮一個問題:對於新人學習多執行緒技術來說,究竟適不適合直接從模式入門呢?因為我對設計模式有了比較多的實踐經驗,所以對“模式”相關的內容足夠敏感,如果新人沒有這些經驗的話,這對他們來說會不會更像是一個個知識點的堆砌呢?好在的是,本書除了模式相關的內容,對基礎知識也做足了鋪墊,而且提出的關於多執行緒程式設計的思考點也是非常值得參考和學習的,以執行緒互斥和協同為例,書中談到:在對執行緒進行互斥處理時需要考慮 “要保護的東西是什麼”,這樣便能夠 清晰的確定鎖的粒度;對於執行緒的協同,書中提到的是需要考慮 “放在中間的東西是什麼”,直接的丟擲這個觀點是不容易理解的,“中間的東西”是在多執行緒的 生產者和消費者模式 中提出的,部分執行緒負責生產,生產完成後將物件放在“中間”,部分執行緒負責消費,消費時取的便是“中間”的物件,而合理規劃這些中間的東西便能 消除生產者和消費者之間的速度差異,提高系統的吞吐量和響應速度。而再深入考慮這兩個角度時,執行緒的互斥和協同其實是內外統一的:爲了讓執行緒協調執行,必須執行互斥處理,以防止共享的內容被破壞,而執行緒的互斥是爲了執行緒的協調執行才進行的必要操作。


附:CompletableFuture 常用 API

使用 supplyAsync 方法非同步執行任務,並返回 CompletableFuture 物件

如下程式碼所示,呼叫 CompletableFuture.supplyAsync 靜態方法非同步執行查詢邏輯,並返回一個新的 CompletableFuture 物件

CompletableFuture<List<Object>> future = CompletableFuture.supplyAsync(() -> doQuery(), executor);

使用 join 方法阻塞獲取完成結果

如下程式碼所示,在封裝結果前,呼叫 join 方法阻塞等待獲取結果

futureList.forEach(CompletableFuture::join);

它與 get 方法的主要區別在於,join 方法丟擲的是未經檢查的異常 CompletionException,並將原始異常作為其原因,這意味著我們可以不需要在方法簽名中宣告它或在呼叫 join 方法的地方進行異常處理,而 get 方法會丟擲 InterruptedExceptionExecutionException 異常,我們必須對它進行處理,get 方法原始碼如下:

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        if ((r = result) == null)
            r = waitingGet(true);
        return (T) reportGet(r);
    }

用 thenApply(Function) 和 thenAccept(Consumer) 等回撥函式處理結果

如下是使用 thenApply() 方法對 CompletableFuture 的結果進行轉換的操作:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(greeting -> greeting + " World");

使用 exceptionally() 處理 CompletableFuture 中的異常

CompletableFuture 提供了exceptionally() 方法來處理異常,這是一個非常重要的步驟。如果在 CompletableFuture 的執行過程中丟擲異常,那麼這個異常會被傳遞到最終的結果中。如果沒有適當的異常處理,那麼在呼叫 get()join() 方法時可能會丟擲異常。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("Exception occurred");
    }
    return "Hello, World!";
}).exceptionally(e -> "An error occurred");

使用 allOf() 和 anyOf() 處理多個 CompletableFuture

如果有多個 CompletableFuture 需要處理,可以使用 CompletableFuture.allOf() 或者 CompletableFuture.anyOf()allOf() 在所有的 CompletableFuture 完成時完成,而 anyOf() 則會在任意一個 CompletableFuture 完成時完成。

complete()、completeExceptionally()、cancel() 方法

CompletableFuture 的執行是在呼叫了 complete()completeExceptionally()cancel() 等方法後纔會被標記為完成。如果沒有正確地完成 CompletableFuture,那麼在呼叫 get() 方法時可能會永久阻塞。這三個方法在 Java 併發程式設計中有著重要的應用。以下是這三個方法的常見使用場景:

  1. complete(T value): 此方法用於顯式地完成一個 CompletableFuture,並設定它的結果值。這在你需要在某個計算完成時,手動設定 CompletableFuture 的結果值的場景中非常有用。例如,你可能在一個非同步操作完成時,需要設定 CompletableFuture 的結果值。

CompletableFuture<String> future = new CompletableFuture<>();
// Some asynchronous operation
future.complete("Operation Result");
  1. completeExceptionally(Throwable ex): 此方法用於顯式地以異常完成一個 CompletableFuture。這在你需要在某個計算失敗時,手動設定 CompletableFuture 的異常的場景中非常有用。例如,你可能在一個非同步操作失敗時,需要設定 CompletableFuture 的異常。

CompletableFuture<String> future = new CompletableFuture<>();
// Some asynchronous operation
future.completeExceptionally(new RuntimeException("Operation Failed"));
  1. cancel(boolean mayInterruptIfRunning): 此方法用於取消與 CompletableFuture 關聯的計算。這在你需要取消一個長時間執行的或者不再需要的計算的場景中非常有用。例如,你可能在使用者取消操作或者超時的情況下,需要取消 CompletableFuture 的計算。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // Long running operation
});
// Some condition
future.cancel(true);

這些方法都是執行緒安全的,可以從任何執行緒中呼叫。

使用 thenCompose() 處理巢狀的 CompletableFuture

如果在處理 CompletableFuture 的結果時又建立了新的CompletableFuture,那麼就會產生巢狀的 CompletableFuture。這時可以使用 thenCompose() 方法來避免 CompletableFuture 的巢狀,如下程式碼所示:

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

使用 thenCombine() 處理兩個 CompletableFuture 的結果

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.