Java中的利器CompletableFuture
Java中的利器CompletableFuture
三生石异步是一种程序设计的思想,使用异步模式设计的程序可以显著减少线程等待,从而在高吞吐量的场景中,极大提升系统的整体性能,显著降低时延。
在实际工作中可以使用异步框架或者响应式框架来解决异步编程问题,但是异步框架或者响应式框架学习成本较高,但是jdk8提供了CompletableFuture也可以实现异步编程的问题。
初始化
CompletableFuture提供了四个静态方法实现异步操作。
runAsybc 无返回值,supplyAsync具有返回值,如果没有指定线程池则使用 ForkJoinPool.commonPool()
// 无返回值的CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
// 无返回值的CompletableFuture,并使用指定的线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
// 具有返回值的CompletableFuture
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
// 具有返回值的CompletableFuture,并使用指定的线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
// 不指定线程池则使用ForkJoinPoll
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
使用示例:
// 创建一个异步操作,借助CompletableFuture
@Test
public void initCompletableFuture() throws ExecutionException, InterruptedException {
//没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
// 如果指定线程池,则使用指定的线程池运行
CompletableFuture<Void> noResult = CompletableFuture.runAsync(() -> {
System.out.println("开启一个无返回值的,并使用默认的线程池ForkJoinPool.commonPool()");
}, Executors.newSingleThreadScheduledExecutor());
CompletableFuture<String> havResult = CompletableFuture.supplyAsync(() -> {
return "有返回值的CompletableFuture";
});
System.out.println(havResult.get());
}
thenApply方法
如果一个线程依赖一个线程的结果时,可以使用thenApply方法将串行化执行。
Function<? super T,? extends U> T:上一个任务返回结果的类型,U:当前任务的返回值类型
源码如下:
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
使用实例
@Test
public void initCompletableFuture() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 1000;
}).thenApply(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return integer + 24;
}
});
int result = future.get();
System.out.println(result);
}
handle 方法
handle是计算完成是对结果进行处理,handle和thenApply方法基本一致,与thenApply不同的是handle方式计算完成时在调用,还可以处理异常任务,而thenApply方法只能处理正常任务,出现异常时不在执行thenApply,
源码如下:
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
使用示例:
// 创建一个异步操作,借助CompletableFuture
@Test
public void initCompletableFuture() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 1000;
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer integer, Throwable throwable) {
if (throwable == null) {
return 24 + integer;
} else {
System.out.println(throwable);
}
return integer;
}
});
int result = future.get();
System.out.println(result);
}
完成后回调的方法
当异步任务完成时、或者抛出异常,指定执行active,可以使用以下方法:
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
// 当异步任务执行失败时,需要指定的处理任务
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
使用的示例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("hello world");
});
future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if (throwable == null) {
System.out.println("After one thousand,");
System.out.println("You are already a great god level");
} else {
System.out.println("Start to quit");
}
}
});
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable throwable) {
System.out.println("Give it up, boy. It's hopeless");
return null;
}
});
thenAccept方法
thenAccept方式主要处理结果,并可以加以处理消费的,与其相似的方法(重载的方法)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
使用示例:
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "hello world";
}).thenAccept(str -> {
str += "<----------->";
System.out.println(str);
});
thenRun
thenRun方法与thenAccept方法基本一致,不同的是,它不依赖线程的执行结果,只要异步任务完成就开始执行指定的active.与该方法功能类似(或其重构方法)
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
使用示例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("First Async");
}).thenRun(() -> {
System.out.println("Second Async");
});
thenCombine方法:
thenCombine方法会把有返回值的两个CompletableFuture异步任务都执行完成后,使用thenCombine方法进行聚合操作,以下是功能相似(或重构方法)
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
使用示例:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 1000;
});
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
return 24;
});
CompletableFuture<String> future2 = future.thenCombine(future1, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) {
return String.valueOf(integer + integer2);
}
});
System.out.println(future2.get()); // print 1024
thenCompose方法:
该方法运行两个方法进行流操作,将第一个执行的结果作为第二执行方法的参数,类似于一个流程的有状态的中间处理操作
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}
使用示例:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 1000;
}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer integer) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return integer + 24;
}
});
}
});
System.out.println(future.get()); // print 1024
runAfterBoth方法:
两个CompletionStage方法执行完成后,才会执行下一步操作,类似功能(或重构)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
使用示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello ";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "world";
});
future.runAfterBoth(future1, () -> {
try {
String str = future.get() + future1.get();
System.out.println(str);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
System.out.println(future.get());
runAfterEither 方法
该方法表示两个CompletionStage方法,只要其中一个执行完成了就执行指定的方法,提供类似功能(或重载方法)的有:
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action) {
return orRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action) {
return orRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
简单的使用实例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello ";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "world";
});
future.runAfterEither(future1, () -> {
System.out.println("future 或者 future1 其中一个已经完成了!");
});
thenAcceptBoth方法
该方法表示只要CompletionStage都完成了,把其中任何一个的结果交给一个消费处理,提供类似方法(或者重载方法)有如下:
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
简单使用实例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello ";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "world";
});
future.thenAcceptBoth(future1, new BiConsumer<String, String>() {
@Override
public void accept(String s, String s2) {
String str = s +s2;
System.out.println(str);
}
});
***applyToEither方法 ***
该方法表示两个CompletionStage,谁执行快点就用谁的结果做为消耗。类似功能有:
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
简单的使用实例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello ";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "world";
});
CompletableFuture<String> result = future.applyToEither(future1, new Function<String, String>() {
@Override
public String apply(String s) {
return s;
}
});
System.out.println(result.get());
acceptEither方法
该方法表示两个CompletionStage谁执行快点,就使用快点的那个为消费的操作,类似功能的有:
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
简单的使用实例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello ";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "world";
});
future.acceptEither(future1, new Consumer<String>() {
// 没有任何返回值,若需要返回值的则使用applyToEither
@Override
public void accept(String s) {
System.out.println(s);
}
});
**runAfterEither **
该方法两个CompletionStage方法,任何一个完成了都会执行指定的active方法,类似的方法.
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action) {
return orRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action) {
return orRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
使用实例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello ";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "world";
});
future.runAfterEither(future1, () -> {
System.out.println("执行好了一个");
});
以上是对CompletableFuture的api简单使用。