切换语言为:繁体

Java 中的工具类 CompletableFuture 是如何提升性能的?

  • 爱糖宝
  • 2024-07-13
  • 2063
  • 0
  • 0

在日常开发中,为了提高程序的性能,我们经常会使用异步方式来完成,在本文中,我们将学习一种常用的工具类: CompletableFuture,并且学习如何使用它来提高 Java 应用程序的性能,让我们开始学习旅程吧!

在分析 CompletableFuture 之前,我们先来看看 Future

什么是 Future?

Future 是 Java 5 中引入的 Java 接口,用于表示将来可用的值,使用 Future 给 Java 带来了巨大的好处,它可以帮助我们异步执行一些非常密集的计算,而不会阻塞当前线程,因此,我们可以在当前线程中继续做一些工作。

我们可以把 Future 想象成去餐厅吃晚饭,在晚餐准备期间,我们可以干些其他的事情,一旦晚餐准备好,就可以吃饭了。

Future 的源码如下:

package java.util.concurrent;

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);
    
    boolean isCancelled();
   
    boolean isDone();
    
    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

什么是 CompletableFuture?

CompletableFuture 是 Java 8 引入的一个类,用于处理异步编程。它是 Future 接口的一个增强版,提供了一些有用的方法来管理异步任务。其源码如下:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    // 方法太多,此处省略
}

CompletableFuture 和 Future

在本节中,我们将了解 Future 接口以及它的一些局限性,并且分析如何使用 CompletableFuture 类来解决这些问题。

定义超时

Future 接口只提供了 get() 方法来获取计算结果,但如果计算时间过长,我们的线程就会一直堵塞。

为了更好地理解,让我们看一些示例代码:

import java.util.concurrent.*;

public class FutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> threadSleep());
        System.out.println("The result is: " + future.get());
    }

    private static String threadSleep() throws InterruptedException {
        TimeUnit.DAYS.sleep(365 * 10);
        return "finishSleep";
    }
}

上述示例中,我们创建了一个 ExecutorService 实例,并且使用它来提交一个线程睡眠的任务(threadSleep()),然后通过调用 future.get() 方法在控制台上打印结果值,因为 threadSleep() 方法中,我们让线程睡眠了 10年,所以控制台要等待 10年才会打印值,而且 Future 也没有任何方法可以手动完成任务。

那么,CompletableFuture 是如何克服这个问题的呢?

我们还是使用相同的场景,并且在流程中调用了 CompletableFuture.complete() 方法,示例代码如下:

import java.util.concurrent.*;

public class CompletableFutureDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep());
        completableFuture.complete("Completed");
        System.out.println("result: " + completableFuture.get());
        System.out.println("completableFuture done ? " + completableFuture.isDone());
    }

    private static String threadSleep(){
        try {
            TimeUnit.DAYS.sleep(365 * 10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "finishSleep";
    }
}

在上述示例中:

  • 首先,通过调用 CompletableFuture.supplyAsync() 方法创建一个 String 类型的 CompletableFuture;

  • 然后,调用 completableFuture.complete()方法;

  • 接着,调用 isDone() 方法,并打印其结果值;

  • 最后,main() 方法的输出如下:

result: Completed
completableFuture done ? true

通过运行结果我们可以看到:需要睡眠 10年的 threadSleep() 居然完成了,为什么呢?在整个代码中嫌疑最大的是 completableFuture.complete(),因此我们来看看这个方法到底做了什么?其源码如下:

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

通过源码我们可以知道:complete(T value) 方法是用于手动完成一个 CompletableFuture 任务(即使任务尚未执行或未完成)并且返回 value。

因此,CompletableFuture 是通过 complete(T value)方法手动结束 任务,从而客服了 Futrue 无法手动结束任务的限制。

组合异步操作

假设我们需要调用两个 Method:firstMethod() 和 secondMethod(),并且将 firstMethod() 的结果作为 secondMethod() 的输入。

通过使用 Future 接口,我们无法异步组合这两个操作,只能同步完成,示例代码如下:

import java.util.concurrent.*;

public class FutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> firstFuture = executor.submit(() -> firstMethod(1));

        int firstMethodResult = firstFuture.get(); // 获取 firstMethod的结果值
        System.out.println("firstMethodResult:" + firstMethodResult);
        Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult));
        System.out.println("secondMethodResult:" + secondFuture.get());
        executor.shutdown();
    }

    private static int firstMethod(int num) {
        return num;
    }

    private static int secondMethod(int firstMethodResult) {
        return 2 + firstMethodResult;
    }
}

在上述示例代码中:

  • 首先,通过 ExecutorService 提交一个返回 Future 的任务来调用 firstMethod;

  • 然后,需要将 firstMethod 的结果值传递给第 secondMethod,但检索 firstMethod 结果值的唯一方法是使用 Future.get(),该方法会阻塞主线程;

  • 接着,我们必须等到 firstMethod 返回结果,然后再执行 secondMethod 操作,整个流程就变成同步过程;

  • 最后,main() 的输出如下:

firstMethodResult:1
secondMethodResult:3

通过运行结果可以看出,结果符合预期而且整个过程是串行执行的。

那么,CompletableFuture 是如何在不阻塞主线程的前提下,异步组合两个过程的呢?具体操作如下示例代码:

import java.util.concurrent.*;

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1))
                .thenApply(firstMethodResult -> secondMethod(firstMethodResult));
        System.out.println("finalResult:" + finalResult.get());
    }
    private static int firstMethod(int num) {
        return num;
    }

    private static int secondMethod(int firstMethodResult) {
        return 2 + firstMethodResult;
    }
}

在上述示例代码中:

  • 首先,通过 CompletableFuture.supplyAsync 方法,返回一个新的 CompletableFuture,该 CompletableFuture 是在 ForkJoinPool.commonPool() 中异步完成的,并且将结果值赋值给 Supplier;

  • 接着,获取 firstMethod() 的结果并使用 thenApply() 方法,将其传递给另一个调用 secondMethod();

  • 最后,main() 的输出如下:

finalResult:3

通过运行结果可以看出,结果符合预期而且 CompletableFuture 成功的把 firstMethod() 和 secondMethod() 两个异步过程整合。

CompletableFuture如何提升性能?

在本节中,我们将通过设定的场景来验证 CompletableFuture 是如何提升性能的。

场景设定

  • 定义一个 Transaction 类,并且包含 id 属性;

  • 定义一个 TransactionExecutor类,包含 transactionId 属性和静态方法 doTransaction(Transaction transaction),方法接收 Transaction对象;

  • 在 doTransaction() 方法中,通过线程睡眠 1秒来模拟业务耗时;

场景涉及的代码如下:

public class TransactionExecutor {
    private final String transactionId;

    public TransactionExecutor(String transactionId) {
        this.transactionId = transactionId;
    }

    public static TransactionExecutor doTransaction(Transaction transaction) {
        Thread.sleep(1000L); // 通过线程睡眠来模拟真实的业务耗时
        return new TransactionExecutor("transactionId: " + transaction.getId());
    }
    
    @Override
    public String toString() {
        return "TransactionExecutor{" +
                "transactionId='" + transactionId + ''' +
                '}';
    }
}

public class Transaction {
  private String id;

  public Transaction(String id) {
    this.id = id;
  }
  // getter setter方法
}

接着,我们将通过以下 3种方式来执行给定的场景:

  • 同步实现

  • 并行流实现

  • CompletableFuture 实现

同步实现

public class Demo {

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    var executor = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"))
        .map(TransactionExecutor::doTransaction)
        .collect(Collectors.toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation take %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + executor);
  }
}

运行代码结果如下:

The operation take 3087 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'}, 
TransactionExecutor{transactionId='transactionId: 3'}]

从运行结果可以看出:该程序花费了 3 秒多,因为每个事务都是串行执行,并且每个事务消耗 1秒,和预期时间比较吻合。

并行流实现

我们使用 Lambda 的 parallel并行流来实现,示例代码如下:

public class Demo {

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    var executor = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"))
        .parallel()
        .map(TransactionExecutor::doTransaction)
        .collect(Collectors.toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation took %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + executor);
  }
}

通过运行结果,我们发现它合同步方法的差异是巨大的!应用程序运行速度几乎快了三倍,如下所示:

The operation took 1007 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'}]

CompletableFuture实现

现在将重构我们的客户端应用程序以利用 CompletableFuture:

public class Demo {

  public static void main(String[] args) {
    Executor executor = Executors.newFixedThreadPool(10);
    long start = System.currentTimeMillis();
    var future = Stream.of(
            new Transaction("1"),
            new Transaction("2"),
            new Transaction("3"),
            new Transaction("4"),
            new Transaction("5"),
            new Transaction("6"),
            new Transaction("7"),
            new Transaction("8"),
            new Transaction("9"),
            new Transaction("10")
        ).map(transaction -> CompletableFuture.supplyAsync(
                () -> TransactionExecutor::doTransaction(transaction), executor)
        ).collect(toList());

    var categories = future.stream()
        .map(CompletableFuture::join)
        .collect(toList());
    long end = System.currentTimeMillis();

    System.out.printf("The operation took %s ms%n", end - start);
    System.out.println("TransactionExecutor are: " + categories);
  }
}

使用 CompletableFuture,对于 10 个线程,执行的时间也在 1 秒左右,性能简直杆杆的,运行结果如下:

The operation took 1040 ms
TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'},
TransactionExecutor{transactionId='transactionId: 4'},
TransactionExecutor{transactionId='transactionId: 5'},
TransactionExecutor{transactionId='transactionId: 6'},
TransactionExecutor{transactionId='transactionId: 7'},
TransactionExecutor{transactionId='transactionId: 8'},
TransactionExecutor{transactionId='transactionId: 9'},
TransactionExecutor{transactionId='transactionId: 10'}]

适用场景

CompletableFuture 适合在处理异步编程和并发任务时使用。以下是一些适合使用 CompletableFuture 的典型场景:

异步 I/O 操作

在进行 I/O 操作时,例如文件读取、数据库访问或网络请求,使用 CompletableFuture 可以使这些操作异步执行,从而避免阻塞主线程,提高应用程序的响应性。

并行处理任务

当需要并行处理多个独立任务时,使用 CompletableFuture 可以有效利用多核 CPU,提高计算效率。

流水线处理

当有一系列依赖的操作需要按顺序执行时,CompletableFuture 可以使这些操作异步执行,形成处理流水线,从而提高处理效率。

事件驱动的异步处理

在事件驱动的系统中,例如 GUI 应用或服务器请求处理,CompletableFuture 可以在事件发生时异步处理任务。

异常处理和回退机制

CompletableFuture 提供了灵活的异常处理机制,可以在异步任务发生异常时执行回退操作或提供默认值。

并发任务的组合

CompletableFuture 可以组合多个并发任务的结果,例如使用 allOf 和 anyOf 方法。

总结

在本文中,我们学习了如何在 Java 中使用 Future 接口以及它的局限性,同时,我们还学习了如何使用 CompletableFuture 类来克服 Future 的这些限制。

接着,我们通过一个业务场景演示,通过同步执行,并发执行,CompletableFuture执行来比较它们的执行效率。

最后,CompletableFuture 适合在异步编程、并发任务、非阻塞 I/O、事件驱动处理、异常处理、任务组合等场景中使用。它提供了丰富的 API 来处理异步操作,使代码更加简洁、可读和高效。通过使用 CompletableFuture,开发者可以更好地利用系统资源,提高应用程序的性能和响应性。

0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.