切換語言為:簡體

Java 中的工具類 CompletableFuture 是如何提升效能的?

  • 爱糖宝
  • 2024-07-13
  • 2062
  • 0
  • 0

在日常開發中,爲了提高程式的效能,我們經常會使用非同步方式來完成,在本文中,我們將學習一種常用的工具類: CompletableFuture,並且學習如何使用它來提高 Java 應用程式的效能,讓我們開始學習旅程吧!

在分析 CompletableFuture 之前,我們先來看看 Future

什麼是 Future?

Future 是 Java 5 中引入的 Java 介面,用於表示將來可用的值,使用 Future 給 Java 帶來了巨大的好處,它可以幫助我們非同步執行一些非常密集的計算,而不會阻塞當前執行緒,因此,我們可以在當前執行緒中繼續做一些工作。

我們可以把 Future 想象成去餐廳吃晚飯,在晚餐準備期間,我們可以幹些其他的事情,一旦晚餐準備好,就可以吃飯了。

Future 的原始碼如下:

package java.util.concurrent;

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);
    
    boolean isCancelled();
   
    boolean isDone();
    
    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

什麼是 CompletableFuture?

CompletableFuture 是 Java 8 引入的一個類,用於處理非同步程式設計。它是 Future 介面的一個增強版,提供了一些有用的方法來管理非同步任務。其原始碼如下:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    // 方法太多,此處省略
}

CompletableFuture 和 Future

在本節中,我們將瞭解 Future 介面以及它的一些侷限性,並且分析如何使用 CompletableFuture 類來解決這些問題。

定義超時

Future 介面只提供了 get() 方法來獲取計算結果,但如果計算時間過長,我們的執行緒就會一直堵塞。

爲了更好地理解,讓我們看一些示例程式碼:

import java.util.concurrent.*;

public class FutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> threadSleep());
        System.out.println("The result is: " + future.get());
    }

    private static String threadSleep() throws InterruptedException {
        TimeUnit.DAYS.sleep(365 * 10);
        return "finishSleep";
    }
}

上述示例中,我們建立了一個 ExecutorService 例項,並且使用它來提交一個執行緒睡眠的任務(threadSleep()),然後透過呼叫 future.get() 方法在控制檯上列印結果值,因為 threadSleep() 方法中,我們讓執行緒睡眠了 10年,所以控制檯要等待 10年纔會列印值,而且 Future 也沒有任何方法可以手動完成任務。

那麼,CompletableFuture 是如何克服這個問題的呢?

我們還是使用相同的場景,並且在流程中呼叫了 CompletableFuture.complete() 方法,示例程式碼如下:

import java.util.concurrent.*;

public class CompletableFutureDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep());
        completableFuture.complete("Completed");
        System.out.println("result: " + completableFuture.get());
        System.out.println("completableFuture done ? " + completableFuture.isDone());
    }

    private static String threadSleep(){
        try {
            TimeUnit.DAYS.sleep(365 * 10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "finishSleep";
    }
}

在上述示例中:

  • 首先,透過呼叫 CompletableFuture.supplyAsync() 方法建立一個 String 型別的 CompletableFuture;

  • 然後,呼叫 completableFuture.complete()方法;

  • 接著,呼叫 isDone() 方法,並列印其結果值;

  • 最後,main() 方法的輸出如下:

result: Completed
completableFuture done ? true

透過執行結果我們可以看到:需要睡眠 10年的 threadSleep() 居然完成了,為什麼呢?在整個程式碼中嫌疑最大的是 completableFuture.complete(),因此我們來看看這個方法到底做了什麼?其原始碼如下:

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

透過原始碼我們可以知道:complete(T value) 方法是用於手動完成一個 CompletableFuture 任務(即使任務尚未執行或未完成)並且返回 value。

因此,CompletableFuture 是透過 complete(T value)方法手動結束 任務,從而客服了 Futrue 無法手動結束任務的限制。

組合非同步操作

假設我們需要呼叫兩個 Method:firstMethod() 和 secondMethod(),並且將 firstMethod() 的結果作為 secondMethod() 的輸入。

透過使用 Future 介面,我們無法非同步組合這兩個操作,只能同步完成,示例程式碼如下:

import java.util.concurrent.*;

public class FutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> firstFuture = executor.submit(() -> firstMethod(1));

        int firstMethodResult = firstFuture.get(); // 獲取 firstMethod的結果值
        System.out.println("firstMethodResult:" + firstMethodResult);
        Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult));
        System.out.println("secondMethodResult:" + secondFuture.get());
        executor.shutdown();
    }

    private static int firstMethod(int num) {
        return num;
    }

    private static int secondMethod(int firstMethodResult) {
        return 2 + firstMethodResult;
    }
}

在上述示例程式碼中:

  • 首先,透過 ExecutorService 提交一個返回 Future 的任務來呼叫 firstMethod;

  • 然後,需要將 firstMethod 的結果值傳遞給第 secondMethod,但檢索 firstMethod 結果值的唯一方法是使用 Future.get(),該方法會阻塞主執行緒;

  • 接著,我們必須等到 firstMethod 返回結果,然後再執行 secondMethod 操作,整個流程就變成同步過程;

  • 最後,main() 的輸出如下:

firstMethodResult:1
secondMethodResult:3

透過執行結果可以看出,結果符合預期而且整個過程是序列執行的。

那麼,CompletableFuture 是如何在不阻塞主執行緒的前提下,非同步組合兩個過程的呢?具體操作如下示例程式碼:

import java.util.concurrent.*;

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1))
                .thenApply(firstMethodResult -> secondMethod(firstMethodResult));
        System.out.println("finalResult:" + finalResult.get());
    }
    private static int firstMethod(int num) {
        return num;
    }

    private static int secondMethod(int firstMethodResult) {
        return 2 + firstMethodResult;
    }
}

在上述示例程式碼中:

  • 首先,透過 CompletableFuture.supplyAsync 方法,返回一個新的 CompletableFuture,該 CompletableFuture 是在 ForkJoinPool.commonPool() 中非同步完成的,並且將結果值賦值給 Supplier;

  • 接著,獲取 firstMethod() 的結果並使用 thenApply() 方法,將其傳遞給另一個呼叫 secondMethod();

  • 最後,main() 的輸出如下:

finalResult:3

透過執行結果可以看出,結果符合預期而且 CompletableFuture 成功的把 firstMethod() 和 secondMethod() 兩個非同步過程整合。

CompletableFuture如何提升效能?

在本節中,我們將透過設定的場景來驗證 CompletableFuture 是如何提升效能的。

場景設定

  • 定義一個 Transaction 類,並且包含 id 屬性;

  • 定義一個 TransactionExecutor類,包含 transactionId 屬性和靜態方法 doTransaction(Transaction transaction),方法接收 Transaction物件;

  • 在 doTransaction() 方法中,透過執行緒睡眠 1秒來模擬業務耗時;

場景涉及的程式碼如下:

public class TransactionExecutor {
    private final String transactionId;

    public TransactionExecutor(String transactionId) {
        this.transactionId = transactionId;
    }

    public static TransactionExecutor doTransaction(Transaction transaction) {
        Thread.sleep(1000L); // 透過執行緒睡眠來模擬真實的業務耗時
        return new TransactionExecutor("transactionId: " + transaction.getId());
    }
    
    @Override
    public String toString() {
        return "TransactionExecutor{" +
                "transactionId='" + transactionId + ''' +
                '}';
    }
}

public class Transaction {
  private String id;

  public Transaction(String id) {
    this.id = id;
  }
  // getter setter方法
}

接著,我們將透過以下 3種方式來執行給定的場景:

  • 同步實現

  • 並行流實現

  • CompletableFuture 實現

同步實現

public class Demo {

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    var executor = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"))
        .map(TransactionExecutor::doTransaction)
        .collect(Collectors.toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation take %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + executor);
  }
}

執行程式碼結果如下:

The operation take 3087 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'}, 
TransactionExecutor{transactionId='transactionId: 3'}]

從執行結果可以看出:該程式花費了 3 秒多,因為每個事務都是序列執行,並且每個事務消耗 1秒,和預期時間比較吻合。

並行流實現

我們使用 Lambda 的 parallel並行流來實現,示例程式碼如下:

public class Demo {

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    var executor = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"))
        .parallel()
        .map(TransactionExecutor::doTransaction)
        .collect(Collectors.toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation took %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + executor);
  }
}

透過執行結果,我們發現它合同步方法的差異是巨大的!應用程式執行速度幾乎快了三倍,如下所示:

The operation took 1007 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'}]

CompletableFuture實現

現在將重構我們的客戶端應用程式以利用 CompletableFuture:

public class Demo {

  public static void main(String[] args) {
    Executor executor = Executors.newFixedThreadPool(10);
    long start = System.currentTimeMillis();
    var future = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"),
            new Transaction("4"),
            new Transaction("5"),
            new Transaction("6"),
            new Transaction("7"),
            new Transaction("8"),
            new Transaction("9"),
            new Transaction("10")
        ).map(transaction -> CompletableFuture.supplyAsync(
                () -> TransactionExecutor::doTransaction(transaction), executor)
        ).collect(toList());

    var categories = future.stream()
        .map(CompletableFuture::join)
        .collect(toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation took %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + categories);
  }
}

使用 CompletableFuture,對於 10 個執行緒,執行的時間也在 1 秒左右,效能簡直杆杆的,執行結果如下:

The operation took 1040 ms
TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'},
TransactionExecutor{transactionId='transactionId: 4'},
TransactionExecutor{transactionId='transactionId: 5'},
TransactionExecutor{transactionId='transactionId: 6'},
TransactionExecutor{transactionId='transactionId: 7'},
TransactionExecutor{transactionId='transactionId: 8'},
TransactionExecutor{transactionId='transactionId: 9'},
TransactionExecutor{transactionId='transactionId: 10'}]

適用場景

CompletableFuture 適合在處理非同步程式設計和併發任務時使用。以下是一些適合使用 CompletableFuture 的典型場景:

非同步 I/O 操作

在進行 I/O 操作時,例如檔案讀取、資料庫訪問或網路請求,使用 CompletableFuture 可以使這些操作非同步執行,從而避免阻塞主執行緒,提高應用程式的響應性。

並行處理任務

當需要並行處理多個獨立任務時,使用 CompletableFuture 可以有效利用多核 CPU,提高計算效率。

流水線處理

當有一系列依賴的操作需要按順序執行時,CompletableFuture 可以使這些操作非同步執行,形成處理流水線,從而提高處理效率。

事件驅動的非同步處理

在事件驅動的系統中,例如 GUI 應用或伺服器請求處理,CompletableFuture 可以在事件發生時非同步處理任務。

異常處理和回退機制

CompletableFuture 提供了靈活的異常處理機制,可以在非同步任務發生異常時執行回退操作或提供預設值。

併發任務的組合

CompletableFuture 可以組合多個併發任務的結果,例如使用 allOf 和 anyOf 方法。

總結

在本文中,我們學習瞭如何在 Java 中使用 Future 介面以及它的侷限性,同時,我們還學習瞭如何使用 CompletableFuture 類來克服 Future 的這些限制。

接著,我們透過一個業務場景演示,透過同步執行,併發執行,CompletableFuture執行來比較它們的執行效率。

最後,CompletableFuture 適合在非同步程式設計、併發任務、非阻塞 I/O、事件驅動處理、異常處理、任務組合等場景中使用。它提供了豐富的 API 來處理非同步操作,使程式碼更加簡潔、可讀和高效。透過使用 CompletableFuture,開發者可以更好地利用系統資源,提高應用程式的效能和響應性。

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.