在日常開發中,爲了提高程式的效能,我們經常會使用非同步方式來完成,在本文中,我們將學習一種常用的工具類: CompletableFuture
,並且學習如何使用它來提高 Java 應用程式的效能,讓我們開始學習旅程吧!
在分析 CompletableFuture
之前,我們先來看看 Future
。
什麼是 Future?
Future
是 Java 5 中引入的 Java 介面,用於表示將來可用的值,使用 Future
給 Java 帶來了巨大的好處,它可以幫助我們非同步執行一些非常密集的計算,而不會阻塞當前執行緒,因此,我們可以在當前執行緒中繼續做一些工作。
我們可以把 Future
想象成去餐廳吃晚飯,在晚餐準備期間,我們可以幹些其他的事情,一旦晚餐準備好,就可以吃飯了。
Future
的原始碼如下:
package java.util.concurrent; public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
什麼是 CompletableFuture?
CompletableFuture
是 Java 8 引入的一個類,用於處理非同步程式設計。它是 Future 介面的一個增強版,提供了一些有用的方法來管理非同步任務。其原始碼如下:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { // 方法太多,此處省略 }
CompletableFuture 和 Future
在本節中,我們將瞭解 Future
介面以及它的一些侷限性,並且分析如何使用 CompletableFuture
類來解決這些問題。
定義超時
Future
介面只提供了 get() 方法來獲取計算結果,但如果計算時間過長,我們的執行緒就會一直堵塞。
爲了更好地理解,讓我們看一些示例程式碼:
import java.util.concurrent.*; public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(() -> threadSleep()); System.out.println("The result is: " + future.get()); } private static String threadSleep() throws InterruptedException { TimeUnit.DAYS.sleep(365 * 10); return "finishSleep"; } }
上述示例中,我們建立了一個 ExecutorService 例項,並且使用它來提交一個執行緒睡眠的任務(threadSleep()),然後透過呼叫 future.get() 方法在控制檯上列印結果值,因為 threadSleep() 方法中,我們讓執行緒睡眠了 10年,所以控制檯要等待 10年纔會列印值,而且 Future
也沒有任何方法可以手動完成任務。
那麼,CompletableFuture
是如何克服這個問題的呢?
我們還是使用相同的場景,並且在流程中呼叫了 CompletableFuture.complete() 方法,示例程式碼如下:
import java.util.concurrent.*; public class CompletableFutureDemo { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep()); completableFuture.complete("Completed"); System.out.println("result: " + completableFuture.get()); System.out.println("completableFuture done ? " + completableFuture.isDone()); } private static String threadSleep(){ try { TimeUnit.DAYS.sleep(365 * 10); } catch (InterruptedException e) { throw new RuntimeException(e); } return "finishSleep"; } }
在上述示例中:
首先,透過呼叫 CompletableFuture.supplyAsync() 方法建立一個 String 型別的 CompletableFuture;
然後,呼叫 completableFuture.complete()方法;
接著,呼叫 isDone() 方法,並列印其結果值;
最後,main() 方法的輸出如下:
result: Completed completableFuture done ? true
透過執行結果我們可以看到:需要睡眠 10年的 threadSleep() 居然完成了,為什麼呢?在整個程式碼中嫌疑最大的是 completableFuture.complete(),因此我們來看看這個方法到底做了什麼?其原始碼如下:
/** * If not already completed, sets the value returned by {@link * #get()} and related methods to the given value. * * @param value the result value * @return {@code true} if this invocation caused this CompletableFuture * to transition to a completed state, else {@code false} */ public boolean complete(T value) { boolean triggered = completeValue(value); postComplete(); return triggered; }
透過原始碼我們可以知道:complete(T value) 方法是用於手動完成一個 CompletableFuture 任務(即使任務尚未執行或未完成)並且返回 value。
因此,CompletableFuture
是透過 complete(T value)方法手動結束 任務,從而客服了 Futrue
無法手動結束任務的限制。
組合非同步操作
假設我們需要呼叫兩個 Method:firstMethod() 和 secondMethod(),並且將 firstMethod() 的結果作為 secondMethod() 的輸入。
透過使用 Future
介面,我們無法非同步組合這兩個操作,只能同步完成,示例程式碼如下:
import java.util.concurrent.*; public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> firstFuture = executor.submit(() -> firstMethod(1)); int firstMethodResult = firstFuture.get(); // 獲取 firstMethod的結果值 System.out.println("firstMethodResult:" + firstMethodResult); Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult)); System.out.println("secondMethodResult:" + secondFuture.get()); executor.shutdown(); } private static int firstMethod(int num) { return num; } private static int secondMethod(int firstMethodResult) { return 2 + firstMethodResult; } }
在上述示例程式碼中:
首先,透過 ExecutorService 提交一個返回
Future
的任務來呼叫 firstMethod;然後,需要將 firstMethod 的結果值傳遞給第 secondMethod,但檢索 firstMethod 結果值的唯一方法是使用 Future.get(),該方法會阻塞主執行緒;
接著,我們必須等到 firstMethod 返回結果,然後再執行 secondMethod 操作,整個流程就變成同步過程;
最後,main() 的輸出如下:
firstMethodResult:1 secondMethodResult:3
透過執行結果可以看出,結果符合預期而且整個過程是序列執行的。
那麼,CompletableFuture
是如何在不阻塞主執行緒的前提下,非同步組合兩個過程的呢?具體操作如下示例程式碼:
import java.util.concurrent.*; public class CompletableFutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1)) .thenApply(firstMethodResult -> secondMethod(firstMethodResult)); System.out.println("finalResult:" + finalResult.get()); } private static int firstMethod(int num) { return num; } private static int secondMethod(int firstMethodResult) { return 2 + firstMethodResult; } }
在上述示例程式碼中:
首先,透過 CompletableFuture.supplyAsync 方法,返回一個新的
CompletableFuture
,該CompletableFuture
是在 ForkJoinPool.commonPool() 中非同步完成的,並且將結果值賦值給 Supplier;接著,獲取 firstMethod() 的結果並使用 thenApply() 方法,將其傳遞給另一個呼叫 secondMethod();
最後,main() 的輸出如下:
finalResult:3
透過執行結果可以看出,結果符合預期而且 CompletableFuture
成功的把 firstMethod() 和 secondMethod() 兩個非同步過程整合。
CompletableFuture如何提升效能?
在本節中,我們將透過設定的場景來驗證 CompletableFuture
是如何提升效能的。
場景設定
定義一個 Transaction 類,並且包含 id 屬性;
定義一個 TransactionExecutor類,包含 transactionId 屬性和靜態方法 doTransaction(Transaction transaction),方法接收 Transaction物件;
在 doTransaction() 方法中,透過執行緒睡眠 1秒來模擬業務耗時;
場景涉及的程式碼如下:
public class TransactionExecutor { private final String transactionId; public TransactionExecutor(String transactionId) { this.transactionId = transactionId; } public static TransactionExecutor doTransaction(Transaction transaction) { Thread.sleep(1000L); // 透過執行緒睡眠來模擬真實的業務耗時 return new TransactionExecutor("transactionId: " + transaction.getId()); } @Override public String toString() { return "TransactionExecutor{" + "transactionId='" + transactionId + ''' + '}'; } } public class Transaction { private String id; public Transaction(String id) { this.id = id; } // getter setter方法 }
接著,我們將透過以下 3種方式來執行給定的場景:
同步實現
並行流實現
CompletableFuture 實現
同步實現
public class Demo { public static void main(String[] args) { long start = System.currentTimeMillis(); var executor = Stream.of( new Transaction("1"), new Transaction("2"), new Transaction("3")) .map(TransactionExecutor::doTransaction) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.printf("The operation take %s ms%n", end - start); System.out.println("TransactionExecutor are: " + executor); } }
執行程式碼結果如下:
The operation take 3087 ms TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'}, TransactionExecutor{transactionId='transactionId: 2'}, TransactionExecutor{transactionId='transactionId: 3'}]
從執行結果可以看出:該程式花費了 3 秒多,因為每個事務都是序列執行,並且每個事務消耗 1秒,和預期時間比較吻合。
並行流實現
我們使用 Lambda 的 parallel並行流來實現,示例程式碼如下:
public class Demo { public static void main(String[] args) { long start = System.currentTimeMillis(); var executor = Stream.of( new Transaction("1"), new Transaction("2"), new Transaction("3")) .parallel() .map(TransactionExecutor::doTransaction) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.printf("The operation took %s ms%n", end - start); System.out.println("TransactionExecutor are: " + executor); } }
透過執行結果,我們發現它合同步方法的差異是巨大的!應用程式執行速度幾乎快了三倍,如下所示:
The operation took 1007 ms TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'}, TransactionExecutor{transactionId='transactionId: 2'}, TransactionExecutor{transactionId='transactionId: 3'}]
CompletableFuture實現
現在將重構我們的客戶端應用程式以利用 CompletableFuture:
public class Demo { public static void main(String[] args) { Executor executor = Executors.newFixedThreadPool(10); long start = System.currentTimeMillis(); var future = Stream.of( new Transaction("1"), new Transaction("2"), new Transaction("3"), new Transaction("4"), new Transaction("5"), new Transaction("6"), new Transaction("7"), new Transaction("8"), new Transaction("9"), new Transaction("10") ).map(transaction -> CompletableFuture.supplyAsync( () -> TransactionExecutor::doTransaction(transaction), executor) ).collect(toList()); var categories = future.stream() .map(CompletableFuture::join) .collect(toList()); long end = System.currentTimeMillis(); System.out.printf("The operation took %s ms%n", end - start); System.out.println("TransactionExecutor are: " + categories); } }
使用 CompletableFuture
,對於 10 個執行緒,執行的時間也在 1 秒左右,效能簡直杆杆的,執行結果如下:
The operation took 1040 ms TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1'}, TransactionExecutor{transactionId='transactionId: 2'}, TransactionExecutor{transactionId='transactionId: 3'}, TransactionExecutor{transactionId='transactionId: 4'}, TransactionExecutor{transactionId='transactionId: 5'}, TransactionExecutor{transactionId='transactionId: 6'}, TransactionExecutor{transactionId='transactionId: 7'}, TransactionExecutor{transactionId='transactionId: 8'}, TransactionExecutor{transactionId='transactionId: 9'}, TransactionExecutor{transactionId='transactionId: 10'}]
適用場景
CompletableFuture
適合在處理非同步程式設計和併發任務時使用。以下是一些適合使用 CompletableFuture 的典型場景:
非同步 I/O 操作
在進行 I/O 操作時,例如檔案讀取、資料庫訪問或網路請求,使用 CompletableFuture 可以使這些操作非同步執行,從而避免阻塞主執行緒,提高應用程式的響應性。
並行處理任務
當需要並行處理多個獨立任務時,使用 CompletableFuture 可以有效利用多核 CPU,提高計算效率。
流水線處理
當有一系列依賴的操作需要按順序執行時,CompletableFuture 可以使這些操作非同步執行,形成處理流水線,從而提高處理效率。
事件驅動的非同步處理
在事件驅動的系統中,例如 GUI 應用或伺服器請求處理,CompletableFuture 可以在事件發生時非同步處理任務。
異常處理和回退機制
CompletableFuture 提供了靈活的異常處理機制,可以在非同步任務發生異常時執行回退操作或提供預設值。
併發任務的組合
CompletableFuture 可以組合多個併發任務的結果,例如使用 allOf 和 anyOf 方法。
總結
在本文中,我們學習瞭如何在 Java 中使用 Future
介面以及它的侷限性,同時,我們還學習瞭如何使用 CompletableFuture
類來克服 Future
的這些限制。
接著,我們透過一個業務場景演示,透過同步執行,併發執行,CompletableFuture
執行來比較它們的執行效率。
最後,CompletableFuture
適合在非同步程式設計、併發任務、非阻塞 I/O、事件驅動處理、異常處理、任務組合等場景中使用。它提供了豐富的 API 來處理非同步操作,使程式碼更加簡潔、可讀和高效。透過使用 CompletableFuture,開發者可以更好地利用系統資源,提高應用程式的效能和響應性。