java:CompletableFuture使用

1 前言

FutureTask的get()方法会造成阻塞,而轮询isDone()又会耗费CPU,jdk1.8引入了CompletableFuture,作者是Doug Lea。CompletableFuture针对Future的一些缺点进行了优化,例如回调通知,异步任务完成或产生异常,自动调用回调方法;创建异步任务,多个任务前后可组合处理等等。

在这里插入图片描述

2 使用

2.1 CompletableFuture的4个静态方法简介

CompletableFuture除了实现了Future接口,还实现了CompletionStage接口:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

CompletableFuture不推荐使用new的方式创建对象来调用(new CompletableFuture()文档述:Creates a new incomplete CompletableFuture),一般通过主要的4个静态方法来进行调用。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                               Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

区别在于,runAsync的参数接受的是Runnable对象,且没有返回值(Runnable的run方法本身就是没有返回值的);supplyAsync的参数接受的是Supplier对象,即生产者对象(java.util.function中,Supplier无参数,有返回值,为生产者;Consumer有参数,无返回值,为消费者;Function接收1个参数T,并有返回值R;Predicate接收1个参数T,并有boolean返回值),有返回值。Executor即线程池,可传入自定义的线程池,否则会使用默认的线程池。

其余常用的静态方法如下:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

这里提到了自定义线程池供CompletableFuture使用,而实际开发中,线程资源最好是通过线程池提供,不要在应用中自行显式创建线程,因为线程池可以减少创建、销毁线程所消耗的时间和系统资源开销。若不使用线程池,可能造成系统创建大量的同类线程导致内存消耗完或者"过度切换"问题。并且线程池也不建议使用Executors去创建(newSingleThreadExecutor(),创建只有一个线程的线程池;newFixedThreadPool(int nThreads),创建固定大小的线程池;newCachedThreadPool(),创建一个不限制线程数量的线程池,任何提交的任务将立刻执行,但空闲线程会得到及时回收;newScheduledThreadPool(),创建一个可定期或者延时执行任务的线程池),如newFixedThreadPool,因为队列是无边界的,如果消费任务的速度慢于生产任务的速度,会一直往队列中加任务,可能导致OOM(OutOfMemory)。一般建议使用ThreadPoolExecutor创建线程池,便于开发人员明确线程池运行规则,避免产生资源耗尽的风险。

newFixedThreadPool(int nThreads)是无界队列(2^31-1):

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:生存时间
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂
handler:拒绝策略

2.2 CompletableFuture的详细使用

2.2.1 CompletableFuture.runAsync使用自定义的线程池,不加锁和加锁的场景

CompletableFuture.runAsync接收参数Runnable对象,并且加上自定义的线程池:

package com.xiaoxu.thread;

import com.google.common.collect.Lists;

import javax.validation.constraints.NotNull;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
 * @author xiaoxu
 * @date 2022-09-29
 * spring_boot:com.xiaoxu.thread.CompletableFutureTest2
 */
public class CompletableFutureTest2 {
    private static int i = 0;

    // 执行的cpu核数是4
    private static final int cores = Runtime.getRuntime().availableProcessors();

    private static final int limitSize = 1000000;

    private static final ReentrantLock reentrantLock = new ReentrantLock();

    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(cores,
            cores,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(limitSize),
            new ThreadFactory() {
                final AtomicInteger a = new AtomicInteger(0);
                @Override
                public Thread newThread(@NotNull Runnable r) {
                    return new Thread(r,"xiaoxu-"+a.getAndAdd(1));
                }
            },new ThreadPoolExecutor.AbortPolicy());

    public static void runTask(){
        long start = System.currentTimeMillis();
        List<CompletableFuture<Void>> objects = Lists.newArrayList();
        for(int k=0;k<limitSize;k++){
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    i += 1;
                }
            }, executor);
            objects.add(voidCompletableFuture);
        }
        CompletableFuture.allOf(objects.toArray(new CompletableFuture[limitSize])).join();
        long end = System.currentTimeMillis();
        System.out.println(MessageFormat.format("未加锁耗时:{0}ms",(end-start)));
    }

    public static void runTaskUseLock(){
        long start = System.currentTimeMillis();
        List<CompletableFuture<Void>> objects = Lists.newArrayList();
        for(int k=0;k<limitSize;k++){
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    // 不要将获取锁的过程写在try中,如果在获取锁(自定义锁)时发生异常,
                    // 异常抛出同时,也会导致锁无故释放
                    reentrantLock.lock();
                    try {
                        i += 1;
                    }finally {
                        reentrantLock.unlock();
                    }
                }
            }, executor);
            objects.add(voidCompletableFuture);
        }
        CompletableFuture.allOf(objects.toArray(new CompletableFuture[limitSize])).join();
        long end = System.currentTimeMillis();
        System.out.println(MessageFormat.format("加锁耗时:{0}ms",(end-start)));
    }

    public static String createNumbers(int i){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "xiaoxu"+i;
    }

    public static List<Integer> getInts(){
        List<Integer> list = new ArrayList<>();
        for (int j = 0; j < 10; j++) {
            list.add(j);
        }
        return list;
    }

    public static void runMyTask1(){
        /*
         * 使用CompletableFuture的默认线程池,且map获取后,
         * */
        List<Integer> list = getInts();
        System.out.println("开始执行runMyTask1:");
        long start = System.currentTimeMillis();

        //  相当于对每条数据串行执行,总大小10,每次createNumbers(i)耗时1秒左右,总约耗时10154ms
        List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
                .map(CompletableFuture::join).collect(Collectors.toList());

        System.out.println("执行完毕,总list大小是:"+collect.size());
        long end = System.currentTimeMillis();
        System.out.println("runMyTask1总耗时:"+(end-start)+"ms");
        collect.forEach(System.out::println);
        executor.shutdown();
    }

    public static void runMyTask2(){
        /*
         * 使用CompletableFuture的默认线程池
         * */
        List<Integer> list = getInts();
        System.out.println("开始执行runMyTask2:");
        long start = System.currentTimeMillis();

        // 总共10条数据,耗时仅4秒,因为核(cpu数目)是4,所以线程池是3个,每次只会执行3条数据,所以耗时4秒左右
        // CompletableFuture的默认线程池大小,Runtime.getRuntime().availableProcessors()-1(2核以上的cpu)
        List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
                .collect(Collectors.toList()).stream().map(CompletableFuture::join)
                .collect(Collectors.toList());
        System.out.println("执行完毕,总list大小是:"+collect.size());
        long end = System.currentTimeMillis();
        System.out.println("runMyTask2总耗时:"+(end-start)+"ms");
        collect.forEach(System.out::println);
        executor.shutdown();
    }

    public static void runMyTask3(){
        List<Integer> list = new ArrayList<>();
        for (int j = 0; j < 10; j++) {
            list.add(j);
        }
        System.out.println("开始执行:");
        long start = System.currentTimeMillis();

        // 自定义的线程池是4个线程,所以耗时是3秒左右
        List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i),executor))
                .collect(Collectors.toList()).stream().map(CompletableFuture::join)
                .collect(Collectors.toList());
        System.out.println("执行完毕,总list大小是:"+collect.size());
        long end = System.currentTimeMillis();
        System.out.println("runMyTask3总耗时:"+(end-start)+"ms");
        collect.forEach(System.out::println);
        executor.shutdown();
    }

    public static int getRes(){
        System.out.println("初始的i是"+i);
        runTask();
        executor.shutdown();
        System.out.println("最终i的结果是"+i);
        return i;
    }

    public static int getResForLock(){
        System.out.println("初始的i是"+i);
        runTaskUseLock();
        executor.shutdown();
        System.out.println("最终i的结果是"+i);
        return i;
    }

    public static void main(String[] args) {
        getRes();
//        getResForLock();
//        runMyTask1();
//        runMyTask2();
//        runMyTask3();
    }
}

执行结果如下:

初始的i是0
未加锁耗时:312ms
最终i的结果是998943

由结果可知,因为i+=、i-=等操作是线程不安全的(非原子操作),所以不加锁,会导致执行完1000000次,数据结果可能不正确,故而执行下列第2种方式,使用可重入锁ReentrantLock来加锁,ReentrantLock可替代synchronized关键字,且因为加锁是编程式,更加方便(下述代码使用的是new ReentrantLock(),即非公平锁):

public static void main(String[] args) {
//        getRes();
    getResForLock();
//        runMyTask1();
//        runMyTask2();
//        runMyTask3();
}

执行结果如下:

初始的i是0
加锁耗时:341ms
最终i的结果是1000000

可见,加锁后,数据已正确,不过耗时相对于不加锁会增加一些。

注意:不要将获取锁的过程写在try中,如果在获取锁(自定义锁)时发生异常,异常抛出同时,也会导致锁无故释放,如下简单演示try、finally:

package com.xiaoxu.demo;

/**
 * @author xiaoxu
 * @date 2022-10-01 13:03
 * spring_boot:com.xiaoxu.demo.TestDemo
 */
public class TestDemo {

    public static String tryThing(boolean isThrow){
        System.out.println("首先进入");
        if(isThrow){
            throw new RuntimeException("try代码块前抛出异常,不进入finally代码块");
        }
        try{
            System.out.println("进入try");
            throw new RuntimeException("try代码块中抛出异常,会进入finally代码块");
        }
        finally{
            System.out.println("进入finally,执行锁释放");
        }
    }

    public static void main(String[] args) {
        System.out.println(tryThing(false));
//        System.out.println(tryThing(true));
    }
}

异常在try中发生,finally代码块执行:

首先进入
进入try
进入finally,执行锁释放
Exception in thread "main" java.lang.RuntimeException: try代码块中抛出异常,会进入finally代码块
	at com.xiaoxu.demo.TestDemo.tryThing(TestDemo.java:17)
	at com.xiaoxu.demo.TestDemo.main(TestDemo.java:25)

异常在try代码块前发生:

public static void main(String[] args) {
//        System.out.println(tryThing(false));
    System.out.println(tryThing(true));
}
首先进入
Exception in thread "main" java.lang.RuntimeException: try代码块前抛出异常,不进入finally代码块
	at com.xiaoxu.demo.TestDemo.tryThing(TestDemo.java:13)
	at com.xiaoxu.demo.TestDemo.main(TestDemo.java:26)

2.2.2 CompletableFuture.supplyAsync使用默认的线程池,map串行阻塞

执行如下代码:

public static void main(String[] args) {
//        getRes();
//        getResForLock();
    runMyTask1();
//        runMyTask2();
//        runMyTask3();
}

执行结果如下:

开始执行runMyTask1:
执行完毕,总list大小是:10
runMyTask1总耗时:10072ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9

可见,list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i))).map(CompletableFuture::join).collect(Collectors.toList())的流式方式,是串行执行的10条数据,所以执行时间约莫10秒。

其实上述的操作方式,可改为如下的形式,效果是一致的:

public static void runMyTask1(){
    /*
     * 使用CompletableFuture的默认线程池,且map获取后,
     * */
    List<Integer> list = getInts();
    System.out.println("开始执行runMyTask1:");
    long start = System.currentTimeMillis();

    //  相当于对每条数据串行执行,总大小10,每次createNumbers(i)耗时1秒左右,总约耗时10154ms
    //        List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
    //                .map(CompletableFuture::join).collect(Collectors.toList());

    List<String> collect = list.stream().map(i ->
                CompletableFuture.supplyAsync(() -> createNumbers(i)).join()
            ).collect(Collectors.toList());
    
    System.out.println("执行完毕,总list大小是:"+collect.size());
    long end = System.currentTimeMillis();
    System.out.println("runMyTask1总耗时:"+(end-start)+"ms");
    collect.forEach(System.out::println);
    executor.shutdown();
}

再次执行结果如下:

开始执行runMyTask1:
执行完毕,总list大小是:10
runMyTask1总耗时:10071ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9

2.2.3 CompletableFuture.supplyAsync使用默认的线程池,map并行阻塞执行(异步执行全部成功)

那么如何优化map的流式串行执行呢?因为java的Stream中,有延迟计算的特点。map即是延迟计算的(因为map返回Stream< R >对象,可简单认为,返回Stream对象的,都具有延迟计算的特点),故而可修改为如下代码,相当于先for循环获取全部的CompletableFuture.supplyAsync任务,存入1个list中(此时为List<CompletableFuture< String >>(同理,因为CompletableFuture.supplyAsync的返回值是CompletableFuture< U >,可以理解也是延迟执行的),还没开始执行,再使用流式阻塞执行list中的任务,即可达到并行执行的效果,提高执行的效率):

list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
                .collect(Collectors.toList()).stream().map(CompletableFuture::join)
                .collect(Collectors.toList())

执行的代码如下:

public static void main(String[] args) {
//        getRes();
//        getResForLock();
//        runMyTask1();
    runMyTask2();
//        runMyTask3();
}

结果如下所示,因为使用的是CompletableFuture默认的线程池,执行时的环境,核数(cpu数)是4,CompletableFuture的默认线程池大小是3(4-1),所以10条数据分4次才能执行完(约莫4秒,相比于串行,效率大大提升):

开始执行runMyTask2:
执行完毕,总list大小是:10
runMyTask2总耗时:4069ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9

修改代码为如下,list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i))).collect(Collectors.toList())的含义是将同步转换成可异步执行的List<CompletableFuture< String >>:

public static void runMyTask2(){
    /*
    * 使用CompletableFuture的默认线程池
    * */
    List<Integer> list = getInts();
    System.out.println("开始执行runMyTask2:");
    long start = System.currentTimeMillis();

    // 总共10条数据,耗时仅4秒,因为核(cpu数目)是4,所以线程池是3个,每次只会执行3条数据,所以耗时4秒左右
    // CompletableFuture的默认线程池大小,Runtime.getRuntime().availableProcessors()-1(2核以上的cpu)
//        List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
//                .collect(Collectors.toList()).stream().map(CompletableFuture::join)
//                .collect(Collectors.toList());

    List<CompletableFuture<String>> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
            .collect(Collectors.toList());

    System.out.println("执行完毕,总list大小是:"+collect.size());
    long end = System.currentTimeMillis();
    System.out.println("runMyTask2总耗时:"+(end-start)+"ms");
    collect.forEach(System.out::println);
    executor.shutdown();
}

再次执行结果如下(总耗时:62ms):

开始执行runMyTask2:
执行完毕,总list大小是:10
runMyTask2总耗时:62ms
java.util.concurrent.CompletableFuture@63c12fb0[Not completed]
java.util.concurrent.CompletableFuture@b1a58a3[Not completed]
java.util.concurrent.CompletableFuture@6438a396[Not completed]
java.util.concurrent.CompletableFuture@e2144e4[Not completed]
java.util.concurrent.CompletableFuture@6477463f[Not completed]
java.util.concurrent.CompletableFuture@3d71d552[Not completed]
java.util.concurrent.CompletableFuture@1cf4f579[Not completed]
java.util.concurrent.CompletableFuture@18769467[Not completed]
java.util.concurrent.CompletableFuture@46ee7fe8[Not completed]
java.util.concurrent.CompletableFuture@7506e922[Not completed]

注意:CompletableFuture.supplyAsync(或runAsync)使用默认的线程池时,如果执行的机器是2核的,获取的ForkJoinPool.getCommonPoolParallelism()将会是1,那么会执行new ThreadPerTaskExecutor(),由代码可知,将每次新起一个线程来执行(不使用线程池,会增加创建、销毁线程所消耗的时间和系统资源开销,如果消费速度远慢于生产速度,导致任务大量堆积在队列中,可能导致内存消耗完等问题),故而CompletableFuture一般不建议使用默认线程池,建议使用自定义线程池

CompletableFuture源码可示:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}

2.2.4 CompletableFuture.supplyAsync使用自定义的固定大小为4的线程池,map并行阻塞执行

执行代码如下:

public static void main(String[] args) {
//        getRes();
//        getResForLock();
//        runMyTask1();
//        runMyTask2();
    runMyTask3();
}

结果:

开始执行:
执行完毕,总list大小是:10
runMyTask3总耗时:3135ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9

可见自定义的固定大小为4的线程池,需要分3批执行完10条数据,故而耗时约3秒。

2.2.5 CompletableFuture获取结果

获取结果的方法如下:

public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent) 
public T join()

get和join均为阻塞获取值,区别在于join没有显示的抛出异常,而get均显示的抛出了异常。getNow是立马获取值,如果没有马上获取到值,则默认返回valueIfAbsent。

回调方法获取值:

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action)

使用如下方法打断后,可使用join()获取值:

public boolean complete(T value)

LockTool:

package com.xiaoxu.lock;

import java.text.MessageFormat;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * @author xiaoxu
 * @date 2022-10-03 14:53
 * spring_boot:com.xiaoxu.lock.LockTool
 */
public class LockTool {
    private static final long randWaitTime = 3;

    public static Supplier<String> getRandomId(boolean error){
        return ()->{
            String uuid = null;
            try {
                System.out.println(MessageFormat.format("开始获取uuid:{0}",error));
                TimeUnit.SECONDS.sleep(randWaitTime);
                if(error){
                    throw new RuntimeException("获取uuid失败");
                }
                uuid = UUID.randomUUID().toString();
            } catch (Throwable e) {
                System.out.println(e.getMessage());;
            }
            return uuid;
        };
    }

    public static void TimeSleep(long seconds){
        if(seconds<=0){
            throw new RuntimeException("秒数要>0");
        }
        try{
            TimeUnit.SECONDS.sleep(seconds);
        }catch (InterruptedException i){
            throw new RuntimeException("中断错误:"+i.getCause()+"\t"+i.getMessage());
        }
    }
}

AbstractFuture:

package com.xiaoxu.lock;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author xiaoxu
 * @date 2022-10-03 16:12
 * spring_boot:com.xiaoxu.lock.AbstractFuture
 */
public abstract class AbstractFuture {

    protected static final ThreadPoolExecutor executor;

    static {
        // guava包中ThreadFactoryBuilder可快捷创建ThreadFactory
        executor = new ThreadPoolExecutor(3,
                4,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(4),
                new ThreadFactoryBuilder().setNameFormat("xiaoxu-thread-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

CompletableFutureTestGetResult:

package com.xiaoxu.lock;


import java.text.MessageFormat;
import java.util.concurrent.*;

/**
 * @author xiaoxu
 * @date 2022-10-03 14:57
 * spring_boot:com.xiaoxu.lock.CompletableFutureTest
 */
public class CompletableFutureTestGetResult extends AbstractFuture{
   /*获取结果*/

    // 获取结果的方式
    public static void getResultForRunAndSupplyAsync(){
        // CompletableFuture.runAsync,无返回值
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("worker:"+Thread.currentThread().getName());
            LockTool.TimeSleep(3);
        },executor).whenComplete((val,throwable)->{
            //BiConsumer<? super T, ? super Throwable> action
            if(throwable==null){
                //没有报错信息
                System.out.println(MessageFormat.format("whenComplete方式获取值:{0}",
                        val));
            }
        });
        try {
            System.out.println(MessageFormat.format("Future的get()接口阻塞获取值:{0}",voidCompletableFuture.get()));
            System.out.println(MessageFormat.format("Future的get(long timeout, TimeUnit unit)接口获取值:{0}",voidCompletableFuture.get(1,TimeUnit.SECONDS)));
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        System.out.println("#############################");
        // CompletableFuture.supplyAsync,有返回值
        CompletableFuture<String> supplyCompletableFuture = CompletableFuture.supplyAsync(LockTool.getRandomId(false));
        // getNow(T val),如果没有立马返回值,就返回默认值val
        System.out.println(MessageFormat.format("Future的getNow:{0}",supplyCompletableFuture.getNow("我是立马返回的值(等不及)")));
        try {
            // 1秒内未返回值,则报错 TimeoutException
            System.out.println(MessageFormat.format("Future的get(long timeout, TimeUnit unit)接口获取值:{0}",supplyCompletableFuture.get(1,TimeUnit.SECONDS)));
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
        try {
            // get()直接阻塞获取值,但是需要try、catch处理异常
            System.out.println(MessageFormat.format("Future的get()接口阻塞获取值:{0}",supplyCompletableFuture.get()));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //join获取值和get获取值,都是阻塞获取值,直到值返回;明显区别是get()有异常处理,join()没有异常处理
        System.out.println(
                MessageFormat.format("supply使用join()获取值:{0}",supplyCompletableFuture.join())
        );
        // whenComplete 获取值
        supplyCompletableFuture.whenComplete((val,exp)-> System.out.println(
                MessageFormat.format("supply使用whenComplete获取值:{0}",val)
        ));
    }

    // 获取结果的方式2:complete,打断
    public static void getResult(){
        // future是3秒返回结果
        CompletableFuture<String> future = CompletableFuture.supplyAsync(LockTool.getRandomId(false));
        // 但是1秒后就执行complete获取结果,打断成功,则返回complete中的默认值
        LockTool.TimeSleep(1);
        // 返回true,则说明打断成功
        System.out.println(future.complete("我直接打断")+"\t"+future.join());
    }

    public static void main(String[] args) {
        getResultForRunAndSupplyAsync();
//        getResult();
    }

}

执行getResultForRunAndSupplyAsync():

worker:xiaoxu-thread-0
whenComplete方式获取值:null
Futureget()接口阻塞获取值:null
Futureget(long timeout, TimeUnit unit)接口获取值:null
#############################
Future的getNow:我是立马返回的值(等不及)
开始获取uuid:false
java.util.concurrent.TimeoutException
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at com.xiaoxu.lock.CompletableFutureTestGetResult.getResultForRunAndSupplyAsync(CompletableFutureTestGetResult.java:43)
	at com.xiaoxu.lock.CompletableFutureTestGetResult.main(CompletableFutureTestGetResult.java:74)
Futureget()接口阻塞获取值:7320df2d-2ed6-48c7-a3f3-538c29c38621
supply使用join()获取值:7320df2d-2ed6-48c7-a3f3-538c29c38621
supply使用whenComplete获取值:7320df2d-2ed6-48c7-a3f3-538c29c38621

执行getResult():

开始获取uuid:false
true	我直接打断

2.2.6 CompletableFuture处理结果

thenApply:计算存在依赖关系,串行计算中某步骤报错,后面将不再执行
handle:计算存在依赖关系,串行计算中某步骤报错,可处理该问题,后面步骤依然可以执行

CompletableFutureTestHandleResult:

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-03 16:16
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestHandleResult
 */
public class CompletableFutureTestHandleResult extends AbstractFuture{
    /*处理结果*/

    public static int getRInt(){
        System.out.println("开始获取值,请等待...");
        LockTool.TimeSleep(2);
        return 1;
    }

    public static void handleResultForRunAndSupplyAsync(){
        // thenApply:当前步骤有异常则直接退出
        CompletableFuture.completedFuture(getRInt())
                .thenApply(val->{
                    System.out.println("step1");
                    return val+2;
                }).thenApply(val->{
                    System.out.println("step2");
                    return val+3;
                }).whenComplete((val,exp)->{
                    if(exp==null){
                        System.out.println("计算结束"+val);
                    }
                }).exceptionally(e->{
                    System.out.println("发生异常:"+e.getCause()+"\t"+e.getMessage());
                    return null;
                }).join();
        System.out.println("############");
        CompletableFuture.supplyAsync(CompletableFutureTestHandleResult::getRInt,executor)
                .thenApply(val->{
                    System.out.println("step1");
                    return val+2;
                }).thenApply(val->{
                    System.out.println("step2");
                    if(val!=99){
                        throw new RuntimeException("步骤2报错");
                    }
                    return val+3;
                }).thenApply(val->{
                    System.out.println("step3");
                    return val+4;
                }).whenComplete((val,exp)->{
                    if(exp==null){
                        System.out.println("计算结束"+val);
                    }
                }).exceptionally(e->{
                    System.out.println("发生异常:"+e.getCause()+"\t"+e.getMessage());
                    return null;
                }).join();
        executor.shutdown();
        System.out.println("############");
        CompletableFuture.supplyAsync(CompletableFutureTestHandleResult::getRInt)
                .handle((val,throwable)->{
                    System.out.println("step1-handle");
                    return val+2;
                }).handle((val,throwable)->{
                    System.out.println("step2-handle");
                    if(val!=99){
                        throw new RuntimeException("步骤2又报错");
                    }
                    return val+3;
                }).handle((val,throwable)->{
                    System.out.println("step3-handle");
                    if(val!=null){
                        // handle的上个步骤如果报错,那么这里的val获取的就是null
                        // 避免报空指针异常,需要在这里做出非空判断                       
                        return val+4;
                    }else{
                        System.out.println("handle的上个步骤报错,导致本次获取的val为null");
                        return 99;
                    }
                }).whenComplete((val,exp)->{
                    if(exp==null){
                        System.out.println("计算结束"+val);
                    }
                })
                .exceptionally(e->{
                    System.out.println("发生异常:"+e.getCause()+"\t"+e.getMessage());
                    return null;
                })
                .join();
    }

    public static void main(String[] args) {
        handleResultForRunAndSupplyAsync();
    }
}

执行结果:

开始获取值,请等待...
step1
step2
计算结束6
############
开始获取值,请等待...
step1
step2
发生异常:java.lang.RuntimeException: 步骤2报错	java.lang.RuntimeException: 步骤2报错
############
开始获取值,请等待...
step1-handle
step2-handle
step3-handle
handle的上个步骤报错,导致本次获取的val为null
计算结束99

注意:

exceptionally:thenApply没有第二个异常相关的参数传至下游(参数为Function<? super T,? extends U> fn),所以若thenApply中抛出异常,可使用exceptionally捕获到该异常;

handle/whenComplete:handle和whenComplete处理时都有第二个异常的参数(参数为BiFunction<? super T, Throwable, ? extends U> fn),用于返回下游异常的信息。比如handle中发生异常(发生异常会导致下个串行的handle没有返回值,下游接收到的返回值为null),会返回给下一个handle的第一个参数val为null,第二个throwable参数为捕获的异常相关信息,如果handle后面有whenComplete和exceptionally,且多个串行的handle均有报错,那么exceptionally获取的报错信息是最后一次报错的信息。

2.2.7 CompletableFuture消费结果

thenRun:A执行完执行B,B不需要A的结果
thenAccept:A执行完执行B,B需要A的结果,B无返回值
thenApply:A执行完执行B,B需要A的结果,B有返回值

CompletableFutureTestConsumeResult:

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-04 13:11
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestConsumeResult
 */
public class CompletableFutureTestConsumeResult extends AbstractFuture{
    public static void consume(){
        CompletableFuture.supplyAsync(()->1)
                .thenApply(e->e+2).thenApply(e->e+3)
                .thenAccept(System.out::println);

        // thenRun:不需要任务A的结果,所以打印出来是null,实际只是执行了任务A而已
        // 结果:null
        System.out.println(CompletableFuture.supplyAsync(LockTool.getRandomId(false))
        .thenRun(()->{}).join());
        System.out.println("&&&&&&&&&&&&&&&&&&&&&");
        //  thenAccept: 需要任务A的结果,但是因为参数是Consumer<? super T> action类型,所以不会有结果,
        //  故而打印出来的结果还是null
        // 结果:null
        System.out.println(CompletableFuture.supplyAsync(LockTool.getRandomId(false))
                .thenAccept(a-> System.out.println(a+"-uuid_suffix")).join());

        // thenApply:需要任务A的结果,且需要返回结果,类型相同,都是U
        System.out.println("#####################");
        System.out.println(CompletableFuture.supplyAsync(LockTool.getRandomId(false))
                .thenApply(a->a+"-uuid的结果").join());
    }

    public static void main(String[] args) {
        consume();
    }
}

执行结果:

6
开始获取uuid:false
null
&&&&&&&&&&&&&&&&&&&&&
开始获取uuid:false
e9ef9b96-c5b9-41f1-a335-f7d39ea66409-uuid_suffix
null
#####################
开始获取uuid:false
b3a22a6b-0b85-437c-8f03-ef66f939c725-uuid的结果

2.2.8 CompletableFuture中thenXXX和thenXXXAsync的使用区别

源码中找到形如如下的代码,可知thenXXX和thenXXXAsync的区别在于传参,thenXXX使用的是null,而thenXXXAsync使用的是asyncPool。

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}

(1)处理太快,系统优化切换原则,直接使用main线程处理

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-05 15:59
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
 */
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{

    public static void testAsyncCompare(){
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+" step 1");
            return "xiaoxu";
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 2:"+x);
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 3:"+x);
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 4:"+x);
        });
    }

    public static void main(String[] args) {
        testAsyncCompare();
    }
}
ForkJoinPool.commonPool-worker-9 step 1
main step 2:xiaoxu
main step 3:null
main step 4:null

(2)没有传入自定义的线程池,都默认使用线程池ForkJoinPool

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-05 15:59
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
 */
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{

    public static void testAsyncCompare(){
        CompletableFuture.supplyAsync(()->{
            LockTool.TimeSleep(1);
            System.out.println(Thread.currentThread().getName()+" step 1");
            return "xiaoxu";
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 2:"+x);
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 3:"+x);
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 4:"+x);
        }).join();
    }

    public static void main(String[] args) {
        testAsyncCompare();
    }
}
ForkJoinPool.commonPool-worker-9 step 1
ForkJoinPool.commonPool-worker-9 step 2:xiaoxu
ForkJoinPool.commonPool-worker-9 step 3:null
ForkJoinPool.commonPool-worker-9 step 4:null

(3)执行第1个任务时,传入自定义的线程池,调用thenXXX执行第二个任务时,则第二个和第一个任务共用同一个线程池;若是调用thenXXXAsync执行第二个任务,则第一个任务是使用自定义线程池,第二个任务使用的是ForkJoinPool:

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-05 15:59
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
 */
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{

    public static void testAsyncCompare(){
        CompletableFuture.supplyAsync(()->{
            LockTool.TimeSleep(1);
            System.out.println(Thread.currentThread().getName()+" step 1");
            return "xiaoxu";
        },executor).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 2:"+x);
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 3:"+x);
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 4:"+x);
        }).join();
        executor.shutdown();
    }

    public static void main(String[] args) {
        testAsyncCompare();
    }
}
xiaoxu-thread-0 step 1
xiaoxu-thread-0 step 2:xiaoxu
xiaoxu-thread-0 step 3:null
xiaoxu-thread-0 step 4:null
package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-05 15:59
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
 */
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{

    public static void testAsyncCompare(){
        CompletableFuture.supplyAsync(()->{
            LockTool.TimeSleep(1);
            System.out.println(Thread.currentThread().getName()+" step 1");
            return "xiaoxu";
        },executor).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 2:"+x);
        }).thenAcceptAsync(x->{
            System.out.println(Thread.currentThread().getName()+" step 3:"+x);
        }).thenAccept(x->{
            System.out.println(Thread.currentThread().getName()+" step 4:"+x);
        }).join();
        executor.shutdown();
    }

    public static void main(String[] args) {
        testAsyncCompare();
    }
}
xiaoxu-thread-0 step 1
xiaoxu-thread-0 step 2:xiaoxu
ForkJoinPool.commonPool-worker-9 step 3:null
ForkJoinPool.commonPool-worker-9 step 4:null

thenRun(Async)、thenAccept(Async)、thenApply(Async)均同理。

2.2.9 CompletableFuture计算速度选用

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-07 17:04
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestSpeed
 */
public class CompletableFutureTestSpeed extends AbstractFuture{

    public static void run(){
        // a和b,谁先返回值,则取谁
        // CompletableFuture.applyToEither
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("Boy A start");
            LockTool.TimeSleep(1);
            return "BoyA";
        });
        CompletableFuture<String> b = CompletableFuture.supplyAsync(()->{
            System.out.println("Boy B start");
            LockTool.TimeSleep(2);
            return "BoyB";
        });
        CompletableFuture<String> res = a.applyToEither(b, x -> x + " win!");
        System.out.println(Thread.currentThread().getName()+"\t"+res.join());
    }

    public static void main(String[] args) {
        run();
    }
}
Boy A start
Boy B start
main	BoyA win!

2.2.10 CompletableFuture计算结果合并

thenCombine:任务都完成后,最终把两个任务的结果一起交于thenCombine处理,先完成的需要等待,直到任务均完成。

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-07 17:27
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestCombine
 */
public class CompletableFutureTestCombine extends AbstractFuture{
    public static void run(){
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("A开始计算值");
            LockTool.TimeSleep(2);
            return 15;
        });
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
            System.out.println("B开始计算值");
            LockTool.TimeSleep(3);
            return 30;
        });
        CompletableFuture<Integer> f = a.thenCombine(b, (a1, b1) -> {
            System.out.println("值1:" + a1 + ";值2:" + b1);
            return a1 + b1;
        });
        System.out.println("最终计算出的结果:"+f.join());
    }

    public static void main(String[] args) {
        run();
    }
}
A开始计算值
B开始计算值
值1:15;2:30
最终计算出的结果:45

2.2.11 CompletableFuture计算结果合并方式2(thenCompose,与2.2.10比较)

package com.xiaoxu.lock;

import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-07 18:23
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestCompose
 */
public class CompletableFutureTestCompose extends AbstractFuture{
    public static void run(){
        long start = System.currentTimeMillis();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("A开始计算值");
            LockTool.TimeSleep(4);
            return 15;
        });
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
            System.out.println("B开始计算值");
            LockTool.TimeSleep(2);
            return 30;
        });
        CompletableFuture<Integer> f = a.thenCombine(b, (a1, b1) -> {
            System.out.println("值1:" + a1 + ";值2:" + b1);
            return a1 + b1;
        });
        System.out.println("thenCombine最终计算出的结果:"+f.join());
        long end = System.currentTimeMillis();
        System.out.println("thenCombine耗时:"+(end-start)+"ms");
    }

    public static void run2(){
        long start = System.currentTimeMillis();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("A开始计算值");
            LockTool.TimeSleep(4);
            return 15;
        });
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
            System.out.println("B开始计算值");
            LockTool.TimeSleep(2);
            return 30;
        });
        CompletableFuture<Integer> f = a.thenCompose(a1 -> {
            System.out.println("值1:" + a1);
            System.out.println("现在开始计算b1的值:");
            // thenCompose 返回的需要是一个CompletionStage,即CompletableFuture<Integer> b
            return b.thenApply(bRes->{
                System.out.println("thenCompose获取到b的结果:"+bRes);
                return a1+bRes;
            });
        });
        System.out.println("thenCompose最终计算出的结果:"+f.join());
        long end = System.currentTimeMillis();
        System.out.println("thenCompose耗时:"+(end-start)+"ms");
    }

    public static void main(String[] args) {
        run();
//        run2();
    }
}
A开始计算值
B开始计算值
值1:15;2:30
thenCombine最终计算出的结果:45
thenCombine耗时:4052ms

使用thenCompose合并结果:

public static void main(String[] args) {
//        run();
    run2();
}
A开始计算值
B开始计算值
值1:15
现在开始计算b1的值:
thenCompose获取到b的结果:30
thenCompose最终计算出的结果:45
thenCompose耗时:4053ms

可见上述两种方式thenCombine和thenCompose合并结果,耗时相差无几,结果一致。

2.2.12 CompletableFuture的allOf和anyOf

package com.xiaoxu.lock;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * @author xiaoxu
 * @date 2022-10-07 17:40
 * spring_boot:com.xiaoxu.lock.CompletableFutureTestOf
 */
public class CompletableFutureTestOf extends AbstractFuture{
    public static void testAllOf(){
        System.out.println("allOf开始执行:");
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
            System.out.println("all-A开始计算");
            LockTool.TimeSleep(2);
            System.out.println("all-A获得结果45");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("all-B开始计算");
            LockTool.TimeSleep(3);
            System.out.println("all-B获得结果30");
        });
        CompletableFuture<Void> c = CompletableFuture.runAsync(() -> {
            System.out.println("all-C开始计算");
            LockTool.TimeSleep(1);
            System.out.println("all-C获得结果100");
        });
        // CompletableFuture.allOf:全部都执行完
        CompletableFuture.allOf(a,b,c).join();
    }

    public static void testAnyOf(){
        System.out.println("anyOf开始执行:");
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
            System.out.println("any-A开始计算");
            LockTool.TimeSleep(2);
            System.out.println("any-A获得结果45");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("any-B开始计算");
            LockTool.TimeSleep(3);
            System.out.println("any-B获得结果30");
        });
        CompletableFuture<Void> c = CompletableFuture.runAsync(() -> {
            System.out.println("any-C开始计算");
            LockTool.TimeSleep(1);
            System.out.println("any-C获得结果100");
        });
        // CompletableFuture.anyOf:1个执行完则完成
        CompletableFuture.anyOf(a,b,c).join();
    }

    public static void main(String[] args) {
        testAnyOf();
        testAllOf();
    }
}

anyOf在前,allOf在后:执行结果:

anyOf开始执行:
any-A开始计算
any-B开始计算
any-C开始计算
any-C获得结果100
allOf开始执行:
all-A开始计算
all-B开始计算
all-C开始计算
any-A获得结果45
all-C获得结果100
any-B获得结果30
all-A获得结果45
all-B获得结果30

在这里插入图片描述
在这里插入图片描述
allOf则和anyOf相反,allOf必须全部执行完,才会执行后续代码:

修改如下执行顺序,先执行allOf,再执行anyOf:

public static void main(String[] args) {
    testAllOf();
    testAnyOf();
}

执行结果如下(因为CompletableFuture默认的ForkJoinPool线程池,主线程结束时会shutdown默认线程池,故而anyOf只获取到C时,交由后续主线程执行,主线程立马结束,故而只打印了any-C的结果):

allOf开始执行:
all-A开始计算
all-B开始计算
all-C开始计算
all-C获得结果100
all-A获得结果45
all-B获得结果30
anyOf开始执行:
any-A开始计算
any-B开始计算
any-C开始计算
any-C获得结果100

在这里插入图片描述
修改如下:

public static void main(String[] args) {
    testAllOf();
    testAnyOf();
    LockTool.TimeSleep(5);
}

结果如下所示:

allOf开始执行:
all-A开始计算
all-B开始计算
all-C开始计算
all-C获得结果100
all-A获得结果45
all-B获得结果30
anyOf开始执行:
any-A开始计算
any-B开始计算
any-C开始计算
any-C获得结果100
any-A获得结果45
any-B获得结果30

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐