GVKun编程网logo

将Java Future转换为CompletableFuture(java future completable future)

10

对于想了解将JavaFuture转换为CompletableFuture的读者,本文将是一篇不可错过的文章,我们将详细介绍javafuturecompletablefuture,并且为您提供关于005

对于想了解将Java Future转换为CompletableFuture的读者,本文将是一篇不可错过的文章,我们将详细介绍java future completable future,并且为您提供关于005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、CompletableFuture CompletableFuture.supplyAsync 异常处理、CompletableFuture 源码详解之 java.util.concurrent.CompletableFuture#supplyAsync (java.util.function.Suppl...、CompletableFuture.allOf that doens''t return Void (CompletableFuture.allOf 不能返回 Void 的解决方法)的有价值信息。

本文目录一览:

将Java Future转换为CompletableFuture(java future completable future)

将Java Future转换为CompletableFuture(java future completable future)

Java
8引入CompletableFuture了可组合的Future的新实现(包括一堆thenXxx方法)。我想专门使用它,但是我想使用的许多库仅返回非可组合Future实例。

有没有一种方法可以将返回的Future实例包装在内,CompleteableFuture以便我可以编写它?

答案1

小编典典

有一种方法,但是您不喜欢它。以下方法将a Future<T>转换为a CompletableFuture<T>

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {  if (future.isDone())    return transformDoneFuture(future);  return CompletableFuture.supplyAsync(() -> {    try {      if (!future.isDone())        awaitFutureIsDoneInForkJoinPool(future);      return future.get();    } catch (ExecutionException e) {      throw new RuntimeException(e);    } catch (InterruptedException e) {      // Normally, this should never happen inside ForkJoinPool      Thread.currentThread().interrupt();      // Add the following statement if the future doesn''t have side effects      // future.cancel(true);      throw new RuntimeException(e);    }  });}private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {  CompletableFuture<T> cf = new CompletableFuture<>();  T result;  try {    result = future.get();  } catch (Throwable ex) {    cf.completeExceptionally(ex);    return cf;  }  cf.complete(result);  return cf;}private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)    throws InterruptedException {  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {    @Override public boolean block() throws InterruptedException {      try {        future.get();      } catch (ExecutionException e) {        throw new RuntimeException(e);      }      return true;    }    @Override public boolean isReleasable() {      return future.isDone();    }  });}

显然,这种方法的问题在于,对于每个 Future ,都会阻塞线程以等待 Future 的结果-与 Future
的想法相矛盾。在某些情况下,可能会做得更好。但是,总的来说,没有积极等待 Future 的结果就没有解决方案。

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

一、概述

  创建线程的两种方式,一种是直接继承 Thread,另外一种就是实现 Runnable 接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从 Java 1.5 开始,就提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。

  详述:https://www.cnblogs.com/bjlhx/p/7588971.html

1.1、Runnable 接口

它是一个接口,里面只声明了一个 run () 方法:

public interface Runnable {
    public abstract void run();
}

由于 run () 方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。

1.2、Callable 接口

Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,只不过这个方法叫做 call ()。

public interface Callable<V> {   
    V call() throws Exception;
}

是一个泛型接口,call () 函数返回的类型就是传递进来的 V 类型。Callable 接口可以看作是 Runnable 接口的补充,call 方法带有返回值,并且可以抛出异常。

1.3、Future 接口

  Future 的核心思想是:

    一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个 Future,可以通过 Future 这个数据结构去控制方法 f 的计算过程。

  Future 类位于 java.util.concurrent 包下,它是一个接口:这里的控制包括:

    get 方法:获取计算结果(如果还没计算完,也是必须等待的)这个方法会产生阻塞,会一直等到任务执行完毕才返回;

    get (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。

    cancel 方法:还没计算完,可以取消计算过程,如果取消任务成功则返回 true,如果取消任务失败则返回 false。参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置 true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false,即如果取消已经完成的任务会返回 false;如果任务正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;如果任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true。

    isDone 方法:判断是否计算完

    isCancelled 方法:判断计算是否被取消,方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  也就是说 Future 提供了三种功能:

    1)判断任务是否完成;

    2)能够中断任务;

    3)能够获取任务执行结果。

  Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

  因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。

使用 Callable+Future 获取执行结果:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.4、FutureTask 类

FutureTask 继承体系中的核心接口是 Future。事实上,FutureTask 是 Future 接口的一个唯一实现类。

如何获取 Callable 的返回结果:一般是通过 FutureTask 这个中间媒介来实现的。整体的流程是这样的:

把 Callable 实例当作参数,生成一个 FutureTask 的对象,然后把这个对象当作一个 Runnable,作为参数另起线程。

1.4.1、FutureTask 结构

  

1.4.2、FutureTask 使用

方式一、使用 thread 方式

  FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。以下使用 Thread 包装线程方式启动

public static void main(String[] args) throws Exception {
        Callable<Integer> call = () -> {
            System.out.println("计算线程正在计算结果...");
            Thread.sleep(3000);
            return 1;
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();

        System.out.println("main线程干点别的...");

        Integer result = task.get();
        System.out.println("从计算线程拿到的结果为:" + result);
    }

方式二、使用 ExecutorService

   ExecutorService executor = Executors.newFixedThreadPool (2); 线程池方式

public static void main(String[] args) {
        Callable<String> callable1=()->{
            Thread.sleep(2000);
            return Thread.currentThread().getName();
        };
        Callable<String> callable2=()->{
            Thread.sleep(3000);
            return Thread.currentThread().getName();
        };
        FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
        FutureTask<String> futureTask2 = new FutureTask<>(callable2);

        ExecutorService executor = Executors.newFixedThreadPool(2);        // 创建线程池并返回ExecutorService实例
        executor.execute(futureTask1);  // 执行任务
        executor.execute(futureTask2);
        //同时开启了两个任务
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                if(futureTask1.isDone() && futureTask2.isDone()){//  两个任务都完成
                    System.out.println("Done");
                    executor.shutdown();                          // 关闭线程池和服务
                    return;
                }

                if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成
                    System.out.println("FutureTask1 output="+futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if(s !=null){
                    System.out.println("FutureTask2 output="+s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }catch(TimeoutException e){
                //do nothing
            }
            System.out.println((System.currentTimeMillis()-startTime));
        }
    }
View Code

使用 Callable+FutureTask 获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.5、CompletionService

原理:内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //开启5个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            //结果集
            List<Integer> list = new ArrayList<>();
            //1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任务
            for(int i=0;i<taskCount;i++){
                futureList.add(completionService.submit(new Task(i+1)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
//                System.out.println("任务result="+result+"获取到结果!"+new Date());
//                list.add(result);
//            }

//            //方法2.使用内部阻塞队列的take()
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!");
                list.add(result);
            }
            System.out.println("list="+list);
            System.out.println("总耗时="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//关闭线程池
        }

    }


    static class Task implements Callable<Integer>{
        Integer i;

        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5){
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
            return i;
        }

    }
}
View Code

建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。

二、CompletableFuture

2.1、对标 Futrue

  Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

  阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。

    Future 局限性,它很难直接表述多个 Future 结果之间的依赖性。

2.2、类图

  

2.2.1、CompletionStage

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply (x -> square (x)).thenAccept (x -> System.out.print (x)).thenRun (() -> System.out.println ())

  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

2.2.2、Future

2.3、创建 CompletableFuture 对象

  CompletableFuture.compleatedFuture 是一个静态辅助方法,用来返回一个已经计算好的 CompletableFuture.

  以下四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象:

public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)

  以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool () 作为它的线程池执行异步代码。

  runAsync 方法:它以 Runnabel 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。

  supplyAsync 方法以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。

  注意:这些线程都是 Daemon 线程,主线程结束 Daemon 线程不结束,只有 JVM 关闭时,生命周期终止。

示例:简单同步用法

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            //长时间的计算任务
            try {
                System.out.println("计算型任务开始");
                Thread.sleep(2000);
                return "计算型任务结束";
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "·00";
        });
        System.out.println(future.get());
    }
View Code

2.4、计算结果完成时的处理

当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)  

  可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。

  方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

示例:

package com.lhx.cloud.futruetask;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BasicFuture {

    private static Random rand = new Random();
    private static long t = System.currentTimeMillis();

    static int getMoreData()  {
        System.out.println("begin to start compute");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
        return rand.nextInt(1000);
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(f.get());
    }}
View Code

2.5、转换

CompletableFuture 可以作为 monad (单子) 和 functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 Function. 还可以串联起来。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

2.6、异常处理 completeExceptionally

  为了能获取任务线程内发生的异常,需要使用 CompletableFuture 的 completeExceptionally 方法将导致 CompletableFuture 内发生问题的异常抛出。

  这样,当执行任务发生异常时,调用 get() 方法的线程将会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的 Exception 参数。

/**
     * 任务没有异常 正常执行,然后结束
     */
    @Test
    public void test1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

    /**
     * 线程有异常  正常执行,然后无法结束,主线程会一直等待
     */
    @Test
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i=1/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
View Code
/**
     * 线程有异常  正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
     * 主线程接收到 程序继续处理,至结束
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i = 1/0;
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

2.7、多任务组合方法 allOf 和 anyOf

allOf 是等待所有任务完成,构造后 CompletableFuture 完成

anyOf 是只要有一个任务完成,构造后 CompletableFuture 就完成

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        // 结果集
        List<String> list = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
                        .thenApply(h -> Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e);
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);
        // 封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();
        System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
    }

    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName()
                    + "任务i=" + i + ",完成!" );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
View Code

2.8、常用多线程并发,取结果归集的几种实现方案

描述 Future FutureTask CompletionService CompletableFuture
原理 Future 接口 接口 RunnableFuture 的唯一实现类,RunnableFuture 接口继承自 Future+Runnable 内部通过阻塞队列 + FutureTask 接口 JDK8 实现了 Future, CompletionStage 两个接口
多任务并发执行 支持 支持 支持 支持
获取任务结果的顺序 按照提交顺序获取结果 未知 支持任务完成的先后顺序 支持任务完成的先后顺序
异常捕捉 自己捕捉 自己捕捉 自己捕捉 原生 API 支持,返回每个任务的异常
建议 CPU 高速轮询,耗资源,或者阻塞,可以使用,但不推荐 功能不对口,并发任务这一块多套一层,不推荐使用 推荐使用,没有 JDK8CompletableFuture 之前最好的方案 API 极端丰富,配合流式编程,推荐使用!

上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html

 

 

   

CompletableFuture CompletableFuture.supplyAsync 异常处理

CompletableFuture CompletableFuture.supplyAsync 异常处理

CompletableFuture 异常处理completeExceptionally可以把异常抛到主线程
/**
 * User: laizhenwei
 * Date: 2018-01-30 Time: 22:26
 * Description:
 */
@RunWith(SpringRunner.class)
//@SpringBootTest
public class CompletableFutureTests {

    @Test
    public void testMethod() {

        String[] orders = {"1", "2", "3", "4", "5", "6"};

        List<CompletableFuture<Boolean>> futures = new ArrayList<>();

        Arrays.stream(orders).forEach(id -> {
            try{
                futures.add(submitAsync(id));
            }catch (Exception ex){
                System.out.println(ex);
            }
        });

        futures.stream().forEach(f-> {
            try {
                System.out.println(f.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    private static Boolean submit(String order) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new RuntimeException("抛一个异常" + order);
    }

    private static CompletableFuture<Boolean> submitAsync(String order) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Boolean result = submit(order);
                future.complete(result);
            } catch (Exception ex) {
                future.completeExceptionally(ex);
            }
        }).start();
        return future;
    }

}

 

使用 CompletableFuture.supplyAsync  简化代码 加入线程池,exceptionally处理异常

/**
 * User: laizhenwei
 * Date: 2018-01-30 Time: 22:26
 * Description:
 */
@RunWith(SpringRunner.class)
//@SpringBootTest
public class CompletableFutureTests {

    ExecutorService executor = Executors.newFixedThreadPool(3);

    @Test
    public void testMethod() {
        String[] orders = {"1", "2", "3", "4", "5", "6"};
        Arrays.stream(orders).forEach(id -> CompletableFuture.supplyAsync(() -> submit(id), executor).exceptionally(e -> {
            System.out.println(e);
            return false;
        }));

        executor.shutdown();
        while (!executor.isTerminated()) {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static Boolean submit(String order) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new RuntimeException("抛一个异常" + order);
    }

}

 

CompletableFuture 源码详解之 java.util.concurrent.CompletableFuture#supplyAsync (java.util.function.Suppl...

CompletableFuture 源码详解之 java.util.concurrent.CompletableFuture#supplyAsync (java.util.function.Suppl...

/**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function''s return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

  CompletableFuture 下的 supplyAsync 方法是一个执行异步任务且有返回结果的任务,使用例子如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 测试类
 * @author yangchangkui
 */
public class TestMy {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });
        //判断是否已经完成
        System.out.println("耗时:"+(System.currentTimeMillis()-start)+",isDone:"+completableFuture.isDone());
        //阻塞获取结果
        String result = completableFuture.get();
        System.out.println("耗时:"+(System.currentTimeMillis()-start)+",result:"+result);
        //判断是否已经完成
        System.out.println("耗时:"+(System.currentTimeMillis()-start)+",isDone:"+completableFuture.isDone());

    }
}

  执行结果如下,显然,达到了异步执行的效果,比如在一些调用很多外部接口的聚合接口,只要接口不作为入参,那就可以进行异步执行,最后阻塞拿结果,提高接口的 QPS,提高系统性能。

 

CompletableFuture.allOf that doens''t return Void (CompletableFuture.allOf 不能返回 Void 的解决方法)

CompletableFuture.allOf that doens''t return Void (CompletableFuture.allOf 不能返回 Void 的解决方法)

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Main {

    static CompletableFuture<List<?>> allOf(CompletableFuture<?>... cfs) {
        return CompletableFuture.allOf(cfs)
                .thenApply(ignore -> Stream.of(cfs)
                        .map(cf -> cf.join())
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {

        // we have 3 (or any number) of CompletableFutures
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {sleep(1); return "HELLO";});
        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {sleep(1); return 10;});
        CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {sleep(1); return 20d;});

        CompletableFuture<List<?>> allOf = allOf(cf1, cf2, cf3); //we call the method we just created above

        // we can get the -already there - result either using then
        // Get result using:
        allOf.thenAccept(l -> l.forEach(System.out::println));

        // or using CompletableFuture.join() (or CompletableFuture.get())
        // OR (non-typesafe)
        String s = (String) allOf.join().get(0);
        Integer i = (Integer) allOf.join().get(1);
        Double d = (Double) allOf.join().get(2);
        System.out.println(s + ", " + i + ", "+ d);

        sleep(2); // because default CompletableFuture Executor is a daemon-thread based executor
    }

    private static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

摘录地址:http://m-hewedy.blogspot.com/2017/02/completablefutureallof-that-doenst.html

我们今天的关于将Java Future转换为CompletableFuturejava future completable future的分享已经告一段落,感谢您的关注,如果您想了解更多关于005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、CompletableFuture CompletableFuture.supplyAsync 异常处理、CompletableFuture 源码详解之 java.util.concurrent.CompletableFuture#supplyAsync (java.util.function.Suppl...、CompletableFuture.allOf that doens''t return Void (CompletableFuture.allOf 不能返回 Void 的解决方法)的相关信息,请在本站查询。

本文标签: