什么叫网站降权,875网站建设怎么样,制作网站要花多少钱,it培训网站JAVA中使用CompletableFuture进行异步编程
1、什么是CompletableFuture
CompletableFuture 是 JDK8 提供的 Future 增强类#xff0c;CompletableFuture 异步任务执行线程池#xff0c;默认是把异步任
务都放在 ForkJoinPool 中执行。
在这种方式中#xff0c;主线程不会…JAVA中使用CompletableFuture进行异步编程
1、什么是CompletableFuture
CompletableFuture 是 JDK8 提供的 Future 增强类CompletableFuture 异步任务执行线程池默认是把异步任
务都放在 ForkJoinPool 中执行。
在这种方式中主线程不会被阻塞不需要一直等到子线程完成主线程可以并行的执行其他任务。
2、Future存在的问题
Future 实际采用 FutureTask 实现该对象相当于是消费者和生产者的桥梁消费者通过 FutureTask 存储任务
的处理结果更新任务的状态未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接
口可以阻塞式获取任务的处理结果非阻塞式获取任务处理状态。
通常的线程池接口类 ExecutorService其中execute方法的返回值是void即无法获取异步任务的执行状态3个
重载的 submit 方法的返回值是 Future可以据此获取任务执行的状态和结果。
package com;import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务ExecutorService executorService Executors.newSingleThreadExecutor();FutureDouble cf executorService.submit(() - {System.out.println(Thread.currentThread() start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() exit,time- System.currentTimeMillis());return 1.2;}});executorService.shutdown();System.out.println(main thread start,time- System.currentTimeMillis());// 等待子任务执行完成,如果已完成则直接返回结果// 如果执行任务异常,则get方法会把之前捕获的异常重新抛出System.out.println(run result- cf.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
main thread start,time-1693019539222
Thread[pool-1-thread-1,5,main] start,time-1693019539222
Thread[pool-1-thread-1,5,main] exit,time-1693019541223
run result-1.2
main thread exit,time-1693019541226子线程是异步执行的主线程休眠等待子线程执行完成子线程执行完成后唤醒主线程主线程获取任务执行结果
后退出。
很多博客说使用不带等待时间限制的get方法时如果子线程执行异常了会导致主线程长期阻塞这其实是错误
的子线程执行异常时其异常会被捕获然后修改任务的状态为异常结束并唤醒等待的主线程get方法判断任务
状态发生变更就终止等待了并抛出异常。将上述用例中if(false)改成if(true) 执行结果如下
main thread start,time-1693019872552
Thread[pool-1-thread-1,5,main] start,time-1693019872552
Exception in thread main java.util.concurrent.ExecutionException: java.lang.RuntimeException: testat java.util.concurrent.FutureTask.report(FutureTask.java:122)at java.util.concurrent.FutureTask.get(FutureTask.java:192)at com.FutureTest.test(FutureTest.java:34)at com.FutureTest.main(FutureTest.java:11)
Caused by: java.lang.RuntimeException: testat com.FutureTest.lambda$test$0(FutureTest.java:25)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)get方法抛出异常导致主线程异常终止。
Future 的局限性它没法直接对多个任务进行链式、组合等处理需要借助并发工具类才能完成实现逻辑比较
复杂。
而 CompletableFuture 是对 Future 的扩展和增强CompletableFuture 实现了 Future 接口并在此基础上进
行了丰富的扩展完美弥补了 Future 的局限性同时 CompletableFuture 实现了对任务编排的能力。借助这项
能力可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说这项能力是它的核心能力。而在以
往虽然通过 CountDownLatch 等工具类也可以实现任务的编排但需要复杂的逻辑处理不仅耗费精力且难以
维护。
CompletableFuture实现了CompletionStage接口和Future接口前者是对后者的一个扩展增加了异步回调、
流式处理、多个Future组合处理的能力使Java在处理多任务的协同工作时更加顺畅便利。
CompletionStage 接口定义了任务编排的方法执行某一阶段可以向下执行后续阶段。异步执行的默认线程
池是 ForkJoinPool.commonPool()但为了业务之间互不影响且便于定位问题强烈推荐使用自定义线程池。
CompletableFuture 中默认线程池如下
// 根据commonPool的并行度来选择,而并行度的计算是在ForkJoinPool的静态代码段完成的
private static final boolean useCommonPool (ForkJoinPool.getCommonPoolParallelism() 1);private static final Executor asyncPool useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();ForkJoinPool 中初始化 commonPool 的参数
static {// initialize field offsets for CAS etctry {U sun.misc.Unsafe.getUnsafe();Class? k ForkJoinPool.class;CTL U.objectFieldOffset(k.getDeclaredField(ctl));RUNSTATE U.objectFieldOffset(k.getDeclaredField(runState));STEALCOUNTER U.objectFieldOffset(k.getDeclaredField(stealCounter));Class? tk Thread.class;……} catch (Exception e) {throw new Error(e);}commonMaxSpares DEFAULT_COMMON_MAX_SPARES;defaultForkJoinWorkerThreadFactory new DefaultForkJoinWorkerThreadFactory();modifyThreadPermission new RuntimePermission(modifyThread);// 调用makeCommonPool方法创建commonPool,其中并行度为逻辑核数-1common java.security.AccessController.doPrivileged(new java.security.PrivilegedActionForkJoinPool() {public ForkJoinPool run() { return makeCommonPool(); }});int par common.config SMASK; // report 1 even if threads disabledcommonParallelism par 0 ? par : 1;
}3、CompletableFuture功能
3.1 依赖关系
thenApply()把前面任务的执行结果交给后面的Function。
thenCompose()用来连接两个有依赖关系的任务结果由第二个任务返回。
3.2 and集合关系
thenCombine()合并任务有返回值。
thenAccepetBoth()两个任务执行完成后将结果交给thenAccepetBoth处理无返回值。
runAfterBoth()两个任务都执行完成后执行下一步操作(Runnable类型任务)。
3.3 or聚合关系
applyToEither()两个任务哪个执行的快就使用哪一个结果有返回值。
acceptEither()两个任务哪个执行的快就消费哪一个结果无返回值。
runAfterEither()任意一个任务执行完成进行下一步操作(Runnable类型任务)。
3.4 并行执行
allOf()当所有给定的 CompletableFuture 完成时返回一个新的 CompletableFuture。
anyOf()当任何一个给定的 CompletablFuture 完成时返回一个新的 CompletableFuture。
3.5 结果处理
whenComplete当任务完成时将使用结果(或null)和此阶段的异常(或 null如果没有)执行给定操作。
exceptionally返回一个新的 CompletableFuture当前面的 CompletableFuture 完成时它也完成当它
异常完成时给定函数的异常触发这个 CompletableFuture 的完成。
3、CompletableFuture(runAsync和supplyAsync)创建异步任务
CompletableFuture 提供了四个静态方法来创建一个异步操作
public static CompletableFutureVoid runAsync(Runnable runnable)public static CompletableFutureVoid runAsync(Runnable runnable, Executor executor)public static U CompletableFutureU supplyAsync(SupplierU supplier)public static U CompletableFutureU supplyAsync(SupplierU supplier, Executor executor)这四个方法的区别 runAsync() 以Runnable函数式接口类型为参数没有返回结果supplyAsync() 以Supplier函数式接 口类型为参数返回结果类型为USupplier接口的get()是有返回值的(会阻塞)。 使用没有指定Executor的方法时内部使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。 如果指定线程池则使用指定的线程池运行。 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池这个线程池默认创建的线程数是CPU 的核数也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置 ForkJoinPool线程池的线程数。如果所有CompletableFuture共享一个线程池那么一旦有任务执行一 些很慢的 I/O 操作就会导致线程池中所有线程都阻塞在 I/O 操作上从而造成线程饥饿进而影响整个系统 的性能。所以强烈建议你要根据不同的业务类型创建不同的线程池以避免互相干扰。
supplyAsync 表示创建带返回值的异步任务的相当于 ExecutorService submit(Callable task) 方法。
runAsync 表示创建无返回值的异步任务相当于 ExecutorService submit(Runnable task)方法。
这两方法的效果跟 submit 是一样的。
这两方法各有一个重载版本可以指定执行异步任务的Executor实现如果不指定默认使用
ForkJoinPool.commonPool()如果机器是单核的则默认使用ThreadPerTaskExecutor该类是一个内部类每
次执行execute都会创建一个新线程。
3.1 runAsync
runAsync 没有返回值
package com;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;public class RunAsyncTest {public static void main(String[] args) {ListInteger numberList new ArrayList();for (int i 1; i 11; i) {numberList.add(i);}System.out.println(start!);long start System.currentTimeMillis();for (Integer number : numberList) {CompletableFuture.runAsync(() - {try {Thread.sleep(1000);System.out.println(number);} catch (InterruptedException e) {e.printStackTrace();}});}long end System.currentTimeMillis();System.out.println(耗时: (end - start));System.out.println(end!);}
}# 程序输出
start!
耗时:44
end!package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test1 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务无返回值CompletableFuture cf CompletableFuture.runAsync(() - {System.out.println(Thread.currentThread() start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() exit,time- System.currentTimeMillis());}});System.out.println(main thread start,time- System.currentTimeMillis());//等待子任务执行完成System.out.println(run result- cf.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
main thread start,time-1693021661122
Thread[ForkJoinPool.commonPool-worker-1,5,main] start,time-1693021661122
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit,time-1693021663123
run result-null
main thread exit,time-1693021663123package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 自定义线程池*/
public class Test3 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ExecutorService executorService Executors.newSingleThreadExecutor();// 创建异步执行任务:CompletableFuture cf CompletableFuture.runAsync(() - {System.out.println(Thread.currentThread() start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() exit,time- System.currentTimeMillis());}}, executorService);System.out.println(main thread start,time- System.currentTimeMillis());//等待子任务执行完成System.out.println(run result- cf.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
main thread start,time-1693022272784
Thread[pool-1-thread-1,5,main] start,time-1693022272784
Thread[pool-1-thread-1,5,main] exit,time-1693022274784
run result-null
main thread exit,time-16930222747843.2 supplyAsync
supplyAsync 有返回值
package com;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class SupplyAsynctest {public static void main(String[] args) {ListInteger numberList new ArrayList();for (int i 1; i 11; i) {numberList.add(i);}ListCompletableFutureInteger futureList new ArrayList();System.out.println(start!);long start System.currentTimeMillis();for (Integer number : numberList) {futureList.add(CompletableFuture.supplyAsync(() - {try {Thread.sleep(1000);return number;} catch (InterruptedException e) {e.printStackTrace();return 0;}}));}for (CompletableFutureInteger completableFuture : futureList) {Integer number null;try {number completableFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(number);}long end System.currentTimeMillis();System.out.println(耗时: (end - start));System.out.println(end!);}
}# 程序输出
start!
1
2
3
4
5
6
7
8
9
10
耗时:2047
end!package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test2 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务有返回值CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() exit,time- System.currentTimeMillis());return 1.2;}});System.out.println(main thread start,time- System.currentTimeMillis());//等待子任务执行完成System.out.println(run result- cf.get());System.out.println(main thread exit,time- System.currentTimeMillis());}}# 程序输出
main thread start,time-1693021915960
Thread[ForkJoinPool.commonPool-worker-1,5,main] start,time-1693021915960
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit,time-1693021917960
run result-1.2
main thread exit,time-1693021917962package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test4 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool new ForkJoinPool();// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() exit,time- System.currentTimeMillis());return 1.2;}}, pool);System.out.println(main thread start,time- System.currentTimeMillis());//等待子任务执行完成System.out.println(run result- cf.get());System.out.println(main thread exit,time- System.currentTimeMillis());}}# 程序输出
main thread start,time-1693022336576
Thread[ForkJoinPool-1-worker-1,5,main] start,time-1693022336576
Thread[ForkJoinPool-1-worker-1,5,main] exit,time-1693022338577
run result-1.2
main thread exit,time-16930223385784、获取结果(get和join)
join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异
常即未经检查的异常)不会强制开发者抛出。get()方法抛出的是经过检查的异常ExecutionException
InterruptedException 需要用户手动处理抛出或者 try catch。
5、结果处理( whenComplete和exceptionally和handle)
5.1 whenComplete和exceptionally方法
当CompletableFuture的计算结果完成或者抛出异常的时候我们可以执行特定的Action。主要是下面的方
法
public CompletableFutureT whenComplete(BiConsumer? super T,? super Throwable action)public CompletableFutureT whenCompleteAsync(BiConsumer? super T,? super Throwable action)public CompletableFutureT whenCompleteAsync(BiConsumer? super T,? super Throwable action, Executor executor)public CompletableFutureT exceptionally(FunctionThrowable,? extends T fn);whenComplete可以处理正常和异常的计算结果exceptionally处理异常情况。 Action的类型是BiConsumer? super T,? super Throwable它可以处理正常的计算结果或者异 常情况。 方法不以Async结尾意味着Action使用相同的线程执行而Async可能会使用其它的线程去执行(如果使用 相同的线程池也可能会被同一个线程选中执行)。 这几个方法都会返回CompletableFuture当Action执行完毕后它的结果返回原始的CompletableFuture 的计算结果或者返回异常。 whenComplete和whenCompleteAsync 的区别whenComplete是执行当前任务的线程继续执行 whenComplete的任务。whenCompleteAsync是执行把whenCompleteAsync 这个任务继续提交给线程池来 进行执行。
whenComplete 是当某个任务执行完成后执行的回调方法会将执行结果或者执行期间抛出的异常传递给回调方
法如果是正常执行则异常为null回调方法对应的CompletableFuture的result和该任务一致如果该任务正常
执行则get方法返回执行结果如果是执行异常则get方法抛出异常。
exceptionally方法指定某个任务执行异常时执行的回调方法会将抛出异常作为参数传递到回调方法中如果该
任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;public class WhenCompleteAndExceptionally {public static void main(String[] args) {CompletableFutureInteger future CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}Integer randomNum new Random().nextInt(10);if (randomNum % 2 0) {System.out.println(任务发生异常,返回给exceptionally!);Integer num 12 / 0;}System.out.println(任务处理完成,返回给whenComplete!);return randomNum;});// 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行// 如果出现了异常,任务结果为nullfuture.whenComplete(new BiConsumerInteger, Throwable() {Overridepublic void accept(Integer t, Throwable action) {System.out.println(任务正常,接收supplyAsync的返回值, 结果是: t);}});// 出现异常时先执行(exceptionally),然后再执行(whenComplete)// 如果没有出现异常,(exceptionally)不会被执行future.exceptionally(new FunctionThrowable, Integer() {Overridepublic Integer apply(Throwable t) {System.out.println(任务异常,接收supplyAsync的返回值,异常是 t.getMessage());return -1;}});// 如果发生了异常,此处无法获取返回值try {Integer result future.get();System.out.println(get()获取到的结果是: result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
# 无异常 supplyAsync-whenComplete-get()
任务处理完成,返回给whenComplete!
任务正常,接收supplyAsync的返回值,结果是:9
get()获取到的结果是:9# 程序输出
# 出现异常 supplyAsync-exceptionally-whenComplete
任务发生异常,返回给exceptionally!
任务异常,接收supplyAsync的返回值,异常是java.lang.ArithmeticException: / by zero
任务正常,接收supplyAsync的返回值,结果是:null
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)at com.WhenCompleteAndExceptionally.main(WhenCompleteAndExceptionally.java:48)
Caused by: java.lang.ArithmeticException: / by zeroat com.WhenCompleteAndExceptionally.lambda$main$0(WhenCompleteAndExceptionally.java:22)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)另一种写法
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;public class WhenCompleteAndExceptionally2 {public static void main(String[] args) {CompletableFutureInteger future CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}Integer randomNum new Random().nextInt(10);if (randomNum % 2 0) {System.out.println(任务发生异常,返回给exceptionally!);Integer num 12 / 0;}System.out.println(任务处理完成,返回给whenComplete!);return randomNum;})// 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行// 如果出现了异常,任务结果为null.whenComplete(new BiConsumerInteger, Throwable() {Overridepublic void accept(Integer t, Throwable action) {System.out.println(任务正常,接收supplyAsync的返回值, 结果是: t);}})// 出现异常时先执行(exceptionally),然后再执行(whenComplete)// 如果没有出现异常,(exceptionally)不会被执行.exceptionally(new FunctionThrowable, Integer() {Overridepublic Integer apply(Throwable t) {System.out.println(任务异常,接收supplyAsync的返回值,异常是 t.getMessage());return -1;}});// 如果发生了异常,此处无法获取返回值try {Integer result future.get();System.out.println(get()获取到的结果是: result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
# 无异常 supplyAsync-whenComplete-get()
任务处理完成,返回给whenComplete!
任务正常,接收supplyAsync的返回值,结果是:7
get()获取到的结果是:7# 程序输出
# 有异常 supplyAsync-whenComplete-exceptionally-get()
任务发生异常,返回给exceptionally!
任务正常,接收supplyAsync的返回值,结果是:null
任务异常,接收supplyAsync的返回值,异常是java.lang.ArithmeticException: / by zero
get()获取到的结果是:-1package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;public class WhenCompleteAndExceptionally3 {public static void main(String[] args) {CompletableFutureInteger future CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}Integer randomNum new Random().nextInt(10);if (randomNum % 2 0) {System.out.println(任务发生异常,返回给exceptionally!);Integer num 12 / 0;}System.out.println(任务处理完成,返回给whenComplete!);return randomNum;});// 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行// 如果出现了异常,任务结果为nullCompletableFutureInteger future1 future.whenComplete(new BiConsumerInteger, Throwable() {Overridepublic void accept(Integer t, Throwable action) {System.out.println(任务正常,接收supplyAsync的返回值, 结果是: t);}});// 出现异常时先执行(exceptionally),然后再执行(whenComplete)// 如果没有出现异常,(exceptionally)不会被执行CompletableFutureInteger future2 future.exceptionally(new FunctionThrowable, Integer() {Overridepublic Integer apply(Throwable t) {System.out.println(任务异常,接收supplyAsync的返回值,异常是 t.getMessage());return -1;}});// 如果发生了异常,此处无法获取返回值try {Integer result future2.get();System.out.println(get()获取到的结果是: result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
# 无异常 supplyAsync-whenComplete-get()
任务处理完成,返回给whenComplete!
任务正常,接收supplyAsync的返回值,结果是:3
get()获取到的结果是:3# 程序输出
# 有异常 supplyAsync-whenComplete-exceptionally-get()
任务发生异常,返回给exceptionally!
任务异常,接收supplyAsync的返回值,异常是java.lang.ArithmeticException: / by zero
任务正常,接收supplyAsync的返回值,结果是:null
get()获取到的结果是:-1package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;public class WhenCompleteAndExceptionally4 {public static void main(String[] args) {CompletableFuture future CompletableFuture.supplyAsync(new SupplierObject() {Overridepublic Object get() {System.out.println(Thread.currentThread().getName() completableFuture);int i 10 / 0;return 1024;}});CompletableFuture future1 future.whenComplete(new BiConsumerObject, Throwable() {Overridepublic void accept(Object o, Throwable throwable) {System.out.println(-------O o);System.out.println(-------throwable throwable);}});CompletableFuture future2 future.exceptionally(new FunctionThrowable, Object() {Overridepublic Object apply(Throwable throwable) {System.out.println(throwable throwable);return 6666;}});try {System.out.println(结果是: future2.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
# 有异常 supplyAsync-whenComplete-exceptionally-get()
ForkJoinPool.commonPool-worker-1 completableFuture
-------Onull
-------throwablejava.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
throwablejava.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
结果是:6666package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test5 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() job1 start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() job1 exit,time- System.currentTimeMillis());return 1.2;}});//cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法如果是正常执行的则传入的异常为nullCompletableFutureDouble cf2 cf.whenComplete((a, b) - {System.out.println(Thread.currentThread() job2 start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (b ! null) {System.out.println(error stack trace-);b.printStackTrace();} else {System.out.println(run succ,result- a);}System.out.println(Thread.currentThread() job2 exit,time- System.currentTimeMillis());});//等待子任务执行完成System.out.println(main thread start wait,time- System.currentTimeMillis());//如果cf是正常执行的cf2.get的结果就是cf执行的结果//如果cf是执行异常则cf2.get会抛出异常System.out.println(run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time-1693027350240
main thread start wait,time-1693027350240
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 exit,time-1693027352241
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time-1693027352241
run succ,result-1.2
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time-1693027354241
run result-1.2
main thread exit,time-1693027354241将上述示例中的if(false) 改成if(true)其输出如下
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time-1693027394831
main thread start wait,time-1693027394832
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time-1693027396832
error stack trace-
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time-1693027398834
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test5.lambda$test$0(Test5.java:21)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more
Exception in thread main java.util.concurrent.ExecutionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)at com.test.Test5.test(Test5.java:46)at com.test.Test5.main(Test5.java:9)
Caused by: java.lang.RuntimeException: testat com.test.Test5.lambda$test$0(Test5.java:21)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test6 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool new ForkJoinPool();// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() job1 start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() job1 exit,time- System.currentTimeMillis());return 1.2;}}, pool);//cf执行异常时将抛出的异常作为入参传递给回调方法CompletableFutureDouble cf2 cf.exceptionally((param) - {System.out.println(Thread.currentThread() start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(error stack trace-);param.printStackTrace();System.out.println(Thread.currentThread() exit,time- System.currentTimeMillis());return -1.1;});System.out.println(main thread start,time- System.currentTimeMillis());//等待子任务执行完成,此处无论是job2和job3都可以实现job2退出主线程才退出如果是cf则主线程不会等待job2执行完成自动退出了//cf2.get时没有异常但是依然有返回值就是cf的返回值System.out.println(run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time-1693038412532
main thread start,time-1693038412532
Thread[ForkJoinPool-1-worker-1,5,main]job1 exit,time-1693038414533
run result-1.2
main thread exit,time-1693038414534将上述示例中的if(true) 改成if(false)其输出如下
# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time-1693038448098
main thread start,time-1693038448098
Thread[ForkJoinPool-1-worker-1,5,main] start,time-1693038450099
error stack trace-
Thread[ForkJoinPool-1-worker-1,5,main] exit,time-1693038452104
run result--1.1
main thread exit,time-1693038452107
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test6.lambda$test$0(Test6.java:23)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more5.2 handle 方法
public U CompletionStageU handle(BiFunction? super T, Throwable, ? extends U fn);
public U CompletionStageU handleAsync(BiFunction? super T, Throwable, ? extends U fn);
public U CompletionStageU handleAsync(BiFunction? super T, Throwable, ? extends U fn,Executor executor);handle是执行任务完成时对结果的处理handle是在任务完成后再执行还可以处理异常的任务。
handleAsync方法即可以获取执行结果也可以感知异常信息并能处理执行结果并返回。
跟whenComplete基本一致区别在于handle的回调方法有返回值且handle方法返回的CompletableFuture的
result是回调方法的执行结果或者回调方法执行期间抛出的异常与原始CompletableFuture的result无关了。
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class handleAsyncTest {public static void main(String[] args) {System.out.println(main start ...);CompletableFutureInteger future CompletableFuture.supplyAsync(() - {System.out.println(开启异步任务...);int i 10 % 2;if (i 0) {throw new RuntimeException(远程服务调用失败);}return i;}).handleAsync((res, thr) - {System.out.println(进入handleAsync方法);if (res ! null) {return res * 2;}if (thr ! null) {System.out.println(捕获到异常: thr);return 0;}return 10;});try {Integer result future.get();System.out.println(获取异步任务返回值 result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(main end ...);}
}# 程序输出
# 有异常
main start ...
开启异步任务...
进入handleAsync方法
捕获到异常:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
获取异步任务返回值0
main end ...# 程序输出
# 无异常
main start ...
开启异步任务...
进入handleAsync方法
获取异步任务返回值2
main end ...package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class handleAsyncTest2 {public static void main(String[] args) {CountDownLatch countDownLatch new CountDownLatch(2);System.out.println(start!);CompletableFutureInteger future1 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread().getName() 进行一连串操作1....);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 1;});// whenComplete方法,返回的future的结果还是1CompletableFutureInteger future future1.whenComplete((x, y) - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() whenComplete,future返回: x);});// handler返回的future结果是字符串2CompletableFutureString handle future.handle((x, y) - {System.out.println(Thread.currentThread().getName() handle接收的结果: x);countDownLatch.countDown();return 2;});CompletableFutureInteger handle1 handle.handle((x, y) - {System.out.println(Thread.currentThread().getName() handle返回的结果: x);countDownLatch.countDown();return 2;});try {countDownLatch.await();System.out.println(接收到的返回值为:handle1.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}System.out.println(end!);}
}
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class handleAsyncTest2 {public static void main(String[] args) {CountDownLatch countDownLatch new CountDownLatch(2);System.out.println(start!);CompletableFutureInteger future1 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread().getName() 进行一连串操作1....);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 1;});// whenComplete方法,返回的future的结果还是1CompletableFutureInteger future future1.whenComplete((x, y) - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() whenComplete,future返回: x);});// handler返回的future结果是字符串2CompletableFutureString handle future.handle((x, y) - {System.out.println(Thread.currentThread().getName() handle接收的结果: x);countDownLatch.countDown();return 2;});CompletableFutureInteger handle1 handle.handle((x, y) - {System.out.println(Thread.currentThread().getName() handle返回的结果: x);countDownLatch.countDown();return 2;});try {countDownLatch.await();System.out.println(接收到的返回值为:handle1.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}System.out.println(end!);}
}# 程序输出
start!
ForkJoinPool.commonPool-worker-1 进行一连串操作1....
ForkJoinPool.commonPool-worker-1 whenComplete,future返回:1
ForkJoinPool.commonPool-worker-1 handle接收的结果:1
ForkJoinPool.commonPool-worker-1 handle返回的结果:2
接收到的返回值为:2
end!package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test7 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() job1 start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() job1 exit,time- System.currentTimeMillis());return 1.2;}});//cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法如果是正常执行的则传入的异常为nullCompletableFutureString cf2 cf.handle((a, b) - {System.out.println(Thread.currentThread() job2 start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (b ! null) {System.out.println(error stack trace-);b.printStackTrace();} else {System.out.println(run succ,result- a);}System.out.println(Thread.currentThread() job2 exit,time- System.currentTimeMillis());if (b ! null) {return run error;} else {return run succ;}});//等待子任务执行完成System.out.println(main thread start wait,time- System.currentTimeMillis());//get的结果是cf2的返回值跟cf没关系了System.out.println(run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}}# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time-1693040276755
main thread start wait,time-1693040276755
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 exit,time-1693040278755
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time-1693040278755
run succ,result-1.2
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time-1693040280757
run result-run succ
main thread exit,time-1693040280758将上述示例中的if(true) 改成if(false)其输出如下
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time-1693040314676
main thread start wait,time-1693040314677
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time-1693040316676
error stack trace-
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time-1693040318680
run result-run error
main thread exit,time-1693040318681
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test7.lambda$test$0(Test7.java:21)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more6、结果转换(thenApply和thenCompose)
将上一段任务的执行结果作为下一阶段任务的入参参与重新计算产生新的结果。
6.1 thenApply
thenApply接收一个函数作为参数使用该函数处理上一个CompletableFuture调用的结果并返回一个具有处
理结果的Future对象。
常用使用
public U CompletableFutureU thenApply(Function? super T,? extends U fn)public U CompletableFutureU thenApplyAsync(Function? super T,? extends U fn)T上一个任务返回结果的类型。
U当前任务的返回值类型。
thenApply 表示某个任务执行完成后执行的动作即回调方法会将该任务的执行结果即方法返回值作为入参传
递到回调方法中。
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class ThenApplyTest {public static void main(String[] args) {CompletableFutureInteger future CompletableFuture.supplyAsync(() - {int result 100;System.out.println(第一次运算 result);return result;}).thenApply(number - {int result number * 3;System.out.println(第二次运算 result);return result;});try {Integer result future.get();System.out.println(结果是: result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 运行程序
第一次运算100
第二次运算300
结果是:300package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test8 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool new ForkJoinPool();// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;}, pool);//cf关联的异步任务的返回值作为方法入参传入到thenApply的方法中//thenApply这里实际创建了一个新的CompletableFuture实例CompletableFutureString cf2 cf.thenApply((result) - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return test: result;});System.out.println(main thread start cf.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(run result- cf.get());System.out.println(main thread start cf2.get(),time- System.currentTimeMillis());System.out.println(run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 运行程序
Thread[ForkJoinPool-1-worker-1,5,main] start job1,time-1693042190558
main thread start cf.get(),time-1693042190558
Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time-1693042192558
Thread[ForkJoinPool-1-worker-1,5,main] start job2,time-1693042192558
run result-1.2
main thread start cf2.get(),time-1693042192559
Thread[ForkJoinPool-1-worker-1,5,main] exit job2,time-1693042194558
run result-test:1.2
main thread exit,time-1693042194558job1执行结束后将job1的方法返回值作为入参传递到job2中并立即执行job2。thenApplyAsync与thenApply的
区别在于前者是将job2提交到线程池中异步执行实际执行job2的线程可能是另外一个线程后者是由执行
job1的线程立即执行job2即两个job都是同一个线程执行的。
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test9 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool new ForkJoinPool();// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;}, pool);//cf关联的异步任务的返回值作为方法入参传入到thenApply的方法中//thenApply这里实际创建了一个新的CompletableFuture实例CompletableFutureString cf2 cf.thenApplyAsync((result) - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return test: result;});System.out.println(main thread start cf.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(run result- cf.get());System.out.println(main thread start cf2.get(),time- System.currentTimeMillis());System.out.println(run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main] start job1,time-1693042362021
main thread start cf.get(),time-1693042362022
Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time-1693042364022
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job2,time-1693042364023
run result-1.2
main thread start cf2.get(),time-1693042364024
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job2,time-1693042366024
run result-test:1.2
main thread exit,time-1693042366024从输出可知执行job1和job2是两个不同的线程。
Executor实现如果不指定默认使用ForkJoinPool.commonPool()。 下述的多个方法每个方法都有两个以
Async结尾的方法一个使用默认的Executor实现一个使用指定的Executor实现不带Async的方法是由触发该
任务的线程执行该任务带Async的方法是由触发该任务的线程将任务提交到线程池执行任务的线程跟触发任务
的线程不一定是同一个。
6.2 thenCompose
thenCompose的参数为一个返回CompletableFuture实例的函数该函数的参数是先前计算步骤的结果。
常用方法
public U CompletableFutureU thenCompose(Function? super T, ? extends CompletionStageU fn);public U CompletableFutureU thenComposeAsync(Function? super T, ? extends CompletionStageU fn) ;thenCompose 方法会在某个任务执行完成后将该任务的执行结果作为方法入参然后执行指定的方法该方法会
返回一个新的CompletableFuture实例如果该CompletableFuture实例的result不为null则返回一个基于该
result的新的CompletableFuture实例然后执行这个新任务。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;public class ThenComposeTest {public static void main(String[] args) {CompletableFutureInteger future CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(30);System.out.println(第一次运算 number);return number;}}).thenCompose(new FunctionInteger, CompletionStageInteger() {Overridepublic CompletionStageInteger apply(Integer param) {return CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number param * 2;System.out.println(第二次运算 number);return number;}});}});try {Integer result future.get();System.out.println(结果是: result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
第一次运算24
第二次运算48
结果是:48thenApply和thenCompose的区别
thenApply转换的是泛型中的类型返回的是同一个CompletableFuture
thenCompose将内部的CompletableFuture调用展开来并使用上一个CompletableFutre调用的结果在下一步的
CompletableFuture调用中进行运算是生成一个新的CompletableFuture。
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test10 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;});CompletableFutureString cf2 cf.thenCompose((param) - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job3,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job3,time- System.currentTimeMillis());return job3 test;});});System.out.println(main thread start cf.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(cf run result- cf.get());System.out.println(main thread start cf2.get(),time- System.currentTimeMillis());System.out.println(cf2 run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time-1693043028531
main thread start cf.get(),time-1693043028531
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time-1693043030531
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job2,time-1693043030531
cf run result-1.2
main thread start cf2.get(),time-1693043030534
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job2,time-1693043032532
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job3,time-1693043032533
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job3,time-1693043034534
cf2 run result-job3 test
main thread exit,time-1693043034534job1执行完成后job2开始执行等job2执行完成后会把job3返回然后执行job3等job3执行完成后主线程退
出。
7、结果消费(thenAccept和thenAcceptBoth和thenRun)
与结果处理和结果转换系列函数返回一个新的CompletableFuture不同结果消费系列函数只对结果执行
Action而不返回新的计算值。
根据对结果的处理方式结果消费函数又可以分为下面三大类
thenAccept()对单个结果进行消费。
thenAcceptBoth()对两个结果进行消费。
thenRun()不关心结果只对结果执行Action。
7.1 thenAccept
观察该系列函数的参数类型可知它们是函数式接口Consumer这个接口只有输入没有返回值。
thenAccept 同 thenApply 接收上一个任务的返回值作为参数但是无返回值。
常用方法
public CompletionStageVoid thenAccept(Consumer? super T action);public CompletionStageVoid thenAcceptAsync(Consumer? super T action);package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;public class ThenAcceptTest {public static void main(String[] args) {CompletableFutureVoid future CompletableFuture.supplyAsync(() - {int number new Random().nextInt(10);System.out.println(第一次运算 number);return number;}).thenAccept(number - System.out.println(第二次运算 number * 5));}
}# 程序输出
第一次运算3
第二次运算15package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test11 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool new ForkJoinPool();// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;}, pool);// cf关联的异步任务的返回值作为方法入参传入到thenApply的方法中CompletableFuture cf2 cf.thenApply((result) - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return test: result;}).thenAccept((result) - {//接收上一个任务的执行结果作为入参但是没有返回值System.out.println(Thread.currentThread() start job3,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(result);System.out.println(Thread.currentThread() exit job3,time- System.currentTimeMillis());}).thenRun(() - {//无入参也没有返回值System.out.println(Thread.currentThread() start job4,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(thenRun do something);System.out.println(Thread.currentThread() exit job4,time- System.currentTimeMillis());});System.out.println(main thread start cf.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(run result- cf.get());System.out.println(main thread start cf2.get(),time- System.currentTimeMillis());//cf2 等待最后一个thenRun执行完成System.out.println(run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}}# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main] start job1,time-1693043895706
main thread start cf.get(),time-1693043895707
Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time-1693043897706
Thread[ForkJoinPool-1-worker-1,5,main] start job2,time-1693043897706
run result-1.2
main thread start cf2.get(),time-1693043897707
Thread[ForkJoinPool-1-worker-1,5,main] exit job2,time-1693043899706
Thread[ForkJoinPool-1-worker-1,5,main] start job3,time-1693043899706
test:1.2
Thread[ForkJoinPool-1-worker-1,5,main] exit job3,time-1693043901707
Thread[ForkJoinPool-1-worker-1,5,main] start job4,time-1693043901707
thenRun do something
Thread[ForkJoinPool-1-worker-1,5,main] exit job4,time-1693043903707
run result-null
main thread exit,time-1693043903707package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test12 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool new ForkJoinPool();// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() job1 start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (true) {throw new RuntimeException(test);} else {System.out.println(Thread.currentThread() job1 exit,time- System.currentTimeMillis());return 1.2;}}, pool);//cf执行异常时将抛出的异常作为入参传递给回调方法CompletableFutureDouble cf2 cf.exceptionally((param) - {System.out.println(Thread.currentThread() start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(error stack trace-);param.printStackTrace();System.out.println(Thread.currentThread() exit,time- System.currentTimeMillis());return -1.1;});//cf正常执行时执行的逻辑如果执行异常则不调用此逻辑cf2.thenAccept((param) - {System.out.println(Thread.currentThread() job2 start,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(param- param);System.out.println(Thread.currentThread() job2 exit,time- System.currentTimeMillis());});System.out.println(main thread start,time- System.currentTimeMillis());//等待子任务执行完成,此处无论是job2和job3都可以实现job2退出主线程才退出如果是cf则主线程不会等待job2执行完成自动退出了//cf2.get时没有异常但是依然有返回值就是cf的返回值System.out.println(run result- cf2.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time-1693044222992
main thread start,time-1693044222993
Thread[ForkJoinPool-1-worker-1,5,main] start,time-1693044224993
error stack trace-
Thread[ForkJoinPool-1-worker-1,5,main] exit,time-1693044226996
Thread[ForkJoinPool-1-worker-1,5,main]job2 start,time-1693044226996
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test12.lambda$test$0(Test12.java:23)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more
param--1.1
Thread[ForkJoinPool-1-worker-1,5,main]job2 exit,time-1693044228998
run result--1.1
main thread exit,time-1693044228999将上述示例中的if(true) 改成if(false)其输出如下
# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time-1693044349555
main thread start,time-1693044349556
Thread[ForkJoinPool-1-worker-1,5,main]job1 exit,time-1693044351556
Thread[ForkJoinPool-1-worker-1,5,main]job2 start,time-1693044351556
param-1.2
Thread[ForkJoinPool-1-worker-1,5,main]job2 exit,time-1693044353559
run result-1.2
main thread exit,time-1693044353559cf2没有指定其result就是cf执行的结果理论上cf2.get应该立即返回的此处是等待了cf3即job2执行完成后
才返回。
7.2 thenAcceptBoth
thenAcceptBoth函数的作用是当两个CompletionStage都正常完成计算的时候就会执行提供的action消
费两个异步的结果。
常用方法
public U CompletionStageVoid thenAcceptBoth(CompletionStage? extends U other,BiConsumer? super T, ? super U action);public U CompletionStageVoid thenAcceptBothAsync(CompletionStage? extends U other,BiConsumer? super T, ? super U action);thenAcceptBoth 将两个任务的执行结果作为方法入参但是无返回值。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;public class ThenAcceptBothTest {public static void main(String[] args) {System.out.println(main thread start time- System.currentTimeMillis());CountDownLatch countDownLatch new CountDownLatch(1);CompletableFutureInteger future1 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(3) 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(任务1结果 number);return number;}});CompletableFutureInteger future2 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(3) 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(任务2结果 number);return number;}});future1.thenAcceptBoth(future2, new BiConsumerInteger, Integer() {Overridepublic void accept(Integer x, Integer y) {System.out.println(最终结果 (x y));countDownLatch.countDown();}});try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(main thread end time- System.currentTimeMillis());}
}# 程序输出
main thread start time-1693053398891
任务1结果3
任务2结果3
最终结果6
main thread end time-16930534018967.3 thenRun
thenRun也是对线程任务结果的一种消费函数与thenAccept不同的是thenRun会在上一阶段
CompletableFuture计算完成的时候执行一个Runnable而Runnable并不使用该CompletableFuture计算的
结果。
thenRun 的方法没有入参也没有返回值。
常用方法
public CompletionStageVoid thenRun(Runnable action);public CompletionStageVoid thenRunAsync(Runnable action);package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;public class ThenRunTest {public static void main(String[] args) {CompletableFutureVoid future CompletableFuture.supplyAsync(() - {int number new Random().nextInt(10);System.out.println(第一阶段 number);return number;}).thenRun(() - System.out.println(thenRun 执行));}
}# 程序输出
第一阶段8
thenRun 执行8、结果组合(thenCombine)
8.1 thenCombine
合并两个线程任务的结果并进一步处理。
常用方法
public U,V CompletableFutureV thenCombine(CompletionStage? extends U other,BiFunction? super T,? super U,? extends V fn);public U,V CompletableFutureV thenCombineAsync(CompletionStage? extends U other,BiFunction? super T,? super U,? extends V fn);public U,V CompletableFutureV thenCombineAsync(CompletionStage? extends U other,BiFunction? super T,? super U,? extends V fn, Executor executor);thenCombine 会将两个任务的执行结果作为方法入参传递到指定方法中且该方法有返回值。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Supplier;public class ThenCombineTest {public static void main(String[] args) {CompletableFutureInteger future1 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(10);System.out.println(任务1结果 number);return number;}});CompletableFutureInteger future2 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(10);System.out.println(任务2结果 number);return number;}});CompletableFutureInteger result future1.thenCombine(future2, new BiFunctionInteger, Integer, Integer() {Overridepublic Integer apply(Integer x, Integer y) {return x y;}});try {Integer integer result.get();System.out.println(结果是: integer);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
任务1结果0
任务2结果4
结果是:4thenCombine / thenAcceptBoth / runAfterBoth:
这三个方法都是将两个CompletableFuture组合起来只有这两个都正常执行完了才会执行某个任务区别在于
thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中且该方法有返回值thenAcceptBoth同
样将两个任务的执行结果作为方法入参但是无返回值runAfterBoth没有入参也没有返回值。注意两个任务
中只要有一个执行异常则将该异常信息作为指定任务的执行结果。
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test13 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool new ForkJoinPool();// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;});CompletableFutureDouble cf2 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return 3.2;});//cf和cf2的异步任务都执行完成后会将其执行结果作为方法入参传递给cf3,且有返回值CompletableFutureDouble cf3 cf.thenCombine(cf2, (a, b) - {System.out.println(Thread.currentThread() start job3,time- System.currentTimeMillis());System.out.println(job3 param a- a ,b- b);try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job3,time- System.currentTimeMillis());return a b;});//cf和cf2的异步任务都执行完成后会将其执行结果作为方法入参传递给cf3,无返回值CompletableFuture cf4 cf.thenAcceptBoth(cf2, (a, b) - {System.out.println(Thread.currentThread() start job4,time- System.currentTimeMillis());System.out.println(job4 param a- a ,b- b);try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job4,time- System.currentTimeMillis());});//cf4和cf3都执行完成后执行cf5无入参无返回值CompletableFuture cf5 cf4.runAfterBoth(cf3, () - {System.out.println(Thread.currentThread() start job5,time- System.currentTimeMillis());try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println(cf5 do something);System.out.println(Thread.currentThread() exit job5,time- System.currentTimeMillis());});System.out.println(main thread start cf.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(cf run result- cf.get());System.out.println(main thread start cf5.get(),time- System.currentTimeMillis());System.out.println(cf5 run result- cf5.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time-1693053679581
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time-1693053679582
main thread start cf.get(),time-1693053679583
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time-1693053681082
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time-1693053681582
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job4,time-1693053681582
Thread[main,5,main] start job3,time-1693053681582
job3 param a-1.2,b-3.2
job4 param a-1.2,b-3.2
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job4,time-1693053683085
Thread[main,5,main] exit job3,time-1693053683585
Thread[main,5,main] start job5,time-1693053683585
cf5 do something
Thread[main,5,main] exit job5,time-1693053684586
cf run result-1.2
main thread start cf5.get(),time-1693053684586
cf5 run result-null
main thread exit,time-1693053684586job1 和 job2几乎同时运行job2比job1先执行完成等job1退出后job3和job4几乎同时开始运行job4先退
出等job3执行完成job5开始了等job5执行完成后主线程退出。
9、任务交互(applyToEither和acceptEither和runAfterEither和anyOf和allOf和runAfterBoth)
线程交互指将两个线程任务获取结果的速度相比较按一定的规则进行下一步处理。
9.1 applyToEither
两个线程任务相比较先获得执行结果的就对该结果进行下一步的转化操作。
常用方法
public U CompletionStageU applyToEither(CompletionStage? extends T other,Function? super T, U fn);public U CompletionStageU applyToEitherAsync(CompletionStage? extends T other,Function? super T, U fn);applyToEither 会将已经执行完成的任务的执行结果作为方法入参并有返回值。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;public class ApplyToEitherTest {public static void main(String[] args) {CompletableFutureInteger future1 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(10);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(任务1结果: number);return number;}});CompletableFutureInteger future2 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(10);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(任务2结果: number);return number;}});future1.applyToEither(future2, new FunctionInteger, Integer() {Overridepublic Integer apply(Integer number) {System.out.println(最快结果 number);return number * 2;}});try {Integer result future1.get();System.out.println(结果是: result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
任务2结果:4
最快结果4
任务1结果:8
结果是:89.2 acceptEither
两个线程任务相比较先获得执行结果的就对该结果进行下一步的消费操作。
常用方法
public CompletionStageVoid acceptEither(CompletionStage? extends T other,Consumer? super T action);public CompletionStageVoid acceptEitherAsync(CompletionStage? extends T other,Consumer? super T action);acceptEither同样将已经执行完成的任务的执行结果作为方法入参但是没有返回值。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;public class AcceptEitherTest {public static void main(String[] args) {CountDownLatch countDownLatch new CountDownLatch(1);CompletableFutureInteger future1 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(10) 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(第一阶段 number);return number;}});CompletableFutureInteger future2 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(10) 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(第二阶段 number);return number;}});future1.acceptEither(future2, new ConsumerInteger() {Overridepublic void accept(Integer number) {System.out.println(最快结果 number);countDownLatch.countDown();}});try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}
}# 程序输出
第二阶段2
最快结果29.3 runAfterEither
两个线程任务相比较有任何一个执行完成就进行下一步操作不关心运行结果。
常用方法
public CompletionStageVoid runAfterEither(CompletionStage? other,Runnable action);public CompletionStageVoid runAfterEitherAsync(CompletionStage? other,Runnable action);runAfterEither没有方法入参也没有返回值。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;public class RunAfterEitherTest {public static void main(String[] args) {CompletableFutureInteger future1 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(任务1结果 number);return number;}});CompletableFutureInteger future2 CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {int number new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(任务2结果: number);return number;}});future1.runAfterEither(future2, new Runnable() {Overridepublic void run() {System.out.println(已经有一个任务完成了);}}).join();}
}# 程序输出
任务2结果:4
任务1结果4
已经有一个任务完成了applyToEither / acceptEither / runAfterEither
这三个方法都是将两个CompletableFuture组合起来只要其中一个执行完了就会执行某个任务其区别在于
applyToEither会将已经执行完成的任务的执行结果作为方法入参并有返回值acceptEither同样将已经执行完
成的任务的执行结果作为方法入参但是没有返回值runAfterEither没有方法入参也没有返回值。注意两个任
务中只要有一个执行异常则将该异常信息作为指定任务的执行结果。
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test14 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;});CompletableFutureDouble cf2 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return 3.2;});//cf和cf2的异步任务都执行完成后会将其执行结果作为方法入参传递给cf3,且有返回值CompletableFutureDouble cf3 cf.applyToEither(cf2, (result) - {System.out.println(Thread.currentThread() start job3,time- System.currentTimeMillis());System.out.println(job3 param result- result);try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job3,time- System.currentTimeMillis());return result;});//cf和cf2的异步任务都执行完成后会将其执行结果作为方法入参传递给cf3,无返回值CompletableFuture cf4 cf.acceptEither(cf2, (result) - {System.out.println(Thread.currentThread() start job4,time- System.currentTimeMillis());System.out.println(job4 param result- result);try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job4,time- System.currentTimeMillis());});//cf4和cf3都执行完成后执行cf5无入参无返回值CompletableFuture cf5 cf4.runAfterEither(cf3, () - {System.out.println(Thread.currentThread() start job5,time- System.currentTimeMillis());try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println(cf5 do something);System.out.println(Thread.currentThread() exit job5,time- System.currentTimeMillis());});System.out.println(main thread start cf.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(cf run result- cf.get());System.out.println(main thread start cf5.get(),time- System.currentTimeMillis());System.out.println(cf5 run result- cf5.get());System.out.println(main thread exit,time- System.currentTimeMillis());}
}# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time-1693054246333
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time-1693054246334
main thread start cf.get(),time-1693054246334
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time-1693054247835
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job4,time-1693054247835
job4 param result-3.2
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time-1693054248335
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job3,time-1693054248335
cf run result-1.2
job3 param result-1.2
main thread start cf5.get(),time-1693054248335
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job4,time-1693054249339
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job5,time-1693054249339
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job3,time-1693054250336
cf5 do something
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job5,time-1693054250339
cf5 run result-null
main thread exit,time-1693054250339job1 和job2 同时开始运行job2先执行完成然后job4开始执行理论上job3和job4应该同时开始运行但是此
时只有job4开始执行了job3是等待job1执行完成后才开始执行job4先于job3执行完成然后job5开始执行
等job5执行完成后主线程退出。
9.4 anyOf
anyOf() 的参数是多个给定的 CompletableFuture当其中的任何一个完成时方法返回这个
CompletableFuture。
常用方法
public static CompletableFutureObject anyOf(CompletableFuture?... cfs)anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行其get返回的是已经执行完成的任
务的执行结果如果该任务执行异常则抛出异常。
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class AnyOfTest {public static void main(String[] args) {Random random new Random();CompletableFutureString future1 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return hello;});CompletableFutureString future2 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return world;});CompletableFutureObject result CompletableFuture.anyOf(future1, future2);try {Object object result.get();System.out.println(object);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
worldpackage com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test15 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;});CompletableFutureDouble cf2 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return 3.2;});CompletableFutureDouble cf3 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job3,time- System.currentTimeMillis());try {Thread.sleep(1300);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job3,time- System.currentTimeMillis());return 2.2;});//allof等待所有任务执行完成才执行cf4如果有一个任务异常终止则cf4.get时会抛出异常都是正常执行cf4.get返回null//anyOf是只有一个任务执行完成无论是正常执行或者执行异常都会执行cf4cf4.get的结果就是已执行完成的任务的执行结果CompletableFuture cf4 CompletableFuture.anyOf(cf, cf2, cf3).whenComplete((a, b) - {if (b ! null) {System.out.println(error stack trace-);b.printStackTrace();} else {System.out.println(run succ,result- a);}});System.out.println(main thread start cf4.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(cf4 run result- cf4.get());System.out.println(main thread exit,time- System.currentTimeMillis());}}# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time-1693054996820
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time-1693054996820
Thread[ForkJoinPool.commonPool-worker-3,5,main] start job3,time-1693054996821
main thread start cf4.get(),time-1693054996821
Thread[ForkJoinPool.commonPool-worker-3,5,main] exit job3,time-1693054998122
run succ,result-2.2
cf4 run result-2.2
main thread exit,time-16930549981259.5 allOf
allOf方法用来实现多 CompletableFuture 的同时返回。
常用方法
public static CompletableFutureVoid allOf(CompletableFuture?... cfs)allOf返回的CompletableFuture是多个任务都执行完成后才会执行只有有一个任务执行异常则返回的
CompletableFuture执行get方法时会抛出异常如果都是正常执行则get返回null。
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class AllOfTest {public static void main(String[] args) {CompletableFutureString future1 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(future1完成);return future1完成;});CompletableFutureString future2 CompletableFuture.supplyAsync(() - {System.out.println(future2完成);return future2完成;});CompletableFutureVoid completableFuture CompletableFuture.allOf(future1, future2);try {String result String.valueOf(completableFuture.get());System.out.println(result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}# 程序输出
future2完成
future1完成
future1完成
future2完成
nullpackage com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test16 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFutureDouble cf CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job1,time- System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job1,time- System.currentTimeMillis());return 1.2;});CompletableFutureDouble cf2 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job2,time- System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() exit job2,time- System.currentTimeMillis());return 3.2;});CompletableFutureDouble cf3 CompletableFuture.supplyAsync(() - {System.out.println(Thread.currentThread() start job3,time- System.currentTimeMillis());try {Thread.sleep(1300);} catch (InterruptedException e) {}
// throw new RuntimeException(test);System.out.println(Thread.currentThread() exit job3,time- System.currentTimeMillis());return 2.2;});//allof等待所有任务执行完成才执行cf4如果有一个任务异常终止则cf4.get时会抛出异常都是正常执行cf4.get返回null//anyOf是只有一个任务执行完成无论是正常执行或者执行异常都会执行cf4cf4.get的结果就是已执行完成的任务的执行结果CompletableFuture cf4 CompletableFuture.allOf(cf, cf2, cf3).whenComplete((a, b) - {if (b ! null) {System.out.println(error stack trace-);b.printStackTrace();} else {System.out.println(run succ,result- a);}});System.out.println(main thread start cf4.get(),time- System.currentTimeMillis());//等待子任务执行完成System.out.println(cf4 run result- cf4.get());System.out.println(main thread exit,time- System.currentTimeMillis());}}# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time-1693055202550
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time-1693055202550
Thread[ForkJoinPool.commonPool-worker-3,5,main] start job3,time-1693055202550
main thread start cf4.get(),time-1693055202551
Thread[ForkJoinPool.commonPool-worker-3,5,main] exit job3,time-1693055203852
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time-1693055204051
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time-1693055204551
run succ,result-null
cf4 run result-null
main thread exit,time-1693055204551主线程等待最后一个job1执行完成后退出。
9.6 join
package com;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;public class JoinTest {public static void main(String[] args) throws InterruptedException {ListInteger numberList new ArrayList();for (int i 1; i 11; i) {numberList.add(i);}ListCompletableFuture? futureList new ArrayList();System.out.println(start!);long start System.currentTimeMillis();for (Integer number : numberList) {futureList.add(CompletableFuture.runAsync(() - {try {Thread.sleep(1000);System.out.println(number);} catch (InterruptedException e) {e.printStackTrace();}}));}CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])).join();long end System.currentTimeMillis();System.out.println(end!);System.out.println(耗时: (end - start));}
}# 程序输出
start!
4
1
2
3
5
6
7
8
9
10
end!
耗时:2046package com;import java.util.concurrent.CompletableFuture;public class JoinTest2 {public static void main(String[] args) {CompletableFutureVoid future1 CompletableFuture.runAsync(() - {System.out.println(Task 1 started);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Task 1 completed);});CompletableFutureVoid future2 CompletableFuture.runAsync(() - {System.out.println(Task 2 started);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Task 2 completed);});CompletableFutureVoid future3 CompletableFuture.runAsync(() - {System.out.println(Task 3 started);try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Task 3 completed);});CompletableFutureVoid allFutures CompletableFuture.allOf(future1, future2, future3);allFutures.thenRun(() - {System.out.println(All tasks completed);});// 防止 JVM 在 CompletableFuture 执行完之前退出try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}
}# 程序输出
Task 1 started
Task 2 started
Task 3 started
Task 1 completed
Task 2 completed
Task 3 completed
All tasks completedpackage com;import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;public class JoinTest3 {public static void main(String[] args) {ExecutorService threadPool Executors.newFixedThreadPool(10);ListCompletableFutureVoid futures IntStream.rangeClosed(1, 10).mapToObj(n - CompletableFuture.runAsync(() - {System.out.println(done n);}, threadPool)).collect(Collectors.toList());futures.forEach(CompletableFuture::join);System.out.println(all done);threadPool.shutdown();}
}# 程序输出
done 2
done 5
done 4
done 3
done 1
done 6
done 7
done 8
done 9
done 10
all done9.7 runAfterBoth
runAfterBoth组合两个future不需要获取future的结果只需两个future处理完任务后处理该任务。
package com;import java.util.concurrent.CompletableFuture;public class RunAfterBothTest {public static void main(String[] args) {CompletableFutureInteger future1 CompletableFuture.supplyAsync(() - {System.out.println(线程1开始了 Thread.currentThread().getName());int i 100 / 10;System.out.println(线程1结束了 Thread.currentThread().getName());return i;});CompletableFutureInteger future2 CompletableFuture.supplyAsync(() - {System.out.println(线程2开始了 Thread.currentThread().getName());int i 100 / 5;System.out.println(线程2结束了 Thread.currentThread().getName());return i;});// 希望在future1 future2任务执行完之后执行future3// runAfterBothAsync不能获取前面两个线程的返回结果,本身也没有返回结果CompletableFutureVoid voidCompletableFuture future1.runAfterBothAsync(future2, () - {System.out.println(线程3执行了);});}
}# 程序输出
线程1开始了ForkJoinPool.commonPool-worker-1
线程1结束了ForkJoinPool.commonPool-worker-1
线程2开始了ForkJoinPool.commonPool-worker-2
线程2结束了ForkJoinPool.commonPool-worker-2
线程3执行了10、CompletableFuture常用方法总结 11、实现最优的烧水泡茶程序
著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子文中提到最优的工序应该是下面这
样 11.1 基于Future实现
package com;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;/*** 烧茶案例*/
public class FutureTaskTest {public static void main(String[] args) throws ExecutionException, InterruptedException {long start System.currentTimeMillis();// 创建任务T2的FutureTaskFutureTaskString ft2 new FutureTask(new T2Task());// 创建任务T1的FutureTaskFutureTaskString ft1 new FutureTask(new T1Task(ft2));// 线程T1执行任务ft2Thread T1 new Thread(ft2);T1.start();// 线程T2执行任务ft1Thread T2 new Thread(ft1);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());long end System.currentTimeMillis();System.out.println(耗时: (end - start));}
}// T1Task需要执行的任务
// 洗水壶、烧开水、泡茶
class T1Task implements CallableString {FutureTaskString ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTaskString ft2) {this.ft2 ft2;}Overridepublic String call() throws Exception {// 洗水壶System.out.println(T1:洗水壶...);TimeUnit.SECONDS.sleep(1);// 烧开水System.out.println(T1:烧开水...);TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶String tf ft2.get();System.out.println(T1:拿到茶叶: tf);// 泡茶System.out.println(T1:泡茶...);return 上茶: tf;}
}// T2Task需要执行的任务
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements CallableString {Overridepublic String call() throws Exception {// 洗茶壶System.out.println(T2:洗茶壶...);TimeUnit.SECONDS.sleep(1);// 洗茶杯System.out.println(T2:洗茶杯...);TimeUnit.SECONDS.sleep(2);// 拿茶叶System.out.println(T2:拿茶叶...);TimeUnit.SECONDS.sleep(1);return 龙井;}
}# 程序输出
T2:洗茶壶...
T1:洗水壶...
T2:洗茶杯...
T1:烧开水...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井
耗时:1600311.2 基于CompletableFuture实现
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureTest {public static void main(String[] args) {long start System.currentTimeMillis();// 任务1洗水壶-烧开水CompletableFutureVoid f1 CompletableFuture.runAsync(() - {// 洗水壶System.out.println(T1:洗水壶...);sleep(1, TimeUnit.SECONDS);// 烧开水System.out.println(T1:烧开水...);sleep(15, TimeUnit.SECONDS);});// 任务2洗茶壶-洗茶杯-拿茶叶CompletableFutureString f2 CompletableFuture.supplyAsync(() - {// 洗茶壶System.out.println(T2:洗茶壶...);sleep(1, TimeUnit.SECONDS);// 洗茶杯System.out.println(T2:洗茶杯...);sleep(2, TimeUnit.SECONDS);// 拿茶叶System.out.println(T2:拿茶叶...);sleep(1, TimeUnit.SECONDS);return 龙井;});// 任务3任务1和任务2完成后执行泡茶CompletableFutureString f3 f1.thenCombine(f2, (a, b) - {System.out.println(T1:拿到茶叶: b);System.out.println(T1:泡茶...);return 上茶: b;});//等待任务3执行结果System.out.println(f3.join());long end System.currentTimeMillis();System.out.println(耗时: (end - start));}static void sleep(int t, TimeUnit u) {try {u.sleep(t);} catch (InterruptedException e) {}}}结果
# 程序输出
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井
耗时:16055