一、寫在開頭
在我們一開始講多執行緒的時候,提到過非同步與同步的概念,這裏麵我們再回顧一下:
同步:呼叫方在呼叫某個方法後,等待被呼叫方返回結果;呼叫方在取得被呼叫方的返回值後,再繼續執行。呼叫方順序執行,同步等待被呼叫方的返回值,這就是阻塞式呼叫;
非同步:呼叫方在呼叫某個方法後,直接返回,不需要等待被呼叫方返回結果;被呼叫方開啟一個執行緒處理任務,呼叫方可以同時去處理其他工作。呼叫方和被呼叫方是非同步的,這就是非阻塞式呼叫。
適應場景
同步:如果資料存線上程間的共享,或競態條件,需要同步。如多個執行緒同時對同一個變數進行讀和寫的操作,必須等前一個請求完成,後一個請求去呼叫前一個請求的結果,這時候就只能採用同步方式。 非同步:當應用程式在物件上呼叫了一個需要花費很長時間來執行的方法,並且不希望讓程式等待方法的返回時,就可以使用非同步,提高效率、加快程式的響應。
而我們今天探討的話題就是Java中的非同步程式設計。
二、Future
爲了提升Java程式的響應速度,在JDK1.5時引入了JUC包,裡面包含了一個介面檔案:Future,這是Java中實現非同步程式設計的開端,我們可以將Future理解為一種非同步思想或者一種設計模式;當我們執行某一耗時的任務時,可以將這個耗時任務交給一個子執行緒去非同步執行,同時我們可以乾點其他事情,不用傻傻等待耗時任務執行完成。等我們的事情幹完後,我們再透過 Future
類獲取到耗時任務的執行結果。
它的底層也是幾個很容易理解的介面方法:
// V 代表了Future執行的任務返回值的型別 public interface Future<V> { // 取消任務執行 // 成功取消返回 true,否則返回 false boolean cancel(boolean mayInterruptIfRunning); // 判斷任務是否被取消 boolean isCancelled(); // 判斷任務是否已經執行完成 boolean isDone(); // 獲取任務執行結果 V get() throws InterruptedException, ExecutionException; // 指定時間內沒有返回計算結果就丟擲 TimeOutException 異常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutExceptio }
這些介面大致提供的服務是:我有一個任務分配給了Future,然後我可以繼續去幹其他的事情,然後我可以在這個過程中去看任務是否完成,也可以取消任務,一段時間後我也可以去獲取到任務執行後的結果,也可以設定任務多久執行完,沒執行完拋異常等。
對於Future的使用,我想大家應該並不陌生的,我們在學習執行緒池的時候就有涉及,看下面這個測試案例:
//這裏使用Executors只是方便測試,正常使用時推薦使用ThreadPoolExecutor! ExecutorService executorService = Executors.newFixedThreadPool(3); Future<String> submit = executorService.submit(() -> { try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } return "javabuild"; }); String s = submit.get(); System.out.println(s); executorService.shutdown();
這裏我們透過executorService.submit()方法去提交一個任務,執行緒池會返回一個 Future 型別的物件,透過這個 Future 物件可以判斷任務是否執行成功,並且可以透過 Future 的 get()方法來獲取返回值。
三、Future實戰
經過了上面的學習瞭解,我們來根據案例場景進行實戰使用Future,畢竟現在很多大廠除了問面試八股文之外,更多的會涉及到場景題!
場景模擬
假如你是一個12306的開發人員,爲了在節假日滿足大量使用者的出行需要,請高效的完成:使用者搜尋一個目的地,推薦出所有的交通方案+酒店+耗時,並根據價格從低到高排序
拿到這種場景題的時候,我們往往需要分步處理:
根據目的地,搜尋出所有的飛機、火車、客車路線,每個路線間隔30分鐘;
計算出每種路線的耗時;
根據交通方案中最後一個到站點進行可用酒店匹配;
根據不同交通方案+對應的酒店價格進行最終出行總價格計算;
將所有組合的出行方案反饋給使用者。
好了,分析完我們大概需要做的步驟,我們就來透過程式碼實現一下吧。
第一步: 我們先來建立一個固定10個執行緒的執行緒池,用來處理以上每一步的任務。
//這裏使用Executors只是演示,正常使用時推薦使用ThreadPoolExecutor! ExecutorService executor = Executors.newFixedThreadPool(10);
第二步: 部分程式碼例項,方法就不貼了,太多太長了,大家需要對Future的用法理解即可
// 1. 根據傳入的目的地查詢所有出行方案,包括交通組合,價格,到站地點,出發時間,到站時間等 Future<List<TripMethods>> tripMethods = executor.submit(() -> searchMethods(searchCondition)); List<TripMethods> methods; try { methods = tripMethods.get(); } catch (InterruptedException | ExecutionException e) { // 處理異常 } // 2. 對每個出行方案的最終到站點查詢酒店 List<Future<List<Hotel>>> futureHotelsList = new ArrayList<>(); for (TripMethods method : methods) { Future<List<Hotel>> futureHotels = executor.submit(() -> searchHotels(method)); futureHotelsList.add(futureHotels); } // 出行方案=交通方案+酒店+耗時+價格 List<Future<List<TravelPackage>>> futureTravelPackagesList = new ArrayList<>(); for (Future<List<Hotel>> futureHotels : futureHotelsList) { List<Hotel> hotels; try { hotels = futureHotels.get(); } catch (InterruptedException | ExecutionException e) { // 處理異常 } // 3. 對每個交通方案的價格和其對應的酒店價格進行求和 for (Hotel hotel : hotels) { Future<List<TravelPackage>> futureTravelPackages = executor.submit(() -> calculatePrices(hotel)); futureTravelPackagesList.add(futureTravelPackages); } } List<TravelPackage> travelPackages = new ArrayList<>(); for (Future<List<TravelPackage>> futureTravelPackages : futureTravelPackagesList) { try { travelPackages.addAll(futureTravelPackages.get()); } catch (InterruptedException | ExecutionException e) { // 處理異常 } } // 4. 將所有出行方案按照價格排序 travelPackages.sort(Comparator.comparing(TravelPackage::getPrice)); // 5. 返回結果 return travelPackages;
我們在這裏將每一步分任務,都作為一個future物件,處理完返回。但是這樣會帶來諸多問題,比如:我們呼叫future的get方法是阻塞操作,大大影響效率,並且在複雜的鏈路關係中,這種拆分式的寫法,很難理清楚關聯關係,先後關係等;
四、CompletableFuture 調優
在這種背景下,Java 8 時引入CompletableFuture 類,它的誕生是爲了解決Future 的這些缺陷。CompletableFuture 除了提供了更為好用和強大的 Future 特性之外,還提供了函數語言程式設計、非同步任務編排組合(可以將多個非同步任務串聯起來,組成一個完整的鏈式呼叫)等能力。
//CompletableFuture實現了Future的介面方法,CompletionStage 介面描述了一個非同步計算的階段。很多計算可以分成多個階段或步驟,此時可以透過它將所有步驟組合起來,形成非同步計算的流水線。 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { }
在CompletableFuture類中透過CompletionStage提供了大量的介面方法,他們讓CompletableFuture擁有了出色的函數語言程式設計能力,方法太多,我們無法一一講解,只能透過對上面測試原始碼進行調優時,去使用,使用到的解釋一下哈。
在這裏插入圖片描述
【CompletableFuture最佳化程式碼】
CompletableFuture.supplyAsync(() -> searchMethods()) // 1. 根據傳入的目的地查詢所有出行方案,包括交通組合,價格,到站地點,出發時間,到站時間等 .thenCompose(methods -> { // 2. 對每個出行方案的最終到站點查詢酒店 List<CompletableFuture<List<TravelPackage>>> travelPackageFutures = methods.stream() .map(method -> CompletableFuture.supplyAsync(() -> searchHotels(method)) // 查詢酒店 .thenCompose(hotels -> { // 3. 對每個交通方案的價格和其對應的酒店價格進行求和 List<CompletableFuture<TravelPackage>> packageFutures = hotels.stream() .map(hotel -> CompletableFuture.supplyAsync(() -> new TravelPackage(method, hotel))) .collect(Collectors.toList()); return CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0])) .thenApply(v -> packageFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); })) .collect(Collectors.toList()); return CompletableFuture.allOf(travelPackageFutures.toArray(new CompletableFuture[0])) .thenApply(v -> travelPackageFutures.stream() .flatMap(future -> future.join().stream()) .collect(Collectors.toList())); }) .thenApply(travelPackages -> { // 4. 將所有出行方案按照價格排序 return travelPackages.stream() .sorted(Comparator.comparing(TravelPackage::getPrice)) .collect(Collectors.toList()); }) .exceptionally(e -> { // 處理所有的異常 // 處理異常 return null; });
在這裏我們將整個實現都以一種函式鏈式呼叫的方式完成了,看似冗長,實則各個關係的先後非常明確,對於複雜的業務邏輯實現更加容易進行問題的排查與理解。
【解析】
1)在這段程式碼的開頭,我們透過CompletableFuture 自帶的靜態工廠方法supplyAsync() 進行物件的建立,平時還可以用以new關鍵字或者runAsync()方法建立例項;
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); // 使用自定義執行緒池(推薦) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable); // 使用自定義執行緒池(推薦) static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
2)thenCompose():用 thenCompose() 按順序連結兩個 CompletableFuture 物件,實現非同步的任務鏈。它的作用是將前一個任務的返回結果作為下一個任務的輸入引數,從而形成一個依賴關係。注意:這個方法是非阻塞的,即查詢酒店的操作會立即開始,而不需要等待查詢交通方案的操作完成。
3)thenApply():thenApply() 方法接受一個 Function 例項,用它來處理結果;
4)allOf() :方法會等到所有的 CompletableFuture 都執行完成之後再返回;
5) 呼叫 join() 可以讓程式等待都執行完了之後再繼續執行。
6)exceptionally():這個方法用於處理CompletableFuture的異常情況,如果CompletableFuture的計算過程中丟擲異常,那麼這個方法會被呼叫。
五、總結
好了,今天就講這麼多,其實在Java中透過條用CompletableFuture實現非同步編排的工作還是稍微有點難度的,大量的API支援,需要我們在一次次的實戰中去熟悉,並靈活使用。推薦大家去看看京東的asyncTool這個框架,裡面就大量使用了CompletableFuture。