java:CompletableFuture使用
1 前言FutureTask的get()方法会造成阻塞,而轮询isDone()又会耗费CPU,jdk1.8引入了CompletableFuture,作者是Doug Lea。CompletableFuture针对Future的一些缺点进行了优化,例如回调通知,异步任务完成或产生异常,自动调用回调方法;创建异步任务,多个任务前后可组合处理等等。2 使用2.1 CompletableFuture的4个静
java:CompletableFuture使用
1 前言
FutureTask的get()方法会造成阻塞,而轮询isDone()又会耗费CPU,jdk1.8引入了CompletableFuture,作者是Doug Lea。CompletableFuture针对Future的一些缺点进行了优化,例如回调通知,异步任务完成或产生异常,自动调用回调方法;创建异步任务,多个任务前后可组合处理等等。
2 使用
2.1 CompletableFuture的4个静态方法简介
CompletableFuture除了实现了Future接口,还实现了CompletionStage接口:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
CompletableFuture不推荐使用new的方式创建对象来调用(new CompletableFuture()文档述:Creates a new incomplete CompletableFuture),一般通过主要的4个静态方法来进行调用。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
区别在于,runAsync的参数接受的是Runnable对象,且没有返回值(Runnable的run方法本身就是没有返回值的);supplyAsync的参数接受的是Supplier对象,即生产者对象(java.util.function中,Supplier无参数,有返回值,为生产者;Consumer有参数,无返回值,为消费者;Function接收1个参数T,并有返回值R;Predicate接收1个参数T,并有boolean返回值),有返回值。Executor即线程池,可传入自定义的线程池,否则会使用默认的线程池。
其余常用的静态方法如下:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
这里提到了自定义线程池供CompletableFuture使用,而实际开发中,线程资源最好是通过线程池提供,不要在应用中自行显式创建线程,因为线程池可以减少创建、销毁线程所消耗的时间和系统资源开销。若不使用线程池,可能造成系统创建大量的同类线程导致内存消耗完或者"过度切换"问题。并且线程池也不建议使用Executors去创建(newSingleThreadExecutor(),创建只有一个线程的线程池;newFixedThreadPool(int nThreads),创建固定大小的线程池;newCachedThreadPool(),创建一个不限制线程数量的线程池,任何提交的任务将立刻执行,但空闲线程会得到及时回收;newScheduledThreadPool(),创建一个可定期或者延时执行任务的线程池),如newFixedThreadPool,因为队列是无边界的,如果消费任务的速度慢于生产任务的速度,会一直往队列中加任务,可能导致OOM(OutOfMemory)。一般建议使用ThreadPoolExecutor创建线程池,便于开发人员明确线程池运行规则,避免产生资源耗尽的风险。
newFixedThreadPool(int nThreads)是无界队列(2^31-1):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:生存时间
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂
handler:拒绝策略
2.2 CompletableFuture的详细使用
2.2.1 CompletableFuture.runAsync使用自定义的线程池,不加锁和加锁的场景
CompletableFuture.runAsync接收参数Runnable对象,并且加上自定义的线程池:
package com.xiaoxu.thread;
import com.google.common.collect.Lists;
import javax.validation.constraints.NotNull;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* @author xiaoxu
* @date 2022-09-29
* spring_boot:com.xiaoxu.thread.CompletableFutureTest2
*/
public class CompletableFutureTest2 {
private static int i = 0;
// 执行的cpu核数是4
private static final int cores = Runtime.getRuntime().availableProcessors();
private static final int limitSize = 1000000;
private static final ReentrantLock reentrantLock = new ReentrantLock();
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(cores,
cores,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(limitSize),
new ThreadFactory() {
final AtomicInteger a = new AtomicInteger(0);
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r,"xiaoxu-"+a.getAndAdd(1));
}
},new ThreadPoolExecutor.AbortPolicy());
public static void runTask(){
long start = System.currentTimeMillis();
List<CompletableFuture<Void>> objects = Lists.newArrayList();
for(int k=0;k<limitSize;k++){
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
i += 1;
}
}, executor);
objects.add(voidCompletableFuture);
}
CompletableFuture.allOf(objects.toArray(new CompletableFuture[limitSize])).join();
long end = System.currentTimeMillis();
System.out.println(MessageFormat.format("未加锁耗时:{0}ms",(end-start)));
}
public static void runTaskUseLock(){
long start = System.currentTimeMillis();
List<CompletableFuture<Void>> objects = Lists.newArrayList();
for(int k=0;k<limitSize;k++){
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
// 不要将获取锁的过程写在try中,如果在获取锁(自定义锁)时发生异常,
// 异常抛出同时,也会导致锁无故释放
reentrantLock.lock();
try {
i += 1;
}finally {
reentrantLock.unlock();
}
}
}, executor);
objects.add(voidCompletableFuture);
}
CompletableFuture.allOf(objects.toArray(new CompletableFuture[limitSize])).join();
long end = System.currentTimeMillis();
System.out.println(MessageFormat.format("加锁耗时:{0}ms",(end-start)));
}
public static String createNumbers(int i){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "xiaoxu"+i;
}
public static List<Integer> getInts(){
List<Integer> list = new ArrayList<>();
for (int j = 0; j < 10; j++) {
list.add(j);
}
return list;
}
public static void runMyTask1(){
/*
* 使用CompletableFuture的默认线程池,且map获取后,
* */
List<Integer> list = getInts();
System.out.println("开始执行runMyTask1:");
long start = System.currentTimeMillis();
// 相当于对每条数据串行执行,总大小10,每次createNumbers(i)耗时1秒左右,总约耗时10154ms
List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
.map(CompletableFuture::join).collect(Collectors.toList());
System.out.println("执行完毕,总list大小是:"+collect.size());
long end = System.currentTimeMillis();
System.out.println("runMyTask1总耗时:"+(end-start)+"ms");
collect.forEach(System.out::println);
executor.shutdown();
}
public static void runMyTask2(){
/*
* 使用CompletableFuture的默认线程池
* */
List<Integer> list = getInts();
System.out.println("开始执行runMyTask2:");
long start = System.currentTimeMillis();
// 总共10条数据,耗时仅4秒,因为核(cpu数目)是4,所以线程池是3个,每次只会执行3条数据,所以耗时4秒左右
// CompletableFuture的默认线程池大小,Runtime.getRuntime().availableProcessors()-1(2核以上的cpu)
List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
.collect(Collectors.toList()).stream().map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println("执行完毕,总list大小是:"+collect.size());
long end = System.currentTimeMillis();
System.out.println("runMyTask2总耗时:"+(end-start)+"ms");
collect.forEach(System.out::println);
executor.shutdown();
}
public static void runMyTask3(){
List<Integer> list = new ArrayList<>();
for (int j = 0; j < 10; j++) {
list.add(j);
}
System.out.println("开始执行:");
long start = System.currentTimeMillis();
// 自定义的线程池是4个线程,所以耗时是3秒左右
List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i),executor))
.collect(Collectors.toList()).stream().map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println("执行完毕,总list大小是:"+collect.size());
long end = System.currentTimeMillis();
System.out.println("runMyTask3总耗时:"+(end-start)+"ms");
collect.forEach(System.out::println);
executor.shutdown();
}
public static int getRes(){
System.out.println("初始的i是"+i);
runTask();
executor.shutdown();
System.out.println("最终i的结果是"+i);
return i;
}
public static int getResForLock(){
System.out.println("初始的i是"+i);
runTaskUseLock();
executor.shutdown();
System.out.println("最终i的结果是"+i);
return i;
}
public static void main(String[] args) {
getRes();
// getResForLock();
// runMyTask1();
// runMyTask2();
// runMyTask3();
}
}
执行结果如下:
初始的i是0
未加锁耗时:312ms
最终i的结果是998943
由结果可知,因为i+=、i-=等操作是线程不安全的(非原子操作),所以不加锁,会导致执行完1000000次,数据结果可能不正确,故而执行下列第2种方式,使用可重入锁ReentrantLock来加锁,ReentrantLock可替代synchronized关键字,且因为加锁是编程式,更加方便(下述代码使用的是new ReentrantLock(),即非公平锁):
public static void main(String[] args) {
// getRes();
getResForLock();
// runMyTask1();
// runMyTask2();
// runMyTask3();
}
执行结果如下:
初始的i是0
加锁耗时:341ms
最终i的结果是1000000
可见,加锁后,数据已正确,不过耗时相对于不加锁会增加一些。
注意:不要将获取锁的过程写在try中,如果在获取锁(自定义锁)时发生异常,异常抛出同时,也会导致锁无故释放,如下简单演示try、finally:
package com.xiaoxu.demo;
/**
* @author xiaoxu
* @date 2022-10-01 13:03
* spring_boot:com.xiaoxu.demo.TestDemo
*/
public class TestDemo {
public static String tryThing(boolean isThrow){
System.out.println("首先进入");
if(isThrow){
throw new RuntimeException("try代码块前抛出异常,不进入finally代码块");
}
try{
System.out.println("进入try");
throw new RuntimeException("try代码块中抛出异常,会进入finally代码块");
}
finally{
System.out.println("进入finally,执行锁释放");
}
}
public static void main(String[] args) {
System.out.println(tryThing(false));
// System.out.println(tryThing(true));
}
}
异常在try中发生,finally代码块执行:
首先进入
进入try
进入finally,执行锁释放
Exception in thread "main" java.lang.RuntimeException: try代码块中抛出异常,会进入finally代码块
at com.xiaoxu.demo.TestDemo.tryThing(TestDemo.java:17)
at com.xiaoxu.demo.TestDemo.main(TestDemo.java:25)
异常在try代码块前发生:
public static void main(String[] args) {
// System.out.println(tryThing(false));
System.out.println(tryThing(true));
}
首先进入
Exception in thread "main" java.lang.RuntimeException: try代码块前抛出异常,不进入finally代码块
at com.xiaoxu.demo.TestDemo.tryThing(TestDemo.java:13)
at com.xiaoxu.demo.TestDemo.main(TestDemo.java:26)
2.2.2 CompletableFuture.supplyAsync使用默认的线程池,map串行阻塞
执行如下代码:
public static void main(String[] args) {
// getRes();
// getResForLock();
runMyTask1();
// runMyTask2();
// runMyTask3();
}
执行结果如下:
开始执行runMyTask1:
执行完毕,总list大小是:10
runMyTask1总耗时:10072ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9
可见,list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i))).map(CompletableFuture::join).collect(Collectors.toList())的流式方式,是串行执行的10条数据,所以执行时间约莫10秒。
其实上述的操作方式,可改为如下的形式,效果是一致的:
public static void runMyTask1(){
/*
* 使用CompletableFuture的默认线程池,且map获取后,
* */
List<Integer> list = getInts();
System.out.println("开始执行runMyTask1:");
long start = System.currentTimeMillis();
// 相当于对每条数据串行执行,总大小10,每次createNumbers(i)耗时1秒左右,总约耗时10154ms
// List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
// .map(CompletableFuture::join).collect(Collectors.toList());
List<String> collect = list.stream().map(i ->
CompletableFuture.supplyAsync(() -> createNumbers(i)).join()
).collect(Collectors.toList());
System.out.println("执行完毕,总list大小是:"+collect.size());
long end = System.currentTimeMillis();
System.out.println("runMyTask1总耗时:"+(end-start)+"ms");
collect.forEach(System.out::println);
executor.shutdown();
}
再次执行结果如下:
开始执行runMyTask1:
执行完毕,总list大小是:10
runMyTask1总耗时:10071ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9
2.2.3 CompletableFuture.supplyAsync使用默认的线程池,map并行阻塞执行(异步执行全部成功)
那么如何优化map的流式串行执行呢?因为java的Stream中,有延迟计算的特点。map即是延迟计算的(因为map返回Stream< R >对象,可简单认为,返回Stream对象的,都具有延迟计算的特点),故而可修改为如下代码,相当于先for循环获取全部的CompletableFuture.supplyAsync任务,存入1个list中(此时为List<CompletableFuture< String >>(同理,因为CompletableFuture.supplyAsync的返回值是CompletableFuture< U >,可以理解也是延迟执行的),还没开始执行,再使用流式阻塞执行list中的任务,即可达到并行执行的效果,提高执行的效率):
list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
.collect(Collectors.toList()).stream().map(CompletableFuture::join)
.collect(Collectors.toList())
执行的代码如下:
public static void main(String[] args) {
// getRes();
// getResForLock();
// runMyTask1();
runMyTask2();
// runMyTask3();
}
结果如下所示,因为使用的是CompletableFuture默认的线程池,执行时的环境,核数(cpu数)是4,CompletableFuture的默认线程池大小是3(4-1),所以10条数据分4次才能执行完(约莫4秒,相比于串行,效率大大提升):
开始执行runMyTask2:
执行完毕,总list大小是:10
runMyTask2总耗时:4069ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9
修改代码为如下,list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i))).collect(Collectors.toList())的含义是将同步转换成可异步执行的List<CompletableFuture< String >>:
public static void runMyTask2(){
/*
* 使用CompletableFuture的默认线程池
* */
List<Integer> list = getInts();
System.out.println("开始执行runMyTask2:");
long start = System.currentTimeMillis();
// 总共10条数据,耗时仅4秒,因为核(cpu数目)是4,所以线程池是3个,每次只会执行3条数据,所以耗时4秒左右
// CompletableFuture的默认线程池大小,Runtime.getRuntime().availableProcessors()-1(2核以上的cpu)
// List<String> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
// .collect(Collectors.toList()).stream().map(CompletableFuture::join)
// .collect(Collectors.toList());
List<CompletableFuture<String>> collect = list.stream().map(i -> CompletableFuture.supplyAsync(() -> createNumbers(i)))
.collect(Collectors.toList());
System.out.println("执行完毕,总list大小是:"+collect.size());
long end = System.currentTimeMillis();
System.out.println("runMyTask2总耗时:"+(end-start)+"ms");
collect.forEach(System.out::println);
executor.shutdown();
}
再次执行结果如下(总耗时:62ms):
开始执行runMyTask2:
执行完毕,总list大小是:10
runMyTask2总耗时:62ms
java.util.concurrent.CompletableFuture@63c12fb0[Not completed]
java.util.concurrent.CompletableFuture@b1a58a3[Not completed]
java.util.concurrent.CompletableFuture@6438a396[Not completed]
java.util.concurrent.CompletableFuture@e2144e4[Not completed]
java.util.concurrent.CompletableFuture@6477463f[Not completed]
java.util.concurrent.CompletableFuture@3d71d552[Not completed]
java.util.concurrent.CompletableFuture@1cf4f579[Not completed]
java.util.concurrent.CompletableFuture@18769467[Not completed]
java.util.concurrent.CompletableFuture@46ee7fe8[Not completed]
java.util.concurrent.CompletableFuture@7506e922[Not completed]
注意:CompletableFuture.supplyAsync(或runAsync)使用默认的线程池时,如果执行的机器是2核的,获取的ForkJoinPool.getCommonPoolParallelism()将会是1,那么会执行new ThreadPerTaskExecutor(),由代码可知,将每次新起一个线程来执行(不使用线程池,会增加创建、销毁线程所消耗的时间和系统资源开销,如果消费速度远慢于生产速度,导致任务大量堆积在队列中,可能导致内存消耗完等问题),故而CompletableFuture一般不建议使用默认线程池,建议使用自定义线程池。
CompletableFuture源码可示:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
2.2.4 CompletableFuture.supplyAsync使用自定义的固定大小为4的线程池,map并行阻塞执行
执行代码如下:
public static void main(String[] args) {
// getRes();
// getResForLock();
// runMyTask1();
// runMyTask2();
runMyTask3();
}
结果:
开始执行:
执行完毕,总list大小是:10
runMyTask3总耗时:3135ms
xiaoxu0
xiaoxu1
xiaoxu2
xiaoxu3
xiaoxu4
xiaoxu5
xiaoxu6
xiaoxu7
xiaoxu8
xiaoxu9
可见自定义的固定大小为4的线程池,需要分3批执行完10条数据,故而耗时约3秒。
2.2.5 CompletableFuture获取结果
获取结果的方法如下:
public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public T join()
get和join均为阻塞获取值,区别在于join没有显示的抛出异常,而get均显示的抛出了异常。getNow是立马获取值,如果没有马上获取到值,则默认返回valueIfAbsent。
回调方法获取值:
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)
使用如下方法打断后,可使用join()获取值:
public boolean complete(T value)
LockTool:
package com.xiaoxu.lock;
import java.text.MessageFormat;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* @author xiaoxu
* @date 2022-10-03 14:53
* spring_boot:com.xiaoxu.lock.LockTool
*/
public class LockTool {
private static final long randWaitTime = 3;
public static Supplier<String> getRandomId(boolean error){
return ()->{
String uuid = null;
try {
System.out.println(MessageFormat.format("开始获取uuid:{0}",error));
TimeUnit.SECONDS.sleep(randWaitTime);
if(error){
throw new RuntimeException("获取uuid失败");
}
uuid = UUID.randomUUID().toString();
} catch (Throwable e) {
System.out.println(e.getMessage());;
}
return uuid;
};
}
public static void TimeSleep(long seconds){
if(seconds<=0){
throw new RuntimeException("秒数要>0");
}
try{
TimeUnit.SECONDS.sleep(seconds);
}catch (InterruptedException i){
throw new RuntimeException("中断错误:"+i.getCause()+"\t"+i.getMessage());
}
}
}
AbstractFuture:
package com.xiaoxu.lock;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author xiaoxu
* @date 2022-10-03 16:12
* spring_boot:com.xiaoxu.lock.AbstractFuture
*/
public abstract class AbstractFuture {
protected static final ThreadPoolExecutor executor;
static {
// guava包中ThreadFactoryBuilder可快捷创建ThreadFactory
executor = new ThreadPoolExecutor(3,
4,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(4),
new ThreadFactoryBuilder().setNameFormat("xiaoxu-thread-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
CompletableFutureTestGetResult:
package com.xiaoxu.lock;
import java.text.MessageFormat;
import java.util.concurrent.*;
/**
* @author xiaoxu
* @date 2022-10-03 14:57
* spring_boot:com.xiaoxu.lock.CompletableFutureTest
*/
public class CompletableFutureTestGetResult extends AbstractFuture{
/*获取结果*/
// 获取结果的方式
public static void getResultForRunAndSupplyAsync(){
// CompletableFuture.runAsync,无返回值
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("worker:"+Thread.currentThread().getName());
LockTool.TimeSleep(3);
},executor).whenComplete((val,throwable)->{
//BiConsumer<? super T, ? super Throwable> action
if(throwable==null){
//没有报错信息
System.out.println(MessageFormat.format("whenComplete方式获取值:{0}",
val));
}
});
try {
System.out.println(MessageFormat.format("Future的get()接口阻塞获取值:{0}",voidCompletableFuture.get()));
System.out.println(MessageFormat.format("Future的get(long timeout, TimeUnit unit)接口获取值:{0}",voidCompletableFuture.get(1,TimeUnit.SECONDS)));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
executor.shutdown();
System.out.println("#############################");
// CompletableFuture.supplyAsync,有返回值
CompletableFuture<String> supplyCompletableFuture = CompletableFuture.supplyAsync(LockTool.getRandomId(false));
// getNow(T val),如果没有立马返回值,就返回默认值val
System.out.println(MessageFormat.format("Future的getNow:{0}",supplyCompletableFuture.getNow("我是立马返回的值(等不及)")));
try {
// 1秒内未返回值,则报错 TimeoutException
System.out.println(MessageFormat.format("Future的get(long timeout, TimeUnit unit)接口获取值:{0}",supplyCompletableFuture.get(1,TimeUnit.SECONDS)));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
try {
// get()直接阻塞获取值,但是需要try、catch处理异常
System.out.println(MessageFormat.format("Future的get()接口阻塞获取值:{0}",supplyCompletableFuture.get()));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//join获取值和get获取值,都是阻塞获取值,直到值返回;明显区别是get()有异常处理,join()没有异常处理
System.out.println(
MessageFormat.format("supply使用join()获取值:{0}",supplyCompletableFuture.join())
);
// whenComplete 获取值
supplyCompletableFuture.whenComplete((val,exp)-> System.out.println(
MessageFormat.format("supply使用whenComplete获取值:{0}",val)
));
}
// 获取结果的方式2:complete,打断
public static void getResult(){
// future是3秒返回结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(LockTool.getRandomId(false));
// 但是1秒后就执行complete获取结果,打断成功,则返回complete中的默认值
LockTool.TimeSleep(1);
// 返回true,则说明打断成功
System.out.println(future.complete("我直接打断")+"\t"+future.join());
}
public static void main(String[] args) {
getResultForRunAndSupplyAsync();
// getResult();
}
}
执行getResultForRunAndSupplyAsync():
worker:xiaoxu-thread-0
whenComplete方式获取值:null
Future的get()接口阻塞获取值:null
Future的get(long timeout, TimeUnit unit)接口获取值:null
#############################
Future的getNow:我是立马返回的值(等不及)
开始获取uuid:false
java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at com.xiaoxu.lock.CompletableFutureTestGetResult.getResultForRunAndSupplyAsync(CompletableFutureTestGetResult.java:43)
at com.xiaoxu.lock.CompletableFutureTestGetResult.main(CompletableFutureTestGetResult.java:74)
Future的get()接口阻塞获取值:7320df2d-2ed6-48c7-a3f3-538c29c38621
supply使用join()获取值:7320df2d-2ed6-48c7-a3f3-538c29c38621
supply使用whenComplete获取值:7320df2d-2ed6-48c7-a3f3-538c29c38621
执行getResult():
开始获取uuid:false
true 我直接打断
2.2.6 CompletableFuture处理结果
thenApply:计算存在依赖关系,串行计算中某步骤报错,后面将不再执行
handle:计算存在依赖关系,串行计算中某步骤报错,可处理该问题,后面步骤依然可以执行
CompletableFutureTestHandleResult:
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-03 16:16
* spring_boot:com.xiaoxu.lock.CompletableFutureTestHandleResult
*/
public class CompletableFutureTestHandleResult extends AbstractFuture{
/*处理结果*/
public static int getRInt(){
System.out.println("开始获取值,请等待...");
LockTool.TimeSleep(2);
return 1;
}
public static void handleResultForRunAndSupplyAsync(){
// thenApply:当前步骤有异常则直接退出
CompletableFuture.completedFuture(getRInt())
.thenApply(val->{
System.out.println("step1");
return val+2;
}).thenApply(val->{
System.out.println("step2");
return val+3;
}).whenComplete((val,exp)->{
if(exp==null){
System.out.println("计算结束"+val);
}
}).exceptionally(e->{
System.out.println("发生异常:"+e.getCause()+"\t"+e.getMessage());
return null;
}).join();
System.out.println("############");
CompletableFuture.supplyAsync(CompletableFutureTestHandleResult::getRInt,executor)
.thenApply(val->{
System.out.println("step1");
return val+2;
}).thenApply(val->{
System.out.println("step2");
if(val!=99){
throw new RuntimeException("步骤2报错");
}
return val+3;
}).thenApply(val->{
System.out.println("step3");
return val+4;
}).whenComplete((val,exp)->{
if(exp==null){
System.out.println("计算结束"+val);
}
}).exceptionally(e->{
System.out.println("发生异常:"+e.getCause()+"\t"+e.getMessage());
return null;
}).join();
executor.shutdown();
System.out.println("############");
CompletableFuture.supplyAsync(CompletableFutureTestHandleResult::getRInt)
.handle((val,throwable)->{
System.out.println("step1-handle");
return val+2;
}).handle((val,throwable)->{
System.out.println("step2-handle");
if(val!=99){
throw new RuntimeException("步骤2又报错");
}
return val+3;
}).handle((val,throwable)->{
System.out.println("step3-handle");
if(val!=null){
// handle的上个步骤如果报错,那么这里的val获取的就是null
// 避免报空指针异常,需要在这里做出非空判断
return val+4;
}else{
System.out.println("handle的上个步骤报错,导致本次获取的val为null");
return 99;
}
}).whenComplete((val,exp)->{
if(exp==null){
System.out.println("计算结束"+val);
}
})
.exceptionally(e->{
System.out.println("发生异常:"+e.getCause()+"\t"+e.getMessage());
return null;
})
.join();
}
public static void main(String[] args) {
handleResultForRunAndSupplyAsync();
}
}
执行结果:
开始获取值,请等待...
step1
step2
计算结束6
############
开始获取值,请等待...
step1
step2
发生异常:java.lang.RuntimeException: 步骤2报错 java.lang.RuntimeException: 步骤2报错
############
开始获取值,请等待...
step1-handle
step2-handle
step3-handle
handle的上个步骤报错,导致本次获取的val为null
计算结束99
注意:
exceptionally:thenApply没有第二个异常相关的参数传至下游(参数为Function<? super T,? extends U> fn),所以若thenApply中抛出异常,可使用exceptionally捕获到该异常;
handle/whenComplete:handle和whenComplete处理时都有第二个异常的参数(参数为BiFunction<? super T, Throwable, ? extends U> fn),用于返回下游异常的信息。比如handle中发生异常(发生异常会导致下个串行的handle没有返回值,下游接收到的返回值为null),会返回给下一个handle的第一个参数val为null,第二个throwable参数为捕获的异常相关信息,如果handle后面有whenComplete和exceptionally,且多个串行的handle均有报错,那么exceptionally获取的报错信息是最后一次报错的信息。
2.2.7 CompletableFuture消费结果
thenRun:A执行完执行B,B不需要A的结果
thenAccept:A执行完执行B,B需要A的结果,B无返回值
thenApply:A执行完执行B,B需要A的结果,B有返回值
CompletableFutureTestConsumeResult:
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-04 13:11
* spring_boot:com.xiaoxu.lock.CompletableFutureTestConsumeResult
*/
public class CompletableFutureTestConsumeResult extends AbstractFuture{
public static void consume(){
CompletableFuture.supplyAsync(()->1)
.thenApply(e->e+2).thenApply(e->e+3)
.thenAccept(System.out::println);
// thenRun:不需要任务A的结果,所以打印出来是null,实际只是执行了任务A而已
// 结果:null
System.out.println(CompletableFuture.supplyAsync(LockTool.getRandomId(false))
.thenRun(()->{}).join());
System.out.println("&&&&&&&&&&&&&&&&&&&&&");
// thenAccept: 需要任务A的结果,但是因为参数是Consumer<? super T> action类型,所以不会有结果,
// 故而打印出来的结果还是null
// 结果:null
System.out.println(CompletableFuture.supplyAsync(LockTool.getRandomId(false))
.thenAccept(a-> System.out.println(a+"-uuid_suffix")).join());
// thenApply:需要任务A的结果,且需要返回结果,类型相同,都是U
System.out.println("#####################");
System.out.println(CompletableFuture.supplyAsync(LockTool.getRandomId(false))
.thenApply(a->a+"-uuid的结果").join());
}
public static void main(String[] args) {
consume();
}
}
执行结果:
6
开始获取uuid:false
null
&&&&&&&&&&&&&&&&&&&&&
开始获取uuid:false
e9ef9b96-c5b9-41f1-a335-f7d39ea66409-uuid_suffix
null
#####################
开始获取uuid:false
b3a22a6b-0b85-437c-8f03-ef66f939c725-uuid的结果
2.2.8 CompletableFuture中thenXXX和thenXXXAsync的使用区别
源码中找到形如如下的代码,可知thenXXX和thenXXXAsync的区别在于传参,thenXXX使用的是null,而thenXXXAsync使用的是asyncPool。
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
(1)处理太快,系统优化切换原则,直接使用main线程处理
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-05 15:59
* spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
*/
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{
public static void testAsyncCompare(){
CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" step 1");
return "xiaoxu";
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 2:"+x);
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 3:"+x);
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 4:"+x);
});
}
public static void main(String[] args) {
testAsyncCompare();
}
}
ForkJoinPool.commonPool-worker-9 step 1
main step 2:xiaoxu
main step 3:null
main step 4:null
(2)没有传入自定义的线程池,都默认使用线程池ForkJoinPool
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-05 15:59
* spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
*/
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{
public static void testAsyncCompare(){
CompletableFuture.supplyAsync(()->{
LockTool.TimeSleep(1);
System.out.println(Thread.currentThread().getName()+" step 1");
return "xiaoxu";
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 2:"+x);
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 3:"+x);
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 4:"+x);
}).join();
}
public static void main(String[] args) {
testAsyncCompare();
}
}
ForkJoinPool.commonPool-worker-9 step 1
ForkJoinPool.commonPool-worker-9 step 2:xiaoxu
ForkJoinPool.commonPool-worker-9 step 3:null
ForkJoinPool.commonPool-worker-9 step 4:null
(3)执行第1个任务时,传入自定义的线程池,调用thenXXX执行第二个任务时,则第二个和第一个任务共用同一个线程池;若是调用thenXXXAsync执行第二个任务,则第一个任务是使用自定义线程池,第二个任务使用的是ForkJoinPool:
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-05 15:59
* spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
*/
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{
public static void testAsyncCompare(){
CompletableFuture.supplyAsync(()->{
LockTool.TimeSleep(1);
System.out.println(Thread.currentThread().getName()+" step 1");
return "xiaoxu";
},executor).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 2:"+x);
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 3:"+x);
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 4:"+x);
}).join();
executor.shutdown();
}
public static void main(String[] args) {
testAsyncCompare();
}
}
xiaoxu-thread-0 step 1
xiaoxu-thread-0 step 2:xiaoxu
xiaoxu-thread-0 step 3:null
xiaoxu-thread-0 step 4:null
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-05 15:59
* spring_boot:com.xiaoxu.lock.CompletableFutureTestAsyncAndNoAsync
*/
public class CompletableFutureTestAsyncAndNoAsync extends AbstractFuture{
public static void testAsyncCompare(){
CompletableFuture.supplyAsync(()->{
LockTool.TimeSleep(1);
System.out.println(Thread.currentThread().getName()+" step 1");
return "xiaoxu";
},executor).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 2:"+x);
}).thenAcceptAsync(x->{
System.out.println(Thread.currentThread().getName()+" step 3:"+x);
}).thenAccept(x->{
System.out.println(Thread.currentThread().getName()+" step 4:"+x);
}).join();
executor.shutdown();
}
public static void main(String[] args) {
testAsyncCompare();
}
}
xiaoxu-thread-0 step 1
xiaoxu-thread-0 step 2:xiaoxu
ForkJoinPool.commonPool-worker-9 step 3:null
ForkJoinPool.commonPool-worker-9 step 4:null
thenRun(Async)、thenAccept(Async)、thenApply(Async)均同理。
2.2.9 CompletableFuture计算速度选用
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-07 17:04
* spring_boot:com.xiaoxu.lock.CompletableFutureTestSpeed
*/
public class CompletableFutureTestSpeed extends AbstractFuture{
public static void run(){
// a和b,谁先返回值,则取谁
// CompletableFuture.applyToEither
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("Boy A start");
LockTool.TimeSleep(1);
return "BoyA";
});
CompletableFuture<String> b = CompletableFuture.supplyAsync(()->{
System.out.println("Boy B start");
LockTool.TimeSleep(2);
return "BoyB";
});
CompletableFuture<String> res = a.applyToEither(b, x -> x + " win!");
System.out.println(Thread.currentThread().getName()+"\t"+res.join());
}
public static void main(String[] args) {
run();
}
}
Boy A start
Boy B start
main BoyA win!
2.2.10 CompletableFuture计算结果合并
thenCombine:任务都完成后,最终把两个任务的结果一起交于thenCombine处理,先完成的需要等待,直到任务均完成。
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-07 17:27
* spring_boot:com.xiaoxu.lock.CompletableFutureTestCombine
*/
public class CompletableFutureTestCombine extends AbstractFuture{
public static void run(){
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
System.out.println("A开始计算值");
LockTool.TimeSleep(2);
return 15;
});
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
System.out.println("B开始计算值");
LockTool.TimeSleep(3);
return 30;
});
CompletableFuture<Integer> f = a.thenCombine(b, (a1, b1) -> {
System.out.println("值1:" + a1 + ";值2:" + b1);
return a1 + b1;
});
System.out.println("最终计算出的结果:"+f.join());
}
public static void main(String[] args) {
run();
}
}
A开始计算值
B开始计算值
值1:15;值2:30
最终计算出的结果:45
2.2.11 CompletableFuture计算结果合并方式2(thenCompose,与2.2.10比较)
package com.xiaoxu.lock;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-07 18:23
* spring_boot:com.xiaoxu.lock.CompletableFutureTestCompose
*/
public class CompletableFutureTestCompose extends AbstractFuture{
public static void run(){
long start = System.currentTimeMillis();
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
System.out.println("A开始计算值");
LockTool.TimeSleep(4);
return 15;
});
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
System.out.println("B开始计算值");
LockTool.TimeSleep(2);
return 30;
});
CompletableFuture<Integer> f = a.thenCombine(b, (a1, b1) -> {
System.out.println("值1:" + a1 + ";值2:" + b1);
return a1 + b1;
});
System.out.println("thenCombine最终计算出的结果:"+f.join());
long end = System.currentTimeMillis();
System.out.println("thenCombine耗时:"+(end-start)+"ms");
}
public static void run2(){
long start = System.currentTimeMillis();
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> {
System.out.println("A开始计算值");
LockTool.TimeSleep(4);
return 15;
});
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> {
System.out.println("B开始计算值");
LockTool.TimeSleep(2);
return 30;
});
CompletableFuture<Integer> f = a.thenCompose(a1 -> {
System.out.println("值1:" + a1);
System.out.println("现在开始计算b1的值:");
// thenCompose 返回的需要是一个CompletionStage,即CompletableFuture<Integer> b
return b.thenApply(bRes->{
System.out.println("thenCompose获取到b的结果:"+bRes);
return a1+bRes;
});
});
System.out.println("thenCompose最终计算出的结果:"+f.join());
long end = System.currentTimeMillis();
System.out.println("thenCompose耗时:"+(end-start)+"ms");
}
public static void main(String[] args) {
run();
// run2();
}
}
A开始计算值
B开始计算值
值1:15;值2:30
thenCombine最终计算出的结果:45
thenCombine耗时:4052ms
使用thenCompose合并结果:
public static void main(String[] args) {
// run();
run2();
}
A开始计算值
B开始计算值
值1:15
现在开始计算b1的值:
thenCompose获取到b的结果:30
thenCompose最终计算出的结果:45
thenCompose耗时:4053ms
可见上述两种方式thenCombine和thenCompose合并结果,耗时相差无几,结果一致。
2.2.12 CompletableFuture的allOf和anyOf
package com.xiaoxu.lock;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* @author xiaoxu
* @date 2022-10-07 17:40
* spring_boot:com.xiaoxu.lock.CompletableFutureTestOf
*/
public class CompletableFutureTestOf extends AbstractFuture{
public static void testAllOf(){
System.out.println("allOf开始执行:");
CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
System.out.println("all-A开始计算");
LockTool.TimeSleep(2);
System.out.println("all-A获得结果45");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("all-B开始计算");
LockTool.TimeSleep(3);
System.out.println("all-B获得结果30");
});
CompletableFuture<Void> c = CompletableFuture.runAsync(() -> {
System.out.println("all-C开始计算");
LockTool.TimeSleep(1);
System.out.println("all-C获得结果100");
});
// CompletableFuture.allOf:全部都执行完
CompletableFuture.allOf(a,b,c).join();
}
public static void testAnyOf(){
System.out.println("anyOf开始执行:");
CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
System.out.println("any-A开始计算");
LockTool.TimeSleep(2);
System.out.println("any-A获得结果45");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("any-B开始计算");
LockTool.TimeSleep(3);
System.out.println("any-B获得结果30");
});
CompletableFuture<Void> c = CompletableFuture.runAsync(() -> {
System.out.println("any-C开始计算");
LockTool.TimeSleep(1);
System.out.println("any-C获得结果100");
});
// CompletableFuture.anyOf:1个执行完则完成
CompletableFuture.anyOf(a,b,c).join();
}
public static void main(String[] args) {
testAnyOf();
testAllOf();
}
}
anyOf在前,allOf在后:执行结果:
anyOf开始执行:
any-A开始计算
any-B开始计算
any-C开始计算
any-C获得结果100
allOf开始执行:
all-A开始计算
all-B开始计算
all-C开始计算
any-A获得结果45
all-C获得结果100
any-B获得结果30
all-A获得结果45
all-B获得结果30
allOf则和anyOf相反,allOf必须全部执行完,才会执行后续代码:
修改如下执行顺序,先执行allOf,再执行anyOf:
public static void main(String[] args) {
testAllOf();
testAnyOf();
}
执行结果如下(因为CompletableFuture默认的ForkJoinPool线程池,主线程结束时会shutdown默认线程池,故而anyOf只获取到C时,交由后续主线程执行,主线程立马结束,故而只打印了any-C的结果):
allOf开始执行:
all-A开始计算
all-B开始计算
all-C开始计算
all-C获得结果100
all-A获得结果45
all-B获得结果30
anyOf开始执行:
any-A开始计算
any-B开始计算
any-C开始计算
any-C获得结果100
修改如下:
public static void main(String[] args) {
testAllOf();
testAnyOf();
LockTool.TimeSleep(5);
}
结果如下所示:
allOf开始执行:
all-A开始计算
all-B开始计算
all-C开始计算
all-C获得结果100
all-A获得结果45
all-B获得结果30
anyOf开始执行:
any-A开始计算
any-B开始计算
any-C开始计算
any-C获得结果100
any-A获得结果45
any-B获得结果30
更多推荐
所有评论(0)