在日常开发中,为了提高程序的性能,我们经常会使用异步方式来完成,在本文中,我们将学习一种常用的工具类: CompletableFuture
,并且学习如何使用它来提高 Java 应用程序的性能,让我们开始学习旅程吧!
在分析 CompletableFuture
之前,我们先来看看 Future
。
什么是 Future?
Future
是 Java 5 中引入的 Java 接口,用于表示将来可用的值,使用 Future
给 Java 带来了巨大的好处,它可以帮助我们异步执行一些非常密集的计算,而不会阻塞当前线程,因此,我们可以在当前线程中继续做一些工作。
我们可以把 Future
想象成去餐厅吃晚饭,在晚餐准备期间,我们可以干些其他的事情,一旦晚餐准备好,就可以吃饭了。
Future
的源码如下:
package java.util.concurrent; public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
什么是 CompletableFuture?
CompletableFuture
是 Java 8 引入的一个类,用于处理异步编程。它是 Future 接口的一个增强版,提供了一些有用的方法来管理异步任务。其源码如下:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { // 方法太多,此处省略 }
CompletableFuture 和 Future
在本节中,我们将了解 Future
接口以及它的一些局限性,并且分析如何使用 CompletableFuture
类来解决这些问题。
定义超时
Future
接口只提供了 get() 方法来获取计算结果,但如果计算时间过长,我们的线程就会一直堵塞。
为了更好地理解,让我们看一些示例代码:
import java.util.concurrent.*; public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(() -> threadSleep()); System.out.println("The result is: " + future.get()); } private static String threadSleep() throws InterruptedException { TimeUnit.DAYS.sleep(365 * 10); return "finishSleep"; } }
上述示例中,我们创建了一个 ExecutorService 实例,并且使用它来提交一个线程睡眠的任务(threadSleep()),然后通过调用 future.get() 方法在控制台上打印结果值,因为 threadSleep() 方法中,我们让线程睡眠了 10年,所以控制台要等待 10年才会打印值,而且 Future
也没有任何方法可以手动完成任务。
那么,CompletableFuture
是如何克服这个问题的呢?
我们还是使用相同的场景,并且在流程中调用了 CompletableFuture.complete() 方法,示例代码如下:
import java.util.concurrent.*; public class CompletableFutureDemo { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep()); completableFuture.complete("Completed"); System.out.println("result: " + completableFuture.get()); System.out.println("completableFuture done ? " + completableFuture.isDone()); } private static String threadSleep(){ try { TimeUnit.DAYS.sleep(365 * 10); } catch (InterruptedException e) { throw new RuntimeException(e); } return "finishSleep"; } }
在上述示例中:
首先,通过调用 CompletableFuture.supplyAsync() 方法创建一个 String 类型的 CompletableFuture;
然后,调用 completableFuture.complete()方法;
接着,调用 isDone() 方法,并打印其结果值;
最后,main() 方法的输出如下:
result: Completed completableFuture done ? true
通过运行结果我们可以看到:需要睡眠 10年的 threadSleep() 居然完成了,为什么呢?在整个代码中嫌疑最大的是 completableFuture.complete(),因此我们来看看这个方法到底做了什么?其源码如下:
/** * If not already completed, sets the value returned by {@link * #get()} and related methods to the given value. * * @param value the result value * @return {@code true} if this invocation caused this CompletableFuture * to transition to a completed state, else {@code false} */ public boolean complete(T value) { boolean triggered = completeValue(value); postComplete(); return triggered; }
通过源码我们可以知道:complete(T value) 方法是用于手动完成一个 CompletableFuture 任务(即使任务尚未执行或未完成)并且返回 value。
因此,CompletableFuture
是通过 complete(T value)方法手动结束 任务,从而客服了 Futrue
无法手动结束任务的限制。
组合异步操作
假设我们需要调用两个 Method:firstMethod() 和 secondMethod(),并且将 firstMethod() 的结果作为 secondMethod() 的输入。
通过使用 Future
接口,我们无法异步组合这两个操作,只能同步完成,示例代码如下:
import java.util.concurrent.*; public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> firstFuture = executor.submit(() -> firstMethod(1)); int firstMethodResult = firstFuture.get(); // 获取 firstMethod的结果值 System.out.println("firstMethodResult:" + firstMethodResult); Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult)); System.out.println("secondMethodResult:" + secondFuture.get()); executor.shutdown(); } private static int firstMethod(int num) { return num; } private static int secondMethod(int firstMethodResult) { return 2 + firstMethodResult; } }
在上述示例代码中:
首先,通过 ExecutorService 提交一个返回
Future
的任务来调用 firstMethod;然后,需要将 firstMethod 的结果值传递给第 secondMethod,但检索 firstMethod 结果值的唯一方法是使用 Future.get(),该方法会阻塞主线程;
接着,我们必须等到 firstMethod 返回结果,然后再执行 secondMethod 操作,整个流程就变成同步过程;
最后,main() 的输出如下:
firstMethodResult:1 secondMethodResult:3
通过运行结果可以看出,结果符合预期而且整个过程是串行执行的。
那么,CompletableFuture
是如何在不阻塞主线程的前提下,异步组合两个过程的呢?具体操作如下示例代码:
import java.util.concurrent.*; public class CompletableFutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1)) .thenApply(firstMethodResult -> secondMethod(firstMethodResult)); System.out.println("finalResult:" + finalResult.get()); } private static int firstMethod(int num) { return num; } private static int secondMethod(int firstMethodResult) { return 2 + firstMethodResult; } }
在上述示例代码中:
首先,通过 CompletableFuture.supplyAsync 方法,返回一个新的
CompletableFuture
,该CompletableFuture
是在 ForkJoinPool.commonPool() 中异步完成的,并且将结果值赋值给 Supplier;接着,获取 firstMethod() 的结果并使用 thenApply() 方法,将其传递给另一个调用 secondMethod();
最后,main() 的输出如下:
finalResult:3
通过运行结果可以看出,结果符合预期而且 CompletableFuture
成功的把 firstMethod() 和 secondMethod() 两个异步过程整合。
CompletableFuture如何提升性能?
在本节中,我们将通过设定的场景来验证 CompletableFuture
是如何提升性能的。
场景设定
定义一个 Transaction 类,并且包含 id 属性;
定义一个 TransactionExecutor类,包含 transactionId 属性和静态方法 doTransaction(Transaction transaction),方法接收 Transaction对象;
在 doTransaction() 方法中,通过线程睡眠 1秒来模拟业务耗时;
场景涉及的代码如下:
public class TransactionExecutor { private final String transactionId; public TransactionExecutor(String transactionId) { this.transactionId = transactionId; } public static TransactionExecutor doTransaction(Transaction transaction) { Thread.sleep(1000L); // 通过线程睡眠来模拟真实的业务耗时 return new TransactionExecutor("transactionId: " + transaction.getId()); } @Override public String toString() { return "TransactionExecutor{" + "transactionId='" + transactionId + ''' + '}'; } } public class Transaction { private String id; public Transaction(String id) { this.id = id; } // getter setter方法 }
接着,我们将通过以下 3种方式来执行给定的场景:
同步实现
并行流实现
CompletableFuture 实现
同步实现
public class Demo { public static void main(String[] args) { long start = System.currentTimeMillis(); var executor = Stream.of( new Transaction("1"), new Transaction("2"), new Transaction("3")) .map(TransactionExecutor::doTransaction) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.printf("The operation take %s ms%n", end - start); System.out.println("TransactionExecutor are: " + executor); } }
运行代码结果如下:
The operation take 3087 ms TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'}, TransactionExecutor{transactionId='transactionId: 2'}, TransactionExecutor{transactionId='transactionId: 3'}]
从运行结果可以看出:该程序花费了 3 秒多,因为每个事务都是串行执行,并且每个事务消耗 1秒,和预期时间比较吻合。
并行流实现
我们使用 Lambda 的 parallel并行流来实现,示例代码如下:
public class Demo { public static void main(String[] args) { long start = System.currentTimeMillis(); var executor = Stream.of( new Transaction("1"), new Transaction("2"), new Transaction("3")) .parallel() .map(TransactionExecutor::doTransaction) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.printf("The operation took %s ms%n", end - start); System.out.println("TransactionExecutor are: " + executor); } }
通过运行结果,我们发现它合同步方法的差异是巨大的!应用程序运行速度几乎快了三倍,如下所示:
The operation took 1007 ms TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'}, TransactionExecutor{transactionId='transactionId: 2'}, TransactionExecutor{transactionId='transactionId: 3'}]
CompletableFuture实现
现在将重构我们的客户端应用程序以利用 CompletableFuture:
public class Demo { public static void main(String[] args) { Executor executor = Executors.newFixedThreadPool(10); long start = System.currentTimeMillis(); var future = Stream.of( new Transaction("1"), new Transaction("2"), new Transaction("3"), new Transaction("4"), new Transaction("5"), new Transaction("6"), new Transaction("7"), new Transaction("8"), new Transaction("9"), new Transaction("10") ).map(transaction -> CompletableFuture.supplyAsync( () -> TransactionExecutor::doTransaction(transaction), executor) ).collect(toList()); var categories = future.stream() .map(CompletableFuture::join) .collect(toList()); long end = System.currentTimeMillis(); System.out.printf("The operation took %s ms%n", end - start); System.out.println("TransactionExecutor are: " + categories); } }
使用 CompletableFuture
,对于 10 个线程,执行的时间也在 1 秒左右,性能简直杆杆的,运行结果如下:
The operation took 1040 ms TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1'}, TransactionExecutor{transactionId='transactionId: 2'}, TransactionExecutor{transactionId='transactionId: 3'}, TransactionExecutor{transactionId='transactionId: 4'}, TransactionExecutor{transactionId='transactionId: 5'}, TransactionExecutor{transactionId='transactionId: 6'}, TransactionExecutor{transactionId='transactionId: 7'}, TransactionExecutor{transactionId='transactionId: 8'}, TransactionExecutor{transactionId='transactionId: 9'}, TransactionExecutor{transactionId='transactionId: 10'}]
适用场景
CompletableFuture
适合在处理异步编程和并发任务时使用。以下是一些适合使用 CompletableFuture 的典型场景:
异步 I/O 操作
在进行 I/O 操作时,例如文件读取、数据库访问或网络请求,使用 CompletableFuture 可以使这些操作异步执行,从而避免阻塞主线程,提高应用程序的响应性。
并行处理任务
当需要并行处理多个独立任务时,使用 CompletableFuture 可以有效利用多核 CPU,提高计算效率。
流水线处理
当有一系列依赖的操作需要按顺序执行时,CompletableFuture 可以使这些操作异步执行,形成处理流水线,从而提高处理效率。
事件驱动的异步处理
在事件驱动的系统中,例如 GUI 应用或服务器请求处理,CompletableFuture 可以在事件发生时异步处理任务。
异常处理和回退机制
CompletableFuture 提供了灵活的异常处理机制,可以在异步任务发生异常时执行回退操作或提供默认值。
并发任务的组合
CompletableFuture 可以组合多个并发任务的结果,例如使用 allOf 和 anyOf 方法。
总结
在本文中,我们学习了如何在 Java 中使用 Future
接口以及它的局限性,同时,我们还学习了如何使用 CompletableFuture
类来克服 Future
的这些限制。
接着,我们通过一个业务场景演示,通过同步执行,并发执行,CompletableFuture
执行来比较它们的执行效率。
最后,CompletableFuture
适合在异步编程、并发任务、非阻塞 I/O、事件驱动处理、异常处理、任务组合等场景中使用。它提供了丰富的 API 来处理异步操作,使代码更加简洁、可读和高效。通过使用 CompletableFuture,开发者可以更好地利用系统资源,提高应用程序的性能和响应性。